#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
BNMP — UF-wide harvester (final refactor, 415 fix)
Fix in this version
-------------------
* Adds explicit **Content-Type: application/json;charset=UTF-8** for POST requests
(some BNMP endpoints return HTTP 415 without it).
Goal
----
Harvest **as much data as possible** from the BNMP portal, **filtered only by a given UF/Estado**, with
strong robustness and good operator UX. This script merges the best ideas from the provided
scripts and adds:
* Resilient HTTP (browser-like headers, gzip handling, retries w/ exponential backoff + jitter).
* Two complementary strategies for **maximum coverage** (request shapes are sketched below):
  1) **per-órgão** pagination via `/bnmpportal/api/pesquisa-pecas/pecas` (UF + órgão filters);
  2) **UF-wide** pagination via `/bnmpportal/api/pesquisa-pecas/filter` (buscaOrgaoRecursivo=True).
* Streamed, memory-safe outputs (GZIP NDJSON) + merged deduplicated stream.
* Detailed run manifest (JSON) and a small HTML preview of sample rows.
* Rich progress logging, counters (401 / transient failures), and configurable knobs.
* No external dependencies (stdlib only). Cross-platform.
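For reference, the per-page requests issued by the two strategies look roughly like the
following (IDs and page sizes are illustrative placeholders; the exact payloads are built in
`harvest_pecas_por_orgao` and `harvest_filter_uf` below):

    # 1) per-órgão: POST /bnmpportal/api/pesquisa-pecas/pecas
    payload = {
        'pagina': 0, 'tamanhoPagina': 200, 'page': 0, 'size': 200,
        'filtros': {'unidadeFederativaId': 9, 'orgaoId': 12345},
        'ordenacao': [{'propriedade': 'dataExpedicao', 'direcao': 'DESC'}],
    }
    # 2) UF-wide: POST /bnmpportal/api/pesquisa-pecas/filter?page=0&size=500&sort=dataExpedicao,DESC
    payload = {'buscaOrgaoRecursivo': True, 'orgaoExpeditor': {}, 'idEstado': 9}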
Outputs (under an auto-created run directory):
- `raw_pecas_orgao.ndjson.gz` — flattened records from per-órgão strategy
- `raw_filter_uf.ndjson.gz` — flattened records from UF `/filter` strategy
- `merged_dedup.ndjson.gz` — flattened, deduplicated union of both
- `manifest.json` — summary metrics and parameters
- `sample_preview.html` — HTML preview (first N merged rows)
Authentication
--------------
Provide the BNMP session cookie and fingerprint via flags or environment:
- `--cookie` or env `BNMP_COOKIE`
- `--fp` or env `BNMP_FP`
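For example (shell; both values are placeholders):
    $ export BNMP_COOKIE='<value of the portalbnmp cookie>'
    $ export BNMP_FP='<value of the fingerprint header>'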
UF selection
------------
Use `--uf-id` (int) **or** `--state` (name/sigla). Examples: `--state "Goiás"`, `--state GO`, `--uf-id 9`.
Basic usage
-----------
$ python bnmp_uf_harvester.py --state "DF" --cookie "$BNMP_COOKIE" --fp "$BNMP_FP"
Common knobs
------------
--strategy both|pecas-orgao|filter-uf (default: both)
--page-size-pecas 200 page size for per-órgão calls
--page-size-filter 500 page size for UF /filter calls
--retries 6 max attempts for transient failures
--sleep-page 0.25 throttle between pages
--sleep-org 0.35 throttle between órgãos
--sort-filter "dataExpedicao,DESC" sort for /filter (optional)
--sample 200 number of rows kept for HTML preview
--out-dir ./bnmp_out parent output folder (a timestamped run dir is created inside)
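Example: a gentler, warrants-only run using credentials from the environment (flag values are illustrative):
    $ python bnmp_uf_harvester.py --state GO --tipo-peca MANDADO_DE_PRISAO --strategy filter-uf --sleep-page 0.5 --retries 8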
Notes
-----
* To target **only warrants** (mandados), pass `--tipo-peca MANDADO_DE_PRISAO`. Without `--tipo-peca`,
the script collects **all peças** available for the UF.
* The merged stream is deduped by a best-effort key chosen among common ID fields; if none is found,
  a content hash is used.
* On 401 errors for a given órgão/page, the script skips ahead and continues (the counts are reported).
"""
from __future__ import annotations
import argparse
import gzip
import hashlib
import html
import io
import json
import os
import pathlib
import random
import sys
import time
from collections import defaultdict, deque
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple
from urllib import error, request
import unicodedata as u
BASE = "https://portalbnmp.cnj.jus.br"
PECAS_URL = f"{BASE}/bnmpportal/api/pesquisa-pecas/pecas"
FILTER_URL = f"{BASE}/bnmpportal/api/pesquisa-pecas/filter"
ORGAOS_UF_URL = f"{BASE}/bnmpportal/api/pesquisa-pecas/orgaos/unidade-federativa"
TRANSIENT = {429, 502, 503, 504}
STATE_MAP = {
'acre':1,'ac':1,
'alagoas':2,'al':2,
'amapa':3,'amapá':3,'ap':3,
'amazonas':4,'am':4,
'bahia':5,'ba':5,
'ceara':6,'ceará':6,'ce':6,
'distrito federal':7,'df':7,
'espirito santo':8,'espírito santo':8,'es':8,
'goias':9,'goiás':9,'go':9,
'maranhao':10,'maranhão':10,'ma':10,
'mato grosso':11,'mt':11,
'mato grosso do sul':12,'ms':12,
'minas gerais':13,'mg':13,
'para':14,'pará':14,'pa':14,
'paraiba':15,'paraíba':15,'pb':15,
'parana':16,'paraná':16,'pr':16,
'pernambuco':17,'pe':17,
'piaui':18,'piauí':18,'pi':18,
'rio de janeiro':19,'rj':19,
'rio grande do norte':20,'rn':20,
'rio grande do sul':21,'rs':21,
'rondonia':22,'rondônia':22,'ro':22,
'roraima':23,'rr':23,
'santa catarina':24,'sc':24,
'sao paulo':25,'são paulo':25,'sp':25,
'sergipe':26,'se':26,
'tocantins':27,'to':27,
}
def norm(s: str) -> str:
return u.normalize('NFKD', (s or '')).encode('ASCII','ignore').decode().strip().lower()
# --- 415 fix: always include JSON content-type for POSTs ---
JSON_CT = 'application/json;charset=UTF-8'
def build_headers(cookie: str, fp: str) -> Dict[str, str]:
# Browser-like headers to reduce auth/throttle friction
return {
'Accept': 'application/json, text/plain, */*',
'Accept-Encoding': 'gzip',
'Accept-Language': 'pt-BR,pt;q=0.9',
'User-Agent': 'Mozilla/5.0',
'Origin': BASE,
'Referer': BASE + '/',
'X-Requested-With': 'XMLHttpRequest',
'Connection': 'keep-alive',
'fingerprint': fp,
'Cookie': f'portalbnmp={cookie}',
# Tolerated if ignored; some stacks mirror session validation here
'Authorization': f'Bearer {cookie}',
'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
# Important for POST endpoints (prevents HTTP 415 on /filter and /pecas)
'Content-Type': JSON_CT,
}
def open_json(url: str, method: str, headers: Dict[str, str], payload: Optional[Dict[str, Any]] = None,
max_tries: int = 6, base_sleep: float = 0.6, timeout: int = 90,
query: Optional[Dict[str, Any]] = None) -> Dict[str, Any] | List[Any]:
"""Perform an HTTP request and parse JSON, with retries on transient errors."""
from urllib.parse import urlencode
body = json.dumps(payload, ensure_ascii=False).encode('utf-8') if payload is not None else None
full = url
if query:
qs = urlencode(query, doseq=True)
if qs:
full = f"{url}?{qs}"
last_exc: Optional[Exception] = None
for attempt in range(1, max_tries + 1):
# ensure JSON content-type for POSTs even if caller headers were modified
req_headers = dict(headers)
if body is not None:
req_headers.setdefault('Content-Type', JSON_CT)
req = request.Request(full, headers=req_headers, data=body, method=method)
try:
with request.urlopen(req, timeout=timeout) as r:
raw = r.read()
enc = (r.headers.get('Content-Encoding') or '').lower()
if 'gzip' in enc:
raw = gzip.decompress(raw)
return json.loads(raw.decode('utf-8', 'replace'))
except error.HTTPError as e:
last_exc = e
code = getattr(e, 'code', None)
if code in TRANSIENT:
sleep = base_sleep * (2 ** (attempt - 1)) + random.random() * 0.2
print(f"[retry {attempt}/{max_tries}] {code} {method} {full} — aguardando {sleep:.2f}s…", file=sys.stderr)
time.sleep(sleep)
continue
# Non-transient: bubble up
raise
except error.URLError as e:
last_exc = e
sleep = base_sleep * (2 ** (attempt - 1)) + random.random() * 0.2
print(f"[retry {attempt}/{max_tries}] URLError {getattr(e, 'reason', e)} — aguardando {sleep:.2f}s…", file=sys.stderr)
time.sleep(sleep)
continue
if last_exc:
raise last_exc
# Should not reach here
return {}
def autodetect_items(container: Any) -> List[Any]:
# Broad detection (covers observed envelopes across scripts)
if isinstance(container, list):
return container
if not isinstance(container, dict):
return []
for key in (
'content','resultados','itens','dados','items','lista','data','registros','records',
'pecas','mandados','result'
):
v = container.get(key)
if isinstance(v, list):
return v
page = container.get('page') or {}
if isinstance(page, dict):
for k in ('content','items','itens','dados'):
v = page.get(k)
if isinstance(v, list):
return v
for v in container.values():
if isinstance(v, list) and v and isinstance(v[0], dict):
return v
return []
def flatten(obj: Any, prefix: str = '') -> Dict[str, Any]:
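    """Flatten nested dicts/lists into a single-level dict with dotted/indexed keys (e.g. 'a.b', 'c[0].d')."""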
out: Dict[str, Any] = {}
if isinstance(obj, dict):
for k, v in obj.items():
kk = f"{prefix}.{k}" if prefix else str(k)
if isinstance(v, (dict, list)):
out.update(flatten(v, kk))
else:
out[kk] = v
elif isinstance(obj, list):
for i, v in enumerate(obj):
kk = f"{prefix}[{i}]"
if isinstance(v, (dict, list)):
out.update(flatten(v, kk))
else:
out[kk] = v
else:
out[prefix or 'valor'] = obj
return out
def enrich_person_columns(flat: Dict[str, Any]) -> Dict[str, Any]:
# Surface likely person fields into predictable columns (best-effort)
out = dict(flat)
for k in list(flat.keys()):
nk = norm(k)
if nk.endswith('.nome') and 'pessoa.nome' not in out:
out['pessoa.nome'] = flat[k]
if nk.endswith('.cpf') and 'pessoa.cpf' not in out:
out['pessoa.cpf'] = flat[k]
if nk.endswith('.rg') and 'pessoa.rg' not in out:
out['pessoa.rg'] = flat[k]
if 'nascimento' in nk and 'pessoa.nascimento' not in out:
out['pessoa.nascimento'] = flat[k]
if 'envolvido' in nk and 'nome' in nk and 'pessoa.nome' not in out:
out['pessoa.nome'] = flat[k]
return out
def make_run_dir(parent: pathlib.Path) -> pathlib.Path:
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
run_dir = parent / f"bnmp_run_{ts}"
run_dir.mkdir(parents=True, exist_ok=True)
return run_dir
def ndjson_gzip_writer(path: pathlib.Path):
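    """Open a gzip-compressed NDJSON file for writing; return (file handle, write-one-record function)."""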
f = gzip.open(path, 'wt', encoding='utf-8')
def write(obj: Dict[str, Any]):
f.write(json.dumps(obj, ensure_ascii=False) + "\n")
return f, write
def html_preview(rows: List[Dict[str, Any]], title: str) -> str:
# Determine columns: person/context first, then others
preferred = ['pessoa.nome','pessoa.cpf','pessoa.rg','pessoa.nascimento','__orgao.id','__orgao.nome','__uf.id']
cols, seen = [], set()
for c in preferred:
if any(c in r for r in rows) and c not in seen:
seen.add(c); cols.append(c)
for r in rows:
for k in r.keys():
if k not in seen:
seen.add(k); cols.append(k)
thead = ''.join(f'<th>{html.escape(c)}</th>' for c in cols)
body = []
for r in rows:
tds = []
for c in cols:
v = r.get(c)
if isinstance(v, (dict, list)):
cell = f"<code>{html.escape(json.dumps(v, ensure_ascii=False))}</code>"
else:
cell = html.escape('' if v is None else str(v))
tds.append(f'<td>{cell}</td>')
body.append(f"<tr>{''.join(tds)}</tr>")
head = f"""
<!doctype html><html lang="pt-br"><meta charset="utf-8">
<title>{html.escape(title)}</title>
<style>
body{{font-family:system-ui,-apple-system,Segoe UI,Roboto,Arial,sans-serif;margin:16px}}
h1{{font-size:20px;margin:0}}
.meta{{color:#444;margin:6px 0 14px 0;font-size:12px}}
table{{border-collapse:collapse;width:100%}}
th,td{{border:1px solid #ddd;padding:6px 8px;font-size:12px;vertical-align:top}}
th{{position:sticky;top:0;background:#f6f6f6;text-align:left}}
tr:nth-child(even){{background:#fafafa}}
code{{white-space:pre-wrap}}
</style>
"""
return head + f"<h1>{html.escape(title)}</h1>" + f"<table><thead><tr>{thead}</tr></thead><tbody>{''.join(body)}</tbody></table></html>"
# --- ID / dedupe helpers ---
CANDIDATE_ID_KEYS = [
'id',
'peca.id', 'pecaId', 'idPeca',
'mandado.id', 'idMandado',
'processo.id', 'idProcesso',
'pecas[0].id', # occasional shapes
]
def best_effort_uid(flat: Dict[str, Any]) -> str:
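    """Build a dedupe key: the first populated candidate ID field, else a SHA-1 over the sorted flattened items."""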
for k in CANDIDATE_ID_KEYS:
if k in flat and flat[k] is not None:
return f"{k}:{flat[k]}"
# fallback: stable hash of sorted items
items = sorted((str(k), json.dumps(v, ensure_ascii=False, sort_keys=True)) for k, v in flat.items())
h = hashlib.sha1()
for k, v in items:
h.update(k.encode('utf-8'))
h.update(b'\x00')
h.update(v.encode('utf-8'))
h.update(b'\x00')
return f"hash:{h.hexdigest()}"
# --- Harvest strategies ---
def list_orgaos_for_uf(headers: Dict[str, str], uf_id: int, *, retries: int, base_sleep: float, timeout: int) -> List[Dict[str, Any]]:
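    """List the órgãos available for a UF, sorted by name and then id."""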
url = f"{ORGAOS_UF_URL}/{uf_id}"
resp = open_json(url, 'GET', headers, None, max_tries=retries, base_sleep=base_sleep, timeout=timeout)
if not isinstance(resp, list):
        raise SystemExit('Unexpected response when listing órgãos (expected a JSON array).')
resp.sort(key=lambda o: ((o.get('nome') or '').casefold(), int(o.get('id') or 0)))
return resp
def harvest_pecas_por_orgao(headers: Dict[str, str], uf_id: int, *, writer, counters: Dict[str, int],
page_size: int, retries: int, base_sleep: float, timeout: int,
sleep_page: float, sleep_org: float, tipo_peca: Optional[List[str]],
sample_buf: deque, sample_cap: int) -> None:
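    """Strategy 1: list the UF's órgãos, then page through /pecas per órgão, writing flattened rows via `writer`."""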
orgaos = list_orgaos_for_uf(headers, uf_id, retries=retries, base_sleep=base_sleep, timeout=timeout)
counters['orgaos_total'] = len(orgaos)
print(f"[pecas-orgao] órgãos encontrados: {len(orgaos)}", file=sys.stderr)
for org in orgaos:
org_id = org.get('id'); org_nome = org.get('nome')
if org_id is None:
continue
counters['orgaos_consultados'] += 1
page = 0
while True:
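            # Both key spellings ('pagina'/'tamanhoPagina' and 'page'/'size') are sent so that either pagination convention is honored.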
payload = {
'pagina': page, 'tamanhoPagina': page_size,
'page': page, 'size': page_size,
'filtros': {
'unidadeFederativaId': uf_id,
'orgaoId': org_id,
},
'ordenacao': [ {'propriedade':'dataExpedicao','direcao':'DESC'} ]
}
if tipo_peca:
payload['filtros']['tipoPeca'] = tipo_peca
try:
resp = open_json(PECAS_URL, 'POST', headers, payload,
max_tries=retries, base_sleep=base_sleep, timeout=timeout)
except error.HTTPError as e:
if e.code == 401:
counters['count_401'] += 1
print(f"[401] órgão {org_id} — {org_nome}: acesso não autorizado; seguindo…", file=sys.stderr)
break
elif e.code in TRANSIENT:
counters['transient_failures'] += 1
print(f"[WARN] {e.code} persistente em órgão {org_id} — {org_nome}; seguindo…", file=sys.stderr)
break
else:
print(f"[WARN] órgão {org_id} — {org_nome}: erro '{e}'; seguindo…", file=sys.stderr)
break
except error.URLError as e:
counters['transient_failures'] += 1
print(f"[WARN] URLError em órgão {org_id} — {org_nome}: '{e.reason}'; seguindo…", file=sys.stderr)
break
items = autodetect_items(resp)
if not items:
break
added = 0
for it in items:
flat = enrich_person_columns(flatten(it))
flat['__orgao.id'] = org_id
flat['__orgao.nome'] = org_nome
flat['__uf.id'] = uf_id
writer(flat)
counters['rows_pecas_orgao'] += 1
added += 1
if len(sample_buf) < sample_cap:
sample_buf.append(dict(flat))
page += 1
print(f"[pecas-orgao] UF {uf_id} — órgão {org_id} — página {page} (+{added}) total={counters['rows_pecas_orgao']}", file=sys.stderr)
time.sleep(sleep_page)
time.sleep(sleep_org)
def harvest_filter_uf(headers: Dict[str, str], uf_id: int, *, writer, counters: Dict[str, int],
page_size: int, retries: int, base_sleep: float, timeout: int,
sleep_page: float, sort_filter: Optional[str], sample_buf: deque, sample_cap: int) -> None:
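    """Strategy 2: page through the UF-wide /filter endpoint (buscaOrgaoRecursivo=True), writing flattened rows via `writer`."""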
page = 0
while True:
query = {'page': page, 'size': page_size}
if sort_filter:
query['sort'] = sort_filter
payload = {
'buscaOrgaoRecursivo': True,
'orgaoExpeditor': {},
'idEstado': uf_id,
}
try:
resp = open_json(FILTER_URL, 'POST', headers, payload,
max_tries=retries, base_sleep=base_sleep, timeout=timeout,
query=query)
except error.HTTPError as e:
if e.code == 401:
counters['count_401'] += 1
print(f"[401] filter UF {uf_id}: acesso não autorizado; encerrando.", file=sys.stderr)
break
elif e.code in TRANSIENT:
counters['transient_failures'] += 1
print(f"[WARN] {e.code} persistente no filter UF; encerrando.", file=sys.stderr)
break
else:
print(f"[WARN] filter UF erro '{e}'; encerrando.", file=sys.stderr)
break
except error.URLError as e:
counters['transient_failures'] += 1
print(f"[WARN] URLError no filter UF: '{e.reason}'; encerrando.", file=sys.stderr)
break
items = autodetect_items(resp)
if not items:
break
added = 0
for it in items:
flat = enrich_person_columns(flatten(it))
flat['__uf.id'] = uf_id
writer(flat)
counters['rows_filter_uf'] += 1
added += 1
if len(sample_buf) < sample_cap:
sample_buf.append(dict(flat))
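        # Stop conditions: honor Spring-style 'last'/'number'/'totalPages' when present; a dict envelope
        # without that metadata stops after the current page, while a bare-list response relies on an
        # eventual empty page instead.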
last = bool(resp.get('last')) if isinstance(resp, dict) else False
number = int(resp.get('number', page)) if isinstance(resp, dict) else page
total_pages = int(resp.get('totalPages', (page + 1))) if isinstance(resp, dict) else None
print(f"[filter-uf] UF {uf_id} — página {page} (+{added}) total={counters['rows_filter_uf']} last={last} totalPages={total_pages}", file=sys.stderr)
if last:
break
if total_pages is not None and (number + 1) >= total_pages:
break
page += 1
time.sleep(sleep_page)
# --- Merging & HTML preview ---
def merge_streams_dedup(in_paths: List[pathlib.Path], out_path: pathlib.Path, sample_buf: deque, sample_cap: int) -> Dict[str, int]:
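    """Merge the NDJSON.gz inputs into a single deduplicated NDJSON.gz stream; return merge counters."""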
seen: set[str] = set()
counts = defaultdict(int)
with gzip.open(out_path, 'wt', encoding='utf-8') as out:
for p in in_paths:
if not p or not p.exists():
continue
with gzip.open(p, 'rt', encoding='utf-8') as f:
for line in f:
try:
obj = json.loads(line)
except Exception:
counts['decode_errors'] += 1
continue
uid = best_effort_uid(obj)
if uid in seen:
counts['dedup_skipped'] += 1
continue
seen.add(uid)
out.write(json.dumps(obj, ensure_ascii=False) + "\n")
counts['merged_rows'] += 1
if len(sample_buf) < sample_cap:
sample_buf.append(dict(obj))
counts['unique_keys'] = len(seen)
return counts
# --- CLI & main ---
def resolve_uf_id(args_state: Optional[str], args_uf_id: Optional[int]) -> Tuple[int, str]:
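    """Resolve the target UF id from --uf-id or --state (name or sigla); return (uf_id, label)."""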
if args_uf_id is not None:
return int(args_uf_id), f"UF {int(args_uf_id)}"
if not args_state:
        raise SystemExit('Provide --state or --uf-id.')
uf_id = STATE_MAP.get(norm(args_state))
if not uf_id:
raise SystemExit(f"UF/Estado desconhecido: '{args_state}'. Use nome por extenso ou sigla.")
return uf_id, args_state
def main():
ap = argparse.ArgumentParser(description='BNMP — UF-wide harvester (both per-órgão and UF filter).')
ap.add_argument('--state', help='UF name/sigla, e.g., "Goiás", "GO", "DF"')
ap.add_argument('--uf-id', type=int, help='UF id num (1..27)')
ap.add_argument('--cookie', default=os.environ.get('BNMP_COOKIE'), help='portalbnmp cookie (JWT) or env BNMP_COOKIE')
ap.add_argument('--fp', default=os.environ.get('BNMP_FP'), help='fingerprint header or env BNMP_FP')
ap.add_argument('--strategy', choices=['both','pecas-orgao','filter-uf'], default='both')
ap.add_argument('--tipo-peca', nargs='*', help='Optional tipoPeca list (e.g., MANDADO_DE_PRISAO). If omitted, collects ALL peças.')
ap.add_argument('--page-size-pecas', type=int, default=int(os.environ.get('BNMP_PAGE_SIZE_PECAS', '200')))
ap.add_argument('--page-size-filter', type=int, default=int(os.environ.get('BNMP_PAGE_SIZE_FILTER', '500')))
ap.add_argument('--retries', type=int, default=int(os.environ.get('BNMP_RETRIES', '6')))
ap.add_argument('--timeout', type=int, default=int(os.environ.get('BNMP_TIMEOUT', '90')))
ap.add_argument('--sleep-page', type=float, default=float(os.environ.get('BNMP_SLEEP_PAGE', '0.25')))
ap.add_argument('--sleep-org', type=float, default=float(os.environ.get('BNMP_SLEEP_ORG', '0.35')))
ap.add_argument('--base-sleep', type=float, default=float(os.environ.get('BNMP_BASE_SLEEP', '0.6')))
ap.add_argument('--sort-filter', default=os.environ.get('BNMP_SORT_FILTER', 'dataExpedicao,DESC'))
ap.add_argument('--sample', type=int, default=int(os.environ.get('BNMP_SAMPLE', '200')))
ap.add_argument('--out-dir', default=os.environ.get('BNMP_OUT_DIR', './bnmp_out'))
args = ap.parse_args()
if not args.cookie or not args.fp:
        raise SystemExit('Provide --cookie/BNMP_COOKIE and --fp/BNMP_FP.')
uf_id, state_label = resolve_uf_id(args.state, args.uf_id)
headers = build_headers(args.cookie, args.fp)
parent = pathlib.Path(args.out_dir)
run_dir = make_run_dir(parent)
# Prepare outputs
f1_path = run_dir / 'raw_pecas_orgao.ndjson.gz'
f2_path = run_dir / 'raw_filter_uf.ndjson.gz'
merged_path = run_dir / 'merged_dedup.ndjson.gz'
sample_path = run_dir / 'sample_preview.html'
manifest_path = run_dir / 'manifest.json'
sample_buf: deque = deque()
counters: Dict[str, int] = defaultdict(int)
meta: Dict[str, Any] = {
'started_at': datetime.now().isoformat(timespec='seconds'),
'uf_id': uf_id,
'state_label': state_label,
'strategy': args.strategy,
'page_size_pecas': args.page_size_pecas,
'page_size_filter': args.page_size_filter,
'retries': args.retries,
'timeout': args.timeout,
'sleep_page': args.sleep_page,
'sleep_org': args.sleep_org,
'base_sleep': args.base_sleep,
'sort_filter': args.sort_filter,
'tipo_peca': args.tipo_peca or [],
'out_dir': str(run_dir),
}
print(f"[init] UF {uf_id} / {state_label} — strategy={args.strategy} — out={run_dir}", file=sys.stderr)
t0 = time.time()
# Strategy: per-órgão
if args.strategy in ('both', 'pecas-orgao'):
f1, write1 = ndjson_gzip_writer(f1_path)
try:
harvest_pecas_por_orgao(headers, uf_id,
writer=write1, counters=counters,
page_size=args.page_size_pecas, retries=args.retries,
base_sleep=args.base_sleep, timeout=args.timeout,
sleep_page=args.sleep_page, sleep_org=args.sleep_org,
tipo_peca=(args.tipo_peca or None),
sample_buf=sample_buf, sample_cap=args.sample)
finally:
f1.close()
# Strategy: UF filter
if args.strategy in ('both', 'filter-uf'):
f2, write2 = ndjson_gzip_writer(f2_path)
try:
harvest_filter_uf(headers, uf_id,
writer=write2, counters=counters,
page_size=args.page_size_filter, retries=args.retries,
base_sleep=args.base_sleep, timeout=args.timeout,
sleep_page=args.sleep_page, sort_filter=args.sort_filter,
sample_buf=sample_buf, sample_cap=args.sample)
finally:
f2.close()
# Merge & dedup
merge_counts = merge_streams_dedup(
[f1_path if f1_path.exists() else None, f2_path if f2_path.exists() else None],
merged_path, sample_buf, args.sample
)
# HTML preview from sample
title = (
f"BNMP — UF {uf_id} / {state_label} — merged rows={merge_counts.get('merged_rows',0)} — "
f"{datetime.now():%Y-%m-%d %H:%M:%S}"
)
html_text = html_preview(list(sample_buf), title)
sample_path.write_text(html_text, encoding='utf-8')
# Manifest
meta.update({
'finished_at': datetime.now().isoformat(timespec='seconds'),
'duration_sec': round(time.time() - t0, 2),
'counters': dict(counters),
'merge_counts': dict(merge_counts),
'outputs': {
'per_orgao': str(f1_path) if f1_path.exists() else None,
'filter_uf': str(f2_path) if f2_path.exists() else None,
'merged': str(merged_path),
'html_preview': str(sample_path),
}
})
manifest_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding='utf-8')
print(json.dumps(meta, ensure_ascii=False, indent=2))
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print("\n[abort] interrupted by user", file=sys.stderr)
sys.exit(130)