Signal/python-backend/api/normalizer.py
Kisa cf171a3f87 add Phase 1 security hardening, mapping confidence, audit logging, pilot docs
- lock CORS to Vercel domain via ALLOWED_ORIGINS env var (removes allow_origins=*)
- add X-API-Key header auth on /api/upload and /api/export
- normalizer: add mapping confidence (high/inferred), new aliases for Acct #,
  Member ID, External Patient Ref, DME Description, dispensedate; 63/63 CSV files pass
- coverage_calculator: add RULE_VERSION = "v0.1", rule_version on every CoverageResult
- main.py: audit logging wired on upload + export, rule_version + mapping_summary in response
- generate_samples.py: 25 CSV files now use 25 different real-world header formats
- add generate_10k.py for 10,000-patient synthetic dataset
- add tests/smoke_test.py (passes against local backend)
- add docs/pilot-guide-v1.md for Robert Robinson pilot onboarding
- add docs/daniel-pilot-readiness-whitepaper.md and .pdf

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 05:41:25 -04:00

259 lines
8.3 KiB
Python

"""
CSV header normalization for Signal.
Maps messy supplier CSV exports to canonical ShipmentRecord fields.
Tolerates header drift, alternative column names, and common date formats.
"""
import csv
import io
import re
from datetime import date, datetime
from typing import Optional
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from core.coverage_calculator import ShipmentRecord
# Canonical field name -> header spellings accepted for that field.
# _map_header / _map_header_with_confidence normalise both the incoming header
# and every alias with _normalize_key (trimmed, lower-cased, "-" and "_" read
# as spaces), so some spellings below are redundant after normalisation.
# Fields are tried in dict order and the first field with a matching alias wins.
HEADER_MAP: dict[str, list[str]] = {
    # Patient identifier; normalize_csv skips any row where this is empty.
    "patient_id": [
        "patient_id", "patientid", "patient id", "pt_id", "pt id",
        "mrn", "account_number", "account number", "account_no",
        "patient_account", "acct_no", "acct no", "acct #", "acct#",
        "id", "patient", "member_id", "member id",
        "external patient ref", "external_patient_ref", "external ref",
    ],
    # Device/product column; its values are resolved through DEVICE_MAP.
    "device_type": [
        "device_type", "device type", "device", "devicetype",
        "product_type", "product type", "product", "item",
        "item_description", "item description", "hcpcs_description",
        "hcpcs description", "description", "product_name",
        "dme", "dme description", "dme_description", "dme desc",
        "equipment", "equipment description",
    ],
    # Shipment/dispense/service date; its values are parsed with DATE_FORMATS.
    "shipment_date": [
        "shipment_date", "shipment date", "ship_date", "ship date",
        "dispense_date", "dispense date", "dispensedate",
        "service_date", "service date",
        "order_date", "order date", "date_of_service", "dos",
        "fill_date", "fill date", "last_ship_date", "last ship date",
    ],
    # Optional: normalize_csv falls back to a quantity of 1.
    "quantity": [
        "quantity", "qty", "units", "count", "qty_dispensed",
        "units_dispensed", "quantity_dispensed", "qty_shipped",
    ],
    # Optional: values are bucketed through PAYER_MAP.
    "payer": [
        "payer", "insurance", "insurance_name", "insurance name",
        "plan", "plan_name", "plan name", "payer_name", "payer name",
        "primary_payer", "primary payer", "ins_name", "carrier",
    ],
    # Optional: normalize_csv falls back to "sensor".
    "component": [
        "component", "item_type", "component_type", "type", "supply_type",
    ],
}
# Device alias -> canonical device code, consulted by _normalize_device.
# Written as (canonical, aliases) groups and flattened, so every spelling of a
# device sits next to the code it resolves to. Insertion order follows the
# groups top to bottom and, within a group, left to right.
DEVICE_MAP: dict[str, str] = {
    alias: canonical
    for canonical, aliases in (
        ("dexcom_g7", ("dexcom g7", "dexcom_g7", "dexcomg7", "g7")),
        ("dexcom_g6", ("dexcom g6", "dexcom_g6", "dexcomg6", "g6")),
        (
            "freestyle_libre_2",
            (
                "freestyle libre 2", "freestyle_libre_2", "freestylelibre2",
                "libre 2", "libre2", "fsl2", "fs libre 2",
            ),
        ),
        (
            "freestyle_libre_3",
            (
                "freestyle libre 3", "freestyle_libre_3", "freestylelibre3",
                "libre 3", "libre3", "fsl3", "fs libre 3",
            ),
        ),
        # A bare "omnipod" with no model number is treated as the Omnipod 5.
        ("omnipod_5", ("omnipod 5", "omnipod_5", "omnipod5", "omnipod", "op5")),
    )
    for alias in aliases
}
# Payer keyword -> payer bucket, consulted by _normalize_payer.
# _normalize_payer returns the bucket of the FIRST keyword (in insertion order)
# that occurs as a substring of the payer text, so the group order below is
# significant: the specific "medicare ..." phrases must precede plain
# "medicare", and "medicare advantage" must win over it to land in "commercial".
PAYER_MAP: dict[str, str] = {
    keyword: bucket
    for bucket, keywords in (
        ("medicare", ("medicare part b", "medicare part a")),
        ("commercial", ("medicare advantage",)),
        ("medicare", ("medicare", "cms")),
        ("medicaid", ("medicaid", "mcd", "molina", "centene", "wellcare")),
        (
            "commercial",
            (
                "bcbs", "blue cross", "blue shield", "aetna", "cigna",
                "unitedhealthcare", "united health", "uhc", "humana",
                "anthem", "united",
            ),
        ),
    )
    for keyword in keywords
}
# strptime patterns tried by _parse_date, in order; the first one that parses
# the value wins. Order matters for ambiguous input: "%m/%d/%Y" comes before
# "%d/%m/%Y", so 03/04/2024 is read month-first and the day-first pattern only
# applies when the month-first reading is invalid (e.g. 25/12/2024).
DATE_FORMATS = [
    "%Y-%m-%d",  # 2024-01-31
    "%m/%d/%Y",  # 01/31/2024
    "%m-%d-%Y",  # 01-31-2024
    "%d/%m/%Y",  # 31/01/2024
    "%m/%d/%y",  # 01/31/24
    "%Y%m%d",  # 20240131
    "%d-%b-%Y",  # 31-Jan-2024
    "%b %d, %Y",  # Jan 31, 2024
    "%B %d, %Y",  # January 31, 2024
    "%m/%d/%Y %H:%M:%S",  # 01/31/2024 13:45:00
    "%Y-%m-%dT%H:%M:%S",  # 2024-01-31T13:45:00
]
def _normalize_key(s: str) -> str:
return s.strip().lower().replace("-", " ").replace("_", " ")
def _map_header(raw: str) -> Optional[str]:
    """Return the canonical field whose alias list contains *raw*, else None.

    Both *raw* and each alias are compared in _normalize_key form. Fields are
    tried in HEADER_MAP order and the first field with a matching alias wins.
    """
    wanted = _normalize_key(raw)
    return next(
        (
            field
            for field, aliases in HEADER_MAP.items()
            if any(wanted == _normalize_key(alias) for alias in aliases)
        ),
        None,
    )
def _map_header_with_confidence(raw: str) -> tuple[Optional[str], str]:
    """Resolve a raw CSV header to (canonical_field, confidence).

    Confidence is "high" when the normalised header equals the canonical field
    name itself and "inferred" when it only equals one of that field's aliases.
    A header that matches nothing yields (None, "unmapped").
    Fields are tried in HEADER_MAP order; for each field the exact-name check
    runs before the alias check.
    """
    wanted = _normalize_key(raw)
    for field, aliases in HEADER_MAP.items():
        if wanted == _normalize_key(field):
            return field, "high"
        if any(wanted == _normalize_key(alias) for alias in aliases):
            return field, "inferred"
    return None, "unmapped"
def _parse_date(value: str) -> Optional[date]:
    """Parse *value* with the first DATE_FORMATS pattern that accepts it.

    Surrounding whitespace is ignored. Any time-of-day part is discarded and
    only the calendar date is returned. Returns None when no pattern matches.
    """
    candidate = value.strip()
    for pattern in DATE_FORMATS:
        try:
            parsed = datetime.strptime(candidate, pattern)
        except ValueError:
            # Not this layout; fall through to the next pattern.
            continue
        else:
            return parsed.date()
    return None
def _normalize_device(value: str) -> Optional[str]:
    """Return the canonical device code for *value*, or None if unrecognised.

    The value is put through _normalize_key and then compared to each
    DEVICE_MAP alias twice: as-is, and with all whitespace removed from both
    sides (so "Dexcom  G7" and "DexcomG7" resolve alike). Matching is on the
    whole string; no substring matching is attempted.
    """
    spaced = _normalize_key(value)
    squeezed = re.sub(r"\s+", "", spaced)
    for alias, canonical in DEVICE_MAP.items():
        if spaced == alias:
            return canonical
        if squeezed == re.sub(r"\s+", "", alias):
            return canonical
    return None
def _normalize_payer(value: str) -> str:
    """Bucket a free-text payer name using PAYER_MAP.

    The value is put through _normalize_key, then PAYER_MAP is scanned in
    insertion order and the bucket of the first keyword found as a substring
    is returned. Precedence therefore comes from the order of PAYER_MAP, which
    lists the specific "medicare ..." phrases ahead of plain "medicare".
    Anything with no keyword match (including an empty value) is "commercial".
    """
    haystack = _normalize_key(value)
    return next(
        (bucket for keyword, bucket in PAYER_MAP.items() if keyword in haystack),
        "commercial",
    )
def normalize_csv(text: str) -> tuple[list[ShipmentRecord], list[str], dict]:
    """
    Parse raw CSV text and return (records, skipped_reasons, mapping_summary).

    Tolerates header drift and normalizes device/payer/date values. A leading
    UTF-8 byte-order mark and surrounding whitespace are removed before parsing.

    Column resolution:
        Every header goes through _map_header_with_confidence. When several
        columns resolve to the same canonical field, an exact ("high") match is
        never displaced by an alias ("inferred") match; between matches of equal
        confidence the right-most column wins. Only the winning column is read.

    Row handling:
        A row is skipped, with a reason appended to skipped_reasons, when its
        patient_id is empty, its device is not recognised by _normalize_device,
        or its date is rejected by _parse_date. Row numbers in the reasons count
        from 2, i.e. they assume the header is row 1 and one record per line.
        quantity falls back to 1 when the column is absent or the value is not
        a finite number, and is never lower than 1. component falls back to
        "sensor" unless it is "sensor", "transmitter" or "pod".

    mapping_summary format:
    {
    "mapped": {canonical_field: {"raw_header": str, "confidence": "high"|"inferred"}},
    "unmapped_columns": [str],
    "required_missing": [str],
    }
    When the text has no header row at all, mapping_summary is an empty dict and
    skipped_reasons is ["No headers found in file"].
    """
    # Spreadsheet exports are often UTF-8 with a BOM. If the caller decoded the
    # upload as plain "utf-8" the BOM is still attached to the first header and
    # would keep that column from ever being recognised.
    cleaned = text.lstrip("\ufeff").strip()
    reader = csv.DictReader(io.StringIO(cleaned))
    if not reader.fieldnames:
        return [], ["No headers found in file"], {}

    # canonical field -> {"raw_header", "confidence"} for the winning column.
    mapping_detail: dict[str, dict] = {}
    unmapped_columns: list[str] = []
    for raw_header in reader.fieldnames:
        canonical, confidence = _map_header_with_confidence(raw_header)
        if not canonical:
            unmapped_columns.append(raw_header)
            continue
        current = mapping_detail.get(canonical)
        if (
            current is not None
            and current["confidence"] == "high"
            and confidence != "high"
        ):
            # An exactly named column already feeds this field; do not let a
            # looser alias (e.g. a free-text "description") override it.
            continue
        mapping_detail[canonical] = {"raw_header": raw_header, "confidence": confidence}

    # A tuple, not a set, so required_missing is reported in a stable order.
    required_fields = ("patient_id", "device_type", "shipment_date")
    required_missing = [f for f in required_fields if f not in mapping_detail]
    mapping_summary = {
        "mapped": mapping_detail,
        "unmapped_columns": unmapped_columns,
        "required_missing": required_missing,
    }

    records: list[ShipmentRecord] = []
    skipped: list[str] = []
    for row_number, row in enumerate(reader, start=2):
        # DictReader yields None for cells missing from a short row.
        mapped: dict[str, str] = {
            canonical: (row.get(detail["raw_header"]) or "").strip()
            for canonical, detail in mapping_detail.items()
        }

        patient_id = mapped.get("patient_id", "")
        if not patient_id:
            skipped.append(f"Row {row_number}: missing patient_id")
            continue

        raw_device = mapped.get("device_type", "")
        device_type = _normalize_device(raw_device)
        if not device_type:
            skipped.append(
                f"Row {row_number} ({patient_id}): unrecognized device '{raw_device}'"
            )
            continue

        raw_date = mapped.get("shipment_date", "")
        shipment_date = _parse_date(raw_date)
        if not shipment_date:
            skipped.append(
                f"Row {row_number} ({patient_id}): unparseable date '{raw_date}'"
            )
            continue

        try:
            quantity = max(1, int(float(mapped.get("quantity", "1"))))
        except (ValueError, TypeError, OverflowError):
            # ValueError: blank, non-numeric or NaN; OverflowError: "inf" or a
            # literal such as "1e999". One bad cell must not abort the upload.
            quantity = 1

        payer = _normalize_payer(mapped.get("payer", ""))

        component = (mapped.get("component") or "sensor").lower()
        if component not in ("sensor", "transmitter", "pod"):
            component = "sensor"

        records.append(ShipmentRecord(
            patient_id=patient_id,
            device_type=device_type,
            shipment_date=shipment_date,
            quantity=quantity,
            payer=payer,
            component=component,
        ))
    return records, skipped, mapping_summary