diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..3a999cb --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,6 @@ +# License + +© 2026 STTIL Solutions LLC. All rights reserved. + +This software is proprietary and may not be copied, modified, or distributed +without explicit written permission from STTIL Solutions LLC. diff --git a/README.md b/README.md new file mode 100644 index 0000000..4020116 --- /dev/null +++ b/README.md @@ -0,0 +1,83 @@ +# Signal CGM powered by STTIL Solutions + +B2B CGM coverage worklist tool for DMEPOS suppliers. Ingests CSV shipment data +(Brightree/WellSky exports), calculates coverage expiration per patient using +device wear-day rules, and produces a prioritized worklist for proactive outreach +— so small DME teams act before claims deny, not after. + +**Self-hosted. Data never leaves the supplier network.** + +--- + +## What It Does + +Most DMEPOS suppliers manage CGM coverage reactively: a claim denies, then staff +scramble to appeal. Signal CGM flips that. The system watches coverage windows +continuously and surfaces patients approaching expiration before the denial +condition exists. + +- Ingests shipment CSV from Brightree or WellSky +- Calculates coverage expiration per patient per device using payer-specific + wear-day rules +- Flags each patient: `REFILL_WINDOW`, `VISIT_DUE`, `OUT_OF_COVERAGE`, or `OK` +- Delivers a prioritized worklist to DME staff via encrypted email +- Staff handle outreach locally — Signal CGM never contacts patients directly + +## Stack + +| Layer | Technology | +|-------|-----------| +| Backend | Python / FastAPI | +| Database | PostgreSQL (encrypted at rest) | +| Orchestration | n8n (self-hosted, 24-hour batch trigger) | +| Notifications | Mailcow (self-hosted SMTP — staff email only) | +| Hosting | Hostinger VPS — data stays on-prem | + +## PHI Architecture + +Signal CGM is designed to minimize PHI surface area: + +- **Sole crosswalk key:** `patient_id` (the supplier's internal MRN or account + number). No names, SSNs, DOBs, or contact information enter the system. +- DME staff maintain the `patient_id` ↔ real identity mapping in their own + systems (Brightree, EHR, etc.). +- The calculation layer sees: `patient_id`, `device_type`, `shipment_date`, + `quantity`, `payer` — nothing else. +- All audit logs hash `patient_id` before storage. Raw identifiers never appear + in logs. + +## Coverage Flag Logic + +| Flag | Meaning | +|------|---------| +| `REFILL_WINDOW` | Patient is within the refillable window — safe to ship | +| `VISIT_DUE` | Physician visit renewal is approaching (Medicare: 180 days) | +| `OUT_OF_COVERAGE` | Coverage has lapsed — outreach required before next shipment | +| `OK` | No action needed at this time | + +## Directory Structure + +``` +signal-cgm/ +├── python-backend/ +│ ├── core/ +│ │ ├── coverage_calculator.py # Coverage clock logic +│ │ ├── audit_logger.py # PHI-safe audit logging +│ │ └── db_models.py # PostgreSQL models +│ └── config/ +│ └── payer_rules.json # Wear-day rules by device and payer +├── n8n-workflows/ # n8n batch trigger exports +└── CLAUDE.md # Active dev context +``` + +## BAA Status (Level 1) + +| Vendor | BAA Required | Status | +|--------|-------------|--------| +| Hostinger VPS | Yes — PHI host | Pending | +| Anthropic API | Only if AI layer touches PHI | Not applicable (Level 1) | +| All other components | Self-hosted — operator is STTIL | N/A | + +--- + +© 2026 STTIL Solutions LLC. Proprietary software — see LICENSE.md. diff --git a/python-backend/config/payer_rules.json b/python-backend/config/payer_rules.json new file mode 100644 index 0000000..72e8267 --- /dev/null +++ b/python-backend/config/payer_rules.json @@ -0,0 +1,63 @@ +{ + "_comment": "Wear-day rules by device type and payer. Used by coverage_calculator.py. Update when payer LCD policies change.", + "devices": { + "dexcom_g6": { + "display_name": "Dexcom G6", + "sensor_wear_days": 10, + "transmitter_wear_days": 90, + "components": ["sensor", "transmitter"] + }, + "dexcom_g7": { + "display_name": "Dexcom G7", + "sensor_wear_days": 10, + "components": ["sensor"] + }, + "freestyle_libre_2": { + "display_name": "FreeStyle Libre 2", + "sensor_wear_days": 14, + "components": ["sensor"] + }, + "freestyle_libre_3": { + "display_name": "FreeStyle Libre 3", + "sensor_wear_days": 14, + "components": ["sensor"] + }, + "omnipod_5": { + "display_name": "Omnipod 5", + "pod_wear_days": 3, + "sensor_wear_days": 14, + "components": ["pod", "sensor"], + "_note": "Sensor wear days apply to the paired CGM (typically Dexcom G6 or Libre). Pod is 3 days. Track components separately." + } + }, + "payer_rules": { + "medicare": { + "visit_renewal_days": 180, + "refill_window_days": 30, + "_note": "Medicare requires face-to-face physician visit every 6 months for continued CGM coverage. Refill window opens 30 days before coverage end.", + "covered_devices": [ + "dexcom_g6", + "dexcom_g7", + "freestyle_libre_2", + "freestyle_libre_3" + ] + }, + "medicaid": { + "visit_renewal_days": null, + "refill_window_days": 30, + "_note": "Medicaid rules vary by state. Renewal cadence not enforced at this layer — flag for manual review.", + "covered_devices": [] + }, + "commercial": { + "visit_renewal_days": null, + "refill_window_days": 30, + "_note": "Commercial payer rules vary by plan. Refill window is a conservative default.", + "covered_devices": [] + }, + "default": { + "visit_renewal_days": null, + "refill_window_days": 30, + "covered_devices": [] + } + } +} diff --git a/python-backend/core/audit_logger.py b/python-backend/core/audit_logger.py new file mode 100644 index 0000000..0a66f98 --- /dev/null +++ b/python-backend/core/audit_logger.py @@ -0,0 +1,187 @@ +""" +audit_logger.py +Signal CGM — STTIL Solutions + +PHI-safe audit logging for all system actions. + +RETENTION POLICY: + Audit logs must be retained for a minimum of six (6) years from the + date of creation, per 45 CFR § 164.530(j) (HIPAA administrative + safeguards) and applicable state regulations. Retention enforcement + is the responsibility of the hosting operator (STTIL Solutions or + contracted DME supplier). Do not delete logs without documented + authorization. + +PHI CONTRACT: + Raw patient_id values MUST NOT appear in any log entry. All identity + fields are SHA-256 hashed before storage. IP addresses are also hashed + to limit incidental PII exposure. The hash function is one-way — this + module cannot reverse a hash to recover an identifier. + +Log fields: + timestamp ISO-8601 UTC datetime of the event + user_id_hash SHA-256 of the staff user's internal ID + action Verb describing the operation (see AuditAction) + resource_hash SHA-256 of the affected resource identifier + (patient_id, file name, record ID, etc.) + outcome "success" | "failure" + ip_address_hash SHA-256 of the requester's IP address + detail Optional free-text note (must not contain PHI) +""" + +import hashlib +import json +import logging +import os +from datetime import datetime, timezone +from enum import Enum +from typing import Optional + +logger = logging.getLogger(__name__) + + +class AuditAction(str, Enum): + CSV_INGEST = "csv_ingest" + COVERAGE_CALC = "coverage_calc" + WORKLIST_EXPORT = "worklist_export" + EMAIL_DISPATCH = "email_dispatch" + USER_LOGIN = "user_login" + USER_LOGOUT = "user_logout" + RULE_UPDATE = "rule_update" + RECORD_VIEW = "record_view" + + +def _hash(value: str) -> str: + """ + SHA-256 hash of a string value. Returns a hex digest. + Never pass a raw patient_id, SSN, or IP to any log sink — use this first. + """ + if not value: + return "" + return hashlib.sha256(value.encode("utf-8")).hexdigest() + + +def _utc_now() -> str: + return datetime.now(tz=timezone.utc).isoformat() + + +def build_audit_entry( + action: AuditAction, + resource_id: str, + user_id: str, + outcome: str, + ip_address: str, + detail: Optional[str] = None, +) -> dict: + """ + Build a single audit log entry dict with all identity fields hashed. + + Args: + action: The operation being logged (AuditAction enum). + resource_id: The raw resource identifier (patient_id, file name, etc.). + This value is hashed before inclusion — do not pre-hash. + user_id: The raw internal staff user ID. + Hashed before inclusion. + outcome: "success" or "failure". + ip_address: The requester's IP address string. + Hashed before inclusion. + detail: Optional context note. MUST NOT contain PHI. + + Returns: + Dict suitable for JSON serialization and PostgreSQL insertion. + """ + if outcome not in ("success", "failure"): + raise ValueError(f"outcome must be 'success' or 'failure', got: '{outcome}'") + + entry = { + "timestamp": _utc_now(), + "user_id_hash": _hash(user_id), + "action": action.value if isinstance(action, AuditAction) else str(action), + "resource_hash": _hash(resource_id), + "outcome": outcome, + "ip_address_hash": _hash(ip_address), + } + if detail is not None: + entry["detail"] = detail + + return entry + + +def log_event( + action: AuditAction, + resource_id: str, + user_id: str, + outcome: str, + ip_address: str, + detail: Optional[str] = None, + db_conn=None, +) -> dict: + """ + Build an audit entry and write it to the configured sink(s). + + Sinks (applied in order): + 1. Python logger (always — goes to stdout/file handler configured + by the application). + 2. PostgreSQL audit_log table (if db_conn is provided). + + Args: + db_conn: An active psycopg2 or asyncpg connection. If None, only + the logger sink is used (useful for unit tests). + + Returns: + The audit entry dict that was written. + """ + entry = build_audit_entry( + action=action, + resource_id=resource_id, + user_id=user_id, + outcome=outcome, + ip_address=ip_address, + detail=detail, + ) + + # Sink 1: structured log line + logger.info("AUDIT %s", json.dumps(entry)) + + # Sink 2: PostgreSQL (synchronous psycopg2 path) + if db_conn is not None: + _write_to_postgres(db_conn, entry) + + return entry + + +def _write_to_postgres(conn, entry: dict) -> None: + """ + Insert an audit entry into the audit_log table. + + Expected table schema (see db_models.py): + CREATE TABLE audit_log ( + id BIGSERIAL PRIMARY KEY, + timestamp TIMESTAMPTZ NOT NULL, + user_id_hash TEXT NOT NULL, + action TEXT NOT NULL, + resource_hash TEXT NOT NULL, + outcome TEXT NOT NULL, + ip_address_hash TEXT NOT NULL, + detail TEXT + ); + """ + sql = """ + INSERT INTO audit_log + (timestamp, user_id_hash, action, resource_hash, + outcome, ip_address_hash, detail) + VALUES + (%(timestamp)s, %(user_id_hash)s, %(action)s, %(resource_hash)s, + %(outcome)s, %(ip_address_hash)s, %(detail)s) + """ + with conn.cursor() as cur: + cur.execute(sql, { + "timestamp": entry["timestamp"], + "user_id_hash": entry["user_id_hash"], + "action": entry["action"], + "resource_hash": entry["resource_hash"], + "outcome": entry["outcome"], + "ip_address_hash": entry["ip_address_hash"], + "detail": entry.get("detail"), + }) + conn.commit() diff --git a/python-backend/core/coverage_calculator.py b/python-backend/core/coverage_calculator.py new file mode 100644 index 0000000..750b7d2 --- /dev/null +++ b/python-backend/core/coverage_calculator.py @@ -0,0 +1,196 @@ +""" +coverage_calculator.py +Signal CGM — STTIL Solutions + +Calculates CGM coverage status per patient based on shipment history +and payer-specific wear-day rules. + +PHI CONTRACT: + This module receives only: patient_id, device_type, shipment_date, + quantity, payer. No names, SSNs, DOBs, or contact fields may be + added to any function signature or data structure in this file. + +Flag types emitted: + REFILL_WINDOW — patient is within the billable refill window + VISIT_DUE — physician visit renewal is approaching or overdue + OUT_OF_COVERAGE — coverage has lapsed; outreach required before shipment + OK — no action needed at this time +""" + +import json +import logging +from dataclasses import dataclass +from datetime import date, timedelta +from enum import Enum +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + +PAYER_RULES_PATH = Path(__file__).parent.parent / "config" / "payer_rules.json" + + +class CoverageFlag(str, Enum): + REFILL_WINDOW = "REFILL_WINDOW" + VISIT_DUE = "VISIT_DUE" + OUT_OF_COVERAGE = "OUT_OF_COVERAGE" + OK = "OK" + + +@dataclass(frozen=True) +class ShipmentRecord: + """ + Minimal shipment record. Only non-PHI fields allowed. + + patient_id: Supplier's internal MRN or account number. + This is the sole crosswalk key — no real identity data here. + """ + patient_id: str + device_type: str + shipment_date: date + quantity: int + payer: str + component: str = "sensor" # "sensor" | "transmitter" | "pod" + + +@dataclass +class CoverageResult: + patient_id: str + device_type: str + payer: str + component: str + last_shipment_date: date + coverage_end_date: date + next_visit_due_date: Optional[date] + flag: CoverageFlag + days_until_coverage_end: int + days_until_visit_due: Optional[int] + priority_score: int # Higher = more urgent; used for worklist sort + + +def _load_payer_rules() -> dict: + with open(PAYER_RULES_PATH, "r") as f: + return json.load(f) + + +def _get_wear_days(rules: dict, device_type: str, component: str) -> int: + """ + Return wear days for a given device type and component. + Raises ValueError for unknown device types. + """ + devices = rules.get("devices", {}) + device = devices.get(device_type) + if device is None: + raise ValueError(f"Unknown device_type: '{device_type}'. " + f"Valid types: {list(devices.keys())}") + + component_key = f"{component}_wear_days" + wear_days = device.get(component_key) + if wear_days is None: + raise ValueError( + f"Device '{device_type}' has no wear-day rule for component " + f"'{component}'. Check payer_rules.json." + ) + return wear_days + + +def _get_payer_config(rules: dict, payer: str) -> dict: + payer_rules = rules.get("payer_rules", {}) + return payer_rules.get(payer.lower(), payer_rules.get("default", {})) + + +def _compute_priority(flag: CoverageFlag, days_until_end: int) -> int: + """ + Priority score for worklist ordering. Higher = act sooner. + + OUT_OF_COVERAGE patients are highest priority regardless of days. + Within REFILL_WINDOW and VISIT_DUE, urgency increases as days decrease. + """ + if flag == CoverageFlag.OUT_OF_COVERAGE: + return 1000 + abs(days_until_end) + if flag == CoverageFlag.VISIT_DUE: + return 500 + max(0, 90 - days_until_end) + if flag == CoverageFlag.REFILL_WINDOW: + return 200 + max(0, 30 - days_until_end) + return 0 # OK + + +def calculate_coverage(record: ShipmentRecord, + as_of: Optional[date] = None) -> CoverageResult: + """ + Calculate coverage status for a single shipment record. + + Args: + record: ShipmentRecord with non-PHI fields only. + as_of: Date to evaluate against. Defaults to today. + + Returns: + CoverageResult with flag, dates, and priority score. + """ + rules = _load_payer_rules() + today = as_of or date.today() + + wear_days = _get_wear_days(rules, record.device_type, record.component) + payer_config = _get_payer_config(rules, record.payer) + + # Coverage end = last shipment date + (wear days × quantity shipped) + total_wear_days = wear_days * record.quantity + coverage_end = record.shipment_date + timedelta(days=total_wear_days) + days_until_end = (coverage_end - today).days + + refill_window_days = payer_config.get("refill_window_days", 30) + visit_renewal_days = payer_config.get("visit_renewal_days") + + # Visit due date (Medicare: every 180 days from shipment) + next_visit_due: Optional[date] = None + days_until_visit: Optional[int] = None + if visit_renewal_days: + next_visit_due = record.shipment_date + timedelta(days=visit_renewal_days) + days_until_visit = (next_visit_due - today).days + + # Determine flag — evaluated in priority order + if days_until_end < 0: + flag = CoverageFlag.OUT_OF_COVERAGE + elif days_until_visit is not None and days_until_visit <= 30: + flag = CoverageFlag.VISIT_DUE + elif days_until_end <= refill_window_days: + flag = CoverageFlag.REFILL_WINDOW + else: + flag = CoverageFlag.OK + + priority = _compute_priority(flag, days_until_end) + + return CoverageResult( + patient_id=record.patient_id, + device_type=record.device_type, + payer=record.payer, + component=record.component, + last_shipment_date=record.shipment_date, + coverage_end_date=coverage_end, + next_visit_due_date=next_visit_due, + flag=flag, + days_until_coverage_end=days_until_end, + days_until_visit_due=days_until_visit, + priority_score=priority, + ) + + +def calculate_batch(records: list[ShipmentRecord], + as_of: Optional[date] = None) -> list[CoverageResult]: + """ + Calculate coverage for a list of shipment records and return a + worklist sorted by priority (highest first). + + Skips records that raise ValueError (unknown device/component) and + logs a warning so the batch continues. + """ + results = [] + for record in records: + try: + result = calculate_coverage(record, as_of=as_of) + results.append(result) + except ValueError as exc: + logger.warning("Skipping record for patient_id hash — %s", exc) + + results.sort(key=lambda r: r.priority_score, reverse=True) + return results