f1tness_parser/migrations/runner.py

70 lines
2.7 KiB
Python

import logging
from pathlib import Path
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import text
class MigrationRunner:
    """Apply plain-SQL migration files to a database exactly once each.

    Migration files are ``*.sql`` files inside ``migrations_dir``. They are
    applied in sorted path order, and the file name without its extension
    (the "stem") is used as the migration version. Applied versions are
    tracked in the table named by ``MIGRATIONS_SCHEMA_NAME``.
    """

    # Name of the bookkeeping table that stores applied migration versions.
    MIGRATIONS_SCHEMA_NAME: str = "schema_migrations"

    def __init__(self, database_url: str) -> None:
        """Create the async engine used for all migration work.

        Args:
            database_url: SQLAlchemy database URL passed to
                ``create_async_engine``.
        """
        self.engine = create_async_engine(database_url)
        # Relative path: resolved against the process working directory.
        self.migrations_dir = Path("migrations/sql")
        # Never log the raw URL: it may embed credentials. Log a masked
        # rendering at debug level instead of the original error level.
        logging.debug(
            "Migration runner using database: %s",
            self.engine.url.render_as_string(hide_password=True),
        )

    async def get_applied_migrations(self) -> set:
        """Return the set of migration versions already applied.

        The tracking table is created first if it does not exist, so this is
        safe to call against a fresh database.

        Returns:
            A set with the ``version`` value of every row in the tracking
            table.
        """
        async with self.engine.begin() as conn:
            # Create the tracking table on first use.
            await conn.execute(
                text(f"""
                    CREATE TABLE IF NOT EXISTS {self.MIGRATIONS_SCHEMA_NAME} (
                        version VARCHAR(50) PRIMARY KEY,
                        applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    );
                """)
            )
            # Fetch every recorded version.
            result = await conn.execute(
                text(f"SELECT version FROM {self.MIGRATIONS_SCHEMA_NAME}")
            )
            return {row[0] for row in result.fetchall()}

    async def run_migrations(self) -> None:
        """Apply every migration file that has not been applied yet.

        Files are processed in sorted path order. Already-applied versions
        are skipped and logged.

        Raises:
            Exception: Whatever ``_apply_migration`` raises; processing stops
                at the first failing migration.
        """
        applied = await self.get_applied_migrations()
        logging.info("Applied migrations: %s", applied)

        if not self.migrations_dir.is_dir():
            # Nothing can be discovered; make the misconfiguration visible
            # instead of silently applying nothing.
            logging.warning(
                "Migrations directory not found: %s", self.migrations_dir
            )
            return

        # "*.sql" matches every file whose name ends in ".sql". The previous
        # pattern ".*sql" only matched dot-prefixed names, so no regular
        # migration file was ever discovered.
        migration_files = sorted(self.migrations_dir.glob("*.sql"))

        for migration_file in migration_files:
            # ``stem`` is the file name without its final suffix,
            # e.g. "001_init.sql" -> "001_init".
            migration_name = migration_file.stem
            if migration_name in applied:
                logging.info(
                    "Skipping migration: %s (already applied)", migration_name
                )
                continue
            logging.info("Applying migration: %s", migration_name)
            await self._apply_migration(migration_file, migration_name)

    async def _apply_migration(self, migration_file: Path, migration_name: str) -> None:
        """Execute one migration file and record it as applied.

        All statements and the bookkeeping insert run inside one
        ``engine.begin()`` block; the exception is re-raised on failure so it
        propagates out of that block.

        Note:
            The file is split into statements on every ``;``. A semicolon
            inside a string literal or a procedural body would therefore be
            split incorrectly. Statements are passed through ``text()``,
            which interprets ``:name`` tokens as bind parameters.

        Args:
            migration_file: Path of the ``.sql`` file to execute.
            migration_name: Version identifier stored in the tracking table.

        Raises:
            Exception: Any error raised while executing the statements is
                logged and re-raised.
        """
        # Read and parse before opening the transaction so it is held only
        # for the duration of the actual database work.
        sql_content = migration_file.read_text(encoding="utf-8")
        statements = [s.strip() for s in sql_content.split(";") if s.strip()]

        async with self.engine.begin() as conn:
            try:
                for statement in statements:
                    await conn.execute(text(statement))

                # Record the version so the next run skips this migration.
                # Without this row every migration would be re-applied on
                # each run. The existence check keeps this safe if a
                # migration script already recorded itself.
                recorded = await conn.execute(
                    text(
                        f"SELECT 1 FROM {self.MIGRATIONS_SCHEMA_NAME} "
                        "WHERE version = :version"
                    ),
                    {"version": migration_name},
                )
                if recorded.first() is None:
                    await conn.execute(
                        text(
                            f"INSERT INTO {self.MIGRATIONS_SCHEMA_NAME} (version) "
                            "VALUES (:version)"
                        ),
                        {"version": migration_name},
                    )
                logging.info("Migration %s applied successfully", migration_name)
            except Exception as e:
                logging.error(
                    "Error applying migration %s: %s", migration_name, e
                )
                raise