Signal/tests/smoke_test.py
Kisa cf171a3f87 add Phase 1 security hardening, mapping confidence, audit logging, pilot docs
- lock CORS to Vercel domain via ALLOWED_ORIGINS env var (removes allow_origins=*)
- add X-API-Key header auth on /api/upload and /api/export
- normalizer: add mapping confidence (high/inferred), new aliases for Acct #,
  Member ID, External Patient Ref, DME Description, dispensedate; 63/63 CSV files pass
- coverage_calculator: add RULE_VERSION = "v0.1", rule_version on every CoverageResult
- main.py: audit logging wired on upload + export, rule_version + mapping_summary in response
- generate_samples.py: 25 CSV files now use 25 different real-world header formats
- add generate_10k.py for 10,000-patient synthetic dataset
- add tests/smoke_test.py (passes against local backend)
- add docs/pilot-guide-v1.md for Robert Robinson pilot onboarding
- add docs/daniel-pilot-readiness-whitepaper.md and .pdf

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 05:41:25 -04:00

180 lines
5.1 KiB
Python

"""
Signal smoke test — runs against the local backend (port 8001).
Usage:
cd /Users/sttil-solutions/projects/signal
python3 tests/smoke_test.py
Or via pytest:
pytest tests/smoke_test.py -v
What it does:
1. Starts uvicorn on port 8001 as a subprocess
2. Waits for /health to respond
3. POSTs a sample CSV to /api/upload — verifies records are scored and that mapping_summary and rule_version are present
4. POSTs the scored records to /api/export — verifies CSV download
5. Reports PASS or FAIL with reason
6. Kills the server
"""
import json
import os
import subprocess
import sys
import time
import urllib.request
import urllib.error
from pathlib import Path
# Port the smoke test starts its own backend instance on.
BACKEND_PORT = 8001
# Base URL used for the /health readiness probe.
BASE_URL = f"http://localhost:{BACKEND_PORT}"
# Project root: two levels above this file. Resolve first so the result is
# correct even when __file__ is a relative path (e.g. the script is launched
# from inside its own directory on an interpreter that keeps it relative).
SIGNAL_ROOT = Path(__file__).resolve().parent.parent
# CSV fixture uploaded by the smoke test.
SAMPLE_CSV = SIGNAL_ROOT / "test-data" / "sample-batch-01-ok.csv"
def _wait_for_ready(timeout: int = 15) -> bool:
    """Poll the backend's /health endpoint until it answers or time runs out.

    Args:
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True as soon as a request to ``{BASE_URL}/health`` succeeds, False if
        none succeeded before the deadline.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # Use the response as a context manager so each successful probe
            # closes its socket instead of leaking it.
            with urllib.request.urlopen(f"{BASE_URL}/health", timeout=1):
                return True
        except Exception:
            # Any failure (connection refused, timeout, HTTP error status)
            # is treated as "not ready yet": pause briefly and retry.
            time.sleep(0.5)
    return False
def _post_file(path: Path) -> dict:
    """Upload a file to /api/upload as a hand-built multipart/form-data request.

    Args:
        path: File to upload; its bytes are sent as the form field ``file``
            with content type ``text/csv``.

    Returns:
        The response body parsed as JSON.

    Raises:
        RuntimeError: If the endpoint answers with a status other than 200.
    """
    import http.client

    boundary = "SignalSmokeBoundary"
    # Single-part multipart body: part headers, blank line, file bytes,
    # closing boundary.
    body = b"".join(
        [
            f"--{boundary}\r\n".encode(),
            f'Content-Disposition: form-data; name="file"; filename="{path.name}"\r\n'.encode(),
            b"Content-Type: text/csv\r\n\r\n",
            path.read_bytes(),
            f"\r\n--{boundary}--\r\n".encode(),
        ]
    )

    conn = http.client.HTTPConnection("localhost", BACKEND_PORT, timeout=30)
    try:
        conn.request(
            "POST",
            "/api/upload",
            body=body,
            headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
        )
        resp = conn.getresponse()
        data = resp.read()
    finally:
        # Release the socket even when the request or the read fails.
        conn.close()

    if resp.status != 200:
        raise RuntimeError(f"/api/upload returned {resp.status}: {data[:200]}")
    return json.loads(data)
def _post_export(records: list) -> bytes:
    """POST records as a JSON array to /api/export and return the raw response.

    Args:
        records: JSON-serialisable list sent as the request body.

    Returns:
        The unparsed response body bytes.

    Raises:
        RuntimeError: If the endpoint answers with a status other than 200.
    """
    import http.client

    body = json.dumps(records).encode()
    conn = http.client.HTTPConnection("localhost", BACKEND_PORT, timeout=30)
    try:
        conn.request(
            "POST",
            "/api/export",
            body=body,
            headers={"Content-Type": "application/json"},
        )
        resp = conn.getresponse()
        data = resp.read()
    finally:
        # Release the socket even when the request or the read fails.
        conn.close()

    if resp.status != 200:
        raise RuntimeError(f"/api/export returned {resp.status}: {data[:200]}")
    return data
def run() -> bool:
    """Run the smoke test end to end against a freshly started backend.

    Starts uvicorn as a subprocess, waits for it to become ready, uploads the
    sample CSV, checks the upload response (records, mapping_summary,
    rule_version), exports the returned records, and prints progress plus a
    final PASS/FAIL line.

    Returns:
        True if every check passed, False otherwise. Exceptions raised while
        the checks run are reported as a FAIL line rather than propagated.
    """
    print("Signal Smoke Test")
    print("=" * 40)

    if not SAMPLE_CSV.exists():
        print(f"FAIL — sample CSV not found: {SAMPLE_CSV}")
        return False

    env = os.environ.copy()
    # NOTE(review): presumably this keeps the spawned backend from enforcing
    # X-API-Key auth, since the requests below send no key — confirm against
    # api.main.
    env.pop("SIGNAL_API_KEY", None)

    print("Starting backend on port 8001...")
    proc = subprocess.Popen(
        [
            sys.executable, "-m", "uvicorn",
            "api.main:app",
            "--host", "127.0.0.1",
            "--port", str(BACKEND_PORT),
            "--log-level", "warning",
        ],
        cwd=str(SIGNAL_ROOT / "python-backend"),
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    try:
        print("Waiting for backend to be ready...")
        if not _wait_for_ready():
            print("FAIL — backend did not start within 15 seconds")
            return False
        print("Backend ready.")

        # Test 1: upload
        print("Uploading sample CSV...")
        result = _post_file(SAMPLE_CSV)
        total = result.get("total", 0)
        records = result.get("records", [])
        mapping = result.get("mapping_summary", {})
        if total == 0 or not records:
            print(f"FAIL — /api/upload returned 0 records. Response: {result}")
            return False
        print(f" Upload OK: {total} records scored")

        # Test 2: mapping summary present
        if not mapping.get("mapped"):
            print("FAIL — mapping_summary missing from response")
            return False
        print(f" Mapping OK: {list(mapping['mapped'].keys())} mapped")

        # Test 3: rule_version present
        rv = records[0].get("rule_version", "")
        if not rv:
            print("FAIL — rule_version missing from records")
            return False
        print(f" Rule version OK: {rv}")

        # Test 4: export
        print("Exporting work queue...")
        csv_bytes = _post_export(records)
        lines = csv_bytes.decode("utf-8").strip().splitlines()
        if len(lines) < 2:
            print(f"FAIL — /api/export returned fewer than 2 lines: {lines}")
            return False
        print(f" Export OK: {len(lines)} lines (header + {len(lines)-1} records)")

        print("=" * 40)
        print("PASS")
        return True
    except Exception as exc:
        print(f"FAIL — exception: {exc}")
        return False
    finally:
        # Always stop the server. If it ignores terminate(), escalate to
        # kill() so a TimeoutExpired raised here cannot replace the test's
        # result or leave a stray process holding the port.
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
if __name__ == "__main__":
    # Script entry point: exit status 0 when the smoke test passes, 1 otherwise.
    exit_code = 0 if run() else 1
    sys.exit(exit_code)
def test_signal_smoke():
    """Pytest wrapper: run the smoke test and fail if it does not pass."""
    outcome = run()
    assert outcome, "Smoke test failed"