aucourt-ingest/aucourt_ingest/sources/highcourt.py
slothitude d77fe12cfc AuCourtIngest: complete 8-stage Australian legal case ingestion pipeline
Source layer (5 court sources), processing pipeline (parse/extract/chunk/embed/graph),
property graph with 8 node types, juror subgraph queries with 6 personas,
orchestrator with bootstrap/watch/backfill/audit/process modes, 170 tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-30 11:56:23 +10:00

144 lines
5.3 KiB
Python

"""High Court of Australia source — crawl from /cases-and-judgments."""
from __future__ import annotations

import logging
from datetime import datetime, timezone
from urllib.parse import urljoin

from bs4 import BeautifulSoup

from aucourt_ingest.models import FetchQueueItem, FetchStatus, RawDocument
from aucourt_ingest.sources.base import BaseSource
from aucourt_ingest.storage.doc_store import DocStore
from aucourt_ingest.storage.meta_db import MetaDB
from aucourt_ingest.utils.mnc_parser import parse_mnc
logger = logging.getLogger(__name__)
# Scheme + host of the High Court website; site-relative links are resolved
# against it.
HIGHCOURT_BASE = "https://www.hcourt.gov.au"

# MVP criminal filter. A discovered link is kept only when its lowercased
# link text contains at least one of these phrases as a substring.
PRIORITY_KEYWORDS = [
    # Offence types
    "murder",
    "manslaughter",
    "sexual assault",
    "robbery",
    # Appeal outcomes
    "appeal allowed",
    "conviction quashed",
    "miscarriage of justice",
    # General criminal-matter terms
    "criminal",
    "criminal appeal",
    "sentence",
]
class HighCourtSource(BaseSource):
    """High Court of Australia — index crawl.

    NOTE: The exact page structure of /cases-and-judgments is not yet
    fully mapped. This adapter targets the case list pages and individual
    judgment pages. Adjust selectors as needed after inspection.
    """

    source_id = "highcourt"

    @staticmethod
    def _fallback_doc_id(url: str) -> str:
        """Return the last non-empty path segment of *url*.

        Used as the document id when no medium neutral citation can be
        parsed. Trailing slashes are stripped first so that a URL such as
        ``.../case-s1-2024/`` does not yield an empty id.
        """
        return url.rstrip("/").rsplit("/", 1)[-1]

    async def discover(self, page: int = 1, **kwargs) -> list[FetchQueueItem]:
        """Crawl the cases index page for criminal-relevant judgment links.

        The High Court site structure is not fully documented; this
        implementation scans every anchor on ``/cases-and-judgments`` and
        keeps those that look like case or judgment links and whose link
        text matches one of ``PRIORITY_KEYWORDS``.

        Args:
            page: Currently unused — only the single index page is crawled.
                Accepted for signature compatibility.
            **kwargs: Ignored.

        Returns:
            One ``FetchQueueItem`` per distinct matching URL, in page order.
            When ``self.meta_db`` is set, each item is also registered via
            ``insert_document``.

        Raises:
            Whatever ``resp.raise_for_status()`` raises on an HTTP error.
        """
        index_url = f"{HIGHCOURT_BASE}/cases-and-judgments"
        async with self.rate_limiter:
            resp = await self.client.get(index_url)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

        items: list[FetchQueueItem] = []
        seen_urls: set[str] = set()
        for a_tag in soup.find_all("a", href=True):
            # Resolve against the index page so that relative and
            # protocol-relative hrefs become valid absolute URLs (plain
            # string concatenation would mangle them).
            url = urljoin(index_url, a_tag["href"])

            # Case links typically follow a /cases/case-sYYYY-N pattern.
            if "/cases/case-" not in url and "/judgments/" not in url:
                continue

            # Apply keyword filter for MVP.
            link_text = a_tag.get_text(strip=True)
            lowered = link_text.lower()
            if not any(kw in lowered for kw in PRIORITY_KEYWORDS):
                continue

            # The same case may be linked more than once on the page.
            if url in seen_urls:
                continue
            seen_urls.add(url)

            parsed = parse_mnc(link_text)
            doc_id = parsed.mnc if parsed else self._fallback_doc_id(url)
            items.append(FetchQueueItem(
                source_id=self.source_id,
                url=url,
                priority=2,  # High Court = higher priority
                doc_id=doc_id,
            ))
            if self.meta_db:
                await self.meta_db.insert_document(doc_id, self.source_id, url)

        logger.info("High Court: %d case links found", len(items))
        return items

    async def fetch(self, url: str, **kwargs) -> RawDocument:
        """Fetch a High Court judgment page and extract its text.

        Args:
            url: Absolute URL of the judgment page.
            **kwargs: Ignored.

        Returns:
            A ``RawDocument`` whose ``raw_text`` is the text of the first
            matching content container, or the full response body when no
            container is found. ``doc_id`` is the citation parsed from the
            first 500 characters of that text, falling back to the last
            path segment of ``url``.

        Raises:
            Whatever ``resp.raise_for_status()`` raises on an HTTP error.
        """
        async with self.rate_limiter:
            resp = await self.client.get(url)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

        # High Court pages vary — try common content containers, most
        # specific first.
        content = (
            soup.find("div", class_="judgment")
            or soup.find("div", class_="content")
            or soup.find("article")
            or soup.find("main")
            or soup.find("div", class_="container")
        )
        raw_text = content.get_text(separator="\n", strip=True) if content else resp.text

        # Only the head of the document is searched for the citation.
        parsed = parse_mnc(raw_text[:500])
        doc_id = parsed.mnc if parsed else self._fallback_doc_id(url)
        return RawDocument(
            source_id=self.source_id,
            doc_id=doc_id,
            url=url,
            fetch_timestamp=datetime.now(timezone.utc).isoformat(),
            raw_text=raw_text,
            format="html",
        )
async def bootstrap_highcourt(
    config, meta_db: MetaDB, doc_store: DocStore,
    limit: int = 0,
) -> int:
    """Fetch criminal-relevant judgments from High Court.

    Discovers candidate links, skips those already recorded as fetched or
    parsed, then fetches and stores each remaining document while updating
    per-document status and per-source counters in ``meta_db``.

    Args:
        config: Passed through to ``HighCourtSource``.
        meta_db: Metadata database used for status tracking and statistics.
        doc_store: Store that receives the fetched text.
        limit: Maximum number of documents to fetch; ``0`` means no limit.

    Returns:
        The number of documents successfully fetched and stored.

    Raises:
        Whatever ``source.discover()`` raises; per-document errors are
        logged, recorded as FAILED, and do not abort the run.
    """
    source = HighCourtSource(config, meta_db)
    await meta_db.init_source(source.source_id)
    total_fetched = 0
    try:
        items = await source.discover()
        for item in items:
            if limit and total_fetched >= limit:
                break

            # NOTE(review): every other meta_db call here is keyed by doc_id,
            # but this lookup passes the URL — confirm get_document accepts it.
            existing = await meta_db.get_document(item.url)
            if existing and existing["fetch_status"] in (FetchStatus.FETCHED, FetchStatus.PARSED):
                continue

            # Until the fetch succeeds, track the document under the id that
            # discover() registered, so a failure is recorded on that row.
            # Fall back to the last non-empty URL segment if it is missing.
            doc_id = item.doc_id or item.url.rstrip("/").rsplit("/", 1)[-1]
            try:
                raw = await source.fetch(item.url)
                # NOTE(review): raw.doc_id is parsed from the page body and
                # may differ from item.doc_id (parsed from the link text) —
                # verify the status update below hits the intended row.
                doc_id = raw.doc_id
                doc_store.save(source.source_id, raw.doc_id, raw.raw_text, fmt="html")
                await meta_db.update_status(doc_id, FetchStatus.FETCHED)
                await meta_db.update_doc_meta(doc_id, char_count=raw.char_count)
                await meta_db.increment_source_stats(source.source_id, fetched=1)
                total_fetched += 1
                logger.info("[%s] %s (%s chars)", total_fetched, doc_id, raw.char_count)
            except Exception as e:
                # Top-level boundary for one document: log with traceback,
                # record the failure, and move on to the next item.
                logger.exception("Failed: %s: %s", item.url, e)
                await meta_db.update_status(doc_id, FetchStatus.FAILED, error_message=str(e))
                await meta_db.increment_source_stats(source.source_id, failed=1)
    finally:
        # Always release the source's resources, even if discovery failed.
        await source.close()
    return total_fetched