aucourt-ingest/aucourt_ingest/sources/qld_judgments.py
slothitude d77fe12cfc AuCourtIngest: complete 8-stage Australian legal case ingestion pipeline
Source layer (5 court sources), processing pipeline (parse/extract/chunk/embed/graph),
property graph with 8 node types, juror subgraph queries with 6 personas,
orchestrator with bootstrap/watch/backfill/audit/process modes, 170 tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-30 11:56:23 +10:00

92 lines
3.1 KiB
Python

"""Queensland Judgments source — HTML scraping at search endpoint."""
from __future__ import annotations

import logging
from datetime import datetime, timezone
from urllib.parse import urljoin

from bs4 import BeautifulSoup

from aucourt_ingest.models import FetchQueueItem, FetchStatus, RawDocument
from aucourt_ingest.sources.base import BaseSource
from aucourt_ingest.storage.doc_store import DocStore
from aucourt_ingest.storage.meta_db import MetaDB
from aucourt_ingest.utils.mnc_parser import parse_mnc
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Site root; relative judgment links found in search results are resolved
# against this host.
QLD_BASE = "https://www.queenslandjudgments.com.au"

# Search form endpoint queried during discovery.
QLD_SEARCH = QLD_BASE + "/caselaw-search/query"
class QLDJudgmentsSource(BaseSource):
    """Queensland Judgments source driven by paginated search results.

    NOTE: The search API is undocumented. This adapter uses the
    search form endpoint observed at /caselaw-search/query.
    Parameters may need adjustment after empirical testing.
    """

    source_id = "qld_judgments"

    @staticmethod
    def _url_doc_id(url: str) -> str:
        """Return the fallback document id: the last path segment of *url*.

        Trailing slashes are stripped first so that ``.../judgment/123/``
        yields ``"123"`` instead of an empty string.
        """
        return url.rstrip("/").rsplit("/", 1)[-1]

    async def discover(self, page: int = 1, **kwargs) -> list[FetchQueueItem]:
        """Search for criminal decisions and queue the judgment links found.

        Args:
            page: Result page number, sent as the ``page`` query parameter.
            **kwargs: Accepted but not used by this method.

        Returns:
            One ``FetchQueueItem`` per distinct judgment URL on the page,
            in the order the links first appear.

        Raises:
            Propagates whatever ``resp.raise_for_status()`` raises for an
            error response.
        """
        params = {
            "queryStringSearchText": "criminal",
            "page": page,
        }
        async with self.rate_limiter:
            resp = await self.client.get(QLD_SEARCH, params=params)
            resp.raise_for_status()

        soup = BeautifulSoup(resp.text, "html.parser")

        # Maps url -> citation parsed from the link text (None until one is
        # found). A dict collapses repeated links to the same judgment while
        # keeping first-seen order.
        citations: dict[str, str | None] = {}
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]
            # Judgment links typically contain /judgment/ or /decision/.
            if "/judgment/" not in href and "/decision/" not in href:
                continue
            # urljoin copes with absolute, root-relative, "../" and
            # protocol-relative hrefs; plain concatenation does not.
            url = urljoin(QLD_SEARCH, href)
            if citations.get(url):
                continue  # a citation is already known for this judgment
            parsed = parse_mnc(a_tag.get_text(strip=True))
            citations[url] = parsed.mnc if parsed else None

        items: list[FetchQueueItem] = []
        for url, mnc in citations.items():
            items.append(FetchQueueItem(
                source_id=self.source_id,
                url=url,
                priority=4,
            ))
            if self.meta_db:
                doc_id = mnc or self._url_doc_id(url)
                await self.meta_db.insert_document(doc_id, self.source_id, url)

        logger.info("QLD Judgments page %s: %d results", page, len(items))
        return items

    async def fetch(self, url: str, **kwargs) -> RawDocument:
        """Fetch a Queensland judgment page.

        Args:
            url: Address of the judgment page to download.
            **kwargs: Accepted but not used by this method.

        Returns:
            A ``RawDocument`` whose ``raw_text`` is the text of the first
            matching content container, or the full response body when no
            container is found.

        Raises:
            Propagates whatever ``resp.raise_for_status()`` raises for an
            error response.
        """
        async with self.rate_limiter:
            resp = await self.client.get(url)
            resp.raise_for_status()

        soup = BeautifulSoup(resp.text, "html.parser")
        # Try progressively more generic containers; the first hit wins.
        content = (
            soup.find("div", class_="judgment")
            or soup.find("div", class_="content")
            or soup.find("article")
            or soup.find("main")
        )
        if content is not None:
            raw_text = content.get_text(separator="\n", strip=True)
        else:
            raw_text = resp.text

        # Only the first 500 characters are passed to the citation parser.
        parsed = parse_mnc(raw_text[:500])
        doc_id = parsed.mnc if parsed else self._url_doc_id(url)

        return RawDocument(
            source_id=self.source_id,
            doc_id=doc_id,
            url=url,
            fetch_timestamp=datetime.now(timezone.utc).isoformat(),
            raw_text=raw_text,
            format="html",
        )