aucourt-ingest/aucourt_ingest/sources/fedcourt.py
slothitude d77fe12cfc AuCourtIngest: complete 8-stage Australian legal case ingestion pipeline
Source layer (5 court sources), processing pipeline (parse/extract/chunk/embed/graph),
property graph with 8 node types, juror subgraph queries with 6 personas,
orchestrator with bootstrap/watch/backfill/audit/process modes, 170 tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-30 11:56:23 +10:00

143 lines
5.1 KiB
Python

"""Federal Court of Australia source — RSS poll + DOCX download."""
from __future__ import annotations
import logging
import re
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from xml.etree import ElementTree as ET
from aucourt_ingest.models import FetchQueueItem, FetchStatus, RawDocument
from aucourt_ingest.sources.base import BaseSource
from aucourt_ingest.storage.doc_store import DocStore
from aucourt_ingest.storage.meta_db import MetaDB
from aucourt_ingest.utils.mnc_parser import parse_mnc
logger = logging.getLogger(__name__)

# RSS feed polled for Federal Court (FCA) judgments.
FEDCOURT_RSS = "https://www.judgments.fedcourt.gov.au/rss/fca-judgments"

# Judgment page paths have the shape
#   /judgments/Judgments/fca/single/YYYY/YYYYfcaNNNN
# group(1) captures the four-digit year directory, group(2) the
# "YYYYfcaNNNN" judgment slug; group(0) is the whole path.
DOCX_PATTERN = re.compile(
    r"/judgments/Judgments/fca/single/(\d{4})/(\d{4}fca\d+)\b"
)
class FedCourtSource(BaseSource):
    """Federal Court of Australia source backed by the judgments RSS feed.

    ``discover`` polls the feed and queues one item per ``<item>`` link;
    ``fetch`` downloads a judgment page and, when a DOCX rendition can be
    located, downloads the DOCX in preference to keeping the HTML.
    """

    source_id = "fedcourt"

    async def discover(self, page: int = 1, **kwargs) -> list[FetchQueueItem]:
        """Poll the RSS feed and return one queue item per judgment link.

        Args:
            page: Accepted for interface compatibility; not used here, the
                feed is read with a single request.
            **kwargs: Ignored.

        Returns:
            A list of ``FetchQueueItem`` (priority 3), one per feed ``<item>``
            that carries a non-empty ``<link>``. When ``self.meta_db`` is set,
            each item is also registered via ``insert_document``.

        Raises:
            Whatever ``resp.raise_for_status()`` raises on an HTTP error
            status, and ``xml.etree.ElementTree.ParseError`` when the body is
            not well-formed XML.
        """
        async with self.rate_limiter:
            resp = await self.client.get(FEDCOURT_RSS)
            resp.raise_for_status()

        # NOTE(review): the stdlib XML parser is not hardened against
        # maliciously constructed documents; acceptable only while the feed
        # host is trusted.
        root = ET.fromstring(resp.text)

        items = []
        for item_elem in root.iter("item"):
            # Feeds are sometimes pretty-printed; surrounding whitespace must
            # not end up in the URL or in the derived doc_id.
            link = item_elem.findtext("link", "").strip()
            if not link:
                continue
            title = item_elem.findtext("title", "")

            # Prefer the medium neutral citation found in the title; fall
            # back to the last URL path segment.
            parsed = parse_mnc(title)
            doc_id = parsed.mnc if parsed else link.split("/")[-1]

            items.append(FetchQueueItem(
                source_id=self.source_id,
                url=link,
                priority=3,
                doc_id=doc_id,
            ))
            if self.meta_db:
                await self.meta_db.insert_document(doc_id, self.source_id, link)

        logger.info("FedCourt RSS: %d judgments", len(items))
        return items

    async def _find_docx_url(self, html: str) -> str | None:
        """Return the first candidate DOCX URL that answers HEAD with 200.

        Candidates are built from every distinct ``DOCX_PATTERN`` match in
        *html* with a ``.docx`` suffix. Each distinct path is probed once,
        under the rate limiter. A probe that raises is logged at debug level
        and skipped (best effort). Returns ``None`` when nothing qualifies.
        """
        probed: set[str] = set()
        for match in DOCX_PATTERN.finditer(html):
            path = match.group(0)
            # A judgment page typically repeats its own path many times;
            # probing each occurrence would hammer the server for no gain.
            if path in probed:
                continue
            probed.add(path)

            candidate = f"https://www.judgments.fedcourt.gov.au{path}.docx"
            try:
                async with self.rate_limiter:
                    head_resp = await self.client.head(candidate)
            except Exception as exc:  # best-effort probe: fall back to HTML
                logger.debug("FedCourt DOCX probe failed for %s: %s", candidate, exc)
                continue
            if head_resp.status_code == 200:
                return candidate
        return None

    async def fetch(self, url: str, **kwargs) -> RawDocument:
        """Fetch a judgment page and, if available, its DOCX rendition.

        Args:
            url: Judgment page URL (as produced by ``discover``).
            **kwargs: Ignored.

        Returns:
            A ``RawDocument`` whose ``format`` is ``"docx"`` when a DOCX was
            downloaded and ``"html"`` otherwise. ``doc_id`` is the citation
            parsed from the first 500 characters of the page, or the last
            URL path segment when none is found.

        Raises:
            Whatever ``raise_for_status()`` raises when the page request or
            the DOCX download returns an HTTP error status.
        """
        async with self.rate_limiter:
            resp = await self.client.get(url)
            resp.raise_for_status()
        html = resp.text

        docx_url = await self._find_docx_url(html)
        if docx_url:
            async with self.rate_limiter:
                docx_resp = await self.client.get(docx_url)
                docx_resp.raise_for_status()
            # NOTE(review): a DOCX is a binary ZIP container, so this
            # replace-on-error UTF-8 decode is lossy and the result cannot be
            # turned back into the original file. Kept as-is because
            # RawDocument.raw_text is consumed as text elsewhere; confirm
            # whether the raw bytes should be preserved instead.
            raw_text = docx_resp.content.decode("utf-8", errors="replace")
            fmt = "docx"
        else:
            # The page body is already decoded text; no round-trip needed.
            raw_text = html
            fmt = "html"

        parsed = parse_mnc(html[:500])
        doc_id = parsed.mnc if parsed else url.split("/")[-1]

        return RawDocument(
            source_id=self.source_id,
            doc_id=doc_id,
            url=url,
            fetch_timestamp=datetime.now(timezone.utc).isoformat(),
            raw_text=raw_text,
            format=fmt,
        )
async def bootstrap_fedcourt(
    config, meta_db: MetaDB, doc_store: DocStore,
    limit: int = 0,
) -> int:
    """Fetch the judgments currently listed in the Federal Court RSS feed.

    Discovers items through ``FedCourtSource.discover``, skips those whose
    stored ``fetch_status`` is already FETCHED or PARSED, and for the rest
    downloads the document, saves it to *doc_store* and records the outcome
    in *meta_db*. A failure on one item is logged and recorded as FAILED;
    processing then continues with the next item.

    Args:
        config: Passed through to ``FedCourtSource``.
        meta_db: Metadata store used for source/document bookkeeping.
        doc_store: Destination for the fetched document text.
        limit: Maximum number of documents to fetch; ``0`` means no limit.

    Returns:
        The number of documents fetched and stored successfully.
    """
    source = FedCourtSource(config, meta_db)
    total_fetched = 0
    try:
        # Inside the try so the source is closed even if initialisation fails.
        await meta_db.init_source(source.source_id)

        for item in await source.discover():
            if limit and total_fetched >= limit:
                break

            # NOTE(review): every other meta_db call here is keyed by doc_id,
            # while this lookup passes the URL — confirm which key
            # get_document expects.
            existing = await meta_db.get_document(item.url)
            if existing and existing["fetch_status"] in (FetchStatus.FETCHED, FetchStatus.PARSED):
                continue

            # Until the fetch succeeds, report against the id that discover()
            # registered for this item, so a failure lands on the same record.
            doc_id = item.doc_id or item.url.split("/")[-1]
            try:
                raw = await source.fetch(item.url)
                doc_id = raw.doc_id
                doc_store.save(source.source_id, raw.doc_id, raw.raw_text, fmt=raw.format)
                await meta_db.update_status(doc_id, FetchStatus.FETCHED)
                await meta_db.update_doc_meta(doc_id, char_count=raw.char_count)
                await meta_db.increment_source_stats(source.source_id, fetched=1)
                total_fetched += 1
                logger.info(
                    "[%d] %s (%s chars, %s)",
                    total_fetched, doc_id, raw.char_count, raw.format,
                )
            except Exception as e:
                # Per-item boundary: keep the traceback, mark the item failed,
                # and move on to the next one.
                logger.exception("Failed: %s: %s", item.url, e)
                await meta_db.update_status(doc_id, FetchStatus.FAILED, error_message=str(e))
                await meta_db.increment_source_stats(source.source_id, failed=1)
    finally:
        await source.close()
    return total_fetched