aucourt-ingest/tests/test_pipeline.py
slothitude d77fe12cfc AuCourtIngest: complete 8-stage Australian legal case ingestion pipeline
Source layer (5 court sources), processing pipeline (parse/extract/chunk/embed/graph),
property graph with 8 node types, juror subgraph queries with 6 personas,
orchestrator with bootstrap/watch/backfill/audit/process modes, 170 tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-30 11:56:23 +10:00

432 lines
15 KiB
Python

"""End-to-end integration test: 3 documents through full pipeline to graph.
Uses mocked LLM and embedding clients. Tests the full flow:
RawDocument → DocParser → MetaExtractor → ChunkEngine → EmbedEngine → GraphBuilder
"""
from __future__ import annotations
import json
import pytest
from aucourt_ingest.models import (
CaseMeta, Chunk, RawDocument, FetchStatus, Verdict, MatterType,
)
from aucourt_ingest.processing.pipeline import FullPipeline
from aucourt_ingest.orchestrator import Orchestrator
from aucourt_ingest.storage.in_memory_graph_db import InMemoryGraphDB
# ── Mock LLM clients ──
class MockLLMClient:
    """Mock LLM that returns canned JSON for meta extraction and chunking.

    ``create_message`` never performs a network call: it picks one of three
    canned replies by keyword-matching the lower-cased prompt + system text.
    """

    # Canned section-boundary reply, wrapped in a Markdown json code fence.
    CHUNK_RESPONSE = """```json
{
"sections": [
{"type": "opening", "start": 0, "end": 500, "title": "Opening"},
{"type": "testimony", "start": 500, "end": 2000, "title": "Witness Testimony", "speaker": "Dr Jones"},
{"type": "exhibit", "start": 2000, "end": 3000, "title": "Exhibit A"},
{"type": "closing", "start": 3000, "end": 3500, "title": "Closing Arguments"},
{"type": "judgment", "start": 3500, "end": 5000, "title": "Judgment"},
{"type": "sentence", "start": 5000, "end": 5500, "title": "Sentence"}
]
}
```"""

    def _make_meta_response(self, text: str) -> str:
        """Generate meta JSON with MNC extracted from text.

        Args:
            text: Text searched for the first ``[YYYY] COURT NUMBER`` citation.

        Returns:
            A JSON object string. ``mnc``, ``court`` and ``case_name`` are
            derived from the citation when one is found; otherwise the
            defaults ``[2023] NSWSC 999`` / ``NSWSC`` / ``R v Default`` are
            used. All remaining fields are fixed.
        """
        import re  # kept function-local, as in the original

        mnc_match = re.search(r'\[(\d{4})\]\s+(\w+)\s+(\d+)', text)
        if mnc_match:
            year, court, number = mnc_match.groups()
            # Rebuilt with single spaces regardless of the spacing in the input.
            mnc = f"[{year}] {court} {number}"
            name = f"R v Case{number}"
        else:
            court = "NSWSC"
            mnc = "[2023] NSWSC 999"
            name = "R v Default"
        return json.dumps({
            "case_name": name,
            "mnc": mnc,
            "court": court,
            "date_delivered": "2023-03-15",
            "jurisdiction": "NSW",
            "matter_type": "criminal",
            "judge": ["Judge Williams"],
            "charges": ["murder", "assault"],
            "verdict": "guilty",
            "exoneration_flag": False,
            "is_appeal": False,
            "appeal_of": "",
            "suppression_order": False,
            "inadmissible_evidence": ["hearsay statement"],
        })

    async def create_message(self, prompt: str, system: str) -> str:
        """Return a canned reply chosen by keywords in ``prompt + system``.

        Args:
            prompt: User prompt; also the text scanned for a citation when
                the metadata reply is selected.
            system: System prompt; only used for keyword routing.

        Returns:
            The metadata JSON, ``CHUNK_RESPONSE``, or an empty-sections JSON
            object when no keyword matches.
        """
        combined = (prompt + system).lower()
        # Metadata keywords are tested first, so a prompt that mentions both
        # "case" and "section" gets the metadata reply.
        # NOTE(review): "case" is a very broad keyword — confirm the real
        # chunking prompts do not contain it if CHUNK_RESPONSE is expected.
        if any(word in combined for word in ("metadata", "case", "extract")):
            return self._make_meta_response(prompt)
        if any(word in combined for word in ("section", "chunk", "boundary")):
            return self.CHUNK_RESPONSE
        return '{"sections": []}'
class MockEmbeddingClient:
    """Mock embedding client that returns deterministic vectors.

    Each vector is derived from a SHA-256 digest of the text, so it is stable
    across processes (unlike the built-in ``hash`` of a ``str``, which is
    salted per interpreter run) and differs from one text to another.
    """

    # Number of components per vector; 4 bytes of digest feed each component.
    _DIM = 4

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed every text as a unit-length ``_DIM``-dimensional vector.

        Args:
            texts: Strings to embed; may be empty.

        Returns:
            One vector per input, in input order. Equal texts yield equal
            vectors.
        """
        import hashlib
        import math

        result = []
        for text in texts:
            digest = hashlib.sha256(text.encode("utf-8", "surrogatepass")).digest()
            # Distinct 4-byte slices give distinct components in [0, 1).
            vec = [
                int.from_bytes(digest[4 * i:4 * i + 4], "big") / 2 ** 32
                for i in range(self._DIM)
            ]
            # Guard against division by zero for an all-zero vector.
            norm = math.sqrt(sum(v * v for v in vec)) or 1.0
            result.append([v / norm for v in vec])
        return result
# ── Mock storage ──
class MockMetaDB:
    """In-memory metadata DB double that records every write it receives."""

    def __init__(self):
        # Pending work items, consumed from the front by ``dequeue``.
        self._queue: list[dict] = []
        # (doc_id, status, error) triples in call order.
        self._status_updates: list[tuple[str, str, str | None]] = []
        # (doc_id, meta) pairs in call order.
        self._meta_updates: list[tuple[str, dict]] = []

    async def connect(self):
        """No-op; nothing to connect to."""
        return None

    async def close(self):
        """No-op; nothing to release."""
        return None

    async def dequeue(self) -> dict | None:
        """Remove and return the oldest queued item, or ``None`` when empty."""
        if not self._queue:
            return None
        return self._queue.pop(0)

    async def update_status(self, doc_id: str, status: str, error: str | None = None):
        """Record a status change for later inspection."""
        record = (doc_id, status, error)
        self._status_updates.append(record)

    async def update_doc_meta(self, doc_id: str, meta: dict):
        """Record a metadata write for later inspection."""
        record = (doc_id, meta)
        self._meta_updates.append(record)

    async def get_documents_by_status(self, status: str) -> list[dict]:
        """Always report that no documents match."""
        return []
class MockDocStore:
    """Dictionary-backed document store keyed by ``(source_id, doc_id)``."""

    def __init__(self):
        self._docs: dict[tuple[str, str], str] = {}

    def store(self, source_id: str, doc_id: str, content: str):
        """Save ``content``, replacing any earlier value under the same key."""
        key = (source_id, doc_id)
        self._docs[key] = content

    async def load(self, source_id: str, doc_id: str) -> str | None:
        """Return the stored content, or ``None`` when the key is unknown."""
        key = (source_id, doc_id)
        if key in self._docs:
            return self._docs[key]
        return None

    def exists(self, source_id: str, doc_id: str) -> bool:
        """Report whether content was stored under this key."""
        key = (source_id, doc_id)
        return key in self._docs
class MockVectorIndex:
    """Vector index double that just remembers what it was asked to store."""

    def __init__(self):
        # One (doc_id, chunks) entry per ``store_chunks`` call, in call order.
        self._stored: list[tuple[str, list[Chunk]]] = []

    async def store_chunks(self, doc_id: str, chunks: list[Chunk]):
        """Record the chunks handed over for ``doc_id``."""
        entry = (doc_id, chunks)
        self._stored.append(entry)
# ── Test fixtures ──
# Minimal HTML judgment page used as the raw document throughout this module.
# The citation "[2023] NSWSC 456" appears in both <title> and <h3>; the
# mixed-case party name "R v Smith" appears only in <title> (the <h2> is
# upper-case). The body has one headnote paragraph followed by paragraphs on
# testimony, defence submissions, an exhibit, closing submissions, the verdict
# and the sentence. It contains no <script> or <nav> elements.
SAMPLE_HTML = """
<html>
<head><title>[2023] NSWSC 456 — R v Smith</title></head>
<body>
<div class="judgment">
<h1>THE SUPREME COURT OF NEW SOUTH WALES</h1>
<h2>R v SMITH</h2>
<h3>[2023] NSWSC 456</h3>
<div class="judgment-body">
<p class="headnote">The accused was found guilty of murder after a trial before his Honour.</p>
<p>The Crown called three witnesses. Dr Jane Jones, a forensic pathologist, gave
expert testimony regarding the cause of death. She stated that the victim died
from blunt force trauma to the head consistent with the weapon recovered at
the scene.</p>
<p>The defence argued that the prosecution had failed to prove intent beyond
reasonable doubt. Counsel for the accused submitted that the evidence was
circumstantial and that the hearsay statement from the witness should be
excluded.</p>
<p>Exhibit A — a photograph of the scene showing the weapon placed near the
body. The photograph was authenticated by Senior Constable Brown.</p>
<p>Closing submissions were heard on 14 March 2023. The prosecution submitted
that the cumulative evidence established guilt beyond reasonable doubt.</p>
<p>His Honour found the accused GUILTY of murder. The conviction was based on
the forensic evidence, the witness testimony, and the exhibits presented during
the trial. The appeal against conviction was noted.</p>
<p>The accused was sentenced to imprisonment for a term of 25 years with a
non-parole period of 18 years.</p>
</div>
</div>
</body>
</html>
"""
def _enqueue_doc(meta_db: MockMetaDB, doc_store: MockDocStore,
doc_id: str, source_id: str, content: str):
doc_store.store(source_id, doc_id, content)
meta_db._queue.append({"doc_id": doc_id, "source_id": source_id})
# ── Integration tests ──
class TestFullPipeline:
    """Test FullPipeline with mocked LLM and embeddings."""

    @pytest.fixture
    def pipeline(self):
        """Build a FullPipeline wired to the mock LLM and embedding clients."""
        llm = MockLLMClient()
        embedder = MockEmbeddingClient()
        return FullPipeline(llm_client=llm, embedding_client=embedder)

    @pytest.mark.asyncio
    async def test_parse_html(self, pipeline):
        """Parsing the sample page reports HTML and keeps the judgment text."""
        document = RawDocument(
            source_id="nsw_caselaw",
            doc_id="[2023] NSWSC 456",
            url="https://example.com",
            fetch_timestamp="2023-03-15",
            raw_text=SAMPLE_HTML,
            format="html",
        )
        parsed_text, detected_format = await pipeline.parse(document)
        assert detected_format == "html"
        for expected in ("SUPREME COURT", "guilty of murder"):
            assert expected in parsed_text
        # Nav/script/header/footer should be stripped
        for unwanted in ("<script>", "<nav>"):
            assert unwanted not in parsed_text

    @pytest.mark.asyncio
    async def test_extract_meta(self, pipeline):
        """Extracted metadata mirrors the mock LLM's canned reply."""
        case_meta = await pipeline.extract_meta("[2023] NSWSC 456", SAMPLE_HTML)
        assert case_meta is not None
        assert case_meta.case_name == "R v Case456"
        assert case_meta.mnc == "[2023] NSWSC 456"
        assert case_meta.court == "NSWSC"
        assert len(case_meta.judge) == 1
        assert "murder" in case_meta.charges
        assert case_meta.inadmissible_evidence == ["hearsay statement"]

    @pytest.mark.asyncio
    async def test_chunk(self, pipeline):
        """Chunking yields non-empty chunks of a known type for the document."""
        known_types = ["opening", "testimony", "exhibit",
                       "ruling", "closing", "judgment", "sentence"]
        produced = await pipeline.chunk("[2023] NSWSC 456", SAMPLE_HTML)
        assert len(produced) > 0
        for item in produced:
            assert item.doc_id == "[2023] NSWSC 456"
            assert item.chunk_type in known_types
            assert item.sequence >= 0
            assert len(item.text) > 0

    @pytest.mark.asyncio
    async def test_embed(self, pipeline):
        """Every chunk passed to embed comes back with a non-empty embedding."""
        opening = Chunk(chunk_id="c0", doc_id="doc1", chunk_type="opening",
                        sequence=0, text="Opening text")
        testimony = Chunk(chunk_id="c1", doc_id="doc1", chunk_type="testimony",
                          sequence=1, text="Testimony text")
        embedded = await pipeline.embed([opening, testimony])
        assert len(embedded) == 2
        for item in embedded:
            assert item.embedding is not None
            assert len(item.embedding) > 0

    @pytest.mark.asyncio
    async def test_embed_empty_list(self, pipeline):
        """Embedding no chunks returns an empty list."""
        embedded = await pipeline.embed([])
        assert embedded == []
class TestEndToEndOrchestration:
    """End-to-end: 3 documents through orchestrator with full pipeline to graph."""

    @staticmethod
    def _make_pipeline():
        """Build a FullPipeline wired to the mock LLM and embedding clients."""
        return FullPipeline(
            llm_client=MockLLMClient(),
            embedding_client=MockEmbeddingClient(),
        )

    async def _process_sample_doc(self):
        """Run the single sample document through a fully wired orchestrator.

        Returns:
            ``(meta_db, graph_db, vector_index)`` as they stand after
            ``process_queue`` has completed.
        """
        meta_db = MockMetaDB()
        doc_store = MockDocStore()
        pipeline = self._make_pipeline()
        graph_db = InMemoryGraphDB()
        vector_index = MockVectorIndex()
        _enqueue_doc(meta_db, doc_store, "[2023] NSWSC 456", "nsw_caselaw", SAMPLE_HTML)
        orch = Orchestrator(meta_db, doc_store, graph_db, vector_index, pipeline)
        await orch.process_queue()
        return meta_db, graph_db, vector_index

    @pytest.mark.asyncio
    async def test_three_documents_to_graph(self):
        """Three queued documents are all processed and land in the graph."""
        meta_db = MockMetaDB()
        doc_store = MockDocStore()
        pipeline = self._make_pipeline()
        graph_db = InMemoryGraphDB()
        vector_index = MockVectorIndex()
        # Enqueue 3 documents: the sample page plus two variants with a
        # different title party name and citation number.
        documents = [
            ("[2023] NSWSC 456", SAMPLE_HTML),
            ("[2023] NSWSC 789", SAMPLE_HTML.replace("R v Smith", "R v Jones").replace("456", "789")),
            ("[2023] NSWSC 101", SAMPLE_HTML.replace("R v Smith", "R v Brown").replace("456", "101")),
        ]
        for doc_id, content in documents:
            _enqueue_doc(meta_db, doc_store, doc_id, "nsw_caselaw", content)
        # Run orchestrator
        orch = Orchestrator(meta_db, doc_store, graph_db, vector_index, pipeline)
        stats = await orch.process_queue()
        # Verify all 3 processed
        assert stats["processed"] == 3
        assert stats["errors"] == 0
        assert stats["parsed"] == 3
        assert stats["embedded"] == 3
        assert stats["graphed"] == 3
        # Verify graph has nodes
        case_count = await graph_db.node_count("Case")
        assert case_count == 3
        judge_count = await graph_db.node_count("Judge")
        assert judge_count == 1  # All 3 cases have same judge → deduped
        charge_count = await graph_db.node_count("Charge")
        assert charge_count == 2  # murder + assault
        chunk_count = await graph_db.node_count("Chunk")
        assert chunk_count >= 3  # At least 1 chunk per document
        ruling_count = await graph_db.node_count("Ruling")
        assert ruling_count == 1  # Same ruling deduped across cases
        # Verify edges
        heard_by = await graph_db.relationship_count("HEARD_BY")
        assert heard_by >= 3
        charged_with = await graph_db.relationship_count("CHARGED_WITH")
        assert charged_with >= 3
        has_ruling = await graph_db.relationship_count("HAS_RULING")
        assert has_ruling >= 3

    @pytest.mark.asyncio
    async def test_metadata_stored_in_metadb(self):
        """Extracted metadata is written back to the metadata DB."""
        meta_db = MockMetaDB()
        doc_store = MockDocStore()
        pipeline = self._make_pipeline()
        _enqueue_doc(meta_db, doc_store, "[2023] NSWSC 456", "nsw_caselaw", SAMPLE_HTML)
        # No graph DB or vector index is supplied in this test.
        orch = Orchestrator(meta_db, doc_store, pipeline=pipeline)
        await orch.process_queue()
        assert len(meta_db._meta_updates) == 1
        _, meta = meta_db._meta_updates[0]
        assert meta["mnc"] == "[2023] NSWSC 456"
        assert "NSWSC" in meta.get("court", "")

    @pytest.mark.asyncio
    async def test_vector_index_receives_chunks(self):
        """The vector index receives one batch of embedded chunks per document."""
        _, _, vector_index = await self._process_sample_doc()
        assert len(vector_index._stored) == 1
        doc_id, chunks = vector_index._stored[0]
        assert doc_id == "[2023] NSWSC 456"
        assert len(chunks) > 0
        for chunk in chunks:
            assert chunk.embedding is not None

    @pytest.mark.asyncio
    async def test_status_transitions_e2e(self):
        """PARSED, EMBEDDED and GRAPHED are all reported for the document."""
        meta_db, _, _ = await self._process_sample_doc()
        statuses = [status for _, status, _ in meta_db._status_updates]
        assert FetchStatus.PARSED in statuses
        assert FetchStatus.EMBEDDED in statuses
        assert FetchStatus.GRAPHED in statuses

    @pytest.mark.asyncio
    async def test_graph_queryable_after_processing(self):
        """Verify graph is queryable — can find nodes and traverse edges."""
        _, graph_db, _ = await self._process_sample_doc()
        # Find case node
        case_nodes = await graph_db.query_nodes("Case", {"mnc": "[2023] NSWSC 456"})
        assert len(case_nodes) == 1
        case_id = case_nodes[0].node_id
        # Traverse to judges
        judge_rels = await graph_db.get_relationships(case_id, "HEARD_BY")
        assert len(judge_rels) >= 1
        judge_node = await graph_db.get_node(judge_rels[0].to_id)
        assert judge_node is not None
        assert judge_node.label == "Judge"
        # Verify ruling (inadmissible evidence)
        ruling_rels = await graph_db.get_relationships(case_id, "HAS_RULING")
        assert len(ruling_rels) >= 1
        ruling_node = await graph_db.get_node(ruling_rels[0].to_id)
        assert ruling_node is not None
        assert ruling_node.properties.get("outcome") == "inadmissible"