""" Password reset + credential_version JWT integration. Requires Postgres, migrations through 013 (email_verified) and **014** (registration_otp_codes). export INITIATIVE_DATABASE_URL="postgresql+asyncpg://initiative:initiative_secret@127.0.0.1:15432/initiatives" docker exec -i initiative-postgres psql -U initiative -d initiatives < be0/migrations/012_password_reset.sql docker exec -i initiative-postgres psql -U initiative -d initiatives < be0/migrations/013_email_verification.sql cd be0 && python -m unittest tests.test_auth_password_reset_integration -v """ from __future__ import annotations import asyncio import os import unittest import uuid from unittest.mock import patch from sqlalchemy import delete from tests.auth_register_staff_fixture import register_staff_fields _RUN_DB = os.getenv("INITIATIVE_DATABASE_URL", "").strip().lower().startswith("postgresql") _TEST_PASSWORD = "Testpass1!" _NEW_PASSWORD = "Newpass1!" @unittest.skipUnless( _RUN_DB, "Set INITIATIVE_DATABASE_URL=postgresql+asyncpg://.../initiatives to run DB integration tests", ) class AuthPasswordResetIntegrationTests(unittest.TestCase): def _delete_user_email(self, email: str) -> None: """Cleanup after TestClient — use a fresh engine (TestClient tears down the app engine).""" async def go() -> None: from src.initiative_db.engine import dispose_engine, get_session, init_engine, is_postgres_enabled if not is_postgres_enabled(): return await init_engine() try: from sqlalchemy import delete from src.initiative_db.models import User async with get_session() as session: await session.execute(delete(User).where(User.email == email)) finally: await dispose_engine() asyncio.run(go()) def _register(self, email: str) -> object: from fastapi.testclient import TestClient from main import app captured: list[str] = [] async def grab(_to: str, raw: str) -> None: captured.append(raw) with patch.dict(os.environ, {"AUTH_MAIL_LOG_ONLY": "1"}): with patch("src.auth_api.deliver_registration_otp_email", side_effect=grab): with TestClient(app) as client: r = client.post( "/api/v1/auth/register", json={ "fullName": "Reset Test", "email": email, "password": _TEST_PASSWORD, "passwordConfirm": _TEST_PASSWORD, **register_staff_fields(), }, ) if r.status_code == 200 and captured: client.post( "/api/v1/auth/verify-otp", json={"email": email, "otp": captured[0]}, ) return r def test_forgot_password_unknown_email_same_message(self) -> None: from fastapi.testclient import TestClient from main import app with patch.dict(os.environ, {"AUTH_MAIL_LOG_ONLY": "1"}): with TestClient(app) as client: r = client.post( "/api/v1/auth/forgot-password", json={"email": f"nope-{uuid.uuid4().hex[:8]}@ump.edu.vn"}, ) self.assertEqual(r.status_code, 200, r.text) msg = r.json().get("message", "") self.assertIn("Nếu email", msg) def test_forgot_password_invalid_domain_400(self) -> None: from fastapi.testclient import TestClient from main import app with TestClient(app) as client: r = client.post( "/api/v1/auth/forgot-password", json={"email": "x@gmail.com"}, ) self.assertEqual(r.status_code, 400, r.text) def test_reset_flow_login_and_stale_jwt(self) -> None: email = f"reset-{uuid.uuid4().hex[:12]}@ump.edu.vn" try: reg = self._register(email) self.assertEqual(reg.status_code, 200, reg.text) from fastapi.testclient import TestClient from main import app with TestClient(app) as client_pre: lg0 = client_pre.post( "/api/v1/auth/login", json={"email": email, "password": _TEST_PASSWORD}, ) self.assertEqual(lg0.status_code, 200, lg0.text) access_before = lg0.json()["accessToken"] captured: list[str] = [] async def grab(_to: str, raw: str) -> None: captured.append(raw) with patch.dict(os.environ, {"AUTH_MAIL_LOG_ONLY": "1"}): with patch("src.auth_api.deliver_password_reset_email", side_effect=grab): with TestClient(app) as client: fr = client.post( "/api/v1/auth/forgot-password", json={"email": email}, ) self.assertEqual(fr.status_code, 200, fr.text) rr = client.post( "/api/v1/auth/reset-password", json={ "token": captured[0], "newPassword": _NEW_PASSWORD, "newPasswordConfirm": _NEW_PASSWORD, }, ) self.assertEqual(rr.status_code, 200, rr.text) bad = client.post( "/api/v1/auth/login", json={"email": email, "password": _TEST_PASSWORD}, ) self.assertEqual(bad.status_code, 401, bad.text) ok = client.post( "/api/v1/auth/login", json={"email": email, "password": _NEW_PASSWORD}, ) self.assertEqual(ok.status_code, 200, ok.text) me_old = client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {access_before}"}, ) self.assertEqual(me_old.status_code, 401, me_old.text) finally: self._delete_user_email(email) def test_reset_token_single_use(self) -> None: email = f"reset2-{uuid.uuid4().hex[:12]}@ump.edu.vn" try: reg = self._register(email) self.assertEqual(reg.status_code, 200, reg.text) captured: list[str] = [] async def grab(_to: str, raw: str) -> None: captured.append(raw) with patch.dict(os.environ, {"AUTH_MAIL_LOG_ONLY": "1"}): with patch("src.auth_api.deliver_password_reset_email", side_effect=grab): from fastapi.testclient import TestClient from main import app with TestClient(app) as client: client.post("/api/v1/auth/forgot-password", json={"email": email}) tok = captured[0] r1 = client.post( "/api/v1/auth/reset-password", json={ "token": tok, "newPassword": _NEW_PASSWORD, "newPasswordConfirm": _NEW_PASSWORD, }, ) self.assertEqual(r1.status_code, 200, r1.text) r2 = client.post( "/api/v1/auth/reset-password", json={ "token": tok, "newPassword": _NEW_PASSWORD, "newPasswordConfirm": _NEW_PASSWORD, }, ) self.assertEqual(r2.status_code, 400, r2.text) finally: self._delete_user_email(email) if __name__ == "__main__": unittest.main()