Files
sciagent/be0/tests/test_authenticate_user.py
Thinh Lam 688fac73e9
CI/CD / backend (push) Failing after 2m8s
CI/CD / frontend (push) Failing after 1m40s
CI/CD / deploy (push) Has been skipped
sciagent code + Gitea Actions CI/CD
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 09:38:30 +07:00

147 lines
4.6 KiB
Python

"""Unit tests for the AuthenticateUser use case using fakes (no DB, no FastAPI).
Async use case is driven via ``asyncio.run`` so no pytest-asyncio plugin is needed.
"""
from __future__ import annotations
import asyncio
import uuid
import pytest
from src.application.identity.dto import LoginCommand
from src.application.identity.use_cases.authenticate_user import AuthenticateUser
from src.domain.identity.entities import User
from src.domain.identity.errors import (
EmailNotVerified,
InvalidCredentials,
InvalidInstitutionalEmail,
)
from src.shared_kernel.errors import RateLimited
class FakeUsers:
def __init__(self, user: User | None = None, roles: list[str] | None = None) -> None:
self._user = user
self._roles = roles or []
self.reconciled = False
async def get_by_email(self, email: str) -> User | None:
return self._user if (self._user and self._user.email == email) else None
async def get_by_id(self, user_id): # pragma: no cover - unused here
return self._user
async def roles_after_reconcile(self, user: User) -> list[str]:
self.reconciled = True
return self._roles
class FakeHasher:
def __init__(self, ok: bool = True) -> None:
self.ok = ok
def hash(self, plain: str) -> str:
return "h:" + plain
def verify(self, plain: str, hashed: str) -> bool:
return self.ok
class FakeTokens:
def issue(self, user_id, email, roles, credential_version) -> str:
return f"tok:{user_id}:{credential_version}:{','.join(roles)}"
class FakeRateLimiter:
def __init__(self, allow: bool = True) -> None:
self._allow = allow
def allow(self, email: str, client_ip: str) -> bool:
return self._allow
class FakeAudit:
def __init__(self) -> None:
self.events: list[tuple] = []
async def login_succeeded(self, *, user_id, email, roles) -> None:
self.events.append(("ok", email, tuple(roles)))
async def login_failed(self, *, email, user_id, reason) -> None:
self.events.append(("fail", email, reason))
def _user(**kw) -> User:
base = dict(
id=uuid.uuid4(),
email="a@ump.edu.vn",
full_name="Test",
password_hash="x",
email_verified=True,
is_active=True,
credential_version=2,
)
base.update(kw)
return User(**base)
def _build(users: FakeUsers, hasher=None, rate_limiter=None, audit=None) -> tuple:
audit = audit or FakeAudit()
uc = AuthenticateUser(
users=users,
hasher=hasher or FakeHasher(ok=True),
tokens=FakeTokens(),
rate_limiter=rate_limiter or FakeRateLimiter(allow=True),
audit=audit,
)
return uc, audit
def test_login_success_returns_token_and_reconciles_roles() -> None:
users = FakeUsers(user=_user(), roles=["admin", "viewer"])
uc, audit = _build(users)
result = asyncio.run(uc.execute(LoginCommand("A@ump.edu.vn", "pw", "1.2.3.4")))
assert result.access_token.endswith(":2:admin,viewer")
assert result.roles == ["admin", "viewer"]
assert users.reconciled is True
assert audit.events == [("ok", "a@ump.edu.vn", ("admin", "viewer"))]
def test_wrong_password_raises_401_and_audits_failure() -> None:
users = FakeUsers(user=_user())
uc, audit = _build(users, hasher=FakeHasher(ok=False))
with pytest.raises(InvalidCredentials) as exc:
asyncio.run(uc.execute(LoginCommand("a@ump.edu.vn", "bad", "ip")))
assert exc.value.message == "Email hoặc mật khẩu không đúng."
assert audit.events == [("fail", "a@ump.edu.vn", None)]
def test_unknown_email_raises_401() -> None:
uc, _ = _build(FakeUsers(user=None))
with pytest.raises(InvalidCredentials):
asyncio.run(uc.execute(LoginCommand("nobody@ump.edu.vn", "pw", "ip")))
def test_unverified_email_raises_403_with_reason() -> None:
users = FakeUsers(user=_user(email_verified=False))
uc, audit = _build(users)
with pytest.raises(EmailNotVerified):
asyncio.run(uc.execute(LoginCommand("a@ump.edu.vn", "pw", "ip")))
assert audit.events == [("fail", "a@ump.edu.vn", "email_unverified")]
def test_rate_limited_raises_429_before_db() -> None:
users = FakeUsers(user=_user())
uc, _ = _build(users, rate_limiter=FakeRateLimiter(allow=False))
with pytest.raises(RateLimited):
asyncio.run(uc.execute(LoginCommand("a@ump.edu.vn", "pw", "ip")))
def test_non_institutional_email_rejected_before_lookup() -> None:
users = FakeUsers(user=_user())
uc, _ = _build(users)
with pytest.raises(InvalidInstitutionalEmail):
asyncio.run(uc.execute(LoginCommand("a@gmail.com", "pw", "ip")))