159 lines
6.1 KiB
Python
159 lines
6.1 KiB
Python
"""
|
|
Security regression tests for authenticated / removed routes (no Postgres required).
|
|
|
|
Run: cd be0 && python -m unittest tests.test_security_routes -v
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import unittest
|
|
from unittest.mock import patch
|
|
|
|
from tests.security_token_fixture import mint_bearer_token
|
|
|
|
|
|
class SecurityRoutesTests(unittest.TestCase):
    """Security regression checks for removed and authenticated HTTP routes.

    Each test sends one or two requests through a FastAPI ``TestClient`` and
    asserts on the status code (plus headers or payload keys where relevant).
    """

    # Patch target forced to ``False`` by the tests that must run without Postgres.
    _POSTGRES_FLAG = "src.initiative_db.engine.is_postgres_enabled"
    # Single-application URL shared by the ``/api/applications/<id>`` tests.
    _APPLICATION_URL = "/api/applications/sub-deadbeefdeadbeef"

    def _client(self):
        """Return a fresh ``TestClient`` bound to the application object."""
        # Imported lazily so that loading this module does not import the app.
        from fastapi.testclient import TestClient

        from main import app

        return TestClient(app)

    def _postgres_disabled(self):
        """Return a patcher that makes ``is_postgres_enabled`` report ``False``."""
        return patch(self._POSTGRES_FLAG, return_value=False)

    @staticmethod
    def _auth(*roles, **claims):
        """Build an ``Authorization`` header dict from a fixture-minted token."""
        return {"Authorization": mint_bearer_token(roles=roles, **claims)}

    # ------------------------------------------------------------------
    # Removed routes
    # ------------------------------------------------------------------

    def test_removed_upload_document_returns_404(self) -> None:
        """POST /upload_document must answer 404."""
        http = self._client()
        upload = {"file": ("x.pdf", b"%PDF", "application/pdf")}
        response = http.post("/upload_document", files=upload)
        self.assertEqual(response.status_code, 404)

    def test_removed_get_page_returns_404(self) -> None:
        """POST /get_page must answer 404."""
        http = self._client()
        response = http.post("/get_page", data={"new_page_number": "1"})
        self.assertEqual(response.status_code, 404)

    # ------------------------------------------------------------------
    # /api/applications
    # ------------------------------------------------------------------

    def test_list_applications_requires_auth(self) -> None:
        """Listing applications without credentials yields 401."""
        http = self._client()
        response = http.get("/api/applications")
        self.assertEqual(response.status_code, 401)

    def test_list_applications_rejects_viewer(self) -> None:
        """A token carrying only the viewer role gets 403 on the list route."""
        http = self._client()
        auth = self._auth("viewer")
        with self._postgres_disabled():
            response = http.get("/api/applications", headers=auth)
        self.assertEqual(response.status_code, 403)

    def test_list_applications_allows_staff_without_db(self) -> None:
        """An admin token gets 200 and a ``data`` key with Postgres disabled."""
        http = self._client()
        auth = self._auth("admin")
        submitted = patch("main._load_submitted_items", return_value=[])
        with self._postgres_disabled(), submitted:
            response = http.get("/api/applications", headers=auth)
        self.assertEqual(response.status_code, 200, response.text)
        self.assertIn("data", response.json())

    def test_get_application_requires_auth(self) -> None:
        """Fetching a single application without credentials yields 401."""
        http = self._client()
        response = http.get(self._APPLICATION_URL)
        self.assertEqual(response.status_code, 401)

    def test_get_application_rejects_viewer_without_row(self) -> None:
        """A viewer asking for an application the file index lacks gets 404."""
        http = self._client()
        auth = self._auth("viewer", email="viewer@ump.edu.vn")
        file_index = patch("main._get_application_from_file_index", return_value=None)
        with self._postgres_disabled(), file_index:
            response = http.get(self._APPLICATION_URL, headers=auth)
        self.assertEqual(response.status_code, 404)

    # ------------------------------------------------------------------
    # Other authenticated routes
    # ------------------------------------------------------------------

    def test_review_documents_list_requires_auth(self) -> None:
        """GET /api/v1/review-documents without credentials yields 401."""
        http = self._client()
        query = {"caseId": "CASE-1"}
        response = http.get("/api/v1/review-documents", params=query)
        self.assertEqual(response.status_code, 401)

    def test_review_documents_create_requires_auth(self) -> None:
        """POST /api/v1/review-documents without credentials yields 401."""
        http = self._client()
        payload = {"caseId": "CASE-1", "officialBieuMau": {}}
        response = http.post("/api/v1/review-documents", json=payload)
        self.assertEqual(response.status_code, 401)

    def test_chat_requires_auth(self) -> None:
        """POST /api/v1/chat without credentials yields 401."""
        http = self._client()
        response = http.post("/api/v1/chat", json={"message": "hello"})
        self.assertEqual(response.status_code, 401)

    def test_analyze_compliance_requires_auth(self) -> None:
        """POST /analyze_compliance without credentials yields 401."""
        http = self._client()
        payload = {"external_requirements": ["ext"], "internal_requirements": ["int"]}
        response = http.post("/analyze_compliance", json=payload)
        self.assertEqual(response.status_code, 401)

    def test_test_ollama_requires_admin(self) -> None:
        """POST /test_ollama answers 403 for a viewer and 200 for an admin."""
        http = self._client()
        prompt = {"prompt": "hi"}

        viewer_auth = self._auth("viewer")
        denied = http.post("/test_ollama", json=prompt, headers=viewer_auth)
        self.assertEqual(denied.status_code, 403)

        admin_auth = self._auth("admin")
        # Replace the chat call with a canned reply for the admin request.
        canned_reply = {"message": {"content": "ok"}}
        with patch("main.ollama.chat", return_value=canned_reply):
            granted = http.post("/test_ollama", json=prompt, headers=admin_auth)
        self.assertEqual(granted.status_code, 200, granted.text)

    def test_ideas_post_requires_admin(self) -> None:
        """POST /api/v1/ideas with a viewer token yields 403."""
        http = self._client()
        auth = self._auth("viewer")
        idea = {"title": "t", "description": "d"}
        response = http.post("/api/v1/ideas", json=idea, headers=auth)
        self.assertEqual(response.status_code, 403)

    # ------------------------------------------------------------------
    # Response headers
    # ------------------------------------------------------------------

    def test_security_headers_on_health(self) -> None:
        """GET /health returns 200 together with the expected security headers."""
        http = self._client()
        response = http.get("/health")
        self.assertEqual(response.status_code, 200)
        received = response.headers
        self.assertEqual(received.get("x-content-type-options"), "nosniff")
        self.assertEqual(received.get("x-frame-options"), "DENY")
        self.assertIn("referrer-policy", received)
|
|
|
|
|
|
class JwtSecretTests(unittest.TestCase):
    """Checks how ``jwt_secret`` reacts to ``ENVIRONMENT`` / ``JWT_SECRET``."""

    def test_production_requires_secret(self) -> None:
        """With ENVIRONMENT=production and no JWT_SECRET, a RuntimeError is expected."""
        from src.auth_jwt import jwt_secret

        # Start from the current environment minus the two variables under test,
        # then pin ENVIRONMENT to production.
        production_env = dict(os.environ)
        for variable in ("JWT_SECRET", "ENVIRONMENT"):
            production_env.pop(variable, None)
        production_env["ENVIRONMENT"] = "production"

        with patch.dict(os.environ, production_env, clear=True), \
                self.assertRaises(RuntimeError):
            jwt_secret()

    def test_development_allows_dev_fallback(self) -> None:
        """With ENVIRONMENT=development and no JWT_SECRET, a 32+ length secret is returned."""
        from src.auth_jwt import jwt_secret

        development_env = {"ENVIRONMENT": "development"}
        with patch.dict(os.environ, development_env, clear=False):
            # Removed inside the patch so that patch.dict restores it on exit.
            os.environ.pop("JWT_SECRET", None)
            fallback = jwt_secret()

        self.assertGreaterEqual(len(fallback), 32)
|
|
|
|
|
|
class LoginRateLimitTests(unittest.TestCase):
    """Regression test for the login attempt limiter (``allow_login``)."""

    def test_login_rate_limit_blocks_after_threshold(self) -> None:
        """Five attempts for one (email, ip) pair are allowed; the sixth is refused."""
        import uuid

        from src.auth_rate_limit import allow_login

        # A fixed address would start from whatever count an earlier run left
        # behind if the limiter's state outlives this test, so a rerun in the same
        # process could fail on the first assertion. A unique address per run
        # avoids that.
        # NOTE(review): assumes the limiter keys on the email; confirm against
        # src.auth_rate_limit, and vary the IP too if it also counts per address.
        email = f"ratelimit-test-{uuid.uuid4().hex}@ump.edu.vn"
        ip = "203.0.113.50"

        for attempt in range(1, 6):
            self.assertTrue(
                allow_login(email, ip),
                f"attempt {attempt} should still be allowed",
            )
        self.assertFalse(allow_login(email, ip))
|
|
|
|
|
|
# Run the suite when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()
|