"""
Registration stack alignment: frontend-shaped payloads, API, PostgreSQL, and MinIO.

Mirrors the JSON body produced by fe0 ``src/lib/auth-service.ts`` (``register()``); when
changing that client, update this test's payload builder if needed.

Flow:
  1. POST /api/v1/auth/register — expect verification flow (no JWT); DB rows match payload.
  2. Login blocked with 403 until verify-otp.
  3. POST verify-otp — DB email_verified, OTP row consumed.
  4. Login — JWT issued.
  5. Minimal draft save + evidence upload — MinIO attachments bucket contains object at
     ``storageKey`` (``head_object``).

Run (same env style as test_backup_e2e):

  export INITIATIVE_DATABASE_URL="postgresql+asyncpg://initiative:initiative_secret@127.0.0.1:15432/initiatives"
  export S3_ENDPOINT_URL="http://127.0.0.1:19000"
  export S3_PUBLIC_ENDPOINT_URL="http://127.0.0.1:19000"
  export S3_ACCESS_KEY="minio_user"
  export S3_SECRET_KEY="minio_password"
  export S3_BUCKET_ATTACHMENTS="initiative-attachments"
  export S3_BUCKET_EXPORTS="initiative-exports"
  export S3_BUCKET_QUARANTINE="initiative-quarantine"
  export REGISTRATION_STACK_TEST=1
  cd be0 && python -m unittest tests.test_registration_stack_alignment -v
"""

from __future__ import annotations

import hashlib
import io
import os
import unittest
import uuid
from unittest.mock import patch

from sqlalchemy import select

from tests.auth_register_staff_fixture import register_staff_fields
from tests.fixtures.minimal_submit_bundle import minimal_tabs_bundle

# True when INITIATIVE_DATABASE_URL (trimmed, lower-cased) starts with "postgresql".
_RUN_DB = (
    os.environ.get("INITIATIVE_DATABASE_URL", "").strip().lower().startswith("postgresql")
)

# Environment variables that must all be non-blank for the S3/MinIO part of the test.
_S3_KEYS = (
    "S3_ENDPOINT_URL",
    "S3_ACCESS_KEY",
    "S3_SECRET_KEY",
    "S3_BUCKET_ATTACHMENTS",
    "S3_BUCKET_EXPORTS",
    "S3_BUCKET_QUARANTINE",
)
_HAS_S3 = all(os.environ.get(name, "").strip() for name in _S3_KEYS)

# Explicit opt-in switch: REGISTRATION_STACK_TEST must be "1", "true" or "yes" (any case).
_RUN_ALIGN = os.environ.get("REGISTRATION_STACK_TEST", "").strip().lower() in {
    "1",
    "true",
    "yes",
}

# Password used for the throw-away account created by the test.
_TEST_PASSWORD = "Testpass1!"
# Minimal PDF-looking payload (header, one empty object, trailer, EOF marker) followed by
# 120 ASCII "0" bytes of padding; used as the evidence upload body.
_MIN_PDF = (
    b"%PDF-1.4\n%\xe2\xe3\xcf\xd3\n1 0 obj<<>>endobj\ntrailer<<>>\n%%EOF\n" + b"0" * 120
)


def _fe0_style_register_json(
    *,
    email: str,
    full_name: str,
    password: str,
    staff: dict[str, str],
) -> dict:
    """Build the register request body with the same keys as fe0 auth-service.register().

    No ``role`` key is sent and the camelCase key names are spelled out literally so the
    payload cannot drift from the frontend shape unnoticed.

    Args:
        email: Address of the account to register.
        full_name: Sent as ``fullName``.
        password: Sent as both ``password`` and ``passwordConfirm``.
        staff: Mapping that must contain ``employeeId``, ``academicTitleCode``,
            ``unitNameFreetext`` and ``jobTitle``.

    Returns:
        The JSON-serialisable request body.

    Raises:
        KeyError: If ``staff`` lacks one of the required keys.
    """
    return {
        "fullName": full_name,
        "email": email,
        "password": password,
        "passwordConfirm": password,
        "employeeId": staff["employeeId"],
        "academicTitleCode": staff["academicTitleCode"],
        "unitNameFreetext": staff["unitNameFreetext"],
        "jobTitle": staff["jobTitle"],
    }


def _token_hash(raw: str) -> str:
    """Return the hex SHA-256 digest of ``raw`` encoded as UTF-8."""
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def _s3_client():
    """Create a boto3 S3 client from the ``S3_*`` environment variables.

    ``S3_ENDPOINT_URL``, ``S3_ACCESS_KEY`` and ``S3_SECRET_KEY`` are required (``KeyError``
    when missing); ``S3_REGION`` falls back to ``us-east-1``. Signature version is ``s3v4``.
    """
    # Imported lazily so the module can be imported (and skipped) without boto3 installed.
    import boto3
    from botocore.config import Config as BotoConfig

    return boto3.client(
        "s3",
        endpoint_url=os.environ["S3_ENDPOINT_URL"].strip(),
        aws_access_key_id=os.environ["S3_ACCESS_KEY"].strip(),
        aws_secret_access_key=os.environ["S3_SECRET_KEY"].strip(),
        region_name=os.getenv("S3_REGION", "us-east-1"),
        config=BotoConfig(signature_version="s3v4"),
    )


@unittest.skipUnless(
    _RUN_DB and _HAS_S3 and _RUN_ALIGN,
    "Need INITIATIVE_DATABASE_URL, full S3_* env, REGISTRATION_STACK_TEST=1 (see module docstring).",
)
class RegistrationStackAlignmentTests(unittest.IsolatedAsyncioTestCase):
    """End-to-end register -> verify-otp -> login -> evidence upload check.

    Skipped unless the database URL, every ``S3_*`` variable and the opt-in flag are set.
    """

    async def asyncSetUp(self) -> None:
        """Dispose any existing engine, then initialise a fresh one for this test."""
        from src.initiative_db import engine as eng

        await eng.dispose_engine()
        await eng.init_engine()

    async def asyncTearDown(self) -> None:
        """Dispose the engine created in ``asyncSetUp``."""
        from src.initiative_db import engine as eng

        await eng.dispose_engine()

    async def _delete_user_and_initiatives(self, email: str) -> None:
        """Delete the user with ``email`` and every initiative they own; no-op if absent."""
        from src.initiative_db.engine import get_session
        from src.initiative_db.models import Initiative, User

        async with get_session() as session:
            user = (
                await session.execute(select(User).where(User.email == email))
            ).scalar_one_or_none()
            if user is None:
                return
            inis = (
                await session.execute(select(Initiative).where(Initiative.owner_id == user.id))
            ).scalars().all()
            for ini in inis:
                await session.delete(ini)
            # Flush the initiative deletes before removing the owning user row.
            await session.flush()
            await session.delete(user)
            await session.commit()

    async def test_register_api_db_login_verify_minio_alignment(self) -> None:
        """Walk the full registration flow and check API, DB and object storage agree."""
        import logging

        from fastapi.testclient import TestClient

        from main import app
        from src.initiative_db.engine import get_session
        from src.initiative_db.models import (
            RegistrationOtpCode,
            User,
            UserRoleRow,
            UserStaffProfile,
        )

        # Unique address per run so reruns never collide with leftover rows.
        email = f"align-reg-{uuid.uuid4().hex[:12]}@ump.edu.vn"
        staff = register_staff_fields()
        full_name = "Alignment Register User"
        body = _fe0_style_register_json(
            email=email,
            full_name=full_name,
            password=_TEST_PASSWORD,
            staff=staff,
        )

        # OTP values handed to the (patched) mail delivery function.
        captured: list[str] = []

        async def grab_mail(_to: str, raw: str) -> None:
            captured.append(raw)

        bucket = os.environ["S3_BUCKET_ATTACHMENTS"].strip()
        s3 = _s3_client()
        storage_key: str | None = None

        try:
            with patch.dict(os.environ, {"AUTH_MAIL_LOG_ONLY": "1"}):
                with patch("src.auth_api.deliver_registration_otp_email", side_effect=grab_mail):
                    with TestClient(app) as client:
                        # --- Register (mirrors fe0 fetch body) ---
                        r = client.post("/api/v1/auth/register", json=body)
                        self.assertEqual(r.status_code, 200, r.text)
                        payload = r.json()
                        self.assertTrue(payload.get("emailVerificationRequired"), payload)
                        self.assertNotIn("accessToken", payload)
                        self.assertEqual(payload.get("email"), email)
                        self.assertIn("message", payload)
                        u_out = payload.get("user") or {}
                        self.assertEqual(u_out.get("email"), email)
                        self.assertEqual(u_out.get("name"), full_name)
                        self.assertIs(u_out.get("emailVerified"), False)
                        self.assertIn("viewer", u_out.get("roles") or [])
                        sp_out = u_out.get("staffProfile") or {}
                        self.assertEqual(sp_out.get("employeeId"), staff["employeeId"])
                        self.assertEqual(
                            sp_out.get("academicTitleCode"), staff["academicTitleCode"]
                        )
                        self.assertEqual(
                            sp_out.get("unitNameFreetext"), staff["unitNameFreetext"]
                        )
                        self.assertEqual(sp_out.get("jobTitle"), staff["jobTitle"])

                    self.assertTrue(captured, "OTP should be issued")
                    raw_otp = captured[0]
                    self.assertEqual(len(raw_otp), 6, raw_otp)
                    self.assertTrue(raw_otp.isdigit())

                    # --- Login must be refused until the e-mail is verified ---
                    with TestClient(app) as client:
                        blocked = client.post(
                            "/api/v1/auth/login",
                            json={"email": email, "password": _TEST_PASSWORD},
                        )
                        self.assertEqual(blocked.status_code, 403, blocked.text)

                    # --- PostgreSQL: user, profile, role, OTP row ---
                    async with get_session() as session:
                        user = (
                            await session.execute(select(User).where(User.email == email))
                        ).scalar_one()
                        self.assertFalse(user.email_verified)
                        self.assertEqual(user.full_name, full_name)

                        profile = await session.get(UserStaffProfile, user.id)
                        self.assertIsNotNone(profile)
                        assert profile is not None  # narrows the type for static checkers
                        self.assertEqual(profile.employee_id, staff["employeeId"])
                        self.assertEqual(
                            profile.academic_title_code, staff["academicTitleCode"]
                        )
                        self.assertEqual(profile.job_title, staff["jobTitle"])

                        roles = (
                            await session.execute(
                                select(UserRoleRow.role).where(UserRoleRow.user_id == user.id)
                            )
                        ).scalars().all()
                        self.assertEqual(sorted(roles), ["viewer"])

                        otps = (
                            await session.execute(
                                select(RegistrationOtpCode).where(
                                    RegistrationOtpCode.user_id == user.id
                                )
                            )
                        ).scalars().all()
                        self.assertEqual(len(otps), 1)
                        o = otps[0]
                        self.assertIsNone(o.used_at)
                        # The stored hash must be the SHA-256 of the OTP that was mailed.
                        self.assertEqual(o.otp_hash, _token_hash(raw_otp))

                    # --- Verify OTP ---
                    with TestClient(app) as client:
                        vr = client.post(
                            "/api/v1/auth/verify-otp", json={"email": email, "otp": raw_otp}
                        )
                        self.assertEqual(vr.status_code, 200, vr.text)

                    async with get_session() as session:
                        user = (
                            await session.execute(select(User).where(User.email == email))
                        ).scalar_one()
                        self.assertTrue(user.email_verified)
                        otps = (
                            await session.execute(
                                select(RegistrationOtpCode).where(
                                    RegistrationOtpCode.user_id == user.id
                                )
                            )
                        ).scalars().all()
                        self.assertEqual(len(otps), 1)
                        self.assertIsNotNone(otps[0].used_at)

                    # --- Login, draft save and evidence upload ---
                    with TestClient(app) as client:
                        ok = client.post(
                            "/api/v1/auth/login",
                            json={"email": email, "password": _TEST_PASSWORD},
                        )
                        self.assertEqual(ok.status_code, 200, ok.text)
                        token = ok.json()["accessToken"]
                        self.assertTrue(token)
                        auth_headers = {"Authorization": f"Bearer {token}"}

                        cr = client.post(
                            "/api/applications/new",
                            headers=auth_headers,
                            json={"name": "Alignment case"},
                        )
                        self.assertEqual(cr.status_code, 200, cr.text)
                        shell = cr.json().get("application") or {}
                        case_id = str(shell.get("draft_case_id") or "").strip()
                        self.assertTrue(case_id, shell)

                        bundle = minimal_tabs_bundle(initiative_name="Align Initiative")
                        for tab_name in ("report", "application", "contribution"):
                            dr = client.post(
                                "/api/v1/application-drafts",
                                headers=auth_headers,
                                json={
                                    "caseId": case_id,
                                    "tab": tab_name,
                                    "data": bundle[tab_name],
                                },
                            )
                            self.assertEqual(dr.status_code, 200, dr.text)

                        er = client.post(
                            f"/api/v1/application-drafts/{case_id}/evidence",
                            headers=auth_headers,
                            data={"kind": "technical"},
                            files={"file": ("align.pdf", io.BytesIO(_MIN_PDF), "application/pdf")},
                        )
                        # assertEqual accepts (first, second, msg); a fourth positional
                        # argument raises TypeError instead of checking the status.
                        self.assertEqual(er.status_code, 200, er.text)
                        ev_body = er.json()
                        storage_key = str(ev_body.get("storageKey") or "").strip()
                        self.assertTrue(storage_key, ev_body)

                    # MinIO must contain bytes at the key the API recorded.
                    assert storage_key is not None
                    head = s3.head_object(Bucket=bucket, Key=storage_key)
                    self.assertGreater(int(head["ContentLength"]), 0)
        finally:
            try:
                await self._delete_user_and_initiatives(email)
            finally:
                # Runs even when the DB cleanup raises, so the uploaded object is not leaked.
                if storage_key:
                    try:
                        s3.delete_object(Bucket=bucket, Key=storage_key)
                    except Exception:
                        # Best-effort: never mask the test outcome, but leave a trace.
                        logging.getLogger(__name__).warning(
                            "Could not delete test object %s from bucket %s",
                            storage_key,
                            bucket,
                            exc_info=True,
                        )


if __name__ == "__main__":
    unittest.main()