127 lines
4.4 KiB
Python
127 lines
4.4 KiB
Python
"""
|
|
Auth policy integration: server-derived admin vs viewer, ignored client role, UMC domain.
|
|
|
|
Requires Postgres + migration 007 (admin_from_email_policy column) and 013 (email_verified).
|
|
|
|
export INITIATIVE_DATABASE_URL="postgresql+asyncpg://initiative:initiative_secret@127.0.0.1:15432/initiatives"
|
|
cd be0 && python -m unittest tests.test_auth_policy_integration -v
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import unittest
|
|
import uuid
|
|
from unittest.mock import patch
|
|
|
|
from sqlalchemy import select
|
|
|
|
from tests.auth_register_staff_fixture import register_staff_fields
|
|
|
|
_RUN_DB = os.getenv("INITIATIVE_DATABASE_URL", "").strip().lower().startswith("postgresql")
|
|
|
|
_TEST_PASSWORD = "Testpass1!"
|
|
|
|
|
|
@unittest.skipUnless(
|
|
_RUN_DB,
|
|
"Set INITIATIVE_DATABASE_URL=postgresql+asyncpg://.../initiatives to run DB integration tests",
|
|
)
|
|
class AuthPolicyIntegrationTests(unittest.IsolatedAsyncioTestCase):
|
|
async def asyncSetUp(self) -> None:
|
|
from src.initiative_db import engine as eng
|
|
|
|
await eng.dispose_engine()
|
|
await eng.init_engine()
|
|
|
|
async def asyncTearDown(self) -> None:
|
|
from src.initiative_db import engine as eng
|
|
|
|
await eng.dispose_engine()
|
|
|
|
async def _delete_user_email(self, email: str) -> None:
|
|
from src.initiative_db.engine import get_session
|
|
from src.initiative_db.models import User
|
|
|
|
async with get_session() as session:
|
|
user = (
|
|
await session.execute(select(User).where(User.email == email))
|
|
).scalar_one_or_none()
|
|
if user is not None:
|
|
await session.delete(user)
|
|
|
|
def _register(self, email: str, full_name: str = "Policy Test", extra: dict | None = None):
|
|
from fastapi.testclient import TestClient
|
|
|
|
from main import app
|
|
|
|
body = {
|
|
"fullName": full_name,
|
|
"email": email,
|
|
"password": _TEST_PASSWORD,
|
|
"passwordConfirm": _TEST_PASSWORD,
|
|
**register_staff_fields(),
|
|
}
|
|
if extra:
|
|
body.update(extra)
|
|
with TestClient(app) as client:
|
|
return client.post("/api/v1/auth/register", json=body)
|
|
|
|
async def test_register_default_viewer_ump(self) -> None:
|
|
email = f"applicant-{uuid.uuid4().hex[:12]}@ump.edu.vn"
|
|
try:
|
|
r = self._register(email)
|
|
self.assertEqual(r.status_code, 200, r.text)
|
|
data = r.json()
|
|
self.assertTrue(data.get("emailVerificationRequired"))
|
|
self.assertNotIn("accessToken", data)
|
|
roles = data["user"]["roles"]
|
|
self.assertIn("viewer", roles)
|
|
self.assertNotIn("admin", roles)
|
|
finally:
|
|
await self._delete_user_email(email)
|
|
|
|
async def test_register_default_viewer_umc(self) -> None:
|
|
email = f"applicant-{uuid.uuid4().hex[:12]}@umc.edu.vn"
|
|
try:
|
|
r = self._register(email)
|
|
self.assertEqual(r.status_code, 200, r.text)
|
|
self.assertTrue(r.json().get("emailVerificationRequired"))
|
|
roles = r.json()["user"]["roles"]
|
|
self.assertIn("viewer", roles)
|
|
self.assertNotIn("admin", roles)
|
|
finally:
|
|
await self._delete_user_email(email)
|
|
|
|
async def test_register_ignores_client_admin_role(self) -> None:
|
|
email = f"privesc-{uuid.uuid4().hex[:12]}@ump.edu.vn"
|
|
try:
|
|
r = self._register(
|
|
email,
|
|
extra={"role": "admin"},
|
|
)
|
|
self.assertEqual(r.status_code, 200, r.text)
|
|
self.assertTrue(r.json().get("emailVerificationRequired"))
|
|
roles = r.json()["user"]["roles"]
|
|
self.assertIn("viewer", roles)
|
|
self.assertNotIn("admin", roles)
|
|
finally:
|
|
await self._delete_user_email(email)
|
|
|
|
async def test_policy_env_makes_admin(self) -> None:
|
|
email = f"stub-admin-{uuid.uuid4().hex[:12]}@ump.edu.vn"
|
|
try:
|
|
with patch.dict(os.environ, {"AUTH_ADMIN_EMAILS": email}):
|
|
r = self._register(email)
|
|
self.assertEqual(r.status_code, 200, r.text)
|
|
self.assertTrue(r.json().get("emailVerificationRequired"))
|
|
roles = r.json()["user"]["roles"]
|
|
self.assertIn("admin", roles)
|
|
self.assertNotIn("viewer", roles)
|
|
finally:
|
|
await self._delete_user_email(email)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|