Source layer (5 court sources), processing pipeline (parse/extract/chunk/embed/graph), property graph with 8 node types, juror subgraph queries with 6 personas, orchestrator with bootstrap/watch/backfill/audit/process modes, 170 tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
143 lines
5.1 KiB
Python
143 lines
5.1 KiB
Python
"""Federal Court of Australia source — RSS poll + DOCX download."""
|
|
|
|
from __future__ import annotations

import io
import logging
import re
import zipfile
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from xml.etree import ElementTree as ET

from aucourt_ingest.models import FetchQueueItem, FetchStatus, RawDocument
from aucourt_ingest.sources.base import BaseSource
from aucourt_ingest.storage.doc_store import DocStore
from aucourt_ingest.storage.meta_db import MetaDB
from aucourt_ingest.utils.mnc_parser import parse_mnc
|
|
|
|
# Logger named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# RSS feed of recently published Federal Court of Australia judgments.
FEDCOURT_RSS: str = "https://www.judgments.fedcourt.gov.au/rss/fca-judgments"

# Site-relative judgment paths under fca/single/, e.g.
#   /judgments/Judgments/fca/single/2024/2024fca0123
# Group 1 captures the four-digit year directory, group 2 the file stem.
# The whole match (group 0) is what gets ".docx" appended when probing.
DOCX_PATTERN: re.Pattern[str] = re.compile(
    r"/judgments/Judgments/fca/single/(\d{4})/(\d{4}fca\d+)\b"
)
|
|
|
|
|
|
class FedCourtSource(BaseSource):
    """Federal Court of Australia — RSS poll for new judgments.

    ``discover`` turns the judgments RSS feed into fetch-queue items.
    ``fetch`` downloads one judgment page. When a ``.docx`` rendition of a
    judgment path linked from that page answers a HEAD probe with 200,
    ``fetch`` uses the text of that DOCX instead of the page HTML.
    """

    source_id = "fedcourt"

    # Origin prepended to the site-relative paths matched by DOCX_PATTERN.
    _SITE_ROOT = "https://www.judgments.fedcourt.gov.au"

    # XML namespace of the WordprocessingML elements in word/document.xml.
    _W_NS = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"

    async def discover(self, page: int = 1, **kwargs) -> list[FetchQueueItem]:
        """Poll the RSS feed and return one queue item per judgment.

        ``page`` and ``kwargs`` are not used here. They are presumably
        accepted for interface compatibility with ``BaseSource`` callers.

        Each item's ``doc_id`` is the medium-neutral citation that
        ``parse_mnc`` finds in the RSS title. When there is none, it is the
        last path segment of the item link. When ``self.meta_db`` is set,
        ``meta_db.insert_document`` is called for every item.

        Raises:
            Whatever ``resp.raise_for_status()`` raises on an HTTP error.
            ``xml.etree.ElementTree.ParseError`` if the feed is not valid XML.
        """
        async with self.rate_limiter:
            resp = await self.client.get(FEDCOURT_RSS)
            resp.raise_for_status()

        # Parse the raw bytes so ElementTree honours the feed's own XML
        # encoding declaration rather than the HTTP client's text decoding.
        # NOTE(review): stdlib ElementTree is not hardened against hostile
        # XML (entity expansion); acceptable only while the feed is trusted.
        root = ET.fromstring(resp.content)

        items: list[FetchQueueItem] = []
        for item_elem in root.iter("item"):
            # RSS text nodes frequently carry surrounding whitespace/newlines,
            # which would otherwise leak into URLs and ids.
            link = item_elem.findtext("link", "").strip()
            if not link:
                continue
            title = item_elem.findtext("title", "").strip()

            # Prefer the medium-neutral citation from the title as the id.
            parsed = parse_mnc(title)
            doc_id = parsed.mnc if parsed else link.rstrip("/").rsplit("/", 1)[-1]

            items.append(FetchQueueItem(
                source_id=self.source_id,
                url=link,
                priority=3,
                doc_id=doc_id,
            ))
            if self.meta_db:
                await self.meta_db.insert_document(doc_id, self.source_id, link)

        logger.info("FedCourt RSS: %d judgments", len(items))
        return items

    async def fetch(self, url: str, **kwargs) -> RawDocument:
        """Fetch a judgment page, preferring its DOCX rendition when one exists.

        The HTML page is always downloaded. The helper ``_find_docx_url``
        probes each judgment path matched by ``DOCX_PATTERN`` for a
        ``.docx`` sibling. The first that answers HEAD with 200 is
        downloaded, and its body text is extracted from
        ``word/document.xml``. If no DOCX is found, or the download is not
        a readable DOCX, the page HTML is used instead.

        Returns:
            A ``RawDocument`` whose ``raw_text`` holds one of two things:
            plain text extracted from the DOCX (``format="docx"``), or the
            page HTML (``format="html"``).

        Raises:
            Whatever ``raise_for_status()`` raises when the page GET or the
            DOCX GET fails. A failed HEAD probe is not raised.
        """
        async with self.rate_limiter:
            resp = await self.client.get(url)
            resp.raise_for_status()
        html = resp.text

        raw_text = html
        fmt = "html"

        docx_url = await self._find_docx_url(html)
        if docx_url:
            async with self.rate_limiter:
                docx_resp = await self.client.get(docx_url)
                docx_resp.raise_for_status()
            # A .docx is a zip archive: its bytes are not text, and decoding
            # them as UTF-8 would store unrecoverable garbage. Extract the
            # document text instead.
            docx_text = self._docx_to_text(docx_resp.content)
            if docx_text:
                raw_text = docx_text
                fmt = "docx"
            else:
                logger.warning("FedCourt: unreadable DOCX at %s; using HTML", docx_url)

        # Only the start of the page is searched. Presumably this is so a
        # citation quoted in the judgment body is not taken as this
        # judgment's own MNC. TODO confirm the MNC reliably appears within
        # the first 500 characters.
        parsed = parse_mnc(html[:500])
        doc_id = parsed.mnc if parsed else url.rstrip("/").rsplit("/", 1)[-1]

        return RawDocument(
            source_id=self.source_id,
            doc_id=doc_id,
            url=url,
            fetch_timestamp=datetime.now(timezone.utc).isoformat(),
            raw_text=raw_text,
            format=fmt,
        )

    async def _find_docx_url(self, html: str) -> str | None:
        """Return the first ``<judgment path>.docx`` URL that answers HEAD 200, else None."""
        probed: set[str] = set()
        for match in DOCX_PATTERN.finditer(html):
            path = match.group(0)
            if path in probed:
                continue  # skip repeat occurrences of the same judgment path
            probed.add(path)

            candidate = f"{self._SITE_ROOT}{path}.docx"
            try:
                # HEAD probes are requests too, so they share the rate limit.
                async with self.rate_limiter:
                    head_resp = await self.client.head(candidate)
            except Exception as exc:
                # Best effort: a failed probe just means "try the next path".
                logger.debug("FedCourt: HEAD %s failed: %s", candidate, exc)
                continue
            if head_resp.status_code == 200:
                return candidate
        return None

    @classmethod
    def _docx_to_text(cls, data: bytes) -> str:
        """Return the paragraph text of a DOCX payload, or "" if it cannot be read.

        Paragraphs (``w:p``) are joined with newlines. Within a paragraph,
        only the direct children of runs (``w:r``) are read: ``w:t`` gives
        text, ``w:tab`` gives a tab, and ``w:br``/``w:cr`` give a newline.
        Reading run children only avoids tab-stop definitions in paragraph
        properties. Text inside nested content (e.g. text boxes) may appear
        twice.
        """
        w = cls._W_NS
        try:
            with zipfile.ZipFile(io.BytesIO(data)) as archive:
                document_xml = archive.read("word/document.xml")
            root = ET.fromstring(document_xml)
        except (zipfile.BadZipFile, KeyError, ET.ParseError):
            return ""

        text_tag, tab_tag = f"{w}t", f"{w}tab"
        break_tags = (f"{w}br", f"{w}cr")

        paragraphs: list[str] = []
        for para in root.iter(f"{w}p"):
            pieces: list[str] = []
            for run in para.iter(f"{w}r"):
                for node in run:
                    if node.tag == text_tag:
                        pieces.append(node.text or "")
                    elif node.tag == tab_tag:
                        pieces.append("\t")
                    elif node.tag in break_tags:
                        pieces.append("\n")
            paragraphs.append("".join(pieces))
        return "\n".join(paragraphs).strip()
|
|
|
|
|
|
async def bootstrap_fedcourt(
    config, meta_db: MetaDB, doc_store: DocStore,
    limit: int = 0,
) -> int:
    """Fetch the judgments currently listed in the Federal Court RSS feed.

    Items whose metadata row already has status FETCHED or PARSED are
    skipped. Each remaining item is fetched, saved to ``doc_store`` and
    marked FETCHED. A failed item is logged and marked FAILED without
    stopping the run. The source is always closed on exit.

    Args:
        config: Configuration passed straight to ``FedCourtSource``.
        meta_db: Metadata store tracking per-document fetch status.
        doc_store: Destination for fetched document text.
        limit: Maximum number of successful fetches; ``0`` means no limit.

    Returns:
        The number of documents fetched successfully in this run.
    """
    source = FedCourtSource(config, meta_db)
    await meta_db.init_source(source.source_id)

    total_fetched = 0
    try:
        items = await source.discover()
        for item in items:
            if limit and total_fetched >= limit:
                break

            # Key every metadata call on the id discover() registered the row
            # under (insert_document(doc_id, ...)). Neither the URL nor the
            # id fetch() derives from the page HTML is guaranteed to match it.
            # NOTE(review): assumes MetaDB.get_document is keyed by doc_id,
            # like insert_document/update_status; confirm.
            doc_id = item.doc_id or item.url.rstrip("/").rsplit("/", 1)[-1]

            existing = await meta_db.get_document(doc_id)
            if existing and existing["fetch_status"] in (FetchStatus.FETCHED, FetchStatus.PARSED):
                continue

            try:
                raw = await source.fetch(item.url)
                doc_store.save(source.source_id, doc_id, raw.raw_text, fmt=raw.format)
                await meta_db.update_status(doc_id, FetchStatus.FETCHED)
                await meta_db.update_doc_meta(doc_id, char_count=raw.char_count)
                await meta_db.increment_source_stats(source.source_id, fetched=1)
            except Exception as e:
                # Per-item boundary: record the failure and move on.
                logger.error("Failed: %s: %s", item.url, e)
                await meta_db.update_status(doc_id, FetchStatus.FAILED, error_message=str(e))
                await meta_db.increment_source_stats(source.source_id, failed=1)
            else:
                total_fetched += 1
                logger.info("[%d] %s (%s chars, %s)", total_fetched, doc_id, raw.char_count, raw.format)
    finally:
        await source.close()

    return total_fetched
|