"""
coverage_calculator.py
Signal CGM — STTIL Solutions

Calculates CGM coverage status per patient based on shipment history
and payer-specific wear-day rules.

PHI CONTRACT:
    This module receives only: patient_id, device_type, shipment_date,
    quantity, payer (plus the non-identifying ``component`` selector).
    No names, SSNs, DOBs, or contact fields may be added to any function
    signature or data structure in this file.

Flag types emitted:
    REFILL_WINDOW   — patient is within the billable refill window
    VISIT_DUE       — physician visit renewal is approaching or overdue
    OUT_OF_COVERAGE — coverage has lapsed; outreach required before shipment
    OK              — no action needed at this time
"""

import hashlib
import json
import logging
from dataclasses import dataclass
from datetime import date, timedelta
from enum import Enum
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# Resolved relative to this file: <package parent>/config/payer_rules.json
PAYER_RULES_PATH = Path(__file__).parent.parent / "config" / "payer_rules.json"


class CoverageFlag(str, Enum):
    """Worklist status assigned to a shipment record (see module docstring)."""

    REFILL_WINDOW = "REFILL_WINDOW"
    VISIT_DUE = "VISIT_DUE"
    OUT_OF_COVERAGE = "OUT_OF_COVERAGE"
    OK = "OK"


@dataclass(frozen=True)
class ShipmentRecord:
    """
    Minimal shipment record. Only non-PHI fields allowed.

    patient_id: Supplier's internal MRN or account number. This is the
                sole crosswalk key — no real identity data here.
    """

    patient_id: str
    device_type: str
    shipment_date: date
    quantity: int
    payer: str
    component: str = "sensor"  # "sensor" | "transmitter" | "pod"


@dataclass
class CoverageResult:
    """Outcome of evaluating one ShipmentRecord against the payer rules."""

    patient_id: str
    device_type: str
    payer: str
    component: str
    last_shipment_date: date
    coverage_end_date: date
    next_visit_due_date: Optional[date]  # None when the payer has no visit rule
    flag: CoverageFlag
    days_until_coverage_end: int  # Negative once coverage has lapsed
    days_until_visit_due: Optional[int]  # None when the payer has no visit rule
    priority_score: int  # Higher = more urgent; used for worklist sort


def _load_payer_rules() -> dict:
    """
    Read and parse the JSON rules file at PAYER_RULES_PATH.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # Explicit encoding so parsing does not depend on the platform default.
    with open(PAYER_RULES_PATH, "r", encoding="utf-8") as f:
        return json.load(f)


def _get_wear_days(rules: dict, device_type: str, component: str) -> int:
    """
    Return wear days for a given device type and component.

    Looks up ``rules["devices"][device_type]["<component>_wear_days"]``.

    Raises:
        ValueError: for an unknown device type, or a device that has no
            wear-day entry for the requested component.
    """
    devices = rules.get("devices", {})
    device = devices.get(device_type)
    if device is None:
        raise ValueError(f"Unknown device_type: '{device_type}'. "
                         f"Valid types: {list(devices.keys())}")

    component_key = f"{component}_wear_days"
    wear_days = device.get(component_key)
    if wear_days is None:
        raise ValueError(
            f"Device '{device_type}' has no wear-day rule for component "
            f"'{component}'. Check payer_rules.json."
        )
    return wear_days


def _get_payer_config(rules: dict, payer: str) -> dict:
    """
    Return the config dict for *payer* (matched on its lower-cased name).

    Falls back to the "default" entry, and to an empty dict when neither
    the payer nor a default is present under ``rules["payer_rules"]``.
    """
    payer_rules = rules.get("payer_rules", {})
    return payer_rules.get(payer.lower(), payer_rules.get("default", {}))


def _compute_priority(flag: CoverageFlag, days_until_end: int) -> int:
    """
    Priority score for worklist ordering. Higher = act sooner.

    OUT_OF_COVERAGE scores start at 1000, VISIT_DUE at 500 and
    REFILL_WINDOW at 200; OK always scores 0. Within a band the score
    grows as *days_until_end* shrinks (or, for OUT_OF_COVERAGE, as the
    lapse gets longer).

    NOTE(review): VISIT_DUE is scored from days until *coverage end*, not
    days until the visit itself — confirm that is the intended urgency
    signal.
    """
    if flag == CoverageFlag.OUT_OF_COVERAGE:
        return 1000 + abs(days_until_end)
    if flag == CoverageFlag.VISIT_DUE:
        return 500 + max(0, 90 - days_until_end)
    if flag == CoverageFlag.REFILL_WINDOW:
        return 200 + max(0, 30 - days_until_end)
    return 0  # OK


def _patient_ref(patient_id: str) -> str:
    """
    Return a short, stable digest of *patient_id* for log correlation.

    The raw identifier is never written to logs; only the first 12 hex
    characters of its SHA-256 digest are.

    NOTE(review): the digest is unsalted — confirm this satisfies the
    logging policy for crosswalk keys.
    """
    return hashlib.sha256(str(patient_id).encode("utf-8")).hexdigest()[:12]


def calculate_coverage(
    record: ShipmentRecord,
    as_of: Optional[date] = None,
    rules: Optional[dict] = None,
) -> CoverageResult:
    """
    Calculate coverage status for a single shipment record.

    Args:
        record: ShipmentRecord with non-PHI fields only.
        as_of:  Date to evaluate against. Defaults to today.
        rules:  Already-parsed payer rules. When omitted the rules file
                at PAYER_RULES_PATH is loaded for this call; pass it in
                to avoid re-reading the file for every record.

    Returns:
        CoverageResult with flag, dates, and priority score.

    Raises:
        ValueError: unknown device type or component (see _get_wear_days).
        OSError / json.JSONDecodeError: if *rules* is omitted and the
            rules file cannot be read or parsed.
    """
    if rules is None:
        rules = _load_payer_rules()
    today = as_of if as_of is not None else date.today()

    wear_days = _get_wear_days(rules, record.device_type, record.component)
    payer_config = _get_payer_config(rules, record.payer)

    # Coverage end = last shipment date + (wear days × quantity shipped)
    total_wear_days = wear_days * record.quantity
    coverage_end = record.shipment_date + timedelta(days=total_wear_days)
    days_until_end = (coverage_end - today).days

    refill_window_days = payer_config.get("refill_window_days", 30)
    visit_renewal_days = payer_config.get("visit_renewal_days")

    # Visit tracking applies only when the payer config carries a non-zero
    # "visit_renewal_days"; the due date is counted from the shipment date.
    next_visit_due: Optional[date] = None
    days_until_visit: Optional[int] = None
    if visit_renewal_days:
        next_visit_due = record.shipment_date + timedelta(days=visit_renewal_days)
        days_until_visit = (next_visit_due - today).days

    # Determine flag — evaluated in priority order: a lapse outranks a
    # visit that is due within 30 days, which outranks the refill window.
    if days_until_end < 0:
        flag = CoverageFlag.OUT_OF_COVERAGE
    elif days_until_visit is not None and days_until_visit <= 30:
        flag = CoverageFlag.VISIT_DUE
    elif days_until_end <= refill_window_days:
        flag = CoverageFlag.REFILL_WINDOW
    else:
        flag = CoverageFlag.OK

    priority = _compute_priority(flag, days_until_end)

    return CoverageResult(
        patient_id=record.patient_id,
        device_type=record.device_type,
        payer=record.payer,
        component=record.component,
        last_shipment_date=record.shipment_date,
        coverage_end_date=coverage_end,
        next_visit_due_date=next_visit_due,
        flag=flag,
        days_until_coverage_end=days_until_end,
        days_until_visit_due=days_until_visit,
        priority_score=priority,
    )


def calculate_batch(
    records: list[ShipmentRecord],
    as_of: Optional[date] = None,
    rules: Optional[dict] = None,
) -> list[CoverageResult]:
    """
    Calculate coverage for a list of shipment records and return a
    worklist sorted by priority (highest first).

    Skips records that raise ValueError (unknown device/component) and
    logs a warning so the batch continues. The warning identifies the
    record by a truncated digest of its patient_id, never the raw id.

    Args:
        records: Shipment records to evaluate.
        as_of:   Date to evaluate against. Defaults to today, resolved
                 once so every record in the batch uses the same date.
        rules:   Already-parsed payer rules. When omitted the rules file
                 is loaded once, on the first record.

    Raises:
        OSError / json.JSONDecodeError: if *rules* is omitted and the
            rules file cannot be read or parsed. A broken config aborts
            the batch rather than silently producing an empty worklist.
    """
    today = as_of if as_of is not None else date.today()

    results = []
    for record in records:
        # Lazy one-time load: an empty batch never touches the file, and
        # the load sits outside the try so config errors are not mistaken
        # for a bad record (JSONDecodeError is a ValueError subclass).
        if rules is None:
            rules = _load_payer_rules()
        try:
            result = calculate_coverage(record, as_of=today, rules=rules)
        except ValueError as exc:
            logger.warning(
                "Skipping record for patient_id hash %s — %s",
                _patient_ref(record.patient_id),
                exc,
            )
        else:
            results.append(result)

    # list.sort is stable, so equal scores keep their input order.
    results.sort(key=lambda r: r.priority_score, reverse=True)
    return results