Files
Thinh Lam 688fac73e9
CI/CD / backend (push) Failing after 2m8s
CI/CD / frontend (push) Failing after 1m40s
CI/CD / deploy (push) Has been skipped
sciagent code + Gitea Actions CI/CD
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 09:38:30 +07:00

3726 lines
148 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request, Query
from fastapi.responses import JSONResponse, Response, StreamingResponse
from starlette.staticfiles import StaticFiles
from starlette.requests import Request as StarletteRequest
from pydantic import BaseModel, Field
import unicodedata
from typing import Dict, List, Optional, Any, Literal
from datetime import datetime, timezone
from fastapi import File, UploadFile, Form, Header, Body # type: ignore
from src.auth_jwt import decode_access_token_user_id, decode_bearer_token
from src.admin_audit_routes import router as admin_audit_router
from src.admin_user_profile_routes import router as admin_user_profile_router
from src.template_routes import router as template_router
from src.research_routes import router as research_router
from src.imagehub_routes import router as imagehub_router
from fastapi.middleware.cors import CORSMiddleware # type: ignore
from src.utils import initialize_a_logger
import ollama
import numpy as np
from src.internal_control.it_governance.document_io import DocumentIO
np.random.seed(42)
from pathlib import Path
import uuid
import json
import hashlib
import asyncio
from enum import Enum
from pydantic import BaseModel, Field, validator
import os
import subprocess
import sys
import yaml
# Import the workflow (assuming it's in rm_workflow.py)
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Literal
import numpy as np
from src.compliance_verifier import Compliance_Verifier, ComplianceRequest, PromptRequest
from src.chat_assistant import ChatAssistant, ChatRequest, ChatResponse, get_chat_assistant
from src.auth_api import router as auth_api_router
# Re-define the state and workflow components for FastAPI
class RMIntegrationState(TypedDict):
    """Mutable state carried by one RM-integration workflow.

    Instances are plain dicts stored under ``workflows_storage[id]["state"]``
    and mutated in place by the workflow endpoints in this module.
    """
    current_phase: str  # human-readable phase name, e.g. "Concept Development"
    phase_number: int  # 0 before phase 1 is applied; incremented by advance_workflow
    checklist_items: List[dict]  # dicts with id/task/status/requires_approval/approver
    completed_items: List[int]  # ids of checklist items marked completed
    pending_approvals: List[str]  # approval labels still waiting for sign-off
    records_officer_involved: bool  # initialised to False by create_workflow
    project_status: str  # free-text status line
    comments: dict  # keyed by checklist item id (see update_checklist_item)
    validation_results: dict  # messages written by advance_workflow when blocked
    next_phase_ready: bool
# Pydantic models for API requests/responses
class TaskStatus(str, Enum):
    """Allowed statuses for a checklist item.

    Mixing in ``str`` makes each member compare equal to its lowercase value;
    the endpoints store ``.value`` in the checklist dicts.
    """
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    BLOCKED = "blocked"
# Request body for POST /workflows (see create_workflow). Only project_name
# is required; the other fields are copied into the workflow metadata as-is.
class WorkflowInitRequest(BaseModel):
    project_name: str = Field(..., description="Name of the project")
    project_description: Optional[str] = Field(None, description="Project description")
    records_officer_email: Optional[str] = Field(None, description="Records officer contact")
# Request body for PUT /workflows/{workflow_id}/items (see update_checklist_item).
class UpdateItemRequest(BaseModel):
    item_id: int = Field(..., description="ID of the checklist item to update")
    status: TaskStatus = Field(..., description="New status of the item")
    # comment/updated_by are only recorded when a non-empty comment is supplied.
    comment: Optional[str] = Field(None, description="Comment about the update")
    updated_by: Optional[str] = Field(None, description="Who updated the item")
# Request body for POST /workflows/{workflow_id}/approvals (see submit_approval).
class ApprovalRequest(BaseModel):
    # Must match an entry of the workflow's pending_approvals to clear it.
    approval_type: str = Field(..., description="Type of approval")
    approved: bool = Field(..., description="Whether approved or rejected")
    approver: str = Field(..., description="Who provided the approval")
    comment: Optional[str] = Field(None, description="Approval comment")
# Summary view of a workflow; response model of POST /workflows and
# GET /workflows/{workflow_id}.
class WorkflowResponse(BaseModel):
    workflow_id: str
    current_phase: str
    phase_number: int
    project_status: str
    completion_percentage: float  # completed / total checklist items * 100
    pending_approvals: List[str]
    next_phase_ready: bool
    timestamp: str  # ISO-8601 string produced when the response is built
# Detailed view of a workflow; response model of
# GET /workflows/{workflow_id}/report.
class StatusReport(BaseModel):
    workflow_id: str
    current_phase: str
    phase_number: int
    completion_percentage: float  # completed / total checklist items * 100
    completed_items: int  # count of checklist items whose status is "completed"
    total_items: int
    pending_approvals: List[str]
    validation_results: Dict[str, str]  # messages written by advance_workflow
    project_status: str
    checklist_items: List[dict]
    timestamp: str  # ISO-8601 string produced when the report is built
# Chat Assistant Request Models
# Body of POST /api/v1/chat/verify; the three fields are forwarded unchanged
# to Chat_Assistant.verify_content.
class VerifyContentRequest(BaseModel):
    """Request model for content verification."""
    field_name: str
    content: str
    verification_criteria: Optional[str] = None
# Body of POST /api/v1/chat/question; both fields are forwarded unchanged to
# Chat_Assistant.answer_policy_question.
class PolicyQuestionRequest(BaseModel):
    """Request model for policy questions."""
    question: str
    policy_context: Optional[str] = None
# In-memory storage for workflows (in production, use a database)
# Keyed by workflow id; each value holds a "state" dict and a "metadata" dict
# (see create_workflow). Nothing here is persisted across process restarts.
workflows_storage: Dict[str, Dict[str, Any]] = {}
# Disabled alternative: initialise the logger with a basicConfig fallback.
# try:
# logger = initialize_a_logger()
# logger.info("Logger initialized successfully") # Test it immediately
# except Exception as e:
# import logging
# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger()
# logger.error(f"Logger initialization failed: {e}", exc_info=True)
# Module-wide logger; the argument is the log file path passed to the helper.
logger = initialize_a_logger('./logs/main.log')
# FastAPI app initialization
app = FastAPI(
    title="RM Integration SDLC Workflow API",
    description="API for managing Records Management integration into System Development Life Cycle",
    version="1.0.0"
)
# Routers implemented in sibling modules are all mounted under /api/v1.
app.include_router(auth_api_router, prefix="/api/v1")
app.include_router(admin_audit_router, prefix="/api/v1")
app.include_router(admin_user_profile_router, prefix="/api/v1")
app.include_router(template_router, prefix="/api/v1")
app.include_router(research_router, prefix="/api/v1")
app.include_router(imagehub_router, prefix="/api/v1")
# Directory that contains this module; base for the path helpers below.
APP_ROOT_DIR = Path(__file__).resolve().parent
def _resolved_frontend_public_dir() -> Path:
"""`assets` at repo root locally, or `/app/assets` when mounted in Docker."""
env = os.getenv("APPLICATION_REPORT_EXPORT_DIR")
if env:
return Path(env)
app_assets = APP_ROOT_DIR / "assets"
if app_assets.is_dir():
return app_assets.resolve()
return (APP_ROOT_DIR.parent / "assets").resolve()
def _resolved_application_draft_dir() -> Path:
"""
fe0/public/application-drafts when running from repo; `/app/assets/application-drafts` in Docker
(see docker-compose `./assets:/app/assets`). Using APP_ROOT_DIR.parent/fe0 breaks inside Docker
(parent is `/`, producing `/fe0/...`).
"""
env = os.getenv("APPLICATION_DRAFT_DIR")
if env:
return Path(env)
fe0 = APP_ROOT_DIR.parent / "fe0"
if fe0.is_dir():
return (fe0 / "public" / "application-drafts").resolve()
return (APP_ROOT_DIR / "assets" / "application-drafts").resolve()
# Both directories are resolved once, at import time; later changes to the
# environment variables do not affect them.
FRONTEND_PUBLIC_DIR = _resolved_frontend_public_dir()
APPLICATION_DRAFT_DIR = _resolved_application_draft_dir()
def _load_application_draft_yaml(case_id: str) -> Optional[Dict[str, Any]]:
"""Load draft from disk; try current path and legacy Docker mis-resolved path."""
name = f"{case_id}.yml"
candidates = [
APPLICATION_DRAFT_DIR / name,
APP_ROOT_DIR.parent / "fe0" / "public" / "application-drafts" / name,
Path("/fe0/public/application-drafts") / name,
]
seen: set[str] = set()
for path in candidates:
try:
key = str(path.resolve(strict=False))
except (OSError, RuntimeError):
key = str(path)
if key in seen:
continue
seen.add(key)
if path.is_file():
with open(path, "r", encoding="utf-8") as handle:
data = yaml.safe_load(handle) or {}
if isinstance(data, dict):
return data
return None
def _empty_applicant_draft_bundle(case_id: str) -> Dict[str, Any]:
"""
Client-generated case IDs can exist in sessionStorage before any POST save.
DB reset / no YAML yet must not 404 — return the same shape as save/load.
"""
return {
"caseId": case_id,
"updatedAt": datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
"tabs": {},
}
# Payload for saving one tab of an application draft.
# NOTE(review): caseId is optional — presumably the server allocates one when
# it is omitted; confirm against the save endpoint.
class ApplicationDraftSaveRequest(BaseModel):
    caseId: Optional[str] = None
    tab: str
    data: Dict[str, Any]
# Payload for creating a review document for a case. caseId is required and
# limited to 1-128 characters; the dict payloads are accepted without further
# schema validation.
# NOTE(review): "BieuMau" looks like Vietnamese "biểu mẫu" (form) — confirm.
class ReviewDocumentSaveRequest(BaseModel):
    caseId: str = Field(..., min_length=1, max_length=128)
    officialBieuMau: Dict[str, Any] = Field(default_factory=dict)
    templateData: Optional[Dict[str, Any]] = None
    fullBundle: Optional[Dict[str, Any]] = None
# Same payload fields as ReviewDocumentSaveRequest, minus caseId.
class ReviewDocumentUpdateRequest(BaseModel):
    officialBieuMau: Dict[str, Any] = Field(default_factory=dict)
    templateData: Optional[Dict[str, Any]] = None
    fullBundle: Optional[Dict[str, Any]] = None
# One text overlay to draw on a PDF page. The Field bounds cap the size of
# what a client can request; x and y are unbounded.
# NOTE(review): units and coordinate origin are not visible here — presumably
# PDF points; confirm against the renderer.
class PdfLayoutEditPayload(BaseModel):
    id: str = Field(default="", max_length=128)
    text: str = Field(default="", max_length=20_000)
    page: int = Field(default=1, ge=1, le=300)  # 1-based page number
    x: float = 0
    y: float = 0
    fontSize: float = Field(default=12, ge=1, le=256)
    lineHeight: float = Field(default=14, ge=1, le=512)
    boxWidth: float = Field(default=240, ge=1, le=5_000)
    letterSpacing: float = Field(default=0, ge=-50, le=200)
    textAlign: Literal["left", "center", "right"] = "left"
    # Closed set of font names; any other value fails validation.
    fontName: Literal["TimesRoman", "TimesRomanBold", "Helvetica", "HelveticaBold"] = "TimesRoman"
    colorHex: str = Field(default="#111827", max_length=16)
class UpdateSubmittedApplicationBody(BaseModel):
    """Applicant history panel: edit name + submission date (mirrors fe0 ApplicantHistoryCrudDialog)."""
    name: str = Field(..., min_length=1, max_length=500)
    # Kept as a length-checked string; no date parsing happens at this layer.
    submittedDate: str = Field(..., min_length=4, max_length=40)
class CreateSubmittedApplicationBody(BaseModel):
    """Create a new shell record for applicant and immediately allocate `applicationId`."""
    # Optional display name, at most 500 characters.
    name: Optional[str] = Field(default=None, max_length=500)
def _cors_allow_origins() -> List[str]:
"""Localhost defaults plus optional comma-separated `CORS_ORIGINS` (e.g. http://VM_IP:8081)."""
base = [
"http://localhost:8081",
"http://localhost:8080",
"http://localhost:3000",
"http://127.0.0.1:8081",
"http://127.0.0.1:8080",
"http://127.0.0.1:3000",
]
extra = os.getenv("CORS_ORIGINS", "").strip()
if not extra:
return base
merged = list(base)
for part in extra.split(","):
o = part.strip()
if o and o not in merged:
merged.append(o)
return merged
# Computed once at import time; CORS_ORIGINS changes need a restart.
CORS_ALLOW_ORIGINS = _cors_allow_origins()
# Fail fast at import: the middleware below sends credentials, so a wildcard
# origin is refused rather than silently accepted.
if "*" in CORS_ALLOW_ORIGINS:
    raise RuntimeError("CORS_ORIGINS must not include '*' when allow_credentials=True")
# document_parser = DocumentIO()
# NOTE(review): for credentialed requests browsers treat a literal "*" in
# Access-Control-Expose-Headers as a header name, not a wildcard — confirm
# whether expose_headers needs an explicit list.
app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ALLOW_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["*"],
)
@app.middleware("http")
async def _credential_version_middleware(request: StarletteRequest, call_next):
    """Delegate every HTTP request to the credential-version middleware.

    The implementation lives in ``src.auth_credential_middleware`` and is
    imported on each call rather than at module import time.
    """
    from src.auth_credential_middleware import (
        auth_credential_version_middleware as _delegate,
    )

    outcome = await _delegate(request, call_next)
    return outcome
@app.middleware("http")
async def _security_headers_middleware(request: StarletteRequest, call_next):
    """Add baseline security headers to every response.

    Headers already set by the handler are left alone (``setdefault``).
    ``Strict-Transport-Security`` is only added when the ``ENVIRONMENT``
    variable equals ``production`` (case-insensitive).
    """
    response = await call_next(request)
    always_on = (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("Referrer-Policy", "strict-origin-when-cross-origin"),
    )
    for header_name, header_value in always_on:
        response.headers.setdefault(header_name, header_value)
    in_production = os.getenv("ENVIRONMENT", "").lower() == "production"
    if in_production:
        response.headers.setdefault(
            "Strict-Transport-Security", "max-age=31536000; includeSubDomains"
        )
    return response
async def _run_initiative_migrations(mig_script: Path) -> None:
    """Run the migration script in a child interpreter and log its output.

    Best-effort: every failure is logged as a warning and never raised, so a
    broken migration cannot prevent the API from starting.
    """
    timeout_seconds = 120
    try:
        proc = await asyncio.create_subprocess_exec(
            sys.executable,
            str(mig_script),
            cwd=str(APP_ROOT_DIR),
            env=os.environ.copy(),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout_seconds)
        except asyncio.TimeoutError:
            # wait_for only cancels communicate(); the child keeps running
            # unless it is killed and reaped explicitly.
            proc.kill()
            await proc.wait()
            logger.warning(
                "apply_initiative_migrations could not run: timed out after %s seconds",
                timeout_seconds,
            )
            return
        if out.strip():
            logger.info(
                "apply_initiative_migrations: %s",
                out.decode("utf-8", errors="replace").strip(),
            )
        if err.strip():
            logger.warning(
                "apply_initiative_migrations stderr: %s",
                err.decode("utf-8", errors="replace").strip(),
            )
        if proc.returncode != 0:
            logger.warning(
                "apply_initiative_migrations exited code %s (e.g. missing registration_otp_codes)",
                proc.returncode,
            )
    except Exception as exc:
        logger.warning("apply_initiative_migrations could not run: %s", exc)


@app.on_event("startup")
async def _initiative_db_startup():
    """Startup hook: initialise persistence back-ends.

    When PostgreSQL persistence is enabled, the engine is initialised and the
    migration script ``scripts/apply_initiative_migrations.py`` is run if it
    exists. Afterwards the MinIO/S3 buckets are ensured; any failure there is
    logged and ignored so the API still starts without object storage.

    NOTE(review): the nesting below was reconstructed — migrations are assumed
    to run only when PostgreSQL is enabled, and the bucket check is assumed to
    run unconditionally. Confirm against the deployed file.
    """
    from src.initiative_db.engine import init_engine, is_postgres_enabled

    if is_postgres_enabled():
        await init_engine()
        logger.info("Initiative PostgreSQL persistence enabled")
        mig_script = APP_ROOT_DIR / "scripts" / "apply_initiative_migrations.py"
        if mig_script.is_file():
            await _run_initiative_migrations(mig_script)
    try:
        from src.minio.storage import S3Storage, settings as _s3s

        await S3Storage(_s3s).ensure_buckets_exist()
        logger.info("MinIO/S3 buckets ensured (attachments/exports/quarantine).")
    except Exception as exc:
        logger.warning("MinIO/S3 bucket init skipped (configure S3_* env to enable evidence uploads): %s", exc)
@app.on_event("shutdown")
async def _initiative_db_shutdown():
    """Shutdown hook: dispose the database engine when PostgreSQL is enabled."""
    from src.initiative_db.engine import dispose_engine as _dispose
    from src.initiative_db.engine import is_postgres_enabled as _pg_enabled

    if not _pg_enabled():
        return
    await _dispose()
logger.info(f"parser start")
# The name Compliance_Verifier is rebound from the imported class to a single
# shared instance; from here on the class itself is no longer reachable under
# this name. The endpoints below call methods on this instance.
Compliance_Verifier = Compliance_Verifier()
# Shared chat assistant obtained from the factory in src.chat_assistant.
Chat_Assistant = get_chat_assistant()
# Import or redefine the workflow functions from the previous artifact
def phase1_concept_development(state: RMIntegrationState) -> RMIntegrationState:
    """Populate ``state`` for Phase 1 (Concept Development).

    Sets the phase name and number, installs the five-item Phase 1 checklist
    (all items start as ``"pending"``) and the two pending Records Officer
    approvals. The mapping is modified in place and also returned.
    """
    # (task description, whether the Records Officer must approve it)
    task_specs = [
        ("Include Records Officer in system design process", True),
        ("Identify records that support the business process", False),
        ("Evaluate current record schedules applicability", False),
        ("Determine if new record schedule is required", False),
        ("Obtain Records Officer signature on Investment Summary Proposal", True),
    ]
    checklist = [
        {
            "id": position,
            "task": description,
            "status": "pending",
            "requires_approval": needs_signoff,
            "approver": "Records Officer" if needs_signoff else None,
        }
        for position, (description, needs_signoff) in enumerate(task_specs, start=1)
    ]
    state.update(
        current_phase="Concept Development",
        phase_number=1,
        checklist_items=checklist,
        pending_approvals=[
            "Records Officer - System Design",
            "Records Officer - Investment Summary",
        ],
    )
    return state
def create_rm_integration_workflow():
    """Return a bare ``StateGraph`` typed on ``RMIntegrationState``.

    Placeholder: no nodes or edges are registered here. The full phase graph
    is expected to be wired up as in the earlier standalone implementation.
    """
    graph = StateGraph(RMIntegrationState)
    return graph
@app.get("/")
async def root():
    """Root endpoint with API information"""
    # Static route catalogue; keep in sync with the handlers defined below.
    endpoint_catalogue = {
        "POST /workflows": "Create new workflow",
        "GET /workflows/{workflow_id}": "Get workflow status",
        "PUT /workflows/{workflow_id}/items": "Update checklist item",
        "POST /workflows/{workflow_id}/approvals": "Submit approval",
        "GET /workflows/{workflow_id}/report": "Get detailed status report",
        "POST /workflows/{workflow_id}/advance": "Advance to next phase",
        "GET /workflows": "List all workflows",
    }
    payload = {"message": "RM Integration SDLC Workflow API", "version": "1.0.0"}
    payload["endpoints"] = endpoint_catalogue
    return payload
@app.post("/workflows", response_model=WorkflowResponse)
async def create_workflow(request: WorkflowInitRequest):
    """Create a new RM integration workflow"""
    new_id = str(uuid.uuid4())

    # Start from a blank state and let phase 1 fill in the phase fields.
    state = phase1_concept_development(
        {
            "current_phase": "",
            "phase_number": 0,
            "checklist_items": [],
            "completed_items": [],
            "pending_approvals": [],
            "records_officer_involved": False,
            "project_status": "Starting RM Integration Process",
            "comments": {},
            "validation_results": {},
            "next_phase_ready": False,
        }
    )

    # Request fields are copied verbatim into the metadata record.
    metadata = {
        "project_name": request.project_name,
        "project_description": request.project_description,
        "records_officer_email": request.records_officer_email,
        "created_at": datetime.now().isoformat(),
        "last_updated": datetime.now().isoformat(),
    }
    workflows_storage[new_id] = {"state": state, "metadata": metadata}

    items = state["checklist_items"]
    done = sum(1 for entry in items if entry["status"] == "completed")
    total = len(items)
    percent = (done / total * 100) if total > 0 else 0

    return WorkflowResponse(
        workflow_id=new_id,
        current_phase=state["current_phase"],
        phase_number=state["phase_number"],
        project_status=state["project_status"],
        completion_percentage=percent,
        pending_approvals=state["pending_approvals"],
        next_phase_ready=state["next_phase_ready"],
        timestamp=datetime.now().isoformat(),
    )
@app.get("/workflows/{workflow_id}", response_model=WorkflowResponse)
async def get_workflow_status(workflow_id: str):
    """Get current workflow status"""
    record = workflows_storage.get(workflow_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Workflow not found")

    state = record["state"]
    items = state["checklist_items"]
    done = sum(1 for entry in items if entry["status"] == "completed")
    total = len(items)
    # Guard against an empty checklist to avoid dividing by zero.
    percent = (done / total * 100) if total > 0 else 0

    return WorkflowResponse(
        workflow_id=workflow_id,
        current_phase=state["current_phase"],
        phase_number=state["phase_number"],
        project_status=state["project_status"],
        completion_percentage=percent,
        pending_approvals=state["pending_approvals"],
        next_phase_ready=state["next_phase_ready"],
        timestamp=datetime.now().isoformat(),
    )
@app.put("/workflows/{workflow_id}/items")
async def update_checklist_item(workflow_id: str, request: UpdateItemRequest):
    """Update a checklist item status"""
    # Raises 404 when the workflow or the checklist item does not exist.
    if workflow_id not in workflows_storage:
        raise HTTPException(status_code=404, detail="Workflow not found")
    state = workflows_storage[workflow_id]["state"]

    # Find the first checklist item with the requested id.
    target = next(
        (item for item in state["checklist_items"] if item["id"] == request.item_id),
        None,
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Checklist item not found")
    target["status"] = request.status.value

    # Keep completed_items in step with the item's status: add the id on
    # completion and drop it again when a completed item is reopened, so the
    # list never reports an item that is no longer completed.
    completed_ids = state["completed_items"]
    if request.status == TaskStatus.COMPLETED:
        if request.item_id not in completed_ids:
            completed_ids.append(request.item_id)
    elif request.item_id in completed_ids:
        completed_ids.remove(request.item_id)

    # Add comment if provided (the latest comment per item replaces the old one)
    if request.comment:
        state["comments"][request.item_id] = {
            "comment": request.comment,
            "updated_by": request.updated_by,
            "timestamp": datetime.now().isoformat(),
        }

    # Update metadata
    workflows_storage[workflow_id]["metadata"]["last_updated"] = datetime.now().isoformat()
    return {"message": f"Item {request.item_id} updated successfully", "status": request.status.value}
@app.post("/workflows/{workflow_id}/approvals")
async def submit_approval(workflow_id: str, request: ApprovalRequest):
    """Submit an approval for the workflow"""
    # Raises 404 when the workflow does not exist.
    if workflow_id not in workflows_storage:
        raise HTTPException(status_code=404, detail="Workflow not found")
    state = workflows_storage[workflow_id]["state"]

    # Only a granted approval clears the pending entry; a rejection is logged
    # but leaves the approval pending.
    if request.approved and request.approval_type in state["pending_approvals"]:
        state["pending_approvals"].remove(request.approval_type)

    # Log the approval. The log list is created lazily on first use.
    state.setdefault("approval_log", []).append({
        "approval_type": request.approval_type,
        "approved": request.approved,
        "approver": request.approver,
        "comment": request.comment,
        "timestamp": datetime.now().isoformat(),
    })

    # Update metadata
    workflows_storage[workflow_id]["metadata"]["last_updated"] = datetime.now().isoformat()
    return {
        "message": f"Approval {'granted' if request.approved else 'rejected'} for {request.approval_type}",
        "pending_approvals": state["pending_approvals"],
    }
@app.get("/workflows/{workflow_id}/report", response_model=StatusReport)
async def get_detailed_report(workflow_id: str):
    """Get detailed status report for a workflow"""
    # Raises 404 when the workflow does not exist.
    if workflow_id not in workflows_storage:
        raise HTTPException(status_code=404, detail="Workflow not found")
    state = workflows_storage[workflow_id]["state"]

    completed_count = sum(
        1 for item in state["checklist_items"] if item["status"] == "completed"
    )
    total_count = len(state["checklist_items"])
    # An empty checklist reports 0% instead of dividing by zero.
    completion_percentage = (completed_count / total_count * 100) if total_count > 0 else 0

    return StatusReport(
        workflow_id=workflow_id,
        current_phase=state["current_phase"],
        phase_number=state["phase_number"],
        completion_percentage=completion_percentage,
        completed_items=completed_count,
        total_items=total_count,
        pending_approvals=state["pending_approvals"],
        validation_results=state["validation_results"],
        project_status=state["project_status"],
        checklist_items=state["checklist_items"],
        timestamp=datetime.now().isoformat(),
    )
@app.post("/workflows/{workflow_id}/advance")
async def advance_workflow(workflow_id: str):
    """Attempt to advance workflow to next phase"""
    if workflow_id not in workflows_storage:
        raise HTTPException(status_code=404, detail="Workflow not found")
    record = workflows_storage[workflow_id]
    state = record["state"]

    # Collect everything that blocks the transition: unfinished checklist
    # items first, then outstanding approvals.
    blockers = {
        str(entry["id"]): f"Item {entry['id']} not completed: {entry['task']}"
        for entry in state["checklist_items"]
        if entry["status"] != "completed"
    }
    if state["pending_approvals"]:
        blockers["approvals"] = f"Pending approvals: {', '.join(state['pending_approvals'])}"

    if blockers:
        state["validation_results"] = blockers
        state["next_phase_ready"] = False
        return {
            "success": False,
            "message": "Cannot advance: Phase requirements not met",
            "validation_results": blockers,
        }

    # Phase 8 is treated as the last phase.
    if state["phase_number"] >= 8:
        return {
            "success": False,
            "message": "Workflow already at final phase",
            "current_phase": state["current_phase"],
        }

    # Placeholder transition: only the phase counter and labels change; the
    # checklist of the previous phase is left in place.
    state["phase_number"] += 1
    state["current_phase"] = f"Phase {state['phase_number']}"
    state["project_status"] = f"Advanced to {state['current_phase']}"
    state["validation_results"] = {}
    state["next_phase_ready"] = True

    record["metadata"]["last_updated"] = datetime.now().isoformat()
    return {
        "success": True,
        "message": f"Advanced to {state['current_phase']}",
        "current_phase": state["current_phase"],
        "phase_number": state["phase_number"],
    }
@app.get("/workflows")
async def list_workflows():
    """List all workflows with basic information"""

    def _summarise(workflow_id, record):
        # One summary row per stored workflow.
        state, metadata = record["state"], record["metadata"]
        items = state["checklist_items"]
        done = sum(1 for entry in items if entry["status"] == "completed")
        total = len(items)
        return {
            "workflow_id": workflow_id,
            "project_name": metadata["project_name"],
            "current_phase": state["current_phase"],
            "phase_number": state["phase_number"],
            "completion_percentage": (done / total * 100) if total > 0 else 0,
            "created_at": metadata["created_at"],
            "last_updated": metadata["last_updated"],
        }

    summaries = [_summarise(wid, rec) for wid, rec in workflows_storage.items()]
    return {"workflows": summaries, "total_count": len(summaries)}
@app.delete("/workflows/{workflow_id}")
async def delete_workflow(workflow_id: str):
    """Delete a workflow"""
    # Stored records are always dicts, so None reliably means "not present".
    removed = workflows_storage.pop(workflow_id, None)
    if removed is None:
        raise HTTPException(status_code=404, detail="Workflow not found")
    return {"message": f"Workflow {workflow_id} deleted successfully"}
# Health check endpoint
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    # Always answers "healthy"; Ollama problems are reported in the "ollama"
    # section rather than failing the check.
    ollama_status = "unknown"
    available_models = []
    try:
        import ollama

        # ollama.list() is a synchronous network call; run it in a worker
        # thread so a slow or unreachable Ollama server cannot stall the
        # event loop (and every other request) while health is probed.
        models = await asyncio.to_thread(ollama.list)
        ollama_status = "connected"
        available_models = [m.get("name", "") for m in models.get("models", [])]
    except Exception as e:
        ollama_status = f"error: {str(e)}"
        available_models = []
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "active_workflows": len(workflows_storage),
        "ollama": {
            "status": ollama_status,
            "available_models": available_models,
        },
    }
# Test endpoint for connectivity
@app.get("/api/v1/test")
async def test_endpoint():
    """Simple test endpoint to verify connectivity"""
    reply = {"message": "Backend is reachable"}
    reply["timestamp"] = datetime.now().isoformat()
    reply["status"] = "ok"
    return reply
# Error handlers
@app.exception_handler(ValueError)
async def value_error_handler(request, exc):
    """Turn any ValueError raised while handling a request into HTTP 400.

    The response body carries the exception text under ``detail``.
    """
    body = {"detail": str(exc)}
    return JSONResponse(status_code=400, content=body)
@app.post("/test_ollama")
async def test_ollama(req: PromptRequest, authorization: Optional[str] = Header(None)):
    # Admin-only smoke test: sends the prompt to the local "qwen2.5:3b" model
    # at temperature 0 and returns the reply text under "oss_json".
    _require_admin_user(authorization)
    try:
        # ollama.chat is synchronous and can take as long as the model needs;
        # run it in a worker thread so the event loop stays responsive.
        response = await asyncio.to_thread(
            ollama.chat,
            model="qwen2.5:3b",
            messages=[{'role': 'user', 'content': req.prompt}],
            options={"temperature": 0.0},
        )
        return {"oss_json": response["message"]["content"]}
    # The second, identical handler that used to follow was unreachable and
    # has been dropped.
    # NOTE(review): only HTTPException is converted into a body; other errors
    # still propagate as HTTP 500 — confirm whether a broader handler was
    # intended here.
    except HTTPException as e:
        return {"oss_json": str(e)}
@app.post("/test_ollama_1")
async def test_ollama_1(req: PromptRequest, authorization: Optional[str] = Header(None)):
    # Admin-only: passes the prompt request straight to the shared
    # compliance verifier's text generation and returns its result.
    _require_admin_user(authorization)
    return await Compliance_Verifier.generate_text(req)
@app.post("/test_ollama_similarity")
async def vectorize_requirement(req: PromptRequest, authorization: Optional[str] = Header(None)):
    # Admin-only: returns whatever the shared compliance verifier produces
    # when asked to vectorize the request.
    _require_admin_user(authorization)
    return await Compliance_Verifier.vectorize_requirement(req)
@app.post("/analyze_compliance")
async def semantic_similarity(
    data: ComplianceRequest, authorization: Optional[str] = Header(None)
) -> Dict[str, Any]:
    # Any authenticated user: delegates to the verifier's semantic
    # similarity analysis and returns its result unchanged.
    _require_authenticated_user(authorization)
    return await Compliance_Verifier.semantic_similarity(data)
@app.post("/analyze_structure")
async def structure_similarity(
    data: ComplianceRequest, authorization: Optional[str] = Header(None)
) -> Dict[str, Any]:
    # Any authenticated user: delegates to the verifier's structural
    # similarity analysis and returns its result unchanged.
    _require_authenticated_user(authorization)
    return await Compliance_Verifier.structural_similarity(data)
# Chat Assistant Endpoints
@app.options("/api/v1/chat")
async def chat_options(request: StarletteRequest):
    """Handle CORS preflight for chat endpoint"""
    fallback_origin = "http://localhost:8080"
    requested_origin = request.headers.get("origin", fallback_origin)
    # Echo the caller's origin only when it is on the allow-list; anything
    # else gets the fallback origin instead.
    if requested_origin in CORS_ALLOW_ORIGINS:
        allow_origin = requested_origin
    else:
        allow_origin = fallback_origin
    preflight_headers = {
        "Access-Control-Allow-Origin": allow_origin,
        "Access-Control-Allow-Methods": "POST, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type, Authorization",
        "Access-Control-Allow-Credentials": "true",
    }
    return Response(status_code=200, headers=preflight_headers)
@app.post("/api/v1/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest, authorization: Optional[str] = Header(None)):
    """
    Chat endpoint for conversational AI assistant.
    Handles policy questions and general compliance queries.
    """
    # Raises 400 for an empty/blank message and 500 for any unexpected
    # failure; HTTPExceptions raised along the way are passed through.
    _require_authenticated_user(authorization)
    try:
        logger.info(
            "Chat endpoint called with message: %s...",
            request.message[:50] if request.message else "Empty",
        )
        logger.debug(
            "Full request: message=%s, has_history=%s, context=%s",
            request.message,
            bool(request.conversation_history),
            request.context,
        )
        # Validate request
        if not request.message or not request.message.strip():
            raise HTTPException(status_code=400, detail="Message cannot be empty")
        response = await Chat_Assistant.chat(request)
        logger.info(
            "Chat response generated successfully: %s...",
            response.message[:50] if response.message else "Empty",
        )
        return response
    except HTTPException as he:
        logger.error(f"HTTPException in chat endpoint: {he.status_code} - {he.detail}")
        raise
    except Exception as e:
        # exc_info=True already records the full traceback; the second
        # traceback.format_exc() log line duplicated it and was removed.
        logger.error(f"Unexpected error in chat endpoint: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
@app.post("/api/v1/chat/verify", response_model=ChatResponse)
async def verify_content_endpoint(
    request: VerifyContentRequest, authorization: Optional[str] = Header(None)
):
    """
    Verify content against compliance requirements.
    """
    _require_authenticated_user(authorization)
    # The request fields map one-to-one onto the assistant's keyword arguments.
    verification_args = {
        "field_name": request.field_name,
        "content": request.content,
        "verification_criteria": request.verification_criteria,
    }
    try:
        return await Chat_Assistant.verify_content(**verification_args)
    except HTTPException:
        # Let deliberate HTTP errors through untouched.
        raise
    except Exception as e:
        logger.error(f"Error in verify endpoint: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v1/chat/question", response_model=ChatResponse)
async def answer_policy_question(
    request: PolicyQuestionRequest, authorization: Optional[str] = Header(None)
):
    """
    Answer a policy or compliance question.
    """
    _require_authenticated_user(authorization)
    question_text = request.question
    context_text = request.policy_context
    try:
        return await Chat_Assistant.answer_policy_question(
            question=question_text,
            policy_context=context_text,
        )
    except HTTPException:
        # Let deliberate HTTP errors through untouched.
        raise
    except Exception as e:
        logger.error(f"Error in question endpoint: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
# Idea Management Endpoints
# Body of POST /api/v1/ideas and element type of POST /api/v1/ideas/bulk-add;
# the three fields are forwarded to the Qdrant service's add_idea.
class IdeaRequest(BaseModel):
    title: str = Field(..., description="Title of the idea")
    description: str = Field(..., description="Description of the idea")
    category: Optional[str] = Field(None, description="Category of the idea")
# Body of POST /api/v1/ideas/search. No bounds are declared on limit or
# score_threshold at this layer.
class IdeaSearchRequest(BaseModel):
    query: str = Field(..., description="Search query text")
    limit: Optional[int] = Field(5, description="Maximum number of results")
    score_threshold: Optional[float] = Field(0.5, description="Minimum similarity score")
# Initialize Qdrant collection on first API call (lazy initialization)
@app.post("/api/v1/ideas")
async def add_idea(request: IdeaRequest, authorization: Optional[str] = Header(None)):
    """Add a new idea to the vector database"""
    _require_admin_user(authorization)
    try:
        from src.infrastructure.vector_db.qdrant_service import get_qdrant_service

        service = get_qdrant_service()
        # The collection is set up on demand rather than at startup.
        await service.initialize_collection()
        return await service.add_idea(
            title=request.title,
            description=request.description,
            category=request.category,
        )
    except Exception as e:
        logger.error(f"Error adding idea: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v1/ideas/search")
async def search_ideas(request: IdeaSearchRequest, authorization: Optional[str] = Header(None)):
    """Search for similar ideas using vector similarity"""
    # Any failure inside the search is logged and reported as HTTP 500.
    _require_authenticated_user(authorization)
    # Only a missing (None) threshold falls back to 0.5. An explicit 0.0 is a
    # legitimate "no minimum score" request and must not be replaced, which
    # the previous `or 0.5` did because 0.0 is falsy.
    score_threshold = 0.5 if request.score_threshold is None else request.score_threshold
    # A limit of 0 or None keeps falling back to the default of 5.
    limit = request.limit or 5
    try:
        from src.infrastructure.vector_db.qdrant_service import get_qdrant_service

        qdrant_service = get_qdrant_service()
        # Ensure collection is initialized
        await qdrant_service.initialize_collection()
        results = await qdrant_service.search_similar_ideas(
            query_text=request.query,
            limit=limit,
            score_threshold=score_threshold,
        )
        return {"results": results, "count": len(results)}
    except Exception as e:
        logger.error(f"Error searching ideas: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/v1/ideas")
async def get_all_ideas(limit: int = 100, authorization: Optional[str] = Header(None)):
    """Get all ideas from the database"""
    _require_admin_user(authorization)
    try:
        from src.infrastructure.vector_db.qdrant_service import get_qdrant_service

        # limit is passed through to the service as given.
        fetched = await get_qdrant_service().get_all_ideas(limit=limit)
        return {"ideas": fetched, "count": len(fetched)}
    except Exception as e:
        logger.error(f"Error getting ideas: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/api/v1/ideas/{idea_id}")
async def delete_idea(idea_id: str, authorization: Optional[str] = Header(None)):
    """Delete an idea from the database"""
    _require_admin_user(authorization)
    try:
        from src.infrastructure.vector_db.qdrant_service import get_qdrant_service

        service = get_qdrant_service()
        deleted = await service.delete_idea(idea_id)
        # A falsy result from the service is reported as "not found".
        if not deleted:
            raise HTTPException(status_code=404, detail="Idea not found")
        return {"message": "Idea deleted successfully", "id": idea_id}
    except HTTPException:
        # Keep the 404 above from being rewrapped as a 500.
        raise
    except Exception as e:
        logger.error(f"Error deleting idea: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v1/ideas/bulk-add")
async def bulk_add_ideas(ideas: List[IdeaRequest], authorization: Optional[str] = Header(None)):
    """Add multiple ideas at once"""
    _require_admin_user(authorization)
    # Cap the batch size before touching the vector database.
    if len(ideas) > 50:
        raise HTTPException(status_code=422, detail="Tối đa 50 ý tưởng mỗi lần.")
    try:
        from src.infrastructure.vector_db.qdrant_service import get_qdrant_service

        service = get_qdrant_service()
        await service.initialize_collection()
        # Ideas are added one at a time, in request order; a failure part-way
        # through leaves the earlier ideas stored.
        outcomes = [
            await service.add_idea(
                title=entry.title,
                description=entry.description,
                category=entry.category,
            )
            for entry in ideas
        ]
        return {"results": outcomes, "count": len(outcomes)}
    except Exception as e:
        logger.error(f"Error bulk adding ideas: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v1/ideas/initialize-ump")
async def initialize_ump_ideas(authorization: Optional[str] = Header(None)):
    """Seed the idea store with the ten built-in UMP innovation ideas (admin only).

    Raises:
        HTTPException: 401/403 from the admin check; 500 when initialising the
            collection or adding an idea fails.
    """
    _require_admin_user(authorization)
    # (title, description, category) for each seed idea, in insertion order.
    seed_rows = (
        (
            "Nền tảng Trợ lý AI học tập lâm sàng (Clinical AI Tutor)",
            "Ứng dụng AI đóng vai trò trợ giảng cho sinh viên y, hỗ trợ phân tích ca bệnh giả lập, giải thích cận lâm sàng, và gợi ý chẩn đoán theo phác đồ Việt Nam.",
            "Giáo dục - AI",
        ),
        (
            "Hệ thống bệnh án điện tử học thuật (Academic EMR Sandbox)",
            "Môi trường EMR mô phỏng cho đào tạo và nghiên cứu, cho phép sinh viên và giảng viên thực hành nhập phân tích khai thác dữ liệu y khoa mà không ảnh hưởng dữ liệu bệnh nhân thật.",
            "Giáo dục - Chuyển đổi số",
        ),
        (
            "Trung tâm mô phỏng y khoa bằng AR/VR & Digital Twin",
            "Xây dựng phòng lab mô phỏng phẫu thuật, cấp cứu, và quy trình điều trị bằng AR/VR, kết hợp mô hình \"digital twin\" của cơ thể người phục vụ đào tạo nâng cao.",
            "Giáo dục - AR/VR",
        ),
        (
            "Chương trình Y tế cộng đồng số cho vùng sâu vùng xa",
            "Kết hợp telehealth, trợ lý ảo y tế (agentic care) và AI sàng lọc sớm bệnh không lây (NCD) cho người dân vùng nông thôn, miền núi và hải đảo.",
            "Tác động xã hội - Telehealth",
        ),
        (
            "Nền tảng nghiên cứu AI y sinh dùng chung (UMP AI Research Hub)",
            "Cung cấp hạ tầng GPU, kho dữ liệu y khoa ẩn danh, và công cụ phân tích AI cho giảng viên nghiên cứu sinh startup hợp tác nghiên cứu.",
            "Nghiên cứu - AI",
        ),
        (
            "Hệ thống theo dõi và dự báo sức khỏe sinh viên & nhân viên y tế",
            "Ứng dụng phân tích dữ liệu và AI để phát hiện sớm stress, burnout, và vấn đề sức khỏe tâm thần trong cộng đồng sinh viên và nhân viên y tế.",
            "Tác động xã hội - Sức khỏe",
        ),
        (
            "Vườn ươm khởi nghiệp công nghệ y sinh (MedTech Incubator)",
            "Hỗ trợ sinh viên, bác sĩ và giảng viên phát triển startup MedTech, HealthTech, AI y tế thông qua mentoring, quỹ seed và kết nối bệnh viện doanh nghiệp.",
            "Khởi nghiệp - MedTech",
        ),
        (
            "Hệ thống quản lý chất lượng đào tạo và kiểm định số",
            "Số hóa toàn bộ quy trình đảm bảo chất lượng nội bộ (IQA), đánh giá chương trình đào tạo, và chuẩn hóa theo tiêu chuẩn quốc tế (WFME, AUN-QA).",
            "Giáo dục - Quản lý chất lượng",
        ),
        (
            "Nền tảng dữ liệu lớn phòng chống dịch và bệnh không lây",
            "Phân tích dữ liệu dịch tễ, môi trường, và hành vi để dự báo dịch bệnh, hỗ trợ Sở Y tế và Bộ Y tế trong ra quyết định chính sách.",
            "Nghiên cứu - Dịch tễ học",
        ),
        (
            "Học viện Y học chính xác & Y học cá thể hóa",
            "Kết hợp dữ liệu gen, hình ảnh y khoa, lối sống và AI để nghiên cứu và ứng dụng điều trị cá thể hóa cho bệnh ung thư, tim mạch và bệnh mạn tính.",
            "Nghiên cứu - Y học chính xác",
        ),
    )
    # Built through the request model so the seeds pass the same validation as API input.
    ump_ideas = [
        IdeaRequest(title=title, description=description, category=category)
        for title, description, category in seed_rows
    ]
    try:
        from src.infrastructure.vector_db.qdrant_service import get_qdrant_service

        store = get_qdrant_service()
        # The collection must exist before the first insert.
        await store.initialize_collection()
        stored = [
            await store.add_idea(
                title=seed.title,
                description=seed.description,
                category=seed.category,
            )
            for seed in ump_ideas
        ]
        return {"results": stored, "count": len(stored), "message": f"Successfully added {len(stored)} UMP ideas"}
    except Exception as e:
        logger.error(f"Error initializing UMP ideas: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v1/application-reports/excel")
async def save_application_report_excel(caseId: str = Form(...), file: UploadFile = File(...)):
    """
    Save an uploaded application Excel file to the shared assets folder so an admin can review it.

    The case id is reduced to alphanumerics, "-" and "_" before it is used as the
    file name, so the target always sits directly inside ``FRONTEND_PUBLIC_DIR``.
    The upload is copied in bounded chunks to a temporary sibling file and renamed
    into place only once complete, so a failed upload never leaves a truncated
    ``<caseId>.xlsx`` behind and memory use does not grow with the file size.

    Returns:
        dict with ``caseId`` (sanitized), ``fileName``, ``savedPath`` and ``publicUrl``.

    Raises:
        HTTPException: 400 when ``caseId`` is blank or has no usable characters.
    """
    # NOTE(review): this endpoint takes no Authorization header and does not check
    # that the payload is really an .xlsx file — confirm that is intended.
    if not caseId.strip():
        raise HTTPException(status_code=400, detail="caseId is required")
    FRONTEND_PUBLIC_DIR.mkdir(parents=True, exist_ok=True)
    safe_case_id = "".join(ch for ch in caseId if ch.isalnum() or ch in ("-", "_"))
    if not safe_case_id:
        raise HTTPException(status_code=400, detail="Invalid caseId")
    target_name = f"{safe_case_id}.xlsx"
    target_path = FRONTEND_PUBLIC_DIR / target_name
    # Unique ".part" name: concurrent uploads for one case cannot clobber each other,
    # and the listing endpoint's "*.xlsx" glob never picks up a partial file.
    partial_path = FRONTEND_PUBLIC_DIR / f".{target_name}.{uuid.uuid4().hex}.part"
    chunk_size = 1024 * 1024
    try:
        with open(partial_path, "wb") as output:
            while chunk := await file.read(chunk_size):
                output.write(chunk)
        partial_path.replace(target_path)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        partial_path.unlink(missing_ok=True)
    return {
        "caseId": safe_case_id,
        "fileName": target_name,
        "savedPath": str(target_path),
        "publicUrl": f"/assets/{target_name}",
    }
@app.get("/api/v1/application-reports")
async def list_application_reports():
    """
    List the ``*.xlsx`` report files in the shared assets folder, newest first.

    Each entry carries the file name, its ``/assets/...`` URL, its size in bytes
    and its modification time as an ISO-8601 string (naive local time).
    """
    FRONTEND_PUBLIC_DIR.mkdir(parents=True, exist_ok=True)
    newest_first = sorted(
        FRONTEND_PUBLIC_DIR.glob("*.xlsx"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )

    def describe(report):
        # Summarise one report file for the listing.
        info = report.stat()
        return {
            "fileName": report.name,
            "publicUrl": f"/assets/{report.name}",
            "sizeBytes": info.st_size,
            "updatedAt": datetime.fromtimestamp(info.st_mtime).isoformat(),
        }

    return {"files": [describe(report) for report in newest_first]}
def _normalize_case_id(case_id: Optional[str]) -> str:
raw = case_id or f"CASE-{int(datetime.now().timestamp() * 1000)}"
safe = "".join(ch for ch in raw if ch.isalnum() or ch in ("-", "_"))
if not safe:
raise HTTPException(status_code=400, detail="Invalid caseId")
return safe
def _draft_path(case_id: str) -> Path:
    """Return the YAML draft file path for ``case_id``, creating the draft directory if needed."""
    drafts_root = APPLICATION_DRAFT_DIR
    drafts_root.mkdir(parents=True, exist_ok=True)
    return drafts_root / f"{case_id}.yml"
@app.post("/api/v1/application-drafts")
async def save_application_draft(
    request: ApplicationDraftSaveRequest,
    authorization: Optional[str] = Header(None),
):
    """Persist one tab of an application draft.

    With PostgreSQL enabled the tab is saved through ``save_application_draft_tab``
    and that call's result is returned unchanged. Otherwise the tab is merged into
    the per-case YAML draft file and the merged document is returned.

    Raises:
        HTTPException: 400 when the case id has no usable characters; 500 when the
            PostgreSQL save fails with a non-HTTP error.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.drafts import save_application_draft_tab

    case_id = _normalize_case_id(request.caseId)
    # May be None: the token is decoded here but a login is not enforced.
    owner_uid = decode_access_token_user_id(authorization)

    if is_postgres_enabled():
        try:
            async with get_session() as session:
                saved = await save_application_draft_tab(
                    session, case_id, request.tab, request.data, owner_id=owner_uid
                )
                return saved
        except HTTPException:
            raise
        except Exception as e:
            logger.exception("application draft save (PostgreSQL) failed")
            raise HTTPException(status_code=500, detail="Failed to persist draft") from e

    # YAML fallback: start from a skeleton, layer any existing file on top of it,
    # then overwrite the fields this request owns.
    draft_file = _draft_path(case_id)
    document: Dict[str, Any] = {
        "caseId": case_id,
        "updatedAt": datetime.now().isoformat(),
        "tabs": {},
    }
    if draft_file.exists():
        with open(draft_file, "r", encoding="utf-8") as handle:
            previous = yaml.safe_load(handle) or {}
        if isinstance(previous, dict):
            document.update(previous)
            # Copy so the loaded mapping is not mutated through the shared reference.
            document["tabs"] = dict(previous.get("tabs") or {})
    document["caseId"] = case_id
    document["updatedAt"] = datetime.now().isoformat()
    document["tabs"][request.tab] = request.data
    with open(draft_file, "w", encoding="utf-8") as handle:
        yaml.safe_dump(document, handle, allow_unicode=True, sort_keys=False)

    return {
        "caseId": case_id,
        "updatedAt": document["updatedAt"],
        "tabs": document["tabs"],
        "publicUrl": f"/application-drafts/{case_id}.yml",
    }
@app.get("/api/v1/application-drafts/{case_id}")
async def get_application_draft(case_id: str):
    """Return the draft document for a case.

    Lookup order: the PostgreSQL draft (when PostgreSQL is enabled), then the value
    from ``_load_application_draft_yaml`` (presumably the on-disk YAML draft), then
    the bundle from ``_empty_applicant_draft_bundle``. When only the YAML fallback
    exists it is inserted into PostgreSQL and re-read, so later requests hit the
    database.

    Raises:
        HTTPException: 400 when the case id has no usable characters; 500 when the
            PostgreSQL load fails with a non-HTTP error.
    """
    # NOTE(review): no Authorization header is read here, so any caller can fetch any
    # draft by id — confirm whether an ownership check is expected for this route.
    from sqlalchemy.exc import IntegrityError
    from src.initiative_db.drafts import (
        get_application_draft_document,
        insert_initiative_draft_if_missing,
    )
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.submissions import merge_application_draft_document_with_snapshot_if_needed
    safe_case_id = _normalize_case_id(case_id)
    if is_postgres_enabled():
        # Best effort: swap the path id for the initiative's stored case code.
        # A failure here is only logged and the normalized path id is used instead.
        try:
            async with get_session() as session:
                from src.initiative_db.submissions import resolve_initiative_for_draft_case_key
                ini_res = await resolve_initiative_for_draft_case_key(session, case_id)
                if ini_res is not None:
                    safe_case_id = ini_res.case_code
        except Exception:
            logger.exception("resolve initiative for application draft GET failed; using path case id")
    yaml_fallback = _load_application_draft_yaml(safe_case_id)
    if is_postgres_enabled():
        try:
            async with get_session() as session:
                try:
                    doc = await get_application_draft_document(session, safe_case_id)
                    return await merge_application_draft_document_with_snapshot_if_needed(session, safe_case_id, doc)
                except KeyError:
                    # KeyError is treated as "no draft in PostgreSQL"; fall through to the fallback.
                    pass
                if yaml_fallback is None:
                    return _empty_applicant_draft_bundle(safe_case_id)
                try:
                    await insert_initiative_draft_if_missing(session, safe_case_id, yaml_fallback)
                except IntegrityError:
                    # Insert conflicted (e.g. a concurrent insert); discard ours and re-read below.
                    await session.rollback()
            # Fresh session: re-read the draft after the insert attempt.
            async with get_session() as session:
                try:
                    doc = await get_application_draft_document(session, safe_case_id)
                    return await merge_application_draft_document_with_snapshot_if_needed(session, safe_case_id, doc)
                except KeyError:
                    return yaml_fallback
        except HTTPException:
            raise
        except Exception as e:
            logger.exception("application draft load (PostgreSQL) failed")
            raise HTTPException(status_code=500, detail="Failed to load draft") from e
    if yaml_fallback is not None:
        return yaml_fallback
    return _empty_applicant_draft_bundle(safe_case_id)
def _evidence_kind_to_role(kind: object) -> Optional[str]:
"""
Map API kind query/form value → storage role.
Accepts str or list (duplicate ``kind=`` keys); strips BOM / ZWSP; NFKC-normalizes Unicode
so lookalike Latin cannot bypass the allow-list.
"""
if kind is None:
return None
if isinstance(kind, (list, tuple)):
candidates = [str(x) for x in kind if x is not None and str(x).strip() != ""]
else:
candidates = [str(kind)]
for c in candidates:
k = unicodedata.normalize("NFKC", (c or "").strip())
k = k.replace("\ufeff", "").replace("\u200b", "").strip().lower()
if k == "research":
return "research_evidence"
if k == "textbook":
return "textbook_evidence"
if k == "technical":
return "technical_evidence"
return None
def _evidence_role_to_api_kind(role: str) -> str:
if role == "research_evidence":
return "research"
if role == "textbook_evidence":
return "textbook"
if role == "technical_evidence":
return "technical"
return "research"
def _evidence_row_looks_like_pdf(row, default_name: str) -> bool:
"""True when the stored artifact should be shown with an inline PDF presign."""
mt = (row.mime_type or "").lower()
if "pdf" in mt:
return True
dn = (default_name or "").lower()
return dn.endswith(".pdf")
def _jwt_role_strings(authorization: Optional[str]) -> list[str]:
    """Return the ``roles`` claim of the bearer token as a list of strings.

    Yields an empty list when the token does not decode to a payload or when the
    claim is not a list.
    """
    payload = decode_bearer_token(authorization)
    claimed = payload.get("roles") if payload else None
    if not isinstance(claimed, list):
        return []
    return [str(entry) for entry in claimed]
def _is_staff_reviewer(authorization: Optional[str]) -> bool:
    """Return True when the token's roles include ``admin`` or ``editor``."""
    granted = _jwt_role_strings(authorization)
    return any(staff_role in granted for staff_role in ("admin", "editor"))
def _require_admin_user(authorization: Optional[str]) -> uuid.UUID:
    """Return the caller's user id; the JWT must be valid and carry role ``admin``.

    Raises:
        HTTPException: 401 when no user id can be decoded from the token, 403 when
            the ``admin`` role is missing.
    """
    user_id = decode_access_token_user_id(authorization)
    if user_id is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để thực hiện thao tác.")
    if "admin" in _jwt_role_strings(authorization):
        return user_id
    raise HTTPException(status_code=403, detail="Chỉ tài khoản quản trị mới thực hiện được.")
def _require_authenticated_user(authorization: Optional[str]) -> uuid.UUID:
    """Return the user id decoded from the token, or raise 401 when there is none."""
    user_id = decode_access_token_user_id(authorization)
    if user_id is not None:
        return user_id
    raise HTTPException(status_code=401, detail="Đăng nhập để thực hiện thao tác.")
def _require_staff_reviewer(authorization: Optional[str]) -> uuid.UUID:
    """Return the caller's user id; the caller must be logged in and be staff.

    Raises:
        HTTPException: 401 when not authenticated, 403 when the token has neither
            the ``admin`` nor the ``editor`` role.
    """
    user_id = _require_authenticated_user(authorization)
    if _is_staff_reviewer(authorization):
        return user_id
    raise HTTPException(status_code=403, detail="Không có quyền truy cập.")
async def _assert_initiative_case_access(
    session: Any,
    case_id: str,
    uid: uuid.UUID,
    authorization: Optional[str],
) -> None:
    """Allow access for staff or for the owner of the initiative with this case code.

    The case id is normalized before the lookup. When no initiative exists for it,
    access is allowed (there is nothing to protect yet).

    Raises:
        HTTPException: 403 when the initiative exists and the caller is neither its
            owner nor staff; 400 when the case id has no usable characters.
    """
    from sqlalchemy import select
    from src.initiative_db.models import Initiative

    case_code = _normalize_case_id(case_id)
    lookup = select(Initiative).where(Initiative.case_code == case_code)
    initiative = (await session.execute(lookup)).scalar_one_or_none()
    if initiative is None:
        return
    # The owner check comes first so the token's roles are only read when needed.
    permitted = initiative.owner_id == uid or _is_staff_reviewer(authorization)
    if not permitted:
        raise HTTPException(status_code=403, detail="Không có quyền xem hồ sơ này.")
async def _assert_review_document_access(
    session: Any,
    review_document_id: str,
    uid: uuid.UUID,
    authorization: Optional[str],
) -> None:
    """Allow access to a review document for staff or the owning initiative's owner.

    Raises:
        HTTPException: 404 when the id is not a valid UUID or no such document
            exists; 403 when the document's initiative exists and the caller is
            neither its owner nor staff.
    """
    from src.initiative_db.models import ApplicationReviewDocument, Initiative

    try:
        document_uuid = uuid.UUID(str(review_document_id))
    except ValueError as exc:
        # A malformed id is reported the same way as an unknown one.
        raise HTTPException(status_code=404, detail="Không tìm thấy review document") from exc

    document = await session.get(ApplicationReviewDocument, document_uuid)
    if document is None:
        raise HTTPException(status_code=404, detail="Không tìm thấy review document")

    initiative = await session.get(Initiative, document.initiative_id)
    if initiative is None or initiative.owner_id == uid or _is_staff_reviewer(authorization):
        return
    raise HTTPException(status_code=403, detail="Không có quyền xem hồ sơ này.")
class AdminApplicationResultBody(BaseModel):
    """Payload carrying a decision on an application plus free-text explanation.

    NOTE(review): presumably the request body of an admin "result" endpoint; that
    endpoint is not visible in this part of the file — confirm against its caller.
    """
    # Outcome; only these two literal values are accepted.
    decision: Literal["approved", "rejected"]
    # Free text, empty by default, capped at 50,000 characters.
    feedback: str = Field(default="", max_length=50_000)
    # Optional free text, capped at 50,000 characters.
    rationale: Optional[str] = Field(default=None, max_length=50_000)
def _initiative_allows_owner_evidence_edit(status: str) -> bool:
s = (status or "").strip().lower()
return s not in ("approved", "rejected")
def _normalize_pdf_layout_edits(raw: Any) -> list[Dict[str, Any]]:
if not isinstance(raw, list):
raise HTTPException(status_code=422, detail="layoutEdits phải là mảng.")
if len(raw) > 200:
raise HTTPException(status_code=422, detail="Tối đa 200 mục chỉnh bố cục.")
out: list[Dict[str, Any]] = []
for idx, item in enumerate(raw):
if not isinstance(item, dict):
raise HTTPException(status_code=422, detail=f"layoutEdits[{idx}] không hợp lệ.")
try:
parsed = PdfLayoutEditPayload(**item)
except Exception as exc:
raise HTTPException(status_code=422, detail=f"layoutEdits[{idx}] lỗi định dạng: {exc}") from exc
row = parsed.model_dump()
if not str(row.get("text") or "").strip():
continue
out.append(row)
return out
def _load_minio_for_evidence():
    """Build the storage client together with the attachments bucket name.

    Returns:
        ``(storage, bucket_name, None)`` on success, or ``(None, None, message)``
        when the storage module cannot be imported or the client cannot be built;
        the failure is logged as a warning, never raised.
    """
    try:
        from src.minio.storage import S3Storage, settings as s3settings

        storage = S3Storage()
        bucket_name = s3settings.s3_bucket_attachments
    except Exception as exc:
        logger.warning("MinIO / S3 not available for evidence upload: %s", exc)
        return None, None, str(exc)
    return storage, bucket_name, None
def _load_minio_for_exports():
    """Build the storage client together with the exports bucket name.

    Returns:
        ``(storage, bucket_name, None)`` on success, or ``(None, None, message)``
        when the storage module cannot be imported or the client cannot be built;
        the failure is logged as a warning, never raised.
    """
    try:
        from src.minio.storage import S3Storage, settings as s3settings

        storage = S3Storage()
        bucket_name = s3settings.s3_bucket_exports
    except Exception as exc:
        logger.warning("MinIO / S3 not available for export upload: %s", exc)
        return None, None, str(exc)
    return storage, bucket_name, None
@app.post("/api/v1/application-drafts/{case_id}/evidence")
async def upload_application_draft_evidence(
    case_id: str,
    kind: str = Form(..., description="research | textbook | technical (Minh chứng 2.1 / 2.2 / kỹ thuật)"),
    file: UploadFile = File(...),
    authorization: Optional[str] = Header(None),
):
    """
    Upload an evidence file (PDF, image, Word, Excel, ...) to MinIO.

    Only the initiative owner may upload, and only while the initiative is not yet
    approved/rejected. One artifact is kept per (initiative, role): a new upload
    replaces the stored row and, after the commit, the previously stored object is
    removed from the bucket on a best-effort basis.

    Raises:
        HTTPException: 400 unknown ``kind``; 401 not logged in; 403 caller is not
            the owner; 404 no initiative for the case; 413 file larger than 50 MB;
            422 disallowed MIME type, rejected upload, or initiative already closed;
            502 storage failure; 503 PostgreSQL or MinIO not available.
    """
    from src.initiative_db.application_storage import (
        get_evidence_artifact_row,
        upsert_evidence_artifact,
    )
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.minio.storage import ALLOWED_MIME_TYPES, StorageError
    role = _evidence_kind_to_role(kind)
    if role is None:
        raise HTTPException(status_code=400, detail="kind phải là research, textbook hoặc technical")
    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để tải minh chứng.")
    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Cần PostgreSQL để lưu minh chứng trên máy chủ.")
    s3, bucket, _cfg_err = _load_minio_for_evidence()
    if s3 is None or bucket is None:
        raise HTTPException(
            status_code=503,
            detail="Lưu tệp MinIO chưa cấu hình. Đặt biến môi trường S3/MinIO hoặc xem tài liệu triển khai.",
        )
    import mimetypes
    filename_l = (file.filename or "").lower()
    guessed, _ = mimetypes.guess_type(filename_l)
    # Drop any ";charset=..." parameter from the client-declared type.
    content_type = (file.content_type or "").split(";")[0].strip() or "application/octet-stream"
    # Fall back to the type guessed from the file name when the declared one is not allowed.
    if content_type not in ALLOWED_MIME_TYPES and guessed in ALLOWED_MIME_TYPES:
        content_type = guessed
    if content_type not in ALLOWED_MIME_TYPES:
        raise HTTPException(status_code=422, detail=f"Loại tệp không được phép: {content_type}")
    _max_evidence_bytes = 50 * 1024 * 1024
    # file.size can be None; in that case this early size check is skipped.
    if file.size is not None and int(file.size) > _max_evidence_bytes:
        raise HTTPException(
            status_code=413,
            detail="Tệp vượt quá 50 MB. Hãy nén hoặc chia nhỏ tệp trước khi tải lên.",
        )
    async with get_session() as session:
        from src.initiative_db.submissions import resolve_initiative_for_draft_case_key
        ini = await resolve_initiative_for_draft_case_key(session, case_id)
        if ini is None:
            raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ (hãy lưu bản nháp trước).")
        canonical_case = ini.case_code
        if ini.owner_id != uid:
            raise HTTPException(status_code=403, detail="Chỉ chủ sở hữu mới tải được minh chứng.")
        st = str(ini.status or "")
        if not _initiative_allows_owner_evidence_edit(st):
            raise HTTPException(
                status_code=422,
                detail="Hồ sơ đã kết thúc duyệt — không cập nhật minh chứng.",
            )
        # Remember the object currently stored for this role so it can be removed
        # once the replacement has been committed.
        old_row = await get_evidence_artifact_row(session, initiative_id=ini.id, role=role)
        prior_storage_key = (
            (old_row.storage_uri or "").strip() or None if old_row is not None else None
        )
        object_key = s3.build_key_for_initiative(ini.id, file.filename or "evidence.pdf")
        try:
            result = await s3.upload(
                bucket=bucket,
                key=object_key,
                fileobj=file.file,
                mime_type=content_type,
                metadata={"uploaded_by": str(uid), "case_code": canonical_case, "role": role},
            )
        except ValueError as exc:
            raise HTTPException(status_code=422, detail=str(exc)) from exc
        except StorageError as exc:
            raise HTTPException(status_code=502, detail=f"Không tải được lên kho: {exc}") from exc
        # NOTE(review): the object is already in the bucket here; if a DB step below
        # fails, that object is left without a row pointing at it.
        await upsert_evidence_artifact(
            session,
            initiative_id=ini.id,
            role=role,
            storage_uri=object_key,
            original_name=file.filename,
            byte_size=result.get("size"),
            sha256_hex=result.get("sha256"),
            uploaded_by=uid,
            mime_type=content_type,
        )
        from src.auth_jwt import decode_bearer_token
        from src.audit import AuditAction, jwt_payload_actor_email, record_audit
        ae, ar = jwt_payload_actor_email(decode_bearer_token(authorization))
        # A replaced artifact is audited as an update, a first upload as a create.
        ev_action = AuditAction.update if old_row is not None else AuditAction.create
        await record_audit(
            session,
            actor_user_id=uid,
            actor_email=ae,
            actor_role=ar,
            action=ev_action,
            entity_type="application_evidence",
            entity_id=f"{canonical_case}:{role}",
            before=(
                {"storageKey": prior_storage_key}
                if prior_storage_key
                else None
            ),
            after={
                "storageKey": object_key,
                "originalName": file.filename,
                "mimeType": content_type,
            },
            metadata={"minioBucket": bucket, "caseId": canonical_case, "evidenceRole": role},
        )
        await session.commit()
        # Best-effort cleanup of the replaced object; the DB already points at the new key.
        if prior_storage_key and prior_storage_key != object_key:
            try:
                await s3.delete(bucket, prior_storage_key)
            except StorageError as exc:
                logger.warning(
                    "MinIO delete of replaced evidence object failed (DB already points to new key): %s",
                    exc,
                )
        k_label = _evidence_role_to_api_kind(role)
        return {
            "ok": True,
            "caseId": canonical_case,
            "kind": k_label,
            "storageKey": object_key,
            "originalName": file.filename,
            "byteSize": result.get("size"),
            "uploadedAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        }
@app.get("/api/v1/application-drafts/{case_id}/evidence")
async def get_application_draft_evidence(
    case_id: str,
    authorization: Optional[str] = Header(None),
):
    """Return metadata (and time-limited download links) for a case's evidence files.

    Accessible to the initiative owner and to staff reviewers (admin/editor). The
    response has one entry per kind (``research``, ``textbook``, ``technical``),
    each None when nothing is stored. When PostgreSQL is disabled or the case has
    no initiative, all three are None.

    Raises:
        HTTPException: 401 when not logged in; 403 when the caller is neither the
            owner nor staff.
    """
    from src.initiative_db.application_storage import (
        get_evidence_artifact_row,
    )
    from src.initiative_db.engine import get_session, is_postgres_enabled

    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để xem minh chứng.")
    if not is_postgres_enabled():
        return {"research": None, "textbook": None, "technical": None}
    # Evidence objects are uploaded, streamed and deleted through the evidence
    # (attachments) bucket, so links must be signed against that same bucket.
    s3, bucket, _ = _load_minio_for_evidence()
    async with get_session() as session:
        from src.initiative_db.submissions import resolve_initiative_for_draft_case_key

        ini = await resolve_initiative_for_draft_case_key(session, case_id)
        if ini is None:
            return {"research": None, "textbook": None, "technical": None}
        is_staff = _is_staff_reviewer(authorization)
        if ini.owner_id != uid and not is_staff:
            raise HTTPException(status_code=403, detail="Không có quyền xem minh chứng hồ sơ này.")
        r_row = await get_evidence_artifact_row(session, initiative_id=ini.id, role="research_evidence")
        t_row = await get_evidence_artifact_row(session, initiative_id=ini.id, role="textbook_evidence")
        tech_row = await get_evidence_artifact_row(session, initiative_id=ini.id, role="technical_evidence")

        async def pack(row, kind_key: str):
            # Describe one stored artifact; adds signed URLs when storage is available.
            if row is None:
                return None
            default_name = row.original_name or "evidence"
            # Give the download a sensible extension when the stored name lacks one.
            if not default_name.lower().endswith((".pdf", ".png", ".jpg", ".jpeg", ".docx", ".xlsx")):
                mt = (row.mime_type or "").lower()
                if "pdf" in mt:
                    default_name = f"{default_name}.pdf"
                elif "word" in mt or "document" in mt:
                    default_name = f"{default_name}.docx"
            out = {
                "kind": kind_key,
                "originalName": row.original_name,
                "byteSize": row.byte_size,
                "mimeType": row.mime_type,
                "uploadedAt": row.uploaded_at.isoformat() if row.uploaded_at else None,
                "storageKey": row.storage_uri,
                "reviewStatus": row.review_status,
                "reviewedAt": row.reviewed_at.isoformat() if row.reviewed_at else None,
            }
            if s3 and bucket and row.storage_uri:
                try:
                    from src.minio.storage import settings as s3s

                    out["downloadUrl"] = await s3.get_download_url(
                        bucket,
                        row.storage_uri,
                        ttl=s3s.s3_signed_url_ttl,
                        filename=default_name,
                        inline=False,
                    )
                    # An inline view link is only offered for PDFs.
                    if _evidence_row_looks_like_pdf(row, default_name):
                        out["viewUrl"] = await s3.get_download_url(
                            bucket,
                            row.storage_uri,
                            ttl=s3s.s3_signed_url_ttl,
                            filename=default_name,
                            inline=True,
                            response_content_type="application/pdf",
                        )
                    else:
                        out["viewUrl"] = None
                except Exception:
                    # Links are best-effort: keep serving the metadata, but leave a
                    # trace so a storage misconfiguration does not go unnoticed.
                    logger.warning(
                        "Could not sign evidence URLs for kind %s", kind_key, exc_info=True
                    )
                    out["downloadUrl"] = None
                    out["viewUrl"] = None
            return out

        return {
            "research": await pack(r_row, "research"),
            "textbook": await pack(t_row, "textbook"),
            "technical": await pack(tech_row, "technical"),
        }
@app.get("/api/v1/application-drafts/{case_id}/evidence/content")
async def stream_application_draft_evidence_content(
    case_id: str,
    request: Request,
    kind: str = Query(..., description="research | textbook | technical"),
    attachment: bool = Query(
        False,
        description="If true, Content-Disposition: attachment (download). Otherwise inline for embedding.",
    ),
    authorization: Optional[str] = Header(None),
):
    """
    Stream evidence bytes through the API so browsers on HTTPS avoid mixed-content iframes
    (presigned MinIO URLs are often http://HOST:MINIO_PORT).
    Same ACL as GET …/evidence.

    The first chunk is read from storage before the response is created, so a
    missing object or a storage failure is reported as a proper 404/502 instead of
    surfacing after the response headers have already been sent. Errors that occur
    later in the stream still propagate while the body is being sent.

    Raises:
        HTTPException: 400 unknown ``kind``; 401 not logged in; 403 caller is
            neither owner nor staff; 404 no initiative, no stored file, or object
            missing from storage; 502 storage read failure; 503 PostgreSQL or
            MinIO not available.
    """
    from src.initiative_db.application_storage import (
        get_evidence_artifact_row,
    )
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.minio.storage import StorageError, _sanitize_filename as minio_safe_fn

    # Repeated ``kind=`` keys are passed on as a list so each one is checked.
    kinds = request.query_params.getlist("kind")
    if len(kinds) > 1:
        effective_kind: object = kinds
    elif len(kinds) == 1:
        effective_kind = kinds[0]
    else:
        effective_kind = kind
    role = _evidence_kind_to_role(effective_kind)
    if role is None:
        raise HTTPException(status_code=400, detail="kind phải là research, textbook hoặc technical")
    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để xem minh chứng.")
    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Cần PostgreSQL để lấy minh chứng.")
    s3, bucket, _ = _load_minio_for_evidence()
    if s3 is None or bucket is None:
        raise HTTPException(
            status_code=503,
            detail="Lưu tệp MinIO chưa cấu hình. Đặt biến môi trường S3/MinIO hoặc xem tài liệu triển khai.",
        )
    async with get_session() as session:
        from src.initiative_db.submissions import resolve_initiative_for_draft_case_key

        ini = await resolve_initiative_for_draft_case_key(session, case_id)
        if ini is None:
            raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ.")
        if ini.owner_id != uid and not _is_staff_reviewer(authorization):
            raise HTTPException(status_code=403, detail="Không có quyền xem minh chứng hồ sơ này.")
        row = await get_evidence_artifact_row(session, initiative_id=ini.id, role=role)
        if row is None or not (row.storage_uri or "").strip():
            raise HTTPException(status_code=404, detail="Không có tệp minh chứng cho loại này.")
        # Copy what the response needs into plain values; the body is produced
        # after this handler returns, when the session is no longer open.
        storage_key = row.storage_uri
        stored_name = row.original_name
        stored_mime = row.mime_type

    default_name = stored_name or "evidence"
    # Give the download a sensible extension when the stored name lacks one.
    if not default_name.lower().endswith((".pdf", ".png", ".jpg", ".jpeg", ".docx", ".xlsx")):
        mtguess = (stored_mime or "").lower()
        if "pdf" in mtguess:
            default_name = f"{default_name}.pdf"
        elif "word" in mtguess or "document" in mtguess:
            default_name = f"{default_name}.docx"
    # NOTE(review): assumes the sanitized name is safe to place in a header value —
    # confirm how _sanitize_filename treats non-ASCII file names.
    safe_fn = minio_safe_fn(default_name) or "file"
    disp = "attachment" if attachment else "inline"
    media_type = (stored_mime or "").strip() or "application/octet-stream"

    # Open the stream and pull the first chunk now: the download only starts when
    # the iterator is first advanced, which is where storage errors show up.
    try:
        chunks = s3.download_stream(bucket, storage_key).__aiter__()
        try:
            first_chunk = await chunks.__anext__()
        except StopAsyncIteration:
            first_chunk = b""  # empty object
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Tệp không còn trên kho lưu trữ.") from None
    except StorageError as exc:
        raise HTTPException(status_code=502, detail=f"Không đọc được tệp từ kho: {exc}") from exc

    async def body():
        # Replay the chunk fetched above, then relay the rest of the stream.
        if first_chunk:
            yield first_chunk
        async for chunk in chunks:
            yield chunk

    return StreamingResponse(
        body(),
        media_type=media_type,
        headers={
            "Content-Disposition": f'{disp}; filename="{safe_fn}"',
            "Cache-Control": "private, no-store",
        },
    )
@app.delete("/api/v1/application-drafts/{case_id}/evidence")
async def delete_application_draft_evidence(
    case_id: str,
    request: Request,
    kind: str = Query(..., description="research | textbook | technical"),
    authorization: Optional[str] = Header(None),
):
    """Delete the evidence artifact of one kind for a case (owner only).

    The database row (plus an audit entry when a row existed) is committed first;
    only then is the stored object removed, on a best-effort basis. A failed commit
    therefore never leaves a row pointing at an object that is already gone, and a
    failed object delete is only logged.

    Raises:
        HTTPException: 400 unknown ``kind``; 401 not logged in; 403 caller is not
            the owner; 404 no initiative for the case; 422 initiative already
            approved/rejected; 503 PostgreSQL not available.
    """
    from src.initiative_db.application_storage import (
        delete_evidence_artifact_row,
    )
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.minio.storage import StorageError

    # Repeated ``kind=`` keys are passed on as a list so each one is checked.
    kinds = request.query_params.getlist("kind")
    if len(kinds) > 1:
        effective_kind: object = kinds
    elif len(kinds) == 1:
        effective_kind = kinds[0]
    else:
        effective_kind = kind
    role = _evidence_kind_to_role(effective_kind)
    if role is None:
        raise HTTPException(status_code=400, detail="kind phải là research, textbook hoặc technical")
    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để xóa minh chứng.")
    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Cần PostgreSQL.")
    # Storage may be unavailable; the row is still removed in that case.
    s3, bucket, _ = _load_minio_for_evidence()
    async with get_session() as session:
        from src.initiative_db.submissions import resolve_initiative_for_draft_case_key

        ini = await resolve_initiative_for_draft_case_key(session, case_id)
        if ini is None:
            raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ.")
        if ini.owner_id != uid:
            raise HTTPException(status_code=403, detail="Chỉ chủ sở hữu mới xóa được minh chứng.")
        st = str(ini.status or "")
        if not _initiative_allows_owner_evidence_edit(st):
            raise HTTPException(
                status_code=422,
                detail="Hồ sơ đã kết thúc duyệt — không xóa minh chứng.",
            )
        old = await delete_evidence_artifact_row(session, initiative_id=ini.id, role=role)
        from src.auth_jwt import decode_bearer_token
        from src.audit import AuditAction, jwt_payload_actor_email, record_audit

        # Read the key before committing, while the removed row is still loaded.
        removed_key = old.storage_uri if old is not None else None
        if old is not None:
            ae, ar = jwt_payload_actor_email(decode_bearer_token(authorization))
            await record_audit(
                session,
                actor_user_id=uid,
                actor_email=ae,
                actor_role=ar,
                action=AuditAction.delete,
                entity_type="application_evidence",
                entity_id=f"{ini.case_code}:{role}",
                before={
                    "storageKey": old.storage_uri,
                    "originalName": old.original_name,
                },
                metadata={"caseId": ini.case_code, "minioBucket": bucket},
            )
        # Commit before touching storage: the object delete cannot be rolled back.
        await session.commit()
        if removed_key and s3 and bucket:
            try:
                await s3.delete(bucket, removed_key)
            except StorageError as exc:
                logger.warning("MinIO delete failed (continuing with DB row removed): %s", exc)
    return {"ok": True, "kind": _evidence_role_to_api_kind(role)}
@app.get("/api/v1/application-drafts/{case_id}/official-form-layout")
async def get_application_draft_official_form_layout(
    case_id: str,
    authorization: Optional[str] = Header(None),
):
    """Return the saved official-form layout edits and, if present, the edited PDF's metadata.

    Accessible to the initiative owner and to staff reviewers. When PostgreSQL is
    disabled, the case has no initiative, or nothing has been saved yet, an empty
    layout (``layoutEdits: []``, ``pdf: None``) is returned.

    Raises:
        HTTPException: 401 when not logged in; 400 when the case id has no usable
            characters; 403 when the caller is neither the owner nor staff.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.drafts import get_official_form_layout_payload

    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để xem bố cục PDF.")
    safe_case_id = _normalize_case_id(case_id)
    if not is_postgres_enabled():
        return {"caseId": safe_case_id, "layoutEdits": [], "pdf": None}
    # The layout PDF is uploaded to the exports bucket by the save endpoint, so the
    # links must be signed against that same bucket.
    s3, bucket, _ = _load_minio_for_exports()
    async with get_session() as session:
        from src.initiative_db.submissions import resolve_initiative_for_draft_case_key

        ini = await resolve_initiative_for_draft_case_key(session, safe_case_id)
        if ini is None:
            return {"caseId": safe_case_id, "layoutEdits": [], "pdf": None}
        if ini.owner_id != uid and not _is_staff_reviewer(authorization):
            raise HTTPException(status_code=403, detail="Không có quyền xem bố cục hồ sơ này.")
        payload = await get_official_form_layout_payload(session, ini.case_code)
        if not payload:
            return {"caseId": safe_case_id, "layoutEdits": [], "pdf": None}
        edits = payload.get("layoutEdits")
        if not isinstance(edits, list):
            edits = []
        pdf_meta: Optional[Dict[str, Any]] = None
        storage_key = str(payload.get("storageKey") or "").strip()
        if storage_key:
            pdf_meta = {
                "storageKey": storage_key,
                "originalName": payload.get("originalName"),
                "byteSize": payload.get("byteSize"),
                "uploadedAt": payload.get("uploadedAt"),
                "downloadUrl": None,
                "viewUrl": None,
            }
            if s3 and bucket:
                try:
                    from src.minio.storage import settings as s3s

                    default_name = str(payload.get("originalName") or "official-form-layout.pdf")
                    pdf_meta["downloadUrl"] = await s3.get_download_url(
                        bucket,
                        storage_key,
                        ttl=s3s.s3_signed_url_ttl,
                        filename=default_name,
                        inline=False,
                    )
                    pdf_meta["viewUrl"] = await s3.get_download_url(
                        bucket,
                        storage_key,
                        ttl=s3s.s3_signed_url_ttl,
                        filename=default_name,
                        inline=True,
                        response_content_type="application/pdf",
                    )
                except Exception:
                    # Links are best-effort: keep serving the metadata, but leave a
                    # trace so a storage misconfiguration does not go unnoticed.
                    logger.warning(
                        "Could not sign official-form layout PDF URLs", exc_info=True
                    )
                    pdf_meta["downloadUrl"] = None
                    pdf_meta["viewUrl"] = None
        return {
            "caseId": safe_case_id,
            "layoutEdits": edits,
            "updatedAt": payload.get("updatedAt"),
            "pdf": pdf_meta,
        }
@app.post("/api/v1/application-drafts/{case_id}/official-form-layout")
async def save_application_draft_official_form_layout(
    case_id: str,
    layout_edits_json: str = Form(..., description="JSON array of PdfTextLayoutEdit"),
    file: UploadFile = File(..., description="Edited official-form PDF"),
    authorization: Optional[str] = Header(None),
):
    """
    Store an edited official-form PDF together with its layout edits for a draft case.

    Steps, in order: authenticate the caller, require PostgreSQL and MinIO,
    parse and normalize the edit list, check the upload content type, verify
    that the caller owns the initiative and that its status still allows owner
    edits, upload the PDF, persist a payload describing the stored object and
    the edits, commit, and finally try to delete the previously stored object.

    Raises:
        HTTPException: 401 when no user id can be decoded from the token;
            503 when PostgreSQL or MinIO is unavailable; 422 for invalid edit
            JSON, a non-PDF content type, a closed review status or an upload
            ``ValueError``; 404 when the case has no initiative; 403 when the
            caller is not the owner; 502 when the storage upload fails.
    """
    from src.initiative_db.drafts import get_official_form_layout_payload, save_official_form_layout_payload
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.minio.storage import StorageError

    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để lưu bố cục PDF.")
    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Cần PostgreSQL để lưu bố cục PDF.")
    # The third element (configuration error) is not used by this handler.
    s3, bucket, _cfg_err = _load_minio_for_exports()
    if s3 is None or bucket is None:
        raise HTTPException(
            status_code=503,
            detail="Lưu tệp MinIO chưa cấu hình. Đặt biến môi trường S3/MinIO hoặc xem tài liệu triển khai.",
        )
    safe_case_id = _normalize_case_id(case_id)
    try:
        raw_edits = json.loads(layout_edits_json)
    except Exception as exc:
        raise HTTPException(status_code=422, detail="layoutEdits JSON không hợp lệ.") from exc
    normalized_edits = _normalize_pdf_layout_edits(raw_edits)
    # Drop any parameters (e.g. "; charset=...") before comparing the media type.
    content_type = (file.content_type or "").split(";")[0].strip().lower() or "application/octet-stream"
    if content_type != "application/pdf":
        raise HTTPException(status_code=422, detail=f"Chỉ nhận PDF, nhận được: {content_type}")
    async with get_session() as session:
        from src.initiative_db.submissions import resolve_initiative_for_draft_case_key

        ini = await resolve_initiative_for_draft_case_key(session, safe_case_id)
        if ini is None:
            raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ (hãy lưu bản nháp trước).")
        if ini.owner_id != uid:
            raise HTTPException(status_code=403, detail="Chỉ chủ sở hữu mới lưu bố cục PDF.")
        st = str(ini.status or "")
        if not _initiative_allows_owner_evidence_edit(st):
            raise HTTPException(status_code=422, detail="Hồ sơ đã kết thúc duyệt — không cập nhật bố cục PDF.")
        # Read the current payload first so the object it points to can be removed after the new one is saved.
        existing_payload = await get_official_form_layout_payload(session, ini.case_code)
        object_key = s3.build_key_for_initiative(ini.id, file.filename or "official-form-layout.pdf")
        try:
            upload = await s3.upload(
                bucket=bucket,
                key=object_key,
                fileobj=file.file,
                mime_type="application/pdf",
                metadata={"uploaded_by": str(uid), "case_code": ini.case_code, "role": "official_form_layout_pdf"},
            )
        except ValueError as exc:
            raise HTTPException(status_code=422, detail=str(exc)) from exc
        except StorageError as exc:
            raise HTTPException(status_code=502, detail=f"Không tải được PDF bố cục lên kho: {exc}") from exc
        # UTC timestamp with second precision and a trailing "Z".
        now_iso = datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
        payload = {
            "storageKey": object_key,
            "originalName": file.filename or "official-form-layout.pdf",
            "mimeType": "application/pdf",
            "byteSize": upload.get("size"),
            "sha256": upload.get("sha256"),
            "uploadedBy": str(uid),
            "uploadedAt": now_iso,
            "updatedAt": now_iso,
            "layoutEdits": normalized_edits,
            "layoutEditCount": len(normalized_edits),
        }
        await save_official_form_layout_payload(
            session,
            case_id=ini.case_code,
            payload=payload,
            owner_id=uid,
        )
        await session.commit()
        # Only after the commit: remove the replaced object. A failure here is logged, not raised.
        prior_storage_key = str((existing_payload or {}).get("storageKey") or "").strip()
        if prior_storage_key and prior_storage_key != object_key:
            try:
                await s3.delete(bucket, prior_storage_key)
            except StorageError as exc:
                logger.warning("MinIO delete of replaced official-form layout PDF failed: %s", exc)
        return {
            "ok": True,
            "caseId": safe_case_id,
            "layoutEdits": normalized_edits,
            "storageKey": object_key,
            "byteSize": upload.get("size"),
            "uploadedAt": payload["uploadedAt"],
        }
class EvidenceReviewBody(BaseModel):
    """Request body of the evidence review endpoint."""

    # Reviewer verdict; only these two literal values are accepted.
    decision: Literal["approved", "rejected"]
@app.patch("/api/v1/application-drafts/{case_id}/evidence/review")
async def patch_evidence_review(
    case_id: str,
    request: Request,
    kind: str = Query(..., description="research | textbook | technical"),
    body: EvidenceReviewBody = Body(...),
    authorization: Optional[str] = Header(None),
):
    """
    Approve or reject an evidence artifact (admin, council or editor only).

    The decision is stored on the artifact row and an audit record with the
    before/after review state is written in the same session before commit.

    Raises:
        HTTPException: 403 for non-staff callers, 401 when no user id can be
            decoded, 503 without PostgreSQL, 400 for an unknown ``kind``,
            404 when the case or the artifact does not exist.
    """
    from src.initiative_db.application_storage import get_evidence_artifact_row, set_evidence_artifact_review
    from src.initiative_db.engine import get_session, is_postgres_enabled

    if not _is_staff_reviewer(authorization):
        raise HTTPException(status_code=403, detail="Chỉ quản trị hoặc hội đồng mới thẩm định minh chứng.")
    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập.")
    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Cần PostgreSQL.")

    # Look at the raw query string: a repeated `kind` is forwarded as a list,
    # a single one as a string, and the parsed parameter is the last resort.
    supplied_kinds = request.query_params.getlist("kind")
    effective_kind: object
    if not supplied_kinds:
        effective_kind = kind
    elif len(supplied_kinds) == 1:
        effective_kind = supplied_kinds[0]
    else:
        effective_kind = supplied_kinds

    role = _evidence_kind_to_role(effective_kind)
    if role is None:
        raise HTTPException(status_code=400, detail="kind phải là research, textbook hoặc technical")

    async with get_session() as session:
        from src.initiative_db.submissions import resolve_initiative_for_draft_case_key

        initiative = await resolve_initiative_for_draft_case_key(session, case_id)
        if initiative is None:
            raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ.")
        artifact = await get_evidence_artifact_row(session, initiative_id=initiative.id, role=role)
        if artifact is None:
            raise HTTPException(status_code=404, detail="Chưa có tệp minh chứng cho loại này.")

        # Snapshot the previous review state for the audit trail.
        previous_reviewer = str(artifact.reviewed_by) if artifact.reviewed_by else None
        before_review = {
            "reviewStatus": artifact.review_status,
            "reviewedBy": previous_reviewer,
        }
        await set_evidence_artifact_review(
            session,
            initiative_id=initiative.id,
            role=role,
            review_status=body.decision,
            reviewer_id=uid,
        )

        from src.auth_jwt import decode_bearer_token
        from src.audit import AuditAction, jwt_payload_actor_email, record_audit

        actor_email, actor_role = jwt_payload_actor_email(decode_bearer_token(authorization))
        after_review = {"reviewStatus": body.decision, "reviewedBy": str(uid)}
        await record_audit(
            session,
            actor_user_id=uid,
            actor_email=actor_email,
            actor_role=actor_role,
            action=AuditAction.update,
            entity_type="application_evidence_review",
            entity_id=f"{initiative.case_code}:{role}",
            before=before_review,
            after=after_review,
            metadata={"caseId": initiative.case_code},
        )
        await session.commit()
        return {"ok": True, "kind": _evidence_role_to_api_kind(role), "decision": body.decision}
@app.get("/api/v1/initiatives/by-case/{case_id}/tab-snapshots")
async def list_initiative_tab_snapshots(
    case_id: str,
    tab: Optional[str] = Query(
        None, description="Optional filter: report | application | contribution"
    ),
    limit: int = Query(20, ge=1, le=200),
    authorization: Optional[str] = Header(None),
):
    """
    Return the versioned tab payloads of an initiative; owner only.

    Yields an empty list when PostgreSQL is disabled or the case is unknown.

    Raises:
        HTTPException: 401 without a decodable user id, 403 when the caller
            does not own the initiative, 500 on any other failure.
    """
    from sqlalchemy import select
    from src.initiative_db.application_storage import list_tab_snapshots_for_case
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.models import Initiative

    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để xem lịch sử tab.")
    if not is_postgres_enabled():
        return {"data": []}

    safe_case_id = _normalize_case_id(case_id)
    try:
        async with get_session() as session:
            lookup = await session.execute(
                select(Initiative).where(Initiative.case_code == safe_case_id)
            )
            initiative = lookup.scalar_one_or_none()
            if initiative is None:
                return {"data": []}
            if initiative.owner_id != uid:
                raise HTTPException(status_code=403, detail="Không có quyền xem hồ sơ này.")
            snapshots = await list_tab_snapshots_for_case(
                session, case_code=safe_case_id, tab=tab, limit=limit
            )
            return {"data": snapshots}
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 403 above) pass through untouched.
        raise
    except Exception:
        logger.exception("GET tab-snapshots failed case=%s", safe_case_id)
        raise HTTPException(status_code=500, detail="Không tải được lịch sử tab") from None
@app.get("/api/v1/initiatives/by-case/{case_id}/submit-snapshots")
async def list_initiative_submit_snapshots(
    case_id: str,
    limit: int = Query(10, ge=1, le=50),
    authorization: Optional[str] = Header(None),
):
    """
    Return the submit snapshots recorded for an initiative; owner only.

    Yields an empty list when PostgreSQL is disabled or the case is unknown.

    Raises:
        HTTPException: 401 without a decodable user id, 403 when the caller
            does not own the initiative, 500 on any other failure.
    """
    from sqlalchemy import select
    from src.initiative_db.application_storage import list_submit_snapshots_for_case
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.models import Initiative

    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để xem lịch sử nộp.")
    if not is_postgres_enabled():
        return {"data": []}

    safe_case_id = _normalize_case_id(case_id)
    try:
        async with get_session() as session:
            lookup = await session.execute(
                select(Initiative).where(Initiative.case_code == safe_case_id)
            )
            initiative = lookup.scalar_one_or_none()
            if initiative is None:
                return {"data": []}
            if initiative.owner_id != uid:
                raise HTTPException(status_code=403, detail="Không có quyền xem hồ sơ này.")
            snapshots = await list_submit_snapshots_for_case(
                session, case_code=safe_case_id, limit=limit
            )
            return {"data": snapshots}
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 403 above) pass through untouched.
        raise
    except Exception:
        logger.exception("GET submit-snapshots failed case=%s", safe_case_id)
        raise HTTPException(status_code=500, detail="Không tải được lịch sử nộp") from None
@app.post("/api/v1/review-documents")
async def create_review_document(
    body: ReviewDocumentSaveRequest,
    authorization: Optional[str] = Header(None),
):
    """
    Persist ReviewPanel JSON bundle.

    Primary payload is `officialBieuMau`; `templateData` / `fullBundle` are optional mirrors.
    With PostgreSQL enabled the bundle is saved through `save_review_document_bundle`
    after the case access check; otherwise it is written to
    `assets/review-documents/<caseId>.json` and the written payload is returned.

    Raises:
        HTTPException: whatever the authentication / access helpers raise, or
            500 when the PostgreSQL save fails for any other reason.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.submissions import save_review_document_bundle

    # This route has no path parameter, so the body's caseId is the only case id.
    safe_case_id = _normalize_case_id(body.caseId)
    owner_uid = _require_authenticated_user(authorization)
    if is_postgres_enabled():
        try:
            async with get_session() as session:
                await _assert_initiative_case_access(session, safe_case_id, owner_uid, authorization)
                saved = await save_review_document_bundle(
                    session,
                    case_id=safe_case_id,
                    official_bieu_mau=body.officialBieuMau,
                    template_data=body.templateData,
                    full_bundle=body.fullBundle,
                    owner_user_id=owner_uid,
                )
                return saved
        except HTTPException:
            raise
        except Exception:
            logger.exception("review-document save (PostgreSQL) failed case=%s", safe_case_id)
            raise HTTPException(status_code=500, detail="Không lưu được JSON ReviewPanel") from None
    # file fallback
    target_dir = APP_ROOT_DIR / "assets" / "review-documents"
    target_dir.mkdir(parents=True, exist_ok=True)
    # Timezone-aware replacement for the deprecated datetime.utcnow(); same "…Z" output format.
    saved_at = datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
    payload = {
        "caseId": safe_case_id,
        "officialBieuMau": body.officialBieuMau,
        "templateData": body.templateData,
        "fullBundle": body.fullBundle,
        "savedAt": saved_at,
    }
    with open(target_dir / f"{safe_case_id}.json", "w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)
    return payload
@app.get("/api/v1/review-documents")
async def list_review_documents(
    caseId: str,
    limit: int = Query(20, ge=1, le=200),
    authorization: Optional[str] = Header(None),
):
    """
    List review documents by case id (latest first).

    With PostgreSQL enabled the rows come from `list_review_document_bundles`
    after the case access check. Otherwise only staff reviewers may read the
    single JSON file stored for the case; a missing file yields an empty list.

    Raises:
        HTTPException: whatever the authentication / access helpers raise,
            403 for non-staff callers in file mode, 500 on load failures.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.submissions import list_review_document_bundles

    uid = _require_authenticated_user(authorization)
    safe_case_id = _normalize_case_id(caseId)
    if is_postgres_enabled():
        try:
            async with get_session() as session:
                await _assert_initiative_case_access(session, safe_case_id, uid, authorization)
                rows = await list_review_document_bundles(session, case_id=safe_case_id, limit=limit)
                return {"data": rows}
        except HTTPException:
            # Keep the status chosen by the access check instead of masking it as a 500.
            raise
        except Exception:
            logger.exception("review-document load (PostgreSQL) failed case=%s", safe_case_id)
            raise HTTPException(status_code=500, detail="Không tải được JSON ReviewPanel") from None
    target_file = APP_ROOT_DIR / "assets" / "review-documents" / f"{safe_case_id}.json"
    if not _is_staff_reviewer(authorization):
        raise HTTPException(status_code=403, detail="Không có quyền xem hồ sơ này.")
    if target_file.exists():
        try:
            with open(target_file, "r", encoding="utf-8") as handle:
                return {"data": [json.load(handle)]}
        except Exception:
            logger.exception("review-document file fallback load failed case=%s", safe_case_id)
            raise HTTPException(status_code=500, detail="Không tải được JSON ReviewPanel") from None
    return {"data": []}
@app.get("/api/v1/review-documents/{review_document_id}")
async def get_review_document_by_id(
    review_document_id: str, authorization: Optional[str] = Header(None)
):
    """
    Fetch one review document by its id (PostgreSQL mode only).

    Raises:
        HTTPException: 501 without PostgreSQL, 404 when the document does not
            exist, 500 on unexpected failures, plus whatever the
            authentication / access helpers raise.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.submissions import get_review_document_bundle_by_id

    uid = _require_authenticated_user(authorization)
    if not is_postgres_enabled():
        raise HTTPException(status_code=501, detail="ID-based lookup requires PostgreSQL mode")
    try:
        async with get_session() as session:
            await _assert_review_document_access(session, review_document_id, uid, authorization)
            document = await get_review_document_bundle_by_id(
                session, review_document_id=review_document_id
            )
            if document is None:
                raise HTTPException(status_code=404, detail="Không tìm thấy review document")
            return document
    except HTTPException:
        raise
    except Exception:
        logger.exception("review-document get by id failed id=%s", review_document_id)
        raise HTTPException(status_code=500, detail="Không tải được review document") from None
@app.put("/api/v1/review-documents/{review_document_id}")
async def update_review_document_by_id(
    review_document_id: str,
    body: ReviewDocumentUpdateRequest,
    authorization: Optional[str] = Header(None),
):
    """
    Replace the stored bundle of one review document (PostgreSQL mode only).

    Raises:
        HTTPException: 501 without PostgreSQL, 404 when the document does not
            exist, 500 on unexpected failures, plus whatever the
            authentication / access helpers raise.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.submissions import update_review_document_bundle

    uid = _require_authenticated_user(authorization)
    if not is_postgres_enabled():
        raise HTTPException(status_code=501, detail="ID-based update requires PostgreSQL mode")
    try:
        async with get_session() as session:
            await _assert_review_document_access(session, review_document_id, uid, authorization)
            updated = await update_review_document_bundle(
                session,
                review_document_id=review_document_id,
                official_bieu_mau=body.officialBieuMau,
                template_data=body.templateData,
                full_bundle=body.fullBundle,
            )
            if updated is None:
                raise HTTPException(status_code=404, detail="Không tìm thấy review document")
            return updated
    except HTTPException:
        raise
    except Exception:
        logger.exception("review-document update failed id=%s", review_document_id)
        raise HTTPException(status_code=500, detail="Không cập nhật được review document") from None
@app.delete("/api/v1/review-documents/{review_document_id}")
async def delete_review_document_by_id(
    review_document_id: str, authorization: Optional[str] = Header(None)
):
    """
    Delete one review document by its id (PostgreSQL mode only).

    Raises:
        HTTPException: 501 without PostgreSQL, 404 when nothing was deleted,
            500 on unexpected failures, plus whatever the authentication /
            access helpers raise.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.submissions import delete_review_document_bundle

    uid = _require_authenticated_user(authorization)
    if not is_postgres_enabled():
        raise HTTPException(status_code=501, detail="ID-based delete requires PostgreSQL mode")
    try:
        async with get_session() as session:
            await _assert_review_document_access(session, review_document_id, uid, authorization)
            removed = await delete_review_document_bundle(
                session, review_document_id=review_document_id
            )
            if not removed:
                raise HTTPException(status_code=404, detail="Không tìm thấy review document")
            return {"deleted": True, "id": review_document_id}
    except HTTPException:
        raise
    except Exception:
        logger.exception("review-document delete failed id=%s", review_document_id)
        raise HTTPException(status_code=500, detail="Không xóa được review document") from None
# Backward-compatible endpoints (deprecated)
@app.post("/api/v1/applications/{case_id}/review-document")
async def save_review_document(
    case_id: str,
    body: ReviewDocumentSaveRequest,
    authorization: Optional[str] = Header(None),
):
    """
    Deprecated path-based alias of `create_review_document`.

    Raises:
        HTTPException: 400 when the normalized path and body case ids differ.
    """
    path_case_id = _normalize_case_id(case_id)
    body_case_id = _normalize_case_id(body.caseId)
    if path_case_id != body_case_id:
        raise HTTPException(status_code=400, detail="caseId in path and body must match.")
    return await create_review_document(body, authorization)
@app.get("/api/v1/applications/{case_id}/review-document")
async def get_review_document(
    case_id: str,
    authorization: Optional[str] = Header(None),
):
    """
    Deprecated alias returning the latest review document of a case.

    The Authorization header is accepted here and forwarded explicitly:
    `list_review_documents` is invoked as a plain function, so without an
    explicit argument its `authorization` parameter would be the `Header(None)`
    default object instead of the caller's token.

    Raises:
        HTTPException: 404 when the case has no review document, plus whatever
            `list_review_documents` raises.
    """
    rows = await list_review_documents(case_id, limit=1, authorization=authorization)
    data = rows.get("data") if isinstance(rows, dict) else None
    if isinstance(data, list) and data:
        return data[0]
    raise HTTPException(status_code=404, detail="Không tìm thấy JSON ReviewPanel")


@app.get("/api/v1/applications/{case_id}/review-document/be01-context")
async def get_review_document_be01_context(
    case_id: str,
    authorization: Optional[str] = Header(None),
):
    """
    Convert latest official ReviewPanel JSON to be01 `data_blank.json` shape.

    Raises:
        HTTPException: 404 when the latest document has no usable
            `officialBieuMau`, plus whatever `get_review_document` raises.
    """
    from src.be01.official_to_data_blank import official_to_data_blank

    # Forward the caller's token so the underlying lookup can authenticate.
    row = await get_review_document(case_id, authorization)
    official = row.get("officialBieuMau") if isinstance(row, dict) else {}
    if not isinstance(official, dict) or not official:
        raise HTTPException(status_code=404, detail="Không có officialBieuMau để chuyển đổi.")
    return {
        "caseId": str(row.get("caseId") or case_id),
        "be01Context": official_to_data_blank(official),
    }
class PreviewApplicationFormDocxRequest(BaseModel):
    """Request body for the DOCX / PDF preview endpoints.

    Paired with `data_blank.json` after `official_to_data_blank(officialBieuMau)`.
    """

    # Official form object; the preview handlers convert it with official_to_data_blank.
    officialBieuMau: dict[str, Any]
@app.post("/api/v1/docx/preview-application-form")
async def preview_application_form_docx(body: PreviewApplicationFormDocxRequest):
    """
    Render `template_application_form.docx` (Jinja2/docxtpl) and return a filled .docx.

    Accepts the same `officialBieuMau` object produced on the "Xem lại" (review) tab.

    Raises:
        HTTPException: 400 when the form object cannot be converted, 501 when
            the rendering import fails, 500 for a missing template or any
            other rendering error.
    """
    from src.be01.official_to_data_blank import official_to_data_blank
    from src.be01.fill_application_form import fill_application_form_docx

    docx_media_type = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

    # Step 1: map the official form object onto the template context.
    try:
        context = official_to_data_blank(body.officialBieuMau or {})
    except Exception as exc:  # noqa: BLE001
        message = f"Không chuyển officialBieuMau sang dữ liệu biểu mẫu: {exc!s}"
        raise HTTPException(status_code=400, detail=message) from exc

    # Step 2: render the Word template.
    try:
        rendered = fill_application_form_docx(context)
    except ImportError as exc:
        message = "Thiếu thư viện docxtpl. Cài: pip install docxtpl"
        raise HTTPException(status_code=501, detail=message) from exc
    except FileNotFoundError as exc:
        message = f"Không tìm thấy file mẫu Word trên server: {exc!s}"
        raise HTTPException(status_code=500, detail=message) from exc
    except Exception as exc:  # noqa: BLE001
        message = f"Lỗi khi render mẫu Word: {exc!s}"
        raise HTTPException(status_code=500, detail=message) from exc

    response_headers = {
        "Content-Disposition": 'inline; filename="mau-don-va-bao-cao-xem-truoc.docx"'
    }
    return Response(content=rendered, media_type=docx_media_type, headers=response_headers)
@app.post("/api/v1/docx/preview-application-form-pdf")
async def preview_application_form_pdf(body: PreviewApplicationFormDocxRequest):
    """
    Same merge as `preview-application-form` (docxtpl), then LibreOffice → PDF so layout matches DOCX.

    Raises:
        HTTPException: 400 when the form object cannot be converted; 501 when
            the rendering import fails or the converter reports a missing
            file (LibreOffice not installed); 500 for a missing template,
            rendering errors or a failed / timed-out conversion.
    """
    from src.be01.docx_to_pdf import convert_docx_bytes_to_pdf
    from src.be01.fill_application_form import fill_application_form_docx
    from src.be01.official_to_data_blank import official_to_data_blank

    try:
        ctx = official_to_data_blank(body.officialBieuMau or {})
    except Exception as exc:  # noqa: BLE001
        raise HTTPException(
            status_code=400, detail="Không chuyển officialBieuMau sang dữ liệu biểu mẫu: " + str(exc)
        ) from exc
    try:
        docx_bytes = fill_application_form_docx(ctx)
    except ImportError as exc:
        raise HTTPException(
            status_code=501, detail="Thiếu thư viện docxtpl. Cài: pip install docxtpl"
        ) from exc
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=500, detail="Không tìm thấy file mẫu Word trên server: " + str(exc)
        ) from exc
    except Exception as exc:  # noqa: BLE001
        raise HTTPException(
            status_code=500, detail="Lỗi khi render mẫu Word: " + str(exc)
        ) from exc
    try:
        # The converter is a synchronous call; run it in a worker thread so the
        # event loop keeps serving other requests, as the upload-conversion
        # endpoint in this file already does.
        pdf_bytes = await asyncio.to_thread(
            convert_docx_bytes_to_pdf,
            docx_bytes,
            relax_justified_softbreaks=True,
            strip_table_row_heights=False,
        )
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=501,
            detail="Chưa cài LibreOffice để xuất PDF. Docker: thêm libreoffice-writer-nogui; "
            "hoặc đặt LIBREOFFICE_PATH. Chi tiết: " + str(exc),
        ) from exc
    except (RuntimeError, ValueError, subprocess.TimeoutExpired) as exc:
        raise HTTPException(
            status_code=500, detail="Không chuyển DOCX sang PDF: " + str(exc)
        ) from exc
    return Response(
        content=pdf_bytes,
        media_type="application/pdf",
        headers={"Content-Disposition": 'inline; filename="mau-ho-so-sang-kien.pdf"'},
    )
@app.post("/api/v1/docx/convert-pdf")
async def convert_uploaded_docx_to_pdf(
    file: UploadFile = File(...),
    relax_justified_softbreaks: bool = Form(True),
    strip_table_row_heights: bool = Form(False),
):
    """
    Convert an uploaded `.docx` file to PDF using LibreOffice for near-Word layout fidelity.

    The PDF is returned inline; its download name is derived from the upload's
    file name, reduced to ASCII letters, digits, "-" and "_".

    Raises:
        HTTPException: 400 for a missing / non-.docx name, an unreadable or
            empty upload; 501 when the converter reports a missing file
            (LibreOffice not installed); 500 when the conversion fails or
            times out.
    """
    from src.be01.docx_to_pdf import convert_docx_bytes_to_pdf

    filename = (file.filename or "").strip()
    if not filename:
        raise HTTPException(status_code=400, detail="Thiếu tên file .docx.")
    if not filename.lower().endswith(".docx"):
        raise HTTPException(status_code=400, detail="Chỉ hỗ trợ file .docx.")
    try:
        docx_bytes = await file.read()
    except Exception as exc:  # noqa: BLE001
        raise HTTPException(status_code=400, detail="Không đọc được nội dung file upload.") from exc
    if not docx_bytes:
        raise HTTPException(status_code=400, detail="File .docx rỗng.")
    try:
        pdf_bytes = await asyncio.to_thread(
            convert_docx_bytes_to_pdf,
            docx_bytes,
            relax_justified_softbreaks=relax_justified_softbreaks,
            strip_table_row_heights=strip_table_row_heights,
        )
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=501,
            detail="Chưa cài LibreOffice để xuất PDF. Docker: thêm libreoffice-writer-nogui; "
            "hoặc đặt LIBREOFFICE_PATH. Chi tiết: " + str(exc),
        ) from exc
    except (RuntimeError, ValueError, subprocess.TimeoutExpired) as exc:
        raise HTTPException(
            status_code=500, detail="Không chuyển DOCX sang PDF: " + str(exc)
        ) from exc
    # Header values must be latin-1 encodable, and str.isalnum() alone accepts
    # any Unicode letter (e.g. "đ", "ế"). Decompose accents first so base Latin
    # letters survive, drop the combining marks, then keep ASCII only.
    decomposed_stem = unicodedata.normalize("NFKD", Path(filename).stem)
    safe_stem = "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch in ("-", "_") else "_"
        for ch in decomposed_stem
        if not unicodedata.combining(ch)
    )
    out_name = (safe_stem or "document") + ".pdf"
    return Response(
        content=pdf_bytes,
        media_type="application/pdf",
        headers={"Content-Disposition": f'inline; filename="{out_name}"'},
    )
# --- Initiative applications: PDF submission (applicant) + listing for the admin "Danh Sách Sáng kiến" (initiative list) view ---
# Directory that receives submitted PDFs; the SUBMITTED_INITIATIVES_DIR
# environment variable overrides the default under fe0/public.
SUBMITTED_INITIATIVES_DIR = Path(
    os.getenv(
        "SUBMITTED_INITIATIVES_DIR",
        str((APP_ROOT_DIR.parent / "fe0" / "public" / "submitted-initiatives").resolve()),
    )
)
# JSON index of submissions used by the file-based (non-PostgreSQL) code paths.
SUBMITTED_INDEX_PATH = SUBMITTED_INITIATIVES_DIR / "index.json"
# Created at import time, presumably so the static mount below finds an existing directory.
SUBMITTED_INITIATIVES_DIR.mkdir(parents=True, exist_ok=True)
# NOTE(review): the whole directory is served statically and index.json lives
# inside it, so the index (including author e-mail / phone fields) is reachable
# under /submitted-initiatives/index.json — confirm this exposure is intended.
app.mount(
    "/submitted-initiatives",
    StaticFiles(directory=str(SUBMITTED_INITIATIVES_DIR.resolve())),
    name="submitted_initiatives",
)
def _load_submitted_items() -> List[Dict[str, Any]]:
    """Read the submitted-initiatives index and return its `items` list.

    Returns an empty list when the index file is missing, cannot be read or
    parsed (the failure is logged), or does not hold a list under `items`.
    """
    SUBMITTED_INITIATIVES_DIR.mkdir(parents=True, exist_ok=True)
    if not SUBMITTED_INDEX_PATH.exists():
        return []
    try:
        with SUBMITTED_INDEX_PATH.open(encoding="utf-8") as index_file:
            parsed = json.load(index_file)
    except Exception:
        logger.exception("Failed to load submitted initiatives index")
        return []
    if not isinstance(parsed, dict):
        return []
    entries = parsed.get("items")
    if not isinstance(entries, list):
        return []
    # Hand back a copy so callers can mutate the list freely.
    return list(entries)
def _save_submitted_items(items: List[Dict[str, Any]]) -> None:
    """Write the submitted-initiatives index as `{"items": [...]}`.

    The JSON is first written to a uniquely named temporary file in the same
    directory and then moved over the index with `os.replace`, so a failed or
    interrupted write never leaves a truncated index behind.

    Raises:
        OSError / TypeError / ValueError: propagated from writing or
            serializing; the temporary file is removed before re-raising.
    """
    SUBMITTED_INITIATIVES_DIR.mkdir(parents=True, exist_ok=True)
    tmp_path = SUBMITTED_INDEX_PATH.with_name(
        f"{SUBMITTED_INDEX_PATH.name}.{uuid.uuid4().hex}.tmp"
    )
    try:
        with open(tmp_path, "w", encoding="utf-8") as handle:
            json.dump({"items": items}, handle, ensure_ascii=False, indent=2)
        # Same-directory rename: readers see either the old or the new index.
        os.replace(tmp_path, SUBMITTED_INDEX_PATH)
    except Exception:
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise
@app.post("/api/applications/submit")
async def submit_initiative_application(
    file: UploadFile = File(...),
    metadata: str = Form(""),
    authorization: Optional[str] = Header(None),
):
    """
    Receive the complete application PDF from the "Xem lại" (review) tab, store it
    under the submitted-initiatives directory and record it so that
    GET /api/applications can return it to admins.

    `metadata` is an optional JSON object; invalid JSON or a JSON value that is
    not an object is treated as empty metadata. With PostgreSQL enabled the
    submission is persisted through `save_submitted_application`; on an
    unexpected database failure, or without PostgreSQL, it is appended to the
    file index instead.

    Raises:
        HTTPException: 400 for a non-PDF name, a too-small file or a
            submission that is not ready (with the `missing` list); 503 when
            persisting fails with `ApplicationSubmitPersistError`.
    """
    if not file.filename or not file.filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Yêu cầu file PDF (.pdf)")
    content = await file.read()
    if not content or len(content) < 100:
        raise HTTPException(status_code=400, detail="File PDF không hợp lệ hoặc quá nhỏ")
    try:
        meta = json.loads(metadata) if metadata.strip() else {}
    except json.JSONDecodeError:
        meta = {}
    # Valid JSON need not be an object (e.g. "[]" or "1"); every lookup below needs a dict.
    if not isinstance(meta, dict):
        meta = {}

    def _meta_text(*keys: str) -> str:
        """Return the first truthy metadata value among `keys`, stringified and stripped."""
        for key in keys:
            value = meta.get(key)
            if value:
                # str() guards against non-string JSON values such as numbers.
                return str(value).strip()
        return ""

    new_id = f"sub-{uuid.uuid4().hex[:16]}"
    # Timezone-aware replacement for the deprecated datetime.utcnow(); same "…Z" output format.
    now = datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
    safe_name = f"{new_id}.pdf"
    SUBMITTED_INITIATIVES_DIR.mkdir(parents=True, exist_ok=True)
    pdf_path = SUBMITTED_INITIATIVES_DIR / safe_name
    with open(pdf_path, "wb") as handle:
        handle.write(content)
    initiative_name = _meta_text("initiativeName", "name") or "Hồ sơ sáng kiến"
    author_name = _meta_text("authorName") or "—"
    author_email = _meta_text("authorEmail") or None
    author_phone = _meta_text("authorPhone") or None
    case_id = _meta_text("caseId") or None
    item: Dict[str, Any] = {
        "id": new_id,
        "submittedDate": now,
        "name": initiative_name,
        "author": {
            "id": case_id or new_id,
            "name": author_name,
            "email": author_email,
            "phone": author_phone,
        },
        "subjectId": meta.get("subjectId") or "",
        "groupId": meta.get("groupId") or "",
        "status": "pending",
        "reviewStatus": "not_reviewed",
        "supervisor": None,
        "reviewer": None,
        "reviewDeadline": None,
        "conference": None,
        "topicType": str(meta.get("topicType") or "Hồ sơ PDF (đơn + báo cáo)"),
        "files": {
            "fullText": {"url": f"/submitted-initiatives/{safe_name}", "type": "pdf"},
            "abstract": None,
            "poster": None,
        },
    }
    public_url = f"/submitted-initiatives/{safe_name}"
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.submissions import ApplicationSubmitPersistError, save_submitted_application
    from src.initiative_db.submission_readiness import ApplicationSubmissionNotReadyError

    # May be None: this endpoint does not reject unauthenticated submissions.
    owner_uid = decode_access_token_user_id(authorization)
    if is_postgres_enabled():
        try:
            pdf_sha256 = hashlib.sha256(content).hexdigest()
            pdf_len = len(content)
            async with get_session() as session:
                saved = await save_submitted_application(
                    session=session,
                    metadata=meta,
                    file_url=public_url,
                    submission_id=new_id,
                    owner_user_id=owner_uid,
                    pdf_byte_size=pdf_len,
                    pdf_sha256=pdf_sha256,
                    pdf_original_name=safe_name,
                    pdf_body=content,
                )
            logger.info("Submitted initiative PDF persisted in PostgreSQL path=%s", pdf_path)
            return saved
        except ApplicationSubmissionNotReadyError as exc:
            # The submission was refused, so drop the PDF written above (best effort).
            try:
                pdf_path.unlink(missing_ok=True)
            except OSError:
                pass
            raise HTTPException(
                status_code=400,
                detail={
                    "message": "Hồ sơ chưa đủ điều kiện nộp.",
                    "missing": exc.missing,
                },
            ) from exc
        except ApplicationSubmitPersistError as exc:
            # NOTE(review): the PDF written above stays on disk in this branch — confirm that is intended.
            raise HTTPException(
                status_code=503,
                detail=str(exc),
            ) from exc
        except Exception:
            logger.exception("application submission persist (PostgreSQL) failed; fallback to file index")
    # File index path: used without PostgreSQL and after an unexpected database failure.
    items = _load_submitted_items()
    items.insert(0, item)
    _save_submitted_items(items)
    logger.info("Submitted initiative PDF id=%s path=%s", new_id, pdf_path)
    return {
        "id": new_id,
        "submittedDate": now,
        "publicUrl": public_url,
        "name": initiative_name,
    }
@app.post("/api/applications/new")
async def create_submitted_application(
    body: CreateSubmittedApplicationBody,
    authorization: Optional[str] = Header(None),
):
    """
    Create an empty submitted-application row for the caller and return its id.

    Author name, e-mail and phone are copied from the caller's user record
    when that record and the respective field exist.

    Raises:
        HTTPException: 401 without a decodable user id, 503 without
            PostgreSQL, 500 on any other failure.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.models import User
    from src.initiative_db.submissions import create_submitted_application_shell

    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để tạo hồ sơ mới.")
    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Tính năng này yêu cầu PostgreSQL.")

    try:
        async with get_session() as session:
            user = await session.get(User, uid)

            # Blank or missing profile fields are passed on as None.
            author_name = author_email = author_phone = None
            if user:
                if user.full_name:
                    author_name = str(user.full_name).strip()
                if user.email:
                    author_email = str(user.email).strip()
                if user.phone:
                    author_phone = str(user.phone).strip()

            requested_name = (body.name or "").strip()
            created = await create_submitted_application_shell(
                session=session,
                owner_user_id=uid,
                name=requested_name or None,
                author_name=author_name,
                author_email=author_email,
                author_phone=author_phone,
            )
            return {"id": str(created.get("id") or ""), "application": created}
    except HTTPException:
        raise
    except Exception:
        logger.exception("POST /api/applications/new failed")
        raise HTTPException(status_code=500, detail="Không thể tạo hồ sơ mới.") from None
def _get_application_from_file_index(application_id: str) -> Optional[Dict[str, Any]]:
    """Return the first file-index entry whose stringified `id` equals `application_id`, else None."""
    return next(
        (
            entry
            for entry in _load_submitted_items()
            if str(entry.get("id")) == application_id
        ),
        None,
    )
@app.get("/api/applications/mine")
async def list_my_applications(authorization: Optional[str] = Header(None)):
    """
    List the submitted applications of the logged-in applicant.

    PostgreSQL mode queries by user id and e-mail. File mode filters the
    index by the e-mail claim of the bearer token, sorts newest first and adds
    `calendarYear` when `submittedDate` starts with four digits.

    Raises:
        HTTPException: 401 without a decodable user id, 500 when the
            PostgreSQL query fails.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.models import User
    from src.initiative_db.submissions import list_my_submitted_applications

    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để xem hồ sơ của bạn.")

    if is_postgres_enabled():
        try:
            async with get_session() as session:
                user = await session.get(User, uid)
                email = "" if user is None else str(user.email)
                applications = await list_my_submitted_applications(session, uid, email)
                return {"data": applications}
        except HTTPException:
            raise
        except Exception:
            logger.exception("GET /api/applications/mine (PostgreSQL) failed")
            raise HTTPException(status_code=500, detail="Không tải được danh sách hồ sơ") from None

    # File fallback: match index entries on the e-mail carried by the token.
    claims = decode_bearer_token(authorization) or {}
    token_email = str(claims.get("email") or "").strip().lower()

    def _author_email(entry: Dict[str, Any]) -> str:
        return str((entry.get("author") or {}).get("email") or "").strip().lower()

    mine: List[Dict[str, Any]] = [
        entry
        for entry in _load_submitted_items()
        if _author_email(entry) == token_email and token_email
    ]
    mine = sorted(mine, key=lambda entry: str(entry.get("submittedDate") or ""), reverse=True)
    for entry in mine:
        submitted = str(entry.get("submittedDate") or "")
        year_text = submitted[:4]
        if len(submitted) >= 4 and year_text.isdigit():
            entry["calendarYear"] = int(year_text)
    return {"data": mine}
async def _enrich_application_detail_full_pdf_presign(session, row: Dict[str, Any]) -> None:
    """Add a presigned view URL for the full PDF to `row["files"]["fullText"]`.

    Nothing is changed when the row has no `draft_case_id`, the initiative or
    its `full_pdf` artifact is missing, the artifact has no storage URI, the
    URI is a `/submitted-initiatives` path or an http(s) URL, or the presign
    call fails (logged as a warning). Otherwise `viewUrl` and `storageKey` are
    merged into the existing `fullText` entry.
    """
    from sqlalchemy import select
    from src.initiative_db.models import ApplicationArtifact, Initiative

    case_code = str(row.get("draft_case_id") or "").strip()
    if not case_code:
        return

    initiative_result = await session.execute(
        select(Initiative).where(Initiative.case_code == case_code)
    )
    initiative = initiative_result.scalar_one_or_none()
    if initiative is None:
        return

    artifact_result = await session.execute(
        select(ApplicationArtifact).where(
            ApplicationArtifact.initiative_id == initiative.id,
            ApplicationArtifact.role == "full_pdf",
        )
    )
    artifact = artifact_result.scalar_one_or_none()
    if artifact is None:
        return
    storage_key = (artifact.storage_uri or "").strip()
    if not storage_key:
        return
    # Locally served paths and absolute URLs are not object-storage keys.
    if storage_key.startswith(("/submitted-initiatives", "http://", "https://")):
        return

    try:
        from src.minio.storage import S3Storage, settings as s3s

        storage = S3Storage()
        exports_bucket = s3s.s3_bucket_exports
        view_url = await storage.get_download_url(
            exports_bucket,
            storage_key,
            ttl=3600,
            filename=(artifact.original_name or "ho-so.pdf"),
            inline=True,
            response_content_type="application/pdf",
        )
    except Exception:
        logger.warning("Presigned URL for submitted full PDF failed (case=%s)", case_code, exc_info=True)
        return

    files = row.setdefault("files", {})
    current = files.get("fullText")
    full_text = dict(current) if isinstance(current, dict) else {}
    full_text.update({"viewUrl": view_url, "storageKey": storage_key})
    files["fullText"] = full_text
@app.get("/api/applications/export")
async def export_applications_excel(
    authorization: Optional[str] = Header(None),
    page: int = 1,
    pageSize: int = 20,
    name: str = "",
    authorName: str = "",
    reviewerName: str = "",
    status: str = "",
    reviewStatus: str = "",
    dateFrom: str = "",
    dateTo: str = "",
    sortBy: str = "submittedDate",
    sortOrder: str = "desc",
    lifecycle: str = "",
):
    """
    Export the initiative list as an Excel file (same filters / sorting as GET /api/applications).

    TT & MSSK columns: «YYYY-n» (n increases per year within the export). Admin accounts only.
    `page` and `pageSize` are accepted but ignored: the export is not paginated.

    Raises:
        HTTPException: whatever `_require_admin_user` raises, or 503 when the
            PostgreSQL query fails.
    """
    _require_admin_user(authorization)
    from src.be01.export_applications_list_xlsx import build_applications_list_xlsx
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.submissions import submitted_applications_pairs_for_export

    # The client sends the same query as for the list view; pagination does not apply here.
    _ = page, pageSize
    if is_postgres_enabled():
        try:
            async with get_session() as session:
                pairs = await submitted_applications_pairs_for_export(
                    session,
                    name=name,
                    author_name=authorName,
                    reviewer_name=reviewerName,
                    status=status,
                    review_status=reviewStatus,
                    date_from=dateFrom,
                    date_to=dateTo,
                    sort_by=sortBy,
                    sort_order=sortOrder,
                    lifecycle=lifecycle,
                )
        except Exception:
            logger.exception("GET /api/applications/export (PostgreSQL) failed")
            raise HTTPException(
                status_code=503,
                detail="Không thể xuất Excel từ cơ sở dữ liệu. Vui lòng thử lại sau.",
            ) from None
    else:
        # File mode: filter and sort the JSON index in memory.
        lc = (lifecycle or "").strip().lower()
        wanted_name = name.strip().lower()
        wanted_author = authorName.strip().lower()
        wanted_reviewer = reviewerName.strip().lower()
        decided_statuses = ("approved", "rejected")

        def _keep(row: Dict[str, Any]) -> bool:
            """Return True when `row` passes every active filter."""
            row_status = str(row.get("status") or "")
            is_decided = row_status in decided_statuses
            if lc == "inbox" and is_decided:
                return False
            if lc == "decided" and not is_decided:
                return False
            if wanted_name and wanted_name not in str(row.get("name") or "").lower():
                return False
            author = row.get("author") or {}
            if wanted_author and wanted_author not in str(author.get("name") or "").lower():
                return False
            if wanted_reviewer:
                reviewer = row.get("reviewer") or {}
                if wanted_reviewer not in str(reviewer.get("name") or "").lower():
                    return False
            if status and row_status != status:
                return False
            if reviewStatus and str(row.get("reviewStatus") or "") != reviewStatus:
                return False
            submitted = row.get("submittedDate")
            if submitted and (dateFrom or dateTo):
                # Compare on the leading YYYY-MM-DD part; shorter values are not date-filtered.
                day = str(submitted)[:10]
                if len(day) == 10:
                    if dateFrom and day < dateFrom:
                        return False
                    if dateTo and day > dateTo:
                        return False
            return True

        filtered = [row for row in _load_submitted_items() if _keep(row)]

        sort_keys = {
            "name": lambda x: str(x.get("name") or ""),
            "author": lambda x: str((x.get("author") or {}).get("name") or ""),
        }
        # Any other sortBy value falls back to the submission date.
        sort_key = sort_keys.get(sortBy, lambda x: str(x.get("submittedDate") or ""))
        filtered.sort(key=sort_key, reverse=(sortOrder != "asc"))
        pairs = [(r, {}) for r in filtered]

    body = build_applications_list_xlsx(pairs)
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M")
    safe_fn = f"sang-kien-export-{stamp}.xlsx"
    return Response(
        content=body,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": f'attachment; filename="{safe_fn}"'},
    )
# Maximum number of applications a single bulk-backup request may match;
# the export-backups endpoint answers HTTP 413 when the filtered list is larger.
BULK_APPLICATION_BACKUPS_MAX = 250
@app.get("/api/applications/export-backups")
async def export_applications_backups_bundle(
    authorization: Optional[str] = Header(None),
    page: int = 1,
    pageSize: int = 20,
    name: str = "",
    authorName: str = "",
    reviewerName: str = "",
    status: str = "",
    reviewStatus: str = "",
    dateFrom: str = "",
    dateTo: str = "",
    sortBy: str = "submittedDate",
    sortOrder: str = "desc",
    lifecycle: str = "",
):
    """
    Admin-only: stream one ZIP that contains a backup ZIP per matching application.

    Uses the same filter / sort parameters as the Excel export; ``page`` and
    ``pageSize`` are accepted but ignored. One audit record is written for the
    whole bundle before the response is returned.

    Raises:
        HTTPException: 503 when PostgreSQL is disabled or the list query fails,
            413 when more than ``BULK_APPLICATION_BACKUPS_MAX`` rows match,
            404 when no application could be packed.
    """
    from sqlalchemy import desc, select
    from src.audit import AuditAction, record_audit, resolve_actor_fields
    from src.initiative_db.application_backup import build_backup_zipstream
    from src.initiative_db.backup_naming import backup_zip_attachment_filename
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.models import ApplicationArtifact, ApplicationReviewDocument, User
    from src.initiative_db.submissions import (
        _as_review_document_row,
        resolve_submitted_initiative_for_backup,
        submitted_applications_pairs_for_export,
    )
    from src.minio.storage import _sanitize_filename, settings as s3_settings
    from zipstream import ZipStream

    admin_uid = _require_admin_user(authorization)
    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Sao lưu yêu cầu PostgreSQL.")
    _ = page, pageSize

    outer = ZipStream()
    used_inner_names: set[str] = set()

    def _unique_member_name(safe_fn: str) -> str:
        # Reserve a member name, appending _2, _3, ... when the name is already taken.
        candidate = safe_fn or "backup.zip"
        if candidate in used_inner_names:
            stem = candidate[:-4] if candidate.lower().endswith(".zip") else candidate
            suffix = 2
            candidate = f"{stem}_{suffix}.zip"
            while candidate in used_inner_names:
                suffix += 1
                candidate = f"{stem}_{suffix}.zip"
        used_inner_names.add(candidate)
        return candidate

    packed_ids: List[str] = []
    async with get_session() as session:
        try:
            pairs = await submitted_applications_pairs_for_export(
                session,
                name=name,
                author_name=authorName,
                reviewer_name=reviewerName,
                status=status,
                review_status=reviewStatus,
                date_from=dateFrom,
                date_to=dateTo,
                sort_by=sortBy,
                sort_order=sortOrder,
                lifecycle=lifecycle,
            )
        except Exception:
            logger.exception("GET /api/applications/export-backups (list) failed")
            raise HTTPException(
                status_code=503,
                detail="Không thể tải danh sách hồ sơ để đóng gói sao lưu.",
            ) from None

        if len(pairs) > BULK_APPLICATION_BACKUPS_MAX:
            raise HTTPException(
                status_code=413,
                detail=f"Quá nhiều hồ sơ ({len(pairs)}). Tối đa {BULK_APPLICATION_BACKUPS_MAX} — vui lòng thu hẹp bộ lọc.",
            )

        for row, _payload in pairs:
            application_id = str(row.get("id") or "").strip()
            if not application_id:
                continue
            resolved = await resolve_submitted_initiative_for_backup(session, application_id)
            if resolved is None:
                # Rows that cannot be resolved are silently left out of the bundle.
                continue
            initiative, public_id = resolved

            artifact_stmt = select(ApplicationArtifact).where(
                ApplicationArtifact.initiative_id == initiative.id
            )
            arts = (await session.execute(artifact_stmt)).scalars().all()

            # Only the review document with the highest version is included.
            review_stmt = (
                select(ApplicationReviewDocument)
                .where(ApplicationReviewDocument.initiative_id == initiative.id)
                .order_by(desc(ApplicationReviewDocument.document_version))
                .limit(1)
            )
            rd = (await session.execute(review_stmt)).scalar_one_or_none()
            review_json = None if rd is None else _as_review_document_row(rd)

            owner = await session.get(User, initiative.owner_id)
            owner_email = owner.email if owner is not None else None
            owner_full_name = owner.full_name if owner is not None else None
            inner_fn = backup_zip_attachment_filename(
                owner_email=owner_email,
                owner_full_name=owner_full_name,
                public_application_id=public_id,
            )
            member_name = _unique_member_name(inner_fn)

            submitted_at = initiative.submitted_at
            inner_z = build_backup_zipstream(
                settings=s3_settings,
                initiative=initiative,
                application_id=public_id,
                case_code=initiative.case_code,
                artifacts=list(arts),
                review_doc_json=review_json,
                owner_id=str(initiative.owner_id),
                submitted_at=submitted_at.isoformat() if submitted_at is not None else None,
            )
            outer.add(iter(inner_z), member_name)
            packed_ids.append(public_id)

        if not packed_ids:
            raise HTTPException(
                status_code=404,
                detail="Không có hồ sơ nào khớp bộ lọc để đóng gói sao lưu.",
            )

        actor_email, actor_role = await resolve_actor_fields(session, admin_uid)
        audit_metadata = {
            "outcome": "nested_zip_stream",
            "packed_count": len(packed_ids),
            # At most the first 100 ids are stored in the audit metadata.
            "application_ids": packed_ids[:100],
            "truncated_ids": len(packed_ids) > 100,
        }
        await record_audit(
            session,
            actor_user_id=admin_uid,
            actor_email=actor_email,
            actor_role=actor_role,
            action=AuditAction.read,
            entity_type="application_backup_bulk",
            entity_id="bulk",
            metadata=audit_metadata,
        )
        await session.commit()

    stamp = datetime.now(timezone.utc).strftime('%Y%m%d-%H%M')
    outer_fn = _sanitize_filename(
        f"sang-kien-sao-luu-tong-hop-{stamp}.zip"
    ) or "sang-kien-sao-luu-tong-hop.zip"
    return StreamingResponse(
        outer,
        media_type="application/zip",
        headers={"Content-Disposition": f'attachment; filename="{outer_fn}"'},
    )
@app.get("/api/applications/{application_id}")
async def get_application(
    application_id: str, authorization: Optional[str] = Header(None)
):
    """
    Return a single submitted application for the review page (admin / applicant deep link).

    Staff reviewers may read any application. Other authenticated users must
    pass the ownership check: ``_applicant_may_mutate_row`` on the PostgreSQL
    path, or a match between the token e-mail and the author e-mail on the
    file-index path.

    When PostgreSQL is enabled but the application is not found there, the
    lookup continues with the local file index.

    Raises:
        HTTPException: 404 when the application does not exist, 403 when the
            caller may not view it, 503 when the PostgreSQL query fails.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.models import User
    from src.initiative_db.submissions import (
        _applicant_may_mutate_row,
        _as_submission_item,
        _resolve_initiative_and_latest_draft_for_application_id,
        get_application_by_id,
    )

    uid = _require_authenticated_user(authorization)
    # Resolve the caller's role once; it is needed on both storage paths.
    is_staff = _is_staff_reviewer(authorization)

    if is_postgres_enabled():
        try:
            async with get_session() as session:
                if not is_staff:
                    try:
                        initiative, draft = await _resolve_initiative_and_latest_draft_for_application_id(
                            session, application_id
                        )
                    except LookupError as exc:
                        raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ") from exc
                    payload = dict(draft.payload) if isinstance(draft.payload, dict) else {}
                    row = _as_submission_item(initiative, payload)
                    user = await session.get(User, uid)
                    email = str(user.email) if user is not None else ""
                    if not _applicant_may_mutate_row(initiative, row, uid, email):
                        raise HTTPException(status_code=403, detail="Không có quyền xem hồ sơ này.")
                row = await get_application_by_id(session, application_id)
                if row is not None:
                    await _enrich_application_detail_full_pdf_presign(session, row)
                    return row
        except HTTPException:
            raise
        except Exception:
            logger.exception("application detail query (PostgreSQL) failed; refusing file index fallback while DB is configured")
            raise HTTPException(
                status_code=503,
                detail="Không thể tải hồ sơ từ cơ sở dữ liệu. Vui lòng thử lại sau hoặc liên hệ quản trị.",
            ) from None

    token_email = ""
    if not is_staff:
        token_payload = decode_bearer_token(authorization)
        token_email = str((token_payload or {}).get("email") or "").strip().lower()

    # Read the file-index record once so the row that is authorized is the row returned.
    row = _get_application_from_file_index(application_id)
    if row is None:
        raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ")
    if not is_staff:
        auth_em = str((row.get("author") or {}).get("email") or "").strip().lower()
        if not token_email or auth_em != token_email:
            raise HTTPException(status_code=403, detail="Không có quyền xem hồ sơ này.")
    return row
@app.get("/api/applications/{application_id}/backup")
async def download_application_backup(
    application_id: str,
    authorization: Optional[str] = Header(None),
):
    """
    Admin-only: stream a ZIP (manifest + submitted PDF + official DOCX/PDF + evidence + optional review JSON).

    All database reads (initiative, artifacts, latest review document, owner)
    and the attribute reads needed for the file name and the ZIP builder are
    done before the audit record is committed, in the same order of work as
    the bulk export-backups endpoint.

    Raises:
        HTTPException: 503 when PostgreSQL is disabled, 404 when the
            application cannot be resolved.
    """
    from sqlalchemy import desc, select
    from src.audit import AuditAction, record_audit, resolve_actor_fields
    from src.initiative_db.application_backup import build_backup_zipstream
    from src.initiative_db.backup_naming import backup_zip_attachment_filename
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.models import ApplicationArtifact, ApplicationReviewDocument, User
    from src.initiative_db.submissions import _as_review_document_row, resolve_submitted_initiative_for_backup
    from src.minio.storage import settings as s3_settings

    admin_uid = _require_admin_user(authorization)
    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Sao lưu yêu cầu PostgreSQL.")

    async with get_session() as session:
        resolved = await resolve_submitted_initiative_for_backup(session, application_id)
        if resolved is None:
            raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ.")
        initiative, public_id = resolved

        arts = (
            await session.execute(
                select(ApplicationArtifact).where(ApplicationArtifact.initiative_id == initiative.id)
            )
        ).scalars().all()
        # Only the review document with the highest version is included.
        rd = (
            await session.execute(
                select(ApplicationReviewDocument)
                .where(ApplicationReviewDocument.initiative_id == initiative.id)
                .order_by(desc(ApplicationReviewDocument.document_version))
                .limit(1)
            )
        ).scalar_one_or_none()
        review_json = _as_review_document_row(rd) if rd is not None else None

        # Read everything that touches ORM attributes BEFORE the commit below.
        # NOTE(review): if the session expires instances on commit, reading
        # them afterwards would need implicit IO, which AsyncSession does not
        # allow — confirm the session factory's expire_on_commit setting.
        owner = await session.get(User, initiative.owner_id)
        safe_fn = backup_zip_attachment_filename(
            owner_email=owner.email if owner is not None else None,
            owner_full_name=owner.full_name if owner is not None else None,
            public_application_id=public_id,
        )
        z = build_backup_zipstream(
            settings=s3_settings,
            initiative=initiative,
            application_id=public_id,
            case_code=initiative.case_code,
            artifacts=list(arts),
            review_doc_json=review_json,
            owner_id=str(initiative.owner_id),
            submitted_at=initiative.submitted_at.isoformat()
            if initiative.submitted_at is not None
            else None,
        )

        actor_email, actor_role = await resolve_actor_fields(session, admin_uid)
        await record_audit(
            session,
            actor_user_id=admin_uid,
            actor_email=actor_email,
            actor_role=actor_role,
            action=AuditAction.read,
            entity_type="application_backup",
            entity_id=public_id,
            metadata={
                "outcome": "streaming_zip",
                "initiative_id": str(initiative.id),
                "case_code": initiative.case_code,
                "artifact_count": len(arts),
            },
        )
        await session.commit()

    return StreamingResponse(
        z,
        media_type="application/zip",
        headers={"Content-Disposition": f'attachment; filename="{safe_fn}"'},
    )
@app.put("/api/applications/{application_id}")
async def update_submitted_application(
    application_id: str,
    body: UpdateSubmittedApplicationBody,
    authorization: Optional[str] = Header(None),
):
    """
    Update the name and submitted date of the caller's own submission.

    Works against PostgreSQL when it is enabled, otherwise against the local
    file index, where ownership is decided by comparing the token e-mail with
    the stored author e-mail.

    Raises:
        HTTPException: 401 without a valid token, 404 when the application is
            unknown, 403 when it belongs to someone else, 400 for an invalid
            date (file index), 500 for an unexpected PostgreSQL failure.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.models import User
    from src.initiative_db.submissions import _parse_submitted_date_input, update_my_submitted_application

    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để cập nhật hồ sơ.")

    if is_postgres_enabled():
        try:
            async with get_session() as session:
                account = await session.get(User, uid)
                email = "" if account is None else str(account.email)
                return await update_my_submitted_application(
                    session, uid, email, application_id, body.name, body.submittedDate
                )
        except LookupError:
            raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ")
        except PermissionError:
            raise HTTPException(status_code=403, detail="Không có quyền cập nhật hồ sơ này.")
        except HTTPException:
            raise
        except Exception:
            logger.exception("PUT /api/applications (PostgreSQL) failed")
            raise HTTPException(status_code=500, detail="Không thể cập nhật hồ sơ") from None

    # ---- file-index fallback ----
    claims = decode_bearer_token(authorization)
    token_email = str((claims or {}).get("email") or "").strip().lower()
    items = _load_submitted_items()

    position = None
    for index, record in enumerate(items):
        if str(record.get("id")) == application_id:
            position = index
            break
    if position is None:
        raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ")

    current = items[position]
    owner_email = str((current.get("author") or {}).get("email") or "").strip().lower()
    if not token_email or owner_email != token_email:
        raise HTTPException(status_code=403, detail="Không có quyền cập nhật hồ sơ này.")

    try:
        parsed = _parse_submitted_date_input(body.submittedDate)
        # Stored as a second-precision UTC timestamp with a trailing "Z".
        iso = (
            parsed.astimezone(timezone.utc)
            .replace(microsecond=0)
            .isoformat()
            .replace("+00:00", "Z")
        )
    except Exception:
        raise HTTPException(status_code=400, detail="Ngày nộp không hợp lệ.")

    revised = dict(current)
    revised["name"] = body.name.strip()
    revised["submittedDate"] = iso
    year_text = iso[:4]
    if len(iso) >= 4 and year_text.isdigit():
        revised["calendarYear"] = int(year_text)

    items[position] = revised
    _save_submitted_items(items)
    return items[position]
@app.delete("/api/applications/{application_id}")
async def delete_submitted_application(
    application_id: str,
    authorization: Optional[str] = Header(None),
):
    """
    Delete the caller's own submission.

    With PostgreSQL enabled the deletion is delegated to
    ``delete_my_submitted_application``. Otherwise the row is removed from the
    local file index, together with its stored PDF when the row's
    ``files.fullText.url`` points under ``/submitted-initiatives/``.

    Raises:
        HTTPException: 401 without a valid token, 404 when the application is
            unknown, 403 when it belongs to someone else, 500 for an
            unexpected PostgreSQL failure.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.models import User
    from src.initiative_db.submissions import delete_my_submitted_application

    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để xóa hồ sơ.")

    if is_postgres_enabled():
        try:
            async with get_session() as session:
                account = await session.get(User, uid)
                email = "" if account is None else str(account.email)
                await delete_my_submitted_application(session, uid, email, application_id)
                return {"deleted": True, "id": application_id}
        except LookupError:
            raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ")
        except PermissionError:
            raise HTTPException(status_code=403, detail="Không có quyền xóa hồ sơ này.")
        except HTTPException:
            raise
        except Exception:
            logger.exception("DELETE /api/applications (PostgreSQL) failed")
            raise HTTPException(status_code=500, detail="Không thể xóa hồ sơ") from None

    # ---- file-index fallback ----
    claims = decode_bearer_token(authorization)
    token_email = str((claims or {}).get("email") or "").strip().lower()
    items = _load_submitted_items()

    position = None
    for index, record in enumerate(items):
        if str(record.get("id")) == application_id:
            position = index
            break
    if position is None:
        raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ")

    target = items[position]
    owner_email = str((target.get("author") or {}).get("email") or "").strip().lower()
    if not token_email or owner_email != token_email:
        raise HTTPException(status_code=403, detail="Không có quyền xóa hồ sơ này.")

    stored_files = target.get("files") or {}
    full_text = stored_files.get("fullText")
    url = ""
    if isinstance(full_text, dict) and full_text:
        url = str(full_text.get("url") or "")

    if url.startswith("/submitted-initiatives/"):
        fname = url.replace("/submitted-initiatives/", "").lstrip("/")
        # Keep only alphanumerics, "-", "_" and "." so no path separator survives.
        allowed_punctuation = ("-", "_", ".")
        safe = "".join(ch for ch in fname if ch.isalnum() or ch in allowed_punctuation)
        if safe:
            pdf_path = SUBMITTED_INITIATIVES_DIR / safe
            try:
                if pdf_path.is_file():
                    pdf_path.unlink()
            except OSError:
                # Best effort: the index row is still removed when the file cannot be deleted.
                logger.warning("Could not delete PDF file %s", pdf_path)

    items.pop(position)
    _save_submitted_items(items)
    return {"deleted": True, "id": application_id}
@app.get("/api/conferences")
async def list_conference_filter_options():
    """
    Distinct conferences / rounds (from ``application_workflow.conference``) for council/admin list filters.

    Returns a list of ``{id, name}`` objects for ``useLookupQuery`` / ``toOptionList``,
    sorted by lower-cased name and then id. Entries without a name are skipped;
    entries without an id get the synthetic id ``__name__:<name>``.
    An empty list is returned when PostgreSQL is disabled or the query fails.
    """
    from sqlalchemy import select
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.models import ApplicationWorkflow

    if not is_postgres_enabled():
        return []

    def _option_order(option: dict) -> tuple:
        return (str(option.get("name") or "").lower(), str(option.get("id") or ""))

    try:
        async with get_session() as session:
            workflows = (await session.execute(select(ApplicationWorkflow))).scalars().all()
            seen: dict[str, dict] = {}
            for workflow in workflows:
                conference = workflow.conference
                if not isinstance(conference, dict):
                    continue
                cname = str(conference.get("name") or "").strip()
                if not cname:
                    continue
                cid = str(conference.get("id") or "").strip()
                opt_id = cid or f"__name__:{cname}"
                # The first occurrence of an id wins.
                seen.setdefault(opt_id, {"id": opt_id, "name": cname})
            return sorted(seen.values(), key=_option_order)
    except Exception:
        logger.exception("GET /api/conferences failed")
        return []
@app.get("/api/supervisors")
async def list_supervisor_filter_options():
    """
    Distinct supervisors from ``application_workflow.supervisor`` for dashboard filters.

    Returns ``{id, name}`` objects sorted by lower-cased name and then id. The
    name comes from ``name`` or, failing that, ``fullName``; entries without
    either are skipped, and entries without an id get the synthetic id
    ``__name__:<name>``. An empty list is returned when PostgreSQL is disabled
    or the query fails.
    """
    from sqlalchemy import select
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.models import ApplicationWorkflow

    if not is_postgres_enabled():
        return []

    def _option_order(option: dict) -> tuple:
        return (str(option.get("name") or "").lower(), str(option.get("id") or ""))

    try:
        async with get_session() as session:
            workflows = (await session.execute(select(ApplicationWorkflow))).scalars().all()
            seen: dict[str, dict] = {}
            for workflow in workflows:
                supervisor = workflow.supervisor
                if not isinstance(supervisor, dict):
                    continue
                sname = str(supervisor.get("name") or supervisor.get("fullName") or "").strip()
                if not sname:
                    continue
                sid = str(supervisor.get("id") or "").strip()
                opt_id = sid or f"__name__:{sname}"
                # The first occurrence of an id wins.
                seen.setdefault(opt_id, {"id": opt_id, "name": sname})
            return sorted(seen.values(), key=_option_order)
    except Exception:
        logger.exception("GET /api/supervisors failed")
        return []
@app.get("/api/applications")
async def list_applications(
    page: int = 1,
    pageSize: int = 20,
    name: str = "",
    authorName: str = "",
    reviewerName: str = "",
    status: str = "",
    reviewStatus: str = "",
    dateFrom: str = "",
    dateTo: str = "",
    sortBy: str = "submittedDate",
    sortOrder: str = "desc",
    lifecycle: str = "",
    authorization: Optional[str] = Header(None),
):
    """
    List submitted applications (PDF) for the admin / council dashboard.

    Per the original note, the data is stored locally via
    POST /api/applications/submit. ``page`` is clamped to at least 1 and
    ``pageSize`` to the range 1..100. With PostgreSQL enabled the query is
    delegated to ``list_submitted_applications``; otherwise the local file
    index is filtered, sorted and paginated in memory.

    Returns (file-index path):
        ``{"data", "pagination", "statusCounts"}`` where ``statusCounts`` holds
        the approved / rejected totals computed with every filter applied
        except ``status``.

    Raises:
        HTTPException: 503 when the PostgreSQL query fails.
    """
    _require_staff_reviewer(authorization)
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.submissions import list_submitted_applications

    page = max(1, page)
    page_size = max(1, min(100, pageSize))

    if is_postgres_enabled():
        try:
            async with get_session() as session:
                return await list_submitted_applications(
                    session=session,
                    page=page,
                    page_size=page_size,
                    name=name,
                    author_name=authorName,
                    reviewer_name=reviewerName,
                    status=status,
                    review_status=reviewStatus,
                    date_from=dateFrom,
                    date_to=dateTo,
                    sort_by=sortBy,
                    sort_order=sortOrder,
                    lifecycle=lifecycle,
                )
        except Exception:
            logger.exception("application list query (PostgreSQL) failed; refusing file index fallback while DB is configured")
            raise HTTPException(
                status_code=503,
                detail="Không thể tải danh sách hồ sơ từ cơ sở dữ liệu. Vui lòng thử lại sau hoặc liên hệ quản trị.",
            ) from None

    # ---- file-index fallback ----
    decided_states = ("approved", "rejected")
    lifecycle_key = (lifecycle or "").strip().lower()
    name_needle = name.strip().lower()
    author_needle = authorName.strip().lower()
    reviewer_needle = reviewerName.strip().lower()

    def _outside_date_range(submitted: Any) -> bool:
        # Only the leading YYYY-MM-DD part is compared (as text).
        if not submitted:
            return False
        day = str(submitted)[:10]
        if len(day) != 10:
            return False
        if dateFrom and day < dateFrom:
            return True
        return bool(dateTo and day > dateTo)

    def _passes_non_status_filters(record: Dict[str, Any]) -> bool:
        is_decided = str(record.get("status") or "") in decided_states
        if lifecycle_key == "inbox" and is_decided:
            return False
        if lifecycle_key == "decided" and not is_decided:
            return False
        if name_needle and name_needle not in str(record.get("name") or "").lower():
            return False
        if author_needle:
            author = record.get("author") or {}
            if author_needle not in str(author.get("name") or "").lower():
                return False
        if reviewer_needle:
            reviewer = record.get("reviewer") or {}
            if reviewer_needle not in str(reviewer.get("name") or "").lower():
                return False
        if reviewStatus and str(record.get("reviewStatus") or "") != reviewStatus:
            return False
        return not _outside_date_range(record.get("submittedDate"))

    # Rows matching everything except the status filter feed the status counters.
    candidates = [record for record in _load_submitted_items() if _passes_non_status_filters(record)]
    status_counts = {"approved": 0, "rejected": 0}
    for record in candidates:
        row_status = str(record.get("status") or "")
        if row_status in status_counts:
            status_counts[row_status] += 1

    if status:
        selected = [record for record in candidates if str(record.get("status") or "") == status]
    else:
        selected = list(candidates)

    def _by_name(record: Dict[str, Any]) -> str:
        return str(record.get("name") or "")

    def _by_author(record: Dict[str, Any]) -> str:
        return str((record.get("author") or {}).get("name") or "")

    def _by_submitted(record: Dict[str, Any]) -> str:
        return str(record.get("submittedDate") or "")

    sort_key = {"name": _by_name, "author": _by_author}.get(sortBy, _by_submitted)
    selected.sort(key=sort_key, reverse=(sortOrder != "asc"))

    total = len(selected)
    offset = (page - 1) * page_size
    # Ceiling division; an empty result still reports one page.
    total_pages = (total + page_size - 1) // page_size or 1
    return {
        "data": selected[offset : offset + page_size],
        "pagination": {
            "page": page,
            "pageSize": page_size,
            "totalItems": total,
            "totalPages": total_pages,
        },
        "statusCounts": status_counts,
    }
@app.get("/api/applications/{application_id}/admin-result")
async def get_application_admin_result(
    application_id: str,
    authorization: Optional[str] = Header(None),
):
    """
    READ the approve / reject result recorded by an administrator (by ``applicationId``).

    Raises:
        HTTPException: 503 when PostgreSQL is disabled, 404 when no result
            exists for the application.
    """
    _require_admin_user(authorization)
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.application_admin_results import get_admin_result_for_application

    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Cần PostgreSQL để đọc kết quả.")

    async with get_session() as session:
        result_row = await get_admin_result_for_application(session, application_id)
    if result_row is not None:
        return result_row
    raise HTTPException(status_code=404, detail="Chưa có kết quả cho mã hồ sơ này.")
@app.get("/api/notifications")
async def api_list_notifications(
    page: int = 1,
    pageSize: int = 20,
    authorization: Optional[str] = Header(None),
):
    """
    In-app inbox for the current user (applicant receives rows after admin adjudication).

    ``page`` is clamped to at least 1 and ``pageSize`` to the range 1..100 on
    both storage paths, the same bounds GET /api/applications applies, so
    out-of-range query values never reach the database query.

    Returns:
        The result of ``list_notifications_for_user`` when PostgreSQL is
        enabled, otherwise an empty page.

    Raises:
        HTTPException: 401 when the access token is missing or invalid.
    """
    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để xem thông báo.")
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.user_notifications import list_notifications_for_user

    page = max(1, page)
    page_size = max(1, min(100, pageSize))

    if not is_postgres_enabled():
        # Without PostgreSQL there is no notification store: report an empty first page.
        return {
            "data": [],
            "pagination": {"page": 1, "pageSize": page_size, "totalItems": 0, "totalPages": 1},
        }
    async with get_session() as session:
        return await list_notifications_for_user(session, uid, page=page, page_size=page_size)
@app.get("/api/notifications/unread-count")
async def api_notifications_unread_count(authorization: Optional[str] = Header(None)):
    """
    Return ``{"count": n}`` with the current user's unread notification count.

    The count is always 0 when PostgreSQL is disabled.

    Raises:
        HTTPException: 401 when the access token is missing or invalid.
    """
    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để xem thông báo.")
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.user_notifications import count_unread_notifications

    unread = 0
    if is_postgres_enabled():
        async with get_session() as session:
            unread = await count_unread_notifications(session, uid)
    return {"count": unread}
@app.patch("/api/notifications/{notification_id}/read")
async def api_mark_notification_read(
    notification_id: str,
    authorization: Optional[str] = Header(None),
):
    """
    Mark one of the current user's notifications as read.

    Returns ``{"ok": True}`` on success.

    Raises:
        HTTPException: 401 when the access token is missing or invalid; 404
            when the id is not a UUID, PostgreSQL is disabled, or
            ``mark_notification_read`` reports that nothing was updated.
    """
    uid = decode_access_token_user_id(authorization)
    if uid is None:
        raise HTTPException(status_code=401, detail="Đăng nhập để xem thông báo.")

    try:
        nid = uuid.UUID(notification_id.strip())
    except ValueError:
        # A malformed id is reported the same way as an unknown one.
        raise HTTPException(status_code=404, detail="Không tìm thấy thông báo.") from None

    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.user_notifications import mark_notification_read

    if not is_postgres_enabled():
        raise HTTPException(status_code=404, detail="Không tìm thấy thông báo.")

    async with get_session() as session:
        updated = await mark_notification_read(session, uid, nid)
    if updated:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Không tìm thấy thông báo.")
@app.post("/api/applications/{application_id}/admin-result")
async def create_application_admin_result(
    application_id: str,
    body: AdminApplicationResultBody,
    authorization: Optional[str] = Header(None),
):
    """
    CREATE the admin result — per the original note, this syncs ``initiatives.status`` (approved / rejected).

    After the result is stored, the applicant notification helper is awaited
    with the stored result.

    Raises:
        HTTPException: 503 when PostgreSQL is disabled, 404 when the submitted
            application is not found, 409 when a result already exists, 422
            for an invalid decision, 400 for any other ``ValueError``.
    """
    admin_uid = _require_admin_user(authorization)
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.application_admin_results import create_admin_result

    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Cần PostgreSQL để lưu kết quả.")
    from src.initiative_db.user_notifications import best_effort_notify_applicant_after_admin_decision

    # Known ValueError codes raised by create_admin_result -> (HTTP status, detail).
    known_errors = {
        "result_already_exists": (
            409,
            "Đã có kết quả cho hồ sơ này — dùng cập nhật hoặc xóa trước.",
        ),
        "invalid_decision": (422, "Quyết định không hợp lệ."),
    }

    result: Optional[Dict[str, Any]] = None
    try:
        async with get_session() as session:
            result = await create_admin_result(
                session,
                application_id,
                admin_uid,
                decision=body.decision,
                feedback=body.feedback,
                rationale=body.rationale,
            )
    except LookupError:
        raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ đã nộp với mã đã chọn.") from None
    except ValueError as exc:
        msg = str(exc)
        http_status, detail = known_errors.get(msg, (400, msg))
        raise HTTPException(status_code=http_status, detail=detail) from None

    await best_effort_notify_applicant_after_admin_decision(result)
    return result
@app.put("/api/applications/{application_id}/admin-result")
async def update_application_admin_result(
    application_id: str,
    body: AdminApplicationResultBody,
    authorization: Optional[str] = Header(None),
):
    """
    Idempotent upsert: create or update the admin result in one request (per the original note, syncing ``initiatives.status``).

    After the result is stored, the applicant notification helper is awaited
    with the stored result.

    Raises:
        HTTPException: 503 when PostgreSQL is disabled, 404 when the submitted
            application is not found, 422 for an invalid decision, 400 for
            any other ``ValueError``.
    """
    admin_uid = _require_admin_user(authorization)
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.application_admin_results import upsert_admin_result

    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Cần PostgreSQL để lưu kết quả.")
    from src.initiative_db.user_notifications import best_effort_notify_applicant_after_admin_decision

    result: Optional[Dict[str, Any]] = None
    try:
        async with get_session() as session:
            result = await upsert_admin_result(
                session,
                application_id,
                admin_uid,
                decision=body.decision,
                feedback=body.feedback,
                rationale=body.rationale,
            )
    except LookupError:
        raise HTTPException(status_code=404, detail="Không tìm thấy hồ sơ đã nộp với mã đã chọn.") from None
    except ValueError as exc:
        reason = str(exc)
        if reason != "invalid_decision":
            raise HTTPException(status_code=400, detail=reason) from None
        raise HTTPException(status_code=422, detail="Quyết định không hợp lệ.") from None

    await best_effort_notify_applicant_after_admin_decision(result)
    return result
@app.delete("/api/applications/{application_id}/admin-result")
async def delete_application_admin_result(
    application_id: str,
    authorization: Optional[str] = Header(None),
):
    """
    DELETE the admin result — per the original note, this returns ``initiatives.status`` to ``submitted``.

    Returns ``{"deleted": True, "applicationId": application_id}``.

    Raises:
        HTTPException: 503 when PostgreSQL is disabled; 404 when there is no
            result to delete or the submitted application is not found.
    """
    from src.initiative_db.engine import get_session, is_postgres_enabled
    from src.initiative_db.application_admin_results import delete_admin_result

    admin_uid = _require_admin_user(authorization)
    if not is_postgres_enabled():
        raise HTTPException(status_code=503, detail="Cần PostgreSQL để xóa kết quả.")

    try:
        async with get_session() as session:
            await delete_admin_result(session, application_id, actor_user_id=admin_uid)
    except LookupError as exc:
        # The first exception argument tells a missing result from a missing application.
        missing_result = bool(exc.args) and exc.args[0] == "result_not_found"
        detail = (
            "Chưa có kết quả để xóa."
            if missing_result
            else "Không tìm thấy hồ sơ đã nộp với mã đã chọn."
        )
        raise HTTPException(status_code=404, detail=detail) from None

    return {"deleted": True, "applicationId": application_id}
if __name__ == "__main__":
    # Development entry point: serve the app directly with uvicorn.
    import uvicorn  # type: ignore

    # The port is passed as an integer, and the ``debug`` keyword is omitted
    # because current uvicorn releases no longer accept it in ``uvicorn.run``.
    uvicorn.run(app, host="0.0.0.0", port=4402)