Files
sciagent/be0/scripts/apply_initiative_migrations.py
Thinh Lam 688fac73e9
CI/CD / backend (push) Failing after 2m8s
CI/CD / frontend (push) Failing after 1m40s
CI/CD / deploy (push) Has been skipped
sciagent code + Gitea Actions CI/CD
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 09:38:30 +07:00

534 lines
19 KiB
Python

"""
Apply idempotent SQL fixes when the DB volume predates newer migrations.
- ``008_audit_events.sql`` when ``audit_events`` is missing (older volumes never
ran ``docker-entrypoint-initdb.d`` for new files).
- ``009_backup_artifact_roles_storage_kind.sql`` when ``storage_kind`` is missing.
- ``010_user_staff_profiles.sql`` + ``011_academic_titles_vn.sql`` when
``academic_titles`` is missing (staff profile / register flow).
- ``013_email_verification.sql`` when ``email_verification_tokens`` is missing.
- ``014_registration_otp.sql`` when ``registration_otp_codes`` is missing.
Run automatically from entrypoint when ``INITIATIVE_DATABASE_URL`` is set.
Standalone:
INITIATIVE_DATABASE_URL=postgresql+asyncpg://user:pass@host:5432/dbname \\
python scripts/apply_initiative_migrations.py
"""
from __future__ import annotations
import asyncio
import os
import sys
from pathlib import Path
def _async_url_to_asyncpg_dsn(url: str) -> str:
u = url.strip()
if "+asyncpg" in u:
u = u.replace("postgresql+asyncpg://", "postgresql://", 1)
return u
def _strip_sql_comments(text: str) -> str:
lines: list[str] = []
for line in text.splitlines():
s = line.strip()
if s.startswith("--"):
continue
lines.append(line)
return "\n".join(lines)
def _split_sql_statements(text: str) -> list[str]:
"""Split on semicolons outside ``$$`` dollar-quoted blocks (008 uses ``DO $$``)."""
statements: list[str] = []
buf: list[str] = []
i = 0
n = len(text)
in_dollar = False
while i < n:
if text.startswith("$$", i):
in_dollar = not in_dollar
buf.append("$$")
i += 2
continue
ch = text[i]
if ch == ";" and not in_dollar:
stmt = "".join(buf).strip()
if stmt:
statements.append(stmt)
buf = []
i += 1
continue
buf.append(ch)
i += 1
tail = "".join(buf).strip()
if tail:
statements.append(tail)
return statements
async def _needs_audit_events_migration(conn) -> bool:
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'audit_events'
LIMIT 1
"""
)
return row is None
async def _needs_backup_migration(conn) -> bool:
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'application_artifacts'
AND column_name = 'storage_kind'
LIMIT 1
"""
)
return row is None
async def _needs_staff_profiles_migration(conn) -> bool:
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'academic_titles'
LIMIT 1
"""
)
return row is None
async def _needs_email_verification_migration(conn) -> bool:
"""True when verification tokens table is missing (013 also adds users.email_verified)."""
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'email_verification_tokens'
LIMIT 1
"""
)
return row is None
async def _needs_registration_otp_migration(conn) -> bool:
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'registration_otp_codes'
LIMIT 1
"""
)
return row is None
async def _needs_document_templates_migration(conn) -> bool:
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'document_templates'
LIMIT 1
"""
)
return row is None
async def _needs_research_projects_migration(conn) -> bool:
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'research_projects'
LIMIT 1
"""
)
return row is None
async def _needs_imagehub_datasets_migration(conn) -> bool:
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'imagehub_datasets'
LIMIT 1
"""
)
return row is None
async def _needs_imagehub_segmentation_columns_migration(conn) -> bool:
"""True when imagehub_dataset_files lacks the segmentation-link columns (018)."""
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'imagehub_dataset_files'
AND column_name = 'file_kind'
LIMIT 1
"""
)
return row is None
async def _needs_cloud_import_migration(conn) -> bool:
"""True when the cloud-import storage_methods table is absent (019)."""
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'imagehub_storage_methods'
LIMIT 1
"""
)
return row is None
async def _needs_imagehub_stages_migration(conn) -> bool:
"""True when the dataset-stages table is absent (020)."""
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'imagehub_dataset_stages'
LIMIT 1
"""
)
return row is None
async def _needs_imagehub_tasks_migration(conn) -> bool:
"""True when the per-file task-pipeline table is absent (021)."""
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'imagehub_tasks'
LIMIT 1
"""
)
return row is None
async def _needs_imagehub_task_annotations_migration(conn) -> bool:
"""True when imagehub_tasks lacks the annotations column (022)."""
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'imagehub_tasks'
AND column_name = 'annotations'
LIMIT 1
"""
)
return row is None
async def _needs_imagehub_members_migration(conn) -> bool:
"""True when the dataset-membership table is absent (023)."""
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'imagehub_dataset_members'
LIMIT 1
"""
)
return row is None
async def _needs_imagehub_dataset_project_link_migration(conn) -> bool:
"""True when imagehub_datasets.research_project_id is absent (024)."""
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'imagehub_datasets'
AND column_name = 'research_project_id'
LIMIT 1
"""
)
return row is None
async def _needs_imagehub_review_events_migration(conn) -> bool:
"""True when the task-review-events table is absent (025)."""
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'imagehub_task_review_events'
LIMIT 1
"""
)
return row is None
async def _needs_imagehub_folder_path_migration(conn) -> bool:
"""True when imagehub_dataset_files.folder_path is absent (026)."""
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'imagehub_dataset_files'
AND column_name = 'folder_path'
LIMIT 1
"""
)
return row is None
async def _needs_imagehub_label_map_migration(conn) -> bool:
"""True when imagehub_datasets.label_map is absent (027)."""
row = await conn.fetchrow(
"""
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'imagehub_datasets'
AND column_name = 'label_map'
LIMIT 1
"""
)
return row is None
async def _apply_sql_file(conn, path: Path, label: str) -> None:
body = _strip_sql_comments(path.read_text(encoding="utf-8"))
for stmt in _split_sql_statements(body):
await conn.execute(stmt)
print(f"apply_initiative_migrations: {label} applied.")
async def main() -> int:
raw_url = (os.environ.get("INITIATIVE_DATABASE_URL") or "").strip()
if not raw_url.lower().startswith("postgresql"):
print("apply_initiative_migrations: no PostgreSQL URL; skipping.", file=sys.stderr)
return 0
root = Path(__file__).resolve().parent.parent
m008 = root / "migrations" / "008_audit_events.sql"
m009 = root / "migrations" / "009_backup_artifact_roles_storage_kind.sql"
m010 = root / "migrations" / "010_user_staff_profiles.sql"
m011 = root / "migrations" / "011_academic_titles_vn.sql"
for p in (m008, m009, m010, m011):
if not p.is_file():
print(f"apply_initiative_migrations: missing {p}", file=sys.stderr)
return 1
import asyncpg
dsn = _async_url_to_asyncpg_dsn(raw_url)
conn = await asyncpg.connect(dsn, timeout=60)
try:
if await _needs_audit_events_migration(conn):
print("apply_initiative_migrations: applying 008_audit_events …")
await _apply_sql_file(conn, m008, "008_audit_events")
else:
print("apply_initiative_migrations: audit_events present; OK.")
if await _needs_backup_migration(conn):
print("apply_initiative_migrations: applying 009_backup_artifact_roles_storage_kind …")
await _apply_sql_file(conn, m009, "009_backup_artifact_roles_storage_kind")
else:
print("apply_initiative_migrations: application_artifacts.storage_kind present; OK.")
if await _needs_staff_profiles_migration(conn):
print("apply_initiative_migrations: applying 010_user_staff_profiles …")
await _apply_sql_file(conn, m010, "010_user_staff_profiles")
print("apply_initiative_migrations: applying 011_academic_titles_vn …")
await _apply_sql_file(conn, m011, "011_academic_titles_vn")
else:
print("apply_initiative_migrations: academic_titles present; OK.")
m013 = root / "migrations" / "013_email_verification.sql"
if not m013.is_file():
print(f"apply_initiative_migrations: missing {m013}", file=sys.stderr)
return 1
if await _needs_email_verification_migration(conn):
print("apply_initiative_migrations: applying 013_email_verification …")
await _apply_sql_file(conn, m013, "013_email_verification")
else:
print("apply_initiative_migrations: email_verification_tokens present; OK.")
m014 = root / "migrations" / "014_registration_otp.sql"
if not m014.is_file():
print(f"apply_initiative_migrations: missing {m014}", file=sys.stderr)
return 1
if await _needs_registration_otp_migration(conn):
print("apply_initiative_migrations: applying 014_registration_otp …")
await _apply_sql_file(conn, m014, "014_registration_otp")
else:
print("apply_initiative_migrations: registration_otp_codes present; OK.")
m015 = root / "migrations" / "015_document_templates.sql"
if not m015.is_file():
print(f"apply_initiative_migrations: missing {m015}", file=sys.stderr)
return 1
if await _needs_document_templates_migration(conn):
print("apply_initiative_migrations: applying 015_document_templates …")
await _apply_sql_file(conn, m015, "015_document_templates")
else:
print("apply_initiative_migrations: document_templates present; OK.")
m016 = root / "migrations" / "016_research_projects.sql"
if not m016.is_file():
print(f"apply_initiative_migrations: missing {m016}", file=sys.stderr)
return 1
if await _needs_research_projects_migration(conn):
print("apply_initiative_migrations: applying 016_research_projects …")
await _apply_sql_file(conn, m016, "016_research_projects")
else:
print("apply_initiative_migrations: research_projects present; OK.")
m017 = root / "migrations" / "017_imagehub_datasets.sql"
if not m017.is_file():
print(f"apply_initiative_migrations: missing {m017}", file=sys.stderr)
return 1
if await _needs_imagehub_datasets_migration(conn):
print("apply_initiative_migrations: applying 017_imagehub_datasets …")
await _apply_sql_file(conn, m017, "017_imagehub_datasets")
else:
print("apply_initiative_migrations: imagehub_datasets present; OK.")
m018 = root / "migrations" / "018_imagehub_segmentation_links.sql"
if not m018.is_file():
print(f"apply_initiative_migrations: missing {m018}", file=sys.stderr)
return 1
if await _needs_imagehub_segmentation_columns_migration(conn):
print("apply_initiative_migrations: applying 018_imagehub_segmentation_links …")
await _apply_sql_file(conn, m018, "018_imagehub_segmentation_links")
else:
print("apply_initiative_migrations: imagehub_dataset_files.file_kind present; OK.")
m019 = root / "migrations" / "019_imagehub_cloud_import.sql"
if not m019.is_file():
print(f"apply_initiative_migrations: missing {m019}", file=sys.stderr)
return 1
if await _needs_cloud_import_migration(conn):
print("apply_initiative_migrations: applying 019_imagehub_cloud_import …")
await _apply_sql_file(conn, m019, "019_imagehub_cloud_import")
else:
print("apply_initiative_migrations: imagehub_storage_methods present; OK.")
m020 = root / "migrations" / "020_imagehub_dataset_stages.sql"
if not m020.is_file():
print(f"apply_initiative_migrations: missing {m020}", file=sys.stderr)
return 1
if await _needs_imagehub_stages_migration(conn):
print("apply_initiative_migrations: applying 020_imagehub_dataset_stages …")
await _apply_sql_file(conn, m020, "020_imagehub_dataset_stages")
else:
print("apply_initiative_migrations: imagehub_dataset_stages present; OK.")
m021 = root / "migrations" / "021_imagehub_task_pipeline.sql"
if not m021.is_file():
print(f"apply_initiative_migrations: missing {m021}", file=sys.stderr)
return 1
if await _needs_imagehub_tasks_migration(conn):
print("apply_initiative_migrations: applying 021_imagehub_task_pipeline …")
await _apply_sql_file(conn, m021, "021_imagehub_task_pipeline")
else:
print("apply_initiative_migrations: imagehub_tasks present; OK.")
m022 = root / "migrations" / "022_imagehub_task_annotations.sql"
if not m022.is_file():
print(f"apply_initiative_migrations: missing {m022}", file=sys.stderr)
return 1
if await _needs_imagehub_task_annotations_migration(conn):
print("apply_initiative_migrations: applying 022_imagehub_task_annotations …")
await _apply_sql_file(conn, m022, "022_imagehub_task_annotations")
else:
print("apply_initiative_migrations: imagehub_tasks.annotations present; OK.")
m023 = root / "migrations" / "023_imagehub_dataset_members.sql"
if not m023.is_file():
print(f"apply_initiative_migrations: missing {m023}", file=sys.stderr)
return 1
if await _needs_imagehub_members_migration(conn):
print("apply_initiative_migrations: applying 023_imagehub_dataset_members …")
await _apply_sql_file(conn, m023, "023_imagehub_dataset_members")
else:
print("apply_initiative_migrations: imagehub_dataset_members present; OK.")
m024 = root / "migrations" / "024_imagehub_dataset_project_link.sql"
if not m024.is_file():
print(f"apply_initiative_migrations: missing {m024}", file=sys.stderr)
return 1
if await _needs_imagehub_dataset_project_link_migration(conn):
print("apply_initiative_migrations: applying 024_imagehub_dataset_project_link …")
await _apply_sql_file(conn, m024, "024_imagehub_dataset_project_link")
else:
print("apply_initiative_migrations: imagehub_datasets.research_project_id present; OK.")
m025 = root / "migrations" / "025_imagehub_task_review_events.sql"
if not m025.is_file():
print(f"apply_initiative_migrations: missing {m025}", file=sys.stderr)
return 1
if await _needs_imagehub_review_events_migration(conn):
print("apply_initiative_migrations: applying 025_imagehub_task_review_events …")
await _apply_sql_file(conn, m025, "025_imagehub_task_review_events")
else:
print("apply_initiative_migrations: imagehub_task_review_events present; OK.")
m026 = root / "migrations" / "026_imagehub_file_folder_path.sql"
if not m026.is_file():
print(f"apply_initiative_migrations: missing {m026}", file=sys.stderr)
return 1
if await _needs_imagehub_folder_path_migration(conn):
print("apply_initiative_migrations: applying 026_imagehub_file_folder_path …")
await _apply_sql_file(conn, m026, "026_imagehub_file_folder_path")
else:
print("apply_initiative_migrations: imagehub_dataset_files.folder_path present; OK.")
m027 = root / "migrations" / "027_imagehub_dataset_label_map.sql"
if not m027.is_file():
print(f"apply_initiative_migrations: missing {m027}", file=sys.stderr)
return 1
if await _needs_imagehub_label_map_migration(conn):
print("apply_initiative_migrations: applying 027_imagehub_dataset_label_map …")
await _apply_sql_file(conn, m027, "027_imagehub_dataset_label_map")
else:
print("apply_initiative_migrations: imagehub_datasets.label_map present; OK.")
return 0
except Exception as exc:
print(f"apply_initiative_migrations: FAILED: {exc}", file=sys.stderr)
if os.environ.get("INITIATIVE_DB_STRICT_MIGRATE", "").strip().lower() in ("1", "true", "yes"):
return 1
return 0
finally:
await conn.close()
if __name__ == "__main__":
raise SystemExit(asyncio.run(main()))