Files
sciagent/be0/tests/test_applications_db_integration.py
Thinh Lam 688fac73e9
CI/CD / backend (push) Failing after 2m8s
CI/CD / frontend (push) Failing after 1m40s
CI/CD / deploy (push) Has been skipped
sciagent code + Gitea Actions CI/CD
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 09:38:30 +07:00

798 lines
30 KiB
Python

"""
PostgreSQL integration tests for applicant submissions and their related storage.
These tests exercise `src.initiative_db` (submissions, drafts, admin results, user notifications) against a real database.
They are skipped unless INITIATIVE_DATABASE_URL points at PostgreSQL (asyncpg).
Example (host port from repo docker-compose.yml):
export INITIATIVE_DATABASE_URL="postgresql+asyncpg://initiative:initiative_secret@127.0.0.1:15432/initiatives"
cd be0 && python -m unittest tests.test_applications_db_integration -v
Prerequisites:
- Schema applied: `001_initiative_schema.sql` and `002_application_storage_extensions.sql` (see docker-compose postgres init mounts), or equivalent. The notification inbox test additionally needs migration 006 (`user_notifications`).
- Network reachable from the machine running tests.
"""
from __future__ import annotations
import os
import unittest
import uuid
from datetime import datetime, timezone
from sqlalchemy import select
# DB-backed tests only run when the configured URL (trimmed, case-insensitive)
# starts with "postgresql".
_RUN_DB = (
    os.environ.get("INITIATIVE_DATABASE_URL", "")
    .strip()
    .lower()
    .startswith("postgresql")
)

# MinIO-backed tests need every one of these settings present and non-blank.
_HAS_MINIO = not any(
    os.environ.get(name, "").strip() == ""
    for name in (
        "S3_ENDPOINT_URL",
        "S3_ACCESS_KEY",
        "S3_SECRET_KEY",
        "S3_BUCKET_ATTACHMENTS",
        "S3_BUCKET_EXPORTS",
        "S3_BUCKET_QUARANTINE",
    )
)
@unittest.skipUnless(
    _RUN_DB,
    "Set INITIATIVE_DATABASE_URL=postgresql+asyncpg://.../initiatives to run DB integration tests",
)
class ApplicationsDbIntegrationTests(unittest.IsolatedAsyncioTestCase):
    """Persistence tests for applicant submissions against a real PostgreSQL database.

    Every test seeds its own rows under randomly generated identifiers and
    removes them again in a ``finally`` block, so a failing assertion does not
    leave test data behind in the shared database.

    Project modules are imported inside the methods rather than at module
    level, so importing this file does not require ``src.initiative_db``.
    """

    async def asyncSetUp(self) -> None:
        """Dispose any existing engine, then initialise a fresh one for this test."""
        from src.initiative_db import engine as eng

        # NOTE(review): presumably needed because each test runs on its own
        # event loop and the engine must not be reused across loops — confirm.
        await eng.dispose_engine()
        await eng.init_engine()

    async def asyncTearDown(self) -> None:
        """Dispose the engine created in ``asyncSetUp``."""
        from src.initiative_db import engine as eng

        await eng.dispose_engine()

    @staticmethod
    def _utc_now_iso_z() -> str:
        """Return the current UTC time as ISO-8601 with second precision and a ``Z`` suffix."""
        return (
            datetime.now(timezone.utc)
            .replace(microsecond=0)
            .isoformat()
            .replace("+00:00", "Z")
        )

    async def _purge_seed_rows(self, case_code: str, *user_ids: uuid.UUID) -> None:
        """Delete the initiative with ``case_code`` and the given users, skipping any that are absent.

        Must be called from the test body (not as a registered cleanup) because
        ``asyncTearDown`` disposes the engine.
        """
        from src.initiative_db.engine import get_session
        from src.initiative_db.models import Initiative, User

        async with get_session() as session:
            initiative = (
                await session.execute(
                    select(Initiative).where(Initiative.case_code == case_code)
                )
            ).scalar_one_or_none()
            # Initiative first, then its owner / other users.
            if initiative is not None:
                await session.delete(initiative)
            for user_id in user_ids:
                user = await session.get(User, user_id)
                if user is not None:
                    await session.delete(user)

    async def test_update_then_delete_submission_round_trip(self) -> None:
        """Update a submitted application, reload it, delete it, and check it is gone."""
        from src.initiative_db.engine import get_session
        from src.initiative_db.models import Draft, Initiative, User
        from src.initiative_db.submissions import (
            delete_my_submitted_application,
            get_application_by_id,
            update_my_submitted_application,
        )

        owner_id = uuid.uuid4()
        owner_email = f"dbtest-{owner_id.hex[:8]}@ump.edu.vn"
        case_code = f"TESTCASE-{uuid.uuid4().hex[:10]}"
        submission_id = f"sub-{uuid.uuid4().hex[:16]}"

        try:
            # --- seed owner + submitted initiative + draft payload ---
            async with get_session() as session:
                session.add(
                    User(
                        id=owner_id,
                        email=owner_email,
                        password_hash="-",
                        full_name="DB Test Applicant",
                    )
                )
                await session.flush()
                ini = Initiative(
                    case_code=case_code,
                    owner_id=owner_id,
                    status="submitted",
                    submitted_at=datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
                )
                session.add(ini)
                await session.flush()
                payload = {
                    "caseId": case_code,
                    "updatedAt": "2026-01-01T12:00:00Z",
                    "tabs": {},
                    "submissionRecord": {
                        "id": submission_id,
                        "submittedDate": "2026-01-01T12:00:00.000Z",
                        "name": "Original title",
                        "author": {
                            "id": case_code,
                            "name": "DB Test Applicant",
                            "email": owner_email,
                        },
                        "status": "submitted",
                        "reviewStatus": "not_reviewed",
                    },
                }
                session.add(
                    Draft(
                        draft_code=f"DRAFT-{case_code}",
                        initiative_id=ini.id,
                        payload=payload,
                        version=1,
                    )
                )

            # --- update (same semantics as PUT /api/applications/{id}) ---
            async with get_session() as session:
                row = await update_my_submitted_application(
                    session,
                    owner_id,
                    owner_email,
                    submission_id,
                    "Renamed via DB test",
                    "2026-06-15",
                )
                self.assertEqual(row.get("name"), "Renamed via DB test")
                self.assertIn("2026-06-15", str(row.get("submittedDate") or ""))

            # Re-read in a new session to check the update is visible there.
            async with get_session() as session:
                loaded = await get_application_by_id(session, submission_id)
                self.assertIsNotNone(loaded)
                assert loaded is not None  # narrows the type for static checkers
                self.assertEqual(loaded.get("name"), "Renamed via DB test")

            # --- delete (same semantics as DELETE /api/applications/{id}) ---
            async with get_session() as session:
                await delete_my_submitted_application(
                    session, owner_id, owner_email, submission_id
                )

            async with get_session() as session:
                gone = await get_application_by_id(session, submission_id)
                ini_row = (
                    await session.execute(
                        select(Initiative).where(Initiative.case_code == case_code)
                    )
                ).scalar_one_or_none()
                self.assertIsNone(gone)
                self.assertIsNone(ini_row)
        finally:
            # The initiative is expected to be gone already on the success path;
            # on failure this also removes whatever was left behind.
            await self._purge_seed_rows(case_code, owner_id)

    async def test_get_application_by_id_matches_fallback_sub_id_when_record_has_no_id(self) -> None:
        """List rows use sub-{initiative.id[:16]} when submissionRecord.id is absent; GET must accept the same id."""
        from src.initiative_db.engine import get_session
        from src.initiative_db.models import Draft, Initiative, User
        from src.initiative_db.submissions import get_application_by_id

        owner_id = uuid.uuid4()
        owner_email = f"dbtest-fallback-{owner_id.hex[:8]}@ump.edu.vn"
        case_code = f"TESTCASE-{uuid.uuid4().hex[:10]}"

        try:
            async with get_session() as session:
                session.add(
                    User(
                        id=owner_id,
                        email=owner_email,
                        password_hash="-",
                        full_name="Fallback Id Test",
                    )
                )
                await session.flush()
                ini = Initiative(
                    case_code=case_code,
                    owner_id=owner_id,
                    status="submitted",
                    submitted_at=datetime(2026, 3, 1, 12, 0, 0, tzinfo=timezone.utc),
                )
                session.add(ini)
                await session.flush()
                # Must match `_submission_display_id`: first 16 hex digits of UUID, not `str(uuid)[:16]`.
                expected_list_id = f"sub-{ini.id.hex[:16]}"
                # Deliberately no "id" key inside submissionRecord.
                payload = {
                    "caseId": case_code,
                    "updatedAt": "2026-03-01T12:00:00Z",
                    "tabs": {},
                    "submissionRecord": {
                        "submittedDate": "2026-03-01T12:00:00.000Z",
                        "name": "No explicit submission id",
                        "author": {
                            "id": case_code,
                            "name": "Fallback Id Test",
                            "email": owner_email,
                        },
                        "status": "submitted",
                        "reviewStatus": "not_reviewed",
                    },
                }
                session.add(
                    Draft(
                        draft_code=f"DRAFT-{case_code}",
                        initiative_id=ini.id,
                        payload=payload,
                        version=1,
                    )
                )

            async with get_session() as session:
                loaded = await get_application_by_id(session, expected_list_id)
                self.assertIsNotNone(loaded)
                assert loaded is not None  # narrows the type for static checkers
                self.assertEqual(loaded.get("id"), expected_list_id)
                self.assertEqual(loaded.get("name"), "No explicit submission id")
        finally:
            await self._purge_seed_rows(case_code, owner_id)

    async def test_update_forbidden_for_non_owner_mismatched_email(self) -> None:
        """A user who is not the owner gets ``PermissionError`` and the stored name is unchanged."""
        from src.initiative_db.engine import get_session
        from src.initiative_db.models import Draft, Initiative, User
        from src.initiative_db.submissions import update_my_submitted_application

        owner_id = uuid.uuid4()
        owner_email = f"owner-{owner_id.hex[:8]}@ump.edu.vn"
        intruder_id = uuid.uuid4()
        intruder_email = f"other-{intruder_id.hex[:8]}@ump.edu.vn"
        case_code = f"TESTCASE-{uuid.uuid4().hex[:10]}"
        submission_id = f"sub-{uuid.uuid4().hex[:16]}"

        try:
            async with get_session() as session:
                session.add(
                    User(
                        id=owner_id,
                        email=owner_email,
                        password_hash="-",
                        full_name="Owner",
                    )
                )
                session.add(
                    User(
                        id=intruder_id,
                        email=intruder_email,
                        password_hash="-",
                        full_name="Intruder",
                    )
                )
                await session.flush()
                ini = Initiative(
                    case_code=case_code,
                    owner_id=owner_id,
                    status="submitted",
                    submitted_at=datetime(2026, 2, 1, 12, 0, 0, tzinfo=timezone.utc),
                )
                session.add(ini)
                await session.flush()
                session.add(
                    Draft(
                        draft_code=f"DRAFT-{case_code}",
                        initiative_id=ini.id,
                        payload={
                            "caseId": case_code,
                            "submissionRecord": {
                                "id": submission_id,
                                "submittedDate": "2026-02-01T12:00:00.000Z",
                                "name": "Sealed",
                                "author": {"id": case_code, "name": "Owner", "email": owner_email},
                            },
                        },
                        version=1,
                    )
                )

            async with get_session() as session:
                with self.assertRaises(PermissionError):
                    await update_my_submitted_application(
                        session,
                        intruder_id,
                        intruder_email,
                        submission_id,
                        "Should not apply",
                        "2026-02-02",
                    )

            # The rejected update must not have modified the stored record.
            async with get_session() as session:
                ini = (
                    await session.execute(
                        select(Initiative).where(Initiative.case_code == case_code)
                    )
                ).scalar_one()
                d = (
                    await session.execute(
                        select(Draft).where(Draft.initiative_id == ini.id).limit(1)
                    )
                ).scalar_one()
                name = (d.payload or {}).get("submissionRecord", {}).get("name")
                self.assertEqual(name, "Sealed")
        finally:
            await self._purge_seed_rows(case_code, owner_id, intruder_id)

    async def test_save_submitted_application_rejects_when_readiness_not_met(self) -> None:
        """Server readiness runs before official-form MinIO work (no S3_* required)."""
        from src.initiative_db.engine import get_session
        from src.initiative_db.models import Draft, Initiative, User
        from src.initiative_db.submission_readiness import ApplicationSubmissionNotReadyError
        from src.initiative_db.submissions import save_submitted_application

        owner_id = uuid.uuid4()
        owner_email = f"ready-{owner_id.hex[:8]}@ump.edu.vn"
        case_code = f"READYCASE-{uuid.uuid4().hex[:10]}"

        try:
            async with get_session() as session:
                session.add(
                    User(
                        id=owner_id,
                        email=owner_email,
                        password_hash="-",
                        full_name="Readiness tester",
                    )
                )
                await session.flush()
                ini = Initiative(case_code=case_code, owner_id=owner_id, status="draft")
                session.add(ini)
                await session.flush()
                # Draft with empty tabs: nothing has been filled in yet.
                session.add(
                    Draft(
                        draft_code=f"DRAFT-{case_code}",
                        initiative_id=ini.id,
                        payload={
                            "caseId": case_code,
                            "tabs": {},
                            "updatedAt": self._utc_now_iso_z(),
                        },
                    )
                )
                await session.flush()
                with self.assertRaises(ApplicationSubmissionNotReadyError) as ctx:
                    await save_submitted_application(
                        session,
                        metadata={
                            "caseId": case_code,
                            "initiativeName": "Incomplete",
                            "authorName": "X",
                            "authorEmail": owner_email,
                        },
                        file_url="/submitted-initiatives/x.pdf",
                        owner_user_id=owner_id,
                        pdf_byte_size=50,
                        pdf_sha256="ab" * 32,
                        pdf_original_name="x.pdf",
                    )
                # The error must report at least one missing item.
                self.assertGreater(len(ctx.exception.missing), 0)
        finally:
            await self._purge_seed_rows(case_code, owner_id)

    @unittest.skipUnless(
        _HAS_MINIO,
        "Needs S3_* env — submit persists official forms via MinIO after snapshots.",
    )
    async def test_save_submitted_application_writes_storage_tables(self) -> None:
        """Requires migration 002 (submit snapshots, taxonomy, workflow, artifacts)."""
        from src.initiative_db.engine import get_session
        from src.initiative_db.models import (
            ApplicationArtifact,
            ApplicationSubmitSnapshot,
            ApplicationTaxonomy,
            ApplicationWorkflow,
            Draft,
            Initiative,
            User,
        )
        from src.initiative_db.submissions import save_submitted_application
        from tests.fixtures.minimal_submit_bundle import minimal_tabs_bundle

        owner_id = uuid.uuid4()
        owner_email = f"snaps-{owner_id.hex[:8]}@ump.edu.vn"
        case_code = f"SNAPCASE-{uuid.uuid4().hex[:10]}"
        sha = "ab" * 32

        try:
            async with get_session() as session:
                session.add(
                    User(
                        id=owner_id,
                        email=owner_email,
                        password_hash="-",
                        full_name="Snap tester",
                    )
                )
                await session.flush()
                ini = Initiative(case_code=case_code, owner_id=owner_id, status="draft")
                session.add(ini)
                await session.flush()
                tabs_payload = minimal_tabs_bundle(initiative_name="Storage ext test")
                session.add(
                    Draft(
                        draft_code=f"DRAFT-{case_code}",
                        initiative_id=ini.id,
                        payload={
                            "caseId": case_code,
                            "tabs": tabs_payload,
                            "updatedAt": self._utc_now_iso_z(),
                        },
                    )
                )
                # Pre-existing artifact row, seeded before the submit call.
                session.add(
                    ApplicationArtifact(
                        initiative_id=ini.id,
                        role="technical_evidence",
                        storage_uri="initiatives/test/evidence-key.pdf",
                        mime_type="application/pdf",
                        byte_size=900,
                    )
                )
                await session.flush()
                await save_submitted_application(
                    session,
                    metadata={
                        "caseId": case_code,
                        "initiativeName": "Storage ext test",
                        "authorName": "Snap",
                        "authorEmail": owner_email,
                        "subjectId": "math",
                        "groupId": "g1",
                        "topicType": "Hồ sơ PDF",
                    },
                    file_url="/submitted-initiatives/t.pdf",
                    owner_user_id=owner_id,
                    pdf_byte_size=123,
                    pdf_sha256=sha,
                    pdf_original_name="t.pdf",
                )

            async with get_session() as session:
                ini = (
                    await session.execute(
                        select(Initiative).where(Initiative.case_code == case_code)
                    )
                ).scalar_one()
                snaps = (
                    await session.execute(
                        select(ApplicationSubmitSnapshot).where(
                            ApplicationSubmitSnapshot.initiative_id == ini.id
                        )
                    )
                ).scalars().all()
                arts = (
                    await session.execute(
                        select(ApplicationArtifact).where(
                            ApplicationArtifact.initiative_id == ini.id
                        )
                    )
                ).scalars().all()
                tax = (
                    await session.execute(
                        select(ApplicationTaxonomy).where(
                            ApplicationTaxonomy.initiative_id == ini.id
                        )
                    )
                ).scalar_one()
                wf = (
                    await session.execute(
                        select(ApplicationWorkflow).where(
                            ApplicationWorkflow.initiative_id == ini.id
                        )
                    )
                ).scalar_one()
                # Assertions stay inside the session because they read ORM attributes.
                self.assertEqual(len(snaps), 1)
                self.assertEqual(snaps[0].submission_record_id[:4], "sub-")
                full_pdf_rows = [a for a in arts if a.role == "full_pdf"]
                self.assertEqual(len(full_pdf_rows), 1)
                self.assertEqual(full_pdf_rows[0].byte_size, 123)
                self.assertEqual(tax.subject_id, "math")
                self.assertEqual(wf.review_status, "not_reviewed")
        finally:
            await self._purge_seed_rows(case_code, owner_id)

    async def test_draft_tab_save_records_tab_snapshot(self) -> None:
        """Requires migration 002 (`draft_tab_snapshots`)."""
        from src.initiative_db.drafts import save_application_draft_tab
        from src.initiative_db.engine import get_session
        from src.initiative_db.models import DraftTabSnapshot, Initiative, User

        owner_id = uuid.uuid4()
        owner_email = f"tabsnap-{owner_id.hex[:8]}@ump.edu.vn"
        case_code = f"TABCASE-{uuid.uuid4().hex[:10]}"

        try:
            async with get_session() as session:
                session.add(
                    User(
                        id=owner_id,
                        email=owner_email,
                        password_hash="-",
                        full_name="Tab snap",
                    )
                )
                await session.flush()
                # Two saves of the same tab; each is expected to yield its own snapshot row.
                await save_application_draft_tab(
                    session, case_code, "report", {"title": "Chapter 1"}, owner_id=owner_id
                )
                await save_application_draft_tab(
                    session, case_code, "report", {"title": "Chapter 2"}, owner_id=owner_id
                )

            async with get_session() as session:
                ini = (
                    await session.execute(
                        select(Initiative).where(Initiative.case_code == case_code)
                    )
                ).scalar_one()
                rows = (
                    await session.execute(
                        select(DraftTabSnapshot)
                        .where(DraftTabSnapshot.initiative_id == ini.id)
                        .where(DraftTabSnapshot.tab == "report")
                        .order_by(DraftTabSnapshot.tab_version)
                    )
                ).scalars().all()
                self.assertEqual(len(rows), 2)
                self.assertEqual(rows[0].tab_version, 1)
                self.assertEqual(rows[0].payload.get("title"), "Chapter 1")
                self.assertEqual(rows[1].tab_version, 2)
                self.assertEqual(rows[1].payload.get("title"), "Chapter 2")
        finally:
            await self._purge_seed_rows(case_code, owner_id)

    async def test_admin_result_upsert_appears_in_decided_list(self) -> None:
        """PUT-style upsert updates initiative status; decided lifecycle list includes the row."""
        from src.initiative_db.application_admin_results import upsert_admin_result
        from src.initiative_db.engine import get_session
        from src.initiative_db.models import Draft, Initiative, User
        from src.initiative_db.submissions import list_submitted_applications

        admin_id = uuid.uuid4()
        owner_id = uuid.uuid4()
        owner_email = f"admin-upsert-{owner_id.hex[:8]}@ump.edu.vn"
        admin_email = f"admin-{admin_id.hex[:8]}@ump.edu.vn"
        case_code = f"ADM-{uuid.uuid4().hex[:10]}"
        submission_id = f"sub-{uuid.uuid4().hex[:16]}"

        try:
            async with get_session() as session:
                session.add(
                    User(
                        id=owner_id,
                        email=owner_email,
                        password_hash="-",
                        full_name="Owner upsert",
                    )
                )
                session.add(
                    User(
                        id=admin_id,
                        email=admin_email,
                        password_hash="-",
                        full_name="Admin upsert",
                    )
                )
                await session.flush()
                ini = Initiative(
                    case_code=case_code,
                    owner_id=owner_id,
                    status="submitted",
                    submitted_at=datetime(2026, 4, 1, 12, 0, 0, tzinfo=timezone.utc),
                )
                session.add(ini)
                await session.flush()
                session.add(
                    Draft(
                        draft_code=f"DRAFT-{case_code}",
                        initiative_id=ini.id,
                        payload={
                            "caseId": case_code,
                            "updatedAt": "2026-04-01T12:00:00Z",
                            "tabs": {},
                            "submissionRecord": {
                                "id": submission_id,
                                "submittedDate": "2026-04-01T12:00:00.000Z",
                                "name": "Upsert list test",
                                "author": {
                                    "id": case_code,
                                    "name": "Owner upsert",
                                    "email": owner_email,
                                },
                                "status": "submitted",
                                "reviewStatus": "not_reviewed",
                            },
                        },
                        version=1,
                    )
                )

            async with get_session() as session:
                await upsert_admin_result(
                    session,
                    submission_id,
                    admin_id,
                    decision="approved",
                    feedback="ok",
                    rationale=None,
                )

            async with get_session() as session:
                out = await list_submitted_applications(
                    session=session,
                    page=1,
                    page_size=50,
                    name="",
                    author_name="",
                    reviewer_name="",
                    status="",
                    review_status="",
                    date_from="",
                    date_to="",
                    sort_by="submittedDate",
                    sort_order="desc",
                    lifecycle="decided",
                )
                ids = {str(r.get("id")) for r in out.get("data") or []}
                self.assertIn(submission_id, ids)
                decided_row = next(
                    (r for r in (out.get("data") or []) if str(r.get("id")) == submission_id),
                    None,
                )
                self.assertIsNotNone(decided_row)
                assert decided_row is not None  # narrows the type for static checkers
                self.assertEqual(
                    decided_row.get("nhan_xet"),
                    "ok",
                    "Admin feedback must surface as nhan_xet for «Nhận xét» in admin list",
                )
                reviewer = decided_row.get("reviewer") or {}
                self.assertEqual(
                    reviewer.get("name"),
                    "Admin upsert",
                    "Người đánh giá should show adjudicating admin users.full_name (updated_by)",
                )
                self.assertEqual(str(reviewer.get("id")), str(admin_id))
        finally:
            await self._purge_seed_rows(case_code, owner_id, admin_id)

    async def test_notification_inbox_after_admin_upsert(self) -> None:
        """Requires migration 006 (`user_notifications`). Applicant receives inbox row after admin upsert + best_effort."""
        from src.initiative_db.application_admin_results import upsert_admin_result
        from src.initiative_db.engine import get_session
        from src.initiative_db.models import Draft, Initiative, User
        from src.initiative_db.user_notifications import (
            best_effort_notify_applicant_after_admin_decision,
            count_unread_notifications,
            list_notifications_for_user,
            mark_notification_read,
        )

        admin_id = uuid.uuid4()
        owner_id = uuid.uuid4()
        owner_email = f"notif-{owner_id.hex[:8]}@ump.edu.vn"
        admin_email = f"notif-adm-{admin_id.hex[:8]}@ump.edu.vn"
        case_code = f"NTF-{uuid.uuid4().hex[:10]}"
        submission_id = f"sub-{uuid.uuid4().hex[:16]}"

        try:
            async with get_session() as session:
                session.add(
                    User(
                        id=owner_id,
                        email=owner_email,
                        password_hash="-",
                        full_name="Notif owner",
                    )
                )
                session.add(
                    User(
                        id=admin_id,
                        email=admin_email,
                        password_hash="-",
                        full_name="Notif admin",
                    )
                )
                await session.flush()
                ini = Initiative(
                    case_code=case_code,
                    owner_id=owner_id,
                    status="submitted",
                    submitted_at=datetime(2026, 5, 1, 12, 0, 0, tzinfo=timezone.utc),
                )
                session.add(ini)
                await session.flush()
                session.add(
                    Draft(
                        draft_code=f"DRAFT-{case_code}",
                        initiative_id=ini.id,
                        payload={
                            "caseId": case_code,
                            "tabs": {
                                "application": {
                                    "initiativeClassification": "research",
                                    "researchEvidenceKind": "international",
                                }
                            },
                            "submissionRecord": {
                                "id": submission_id,
                                "submittedDate": "2026-05-01T12:00:00.000Z",
                                "name": "Notif seed",
                                "author": {
                                    "id": case_code,
                                    "name": "Notif owner",
                                    "email": owner_email,
                                },
                                "status": "submitted",
                                "reviewStatus": "not_reviewed",
                            },
                        },
                        version=1,
                    )
                )

            result: dict
            async with get_session() as session:
                result = await upsert_admin_result(
                    session,
                    submission_id,
                    admin_id,
                    decision="approved",
                    feedback="Kết quả thử nghiệm thông báo.",
                    rationale=None,
                )
            # Called after the admin-result session block has exited.
            # NOTE(review): assumes get_session() commits on exit — confirm.
            await best_effort_notify_applicant_after_admin_decision(result)

            async with get_session() as session:
                unread_before = await count_unread_notifications(session, owner_id)
                inbox = await list_notifications_for_user(
                    session, owner_id, page=1, page_size=10
                )
                self.assertEqual(unread_before, 1)
                self.assertEqual(inbox["pagination"]["totalItems"], 1)
                row = inbox["data"][0]
                self.assertEqual(row["applicationId"], submission_id)
                self.assertEqual(row["decision"], "approved")
                self.assertEqual(row["meritCategoryLabel"], "Xuất sắc")
                self.assertIn("Kết quả thử nghiệm", row["feedback"])
                self.assertIsNone(row["readAt"])
                nid = uuid.UUID(row["id"])

            async with get_session() as session:
                ok = await mark_notification_read(session, owner_id, nid)
                self.assertTrue(ok)
                unread_after = await count_unread_notifications(session, owner_id)
                self.assertEqual(unread_after, 0)
        finally:
            await self._purge_seed_rows(case_code, owner_id, admin_id)
# Allow running this file directly as a script, in addition to
# `python -m unittest tests.test_applications_db_integration`.
if __name__ == "__main__":
    unittest.main()