Forward Compatibility
Changes must work with both old and new application versions
Ta treść nie jest jeszcze dostępna w Twoim języku.
Database migrations are the backbone of evolving applications. As your system grows, your database schema must adapt—adding features, improving performance, and fixing design flaws. With AI assistance, you can implement sophisticated migration patterns that minimize risk, ensure data integrity, and enable zero-downtime deployments. This guide covers proven patterns for production database migrations.
Modern database migrations follow these principles:
Forward Compatibility
Changes must work with both old and new application versions
Reversibility
Every migration should have a safe rollback path
Data Integrity
Never lose data, always validate transformations
Zero Downtime
Production systems must stay online during migrations
Track and apply database changes systematically using version numbers.
-- V1__Create_users_table.sqlCREATE TABLE users ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), email VARCHAR(255) NOT NULL UNIQUE, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP);
-- V2__Add_user_profiles.sqlCREATE TABLE user_profiles ( user_id UUID PRIMARY KEY REFERENCES users(id), full_name VARCHAR(255), bio TEXT, avatar_url VARCHAR(500), updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP);
-- V3__Add_email_verification.sqlALTER TABLE usersADD COLUMN email_verified BOOLEAN NOT NULL DEFAULT FALSE,ADD COLUMN verification_token VARCHAR(255),ADD COLUMN verification_sent_at TIMESTAMP;
CREATE INDEX idx_users_verification_tokenON users(verification_token)WHERE verification_token IS NOT NULL;
<?xml version="1.0" encoding="UTF-8"?><databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.0.xsd">
<changeSet id="1" author="developer"> <createTable tableName="users"> <column name="id" type="UUID" defaultValueComputed="gen_random_uuid()"> <constraints primaryKey="true" nullable="false"/> </column> <column name="email" type="VARCHAR(255)"> <constraints nullable="false" unique="true"/> </column> <column name="created_at" type="TIMESTAMP" defaultValueComputed="CURRENT_TIMESTAMP"> <constraints nullable="false"/> </column> </createTable>
<rollback> <dropTable tableName="users"/> </rollback> </changeSet>
<changeSet id="2" author="developer"> <createTable tableName="user_profiles"> <column name="user_id" type="UUID"> <constraints primaryKey="true" nullable="false" foreignKeyName="fk_profiles_users" references="users(id)"/> </column> <column name="full_name" type="VARCHAR(255)"/> <column name="bio" type="TEXT"/> <column name="avatar_url" type="VARCHAR(500)"/> <column name="updated_at" type="TIMESTAMP" defaultValueComputed="CURRENT_TIMESTAMP"> <constraints nullable="false"/> </column> </createTable> </changeSet></databaseChangeLog>
Enable zero-downtime migrations by maintaining backward compatibility during transitions.
Expand Phase: Add new schema elements without removing old ones
-- Add new column alongside old oneALTER TABLE productsADD COLUMN price_cents INTEGER;
-- Populate new column from old dataUPDATE productsSET price_cents = CAST(price * 100 AS INTEGER)WHERE price_cents IS NULL;
Transition Phase: Update application to use both old and new schemas
// Write to both columnsasync function updateProductPrice(productId, price) { await db.query( `UPDATE products SET price = $1, price_cents = $2, updated_at = CURRENT_TIMESTAMP WHERE id = $3`, [price, Math.round(price * 100), productId] );}
// Read with fallbackasync function getProductPrice(productId) { const result = await db.query( 'SELECT price, price_cents FROM products WHERE id = $1', [productId] );
// Prefer new column, fallback to old const cents = result.rows[0].price_cents ?? Math.round(result.rows[0].price * 100); return cents / 100;}
Contract Phase: Remove old schema elements after full migration
-- Ensure all data is migratedUPDATE productsSET price_cents = CAST(price * 100 AS INTEGER)WHERE price_cents IS NULL;
-- Make new column requiredALTER TABLE productsALTER COLUMN price_cents SET NOT NULL;
-- Drop old columnALTER TABLE productsDROP COLUMN price;
Maintain two versions of schema objects for seamless transitions.
Blue-Green Table Migration
-- Step 1: Create new table structureCREATE TABLE orders_v2 ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), customer_id UUID NOT NULL REFERENCES customers(id), order_number VARCHAR(50) NOT NULL UNIQUE, status VARCHAR(50) NOT NULL, total_amount_cents BIGINT NOT NULL, currency VARCHAR(3) NOT NULL DEFAULT 'USD', metadata JSONB DEFAULT '{}', created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP);
-- Step 2: Create triggers to sync dataCREATE OR REPLACE FUNCTION sync_orders_to_v2()RETURNS TRIGGER AS $$BEGIN IF TG_OP = 'INSERT' THEN INSERT INTO orders_v2 ( id, customer_id, order_number, status, total_amount_cents, currency, metadata, created_at, updated_at ) VALUES ( NEW.id, NEW.customer_id, NEW.order_number, NEW.status, CAST(NEW.total_amount * 100 AS BIGINT), 'USD', '{}', NEW.created_at, NEW.updated_at ); ELSIF TG_OP = 'UPDATE' THEN UPDATE orders_v2 SET customer_id = NEW.customer_id, order_number = NEW.order_number, status = NEW.status, total_amount_cents = CAST(NEW.total_amount * 100 AS BIGINT), updated_at = NEW.updated_at WHERE id = NEW.id; ELSIF TG_OP = 'DELETE' THEN DELETE FROM orders_v2 WHERE id = OLD.id; END IF; RETURN NEW;END;$$ LANGUAGE plpgsql;
CREATE TRIGGER sync_orders_triggerAFTER INSERT OR UPDATE OR DELETE ON ordersFOR EACH ROW EXECUTE FUNCTION sync_orders_to_v2();
-- Step 3: Backfill existing dataINSERT INTO orders_v2 ( id, customer_id, order_number, status, total_amount_cents, created_at, updated_at)SELECT id, customer_id, order_number, status, CAST(total_amount * 100 AS BIGINT), created_at, updated_atFROM ordersON CONFLICT (id) DO NOTHING;
-- Step 4: Switch application to use new table-- Update application configuration
-- Step 5: Clean upDROP TRIGGER sync_orders_trigger ON orders;DROP FUNCTION sync_orders_to_v2();ALTER TABLE orders RENAME TO orders_old;ALTER TABLE orders_v2 RENAME TO orders;
Ensure data integrity during complex transformations.
// AI-assisted data migration with validationclass DataMigrator { constructor(db, logger) { this.db = db; this.logger = logger; this.batchSize = 1000; }
async migrateWithValidation(config) { const { sourceTable, targetTable, transform, validate, idColumn = 'id' } = config;
let offset = 0; let totalMigrated = 0; let errors = [];
// Start transaction for each batch while (true) { const batch = await this.fetchBatch( sourceTable, idColumn, offset, this.batchSize );
if (batch.length === 0) break;
const migrationResult = await this.migrateBatch( batch, targetTable, transform, validate );
totalMigrated += migrationResult.successful; errors.push(...migrationResult.errors);
this.logger.info('Migration progress', { offset, batchSize: batch.length, successful: migrationResult.successful, failed: migrationResult.errors.length, totalMigrated });
offset += this.batchSize;
// Add delay to avoid overwhelming the database await this.sleep(100); }
return { totalMigrated, errors, success: errors.length === 0 }; }
async migrateBatch(batch, targetTable, transform, validate) { const successful = 0; const errors = [];
await this.db.transaction(async (trx) => { for (const record of batch) { try { // Transform the data const transformed = await transform(record);
// Validate before inserting const validationResult = await validate(transformed); if (!validationResult.valid) { errors.push({ id: record.id, error: 'Validation failed', details: validationResult.errors }); continue; }
// Insert into target table await trx(targetTable).insert(transformed); successful++; } catch (error) { errors.push({ id: record.id, error: error.message, stack: error.stack }); } } });
return { successful, errors }; }}
// Usage exampleconst migrator = new DataMigrator(db, logger);
await migrator.migrateWithValidation({ sourceTable: 'legacy_users', targetTable: 'users', transform: async (legacyUser) => ({ id: generateUUID(), email: legacyUser.email_address.toLowerCase(), username: legacyUser.user_name || generateUsername(legacyUser.email_address), full_name: `${legacyUser.first_name} ${legacyUser.last_name}`.trim(), phone: normalizePhoneNumber(legacyUser.phone), created_at: legacyUser.registration_date, legacy_id: legacyUser.id }), validate: async (user) => { const errors = [];
if (!isValidEmail(user.email)) { errors.push('Invalid email format'); }
if (user.username.length < 3) { errors.push('Username too short'); }
// Check for duplicates const existing = await db('users') .where('email', user.email) .first(); if (existing) { errors.push('Email already exists'); }
return { valid: errors.length === 0, errors }; }});
Test migrations safely using shadow tables before applying to production.
-- Create shadow table for testingCREATE TABLE users_shadow (LIKE users INCLUDING ALL);
-- Copy data to shadow tableINSERT INTO users_shadow SELECT * FROM users;
-- Test migration on shadow tableALTER TABLE users_shadowADD COLUMN preferences JSONB DEFAULT '{}',ADD COLUMN last_login_at TIMESTAMP;
-- Validate shadow tableDO $$DECLARE original_count INTEGER; shadow_count INTEGER;BEGIN SELECT COUNT(*) INTO original_count FROM users; SELECT COUNT(*) INTO shadow_count FROM users_shadow;
IF original_count != shadow_count THEN RAISE EXCEPTION 'Row count mismatch: % vs %', original_count, shadow_count; END IF;
-- Additional validation queries PERFORM 1 FROM users_shadow WHERE preferences IS NULL LIMIT 1;
IF FOUND THEN RAISE EXCEPTION 'Found NULL preferences'; END IF;END $$;
-- If validation passes, apply to productionALTER TABLE usersADD COLUMN preferences JSONB DEFAULT '{}',ADD COLUMN last_login_at TIMESTAMP;
-- Clean upDROP TABLE users_shadow;
Perform schema changes without locking tables.
// MySQL/Percona online schema change wrapperclass OnlineSchemaChange { constructor(db, config = {}) { this.db = db; this.config = { maxLagSeconds: 5, checkInterval: 1000, ...config }; }
async alterTable(tableName, alterStatement, options = {}) { const tempTable = `${tableName}_new`; const oldTable = `${tableName}_old`;
try { // Step 1: Create new table with desired schema await this.createTableLike(tableName, tempTable); await this.db.query(`ALTER TABLE ${tempTable} ${alterStatement}`);
// Step 2: Set up triggers for data sync await this.createSyncTriggers(tableName, tempTable);
// Step 3: Copy existing data in batches await this.copyDataInBatches(tableName, tempTable, options);
// Step 4: Atomic table swap await this.db.transaction(async (trx) => { await trx.raw(`RENAME TABLE ${tableName} TO ${oldTable}, ${tempTable} TO ${tableName}`); });
// Step 5: Clean up await this.dropTriggers(oldTable);
// Keep old table for rollback option this.logger.info(`Migration complete. Old table preserved as ${oldTable}`);
} catch (error) { // Rollback await this.cleanup(tempTable, tableName); throw error; } }
async copyDataInBatches(source, target, options) { const batchSize = options.batchSize || 1000; let lastId = 0;
while (true) { // Check replication lag const lag = await this.getReplicationLag(); if (lag > this.config.maxLagSeconds) { await this.sleep(this.config.checkInterval); continue; }
// Copy batch const result = await this.db.query(` INSERT INTO ${target} SELECT * FROM ${source} WHERE id > ? ORDER BY id LIMIT ? `, [lastId, batchSize]);
if (result.affectedRows === 0) break;
// Update progress lastId = await this.getMaxId(target);
// Throttle to avoid overwhelming the database await this.sleep(100); } }
async createSyncTriggers(source, target) { // Insert trigger await this.db.query(` CREATE TRIGGER ${source}_insert_sync AFTER INSERT ON ${source} FOR EACH ROW BEGIN INSERT INTO ${target} SELECT NEW.*; END `);
// Update trigger await this.db.query(` CREATE TRIGGER ${source}_update_sync AFTER UPDATE ON ${source} FOR EACH ROW BEGIN REPLACE INTO ${target} SELECT NEW.*; END `);
// Delete trigger await this.db.query(` CREATE TRIGGER ${source}_delete_sync AFTER DELETE ON ${source} FOR EACH ROW BEGIN DELETE FROM ${target} WHERE id = OLD.id; END `); }}
Control migration rollout using feature flags.
// Feature flag controlled migrationsclass FeatureFlagMigration { constructor(db, featureFlags) { this.db = db; this.featureFlags = featureFlags; }
async runConditionalMigration(migrationName, migration) { const flagName = `migration_${migrationName}`;
// Check if migration should run if (!await this.featureFlags.isEnabled(flagName)) { this.logger.info(`Migration ${migrationName} skipped (flag disabled)`); return; }
// Check if already applied const applied = await this.db('schema_migrations') .where('migration', migrationName) .first();
if (applied) { this.logger.info(`Migration ${migrationName} already applied`); return; }
// Run migration with monitoring const start = Date.now();
try { await this.db.transaction(async (trx) => { // Execute migration await migration(trx);
// Record completion await trx('schema_migrations').insert({ migration: migrationName, applied_at: new Date(), execution_time_ms: Date.now() - start }); });
this.logger.info(`Migration ${migrationName} completed`, { duration: Date.now() - start });
} catch (error) { this.logger.error(`Migration ${migrationName} failed`, { error: error.message, duration: Date.now() - start });
// Disable feature flag on failure await this.featureFlags.disable(flagName); throw error; } }}
// Usageconst migrator = new FeatureFlagMigration(db, featureFlags);
await migrator.runConditionalMigration('add_user_preferences', async (trx) => { await trx.schema.alterTable('users', (table) => { table.jsonb('preferences').defaultTo('{}'); table.index('preferences', 'idx_users_preferences', 'GIN'); });});
// Migration test frameworkclass MigrationTester { constructor() { this.testDb = null; }
async testMigration(migration, testCases) { // Setup test database await this.setupTestDatabase();
try { // Apply migration await migration.up(this.testDb);
// Run test cases for (const testCase of testCases) { await this.runTestCase(testCase); }
// Test rollback if available if (migration.down) { await migration.down(this.testDb); await this.verifyRollback(); }
} finally { await this.teardownTestDatabase(); } }
async runTestCase(testCase) { const { name, setup, verify } = testCase;
console.log(`Running test: ${name}`);
// Setup test data if (setup) { await setup(this.testDb); }
// Verify expectations await verify(this.testDb); }}
// Example testconst tester = new MigrationTester();
await tester.testMigration(addUserPreferencesMigration, [ { name: 'Should add preferences column with default value', setup: async (db) => { await db('users').insert({ email: 'test@example.com', username: 'testuser' }); }, verify: async (db) => { const user = await db('users') .where('email', 'test@example.com') .first();
expect(user.preferences).toEqual({}); } }, { name: 'Should preserve existing data', setup: async (db) => { const countBefore = await db('users').count(); return { countBefore }; }, verify: async (db, { countBefore }) => { const countAfter = await db('users').count(); expect(countAfter).toEqual(countBefore); } }]);
Version Everything
Test Thoroughly
Monitor and Measure
Plan for Failure
Before running any migration in production:
// Migration monitoring dashboardclass MigrationMonitor { async getDashboardMetrics() { return { pending: await this.getPendingMigrations(), recent: await this.getRecentMigrations(), failed: await this.getFailedMigrations(), performance: await this.getMigrationPerformance(), health: await this.getSchemaHealth() }; }
async getMigrationPerformance() { const metrics = await this.db('schema_migrations') .select('migration', 'execution_time_ms', 'applied_at') .orderBy('applied_at', 'desc') .limit(20);
return { averageTime: avg(metrics.map(m => m.execution_time_ms)), slowest: metrics.sort((a, b) => b.execution_time_ms - a.execution_time_ms)[0], trend: this.calculateTrend(metrics) }; }}
Master database migrations with: