"""AuCourtIngest — entry point with mode dispatch."""

import argparse
import asyncio
import logging
import signal
import sys
from pathlib import Path

from aucourt_ingest.config import AppConfig

logger = logging.getLogger(__name__)


def _add_storage_args(sub_parser: argparse.ArgumentParser) -> None:
    """Add the ``--db`` / ``--raw-dir`` options shared by the ingest modes."""
    sub_parser.add_argument("--db", default="data/meta.db", help="MetaDB path")
    sub_parser.add_argument("--raw-dir", default="data/raw", help="Raw document storage dir")


def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with one sub-command per run mode.

    The chosen sub-command is stored in ``args.mode``; it is ``None`` when the
    program is invoked without a sub-command.
    """
    parser = argparse.ArgumentParser(
        prog="aucourt_ingest",
        description="Australian legal case ingestion pipeline",
    )
    parser.add_argument("--config", default="config.toml", help="Path to config.toml")
    sub = parser.add_subparsers(dest="mode")

    # bootstrap
    p_boot = sub.add_parser("bootstrap", help="Historical bulk ingest")
    p_boot.add_argument("--source", help="Single source to bootstrap (omit for all)")
    p_boot.add_argument("--limit", type=int, default=0, help="Max documents (0=unlimited)")
    p_boot.add_argument("--pages", type=int, default=0, help="Max pages to discover (0=all)")
    _add_storage_args(p_boot)

    # watch
    p_watch = sub.add_parser("watch", help="Continuous RSS polling")
    p_watch.add_argument("--sources", nargs="*", help="Specific sources (omit for all)")
    p_watch.add_argument(
        "--interval", type=int, default=300, help="Poll interval in seconds (default 300)"
    )
    _add_storage_args(p_watch)

    # backfill
    p_back = sub.add_parser("backfill", help="Fill gaps in existing corpus")
    p_back.add_argument("--source", required=True, help="Source to backfill")
    p_back.add_argument("--from-year", type=int, required=True, help="Start year")
    p_back.add_argument("--to-year", type=int, required=True, help="End year")
    p_back.add_argument("--format", help="Document format filter (html, docx, pdf)")
    _add_storage_args(p_back)

    # audit
    p_audit = sub.add_parser(
        "audit", help="Re-process existing documents to update graph schema"
    )
    p_audit.add_argument(
        "--status",
        default="parsed",
        help="Re-process documents with this status (default: parsed)",
    )
    p_audit.add_argument("--limit", type=int, default=0, help="Max documents (0=all)")
    _add_storage_args(p_audit)

    # process
    p_proc = sub.add_parser("process", help="Process fetched documents through the pipeline")
    p_proc.add_argument("--limit", type=int, default=0, help="Max documents to process (0=all)")
    _add_storage_args(p_proc)

    # serve
    p_serve = sub.add_parser("serve", help="Start the read-only query API server")
    p_serve.add_argument("--host", default=None, help="Bind host (default from config)")
    p_serve.add_argument("--port", type=int, default=None, help="Bind port (default from config)")
    p_serve.add_argument(
        "--graph-backend",
        default=None,
        help="Graph backend: memory|neo4j (default from config)",
    )

    return parser


async def _open_stores(args: argparse.Namespace):
    """Create the data directories and return ``(meta_db, doc_store)``.

    ``meta_db`` is returned already connected; the caller is responsible for
    awaiting ``meta_db.close()``.
    """
    # Imported lazily, as in every command, so that modes which never touch
    # storage (e.g. ``serve``) do not pay for these imports.
    from aucourt_ingest.storage.meta_db import MetaDB
    from aucourt_ingest.storage.doc_store import DocStore

    # Ensure data dirs exist
    Path(args.raw_dir).mkdir(parents=True, exist_ok=True)
    Path(args.db).parent.mkdir(parents=True, exist_ok=True)

    meta_db = MetaDB(args.db)
    await meta_db.connect()
    try:
        doc_store = DocStore(args.raw_dir)
    except BaseException:
        # Do not leak the open connection if the doc store cannot be created.
        await meta_db.close()
        raise
    return meta_db, doc_store


def _build_alert(config):
    """Construct a ``TelegramAlert`` from the ``config.telegram`` settings."""
    from aucourt_ingest.utils.telegram import TelegramAlert

    return TelegramAlert(
        bot_token=config.telegram.bot_token,
        chat_id=config.telegram.chat_id,
        enabled=config.telegram.enabled,
    )


def _get_bootstrapper(source_id: str):
    """Return the bootstrap coroutine function for ``source_id``, or ``None``.

    The source modules are imported on demand so that only the sources actually
    requested are loaded.
    """
    if source_id == "nsw_caselaw":
        from aucourt_ingest.sources.nsw_caselaw import bootstrap_nsw_caselaw

        return bootstrap_nsw_caselaw
    if source_id == "fedcourt":
        from aucourt_ingest.sources.fedcourt import bootstrap_fedcourt

        return bootstrap_fedcourt
    if source_id == "highcourt":
        from aucourt_ingest.sources.highcourt import bootstrap_highcourt

        return bootstrap_highcourt
    return None


async def cmd_bootstrap(args: argparse.Namespace) -> None:
    """Bootstrap mode — bulk-fetch documents for one source or for all of them.

    Unknown sources and sources without a bootstrap implementation are reported
    and skipped. The MetaDB connection is closed even if a source raises.
    """
    config = AppConfig.load(args.config)
    meta_db, doc_store = await _open_stores(args)

    total = 0
    try:
        sources_to_run = [args.source] if args.source else list(config.sources.keys())

        for source_id in sources_to_run:
            if source_id not in config.sources:
                print(f"Unknown source: {source_id} (available: {list(config.sources.keys())})")
                continue

            source_config = config.sources[source_id]
            print(f"\n=== Bootstrap: {source_id} (limit={args.limit or 'unlimited'}) ===")

            fetch = _get_bootstrapper(source_id)
            if fetch is None:
                print(f" No bootstrap implemented for {source_id} yet")
                continue

            # Only the NSW Caselaw bootstrap is given the page cap.
            extra = {"pages": args.pages} if source_id == "nsw_caselaw" else {}
            n = await fetch(source_config, meta_db, doc_store, limit=args.limit, **extra)

            print(f" Fetched {n} documents")
            total += n
    finally:
        await meta_db.close()

    print(f"\nTotal: {total} documents fetched")


async def cmd_watch(args: argparse.Namespace) -> None:
    """Continuous polling mode — discovers new documents from sources on a schedule.

    Each cycle polls every selected source (at most 10 documents per source),
    then processes up to 20 queued items, then sleeps for ``args.interval``
    seconds or until SIGINT/SIGTERM is received. Per-source fetch errors are
    logged and reported through the alert channel without stopping the loop.
    """
    config = AppConfig.load(args.config)
    meta_db, doc_store = await _open_stores(args)

    try:
        # Set up Telegram alerts if configured
        alert = _build_alert(config)
        try:
            await _watch_loop(args, config, meta_db, doc_store, alert)
        finally:
            # Closed exactly once, on exit: the alert object is reused by every
            # cycle, so it must not be closed while the loop is still running.
            await alert.close()
    finally:
        await meta_db.close()

    print("Watch mode stopped")


async def _watch_loop(args, config, meta_db, doc_store, alert) -> None:
    """Run watch cycles until a shutdown signal is received."""
    from aucourt_ingest.orchestrator import Orchestrator

    sources = args.sources or list(config.sources.keys())
    interval = args.interval

    print(f"Watch mode: sources={sources}, interval={interval}s")

    shutdown = asyncio.Event()

    def _signal_handler():
        print("\nShutdown signal received")
        shutdown.set()

    # We are inside a coroutine, so the running loop is the one to hook.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, _signal_handler)
        except NotImplementedError:
            pass  # Windows doesn't support add_signal_handler

    cycle = 0
    while not shutdown.is_set():
        cycle += 1
        print(f"\n--- Watch cycle {cycle} ---")

        for source_id in sources:
            if source_id not in config.sources:
                continue

            source_config = config.sources[source_id]
            try:
                # Only these two sources are polled in watch mode; fedcourt is
                # polled only when it has an RSS feed configured.
                if source_id == "nsw_caselaw" or (
                    source_id == "fedcourt" and source_config.rss_feed
                ):
                    fetch = _get_bootstrapper(source_id)
                    n = await fetch(source_config, meta_db, doc_store, limit=10)
                    if n > 0:
                        print(f" {source_id}: {n} new documents")
            except Exception as e:
                # Boundary handler: one failing source must not stop the watch.
                logger.exception("Watch error for %s: %s", source_id, e)
                await alert.alert("fetch_error", source_id=source_id, error=str(e)[:100])

        # Process any queued items
        orch = Orchestrator(meta_db, doc_store, alert=alert)
        stats = await orch.process_queue(limit=20)
        if stats["processed"] > 0:
            print(f" Processed: {stats['processed']} docs (errors={stats['errors']})")

        # Wait for next cycle
        try:
            await asyncio.wait_for(shutdown.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # Interval elapsed without a shutdown request: next cycle.


async def cmd_backfill(args: argparse.Namespace) -> None:
    """Backfill mode — re-discover documents for a source within a year range.

    NOTE(review): neither the loop's ``year`` nor ``args.format`` is passed to
    the bootstrap functions, so as written each iteration re-runs the same
    unlimited bootstrap. Scoping by year/format needs support in the source
    modules — confirm their signatures before changing the calls.
    """
    config = AppConfig.load(args.config)
    meta_db, doc_store = await _open_stores(args)

    total = 0
    try:
        source_id = args.source
        if source_id not in config.sources:
            print(f"Unknown source: {source_id}")
            return

        source_config = config.sources[source_id]

        for year in range(args.from_year, args.to_year + 1):
            print(f" Backfill {source_id} year {year}...")

            fetch = _get_bootstrapper(source_id)
            if fetch is None:
                print(f" No backfill implemented for {source_id}")
                break

            n = await fetch(source_config, meta_db, doc_store, limit=0)
            print(f" Year {year}: {n} documents")
            total += n
    finally:
        await meta_db.close()

    print(f"Backfill complete: {total} documents")


async def cmd_audit(args: argparse.Namespace) -> None:
    """Audit mode — re-process existing documents with current schema.

    Documents whose status equals ``args.status`` are re-processed one by one;
    ``args.limit`` (> 0) caps how many are handled.
    """
    from aucourt_ingest.orchestrator import Orchestrator

    meta_db, doc_store = await _open_stores(args)

    processed = 0
    try:
        orch = Orchestrator(meta_db, doc_store)

        # Get documents with the target status
        docs = await meta_db.get_documents_by_status(args.status)
        selected = docs[: args.limit] if args.limit > 0 else docs
        # Report the number that will actually be handled, not the unlimited
        # match count, so the progress denominator is reachable.
        print(f"Audit: re-processing {len(selected)} documents with status '{args.status}'")

        for doc in selected:
            doc_id = doc["doc_id"]
            source_id = doc.get("source_id", "")
            await orch.process_document(doc_id, source_id)
            processed += 1
            if processed % 10 == 0:
                print(f" Audit progress: {processed}/{len(selected)}")
    finally:
        await meta_db.close()

    print(f"Audit complete: {processed} documents re-processed")


async def cmd_process(args: argparse.Namespace) -> None:
    """Process mode — drain the fetch queue through the pipeline.

    ``args.limit`` is forwarded to ``Orchestrator.process_queue``. The alert
    client and the MetaDB connection are closed even if processing raises.
    """
    from aucourt_ingest.orchestrator import Orchestrator

    meta_db, doc_store = await _open_stores(args)

    try:
        config = AppConfig.load(args.config)
        alert = _build_alert(config)
        try:
            orch = Orchestrator(meta_db, doc_store, alert=alert)
            stats = await orch.process_queue(limit=args.limit)
        finally:
            await alert.close()
    finally:
        await meta_db.close()

    print(f"Processing complete: {stats}")


def cmd_serve(args: argparse.Namespace) -> None:
    """Start the FastAPI query server.

    Command-line values override the ``config.server`` settings. Exits with
    status 1 when the graph backend is neither ``memory`` nor ``neo4j``.
    """
    config = AppConfig.load(args.config)

    host = args.host or config.server.host
    # ``is not None`` rather than ``or`` so an explicit ``--port 0`` is honoured.
    port = args.port if args.port is not None else config.server.port
    backend = args.graph_backend or config.server.graph_backend
    max_tokens = config.server.default_max_tokens

    if backend == "memory":
        from aucourt_ingest.storage.in_memory_graph_db import InMemoryGraphDB

        graph_db = InMemoryGraphDB()
    elif backend == "neo4j":
        from aucourt_ingest.storage.graph_db import Neo4jGraphDB

        graph_db = Neo4jGraphDB(
            uri=config.storage.neo4j_uri,
            user=config.storage.neo4j_user,
            password=config.storage.neo4j_password,
            database=config.storage.neo4j_database,
        )
    else:
        print(f"Unknown graph backend: {backend} (valid: memory, neo4j)")
        sys.exit(1)

    from aucourt_ingest.storage.vector_index import VectorIndex

    vector_index = VectorIndex(str(config.storage.chromadb_dir))

    from aucourt_ingest.api.app import create_app

    app = create_app(graph_db, vector_index, max_tokens)

    import uvicorn

    print(f"Starting AuCourtIngest API on {host}:{port} (graph={backend})")
    uvicorn.run(app, host=host, port=port)


async def async_main(args: argparse.Namespace) -> None:
    """Dispatch ``args.mode`` to the matching async command.

    Prints the CLI help when no mode was given. Modes without an async handler
    (``serve`` is handled synchronously by ``main``) are ignored here.
    """
    if not args.mode:
        # The parser built in ``main`` is not in scope here, so build one.
        build_parser().print_help()
        return

    # Built per call so the handlers are looked up by name at dispatch time.
    handlers = {
        "bootstrap": cmd_bootstrap,
        "watch": cmd_watch,
        "backfill": cmd_backfill,
        "audit": cmd_audit,
        "process": cmd_process,
    }
    handler = handlers.get(args.mode)
    if handler is not None:
        await handler(args)


def main() -> None:
    """CLI entry point: configure logging, parse arguments, run the chosen mode."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    parser = build_parser()
    args = parser.parse_args()

    if not args.mode:
        parser.print_help()
        return

    if args.mode == "serve":
        # Called outside asyncio.run: cmd_serve is synchronous and hands
        # control to uvicorn.run.
        cmd_serve(args)
        return

    asyncio.run(async_main(args))


if __name__ == "__main__":
    main()