Signal/python-backend/api/normalizer.py
Kisa e3afd9038c feat: FastAPI backend + full deployment stack (Railway + Vercel)
- FastAPI backend: /health, /api/upload (CSV parse + score), /api/export (work queue CSV)
- CSV normalizer: tolerates 10+ header aliases per field, 8 date formats, all 5 devices, all major payers
- Python coverage_calculator wired as the authoritative scoring engine
- Frontend: backend-first upload with local fallback, export CSV wired, J. Sullivan placeholder removed
- Dockerfile + railway.toml for Railway deploy
- vercel.json for static frontend deploy
- Railway MCP installed for future sessions

Backend live: https://signal-api-production-91c2.up.railway.app
Frontend live: https://signal-ui-xi.vercel.app

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-18 19:01:35 -04:00

221 lines
6.7 KiB
Python

"""
CSV header normalization for Signal.
Maps messy supplier CSV exports to canonical ShipmentRecord fields.
Tolerates header drift, alternative column names, and common date formats.
"""
import csv
import io
import re
from datetime import date, datetime
from typing import Optional
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from core.coverage_calculator import ShipmentRecord
# Canonical field name -> header spellings accepted for that field.
# The keys are the keyword arguments normalize_csv passes to ShipmentRecord.
# _map_header runs both the incoming header and every alias through
# _normalize_key before comparing, and returns the first field (in the order
# written here) that has a matching alias.
HEADER_MAP: dict[str, list[str]] = {
    # Who the shipment was for.
    "patient_id": [
        "patient_id", "patientid", "patient id",
        "pt_id", "pt id",
        "mrn",
        "account_number", "account number", "account_no",
        "patient_account", "acct_no",
        "id", "patient",
    ],
    # What was shipped.
    "device_type": [
        "device_type", "device type", "device", "devicetype",
        "product_type", "product type", "product",
        "item", "item_description", "item description",
        "hcpcs_description", "description", "product_name",
    ],
    # When it was shipped / dispensed.
    "shipment_date": [
        "shipment_date", "shipment date",
        "ship_date", "ship date",
        "dispense_date", "dispense date",
        "service_date", "service date",
        "order_date", "order date",
        "date_of_service", "dos",
        "fill_date", "fill date",
        "last_ship_date", "last ship date",
    ],
    # How many units.
    "quantity": [
        "quantity", "qty", "units", "count",
        "qty_dispensed", "units_dispensed", "quantity_dispensed",
        "qty_shipped",
    ],
    # Who pays.
    "payer": [
        "payer",
        "insurance", "insurance_name", "insurance name",
        "plan", "plan_name", "plan name",
        "payer_name", "payer name",
        "primary_payer", "primary payer",
        "ins_name", "carrier",
    ],
    # Which part of the device system the line item is.
    "component": [
        "component", "item_type", "component_type", "type", "supply_type",
    ],
}
# Device spelling -> canonical device identifier, consulted by
# _normalize_device. Written as one alias group per device and flattened into
# a single alias -> canonical dict; the resulting insertion order is group
# order, then alias order within the group.
DEVICE_MAP: dict[str, str] = {
    alias: canonical
    for canonical, aliases in (
        ("dexcom_g7", ("dexcom g7", "dexcom_g7", "dexcomg7", "g7")),
        ("dexcom_g6", ("dexcom g6", "dexcom_g6", "dexcomg6", "g6")),
        (
            "freestyle_libre_2",
            (
                "freestyle libre 2", "freestyle_libre_2", "freestylelibre2",
                "libre 2", "libre2", "fsl2", "fs libre 2",
            ),
        ),
        (
            "freestyle_libre_3",
            (
                "freestyle libre 3", "freestyle_libre_3", "freestylelibre3",
                "libre 3", "libre3", "fsl3", "fs libre 3",
            ),
        ),
        # A bare "omnipod" is mapped to omnipod_5 as well.
        ("omnipod_5", ("omnipod 5", "omnipod_5", "omnipod5", "omnipod", "op5")),
    )
    for alias in aliases
}
# Payer name fragment -> payer category.
# _normalize_payer walks this dict in insertion order and returns the category
# of the FIRST fragment contained in the payer text, so ORDER IS SIGNIFICANT:
# the specific "medicare ..." phrases must stay ahead of plain "medicare", and
# "medicare"/"medicaid" fragments stay ahead of the carrier names.
PAYER_MAP: dict[str, str] = {
    # --- Medicare (specific phrases first) ---
    "medicare part b": "medicare",
    "medicare part a": "medicare",
    "medicare advantage": "commercial",  # categorised as commercial, not medicare
    "medicare": "medicare",
    "cms": "medicare",

    # --- Medicaid and carriers categorised as medicaid ---
    "medicaid": "medicaid",
    "mcd": "medicaid",
    "molina": "medicaid",
    "centene": "medicaid",
    "wellcare": "medicaid",

    # --- Commercial carriers ---
    "bcbs": "commercial",
    "blue cross": "commercial",
    "blue shield": "commercial",
    "aetna": "commercial",
    "cigna": "commercial",
    "unitedhealthcare": "commercial",
    "united health": "commercial",
    "uhc": "commercial",
    "humana": "commercial",
    "anthem": "commercial",
    "united": "commercial",
}
# strptime patterns tried in order by _parse_date; the first one that parses
# wins. Because month-first "%m/%d/%Y" precedes day-first "%d/%m/%Y", an
# ambiguous value such as 03/04/2024 is read as March 4; the day-first pattern
# is only reached when the month-first one fails (e.g. 25/12/2024).
DATE_FORMATS = [
    # Date only.
    "%Y-%m-%d",
    "%m/%d/%Y",
    "%m-%d-%Y",
    "%d/%m/%Y",
    "%m/%d/%y",
    "%Y%m%d",
    "%d-%b-%Y",
    "%b %d, %Y",
    "%B %d, %Y",
    # Timestamps; _parse_date keeps only the date part.
    "%m/%d/%Y %H:%M:%S",
    "%Y-%m-%dT%H:%M:%S",
    # Space-separated ISO timestamp (the form str(datetime) produces).
    # Appended last so every value that parsed before still parses the same.
    "%Y-%m-%d %H:%M:%S",
]
def _normalize_key(s: str) -> str:
    """Return a canonical lookup key for a header name or cell value.

    The text is trimmed and lower-cased, hyphens and underscores become
    spaces, and every run of whitespace (double spaces, tabs, non-breaking
    spaces) is collapsed to a single space. "Ship-Date", "ship_date" and
    "Ship  Date" therefore all yield "ship date".

    Args:
        s: Raw header or cell text.

    Returns:
        The normalized key.
    """
    spaced = s.strip().lower().replace("-", " ").replace("_", " ")
    # A separator that already had spaces around it ("Medicare - Advantage")
    # turns into a run of three spaces, which would stop multi-word aliases
    # from matching; squeeze each run down to one space.
    return re.sub(r"\s+", " ", spaced)
def _map_header(raw: str) -> Optional[str]:
    """Translate a raw CSV column header into its canonical field name.

    The header and each alias in HEADER_MAP are compared in their
    _normalize_key form. Fields are checked in HEADER_MAP order and the
    first one with a matching alias is returned.

    Args:
        raw: Header text exactly as it appears in the file.

    Returns:
        The canonical field name, or None when no alias matches.
    """
    wanted = _normalize_key(raw)
    return next(
        (
            field
            for field, spellings in HEADER_MAP.items()
            if any(wanted == _normalize_key(alias) for alias in spellings)
        ),
        None,
    )
def _parse_date(value: str) -> Optional[date]:
    """Parse a date cell using the patterns in DATE_FORMATS.

    Patterns are attempted in list order and the first successful parse is
    used; any time-of-day component is dropped.

    Args:
        value: Cell text; surrounding whitespace is ignored.

    Returns:
        The parsed date, or None if no pattern fits (including empty text).
    """
    text = value.strip()
    for pattern in DATE_FORMATS:
        try:
            parsed = datetime.strptime(text, pattern)
        except ValueError:
            # This pattern does not fit; try the next one.
            continue
        return parsed.date()
    return None
def _normalize_device(value: str) -> Optional[str]:
    """Resolve a device cell to its canonical device identifier.

    The cell is normalized with _normalize_key and then compared against
    each DEVICE_MAP alias twice: as-is, and with all whitespace removed
    from both sides (so "Libre 2" and "libre2" are interchangeable). The
    match must cover the whole value; partial matches are not accepted.

    Args:
        value: Device text from the CSV.

    Returns:
        The canonical identifier of the first matching alias, or None.
    """

    def squeeze(text: str) -> str:
        # Drop every whitespace character.
        return re.sub(r"\s+", "", text)

    spaced = _normalize_key(value)
    squeezed = squeeze(spaced)
    return next(
        (
            canonical
            for alias, canonical in DEVICE_MAP.items()
            if alias == spaced or squeeze(alias) == squeezed
        ),
        None,
    )
def _normalize_payer(value: str) -> str:
    """Classify a payer cell into one of the PAYER_MAP categories.

    Matching is by substring: the first PAYER_MAP alias, in the dict's
    insertion order, that occurs anywhere in the normalized payer text
    decides the category. PAYER_MAP puts the specific "medicare ..."
    phrases ahead of plain "medicare" so they take precedence.

    Args:
        value: Payer text from the CSV (may be empty).

    Returns:
        The matched category, or "commercial" when nothing matches.
    """
    needle = _normalize_key(value)
    categories = (
        category for alias, category in PAYER_MAP.items() if alias in needle
    )
    return next(categories, "commercial")
def _coerce_quantity(raw: str) -> int:
    """Parse a quantity cell into a whole number of units, never below 1."""
    try:
        return max(1, int(float(raw)))
    except (ValueError, TypeError, OverflowError):
        # ValueError: blank, non-numeric or "nan" text.
        # OverflowError: "inf" or an exponent so large float() yields
        # infinity, which int() cannot convert.
        return 1


def normalize_csv(text: str) -> tuple[list[ShipmentRecord], list[str]]:
    """
    Parse raw CSV text and return (records, skipped_reasons).
    Tolerates header drift and normalizes device/payer/date values.

    Args:
        text: Entire CSV file content, header row first.

    Returns:
        A pair ``(records, skipped)``. ``records`` holds one ShipmentRecord
        per usable row. ``skipped`` holds one message per rejected row
        (missing patient id, unrecognized device, unparseable date), or a
        single message when the file has no header row.

    Rows are numbered from 2, treating the header as row 1. Quantity falls
    back to 1 when absent or unreadable, payer falls back to whatever
    _normalize_payer returns for empty text, and component falls back to
    "sensor" when absent or not one of sensor/transmitter/pod.
    """
    # A UTF-8 byte-order mark (common in spreadsheet exports) is not
    # whitespace, so strip() leaves it glued to the first header name and
    # that column would never be recognized. Remove it explicitly.
    cleaned = text.lstrip("\ufeff").strip()
    reader = csv.DictReader(io.StringIO(cleaned))
    if not reader.fieldnames:
        return [], ["No headers found in file"]

    # Raw header text -> canonical field, for the columns we understand.
    column_map: dict[str, str] = {}
    for raw_header in reader.fieldnames:
        canonical = _map_header(raw_header)
        if canonical:
            column_map[raw_header] = canonical

    records: list[ShipmentRecord] = []
    skipped: list[str] = []
    for i, row in enumerate(reader, start=2):
        mapped: dict[str, str] = {}
        for raw_h, canonical in column_map.items():
            cell = (row.get(raw_h) or "").strip()
            # Several source columns can map to the same field (e.g. both
            # "ship_date" and "order_date"). The last populated one wins;
            # a blank later column must not erase a value already found.
            if cell or canonical not in mapped:
                mapped[canonical] = cell

        patient_id = mapped.get("patient_id", "").strip()
        if not patient_id:
            skipped.append(f"Row {i}: missing patient_id")
            continue

        raw_device = mapped.get("device_type", "")
        device_type = _normalize_device(raw_device)
        if not device_type:
            skipped.append(f"Row {i} ({patient_id}): unrecognized device '{raw_device}'")
            continue

        raw_date = mapped.get("shipment_date", "")
        shipment_date = _parse_date(raw_date)
        if not shipment_date:
            skipped.append(f"Row {i} ({patient_id}): unparseable date '{raw_date}'")
            continue

        quantity = _coerce_quantity(mapped.get("quantity", "1"))
        payer = _normalize_payer(mapped.get("payer", ""))

        component = (mapped.get("component", "sensor") or "sensor").lower().strip()
        if component not in ("sensor", "transmitter", "pod"):
            component = "sensor"

        records.append(ShipmentRecord(
            patient_id=patient_id,
            device_type=device_type,
            shipment_date=shipment_date,
            quantity=quantity,
            payer=payer,
            component=component,
        ))
    return records, skipped