add phase 2 supabase persistence layer

- supabase_client.py: lazy singleton client (no-ops when env vars absent)
- persistence.py: persist_upload writes batch, source_files, normalized_records,
  mapping_decisions, report_runs; persist_export records export_files
- schema.sql: 11-table schema with RLS + WORM rules for audit/raw tables
- main.py: wire persist_upload/persist_export; add ExportRequest body model
  so export accepts {records, batch_id}; batch_id returned on upload response
- api.js: add exportFromBackend helper passing batch_id through
- requirements.txt: add supabase>=2.0.0
- smoke_test.py: update export call to new body format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Kisa 2026-05-29 06:50:34 -04:00
parent cf171a3f87
commit 4a0e043a6d
7 changed files with 427 additions and 5 deletions

View file

@ -20,6 +20,7 @@ if str(_backend_root) not in sys.path:
from core.coverage_calculator import ShipmentRecord, calculate_batch
from core.audit_logger import AuditAction, log_event
from core.persistence import persist_export, persist_upload
from api.normalizer import normalize_csv
app = FastAPI(title="Signal API", version="1.0.0", docs_url="/docs")
@ -102,6 +103,7 @@ class UploadResponse(BaseModel):
skipped_reasons: list[str]
stats: dict
mapping_summary: dict
batch_id: Optional[str] = None
def _build_reason(flag_val: str, days_until_end: int, days_until_visit: Optional[int]) -> str:
@ -197,6 +199,15 @@ async def upload_csv(
log_event(AuditAction.CSV_INGEST, file.filename or "unknown", "demo_user",
"success", "0.0.0.0", detail=f"{len(out)} records scored")
batch_id = persist_upload(
filename=file.filename or "unknown",
content_bytes=content,
shipment_records=records,
coverage_results=results,
skipped_count=len(skipped_reasons),
mapping_summary=mapping_summary,
)
return UploadResponse(
records=out,
total=len(out),
@ -204,15 +215,22 @@ async def upload_csv(
skipped_reasons=skipped_reasons[:20],
stats=_compute_stats(out),
mapping_summary=mapping_summary,
batch_id=batch_id,
)
class ExportRequest(BaseModel):
records: list[RecordOut]
batch_id: Optional[str] = None
@app.post("/api/export")
async def export_work_queue(
records: list[RecordOut],
body: ExportRequest,
_auth: None = Depends(_require_api_key),
):
"""Generate a downloadable work-queue CSV from a list of scored records."""
records = body.records
output = io.StringIO()
writer = csv.writer(output)
writer.writerow([
@ -243,8 +261,10 @@ async def export_work_queue(
output.seek(0)
today = date.today().isoformat()
log_event(AuditAction.WORKLIST_EXPORT, f"work-queue-{today}", "demo_user",
export_filename = f"signal-work-queue-{today}.csv"
log_event(AuditAction.WORKLIST_EXPORT, export_filename, "demo_user",
"success", "0.0.0.0", detail=f"{len(records)} records exported")
persist_export(batch_id=body.batch_id, filename=export_filename, row_count=len(records))
return StreamingResponse(
io.BytesIO(output.getvalue().encode("utf-8")),
media_type="text/csv",

View file

@ -0,0 +1,175 @@
"""
Supabase persistence for Signal upload batches, scored records, and report runs.
All writes are best-effort: failures are logged but never surface to the API caller.
The core scoring pipeline works without Supabase (dev mode / env vars not set).
"""
import hashlib
import logging
from datetime import date
from core.supabase_client import get_client
logger = logging.getLogger(__name__)
DEMO_ORG_SLUG = "gaboro-pilot"
_demo_org_id: str | None = None
def _sha256(value: str) -> str:
return hashlib.sha256(value.encode()).hexdigest()
def _get_or_create_org() -> str | None:
global _demo_org_id
if _demo_org_id:
return _demo_org_id
client = get_client()
if not client:
return None
try:
result = client.table("organizations").select("id").eq("slug", DEMO_ORG_SLUG).execute()
if result.data:
_demo_org_id = result.data[0]["id"]
return _demo_org_id
result = client.table("organizations").insert({
"name": "Gaboro DME — Pilot",
"slug": DEMO_ORG_SLUG,
}).execute()
_demo_org_id = result.data[0]["id"]
logger.info(f"Created pilot org: {_demo_org_id}")
return _demo_org_id
except Exception as e:
logger.error(f"Failed to get/create org: {e}")
return None
def persist_upload(
filename: str,
content_bytes: bytes,
shipment_records: list,
coverage_results: list,
skipped_count: int,
mapping_summary: dict,
) -> str | None:
"""
Persist one upload batch and all related records to Supabase.
Returns the batch_id UUID string, or None if persistence is unavailable.
"""
client = get_client()
if not client:
return None
org_id = _get_or_create_org()
if not org_id:
return None
try:
# 1. Upload batch
batch_res = client.table("upload_batches").insert({
"org_id": org_id,
"filename": filename,
"row_count": len(coverage_results),
"skipped_count": skipped_count,
"status": "complete",
}).execute()
batch_id = batch_res.data[0]["id"]
# 2. Source file metadata
content_hash = _sha256(content_bytes.decode("utf-8", errors="replace"))
client.table("source_files").insert({
"batch_id": batch_id,
"filename": filename,
"content_hash": content_hash,
"byte_size": len(content_bytes),
}).execute()
# 3. Normalized records — one row per scored patient
# shipment_records and coverage_results are same-indexed
qty_map = {sr.patient_id: sr.quantity for sr in shipment_records}
norm_rows = []
for r in coverage_results:
flag_val = r.flag.value if hasattr(r.flag, "value") else str(r.flag)
norm_rows.append({
"batch_id": batch_id,
"patient_id_hash": _sha256(r.patient_id),
"device_type": r.device_type,
"shipment_date": r.last_shipment_date.isoformat(),
"quantity": qty_map.get(r.patient_id, 1),
"payer": r.payer,
"component": r.component,
"coverage_status": flag_val,
"days_remaining": r.days_until_coverage_end,
"rule_version": r.rule_version,
})
if norm_rows:
client.table("normalized_records").insert(norm_rows).execute()
# 4. Mapping decisions — how each CSV header was resolved
mapping_rows = []
for canonical, detail in mapping_summary.get("mapped", {}).items():
mapping_rows.append({
"batch_id": batch_id,
"raw_header": detail["raw_header"],
"canonical_field": canonical,
"confidence": detail["confidence"],
})
for raw_h in mapping_summary.get("unmapped_columns", []):
mapping_rows.append({
"batch_id": batch_id,
"raw_header": raw_h,
"canonical_field": None,
"confidence": "unmapped",
})
if mapping_rows:
client.table("mapping_decisions").insert(mapping_rows).execute()
# 5. Report run summary
flagged = sum(
1 for r in coverage_results
if (r.flag.value if hasattr(r.flag, "value") else str(r.flag)) != "OK"
)
client.table("report_runs").insert({
"batch_id": batch_id,
"org_id": org_id,
"status": "complete",
"total_records": len(coverage_results),
"flagged_count": flagged,
}).execute()
logger.info(f"Persisted batch {batch_id}: {len(coverage_results)} records, {flagged} flagged")
return batch_id
except Exception as e:
logger.error(f"Persistence error on upload '{filename}': {e}")
return None
def persist_export(batch_id: str | None, filename: str, row_count: int) -> None:
"""Record that a work queue CSV was exported. Best-effort."""
if not batch_id:
return
client = get_client()
if not client:
return
try:
# Find the report_run for this batch
run_res = client.table("report_runs").select("id").eq("batch_id", batch_id).execute()
if not run_res.data:
return
run_id = run_res.data[0]["id"]
client.table("export_files").insert({
"report_run_id": run_id,
"filename": filename,
"row_count": row_count,
}).execute()
except Exception as e:
logger.error(f"Persistence error on export: {e}")

View file

@ -0,0 +1,28 @@
import logging
import os
logger = logging.getLogger(__name__)
_client = None
def get_client():
"""Return a Supabase client or None if env vars are not set (dev mode)."""
global _client
if _client is not None:
return _client
url = os.getenv("SUPABASE_URL", "")
key = os.getenv("SUPABASE_SERVICE_KEY", "")
if not url or not key:
return None
try:
from supabase import create_client
_client = create_client(url, key)
logger.info("Supabase client initialized")
except Exception as e:
logger.error(f"Supabase client init failed: {e}")
return _client

View file

@ -0,0 +1,175 @@
-- Signal Phase 2 Schema
-- Run this in the Supabase SQL editor (Dashboard > SQL Editor > New query)
-- Safe to run multiple times — uses IF NOT EXISTS throughout
create extension if not exists "uuid-ossp";
-- Organizations (DME supplier accounts)
create table if not exists organizations (
id uuid primary key default uuid_generate_v4(),
name text not null,
slug text unique not null,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
-- Users (staff at each org)
create table if not exists users (
id uuid primary key default uuid_generate_v4(),
org_id uuid not null references organizations(id) on delete cascade,
email text not null unique,
role text not null default 'staff', -- 'admin' | 'staff'
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
-- Upload batches (one per CSV upload event)
create table if not exists upload_batches (
id uuid primary key default uuid_generate_v4(),
org_id uuid not null references organizations(id) on delete cascade,
uploaded_by uuid references users(id),
filename text not null,
row_count int not null default 0,
skipped_count int not null default 0,
status text not null default 'processing', -- 'processing' | 'complete' | 'failed'
created_at timestamptz not null default now()
);
-- Source files (raw CSV metadata, WORM)
create table if not exists source_files (
id uuid primary key default uuid_generate_v4(),
batch_id uuid not null references upload_batches(id) on delete cascade,
filename text not null,
content_hash text not null, -- SHA-256 of file bytes
byte_size int not null,
created_at timestamptz not null default now()
);
-- Raw rows (original CSV row data before normalization, WORM)
create table if not exists raw_rows (
id uuid primary key default uuid_generate_v4(),
batch_id uuid not null references upload_batches(id) on delete cascade,
row_number int not null,
raw_data jsonb not null, -- original column key/value pairs
created_at timestamptz not null default now()
);
-- Normalized records (scored output)
create table if not exists normalized_records (
id uuid primary key default uuid_generate_v4(),
batch_id uuid not null references upload_batches(id) on delete cascade,
patient_id_hash text not null, -- SHA-256 of patient_id — no raw PHI stored
device_type text not null,
shipment_date date not null,
quantity int not null default 1,
payer text not null,
component text not null default 'sensor',
coverage_status text not null, -- OUT_OF_COVERAGE | VISIT_DUE | REFILL_WINDOW | OK
days_remaining int,
reason text,
recommended_action text,
rule_version text not null,
created_at timestamptz not null default now()
);
-- Mapping decisions (header-to-field mapping log per batch)
create table if not exists mapping_decisions (
id uuid primary key default uuid_generate_v4(),
batch_id uuid not null references upload_batches(id) on delete cascade,
raw_header text not null,
canonical_field text,
confidence text not null, -- 'high' | 'inferred' | 'unmapped'
created_at timestamptz not null default now()
);
-- Report runs (one per scored batch delivered to user)
create table if not exists report_runs (
id uuid primary key default uuid_generate_v4(),
batch_id uuid not null references upload_batches(id) on delete cascade,
org_id uuid not null references organizations(id) on delete cascade,
generated_by uuid references users(id),
status text not null default 'complete',
total_records int not null default 0,
flagged_count int not null default 0,
created_at timestamptz not null default now()
);
-- Report items (one row per patient record in the worklist)
create table if not exists report_items (
id uuid primary key default uuid_generate_v4(),
report_run_id uuid not null references report_runs(id) on delete cascade,
normalized_record_id uuid not null references normalized_records(id),
patient_id_hash text not null,
status text not null,
days_remaining int,
reason text,
recommended_action text,
created_at timestamptz not null default now()
);
-- Export files (downloaded work queue CSVs)
create table if not exists export_files (
id uuid primary key default uuid_generate_v4(),
report_run_id uuid not null references report_runs(id) on delete cascade,
exported_by uuid references users(id),
filename text not null,
row_count int not null default 0,
created_at timestamptz not null default now()
);
-- Audit events (WORM — append only, never update or delete)
create table if not exists audit_events (
id uuid primary key default uuid_generate_v4(),
org_id uuid references organizations(id),
user_id uuid references users(id),
action text not null, -- 'upload' | 'export' | 'login' | 'view_report'
resource_type text,
resource_id uuid,
patient_id_hash text,
metadata jsonb,
ip_address text,
created_at timestamptz not null default now()
);
-- Enable Row Level Security on all tables
alter table organizations enable row level security;
alter table users enable row level security;
alter table upload_batches enable row level security;
alter table source_files enable row level security;
alter table raw_rows enable row level security;
alter table normalized_records enable row level security;
alter table mapping_decisions enable row level security;
alter table report_runs enable row level security;
alter table report_items enable row level security;
alter table export_files enable row level security;
alter table audit_events enable row level security;
-- WORM rules: audit_events, raw_rows, source_files are append-only
do $$ begin
if not exists (select 1 from pg_rules where rulename = 'audit_events_no_update') then
create rule audit_events_no_update as on update to audit_events do instead nothing;
end if;
if not exists (select 1 from pg_rules where rulename = 'audit_events_no_delete') then
create rule audit_events_no_delete as on delete to audit_events do instead nothing;
end if;
if not exists (select 1 from pg_rules where rulename = 'raw_rows_no_update') then
create rule raw_rows_no_update as on update to raw_rows do instead nothing;
end if;
if not exists (select 1 from pg_rules where rulename = 'raw_rows_no_delete') then
create rule raw_rows_no_delete as on delete to raw_rows do instead nothing;
end if;
if not exists (select 1 from pg_rules where rulename = 'source_files_no_update') then
create rule source_files_no_update as on update to source_files do instead nothing;
end if;
if not exists (select 1 from pg_rules where rulename = 'source_files_no_delete') then
create rule source_files_no_delete as on delete to source_files do instead nothing;
end if;
end $$;
-- Indexes
create index if not exists idx_upload_batches_org on upload_batches(org_id);
create index if not exists idx_normalized_records_batch on normalized_records(batch_id);
create index if not exists idx_normalized_records_status on normalized_records(coverage_status);
create index if not exists idx_audit_events_org on audit_events(org_id);
create index if not exists idx_audit_events_action on audit_events(action);
create index if not exists idx_report_items_run on report_items(report_run_id);

View file

@ -2,3 +2,4 @@ fastapi>=0.111.0
uvicorn[standard]>=0.29.0
python-multipart>=0.0.9
pydantic>=2.0.0
supabase>=2.0.0

View file

@ -30,6 +30,28 @@ export async function uploadToBackend(file) {
}
}
/**
* Export a work queue CSV from the backend.
* @param {Array} records - scored RecordOut objects from the upload response
* @param {string|null} batchId - batch_id from the upload response (for audit trail)
*/
export async function exportFromBackend(records, batchId = null) {
try {
const resp = await fetch(`${BACKEND_URL}/api/export`, {
method: "POST",
headers: {
"Content-Type": "application/json",
...(API_KEY ? { "X-API-Key": API_KEY } : {}),
},
body: JSON.stringify({ records, batch_id: batchId }),
});
if (!resp.ok) return null;
return resp.blob();
} catch {
return null;
}
}
/**
* Convert backend record shape to local record shape.
*/

View file

@ -73,10 +73,10 @@ def _post_file(path: Path) -> dict:
return json.loads(data)
def _post_export(records: list) -> bytes:
def _post_export(records: list, batch_id: str | None = None) -> bytes:
import http.client
body = json.dumps(records).encode()
body = json.dumps({"records": records, "batch_id": batch_id}).encode()
conn = http.client.HTTPConnection("localhost", BACKEND_PORT, timeout=30)
conn.request(
"POST",
@ -151,7 +151,8 @@ def run() -> bool:
# Test 4: export
print("Exporting work queue...")
csv_bytes = _post_export(records)
batch_id = result.get("batch_id")
csv_bytes = _post_export(records, batch_id=batch_id)
lines = csv_bytes.decode("utf-8").strip().splitlines()
if len(lines) < 2:
print(f"FAIL — /api/export returned fewer than 2 lines: {lines}")