"""Tests for MetaDB."""

import asyncio
import contextlib
import os
import tempfile

import pytest
import aiosqlite

from aucourt_ingest.storage.meta_db import MetaDB
from aucourt_ingest.models import FetchStatus


@contextlib.asynccontextmanager
async def _connected(db):
    """Connect *db* for the duration of the ``async with`` body.

    The connection is closed in a ``finally`` so that a failing assertion
    inside a test does not leave the database file open when the
    ``db_path`` fixture later unlinks it.
    """
    await db.connect()
    try:
        yield db
    finally:
        await db.close()


@pytest.fixture
def db_path():
    """Yield the path of a fresh temporary ``.db`` file and delete it afterwards."""
    fd, path = tempfile.mkstemp(suffix=".db")
    # Only the path is needed; release the OS-level descriptor right away.
    os.close(fd)
    yield path
    os.unlink(path)


@pytest.fixture
def db(db_path):
    """Return an unconnected MetaDB bound to the temporary database file."""
    return MetaDB(db_path)


@pytest.mark.asyncio
async def test_connect_creates_tables(db):
    """connect() creates the documents, fetch_queue and source_state tables."""
    async with _connected(db):
        query = "SELECT name FROM sqlite_master WHERE type='table'"
        async with db.db.execute(query) as cur:
            tables = {row[0] for row in await cur.fetchall()}
        assert "documents" in tables
        assert "fetch_queue" in tables
        assert "source_state" in tables


@pytest.mark.asyncio
async def test_insert_and_get_document(db):
    """An inserted document can be read back and starts in PENDING status."""
    async with _connected(db):
        await db.insert_document(
            "[2019] NSWSC 1234", "nsw_caselaw", "https://example.com/1"
        )
        doc = await db.get_document("[2019] NSWSC 1234")
        assert doc is not None
        assert doc["source_id"] == "nsw_caselaw"
        assert doc["fetch_status"] == FetchStatus.PENDING
        assert doc["url"] == "https://example.com/1"


@pytest.mark.asyncio
async def test_insert_idempotent(db):
    """Inserting the same document twice leaves a single row."""
    async with _connected(db):
        for _ in range(2):
            await db.insert_document(
                "[2019] NSWSC 1234", "nsw_caselaw", "https://example.com/1"
            )
        assert await db.total_documents() == 1


@pytest.mark.asyncio
async def test_status_transitions(db):
    """Each status update is stored and fills in its matching timestamp column."""
    doc_id = "[2019] NSWSC 1234"
    # pending → fetched → parsed → embedded → graphed, paired with the
    # timestamp column each step is expected to populate.
    transitions = [
        (FetchStatus.FETCHED, "fetch_timestamp"),
        (FetchStatus.PARSED, "parse_timestamp"),
        (FetchStatus.EMBEDDED, "embed_timestamp"),
        (FetchStatus.GRAPHED, "graph_timestamp"),
    ]
    async with _connected(db):
        await db.insert_document(doc_id, "nsw_caselaw", "https://example.com/1")
        for status, timestamp_column in transitions:
            await db.update_status(doc_id, status)
            doc = await db.get_document(doc_id)
            assert doc["fetch_status"] == status
            assert doc[timestamp_column] is not None


@pytest.mark.asyncio
async def test_status_with_error(db):
    """A FAILED status update stores the supplied error message."""
    doc_id = "[2019] NSWSC 1234"
    async with _connected(db):
        await db.insert_document(doc_id, "nsw_caselaw", "https://example.com/1")
        await db.update_status(doc_id, FetchStatus.FAILED, error_message="timeout")
        doc = await db.get_document(doc_id)
        assert doc["fetch_status"] == FetchStatus.FAILED
        assert doc["error_message"] == "timeout"


@pytest.mark.asyncio
async def test_update_doc_meta(db):
    """update_doc_meta() persists char_count, court, year and matter_type."""
    doc_id = "[2019] NSWSC 1234"
    async with _connected(db):
        await db.insert_document(doc_id, "nsw_caselaw", "https://example.com/1")
        await db.update_doc_meta(
            doc_id,
            char_count=15000,
            court="NSWSC",
            year=2019,
            matter_type="criminal",
        )
        doc = await db.get_document(doc_id)
        assert doc["char_count"] == 15000
        assert doc["court"] == "NSWSC"
        assert doc["year"] == 2019
        assert doc["matter_type"] == "criminal"


@pytest.mark.asyncio
async def test_get_documents_by_status(db):
    """Listing and counting by status reflect the updates made so far."""
    async with _connected(db):
        await db.insert_document(
            "[2019] NSWSC 1", "nsw_caselaw", "https://example.com/1"
        )
        await db.insert_document(
            "[2019] NSWSC 2", "nsw_caselaw", "https://example.com/2"
        )
        await db.insert_document(
            "[2019] NSWSC 3", "nsw_caselaw", "https://example.com/3"
        )
        # Two of the three documents move on; the third stays pending.
        await db.update_status("[2019] NSWSC 1", FetchStatus.FETCHED)
        await db.update_status("[2019] NSWSC 2", FetchStatus.FETCHED)

        fetched = await db.get_documents_by_status(FetchStatus.FETCHED)
        assert len(fetched) == 2
        assert await db.count_by_status(FetchStatus.FETCHED) == 2
        assert await db.count_by_status(FetchStatus.PENDING) == 1


@pytest.mark.asyncio
async def test_fetch_queue(db):
    """Items are dequeued in priority order and an empty queue yields None."""
    async with _connected(db):
        # Enqueue with different priorities
        await db.enqueue("fedcourt", "https://low-pri.com", priority=10)
        await db.enqueue("fedcourt", "https://high-pri.com", priority=1)
        await db.enqueue("nsw_caselaw", "https://mid-pri.com", priority=5)
        assert await db.queue_size() == 3

        # Dequeue should return highest priority first; per the expectations
        # below, the smallest priority number is served first.
        item = await db.dequeue()
        assert item["url"] == "https://high-pri.com"
        assert await db.queue_size() == 2

        item = await db.dequeue()
        assert item["url"] == "https://mid-pri.com"

        item = await db.dequeue()
        assert item["url"] == "https://low-pri.com"
        assert await db.queue_size() == 0

        # Empty queue returns None
        assert await db.dequeue() is None


@pytest.mark.asyncio
async def test_source_state(db):
    """Source state starts at zero, accumulates stats and records poll data."""
    async with _connected(db):
        await db.init_source("nsw_caselaw")
        state = await db.get_source_state("nsw_caselaw")
        assert state is not None
        assert state.docs_fetched == 0
        assert state.docs_failed == 0

        await db.increment_source_stats("nsw_caselaw", fetched=5, failed=1)
        state = await db.get_source_state("nsw_caselaw")
        assert state.docs_fetched == 5
        assert state.docs_failed == 1

        await db.update_source_poll("nsw_caselaw", etag='"abc123"')
        state = await db.get_source_state("nsw_caselaw")
        assert state.last_poll is not None
        assert state.last_rss_etag == '"abc123"'

        # Idempotent init: a second call for an existing source must not raise.
        await db.init_source("nsw_caselaw")


@pytest.mark.asyncio
async def test_total_documents(db):
    """total_documents() counts every inserted document."""
    async with _connected(db):
        assert await db.total_documents() == 0
        await db.insert_document(
            "[2019] NSWSC 1", "nsw_caselaw", "https://example.com/1"
        )
        await db.insert_document(
            "[2019] NSWSC 2", "nsw_caselaw", "https://example.com/2"
        )
        assert await db.total_documents() == 2


@pytest.mark.asyncio
async def test_not_connected_raises(db):
    """Accessing ``db.db`` before connect() raises RuntimeError."""
    with pytest.raises(RuntimeError, match="not connected"):
        _ = db.db