f1tness_parser/migrations/runner.py

49 lines
1.9 KiB
Python

import logging
from pathlib import Path
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import text
class MigrationRunner:
    """Discover SQL migration files and apply the ones not yet applied.

    Migration files are looked up in ``migrations/sql`` (a path relative to
    the current working directory). A migration's version is the file's stem,
    and applied versions are read from the table named by
    ``MIGRATIONS_SCHEMA_NAME``.
    """

    # Name of the bookkeeping table holding one row per applied version.
    MIGRATIONS_SCHEMA_NAME: str = "schema_migrations"

    def __init__(self, database_url: str) -> None:
        """Create the async engine and set the migrations directory.

        Args:
            database_url: Connection URL handed to ``create_async_engine``.
        """
        self.engine = create_async_engine(database_url)
        self.migrations_dir = Path("migrations/sql")

    async def get_applied_migrations(self) -> set:
        """Return the set of migration versions stored in the bookkeeping table.

        The table is created first when it does not exist, so the query is
        valid on a fresh database and then yields an empty set.

        Returns:
            A set with the ``version`` column value of every row.
        """
        async with self.engine.begin() as conn:
            # Make sure the bookkeeping table exists before querying it.
            await conn.execute(
                text(f"""
                CREATE TABLE IF NOT EXISTS {self.MIGRATIONS_SCHEMA_NAME} (
                version VARCHAR(50) PRIMARY KEY,
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                """)
            )
            # Fetch the versions that are already stored.
            result = await conn.execute(
                text(f"SELECT version FROM {self.MIGRATIONS_SCHEMA_NAME}")
            )
            return {row[0] for row in result.fetchall()}

    async def run_migrations(self) -> None:
        """Apply every ``*.sql`` file whose version is not yet applied.

        Files directly inside ``self.migrations_dir`` are processed in sorted
        path order. Files whose stem is already in the applied set are skipped
        and logged; the others are passed to ``self._apply_migration``.
        """
        # NOTE(review): `_apply_migration` is not defined in the visible part
        # of this class -- confirm it exists before relying on this method.
        applied = await self.get_applied_migrations()

        # Path.glob takes a shell-style pattern, not a regular expression:
        # "*.sql" selects names ending in ".sql". The previous ".*sql" pattern
        # only matched dot-files ending in "sql", so no migration was found.
        # Sorting gives a deterministic, name-based application order.
        migration_files = sorted(self.migrations_dir.glob("*.sql"))

        for migration_file in migration_files:
            # The stem is the file name without its final suffix,
            # e.g. "001_init.sql" -> "001_init".
            migration_name = migration_file.stem
            if migration_name in applied:
                logging.info(
                    "Skipping migration: %s (already applied)", migration_name
                )
                continue
            logging.info("Applying migration: %s", migration_name)
            await self._apply_migration(migration_file, migration_name)