"""
audit_logger.py
Signal CGM — STTIL Solutions

PHI-safe audit logging for all system actions.

RETENTION POLICY:
    Audit logs must be retained for a minimum of six (6) years from the date
    of creation, per 45 CFR § 164.530(j) (HIPAA administrative safeguards)
    and applicable state regulations. Retention enforcement is the
    responsibility of the hosting operator (STTIL Solutions or contracted
    DME supplier). Do not delete logs without documented authorization.

PHI CONTRACT:
    Raw patient_id values MUST NOT appear in any log entry. All identity
    fields are SHA-256 hashed before storage. IP addresses are also hashed
    to limit incidental PII exposure. The hash function is one-way — this
    module cannot reverse a hash to recover an identifier.

Log fields:
    timestamp        ISO-8601 UTC datetime of the event
    user_id_hash     SHA-256 of the staff user's internal ID
    action           Verb describing the operation (see AuditAction)
    resource_hash    SHA-256 of the affected resource identifier
                     (patient_id, file name, record ID, etc.)
    outcome          "success" | "failure"
    ip_address_hash  SHA-256 of the requester's IP address
    detail           Optional free-text note (must not contain PHI)
"""

import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)


class AuditAction(str, Enum):
    """Closed set of operation verbs recorded in the ``action`` log field."""

    CSV_INGEST = "csv_ingest"
    COVERAGE_CALC = "coverage_calc"
    WORKLIST_EXPORT = "worklist_export"
    EMAIL_DISPATCH = "email_dispatch"
    USER_LOGIN = "user_login"
    USER_LOGOUT = "user_logout"
    RULE_UPDATE = "rule_update"
    RECORD_VIEW = "record_view"


def _hash(value: str) -> str:
    """
    SHA-256 hash of a string value. Returns a hex digest.

    Never pass a raw patient_id, SSN, or IP to any log sink — use this first.

    Args:
        value: The raw identifier. It is UTF-8 encoded before hashing.

    Returns:
        The 64-character hex digest, or "" when ``value`` is empty/falsy.
    """
    # NOTE(review): the digest is unsalted, so identifiers drawn from a small
    # space (e.g. IPv4 addresses) could in principle be recovered by hashing
    # every candidate. Switching to a keyed hash would change every stored
    # value, so it is flagged here rather than changed — confirm with the
    # compliance owner.
    if not value:
        return ""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def _utc_now() -> str:
    """Return the current time as a timezone-aware UTC ISO-8601 string."""
    return datetime.now(tz=timezone.utc).isoformat()


def build_audit_entry(
    action: AuditAction,
    resource_id: str,
    user_id: str,
    outcome: str,
    ip_address: str,
    detail: Optional[str] = None,
) -> dict:
    """
    Build a single audit log entry dict with all identity fields hashed.

    Args:
        action: The operation being logged (AuditAction enum). Any other
            value is recorded as ``str(action)``.
        resource_id: The raw resource identifier (patient_id, file name,
            etc.). This value is hashed before inclusion — do not pre-hash.
        user_id: The raw internal staff user ID. Hashed before inclusion.
        outcome: "success" or "failure".
        ip_address: The requester's IP address string. Hashed before
            inclusion.
        detail: Optional context note. MUST NOT contain PHI. This function
            stores it verbatim; it is not inspected or hashed.

    Returns:
        Dict suitable for JSON serialization and PostgreSQL insertion. The
        "detail" key is present only when ``detail`` is not None.

    Raises:
        ValueError: If ``outcome`` is neither "success" nor "failure".
    """
    if outcome not in ("success", "failure"):
        raise ValueError(f"outcome must be 'success' or 'failure', got: '{outcome}'")

    entry = {
        "timestamp": _utc_now(),
        "user_id_hash": _hash(user_id),
        "action": action.value if isinstance(action, AuditAction) else str(action),
        "resource_hash": _hash(resource_id),
        "outcome": outcome,
        "ip_address_hash": _hash(ip_address),
    }
    if detail is not None:
        entry["detail"] = detail
    return entry


def log_event(
    action: AuditAction,
    resource_id: str,
    user_id: str,
    outcome: str,
    ip_address: str,
    detail: Optional[str] = None,
    db_conn=None,
) -> dict:
    """
    Build an audit entry and write it to the configured sink(s).

    Sinks (applied in order):
        1. Python logger (always — goes to stdout/file handler configured
           by the application).
        2. PostgreSQL audit_log table (if db_conn is provided).

    Args:
        action: The operation being logged (AuditAction enum).
        resource_id: Raw resource identifier; hashed before it is written.
        user_id: Raw internal staff user ID; hashed before it is written.
        outcome: "success" or "failure".
        ip_address: Requester's IP address; hashed before it is written.
        detail: Optional context note. MUST NOT contain PHI.
        db_conn: An active synchronous DB-API connection (psycopg2 style:
            ``cursor()`` usable as a context manager, ``commit()``,
            ``rollback()``, ``%(name)s`` placeholders). If None, only the
            logger sink is used (useful for unit tests).

    Returns:
        The audit entry dict that was written.

    Raises:
        ValueError: If ``outcome`` is invalid (raised before any sink runs).
        Exception: Any error raised by the database driver is propagated
            after the transaction has been rolled back. The logger sink has
            already run by then.
    """
    entry = build_audit_entry(
        action=action,
        resource_id=resource_id,
        user_id=user_id,
        outcome=outcome,
        ip_address=ip_address,
        detail=detail,
    )

    # Sink 1: structured log line
    logger.info("AUDIT %s", json.dumps(entry))

    # Sink 2: PostgreSQL (synchronous psycopg2 path)
    if db_conn is not None:
        _write_to_postgres(db_conn, entry)

    return entry


def _write_to_postgres(conn, entry: dict) -> None:
    """
    Insert an audit entry into the audit_log table.

    The insert is committed on success. On any failure the transaction is
    rolled back so the caller's connection is not left inside an aborted
    transaction, and the original exception is re-raised.

    Expected table schema (see db_models.py):
        CREATE TABLE audit_log (
            id               BIGSERIAL PRIMARY KEY,
            timestamp        TIMESTAMPTZ NOT NULL,
            user_id_hash     TEXT NOT NULL,
            action           TEXT NOT NULL,
            resource_hash    TEXT NOT NULL,
            outcome          TEXT NOT NULL,
            ip_address_hash  TEXT NOT NULL,
            detail           TEXT
        );

    Args:
        conn: Synchronous DB-API connection (psycopg2 style).
        entry: Audit entry as produced by ``build_audit_entry``.

    Raises:
        Exception: Whatever the driver raised, after ``conn.rollback()``.
    """
    sql = """
        INSERT INTO audit_log
            (timestamp, user_id_hash, action, resource_hash,
             outcome, ip_address_hash, detail)
        VALUES
            (%(timestamp)s, %(user_id_hash)s, %(action)s, %(resource_hash)s,
             %(outcome)s, %(ip_address_hash)s, %(detail)s)
    """
    params = {
        "timestamp": entry["timestamp"],
        "user_id_hash": entry["user_id_hash"],
        "action": entry["action"],
        "resource_hash": entry["resource_hash"],
        "outcome": entry["outcome"],
        "ip_address_hash": entry["ip_address_hash"],
        # "detail" is omitted from the entry when no note was supplied;
        # .get() turns that into SQL NULL.
        "detail": entry.get("detail"),
    }
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
        conn.commit()
    except Exception:
        # Leave the shared connection usable for the caller's next statement.
        conn.rollback()
        raise