147 lines
4.6 KiB
Python
147 lines
4.6 KiB
Python
"""Unit tests for the AuthenticateUser use case using fakes (no DB, no FastAPI).

The async use case is driven via ``asyncio.run`` so no pytest-asyncio plugin is needed.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import uuid
|
|
|
|
import pytest
|
|
|
|
from src.application.identity.dto import LoginCommand
|
|
from src.application.identity.use_cases.authenticate_user import AuthenticateUser
|
|
from src.domain.identity.entities import User
|
|
from src.domain.identity.errors import (
|
|
EmailNotVerified,
|
|
InvalidCredentials,
|
|
InvalidInstitutionalEmail,
|
|
)
|
|
from src.shared_kernel.errors import RateLimited
|
|
|
|
|
|
class FakeUsers:
    """Fake for the ``users`` dependency: one optional user plus a canned role list.

    ``reconciled`` starts as ``False`` and flips to ``True`` the first time
    ``roles_after_reconcile`` is awaited, so a test can check the call happened.
    """

    def __init__(self, user: User | None = None, roles: list[str] | None = None) -> None:
        self._user = user
        # A missing (or empty) role list is stored as a fresh empty list.
        self._roles = roles if roles else []
        self.reconciled = False

    async def get_by_email(self, email: str) -> User | None:
        """Return the stored user only when its ``email`` equals *email* exactly."""
        stored = self._user
        if stored and stored.email == email:
            return stored
        return None

    async def get_by_id(self, user_id):  # pragma: no cover - unused here
        """Return the stored user (possibly ``None``); *user_id* is not inspected."""
        return self._user

    async def roles_after_reconcile(self, user: User) -> list[str]:
        """Record that reconciliation was requested and return the canned roles."""
        self.reconciled = True
        return self._roles
|
|
|
|
|
|
class FakeHasher:
    """Fake password hasher whose ``verify`` outcome is fixed at construction."""

    def __init__(self, ok: bool = True) -> None:
        # The value every ``verify`` call will return.
        self.ok = ok

    def hash(self, plain: str) -> str:
        """Return *plain* prefixed with ``h:``; no real hashing is performed."""
        prefix = "h:"
        return prefix + plain

    def verify(self, plain: str, hashed: str) -> bool:
        """Return the preconfigured result; both arguments are ignored."""
        return self.ok
|
|
|
|
|
|
class FakeTokens:
    """Fake token issuer producing a readable, deterministic token string."""

    def issue(self, user_id, email, roles, credential_version) -> str:
        """Return ``tok:<user_id>:<credential_version>:<comma-joined roles>``.

        *email* is accepted but does not appear in the token.
        """
        role_csv = ",".join(roles)
        return f"tok:{user_id}:{credential_version}:{role_csv}"
|
|
|
|
|
|
class FakeRateLimiter:
    """Fake rate limiter that always gives the answer chosen at construction."""

    def __init__(self, allow: bool = True) -> None:
        # Stored under a private name so it does not shadow the ``allow`` method.
        self._allow = allow

    def allow(self, email: str, client_ip: str) -> bool:
        """Return the preconfigured decision; *email* and *client_ip* are ignored."""
        decision = self._allow
        return decision
|
|
|
|
|
|
class FakeAudit:
    """Fake audit sink that collects login outcomes in ``events``, in call order."""

    def __init__(self) -> None:
        # Each entry is ("ok", email, roles_tuple) or ("fail", email, reason).
        self.events: list[tuple] = []

    async def login_succeeded(self, *, user_id, email, roles) -> None:
        """Append an ``"ok"`` record; *roles* is frozen into a tuple, *user_id* is dropped."""
        record = ("ok", email, tuple(roles))
        self.events.append(record)

    async def login_failed(self, *, email, user_id, reason) -> None:
        """Append a ``"fail"`` record carrying *email* and *reason*; *user_id* is dropped."""
        record = ("fail", email, reason)
        self.events.append(record)
|
|
|
|
|
|
def _user(**kw) -> User:
    """Build a ``User`` from default field values, with *kw* overriding any of them.

    The defaults describe a verified, active account with a fresh random id.
    """
    defaults = {
        "id": uuid.uuid4(),
        "email": "a@ump.edu.vn",
        "full_name": "Test",
        "password_hash": "x",
        "email_verified": True,
        "is_active": True,
        "credential_version": 2,
    }
    # Later keys win, so caller-supplied values replace the defaults.
    fields = {**defaults, **kw}
    return User(**fields)
|
|
|
|
|
|
def _build(users: FakeUsers, hasher=None, rate_limiter=None, audit=None) -> tuple:
    """Wire an ``AuthenticateUser`` with fakes and return ``(use_case, audit)``.

    Any collaborator left out (or falsy) is replaced by a permissive default:
    a hasher that verifies successfully, a rate limiter that allows the call,
    and a fresh ``FakeAudit``. The audit object is returned alongside the use
    case so tests can inspect the recorded events.
    """
    audit_sink = audit if audit else FakeAudit()
    password_hasher = hasher if hasher else FakeHasher(ok=True)
    limiter = rate_limiter if rate_limiter else FakeRateLimiter(allow=True)
    use_case = AuthenticateUser(
        users=users,
        hasher=password_hasher,
        tokens=FakeTokens(),
        rate_limiter=limiter,
        audit=audit_sink,
    )
    return use_case, audit_sink
|
|
|
|
|
|
def test_login_success_returns_token_and_reconciles_roles() -> None:
    """A valid login yields a token, the reconciled roles and one success audit event."""
    users = FakeUsers(user=_user(), roles=["admin", "viewer"])
    uc, audit = _build(users)

    # The command spells the address with an upper-case "A"; the audit
    # assertion below expects the lower-case form.
    command = LoginCommand("A@ump.edu.vn", "pw", "1.2.3.4")
    result = asyncio.run(uc.execute(command))

    # Token tail is "<credential_version>:<roles>" as produced by FakeTokens.
    assert result.access_token.endswith(":2:admin,viewer")
    assert result.roles == ["admin", "viewer"]
    assert users.reconciled is True
    assert audit.events == [("ok", "a@ump.edu.vn", ("admin", "viewer"))]
|
|
|
|
|
|
def test_wrong_password_raises_401_and_audits_failure() -> None:
    """A failed password check raises ``InvalidCredentials`` and audits one failure."""
    users = FakeUsers(user=_user())
    rejecting_hasher = FakeHasher(ok=False)
    uc, audit = _build(users, hasher=rejecting_hasher)
    command = LoginCommand("a@ump.edu.vn", "bad", "ip")

    with pytest.raises(InvalidCredentials) as caught:
        asyncio.run(uc.execute(command))

    raised = caught.value
    assert raised.message == "Email hoặc mật khẩu không đúng."
    # The failure is audited with no reason attached.
    assert audit.events == [("fail", "a@ump.edu.vn", None)]
|
|
|
|
|
|
def test_unknown_email_raises_401() -> None:
    """Logging in when the user store holds no user raises ``InvalidCredentials``."""
    empty_store = FakeUsers(user=None)
    uc, _ = _build(empty_store)
    command = LoginCommand("nobody@ump.edu.vn", "pw", "ip")

    with pytest.raises(InvalidCredentials):
        asyncio.run(uc.execute(command))
|
|
|
|
|
|
def test_unverified_email_raises_403_with_reason() -> None:
    """An account with ``email_verified=False`` raises ``EmailNotVerified`` and audits why."""
    unverified = _user(email_verified=False)
    users = FakeUsers(user=unverified)
    uc, audit = _build(users)
    command = LoginCommand("a@ump.edu.vn", "pw", "ip")

    with pytest.raises(EmailNotVerified):
        asyncio.run(uc.execute(command))

    expected_events = [("fail", "a@ump.edu.vn", "email_unverified")]
    assert audit.events == expected_events
|
|
|
|
|
|
def test_rate_limited_raises_429_before_db() -> None:
    """A denying rate limiter makes the use case raise ``RateLimited``."""
    # NOTE(review): the "before_db" part of the name is not asserted here;
    # FakeUsers does not record lookups, so ordering is unverified.
    users = FakeUsers(user=_user())
    denying_limiter = FakeRateLimiter(allow=False)
    uc, _ = _build(users, rate_limiter=denying_limiter)
    command = LoginCommand("a@ump.edu.vn", "pw", "ip")

    with pytest.raises(RateLimited):
        asyncio.run(uc.execute(command))
|
|
|
|
|
|
def test_non_institutional_email_rejected_before_lookup() -> None:
    """A ``gmail.com`` address makes the use case raise ``InvalidInstitutionalEmail``."""
    # NOTE(review): "before_lookup" is not asserted; only the raised error
    # type is checked.
    users = FakeUsers(user=_user())
    uc, _ = _build(users)
    command = LoginCommand("a@gmail.com", "pw", "ip")

    with pytest.raises(InvalidInstitutionalEmail):
        asyncio.run(uc.execute(command))
|