Signal/python-backend/core/coverage_calculator.py
Kisa cf171a3f87 add Phase 1 security hardening, mapping confidence, audit logging, pilot docs
- lock CORS to Vercel domain via ALLOWED_ORIGINS env var (removes allow_origins=*)
- add X-API-Key header auth on /api/upload and /api/export
- normalizer: add mapping confidence (high/inferred), new aliases for Acct #,
  Member ID, External Patient Ref, DME Description, dispensedate; 63/63 CSV files pass
- coverage_calculator: add RULE_VERSION = "v0.1", rule_version on every CoverageResult
- main.py: audit logging wired on upload + export, rule_version + mapping_summary in response
- generate_samples.py: 25 CSV files now use 25 different real-world header formats
- add generate_10k.py for 10,000-patient synthetic dataset
- add tests/smoke_test.py (passes against local backend)
- add docs/pilot-guide-v1.md for Robert Robinson pilot onboarding
- add docs/daniel-pilot-readiness-whitepaper.md and .pdf

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 05:41:25 -04:00

199 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
coverage_calculator.py
Signal CGM — STTIL Solutions
Calculates CGM coverage status per patient based on shipment history
and payer-specific wear-day rules.
PHI CONTRACT:
This module receives only: patient_id, device_type, shipment_date,
quantity, payer. No names, SSNs, DOBs, or contact fields may be
added to any function signature or data structure in this file.
Flag types emitted:
REFILL_WINDOW — patient is within the billable refill window
VISIT_DUE — physician visit renewal is approaching or overdue
OUT_OF_COVERAGE — coverage has lapsed; outreach required before shipment
OK — no action needed at this time
"""
import json
import logging
from dataclasses import dataclass
from datetime import date, timedelta
from enum import Enum
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
RULE_VERSION = "v0.1"
PAYER_RULES_PATH = Path(__file__).parent.parent / "config" / "payer_rules.json"
class CoverageFlag(str, Enum):
REFILL_WINDOW = "REFILL_WINDOW"
VISIT_DUE = "VISIT_DUE"
OUT_OF_COVERAGE = "OUT_OF_COVERAGE"
OK = "OK"
@dataclass(frozen=True)
class ShipmentRecord:
"""
Minimal shipment record. Only non-PHI fields allowed.
patient_id: Supplier's internal MRN or account number.
This is the sole crosswalk key — no real identity data here.
"""
patient_id: str
device_type: str
shipment_date: date
quantity: int
payer: str
component: str = "sensor" # "sensor" | "transmitter" | "pod"
@dataclass
class CoverageResult:
patient_id: str
device_type: str
payer: str
component: str
last_shipment_date: date
coverage_end_date: date
next_visit_due_date: Optional[date]
flag: CoverageFlag
days_until_coverage_end: int
days_until_visit_due: Optional[int]
priority_score: int # Higher = more urgent; used for worklist sort
rule_version: str = RULE_VERSION
def _load_payer_rules() -> dict:
with open(PAYER_RULES_PATH, "r") as f:
return json.load(f)
def _get_wear_days(rules: dict, device_type: str, component: str) -> int:
"""
Return wear days for a given device type and component.
Raises ValueError for unknown device types.
"""
devices = rules.get("devices", {})
device = devices.get(device_type)
if device is None:
raise ValueError(f"Unknown device_type: '{device_type}'. "
f"Valid types: {list(devices.keys())}")
component_key = f"{component}_wear_days"
wear_days = device.get(component_key)
if wear_days is None:
raise ValueError(
f"Device '{device_type}' has no wear-day rule for component "
f"'{component}'. Check payer_rules.json."
)
return wear_days
def _get_payer_config(rules: dict, payer: str) -> dict:
payer_rules = rules.get("payer_rules", {})
return payer_rules.get(payer.lower(), payer_rules.get("default", {}))
def _compute_priority(flag: CoverageFlag, days_until_end: int) -> int:
"""
Priority score for worklist ordering. Higher = act sooner.
OUT_OF_COVERAGE patients are highest priority regardless of days.
Within REFILL_WINDOW and VISIT_DUE, urgency increases as days decrease.
"""
if flag == CoverageFlag.OUT_OF_COVERAGE:
return 1000 + abs(days_until_end)
if flag == CoverageFlag.VISIT_DUE:
return 500 + max(0, 90 - days_until_end)
if flag == CoverageFlag.REFILL_WINDOW:
return 200 + max(0, 30 - days_until_end)
return 0 # OK
def calculate_coverage(record: ShipmentRecord,
as_of: Optional[date] = None) -> CoverageResult:
"""
Calculate coverage status for a single shipment record.
Args:
record: ShipmentRecord with non-PHI fields only.
as_of: Date to evaluate against. Defaults to today.
Returns:
CoverageResult with flag, dates, and priority score.
"""
rules = _load_payer_rules()
today = as_of or date.today()
wear_days = _get_wear_days(rules, record.device_type, record.component)
payer_config = _get_payer_config(rules, record.payer)
# Coverage end = last shipment date + (wear days × quantity shipped)
total_wear_days = wear_days * record.quantity
coverage_end = record.shipment_date + timedelta(days=total_wear_days)
days_until_end = (coverage_end - today).days
refill_window_days = payer_config.get("refill_window_days", 30)
visit_renewal_days = payer_config.get("visit_renewal_days")
# Visit due date (Medicare: every 180 days from shipment)
next_visit_due: Optional[date] = None
days_until_visit: Optional[int] = None
if visit_renewal_days:
next_visit_due = record.shipment_date + timedelta(days=visit_renewal_days)
days_until_visit = (next_visit_due - today).days
# Determine flag — evaluated in priority order
if days_until_end < 0:
flag = CoverageFlag.OUT_OF_COVERAGE
elif days_until_visit is not None and days_until_visit <= 30:
flag = CoverageFlag.VISIT_DUE
elif days_until_end <= refill_window_days:
flag = CoverageFlag.REFILL_WINDOW
else:
flag = CoverageFlag.OK
priority = _compute_priority(flag, days_until_end)
return CoverageResult(
patient_id=record.patient_id,
device_type=record.device_type,
payer=record.payer,
component=record.component,
last_shipment_date=record.shipment_date,
coverage_end_date=coverage_end,
next_visit_due_date=next_visit_due,
flag=flag,
days_until_coverage_end=days_until_end,
days_until_visit_due=days_until_visit,
priority_score=priority,
)
def calculate_batch(records: list[ShipmentRecord],
as_of: Optional[date] = None) -> list[CoverageResult]:
"""
Calculate coverage for a list of shipment records and return a
worklist sorted by priority (highest first).
Skips records that raise ValueError (unknown device/component) and
logs a warning so the batch continues.
"""
results = []
for record in records:
try:
result = calculate_coverage(record, as_of=as_of)
results.append(result)
except ValueError as exc:
logger.warning("Skipping record for patient_id hash — %s", exc)
results.sort(key=lambda r: r.priority_score, reverse=True)
return results