Signal/python-backend/core/audit_logger.py
Kisa e51e5ec947 feat: Signal CGM Level 1 foundation — calculator, audit logger, payer rules, license
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 19:48:31 -04:00

187 lines
5.8 KiB
Python

"""
audit_logger.py
Signal CGM — STTIL Solutions
PHI-safe audit logging for all system actions.
RETENTION POLICY:
Audit logs must be retained for a minimum of six (6) years from the
date of creation, per 45 CFR § 164.530(j) (HIPAA administrative
safeguards) and applicable state regulations. Retention enforcement
is the responsibility of the hosting operator (STTIL Solutions or
contracted DME supplier). Do not delete logs without documented
authorization.
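PHI CONTRACT:
Raw patient_id values MUST NOT appear in any log entry. All identity
fields are SHA-256 hashed before storage. IP addresses are also hashed
to limit incidental PII exposure. The hash function is one-way — this
module cannot reverse a hash to recover an identifier. The hashes are
unsalted, however, so low-entropy inputs (for example IPv4 addresses)
can be recovered by enumeration; treat hashed values as pseudonymous,
not anonymous.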
Log fields:
    timestamp        ISO-8601 UTC datetime of the event
    user_id_hash     SHA-256 of the staff user's internal ID
    action           Verb describing the operation (see AuditAction)
    resource_hash    SHA-256 of the affected resource identifier
                     (patient_id, file name, record ID, etc.)
    outcome          "success" | "failure"
    ip_address_hash  SHA-256 of the requester's IP address
    detail           Optional free-text note (must not contain PHI)
"""
import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
# Module-level logger; log_event() writes every audit entry to it at INFO level.
logger = logging.getLogger(__name__)
class AuditAction(str, Enum):
    """
    Closed set of operation names recorded in an audit entry's ``action`` field.

    The ``str`` mix-in makes every member a genuine string that compares
    equal to its value (``AuditAction.USER_LOGIN == "user_login"``).
    build_audit_entry() stores the member's ``.value`` in the entry.
    """

    CSV_INGEST = "csv_ingest"
    COVERAGE_CALC = "coverage_calc"
    WORKLIST_EXPORT = "worklist_export"
    EMAIL_DISPATCH = "email_dispatch"
    USER_LOGIN = "user_login"
    USER_LOGOUT = "user_logout"
    RULE_UPDATE = "rule_update"
    RECORD_VIEW = "record_view"
def _hash(value: str) -> str:
"""
SHA-256 hash of a string value. Returns a hex digest.
Never pass a raw patient_id, SSN, or IP to any log sink — use this first.
"""
if not value:
return ""
return hashlib.sha256(value.encode("utf-8")).hexdigest()
def _utc_now() -> str:
return datetime.now(tz=timezone.utc).isoformat()
def build_audit_entry(
    action: AuditAction,
    resource_id: str,
    user_id: str,
    outcome: str,
    ip_address: str,
    detail: Optional[str] = None,
) -> dict:
    """
    Assemble one audit record as a plain dict, hashing every identity field.

    Args:
        action: Operation being recorded. An AuditAction member contributes
            its ``.value``; any other object is converted with ``str()``.
        resource_id: Raw identifier of the affected resource (patient_id,
            file name, ...). Pass it un-hashed; it is hashed here.
        user_id: Raw internal staff user ID. Hashed here.
        outcome: Either "success" or "failure".
        ip_address: Requester's IP address string. Hashed here.
        detail: Optional free-text note. MUST NOT contain PHI; it is stored
            verbatim. The "detail" key is omitted entirely when this is None.

    Returns:
        Dict with keys timestamp, user_id_hash, action, resource_hash,
        outcome, ip_address_hash and (optionally) detail.

    Raises:
        ValueError: If ``outcome`` is neither "success" nor "failure".
    """
    # Reject anything other than the two recognised outcome strings up front.
    if not (outcome == "success" or outcome == "failure"):
        raise ValueError(f"outcome must be 'success' or 'failure', got: '{outcome}'")

    # Enum members are stored by value; anything else is stringified.
    if isinstance(action, AuditAction):
        action_name = action.value
    else:
        action_name = str(action)

    record = dict(
        timestamp=_utc_now(),
        user_id_hash=_hash(user_id),
        action=action_name,
        resource_hash=_hash(resource_id),
        outcome=outcome,
        ip_address_hash=_hash(ip_address),
    )

    if detail is None:
        return record
    record["detail"] = detail
    return record
def log_event(
    action: AuditAction,
    resource_id: str,
    user_id: str,
    outcome: str,
    ip_address: str,
    detail: Optional[str] = None,
    db_conn=None,
) -> dict:
    """
    Create an audit entry and deliver it to the configured sink(s).

    Sinks, in order:
        1. The module logger, always: one INFO line of the form
           ``AUDIT <json>``.
        2. The PostgreSQL ``audit_log`` table, only when ``db_conn`` is given.

    Args:
        action, resource_id, user_id, outcome, ip_address, detail:
            Forwarded unchanged to build_audit_entry().
        db_conn: Optional open database connection. The write path is
            synchronous and uses ``cursor()`` / ``commit()``
            (psycopg2-style). When None, only the logger sink is used,
            which is convenient in unit tests.

    Returns:
        The audit entry dict that was written.
    """
    record = build_audit_entry(
        action,
        resource_id,
        user_id,
        outcome,
        ip_address,
        detail,
    )

    # Sink 1: structured log line.
    serialized = json.dumps(record)
    logger.info("AUDIT %s", serialized)

    # Sink 2: database row, only when a connection was supplied.
    if db_conn is None:
        return record
    _write_to_postgres(db_conn, record)
    return record
def _write_to_postgres(conn, entry: dict) -> None:
"""
Insert an audit entry into the audit_log table.
Expected table schema (see db_models.py):
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL,
user_id_hash TEXT NOT NULL,
action TEXT NOT NULL,
resource_hash TEXT NOT NULL,
outcome TEXT NOT NULL,
ip_address_hash TEXT NOT NULL,
detail TEXT
);
"""
sql = """
INSERT INTO audit_log
(timestamp, user_id_hash, action, resource_hash,
outcome, ip_address_hash, detail)
VALUES
(%(timestamp)s, %(user_id_hash)s, %(action)s, %(resource_hash)s,
%(outcome)s, %(ip_address_hash)s, %(detail)s)
"""
with conn.cursor() as cur:
cur.execute(sql, {
"timestamp": entry["timestamp"],
"user_id_hash": entry["user_id_hash"],
"action": entry["action"],
"resource_hash": entry["resource_hash"],
"outcome": entry["outcome"],
"ip_address_hash": entry["ip_address_hash"],
"detail": entry.get("detail"),
})
conn.commit()