From 4a0e043a6d21ac69161cd62feee76c56c6c6f365 Mon Sep 17 00:00:00 2001 From: Kisa Date: Fri, 29 May 2026 06:50:34 -0400 Subject: [PATCH] add phase 2 supabase persistence layer - supabase_client.py: lazy singleton client (no-ops when env vars absent) - persistence.py: persist_upload writes batch, source_files, normalized_records, mapping_decisions, report_runs; persist_export records export_files - schema.sql: 11-table schema with RLS + WORM rules for audit/raw tables - main.py: wire persist_upload/persist_export; add ExportRequest body model so export accepts {records, batch_id}; batch_id returned on upload response - api.js: add exportFromBackend helper passing batch_id through - requirements.txt: add supabase>=2.0.0 - smoke_test.py: update export call to new body format Co-Authored-By: Claude Sonnet 4.6 --- python-backend/api/main.py | 24 +++- python-backend/core/persistence.py | 175 +++++++++++++++++++++++++ python-backend/core/supabase_client.py | 28 ++++ python-backend/db/schema.sql | 175 +++++++++++++++++++++++++ requirements.txt | 1 + signal-ui/src/lib/api.js | 22 ++++ tests/smoke_test.py | 7 +- 7 files changed, 427 insertions(+), 5 deletions(-) create mode 100644 python-backend/core/persistence.py create mode 100644 python-backend/core/supabase_client.py create mode 100644 python-backend/db/schema.sql diff --git a/python-backend/api/main.py b/python-backend/api/main.py index c947c2a..2d2c7ab 100644 --- a/python-backend/api/main.py +++ b/python-backend/api/main.py @@ -20,6 +20,7 @@ if str(_backend_root) not in sys.path: from core.coverage_calculator import ShipmentRecord, calculate_batch from core.audit_logger import AuditAction, log_event +from core.persistence import persist_export, persist_upload from api.normalizer import normalize_csv app = FastAPI(title="Signal API", version="1.0.0", docs_url="/docs") @@ -102,6 +103,7 @@ class UploadResponse(BaseModel): skipped_reasons: list[str] stats: dict mapping_summary: dict + batch_id: Optional[str] = None def 
_build_reason(flag_val: str, days_until_end: int, days_until_visit: Optional[int]) -> str: @@ -197,6 +199,15 @@ async def upload_csv( log_event(AuditAction.CSV_INGEST, file.filename or "unknown", "demo_user", "success", "0.0.0.0", detail=f"{len(out)} records scored") + batch_id = persist_upload( + filename=file.filename or "unknown", + content_bytes=content, + shipment_records=records, + coverage_results=results, + skipped_count=len(skipped_reasons), + mapping_summary=mapping_summary, + ) + return UploadResponse( records=out, total=len(out), @@ -204,15 +215,22 @@ async def upload_csv( skipped_reasons=skipped_reasons[:20], stats=_compute_stats(out), mapping_summary=mapping_summary, + batch_id=batch_id, ) +class ExportRequest(BaseModel): + records: list[RecordOut] + batch_id: Optional[str] = None + + @app.post("/api/export") async def export_work_queue( - records: list[RecordOut], + body: ExportRequest, _auth: None = Depends(_require_api_key), ): """Generate a downloadable work-queue CSV from a list of scored records.""" + records = body.records output = io.StringIO() writer = csv.writer(output) writer.writerow([ @@ -243,8 +261,10 @@ async def export_work_queue( output.seek(0) today = date.today().isoformat() - log_event(AuditAction.WORKLIST_EXPORT, f"work-queue-{today}", "demo_user", + export_filename = f"signal-work-queue-{today}.csv" + log_event(AuditAction.WORKLIST_EXPORT, export_filename, "demo_user", "success", "0.0.0.0", detail=f"{len(records)} records exported") + persist_export(batch_id=body.batch_id, filename=export_filename, row_count=len(records)) return StreamingResponse( io.BytesIO(output.getvalue().encode("utf-8")), media_type="text/csv", diff --git a/python-backend/core/persistence.py b/python-backend/core/persistence.py new file mode 100644 index 0000000..6074e37 --- /dev/null +++ b/python-backend/core/persistence.py @@ -0,0 +1,175 @@ +""" +Supabase persistence for Signal upload batches, scored records, and report runs. 
+
+All writes are best-effort: failures are logged but never surface to the API caller.
+The core scoring pipeline works without Supabase (dev mode / env vars not set).
+"""
+
+import hashlib
+import logging
+from datetime import date
+
+from core.supabase_client import get_client
+
+logger = logging.getLogger(__name__)
+
+DEMO_ORG_SLUG = "gaboro-pilot"
+_demo_org_id: str | None = None
+
+
+def _sha256(value: str) -> str:
+    return hashlib.sha256(value.encode()).hexdigest()
+
+
+def _get_or_create_org() -> str | None:
+    global _demo_org_id
+    if _demo_org_id:
+        return _demo_org_id
+
+    client = get_client()
+    if not client:
+        return None
+
+    try:
+        result = client.table("organizations").select("id").eq("slug", DEMO_ORG_SLUG).execute()
+        if result.data:
+            _demo_org_id = result.data[0]["id"]
+            return _demo_org_id
+
+        result = client.table("organizations").insert({
+            "name": "Gaboro DME — Pilot",
+            "slug": DEMO_ORG_SLUG,
+        }).execute()
+        _demo_org_id = result.data[0]["id"]
+        logger.info(f"Created pilot org: {_demo_org_id}")
+        return _demo_org_id
+
+    except Exception as e:
+        logger.error(f"Failed to get/create org: {e}")
+        return None
+
+
+def persist_upload(
+    filename: str,
+    content_bytes: bytes,
+    shipment_records: list,
+    coverage_results: list,
+    skipped_count: int,
+    mapping_summary: dict,
+) -> str | None:
+    """
+    Persist one upload batch and all related records to Supabase.
+    Returns the batch_id UUID string, or None if persistence is unavailable.
+    """
+    client = get_client()
+    if not client:
+        return None
+
+    org_id = _get_or_create_org()
+    if not org_id:
+        return None
+
+    try:
+        # 1. Upload batch
+        batch_res = client.table("upload_batches").insert({
+            "org_id": org_id,
+            "filename": filename,
+            "row_count": len(coverage_results),
+            "skipped_count": skipped_count,
+            "status": "complete",
+        }).execute()
+        batch_id = batch_res.data[0]["id"]
+
+        # 2. Source file metadata (hash is taken over the raw upload bytes, not decoded text)
+        content_hash = hashlib.sha256(content_bytes).hexdigest()
+        client.table("source_files").insert({
+            "batch_id": batch_id,
+            "filename": filename,
+            "content_hash": content_hash,
+            "byte_size": len(content_bytes),
+        }).execute()
+
+        # 3. Normalized records — one row per scored patient
+        # quantity is joined by patient_id (not by index); a repeated patient_id keeps the last quantity seen
+        qty_map = {sr.patient_id: sr.quantity for sr in shipment_records}
+        norm_rows = []
+        for r in coverage_results:
+            flag_val = r.flag.value if hasattr(r.flag, "value") else str(r.flag)
+            norm_rows.append({
+                "batch_id": batch_id,
+                "patient_id_hash": _sha256(r.patient_id),
+                "device_type": r.device_type,
+                "shipment_date": r.last_shipment_date.isoformat(),
+                "quantity": qty_map.get(r.patient_id, 1),
+                "payer": r.payer,
+                "component": r.component,
+                "coverage_status": flag_val,
+                "days_remaining": r.days_until_coverage_end,
+                "rule_version": r.rule_version,
+            })
+        if norm_rows:
+            client.table("normalized_records").insert(norm_rows).execute()
+
+        # 4. Mapping decisions — how each CSV header was resolved
+        mapping_rows = []
+        for canonical, detail in mapping_summary.get("mapped", {}).items():
+            mapping_rows.append({
+                "batch_id": batch_id,
+                "raw_header": detail["raw_header"],
+                "canonical_field": canonical,
+                "confidence": detail["confidence"],
+            })
+        for raw_h in mapping_summary.get("unmapped_columns", []):
+            mapping_rows.append({
+                "batch_id": batch_id,
+                "raw_header": raw_h,
+                "canonical_field": None,
+                "confidence": "unmapped",
+            })
+        if mapping_rows:
+            client.table("mapping_decisions").insert(mapping_rows).execute()
+
+        # 5. Report run summary
+        flagged = sum(
+            1 for r in coverage_results
+            if (r.flag.value if hasattr(r.flag, "value") else str(r.flag)) != "OK"
+        )
+        client.table("report_runs").insert({
+            "batch_id": batch_id,
+            "org_id": org_id,
+            "status": "complete",
+            "total_records": len(coverage_results),
+            "flagged_count": flagged,
+        }).execute()
+
+        logger.info(f"Persisted batch {batch_id}: {len(coverage_results)} records, {flagged} flagged")
+        return batch_id
+
+    except Exception as e:
+        logger.error(f"Persistence error on upload '{filename}': {e}")
+        return None
+
+
+def persist_export(batch_id: str | None, filename: str, row_count: int) -> None:
+    """Record that a work queue CSV was exported. Best-effort."""
+    if not batch_id:
+        return
+
+    client = get_client()
+    if not client:
+        return
+
+    try:
+        # Find the report_run for this batch
+        run_res = client.table("report_runs").select("id").eq("batch_id", batch_id).execute()
+        if not run_res.data:
+            return
+        run_id = run_res.data[0]["id"]
+
+        client.table("export_files").insert({
+            "report_run_id": run_id,
+            "filename": filename,
+            "row_count": row_count,
+        }).execute()
+    except Exception as e:
+        logger.error(f"Persistence error on export: {e}")
diff --git a/python-backend/core/supabase_client.py b/python-backend/core/supabase_client.py
new file mode 100644
index 0000000..c73d108
--- /dev/null
+++ b/python-backend/core/supabase_client.py
@@ -0,0 +1,28 @@
+import logging
+import os
+
+logger = logging.getLogger(__name__)
+
+_client = None
+
+
+def get_client():
+    """Return a Supabase client or None if env vars are not set (dev mode)."""
+    global _client
+    if _client is not None:
+        return _client
+
+    url = os.getenv("SUPABASE_URL", "")
+    key = os.getenv("SUPABASE_SERVICE_KEY", "")
+
+    if not url or not key:
+        return None
+
+    try:
+        from supabase import create_client
+        _client = create_client(url, key)
+        logger.info("Supabase client initialized")
+    except Exception as e:
+        logger.error(f"Supabase client init failed: {e}")
+
+    return
_client
diff --git a/python-backend/db/schema.sql b/python-backend/db/schema.sql
new file mode 100644
index 0000000..4f85d48
--- /dev/null
+++ b/python-backend/db/schema.sql
@@ -0,0 +1,175 @@
+-- Signal Phase 2 Schema
+-- Run this in the Supabase SQL editor (Dashboard > SQL Editor > New query)
+-- Safe to run multiple times — uses IF NOT EXISTS throughout
+
+create extension if not exists "uuid-ossp";
+
+-- Organizations (DME supplier accounts)
+create table if not exists organizations (
+  id uuid primary key default uuid_generate_v4(),
+  name text not null,
+  slug text unique not null,
+  created_at timestamptz not null default now(),
+  updated_at timestamptz not null default now()
+);
+
+-- Users (staff at each org)
+create table if not exists users (
+  id uuid primary key default uuid_generate_v4(),
+  org_id uuid not null references organizations(id) on delete cascade,
+  email text not null unique,
+  role text not null default 'staff', -- 'admin' | 'staff'
+  created_at timestamptz not null default now(),
+  updated_at timestamptz not null default now()
+);
+
+-- Upload batches (one per CSV upload event)
+create table if not exists upload_batches (
+  id uuid primary key default uuid_generate_v4(),
+  org_id uuid not null references organizations(id) on delete cascade,
+  uploaded_by uuid references users(id),
+  filename text not null,
+  row_count int not null default 0,
+  skipped_count int not null default 0,
+  status text not null default 'processing', -- 'processing' | 'complete' | 'failed'
+  created_at timestamptz not null default now()
+);
+
+-- Source files (raw CSV metadata, WORM)
+create table if not exists source_files (
+  id uuid primary key default uuid_generate_v4(),
+  batch_id uuid not null references upload_batches(id) on delete cascade,
+  filename text not null,
+  content_hash text not null, -- SHA-256 of file bytes
+  byte_size int not null,
+  created_at timestamptz not null default now()
+);
+
+-- Raw rows (original CSV row data before normalization, WORM)
+create table if not exists raw_rows (
+  id uuid primary key default uuid_generate_v4(),
+  batch_id uuid not null references upload_batches(id) on delete cascade,
+  row_number int not null,
+  raw_data jsonb not null, -- original column key/value pairs
+  created_at timestamptz not null default now()
+);
+
+-- Normalized records (scored output)
+create table if not exists normalized_records (
+  id uuid primary key default uuid_generate_v4(),
+  batch_id uuid not null references upload_batches(id) on delete cascade,
+  patient_id_hash text not null, -- SHA-256 of patient_id — no raw PHI stored
+  device_type text not null,
+  shipment_date date not null,
+  quantity int not null default 1,
+  payer text not null,
+  component text not null default 'sensor',
+  coverage_status text not null, -- OUT_OF_COVERAGE | VISIT_DUE | REFILL_WINDOW | OK
+  days_remaining int,
+  reason text,
+  recommended_action text,
+  rule_version text not null,
+  created_at timestamptz not null default now()
+);
+
+-- Mapping decisions (header-to-field mapping log per batch)
+create table if not exists mapping_decisions (
+  id uuid primary key default uuid_generate_v4(),
+  batch_id uuid not null references upload_batches(id) on delete cascade,
+  raw_header text not null,
+  canonical_field text,
+  confidence text not null, -- 'high' | 'inferred' | 'unmapped'
+  created_at timestamptz not null default now()
+);
+
+-- Report runs (one per scored batch delivered to user)
+create table if not exists report_runs (
+  id uuid primary key default uuid_generate_v4(),
+  batch_id uuid not null references upload_batches(id) on delete cascade,
+  org_id uuid not null references organizations(id) on delete cascade,
+  generated_by uuid references users(id),
+  status text not null default 'complete',
+  total_records int not null default 0,
+  flagged_count int not null default 0,
+  created_at timestamptz not null default now()
+);
+
+-- Report items (one row per patient record in the worklist)
+create table if not exists report_items (
+  id uuid primary key default uuid_generate_v4(),
+  report_run_id uuid not null references report_runs(id) on delete cascade,
+  normalized_record_id uuid not null references normalized_records(id),
+  patient_id_hash text not null,
+  status text not null,
+  days_remaining int,
+  reason text,
+  recommended_action text,
+  created_at timestamptz not null default now()
+);
+
+-- Export files (downloaded work queue CSVs)
+create table if not exists export_files (
+  id uuid primary key default uuid_generate_v4(),
+  report_run_id uuid not null references report_runs(id) on delete cascade,
+  exported_by uuid references users(id),
+  filename text not null,
+  row_count int not null default 0,
+  created_at timestamptz not null default now()
+);
+
+-- Audit events (WORM — append only, never update or delete)
+create table if not exists audit_events (
+  id uuid primary key default uuid_generate_v4(),
+  org_id uuid references organizations(id),
+  user_id uuid references users(id),
+  action text not null, -- 'upload' | 'export' | 'login' | 'view_report'
+  resource_type text,
+  resource_id uuid,
+  patient_id_hash text,
+  metadata jsonb,
+  ip_address text,
+  created_at timestamptz not null default now()
+);
+
+-- Enable Row Level Security on all tables
+alter table organizations enable row level security;
+alter table users enable row level security;
+alter table upload_batches enable row level security;
+alter table source_files enable row level security;
+alter table raw_rows enable row level security;
+alter table normalized_records enable row level security;
+alter table mapping_decisions enable row level security;
+alter table report_runs enable row level security;
+alter table report_items enable row level security;
+alter table export_files enable row level security;
+alter table audit_events enable row level security;
+
+-- WORM rules: audit_events, raw_rows, source_files are append-only. NOTE(review): raw_rows and source_files are also ON DELETE CASCADE targets — confirm how a parent delete behaves under these DO INSTEAD NOTHING rules
+do $$ begin
+  if not exists (select 1 from pg_rules where tablename = 'audit_events' and rulename = 'audit_events_no_update') then
+    create rule audit_events_no_update as on update to audit_events do instead nothing;
+  end if;
+  if not exists (select 1 from pg_rules where tablename = 'audit_events' and rulename = 'audit_events_no_delete') then
+    create rule audit_events_no_delete as on delete to audit_events do instead nothing;
+  end if;
+  if not exists (select 1 from pg_rules where tablename = 'raw_rows' and rulename = 'raw_rows_no_update') then
+    create rule raw_rows_no_update as on update to raw_rows do instead nothing;
+  end if;
+  if not exists (select 1 from pg_rules where tablename = 'raw_rows' and rulename = 'raw_rows_no_delete') then
+    create rule raw_rows_no_delete as on delete to raw_rows do instead nothing;
+  end if;
+  if not exists (select 1 from pg_rules where tablename = 'source_files' and rulename = 'source_files_no_update') then
+    create rule source_files_no_update as on update to source_files do instead nothing;
+  end if;
+  if not exists (select 1 from pg_rules where tablename = 'source_files' and rulename = 'source_files_no_delete') then
+    create rule source_files_no_delete as on delete to source_files do instead nothing;
+  end if;
+end $$;
+
+-- Indexes
+create index if not exists idx_upload_batches_org on upload_batches(org_id);
+create index if not exists idx_normalized_records_batch on normalized_records(batch_id);
+create index if not exists idx_normalized_records_status on normalized_records(coverage_status);
+create index if not exists idx_audit_events_org on audit_events(org_id);
+create index if not exists idx_audit_events_action on audit_events(action);
+create index if not exists idx_report_items_run on report_items(report_run_id);
diff --git a/requirements.txt b/requirements.txt
index e42679f..93ee358 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@ fastapi>=0.111.0
 uvicorn[standard]>=0.29.0
 python-multipart>=0.0.9
 pydantic>=2.0.0
+supabase>=2.0.0
diff --git a/signal-ui/src/lib/api.js b/signal-ui/src/lib/api.js
index 3201b88..dea68e4 100644
--- a/signal-ui/src/lib/api.js
+++ b/signal-ui/src/lib/api.js
@@ -30,6 +30,28 @@ export async function
uploadToBackend(file) { } } +/** + * Export a work queue CSV from the backend. + * @param {Array} records - scored RecordOut objects from the upload response + * @param {string|null} batchId - batch_id from the upload response (for audit trail) + */ +export async function exportFromBackend(records, batchId = null) { + try { + const resp = await fetch(`${BACKEND_URL}/api/export`, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...(API_KEY ? { "X-API-Key": API_KEY } : {}), + }, + body: JSON.stringify({ records, batch_id: batchId }), + }); + if (!resp.ok) return null; + return resp.blob(); + } catch { + return null; + } +} + /** * Convert backend record shape to local record shape. */ diff --git a/tests/smoke_test.py b/tests/smoke_test.py index 825df72..74eb65e 100644 --- a/tests/smoke_test.py +++ b/tests/smoke_test.py @@ -73,10 +73,10 @@ def _post_file(path: Path) -> dict: return json.loads(data) -def _post_export(records: list) -> bytes: +def _post_export(records: list, batch_id: str | None = None) -> bytes: import http.client - body = json.dumps(records).encode() + body = json.dumps({"records": records, "batch_id": batch_id}).encode() conn = http.client.HTTPConnection("localhost", BACKEND_PORT, timeout=30) conn.request( "POST", @@ -151,7 +151,8 @@ def run() -> bool: # Test 4: export print("Exporting work queue...") - csv_bytes = _post_export(records) + batch_id = result.get("batch_id") + csv_bytes = _post_export(records, batch_id=batch_id) lines = csv_bytes.decode("utf-8").strip().splitlines() if len(lines) < 2: print(f"FAIL — /api/export returned fewer than 2 lines: {lines}")