187 lines
5.8 KiB
Python
187 lines
5.8 KiB
Python
"""
|
|
audit_logger.py
|
|
Signal CGM — STTIL Solutions
|
|
|
|
PHI-safe audit logging for all system actions.
|
|
|
|
RETENTION POLICY:
|
|
Audit logs must be retained for a minimum of six (6) years from the
|
|
date of creation, per 45 CFR § 164.530(j) (HIPAA administrative
|
|
safeguards) and applicable state regulations. Retention enforcement
|
|
is the responsibility of the hosting operator (STTIL Solutions or
|
|
contracted DME supplier). Do not delete logs without documented
|
|
authorization.
|
|
|
|
PHI CONTRACT:
|
|
Raw patient_id values MUST NOT appear in any log entry. All identity
|
|
fields are SHA-256 hashed before storage. IP addresses are also hashed
|
|
to limit incidental PII exposure. The hash function is one-way — this
|
|
module cannot reverse a hash to recover an identifier.
|
|
|
|
Log fields:
|
|
timestamp ISO-8601 UTC datetime of the event
|
|
user_id_hash SHA-256 of the staff user's internal ID
|
|
action Verb describing the operation (see AuditAction)
|
|
resource_hash SHA-256 of the affected resource identifier
|
|
(patient_id, file name, record ID, etc.)
|
|
outcome "success" | "failure"
|
|
ip_address_hash SHA-256 of the requester's IP address
|
|
detail Optional free-text note (must not contain PHI)
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from enum import Enum
|
|
from typing import Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AuditAction(str, Enum):
    """
    Closed set of operation names stored in the ``action`` audit field.

    Members also subclass ``str``, so each one compares equal to its
    lowercase value (e.g. ``AuditAction.CSV_INGEST == "csv_ingest"``).
    """

    # Processing and output operations
    CSV_INGEST = "csv_ingest"
    COVERAGE_CALC = "coverage_calc"
    WORKLIST_EXPORT = "worklist_export"
    EMAIL_DISPATCH = "email_dispatch"

    # Staff session events
    USER_LOGIN = "user_login"
    USER_LOGOUT = "user_logout"

    # Rule changes and record access
    RULE_UPDATE = "rule_update"
    RECORD_VIEW = "record_view"
|
|
|
|
|
|
def _hash(value: str) -> str:
    """
    Return the hex-encoded SHA-256 digest of ``value``.

    Raw identifiers (patient_id, SSN, IP address) must pass through this
    function before they are handed to any log sink.

    Args:
        value: Raw string to digest. It is encoded as UTF-8 before hashing.

    Returns:
        The 64-character hex digest, or "" when ``value`` is empty or
        otherwise falsy (for example None).
    """
    if value:
        hasher = hashlib.sha256()
        hasher.update(value.encode("utf-8"))
        return hasher.hexdigest()

    # Nothing to hash: report an empty digest instead of hashing "".
    return ""
|
|
|
|
|
|
def _utc_now() -> str:
    """Return the current time as a timezone-aware (UTC) ISO-8601 string."""
    current = datetime.now(timezone.utc)
    return current.isoformat()
|
|
|
|
|
|
def build_audit_entry(
    action: AuditAction,
    resource_id: str,
    user_id: str,
    outcome: str,
    ip_address: str,
    detail: Optional[str] = None,
) -> dict:
    """
    Assemble one audit record, hashing every identity field on the way in.

    Args:
        action: Operation being recorded. An AuditAction contributes its
            ``.value``; anything else is converted with ``str()``.
        resource_id: Raw identifier of the affected resource (patient_id,
            file name, etc.). Pass it un-hashed; it is hashed here.
        user_id: Raw internal staff user ID. Hashed here.
        outcome: Either "success" or "failure".
        ip_address: Requester's IP address string. Hashed here.
        detail: Optional context note. MUST NOT contain PHI; it is stored
            verbatim and is not inspected or hashed by this function.

    Returns:
        Dict with keys ``timestamp``, ``user_id_hash``, ``action``,
        ``resource_hash``, ``outcome`` and ``ip_address_hash``, plus
        ``detail`` only when a detail value was supplied. Suitable for
        JSON serialization and PostgreSQL insertion.

    Raises:
        ValueError: If ``outcome`` is neither "success" nor "failure".
    """
    allowed_outcomes = ("success", "failure")
    if outcome not in allowed_outcomes:
        raise ValueError(f"outcome must be 'success' or 'failure', got: '{outcome}'")

    # Enum members are stored by value; other inputs are stringified as-is.
    if isinstance(action, AuditAction):
        action_name = action.value
    else:
        action_name = str(action)

    entry = dict(
        timestamp=_utc_now(),
        user_id_hash=_hash(user_id),
        action=action_name,
        resource_hash=_hash(resource_id),
        outcome=outcome,
        ip_address_hash=_hash(ip_address),
    )

    # The key is omitted entirely (not set to None) when no note is given.
    if detail is not None:
        entry["detail"] = detail

    return entry
|
|
|
|
|
|
def log_event(
    action: AuditAction,
    resource_id: str,
    user_id: str,
    outcome: str,
    ip_address: str,
    detail: Optional[str] = None,
    db_conn=None,
) -> dict:
    """
    Create an audit entry and deliver it to every configured sink.

    Sinks, in the order they are written:
        1. The module logger, always, as an INFO line of the form
           ``AUDIT <json>``.
        2. The PostgreSQL audit_log table, only when ``db_conn`` is given.

    Args:
        action: Operation being recorded (see AuditAction).
        resource_id: Raw identifier of the affected resource.
        user_id: Raw internal staff user ID.
        outcome: "success" or "failure".
        ip_address: Requester's IP address string.
        detail: Optional context note. MUST NOT contain PHI.
        db_conn: Optional active database connection, handed unchanged to
            ``_write_to_postgres`` in a plain synchronous call
            (psycopg2-style). With None, only the logger sink is used,
            which is convenient for unit tests.
            NOTE(review): an asyncpg connection is presumably not usable
            on this synchronous path — confirm before passing one.

    Returns:
        The audit entry dict that was written.
    """
    entry = build_audit_entry(
        action,
        resource_id,
        user_id,
        outcome,
        ip_address,
        detail,
    )

    # Sink 1: one structured log line per event.
    serialized = json.dumps(entry)
    logger.info("AUDIT %s", serialized)

    # Sink 2: database row, skipped when no connection was supplied.
    if db_conn is None:
        return entry

    _write_to_postgres(db_conn, entry)
    return entry
|
|
|
|
|
|
def _write_to_postgres(conn, entry: dict) -> None:
    """
    Insert an audit entry into the audit_log table and commit it.

    Expected table schema (see db_models.py):
        CREATE TABLE audit_log (
            id              BIGSERIAL PRIMARY KEY,
            timestamp       TIMESTAMPTZ NOT NULL,
            user_id_hash    TEXT NOT NULL,
            action          TEXT NOT NULL,
            resource_hash   TEXT NOT NULL,
            outcome         TEXT NOT NULL,
            ip_address_hash TEXT NOT NULL,
            detail          TEXT
        );

    Args:
        conn: Connection object providing ``cursor()`` (usable as a context
            manager), ``commit()`` and ``rollback()``. The statement uses
            ``%(name)s`` named placeholders.
        entry: Audit entry dict. Every key except ``detail`` is required;
            a missing ``detail`` is inserted as NULL.

    Raises:
        KeyError: If a required key is missing from ``entry``.
        Exception: Any error raised while executing or committing is
            re-raised after the connection has been rolled back.
    """
    sql = """
        INSERT INTO audit_log
            (timestamp, user_id_hash, action, resource_hash,
             outcome, ip_address_hash, detail)
        VALUES
            (%(timestamp)s, %(user_id_hash)s, %(action)s, %(resource_hash)s,
             %(outcome)s, %(ip_address_hash)s, %(detail)s)
    """
    # Build the parameters first so a malformed entry fails before any
    # database work has started.
    params = {
        "timestamp": entry["timestamp"],
        "user_id_hash": entry["user_id_hash"],
        "action": entry["action"],
        "resource_hash": entry["resource_hash"],
        "outcome": entry["outcome"],
        "ip_address_hash": entry["ip_address_hash"],
        "detail": entry.get("detail"),
    }

    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
        conn.commit()
    except Exception:
        # A failed statement leaves the transaction aborted; without a
        # rollback every later statement on this connection would fail too.
        conn.rollback()
        raise
|