Files

94 lines
2.9 KiB
Python

from alembic import context
from sqlalchemy import engine_from_config, pool
from sqlalchemy.ext.asyncio import create_async_engine
from logging.config import fileConfig
import sys, os
# Ensure project root is on sys.path for importing config.settings
PROJECT_ROOT = os.path.dirname(os.path.dirname(__file__))
if PROJECT_ROOT not in sys.path:
sys.path.append(PROJECT_ROOT)
try:
from config.settings import settings
except Exception:
settings = None
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = None # not used, explicit migrations only
def _get_urls():
"""Derive a sync SQLAlchemy URL from app settings (async -> sync)."""
async_url = None
sync_url = None
if settings and getattr(settings, 'database_url', None):
async_url = settings.database_url
sync_url = async_url
if async_url.startswith("postgresql+asyncpg://"):
sync_url = async_url.replace("postgresql+asyncpg://", "postgresql://", 1)
return async_url, sync_url
def run_migrations_offline():
async_url, sync_url = _get_urls()
url = sync_url or config.get_main_option("sqlalchemy.url")
if url:
config.set_main_option("sqlalchemy.url", url)
context.configure(
url=url,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
async_url, sync_url = _get_urls()
# If we have an async URL, run migrations via async engine
if async_url and async_url.startswith("postgresql+asyncpg://"):
async def do_run_migrations():
connectable = create_async_engine(async_url, poolclass=pool.NullPool)
async with connectable.connect() as connection:
def sync_migrations(conn):
context.configure(connection=conn)
with context.begin_transaction():
context.run_migrations()
await connection.run_sync(sync_migrations)
import asyncio
asyncio.run(do_run_migrations())
return
# Fallback to sync engine (e.g., if sync URL is provided)
url = sync_url or config.get_main_option("sqlalchemy.url")
if url:
config.set_main_option("sqlalchemy.url", url)
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()