Backend: JWT middleware validates Clerk tokens on every request, extracts org ID from claims, enforces org-scoped queries via Supabase RLS. Frontend: ClerkProvider wraps the app, auth gate blocks unauthenticated access, UserButton in header, token injected into every API call. Supabase production wired to trust Clerk JWTs via Third-Party Auth integration. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
195 lines
6.6 KiB
Python
195 lines
6.6 KiB
Python
"""
|
|
Supabase persistence for Signal upload batches, scored records, and report runs.
|
|
|
|
All writes are best-effort: failures are logged but never surface to the API caller.
|
|
The core scoring pipeline works without Supabase (dev mode / env vars not set).
|
|
"""
|
|
|
|
import hashlib
|
|
import logging
|
|
from datetime import date
|
|
|
|
from core.supabase_client import get_client
|
|
|
|
# Module-level logger; persistence failures are reported through it instead of being raised.
logger = logging.getLogger(__name__)

# Slug of the fallback organization used when no Clerk org id is supplied.
DEMO_ORG_SLUG = "gaboro-pilot"
# Cached id of the fallback organization; filled in by _get_or_create_org on first lookup/creation.
_demo_org_id: str | None = None
|
|
|
|
|
|
def _sha256(value: str) -> str:
    """Return the hexadecimal SHA-256 digest of *value*, encoded with str.encode()'s default (UTF-8)."""
    hasher = hashlib.sha256()
    hasher.update(value.encode())
    return hasher.hexdigest()
|
|
|
|
|
|
def _get_or_create_org(clerk_org_id: str | None = None) -> str | None:
    """
    Resolve the id of the "organizations" row to attach records to.

    With a clerk_org_id, the row whose "clerk_org_id" column matches is used,
    and a new row is inserted when none exists yet. Without one, the demo
    organization (slug DEMO_ORG_SLUG) is looked up or created, and its id is
    cached in the module-level _demo_org_id for later calls.

    Returns None when no Supabase client is available or when any lookup or
    insert raises; the error is logged and not propagated.
    """
    global _demo_org_id

    client = get_client()
    if not client:
        return None

    try:
        if not clerk_org_id:
            # Fallback: demo slug (API key auth or dev mode)
            if _demo_org_id:
                return _demo_org_id

            existing = client.table("organizations").select("id").eq("slug", DEMO_ORG_SLUG).execute()
            if not existing.data:
                pilot_org = {
                    "name": "Gaboro DME — Pilot",
                    "slug": DEMO_ORG_SLUG,
                }
                created = client.table("organizations").insert(pilot_org).execute()
                _demo_org_id = created.data[0]["id"]
                logger.info(f"Created pilot org: {_demo_org_id}")
                return _demo_org_id

            _demo_org_id = existing.data[0]["id"]
            return _demo_org_id

        # Production path: match on the Clerk org id stored on the org record
        existing = client.table("organizations").select("id").eq("clerk_org_id", clerk_org_id).execute()
        if existing.data:
            return existing.data[0]["id"]

        # Known to Clerk but not yet present in Supabase: provision it now
        new_org = {
            "name": f"Org {clerk_org_id}",
            "slug": clerk_org_id,
            "clerk_org_id": clerk_org_id,
        }
        created = client.table("organizations").insert(new_org).execute()
        org_id = created.data[0]["id"]
        logger.info(f"Auto-provisioned org for Clerk org {clerk_org_id}: {org_id}")
        return org_id

    except Exception as e:
        logger.error(f"Failed to get/create org: {e}")
        return None
|
|
|
|
|
|
def persist_upload(
    filename: str,
    content_bytes: bytes,
    shipment_records: list,
    coverage_results: list,
    skipped_count: int,
    mapping_summary: dict,
    clerk_org_id: str | None = None,
) -> str | None:
    """
    Persist one upload batch and all related records to Supabase.

    Inserts, in order: one upload_batches row, one source_files row, one
    normalized_records row per coverage result, one mapping_decisions row per
    mapped or unmapped CSV header, and one report_runs summary row.

    Args:
        filename: Name of the uploaded file; stored on the batch and
            source-file rows.
        content_bytes: Raw uploaded bytes; used for the content hash and the
            byte size.
        shipment_records: Objects exposing patient_id and quantity; used only
            to look up the quantity for each scored patient.
        coverage_results: Scored results; each one becomes a
            normalized_records row.
        skipped_count: Number of skipped input rows; stored on the batch.
        mapping_summary: Dict with optional "mapped" (canonical field -> dict
            holding "raw_header" and "confidence") and "unmapped_columns"
            (list of raw headers) entries.
        clerk_org_id: Clerk organization id, forwarded to _get_or_create_org.

    Returns:
        The id of the inserted upload_batches row, or None when no client or
        organization is available or when any step raises. Errors are logged
        and never propagated; rows written before a failure are left in place.
    """
    client = get_client()
    if not client:
        return None

    org_id = _get_or_create_org(clerk_org_id=clerk_org_id)
    if not org_id:
        return None

    try:
        # 1. Upload batch
        batch_res = client.table("upload_batches").insert({
            "org_id": org_id,
            "filename": filename,
            "row_count": len(coverage_results),
            "skipped_count": skipped_count,
            "status": "complete",
        }).execute()
        batch_id = batch_res.data[0]["id"]

        # 2. Source file metadata
        # Hash the raw bytes. Decoding with errors="replace" first would map
        # every invalid byte to U+FFFD, so distinct non-UTF-8 files could share
        # a hash and the value would not match the file's real SHA-256. For
        # valid UTF-8 input the digest is unchanged.
        content_hash = hashlib.sha256(content_bytes).hexdigest()
        client.table("source_files").insert({
            "batch_id": batch_id,
            "filename": filename,
            "content_hash": content_hash,
            "byte_size": len(content_bytes),
        }).execute()

        # 3. Normalized records — one row per scored result
        # Quantities are joined by patient_id: when a patient appears more than
        # once in shipment_records the last quantity wins, and a patient with
        # no shipment record defaults to 1.
        qty_map = {sr.patient_id: sr.quantity for sr in shipment_records}
        # Derive each flag's stored value once; it feeds both the per-record
        # rows and the flagged count in the report summary.
        flag_values = [
            r.flag.value if hasattr(r.flag, "value") else str(r.flag)
            for r in coverage_results
        ]
        norm_rows = [
            {
                "batch_id": batch_id,
                # Only a hash of the patient id is stored, never the raw value.
                "patient_id_hash": _sha256(r.patient_id),
                "device_type": r.device_type,
                "shipment_date": r.last_shipment_date.isoformat(),
                "quantity": qty_map.get(r.patient_id, 1),
                "payer": r.payer,
                "component": r.component,
                "coverage_status": flag_val,
                "days_remaining": r.days_until_coverage_end,
                "rule_version": r.rule_version,
            }
            for r, flag_val in zip(coverage_results, flag_values)
        ]
        if norm_rows:
            client.table("normalized_records").insert(norm_rows).execute()

        # 4. Mapping decisions — how each CSV header was resolved
        mapping_rows = [
            {
                "batch_id": batch_id,
                "raw_header": detail["raw_header"],
                "canonical_field": canonical,
                "confidence": detail["confidence"],
            }
            for canonical, detail in mapping_summary.get("mapped", {}).items()
        ]
        mapping_rows.extend(
            {
                "batch_id": batch_id,
                "raw_header": raw_h,
                "canonical_field": None,
                "confidence": "unmapped",
            }
            for raw_h in mapping_summary.get("unmapped_columns", [])
        )
        if mapping_rows:
            client.table("mapping_decisions").insert(mapping_rows).execute()

        # 5. Report run summary — every flag value other than "OK" counts as flagged
        flagged = sum(1 for flag_val in flag_values if flag_val != "OK")
        client.table("report_runs").insert({
            "batch_id": batch_id,
            "org_id": org_id,
            "status": "complete",
            "total_records": len(coverage_results),
            "flagged_count": flagged,
        }).execute()

        logger.info(f"Persisted batch {batch_id}: {len(coverage_results)} records, {flagged} flagged")
        return batch_id

    except Exception as e:
        # Best-effort by design: log and report "not persisted" to the caller.
        logger.error(f"Persistence error on upload '{filename}': {e}")
        return None
|
|
|
|
|
|
def persist_export(batch_id: str | None, filename: str, row_count: int) -> None:
|
|
"""Record that a work queue CSV was exported. Best-effort."""
|
|
if not batch_id:
|
|
return
|
|
|
|
client = get_client()
|
|
if not client:
|
|
return
|
|
|
|
try:
|
|
# Find the report_run for this batch
|
|
run_res = client.table("report_runs").select("id").eq("batch_id", batch_id).execute()
|
|
if not run_res.data:
|
|
return
|
|
run_id = run_res.data[0]["id"]
|
|
|
|
client.table("export_files").insert({
|
|
"report_run_id": run_id,
|
|
"filename": filename,
|
|
"row_count": row_count,
|
|
}).execute()
|
|
except Exception as e:
|
|
logger.error(f"Persistence error on export: {e}")
|