sciagent code + Gitea Actions CI/CD
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,146 @@
|
||||
"""Unit tests for the AuthenticateUser use case using fakes (no DB, no FastAPI).
|
||||
|
||||
Async use case is driven via ``asyncio.run`` so no pytest-asyncio plugin is needed.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from src.application.identity.dto import LoginCommand
|
||||
from src.application.identity.use_cases.authenticate_user import AuthenticateUser
|
||||
from src.domain.identity.entities import User
|
||||
from src.domain.identity.errors import (
|
||||
EmailNotVerified,
|
||||
InvalidCredentials,
|
||||
InvalidInstitutionalEmail,
|
||||
)
|
||||
from src.shared_kernel.errors import RateLimited
|
||||
|
||||
|
||||
class FakeUsers:
|
||||
def __init__(self, user: User | None = None, roles: list[str] | None = None) -> None:
|
||||
self._user = user
|
||||
self._roles = roles or []
|
||||
self.reconciled = False
|
||||
|
||||
async def get_by_email(self, email: str) -> User | None:
|
||||
return self._user if (self._user and self._user.email == email) else None
|
||||
|
||||
async def get_by_id(self, user_id): # pragma: no cover - unused here
|
||||
return self._user
|
||||
|
||||
async def roles_after_reconcile(self, user: User) -> list[str]:
|
||||
self.reconciled = True
|
||||
return self._roles
|
||||
|
||||
|
||||
class FakeHasher:
|
||||
def __init__(self, ok: bool = True) -> None:
|
||||
self.ok = ok
|
||||
|
||||
def hash(self, plain: str) -> str:
|
||||
return "h:" + plain
|
||||
|
||||
def verify(self, plain: str, hashed: str) -> bool:
|
||||
return self.ok
|
||||
|
||||
|
||||
class FakeTokens:
|
||||
def issue(self, user_id, email, roles, credential_version) -> str:
|
||||
return f"tok:{user_id}:{credential_version}:{','.join(roles)}"
|
||||
|
||||
|
||||
class FakeRateLimiter:
|
||||
def __init__(self, allow: bool = True) -> None:
|
||||
self._allow = allow
|
||||
|
||||
def allow(self, email: str, client_ip: str) -> bool:
|
||||
return self._allow
|
||||
|
||||
|
||||
class FakeAudit:
|
||||
def __init__(self) -> None:
|
||||
self.events: list[tuple] = []
|
||||
|
||||
async def login_succeeded(self, *, user_id, email, roles) -> None:
|
||||
self.events.append(("ok", email, tuple(roles)))
|
||||
|
||||
async def login_failed(self, *, email, user_id, reason) -> None:
|
||||
self.events.append(("fail", email, reason))
|
||||
|
||||
|
||||
def _user(**kw) -> User:
|
||||
base = dict(
|
||||
id=uuid.uuid4(),
|
||||
email="a@ump.edu.vn",
|
||||
full_name="Test",
|
||||
password_hash="x",
|
||||
email_verified=True,
|
||||
is_active=True,
|
||||
credential_version=2,
|
||||
)
|
||||
base.update(kw)
|
||||
return User(**base)
|
||||
|
||||
|
||||
def _build(users: FakeUsers, hasher=None, rate_limiter=None, audit=None) -> tuple:
|
||||
audit = audit or FakeAudit()
|
||||
uc = AuthenticateUser(
|
||||
users=users,
|
||||
hasher=hasher or FakeHasher(ok=True),
|
||||
tokens=FakeTokens(),
|
||||
rate_limiter=rate_limiter or FakeRateLimiter(allow=True),
|
||||
audit=audit,
|
||||
)
|
||||
return uc, audit
|
||||
|
||||
|
||||
def test_login_success_returns_token_and_reconciles_roles() -> None:
|
||||
users = FakeUsers(user=_user(), roles=["admin", "viewer"])
|
||||
uc, audit = _build(users)
|
||||
result = asyncio.run(uc.execute(LoginCommand("A@ump.edu.vn", "pw", "1.2.3.4")))
|
||||
assert result.access_token.endswith(":2:admin,viewer")
|
||||
assert result.roles == ["admin", "viewer"]
|
||||
assert users.reconciled is True
|
||||
assert audit.events == [("ok", "a@ump.edu.vn", ("admin", "viewer"))]
|
||||
|
||||
|
||||
def test_wrong_password_raises_401_and_audits_failure() -> None:
|
||||
users = FakeUsers(user=_user())
|
||||
uc, audit = _build(users, hasher=FakeHasher(ok=False))
|
||||
with pytest.raises(InvalidCredentials) as exc:
|
||||
asyncio.run(uc.execute(LoginCommand("a@ump.edu.vn", "bad", "ip")))
|
||||
assert exc.value.message == "Email hoặc mật khẩu không đúng."
|
||||
assert audit.events == [("fail", "a@ump.edu.vn", None)]
|
||||
|
||||
|
||||
def test_unknown_email_raises_401() -> None:
|
||||
uc, _ = _build(FakeUsers(user=None))
|
||||
with pytest.raises(InvalidCredentials):
|
||||
asyncio.run(uc.execute(LoginCommand("nobody@ump.edu.vn", "pw", "ip")))
|
||||
|
||||
|
||||
def test_unverified_email_raises_403_with_reason() -> None:
|
||||
users = FakeUsers(user=_user(email_verified=False))
|
||||
uc, audit = _build(users)
|
||||
with pytest.raises(EmailNotVerified):
|
||||
asyncio.run(uc.execute(LoginCommand("a@ump.edu.vn", "pw", "ip")))
|
||||
assert audit.events == [("fail", "a@ump.edu.vn", "email_unverified")]
|
||||
|
||||
|
||||
def test_rate_limited_raises_429_before_db() -> None:
|
||||
users = FakeUsers(user=_user())
|
||||
uc, _ = _build(users, rate_limiter=FakeRateLimiter(allow=False))
|
||||
with pytest.raises(RateLimited):
|
||||
asyncio.run(uc.execute(LoginCommand("a@ump.edu.vn", "pw", "ip")))
|
||||
|
||||
|
||||
def test_non_institutional_email_rejected_before_lookup() -> None:
|
||||
users = FakeUsers(user=_user())
|
||||
uc, _ = _build(users)
|
||||
with pytest.raises(InvalidInstitutionalEmail):
|
||||
asyncio.run(uc.execute(LoginCommand("a@gmail.com", "pw", "ip")))
|
||||
Reference in New Issue
Block a user