#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
BNMP — UF-wide harvester (final refactor, 415 fix)

Fix in this version
-------------------
* Adds explicit **Content-Type: application/json;charset=UTF-8** for POST
  requests (some BNMP endpoints return HTTP 415 without it).

Goal
----
Harvest **as much data as possible** from portal BNMP **filtered only by a
given UF/Estado**, with strong robustness and good operator UX.

This script merges the best ideas from the provided scripts and adds:

* Resilient HTTP (browser-like headers, gzip handling, retries w/ exponential
  backoff + jitter).
* Two complementary strategies for **maximum coverage**:
  1) **per-órgão** pagination via `/bnmpportal/api/pesquisa-pecas/pecas`
     (UF + órgão filters);
  2) **UF-wide** pagination via `/bnmpportal/api/pesquisa-pecas/filter`
     (buscaOrgaoRecursivo=True).
* Streamed, memory-safe outputs (GZIP NDJSON) + merged deduplicated stream.
* Detailed run manifest (JSON) and a small HTML preview of sample rows.
* Rich progress logging, counters (401 / transient failures), and
  configurable knobs.
* No external dependencies (stdlib only). Cross-platform.

Outputs (under an auto-created run directory):
- `raw_pecas_orgao.ndjson.gz` — flattened records from per-órgão strategy
- `raw_filter_uf.ndjson.gz`   — flattened records from UF `/filter` strategy
- `merged_dedup.ndjson.gz`    — flattened, deduplicated union of both
- `manifest.json`             — summary metrics and parameters
- `sample_preview.html`       — HTML preview (first N merged rows)

Authentication
--------------
Provide the BNMP session cookie and fingerprint via flags or environment:
- `--cookie` or env `BNMP_COOKIE`
- `--fp`     or env `BNMP_FP`

UF selection
------------
Use `--uf-id` (int) **or** `--state` (name/sigla). Examples:
`--state "Goiás"`, `--state GO`, `--uf-id 9`.

Basic usage
-----------
$ python bnmp_uf_harvester.py --state "DF" --cookie "$BNMP_COOKIE" --fp "$BNMP_FP"

Common knobs
------------
--strategy both|pecas-orgao|filter-uf   (default: both)
--page-size-pecas 200                   page size for per-órgão calls
--page-size-filter 500                  page size for UF /filter calls
--retries 6                             max attempts for transient failures
--sleep-page 0.25                       throttle between pages
--sleep-org 0.35                        throttle between órgãos
--sort-filter "dataExpedicao,DESC"      sort for /filter (optional)
--sample 200                            number of rows kept for HTML preview
--out-dir ./bnmp_out                    parent output folder (a timestamped
                                        run dir is created inside)

Notes
-----
* To target **only warrants** (mandados), pass `--tipo-peca MANDADO_DE_PRISAO`.
  Without `--tipo-peca`, the script collects **all peças** available for the UF.
* The merged stream is deduped by best-effort key selection among common ID
  fields; if none found, a content hash is used.
* On 401 errors for a given órgão/page, the script skips forward and continues
  (counts reported).
"""
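# A fuller invocation sketch combining the knobs documented above; the chosen
# values are illustrative assumptions, not recommendations:
#
#   $ python bnmp_uf_harvester.py --state GO --strategy both \
#       --page-size-filter 500 --retries 8 --tipo-peca MANDADO_DE_PRISAO \
#       --cookie "$BNMP_COOKIE" --fp "$BNMP_FP"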
""" from __future__ import annotations import argparse import gzip import hashlib import html import io import json import os import pathlib import random import sys import time from collections import defaultdict, deque from datetime import datetime from typing import Any, Dict, Iterable, List, Optional, Tuple from urllib import error, request import unicodedata as u BASE = "https://portalbnmp.cnj.jus.br" PECAS_URL = f"{BASE}/bnmpportal/api/pesquisa-pecas/pecas" FILTER_URL = f"{BASE}/bnmpportal/api/pesquisa-pecas/filter" ORGAOS_UF_URL = f"{BASE}/bnmpportal/api/pesquisa-pecas/orgaos/unidade-federativa" TRANSIENT = {429, 502, 503, 504} STATE_MAP = { 'acre':1,'ac':1, 'alagoas':2,'al':2, 'amapa':3,'amapá':3,'ap':3, 'amazonas':4,'am':4, 'bahia':5,'ba':5, 'ceara':6,'ceará':6,'ce':6, 'distrito federal':7,'df':7, 'espirito santo':8,'espírito santo':8,'es':8, 'goias':9,'goiás':9,'go':9, 'maranhao':10,'maranhão':10,'ma':10, 'mato grosso':11,'mt':11, 'mato grosso do sul':12,'ms':12, 'minas gerais':13,'mg':13, 'para':14,'pará':14,'pa':14, 'paraiba':15,'paraíba':15,'pb':15, 'parana':16,'paraná':16,'pr':16, 'pernambuco':17,'pe':17, 'piaui':18,'piauí':18,'pi':18, 'rio de janeiro':19,'rj':19, 'rio grande do norte':20,'rn':20, 'rio grande do sul':21,'rs':21, 'rondonia':22,'rondônia':22,'ro':22, 'roraima':23,'rr':23, 'santa catarina':24,'sc':24, 'sao paulo':25,'são paulo':25,'sp':25, 'sergipe':26,'se':26, 'tocantins':27,'to':27, } def norm(s: str) -> str: return u.normalize('NFKD', (s or '')).encode('ASCII','ignore').decode().strip().lower() # --- 415 fix: always include JSON content-type for POSTs --- JSON_CT = 'application/json;charset=UTF-8' def build_headers(cookie: str, fp: str) -> Dict[str, str]: # Browser-like headers to reduce auth/throttle friction return { 'Accept': 'application/json, text/plain, */*', 'Accept-Encoding': 'gzip', 'Accept-Language': 'pt-BR,pt;q=0.9', 'User-Agent': 'Mozilla/5.0', 'Origin': BASE, 'Referer': BASE + '/', 'X-Requested-With': 'XMLHttpRequest', 'Connection': 'keep-alive', 'fingerprint': fp, 'Cookie': f'portalbnmp={cookie}', # Tolerated if ignored; some stacks mirror session validation here 'Authorization': f'Bearer {cookie}', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', # Important for POST endpoints (prevents HTTP 415 on /filter and /pecas) 'Content-Type': JSON_CT, } def open_json(url: str, method: str, headers: Dict[str, str], payload: Optional[Dict[str, Any]] = None, max_tries: int = 6, base_sleep: float = 0.6, timeout: int = 90, query: Optional[Dict[str, Any]] = None) -> Dict[str, Any] | List[Any]: """Perform an HTTP request and parse JSON, with retries on transient errors.""" from urllib.parse import urlencode body = json.dumps(payload, ensure_ascii=False).encode('utf-8') if payload is not None else None full = url if query: qs = urlencode(query, doseq=True) if qs: full = f"{url}?{qs}" last_exc: Optional[Exception] = None for attempt in range(1, max_tries + 1): # ensure JSON content-type for POSTs even if caller headers were modified req_headers = dict(headers) if body is not None: req_headers.setdefault('Content-Type', JSON_CT) req = request.Request(full, headers=req_headers, data=body, method=method) try: with request.urlopen(req, timeout=timeout) as r: raw = r.read() enc = (r.headers.get('Content-Encoding') or '').lower() if 'gzip' in enc: raw = gzip.decompress(raw) return json.loads(raw.decode('utf-8', 'replace')) except error.HTTPError as e: last_exc = e code = getattr(e, 'code', None) if code in TRANSIENT: sleep = base_sleep * (2 ** (attempt - 1)) + 
def autodetect_items(container: Any) -> List[Any]:
    # Broad detection (covers observed envelopes across scripts)
    if isinstance(container, list):
        return container
    if not isinstance(container, dict):
        return []
    for key in (
        'content', 'resultados', 'itens', 'dados', 'items', 'lista', 'data',
        'registros', 'records', 'pecas', 'mandados', 'result',
    ):
        v = container.get(key)
        if isinstance(v, list):
            return v
    page = container.get('page') or {}
    if isinstance(page, dict):
        for k in ('content', 'items', 'itens', 'dados'):
            v = page.get(k)
            if isinstance(v, list):
                return v
    for v in container.values():
        if isinstance(v, list) and v and isinstance(v[0], dict):
            return v
    return []


def flatten(obj: Any, prefix: str = '') -> Dict[str, Any]:
    out: Dict[str, Any] = {}
    if isinstance(obj, dict):
        for k, v in obj.items():
            kk = f"{prefix}.{k}" if prefix else str(k)
            if isinstance(v, (dict, list)):
                out.update(flatten(v, kk))
            else:
                out[kk] = v
    elif isinstance(obj, list):
        for i, v in enumerate(obj):
            kk = f"{prefix}[{i}]"
            if isinstance(v, (dict, list)):
                out.update(flatten(v, kk))
            else:
                out[kk] = v
    else:
        out[prefix or 'valor'] = obj
    return out


def enrich_person_columns(flat: Dict[str, Any]) -> Dict[str, Any]:
    # Surface likely person fields into predictable columns (best-effort)
    out = dict(flat)
    for k in list(flat.keys()):
        nk = norm(k)
        if nk.endswith('.nome') and 'pessoa.nome' not in out:
            out['pessoa.nome'] = flat[k]
        if nk.endswith('.cpf') and 'pessoa.cpf' not in out:
            out['pessoa.cpf'] = flat[k]
        if nk.endswith('.rg') and 'pessoa.rg' not in out:
            out['pessoa.rg'] = flat[k]
        if 'nascimento' in nk and 'pessoa.nascimento' not in out:
            out['pessoa.nascimento'] = flat[k]
        if 'envolvido' in nk and 'nome' in nk and 'pessoa.nome' not in out:
            out['pessoa.nome'] = flat[k]
    return out
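# Key scheme produced by flatten(), shown on a toy record (illustrative):
#
#   >>> flatten({'pessoa': {'nome': 'X', 'docs': [{'tipo': 'CPF'}]}})
#   {'pessoa.nome': 'X', 'pessoa.docs[0].tipo': 'CPF'}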
def make_run_dir(parent: pathlib.Path) -> pathlib.Path:
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    run_dir = parent / f"bnmp_run_{ts}"
    run_dir.mkdir(parents=True, exist_ok=True)
    return run_dir


def ndjson_gzip_writer(path: pathlib.Path):
    f = gzip.open(path, 'wt', encoding='utf-8')

    def write(obj: Dict[str, Any]):
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")

    return f, write


def html_preview(rows: List[Dict[str, Any]], title: str) -> str:
    # Determine columns: person/context first, then others
    preferred = ['pessoa.nome', 'pessoa.cpf', 'pessoa.rg', 'pessoa.nascimento',
                 '__orgao.id', '__orgao.nome', '__uf.id']
    cols, seen = [], set()
    for c in preferred:
        if any(c in r for r in rows) and c not in seen:
            seen.add(c)
            cols.append(c)
    for r in rows:
        for k in r.keys():
            if k not in seen:
                seen.add(k)
                cols.append(k)
    thead = ''.join(f'<th>{html.escape(c)}</th>' for c in cols)
    body = []
    for r in rows:
        tds = []
        for c in cols:
            v = r.get(c)
            if isinstance(v, (dict, list)):
                cell = f"<code>{html.escape(json.dumps(v, ensure_ascii=False))}</code>"
            else:
                cell = html.escape('' if v is None else str(v))
            tds.append(f'<td>{cell}</td>')
        body.append(f"<tr>{''.join(tds)}</tr>")
    head = f"""
<!doctype html><html lang="pt-br"><meta charset="utf-8">
<title>{html.escape(title)}</title>
<style>
body{{font-family:system-ui,-apple-system,Segoe UI,Roboto,Arial,sans-serif;margin:16px}}
h1{{font-size:20px;margin:0}}
.meta{{color:#444;margin:6px 0 14px 0;font-size:12px}}
table{{border-collapse:collapse;width:100%}}
th,td{{border:1px solid #ddd;padding:6px 8px;font-size:12px;vertical-align:top}}
th{{position:sticky;top:0;background:#f6f6f6;text-align:left}}
tr:nth-child(even){{background:#fafafa}}
code{{white-space:pre-wrap}}
</style>
"""
    return head + f"<h1>{html.escape(title)}</h1>" + f"<table><thead><tr>{thead}</tr></thead><tbody>{''.join(body)}</tbody></table></html>"


# --- ID / dedupe helpers ---
CANDIDATE_ID_KEYS = [
    'id',
    'peca.id', 'pecaId', 'idPeca',
    'mandado.id', 'idMandado',
    'processo.id', 'idProcesso',
    'pecas[0].id',  # occasional shapes
]


def best_effort_uid(flat: Dict[str, Any]) -> str:
    for k in CANDIDATE_ID_KEYS:
        if k in flat and flat[k] is not None:
            return f"{k}:{flat[k]}"
    # fallback: stable hash of sorted items
    items = sorted((str(k), json.dumps(v, ensure_ascii=False, sort_keys=True)) for k, v in flat.items())
    h = hashlib.sha1()
    for k, v in items:
        h.update(k.encode('utf-8'))
        h.update(b'\x00')
        h.update(v.encode('utf-8'))
        h.update(b'\x00')
    return f"hash:{h.hexdigest()}"
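# Dedupe key behaviour (illustrative): a record carrying any known ID field
# gets a stable "key:value" uid; anything else falls back to a content hash.
#
#   >>> best_effort_uid({'id': 123, 'pessoa.nome': 'X'})
#   'id:123'
#   >>> best_effort_uid({'pessoa.nome': 'X'})
#   'hash:<sha1 of the sorted key/value items>'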
# --- Harvest strategies ---

def list_orgaos_for_uf(headers: Dict[str, str], uf_id: int, *, retries: int,
                       base_sleep: float, timeout: int) -> List[Dict[str, Any]]:
    url = f"{ORGAOS_UF_URL}/{uf_id}"
    resp = open_json(url, 'GET', headers, None, max_tries=retries, base_sleep=base_sleep, timeout=timeout)
    if not isinstance(resp, list):
        raise SystemExit('Resposta inesperada ao listar órgãos (array esperado).')
    resp.sort(key=lambda o: ((o.get('nome') or '').casefold(), int(o.get('id') or 0)))
    return resp


def harvest_pecas_por_orgao(headers: Dict[str, str], uf_id: int, *, writer,
                            counters: Dict[str, int], page_size: int, retries: int,
                            base_sleep: float, timeout: int, sleep_page: float,
                            sleep_org: float, tipo_peca: Optional[List[str]],
                            sample_buf: deque, sample_cap: int) -> None:
    orgaos = list_orgaos_for_uf(headers, uf_id, retries=retries, base_sleep=base_sleep, timeout=timeout)
    counters['orgaos_total'] = len(orgaos)
    print(f"[pecas-orgao] órgãos encontrados: {len(orgaos)}", file=sys.stderr)
    for org in orgaos:
        org_id = org.get('id')
        org_nome = org.get('nome')
        if org_id is None:
            continue
        counters['orgaos_consultados'] += 1
        page = 0
        while True:
            payload = {
                'pagina': page, 'tamanhoPagina': page_size,
                'page': page, 'size': page_size,
                'filtros': {
                    'unidadeFederativaId': uf_id,
                    'orgaoId': org_id,
                },
                'ordenacao': [{'propriedade': 'dataExpedicao', 'direcao': 'DESC'}],
            }
            if tipo_peca:
                payload['filtros']['tipoPeca'] = tipo_peca
            try:
                resp = open_json(PECAS_URL, 'POST', headers, payload,
                                 max_tries=retries, base_sleep=base_sleep, timeout=timeout)
            except error.HTTPError as e:
                if e.code == 401:
                    counters['count_401'] += 1
                    print(f"[401] órgão {org_id} — {org_nome}: acesso não autorizado; seguindo…", file=sys.stderr)
                    break
                elif e.code in TRANSIENT:
                    counters['transient_failures'] += 1
                    print(f"[WARN] {e.code} persistente em órgão {org_id} — {org_nome}; seguindo…", file=sys.stderr)
                    break
                else:
                    print(f"[WARN] órgão {org_id} — {org_nome}: erro '{e}'; seguindo…", file=sys.stderr)
                    break
            except error.URLError as e:
                counters['transient_failures'] += 1
                print(f"[WARN] URLError em órgão {org_id} — {org_nome}: '{e.reason}'; seguindo…", file=sys.stderr)
                break
            items = autodetect_items(resp)
            if not items:
                break
            added = 0
            for it in items:
                flat = enrich_person_columns(flatten(it))
                flat['__orgao.id'] = org_id
                flat['__orgao.nome'] = org_nome
                flat['__uf.id'] = uf_id
                writer(flat)
                counters['rows_pecas_orgao'] += 1
                added += 1
                if len(sample_buf) < sample_cap:
                    sample_buf.append(dict(flat))
            page += 1
            print(f"[pecas-orgao] UF {uf_id} — órgão {org_id} — página {page} (+{added}) total={counters['rows_pecas_orgao']}", file=sys.stderr)
            time.sleep(sleep_page)
        time.sleep(sleep_org)


def harvest_filter_uf(headers: Dict[str, str], uf_id: int, *, writer,
                      counters: Dict[str, int], page_size: int, retries: int,
                      base_sleep: float, timeout: int, sleep_page: float,
                      sort_filter: Optional[str], sample_buf: deque, sample_cap: int) -> None:
    page = 0
    while True:
        query = {'page': page, 'size': page_size}
        if sort_filter:
            query['sort'] = sort_filter
        payload = {
            'buscaOrgaoRecursivo': True,
            'orgaoExpeditor': {},
            'idEstado': uf_id,
        }
        try:
            resp = open_json(FILTER_URL, 'POST', headers, payload,
                             max_tries=retries, base_sleep=base_sleep, timeout=timeout, query=query)
        except error.HTTPError as e:
            if e.code == 401:
                counters['count_401'] += 1
                print(f"[401] filter UF {uf_id}: acesso não autorizado; encerrando.", file=sys.stderr)
                break
            elif e.code in TRANSIENT:
                counters['transient_failures'] += 1
                print(f"[WARN] {e.code} persistente no filter UF; encerrando.", file=sys.stderr)
                break
            else:
                print(f"[WARN] filter UF erro '{e}'; encerrando.", file=sys.stderr)
                break
        except error.URLError as e:
            counters['transient_failures'] += 1
            print(f"[WARN] URLError no filter UF: '{e.reason}'; encerrando.", file=sys.stderr)
            break
        items = autodetect_items(resp)
        if not items:
            break
        added = 0
        for it in items:
            flat = enrich_person_columns(flatten(it))
            flat['__uf.id'] = uf_id
            writer(flat)
            counters['rows_filter_uf'] += 1
            added += 1
            if len(sample_buf) < sample_cap:
                sample_buf.append(dict(flat))
        last = bool(resp.get('last')) if isinstance(resp, dict) else False
        number = int(resp.get('number', page)) if isinstance(resp, dict) else page
        total_pages = int(resp.get('totalPages', (page + 1))) if isinstance(resp, dict) else None
        print(f"[filter-uf] UF {uf_id} — página {page} (+{added}) total={counters['rows_filter_uf']} last={last} totalPages={total_pages}", file=sys.stderr)
        if last:
            break
        if total_pages is not None and (number + 1) >= total_pages:
            break
        page += 1
        time.sleep(sleep_page)


# --- Merging & HTML preview ---

def merge_streams_dedup(in_paths: List[Optional[pathlib.Path]], out_path: pathlib.Path,
                        sample_buf: deque, sample_cap: int) -> Dict[str, int]:
    seen: set[str] = set()
    counts: Dict[str, int] = defaultdict(int)
    with gzip.open(out_path, 'wt', encoding='utf-8') as out:
        for p in in_paths:
            if not p or not p.exists():
                continue
            with gzip.open(p, 'rt', encoding='utf-8') as f:
                for line in f:
                    try:
                        obj = json.loads(line)
                    except Exception:
                        counts['decode_errors'] += 1
                        continue
                    uid = best_effort_uid(obj)
                    if uid in seen:
                        counts['dedup_skipped'] += 1
                        continue
                    seen.add(uid)
                    out.write(json.dumps(obj, ensure_ascii=False) + "\n")
                    counts['merged_rows'] += 1
                    if len(sample_buf) < sample_cap:
                        sample_buf.append(dict(obj))
    counts['unique_keys'] = len(seen)
    return counts


# --- CLI & main ---

def resolve_uf_id(args_state: Optional[str], args_uf_id: Optional[int]) -> Tuple[int, str]:
    if args_uf_id is not None:
        return int(args_uf_id), f"UF {int(args_uf_id)}"
    if not args_state:
        raise SystemExit('Defina --state ou --uf-id.')
    uf_id = STATE_MAP.get(norm(args_state))
    if not uf_id:
        raise SystemExit(f"UF/Estado desconhecido: '{args_state}'. Use nome por extenso ou sigla.")
    return uf_id, args_state
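# resolve_uf_id accepts either form (illustrative):
#
#   >>> resolve_uf_id('GO', None)
#   (9, 'GO')
#   >>> resolve_uf_id(None, 7)
#   (7, 'UF 7')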
def main():
    ap = argparse.ArgumentParser(description='BNMP — UF-wide harvester (both per-órgão and UF filter).')
    ap.add_argument('--state', help='UF name/sigla, e.g., "Goiás", "GO", "DF"')
    ap.add_argument('--uf-id', type=int, help='UF id num (1..27)')
    ap.add_argument('--cookie', default=os.environ.get('BNMP_COOKIE'), help='portalbnmp cookie (JWT) or env BNMP_COOKIE')
    ap.add_argument('--fp', default=os.environ.get('BNMP_FP'), help='fingerprint header or env BNMP_FP')
    ap.add_argument('--strategy', choices=['both', 'pecas-orgao', 'filter-uf'], default='both')
    ap.add_argument('--tipo-peca', nargs='*', help='Optional tipoPeca list (e.g., MANDADO_DE_PRISAO). If omitted, collects ALL peças.')
    ap.add_argument('--page-size-pecas', type=int, default=int(os.environ.get('BNMP_PAGE_SIZE_PECAS', '200')))
    ap.add_argument('--page-size-filter', type=int, default=int(os.environ.get('BNMP_PAGE_SIZE_FILTER', '500')))
    ap.add_argument('--retries', type=int, default=int(os.environ.get('BNMP_RETRIES', '6')))
    ap.add_argument('--timeout', type=int, default=int(os.environ.get('BNMP_TIMEOUT', '90')))
    ap.add_argument('--sleep-page', type=float, default=float(os.environ.get('BNMP_SLEEP_PAGE', '0.25')))
    ap.add_argument('--sleep-org', type=float, default=float(os.environ.get('BNMP_SLEEP_ORG', '0.35')))
    ap.add_argument('--base-sleep', type=float, default=float(os.environ.get('BNMP_BASE_SLEEP', '0.6')))
    ap.add_argument('--sort-filter', default=os.environ.get('BNMP_SORT_FILTER', 'dataExpedicao,DESC'))
    ap.add_argument('--sample', type=int, default=int(os.environ.get('BNMP_SAMPLE', '200')))
    ap.add_argument('--out-dir', default=os.environ.get('BNMP_OUT_DIR', './bnmp_out'))
    args = ap.parse_args()

    if not args.cookie or not args.fp:
        raise SystemExit('Defina --cookie/BNMP_COOKIE e --fp/BNMP_FP.')

    uf_id, state_label = resolve_uf_id(args.state, args.uf_id)
    headers = build_headers(args.cookie, args.fp)
    parent = pathlib.Path(args.out_dir)
    run_dir = make_run_dir(parent)

    # Prepare outputs
    f1_path = run_dir / 'raw_pecas_orgao.ndjson.gz'
    f2_path = run_dir / 'raw_filter_uf.ndjson.gz'
    merged_path = run_dir / 'merged_dedup.ndjson.gz'
    sample_path = run_dir / 'sample_preview.html'
    manifest_path = run_dir / 'manifest.json'

    sample_buf: deque = deque()
    counters: Dict[str, int] = defaultdict(int)
    meta: Dict[str, Any] = {
        'started_at': datetime.now().isoformat(timespec='seconds'),
        'uf_id': uf_id,
        'state_label': state_label,
        'strategy': args.strategy,
        'page_size_pecas': args.page_size_pecas,
        'page_size_filter': args.page_size_filter,
        'retries': args.retries,
        'timeout': args.timeout,
        'sleep_page': args.sleep_page,
        'sleep_org': args.sleep_org,
        'base_sleep': args.base_sleep,
        'sort_filter': args.sort_filter,
        'tipo_peca': args.tipo_peca or [],
        'out_dir': str(run_dir),
    }
    print(f"[init] UF {uf_id} / {state_label} — strategy={args.strategy} — out={run_dir}", file=sys.stderr)
    t0 = time.time()

    # Strategy: per-órgão
    if args.strategy in ('both', 'pecas-orgao'):
        f1, write1 = ndjson_gzip_writer(f1_path)
        try:
            harvest_pecas_por_orgao(headers, uf_id, writer=write1, counters=counters,
                                    page_size=args.page_size_pecas, retries=args.retries,
                                    base_sleep=args.base_sleep, timeout=args.timeout,
                                    sleep_page=args.sleep_page, sleep_org=args.sleep_org,
                                    tipo_peca=(args.tipo_peca or None),
                                    sample_buf=sample_buf, sample_cap=args.sample)
        finally:
            f1.close()

    # Strategy: UF filter
    if args.strategy in ('both', 'filter-uf'):
        f2, write2 = ndjson_gzip_writer(f2_path)
        try:
            harvest_filter_uf(headers, uf_id, writer=write2,
                              counters=counters, page_size=args.page_size_filter,
                              retries=args.retries, base_sleep=args.base_sleep,
                              timeout=args.timeout, sleep_page=args.sleep_page,
                              sort_filter=args.sort_filter,
                              sample_buf=sample_buf, sample_cap=args.sample)
        finally:
            f2.close()

    # Merge & dedup
    merge_counts = merge_streams_dedup(
        [f1_path if f1_path.exists() else None, f2_path if f2_path.exists() else None],
        merged_path, sample_buf, args.sample
    )

    # HTML preview from sample
    title = (
        f"BNMP — UF {uf_id} / {state_label} — merged rows={merge_counts.get('merged_rows', 0)} — "
        f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    )
    html_text = html_preview(list(sample_buf), title)
    sample_path.write_text(html_text, encoding='utf-8')

    # Manifest
    meta.update({
        'finished_at': datetime.now().isoformat(timespec='seconds'),
        'duration_sec': round(time.time() - t0, 2),
        'counters': dict(counters),
        'merge_counts': dict(merge_counts),
        'outputs': {
            'per_orgao': str(f1_path) if f1_path.exists() else None,
            'filter_uf': str(f2_path) if f2_path.exists() else None,
            'merged': str(merged_path),
            'html_preview': str(sample_path),
        },
    })
    manifest_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding='utf-8')
    print(json.dumps(meta, ensure_ascii=False, indent=2))


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print("\n[abort] interrupted by user", file=sys.stderr)
        sys.exit(130)
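# Consumer sketch (not executed by this script): reading the merged stream
# back into Python; the run-directory name below is a placeholder.
#
#   import gzip, json
#   path = 'bnmp_out/bnmp_run_YYYYMMDD_HHMMSS/merged_dedup.ndjson.gz'
#   with gzip.open(path, 'rt', encoding='utf-8') as f:
#       rows = [json.loads(line) for line in f]
#   print(len(rows), 'rows')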