feat: Signal CGM Level 1 foundation — calculator, audit logger, payer rules, license

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Kisa 2026-04-11 19:48:31 -04:00
parent aa653f2343
commit e51e5ec947
5 changed files with 535 additions and 0 deletions

6
LICENSE.md Normal file
View file

@ -0,0 +1,6 @@
# License
© 2026 STTIL Solutions LLC. All rights reserved.
This software is proprietary and may not be copied, modified, or distributed
without explicit written permission from STTIL Solutions LLC.

83
README.md Normal file
View file

@ -0,0 +1,83 @@
# Signal CGM powered by STTIL Solutions
B2B CGM coverage worklist tool for DMEPOS suppliers. Ingests CSV shipment data
(Brightree/WellSky exports), calculates coverage expiration per patient using
device wear-day rules, and produces a prioritized worklist for proactive outreach
— so small DME teams act before claims deny, not after.
**Self-hosted. Data never leaves the supplier network.**
---
## What It Does
Most DMEPOS suppliers manage CGM coverage reactively: a claim denies, then staff
scramble to appeal. Signal CGM flips that. The system watches coverage windows
continuously and surfaces patients approaching expiration before the denial
condition exists.
- Ingests shipment CSV from Brightree or WellSky
- Calculates coverage expiration per patient per device using payer-specific
wear-day rules
- Flags each patient: `REFILL_WINDOW`, `VISIT_DUE`, `OUT_OF_COVERAGE`, or `OK`
- Delivers a prioritized worklist to DME staff via encrypted email
- Staff handle outreach locally — Signal CGM never contacts patients directly
## Stack
| Layer | Technology |
|-------|-----------|
| Backend | Python / FastAPI |
| Database | PostgreSQL (encrypted at rest) |
| Orchestration | n8n (self-hosted, 24-hour batch trigger) |
| Notifications | Mailcow (self-hosted SMTP — staff email only) |
| Hosting | Hostinger VPS — data stays on-prem |
## PHI Architecture
Signal CGM is designed to minimize PHI surface area:
- **Sole crosswalk key:** `patient_id` (the supplier's internal MRN or account
number). No names, SSNs, DOBs, or contact information enter the system.
- DME staff maintain the `patient_id` ↔ real identity mapping in their own
systems (Brightree, EHR, etc.).
- The calculation layer sees: `patient_id`, `device_type`, `shipment_date`,
`quantity`, `payer` — nothing else.
- All audit logs hash `patient_id` before storage. Raw identifiers never appear
in logs.
## Coverage Flag Logic
| Flag | Meaning |
|------|---------|
| `REFILL_WINDOW` | Patient is within the refillable window — safe to ship |
| `VISIT_DUE` | Physician visit renewal is approaching (Medicare: 180 days) |
| `OUT_OF_COVERAGE` | Coverage has lapsed — outreach required before next shipment |
| `OK` | No action needed at this time |
## Directory Structure
```
signal-cgm/
├── python-backend/
│ ├── core/
│ │ ├── coverage_calculator.py # Coverage clock logic
│ │ ├── audit_logger.py # PHI-safe audit logging
│ │ └── db_models.py # PostgreSQL models
│ └── config/
│ └── payer_rules.json # Wear-day rules by device and payer
├── n8n-workflows/ # n8n batch trigger exports
└── CLAUDE.md # Active dev context
```
## BAA Status (Level 1)
| Vendor | BAA Required | Status |
|--------|-------------|--------|
| Hostinger VPS | Yes — PHI host | Pending |
| Anthropic API | Only if AI layer touches PHI | Not applicable (Level 1) |
| All other components | Self-hosted — operator is STTIL | N/A |
---
© 2026 STTIL Solutions LLC. Proprietary software — see LICENSE.md.

View file

@ -0,0 +1,63 @@
{
"_comment": "Wear-day rules by device type and payer. Used by coverage_calculator.py. Update when payer LCD policies change.",
"devices": {
"dexcom_g6": {
"display_name": "Dexcom G6",
"sensor_wear_days": 10,
"transmitter_wear_days": 90,
"components": ["sensor", "transmitter"]
},
"dexcom_g7": {
"display_name": "Dexcom G7",
"sensor_wear_days": 10,
"components": ["sensor"]
},
"freestyle_libre_2": {
"display_name": "FreeStyle Libre 2",
"sensor_wear_days": 14,
"components": ["sensor"]
},
"freestyle_libre_3": {
"display_name": "FreeStyle Libre 3",
"sensor_wear_days": 14,
"components": ["sensor"]
},
"omnipod_5": {
"display_name": "Omnipod 5",
"pod_wear_days": 3,
"sensor_wear_days": 14,
"components": ["pod", "sensor"],
"_note": "Sensor wear days apply to the paired CGM (typically Dexcom G6 or Libre). Pod is 3 days. Track components separately."
}
},
"payer_rules": {
"medicare": {
"visit_renewal_days": 180,
"refill_window_days": 30,
"_note": "Medicare requires face-to-face physician visit every 6 months for continued CGM coverage. Refill window opens 30 days before coverage end.",
"covered_devices": [
"dexcom_g6",
"dexcom_g7",
"freestyle_libre_2",
"freestyle_libre_3"
]
},
"medicaid": {
"visit_renewal_days": null,
"refill_window_days": 30,
"_note": "Medicaid rules vary by state. Renewal cadence not enforced at this layer — flag for manual review.",
"covered_devices": []
},
"commercial": {
"visit_renewal_days": null,
"refill_window_days": 30,
"_note": "Commercial payer rules vary by plan. Refill window is a conservative default.",
"covered_devices": []
},
"default": {
"visit_renewal_days": null,
"refill_window_days": 30,
"covered_devices": []
}
}
}

View file

@ -0,0 +1,187 @@
"""
audit_logger.py
Signal CGM STTIL Solutions
PHI-safe audit logging for all system actions.
RETENTION POLICY:
Audit logs must be retained for a minimum of six (6) years from the
date of creation, per 45 CFR § 164.530(j) (HIPAA administrative
safeguards) and applicable state regulations. Retention enforcement
is the responsibility of the hosting operator (STTIL Solutions or
contracted DME supplier). Do not delete logs without documented
authorization.
PHI CONTRACT:
Raw patient_id values MUST NOT appear in any log entry. All identity
fields are SHA-256 hashed before storage. IP addresses are also hashed
to limit incidental PII exposure. The hash function is one-way this
module cannot reverse a hash to recover an identifier.
Log fields:
timestamp ISO-8601 UTC datetime of the event
user_id_hash SHA-256 of the staff user's internal ID
action Verb describing the operation (see AuditAction)
resource_hash SHA-256 of the affected resource identifier
(patient_id, file name, record ID, etc.)
outcome "success" | "failure"
ip_address_hash SHA-256 of the requester's IP address
detail Optional free-text note (must not contain PHI)
"""
import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class AuditAction(str, Enum):
CSV_INGEST = "csv_ingest"
COVERAGE_CALC = "coverage_calc"
WORKLIST_EXPORT = "worklist_export"
EMAIL_DISPATCH = "email_dispatch"
USER_LOGIN = "user_login"
USER_LOGOUT = "user_logout"
RULE_UPDATE = "rule_update"
RECORD_VIEW = "record_view"
def _hash(value: str) -> str:
"""
SHA-256 hash of a string value. Returns a hex digest.
Never pass a raw patient_id, SSN, or IP to any log sink use this first.
"""
if not value:
return ""
return hashlib.sha256(value.encode("utf-8")).hexdigest()
def _utc_now() -> str:
return datetime.now(tz=timezone.utc).isoformat()
def build_audit_entry(
action: AuditAction,
resource_id: str,
user_id: str,
outcome: str,
ip_address: str,
detail: Optional[str] = None,
) -> dict:
"""
Build a single audit log entry dict with all identity fields hashed.
Args:
action: The operation being logged (AuditAction enum).
resource_id: The raw resource identifier (patient_id, file name, etc.).
This value is hashed before inclusion do not pre-hash.
user_id: The raw internal staff user ID.
Hashed before inclusion.
outcome: "success" or "failure".
ip_address: The requester's IP address string.
Hashed before inclusion.
detail: Optional context note. MUST NOT contain PHI.
Returns:
Dict suitable for JSON serialization and PostgreSQL insertion.
"""
if outcome not in ("success", "failure"):
raise ValueError(f"outcome must be 'success' or 'failure', got: '{outcome}'")
entry = {
"timestamp": _utc_now(),
"user_id_hash": _hash(user_id),
"action": action.value if isinstance(action, AuditAction) else str(action),
"resource_hash": _hash(resource_id),
"outcome": outcome,
"ip_address_hash": _hash(ip_address),
}
if detail is not None:
entry["detail"] = detail
return entry
def log_event(
action: AuditAction,
resource_id: str,
user_id: str,
outcome: str,
ip_address: str,
detail: Optional[str] = None,
db_conn=None,
) -> dict:
"""
Build an audit entry and write it to the configured sink(s).
Sinks (applied in order):
1. Python logger (always goes to stdout/file handler configured
by the application).
2. PostgreSQL audit_log table (if db_conn is provided).
Args:
db_conn: An active psycopg2 or asyncpg connection. If None, only
the logger sink is used (useful for unit tests).
Returns:
The audit entry dict that was written.
"""
entry = build_audit_entry(
action=action,
resource_id=resource_id,
user_id=user_id,
outcome=outcome,
ip_address=ip_address,
detail=detail,
)
# Sink 1: structured log line
logger.info("AUDIT %s", json.dumps(entry))
# Sink 2: PostgreSQL (synchronous psycopg2 path)
if db_conn is not None:
_write_to_postgres(db_conn, entry)
return entry
def _write_to_postgres(conn, entry: dict) -> None:
"""
Insert an audit entry into the audit_log table.
Expected table schema (see db_models.py):
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL,
user_id_hash TEXT NOT NULL,
action TEXT NOT NULL,
resource_hash TEXT NOT NULL,
outcome TEXT NOT NULL,
ip_address_hash TEXT NOT NULL,
detail TEXT
);
"""
sql = """
INSERT INTO audit_log
(timestamp, user_id_hash, action, resource_hash,
outcome, ip_address_hash, detail)
VALUES
(%(timestamp)s, %(user_id_hash)s, %(action)s, %(resource_hash)s,
%(outcome)s, %(ip_address_hash)s, %(detail)s)
"""
with conn.cursor() as cur:
cur.execute(sql, {
"timestamp": entry["timestamp"],
"user_id_hash": entry["user_id_hash"],
"action": entry["action"],
"resource_hash": entry["resource_hash"],
"outcome": entry["outcome"],
"ip_address_hash": entry["ip_address_hash"],
"detail": entry.get("detail"),
})
conn.commit()

View file

@ -0,0 +1,196 @@
"""
coverage_calculator.py
Signal CGM STTIL Solutions
Calculates CGM coverage status per patient based on shipment history
and payer-specific wear-day rules.
PHI CONTRACT:
This module receives only: patient_id, device_type, shipment_date,
quantity, payer. No names, SSNs, DOBs, or contact fields may be
added to any function signature or data structure in this file.
Flag types emitted:
REFILL_WINDOW patient is within the billable refill window
VISIT_DUE physician visit renewal is approaching or overdue
OUT_OF_COVERAGE coverage has lapsed; outreach required before shipment
OK no action needed at this time
"""
import json
import logging
from dataclasses import dataclass
from datetime import date, timedelta
from enum import Enum
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
PAYER_RULES_PATH = Path(__file__).parent.parent / "config" / "payer_rules.json"
class CoverageFlag(str, Enum):
REFILL_WINDOW = "REFILL_WINDOW"
VISIT_DUE = "VISIT_DUE"
OUT_OF_COVERAGE = "OUT_OF_COVERAGE"
OK = "OK"
@dataclass(frozen=True)
class ShipmentRecord:
"""
Minimal shipment record. Only non-PHI fields allowed.
patient_id: Supplier's internal MRN or account number.
This is the sole crosswalk key no real identity data here.
"""
patient_id: str
device_type: str
shipment_date: date
quantity: int
payer: str
component: str = "sensor" # "sensor" | "transmitter" | "pod"
@dataclass
class CoverageResult:
patient_id: str
device_type: str
payer: str
component: str
last_shipment_date: date
coverage_end_date: date
next_visit_due_date: Optional[date]
flag: CoverageFlag
days_until_coverage_end: int
days_until_visit_due: Optional[int]
priority_score: int # Higher = more urgent; used for worklist sort
def _load_payer_rules() -> dict:
with open(PAYER_RULES_PATH, "r") as f:
return json.load(f)
def _get_wear_days(rules: dict, device_type: str, component: str) -> int:
"""
Return wear days for a given device type and component.
Raises ValueError for unknown device types.
"""
devices = rules.get("devices", {})
device = devices.get(device_type)
if device is None:
raise ValueError(f"Unknown device_type: '{device_type}'. "
f"Valid types: {list(devices.keys())}")
component_key = f"{component}_wear_days"
wear_days = device.get(component_key)
if wear_days is None:
raise ValueError(
f"Device '{device_type}' has no wear-day rule for component "
f"'{component}'. Check payer_rules.json."
)
return wear_days
def _get_payer_config(rules: dict, payer: str) -> dict:
payer_rules = rules.get("payer_rules", {})
return payer_rules.get(payer.lower(), payer_rules.get("default", {}))
def _compute_priority(flag: CoverageFlag, days_until_end: int) -> int:
"""
Priority score for worklist ordering. Higher = act sooner.
OUT_OF_COVERAGE patients are highest priority regardless of days.
Within REFILL_WINDOW and VISIT_DUE, urgency increases as days decrease.
"""
if flag == CoverageFlag.OUT_OF_COVERAGE:
return 1000 + abs(days_until_end)
if flag == CoverageFlag.VISIT_DUE:
return 500 + max(0, 90 - days_until_end)
if flag == CoverageFlag.REFILL_WINDOW:
return 200 + max(0, 30 - days_until_end)
return 0 # OK
def calculate_coverage(record: ShipmentRecord,
as_of: Optional[date] = None) -> CoverageResult:
"""
Calculate coverage status for a single shipment record.
Args:
record: ShipmentRecord with non-PHI fields only.
as_of: Date to evaluate against. Defaults to today.
Returns:
CoverageResult with flag, dates, and priority score.
"""
rules = _load_payer_rules()
today = as_of or date.today()
wear_days = _get_wear_days(rules, record.device_type, record.component)
payer_config = _get_payer_config(rules, record.payer)
# Coverage end = last shipment date + (wear days × quantity shipped)
total_wear_days = wear_days * record.quantity
coverage_end = record.shipment_date + timedelta(days=total_wear_days)
days_until_end = (coverage_end - today).days
refill_window_days = payer_config.get("refill_window_days", 30)
visit_renewal_days = payer_config.get("visit_renewal_days")
# Visit due date (Medicare: every 180 days from shipment)
next_visit_due: Optional[date] = None
days_until_visit: Optional[int] = None
if visit_renewal_days:
next_visit_due = record.shipment_date + timedelta(days=visit_renewal_days)
days_until_visit = (next_visit_due - today).days
# Determine flag — evaluated in priority order
if days_until_end < 0:
flag = CoverageFlag.OUT_OF_COVERAGE
elif days_until_visit is not None and days_until_visit <= 30:
flag = CoverageFlag.VISIT_DUE
elif days_until_end <= refill_window_days:
flag = CoverageFlag.REFILL_WINDOW
else:
flag = CoverageFlag.OK
priority = _compute_priority(flag, days_until_end)
return CoverageResult(
patient_id=record.patient_id,
device_type=record.device_type,
payer=record.payer,
component=record.component,
last_shipment_date=record.shipment_date,
coverage_end_date=coverage_end,
next_visit_due_date=next_visit_due,
flag=flag,
days_until_coverage_end=days_until_end,
days_until_visit_due=days_until_visit,
priority_score=priority,
)
def calculate_batch(records: list[ShipmentRecord],
as_of: Optional[date] = None) -> list[CoverageResult]:
"""
Calculate coverage for a list of shipment records and return a
worklist sorted by priority (highest first).
Skips records that raise ValueError (unknown device/component) and
logs a warning so the batch continues.
"""
results = []
for record in records:
try:
result = calculate_coverage(record, as_of=as_of)
results.append(result)
except ValueError as exc:
logger.warning("Skipping record for patient_id hash — %s", exc)
results.sort(key=lambda r: r.priority_score, reverse=True)
return results