aucourt-ingest/aucourt_ingest/main.py
slothitude 6374aea0a2 Stage 9: add read-only FastAPI query API for juror RAG queries
8 GET endpoints under /api/v1 for health, personas, cases, vector search,
juror context, and hybrid search. Includes QueryService composing SubgraphQuery
+ VectorIndex + GraphDB, Pydantic response models, error handlers, and
`serve` CLI mode via uvicorn. 20 new tests, 190 total, zero regressions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-30 12:08:55 +10:00

367 lines
14 KiB
Python

"""AuCourtIngest — entry point with mode dispatch."""
import argparse
import asyncio
import logging
import signal
import sys
from pathlib import Path
from aucourt_ingest.config import AppConfig
logger = logging.getLogger(__name__)
def _add_storage_args(parser: argparse.ArgumentParser) -> None:
    """Add the ``--db`` and ``--raw-dir`` options shared by the storage-backed modes."""
    parser.add_argument("--db", default="data/meta.db", help="MetaDB path")
    parser.add_argument("--raw-dir", default="data/raw", help="Raw document storage dir")


def build_parser() -> argparse.ArgumentParser:
    """Build the ``aucourt_ingest`` command-line parser.

    The selected sub-command is stored in ``args.mode`` (``None`` when no
    sub-command is given). ``bootstrap``, ``watch``, ``backfill``, ``audit``
    and ``process`` all accept ``--db`` (MetaDB path) and ``--raw-dir`` (raw
    document store). ``serve`` instead takes optional host/port/backend
    overrides whose ``None`` default means "use the config file value".
    """
    parser = argparse.ArgumentParser(
        prog="aucourt_ingest",
        description="Australian legal case ingestion pipeline",
    )
    parser.add_argument("--config", default="config.toml", help="Path to config.toml")
    sub = parser.add_subparsers(dest="mode")

    # bootstrap
    p_boot = sub.add_parser("bootstrap", help="Historical bulk ingest")
    p_boot.add_argument("--source", help="Single source to bootstrap (omit for all)")
    p_boot.add_argument("--limit", type=int, default=0, help="Max documents (0=unlimited)")
    p_boot.add_argument("--pages", type=int, default=0, help="Max pages to discover (0=all)")
    _add_storage_args(p_boot)

    # watch
    p_watch = sub.add_parser("watch", help="Continuous RSS polling")
    p_watch.add_argument("--sources", nargs="*", help="Specific sources (omit for all)")
    p_watch.add_argument("--interval", type=int, default=300, help="Poll interval in seconds (default 300)")
    _add_storage_args(p_watch)

    # backfill
    p_back = sub.add_parser("backfill", help="Fill gaps in existing corpus")
    p_back.add_argument("--source", required=True, help="Source to backfill")
    p_back.add_argument("--from-year", type=int, required=True, help="Start year")
    p_back.add_argument("--to-year", type=int, required=True, help="End year")
    p_back.add_argument("--format", help="Document format filter (html, docx, pdf)")
    _add_storage_args(p_back)

    # audit
    p_audit = sub.add_parser("audit", help="Re-process existing documents to update graph schema")
    p_audit.add_argument("--status", default="parsed", help="Re-process documents with this status (default: parsed)")
    p_audit.add_argument("--limit", type=int, default=0, help="Max documents (0=all)")
    _add_storage_args(p_audit)

    # process
    p_proc = sub.add_parser("process", help="Process fetched documents through the pipeline")
    p_proc.add_argument("--limit", type=int, default=0, help="Max documents to process (0=all)")
    _add_storage_args(p_proc)

    # serve
    p_serve = sub.add_parser("serve", help="Start the read-only query API server")
    p_serve.add_argument("--host", default=None, help="Bind host (default from config)")
    p_serve.add_argument("--port", type=int, default=None, help="Bind port (default from config)")
    # Restrict to the documented backends so a typo fails fast at parse time
    # instead of silently falling through to the Neo4j code path in cmd_serve.
    p_serve.add_argument(
        "--graph-backend",
        default=None,
        choices=("memory", "neo4j"),
        help="Graph backend: memory|neo4j (default from config)",
    )
    return parser
async def cmd_bootstrap(args):
    """Historical bulk ingest for one source (``--source``) or all configured sources.

    Creates the raw-document and MetaDB directories if needed, opens the
    MetaDB at ``args.db`` and a DocStore at ``args.raw_dir``, then runs the
    source-specific bootstrap coroutine for each selected source. It prints
    per-source and total fetch counts. Unknown sources, and sources without a
    bootstrap implementation, are reported and skipped. The MetaDB connection
    is closed even if a bootstrap raises.
    """
    config = AppConfig.load(args.config)
    from aucourt_ingest.storage.meta_db import MetaDB
    from aucourt_ingest.storage.doc_store import DocStore

    # Ensure data dirs exist
    Path(args.raw_dir).mkdir(parents=True, exist_ok=True)
    Path(args.db).parent.mkdir(parents=True, exist_ok=True)

    meta_db = MetaDB(args.db)
    await meta_db.connect()
    try:
        doc_store = DocStore(args.raw_dir)
        total = 0
        sources_to_run = [args.source] if args.source else list(config.sources.keys())
        for source_id in sources_to_run:
            if source_id not in config.sources:
                print(f"Unknown source: {source_id} (available: {list(config.sources.keys())})")
                continue
            source_config = config.sources[source_id]
            print(f"\n=== Bootstrap: {source_id} (limit={args.limit or 'unlimited'}) ===")
            # Source modules are imported lazily so only the ones in use are loaded.
            if source_id == "nsw_caselaw":
                from aucourt_ingest.sources.nsw_caselaw import bootstrap_nsw_caselaw
                n = await bootstrap_nsw_caselaw(source_config, meta_db, doc_store, limit=args.limit, pages=args.pages)
            elif source_id == "fedcourt":
                from aucourt_ingest.sources.fedcourt import bootstrap_fedcourt
                n = await bootstrap_fedcourt(source_config, meta_db, doc_store, limit=args.limit)
            elif source_id == "highcourt":
                from aucourt_ingest.sources.highcourt import bootstrap_highcourt
                n = await bootstrap_highcourt(source_config, meta_db, doc_store, limit=args.limit)
            else:
                print(f" No bootstrap implemented for {source_id} yet")
                continue
            print(f" Fetched {n} documents")
            total += n
    finally:
        # Release the DB connection even when a source bootstrap fails mid-run.
        await meta_db.close()
    print(f"\nTotal: {total} documents fetched")
async def cmd_watch(args):
    """Continuous polling mode — discovers new documents from sources on a schedule.

    Each cycle does three things:

    1. Fetches up to 10 documents from each selected source.
    2. Drains up to 20 queued documents through the Orchestrator.
    3. Waits ``args.interval`` seconds, or until SIGINT/SIGTERM sets the
       shutdown event.

    Per-source fetch failures are logged and sent through the Telegram alert
    as ``"fetch_error"``. Queue-processing failures are logged. Neither stops
    the loop.

    The MetaDB connection, the Telegram alert client and the installed signal
    handlers are released exactly once, when the loop exits (normally or via
    an exception).
    """
    import contextlib

    config = AppConfig.load(args.config)
    from aucourt_ingest.storage.meta_db import MetaDB
    from aucourt_ingest.storage.doc_store import DocStore
    from aucourt_ingest.orchestrator import Orchestrator
    from aucourt_ingest.utils.telegram import TelegramAlert

    Path(args.raw_dir).mkdir(parents=True, exist_ok=True)
    Path(args.db).parent.mkdir(parents=True, exist_ok=True)

    async with contextlib.AsyncExitStack() as stack:
        meta_db = MetaDB(args.db)
        await meta_db.connect()
        stack.push_async_callback(meta_db.close)
        doc_store = DocStore(args.raw_dir)

        # Set up Telegram alerts if configured. The client lives for the whole
        # watch session and is closed once on exit, not after every cycle.
        alert = TelegramAlert(
            bot_token=config.telegram.bot_token,
            chat_id=config.telegram.chat_id,
            enabled=config.telegram.enabled,
        )
        stack.push_async_callback(alert.close)

        sources = args.sources or list(config.sources.keys())
        interval = args.interval
        print(f"Watch mode: sources={sources}, interval={interval}s")

        shutdown = asyncio.Event()

        def _signal_handler():
            print("\nShutdown signal received")
            shutdown.set()

        # We are inside a coroutine, so the running loop is the one to hook.
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, _signal_handler)
            except NotImplementedError:
                continue  # Windows doesn't support add_signal_handler
            stack.callback(loop.remove_signal_handler, sig)

        cycle = 0
        while not shutdown.is_set():
            cycle += 1
            print(f"\n--- Watch cycle {cycle} ---")
            for source_id in sources:
                if source_id not in config.sources:
                    continue
                source_config = config.sources[source_id]
                try:
                    if source_id == "nsw_caselaw":
                        from aucourt_ingest.sources.nsw_caselaw import bootstrap_nsw_caselaw
                        n = await bootstrap_nsw_caselaw(source_config, meta_db, doc_store, limit=10)
                        if n > 0:
                            print(f" {source_id}: {n} new documents")
                    elif source_id == "fedcourt" and source_config.rss_feed:
                        from aucourt_ingest.sources.fedcourt import bootstrap_fedcourt
                        n = await bootstrap_fedcourt(source_config, meta_db, doc_store, limit=10)
                        if n > 0:
                            print(f" {source_id}: {n} new documents")
                except Exception as e:
                    # Best-effort per source: log with traceback, alert, move on.
                    logger.exception("Watch error for %s: %s", source_id, e)
                    await alert.alert("fetch_error", source_id=source_id, error=str(e)[:100])

            # Process any queued items
            orch = Orchestrator(meta_db, doc_store, alert=alert)
            try:
                stats = await orch.process_queue(limit=20)
            except Exception:
                # Keep the daemon alive; the next cycle retries the queue.
                logger.exception("Watch processing error in cycle %d", cycle)
            else:
                if stats["processed"] > 0:
                    print(f" Processed: {stats['processed']} docs (errors={stats['errors']})")

            # Wait for next cycle; returns early as soon as shutdown is requested.
            try:
                await asyncio.wait_for(shutdown.wait(), timeout=interval)
            except asyncio.TimeoutError:
                continue
    print("Watch mode stopped")
async def cmd_backfill(args):
    """Backfill mode — re-discover documents for a source within a year range.

    Runs the source's bootstrap coroutine with no document limit
    (``limit=0``) once per year in ``[args.from_year, args.to_year]``. It
    prints per-year and total counts.

    These cases are reported and return without opening the MetaDB:

    - an unknown source;
    - a source with no backfill implementation;
    - an inverted year range.

    The MetaDB connection is closed even if a fetch raises.

    NOTE(review): the loop year and ``args.format`` are NOT forwarded to the
    bootstrap functions, so every iteration repeats the same unfiltered
    discovery. Wire them through once the source modules accept a
    year/format filter.
    """
    config = AppConfig.load(args.config)
    from aucourt_ingest.storage.meta_db import MetaDB
    from aucourt_ingest.storage.doc_store import DocStore

    source_id = args.source
    if source_id not in config.sources:
        print(f"Unknown source: {source_id}")
        return
    if args.from_year > args.to_year:
        print(f"Invalid year range: {args.from_year} > {args.to_year}")
        return

    # Resolve the fetcher once, before touching storage; imports stay lazy.
    if source_id == "nsw_caselaw":
        from aucourt_ingest.sources.nsw_caselaw import bootstrap_nsw_caselaw as fetch
    elif source_id == "fedcourt":
        from aucourt_ingest.sources.fedcourt import bootstrap_fedcourt as fetch
    elif source_id == "highcourt":
        from aucourt_ingest.sources.highcourt import bootstrap_highcourt as fetch
    else:
        print(f" No backfill implemented for {source_id}")
        return

    Path(args.raw_dir).mkdir(parents=True, exist_ok=True)
    Path(args.db).parent.mkdir(parents=True, exist_ok=True)

    meta_db = MetaDB(args.db)
    await meta_db.connect()
    try:
        doc_store = DocStore(args.raw_dir)
        source_config = config.sources[source_id]
        total = 0
        for year in range(args.from_year, args.to_year + 1):
            print(f" Backfill {source_id} year {year}...")
            n = await fetch(source_config, meta_db, doc_store, limit=0)
            print(f" Year {year}: {n} documents")
            total += n
    finally:
        await meta_db.close()
    print(f"Backfill complete: {total} documents")
async def cmd_audit(args):
    """Audit mode — re-process existing documents with current schema.

    Loads the documents whose MetaDB status equals ``args.status``. When
    ``args.limit`` is positive, only the first ``args.limit`` of them are
    kept. Each document goes through ``Orchestrator.process_document`` and
    progress is printed every 10 documents. An exception from
    ``process_document`` aborts the audit, but the MetaDB connection is still
    closed.
    """
    from aucourt_ingest.storage.meta_db import MetaDB
    from aucourt_ingest.storage.doc_store import DocStore
    from aucourt_ingest.orchestrator import Orchestrator

    Path(args.raw_dir).mkdir(parents=True, exist_ok=True)
    Path(args.db).parent.mkdir(parents=True, exist_ok=True)

    meta_db = MetaDB(args.db)
    await meta_db.connect()
    processed = 0
    try:
        doc_store = DocStore(args.raw_dir)
        orch = Orchestrator(meta_db, doc_store)
        # Get documents with the target status
        docs = await meta_db.get_documents_by_status(args.status)
        # Apply --limit up front so the reported totals match the work done.
        if args.limit > 0:
            docs = docs[:args.limit]
        print(f"Audit: re-processing {len(docs)} documents with status '{args.status}'")
        for doc in docs:
            await orch.process_document(doc["doc_id"], doc.get("source_id", ""))
            processed += 1
            if processed % 10 == 0:
                print(f" Audit progress: {processed}/{len(docs)}")
    finally:
        await meta_db.close()
    print(f"Audit complete: {processed} documents re-processed")
async def cmd_process(args):
    """Process mode — drain the fetch queue through the pipeline.

    Runs ``Orchestrator.process_queue`` once, with ``args.limit`` as its limit
    and the Telegram alert client from the config attached, then prints the
    returned stats. The config is loaded before any resource is opened. The
    MetaDB connection and the alert client are closed even if processing
    raises.
    """
    import contextlib

    # Load config first so a bad config file cannot leak an open DB connection.
    config = AppConfig.load(args.config)
    from aucourt_ingest.storage.meta_db import MetaDB
    from aucourt_ingest.storage.doc_store import DocStore
    from aucourt_ingest.orchestrator import Orchestrator
    from aucourt_ingest.utils.telegram import TelegramAlert

    Path(args.raw_dir).mkdir(parents=True, exist_ok=True)
    Path(args.db).parent.mkdir(parents=True, exist_ok=True)

    async with contextlib.AsyncExitStack() as stack:
        meta_db = MetaDB(args.db)
        await meta_db.connect()
        stack.push_async_callback(meta_db.close)
        doc_store = DocStore(args.raw_dir)

        alert = TelegramAlert(
            bot_token=config.telegram.bot_token,
            chat_id=config.telegram.chat_id,
            enabled=config.telegram.enabled,
        )
        stack.push_async_callback(alert.close)

        orch = Orchestrator(meta_db, doc_store, alert=alert)
        stats = await orch.process_queue(limit=args.limit)
    print(f"Processing complete: {stats}")
def cmd_serve(args):
    """Start the read-only FastAPI query server and block until it exits.

    ``--host``, ``--port`` and ``--graph-backend`` override the
    ``config.server`` values.

    - Backend ``"memory"`` uses an ``InMemoryGraphDB``.
    - Any other backend value connects a ``Neo4jGraphDB`` using the
      ``config.storage`` Neo4j settings.
    - A ``VectorIndex`` over ``config.storage.chromadb_dir`` is opened for
      every backend.

    ``uvicorn.run`` drives its own event loop, so this function must not be
    called from inside a running asyncio loop.
    """
    config = AppConfig.load(args.config)
    # Compare against None so explicit falsy overrides such as ``--port 0``
    # (ephemeral port) are honoured instead of being replaced by config values.
    host = args.host if args.host is not None else config.server.host
    port = args.port if args.port is not None else config.server.port
    backend = args.graph_backend or config.server.graph_backend
    max_tokens = config.server.default_max_tokens

    if backend == "memory":
        from aucourt_ingest.storage.in_memory_graph_db import InMemoryGraphDB
        graph_db = InMemoryGraphDB()
    else:
        from aucourt_ingest.storage.graph_db import Neo4jGraphDB
        graph_db = Neo4jGraphDB(
            uri=config.storage.neo4j_uri,
            user=config.storage.neo4j_user,
            password=config.storage.neo4j_password,
            database=config.storage.neo4j_database,
        )

    # The ChromaDB-backed vector index is opened regardless of graph backend.
    from aucourt_ingest.storage.vector_index import VectorIndex
    vector_index = VectorIndex(str(config.storage.chromadb_dir))

    from aucourt_ingest.api.app import create_app
    app = create_app(graph_db, vector_index, max_tokens)

    import uvicorn
    print(f"Starting AuCourtIngest API on {host}:{port} (graph={backend})")
    uvicorn.run(app, host=host, port=port)
async def async_main():
    """Parse the command line and run the selected mode.

    Prints the help text and returns when no mode is given. The asynchronous
    modes are looked up in a dispatch table and awaited. ``serve`` is
    synchronous and is called directly.

    NOTE(review): ``cmd_serve`` calls ``uvicorn.run``, which starts its own
    event loop. Reaching it from inside this coroutine's loop is not
    supported, so prefer dispatching ``serve`` before ``asyncio.run()``.
    """
    parser = build_parser()
    args = parser.parse_args()
    mode = args.mode

    if not mode:
        parser.print_help()
        return

    if mode == "serve":
        cmd_serve(args)
        return

    async_handlers = {
        "bootstrap": cmd_bootstrap,
        "watch": cmd_watch,
        "backfill": cmd_backfill,
        "audit": cmd_audit,
        "process": cmd_process,
    }
    handler = async_handlers.get(mode)
    if handler is not None:
        await handler(args)
def main():
    """Console entry point: configure logging and run the selected mode.

    ``serve`` is dispatched synchronously, before any event loop exists.
    ``uvicorn.run`` creates and runs its own loop, and would fail if called
    from inside ``asyncio.run``. Every other mode runs under
    ``asyncio.run(async_main())``, which re-parses the same ``sys.argv``.
    """
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
    args = build_parser().parse_args()
    if args.mode == "serve":
        cmd_serve(args)
        return
    asyncio.run(async_main())


if __name__ == "__main__":
    main()