Database Migration Patterns

Database migrations are the backbone of evolving applications. As your system grows, your database schema must adapt—adding features, improving performance, and fixing design flaws. With AI assistance, you can implement sophisticated migration patterns that minimize risk, ensure data integrity, and enable zero-downtime deployments. This guide covers proven patterns for production database migrations.

Modern database migrations follow these principles:

  • Forward Compatibility: Changes must work with both old and new application versions.
  • Reversibility: Every migration should have a safe rollback path (see the sketch after this list).
  • Data Integrity: Never lose data; always validate transformations.
  • Zero Downtime: Production systems must stay online during migrations.

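For example, reversibility means every change ships with a matching rollback. A minimal sketch of an up/down migration pair (knex-style API assumed; the last_login_at column is illustrative):

// Reversible migration: `down` undoes exactly what `up` did
module.exports = {
  async up(db) {
    await db.schema.alterTable('users', (table) => {
      // A new, nullable column keeps old application versions working
      table.timestamp('last_login_at');
    });
  },

  async down(db) {
    await db.schema.alterTable('users', (table) => {
      table.dropColumn('last_login_at');
    });
  }
};
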
Versioned Migrations

Track and apply database changes systematically using version numbers.

-- V1__Create_users_table.sql
CREATE TABLE users (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  email VARCHAR(255) NOT NULL UNIQUE,
  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- V2__Add_user_profiles.sql
CREATE TABLE user_profiles (
  user_id UUID PRIMARY KEY REFERENCES users(id),
  full_name VARCHAR(255),
  bio TEXT,
  avatar_url VARCHAR(500),
  updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- V3__Add_email_verification.sql
ALTER TABLE users
  ADD COLUMN email_verified BOOLEAN NOT NULL DEFAULT FALSE,
  ADD COLUMN verification_token VARCHAR(255),
  ADD COLUMN verification_sent_at TIMESTAMP;

CREATE INDEX idx_users_verification_token
  ON users(verification_token)
  WHERE verification_token IS NOT NULL;

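A migration runner applies these files in version order and records each one so it never runs twice. Dedicated tools (Flyway, Liquibase, node-pg-migrate) handle this for you; the sketch below only illustrates the idea, assuming a node-postgres-style client and a ./migrations directory:

// Minimal sketch of a versioned migration runner
const fs = require('fs');
const path = require('path');

async function runPendingMigrations(client, dir = './migrations') {
  // Bookkeeping table that records which versions have been applied
  await client.query(`
    CREATE TABLE IF NOT EXISTS schema_migrations (
      migration VARCHAR(255) PRIMARY KEY,
      applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
    )`);

  const applied = new Set(
    (await client.query('SELECT migration FROM schema_migrations')).rows
      .map((r) => r.migration)
  );

  // Note: plain lexicographic sort only works up to V9; real tools compare version numbers
  const files = fs.readdirSync(dir).filter((f) => f.endsWith('.sql')).sort();

  for (const file of files) {
    if (applied.has(file)) continue;
    const sql = fs.readFileSync(path.join(dir, file), 'utf8');
    await client.query('BEGIN');
    try {
      await client.query(sql);
      await client.query(
        'INSERT INTO schema_migrations (migration) VALUES ($1)', [file]
      );
      await client.query('COMMIT');
    } catch (err) {
      await client.query('ROLLBACK');
      throw err;
    }
  }
}
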
Expand-Contract Pattern

Enable zero-downtime migrations by maintaining backward compatibility during transitions.

  1. Expand Phase: Add new schema elements without removing old ones

    -- Add new column alongside old one
    ALTER TABLE products
      ADD COLUMN price_cents INTEGER;

    -- Populate new column from old data
    UPDATE products
    SET price_cents = CAST(price * 100 AS INTEGER)
    WHERE price_cents IS NULL;

  2. Transition Phase: Update application to use both old and new schemas

    // Write to both columns
    async function updateProductPrice(productId, price) {
      await db.query(
        `UPDATE products
         SET price = $1,
             price_cents = $2,
             updated_at = CURRENT_TIMESTAMP
         WHERE id = $3`,
        [price, Math.round(price * 100), productId]
      );
    }

    // Read with fallback
    async function getProductPrice(productId) {
      const result = await db.query(
        'SELECT price, price_cents FROM products WHERE id = $1',
        [productId]
      );
      // Prefer new column, fall back to old
      const cents = result.rows[0].price_cents ??
        Math.round(result.rows[0].price * 100);
      return cents / 100;
    }

  3. Contract Phase: Remove old schema elements after full migration

    -- Ensure all data is migrated
    UPDATE products
    SET price_cents = CAST(price * 100 AS INTEGER)
    WHERE price_cents IS NULL;

    -- Make new column required
    ALTER TABLE products
      ALTER COLUMN price_cents SET NOT NULL;

    -- Drop old column
    ALTER TABLE products
      DROP COLUMN price;

Maintain two versions of schema objects for seamless transitions.

Blue-Green Table Migration

-- Step 1: Create new table structure
CREATE TABLE orders_v2 (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  customer_id UUID NOT NULL REFERENCES customers(id),
  order_number VARCHAR(50) NOT NULL UNIQUE,
  status VARCHAR(50) NOT NULL,
  total_amount_cents BIGINT NOT NULL,
  currency VARCHAR(3) NOT NULL DEFAULT 'USD',
  metadata JSONB DEFAULT '{}',
  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Step 2: Create triggers to sync data
CREATE OR REPLACE FUNCTION sync_orders_to_v2()
RETURNS TRIGGER AS $$
BEGIN
  IF TG_OP = 'INSERT' THEN
    INSERT INTO orders_v2 (
      id, customer_id, order_number, status,
      total_amount_cents, currency, metadata,
      created_at, updated_at
    ) VALUES (
      NEW.id, NEW.customer_id, NEW.order_number, NEW.status,
      CAST(NEW.total_amount * 100 AS BIGINT), 'USD', '{}',
      NEW.created_at, NEW.updated_at
    );
  ELSIF TG_OP = 'UPDATE' THEN
    UPDATE orders_v2 SET
      customer_id = NEW.customer_id,
      order_number = NEW.order_number,
      status = NEW.status,
      total_amount_cents = CAST(NEW.total_amount * 100 AS BIGINT),
      updated_at = NEW.updated_at
    WHERE id = NEW.id;
  ELSIF TG_OP = 'DELETE' THEN
    DELETE FROM orders_v2 WHERE id = OLD.id;
  END IF;
  -- Return value is ignored for AFTER triggers; COALESCE covers the DELETE case
  RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER sync_orders_trigger
  AFTER INSERT OR UPDATE OR DELETE ON orders
  FOR EACH ROW EXECUTE FUNCTION sync_orders_to_v2();

-- Step 3: Backfill existing data
INSERT INTO orders_v2 (
  id, customer_id, order_number, status,
  total_amount_cents, created_at, updated_at
)
SELECT
  id, customer_id, order_number, status,
  CAST(total_amount * 100 AS BIGINT),
  created_at, updated_at
FROM orders
ON CONFLICT (id) DO NOTHING;

-- Step 4: Switch application to use new table
-- (update application configuration)

-- Step 5: Clean up once the new table is authoritative
DROP TRIGGER sync_orders_trigger ON orders;
DROP FUNCTION sync_orders_to_v2();
ALTER TABLE orders RENAME TO orders_old;
ALTER TABLE orders_v2 RENAME TO orders;

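Before flipping reads over to orders_v2, it is worth verifying that the two tables agree. A minimal sketch of a pre-cutover check (node-postgres-style db.query assumed; the queries mirror the schema above):

// Hypothetical pre-cutover verification for the blue-green tables above
async function verifyOrdersCutover(db) {
  // Row counts must match before the switch
  const counts = await db.query(`
    SELECT
      (SELECT COUNT(*) FROM orders)    AS old_count,
      (SELECT COUNT(*) FROM orders_v2) AS new_count
  `);
  const { old_count, new_count } = counts.rows[0];
  if (old_count !== new_count) {
    throw new Error(`Row count mismatch: ${old_count} vs ${new_count}`);
  }

  // Spot-check that amounts were converted to cents correctly
  const mismatches = await db.query(`
    SELECT o.id
    FROM orders o
    JOIN orders_v2 v ON v.id = o.id
    WHERE v.total_amount_cents <> CAST(o.total_amount * 100 AS BIGINT)
    LIMIT 10
  `);
  if (mismatches.rows.length > 0) {
    throw new Error(
      `Amount mismatch for ids: ${mismatches.rows.map((r) => r.id).join(', ')}`
    );
  }
}
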
Data Migration with Validation

Ensure data integrity during complex transformations.

// AI-assisted data migration with validation
class DataMigrator {
  constructor(db, logger) {
    this.db = db;
    this.logger = logger;
    this.batchSize = 1000;
  }

  async migrateWithValidation(config) {
    const {
      sourceTable,
      targetTable,
      transform,
      validate,
      idColumn = 'id'
    } = config;

    let offset = 0;
    let totalMigrated = 0;
    const errors = [];

    // Process the source table batch by batch
    while (true) {
      const batch = await this.fetchBatch(
        sourceTable,
        idColumn,
        offset,
        this.batchSize
      );
      if (batch.length === 0) break;

      const migrationResult = await this.migrateBatch(
        batch,
        targetTable,
        transform,
        validate
      );

      totalMigrated += migrationResult.successful;
      errors.push(...migrationResult.errors);

      this.logger.info('Migration progress', {
        offset,
        batchSize: batch.length,
        successful: migrationResult.successful,
        failed: migrationResult.errors.length,
        totalMigrated
      });

      offset += this.batchSize;
      // Add delay to avoid overwhelming the database
      await this.sleep(100);
    }

    return {
      totalMigrated,
      errors,
      success: errors.length === 0
    };
  }

  async migrateBatch(batch, targetTable, transform, validate) {
    let successful = 0;
    const errors = [];

    // Each batch runs in its own transaction
    await this.db.transaction(async (trx) => {
      for (const record of batch) {
        try {
          // Transform the data
          const transformed = await transform(record);

          // Validate before inserting
          const validationResult = await validate(transformed);
          if (!validationResult.valid) {
            errors.push({
              id: record.id,
              error: 'Validation failed',
              details: validationResult.errors
            });
            continue;
          }

          // Insert into target table
          await trx(targetTable).insert(transformed);
          successful++;
        } catch (error) {
          errors.push({
            id: record.id,
            error: error.message,
            stack: error.stack
          });
        }
      }
    });

    return { successful, errors };
  }
}
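
The class above calls two helpers that are not shown. A minimal sketch of what they might look like, assuming the same knex-style query builder used by trx(targetTable).insert above:

// Minimal sketches of the helpers assumed by DataMigrator
Object.assign(DataMigrator.prototype, {
  // Fetch one batch of source rows in a stable order
  async fetchBatch(sourceTable, idColumn, offset, limit) {
    return this.db(sourceTable)
      .orderBy(idColumn)
      .offset(offset)
      .limit(limit);
  },

  // Promise-based delay used for throttling between batches
  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
});
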
// Usage example
const migrator = new DataMigrator(db, logger);

await migrator.migrateWithValidation({
  sourceTable: 'legacy_users',
  targetTable: 'users',
  transform: async (legacyUser) => ({
    id: generateUUID(),
    email: legacyUser.email_address.toLowerCase(),
    username: legacyUser.user_name || generateUsername(legacyUser.email_address),
    full_name: `${legacyUser.first_name} ${legacyUser.last_name}`.trim(),
    phone: normalizePhoneNumber(legacyUser.phone),
    created_at: legacyUser.registration_date,
    legacy_id: legacyUser.id
  }),
  validate: async (user) => {
    const errors = [];
    if (!isValidEmail(user.email)) {
      errors.push('Invalid email format');
    }
    if (user.username.length < 3) {
      errors.push('Username too short');
    }
    // Check for duplicates
    const existing = await db('users')
      .where('email', user.email)
      .first();
    if (existing) {
      errors.push('Email already exists');
    }
    return {
      valid: errors.length === 0,
      errors
    };
  }
});

Shadow Table Testing

Test migrations safely using shadow tables before applying them to production.

-- Create shadow table for testing
CREATE TABLE users_shadow (LIKE users INCLUDING ALL);

-- Copy data to shadow table
INSERT INTO users_shadow SELECT * FROM users;

-- Test migration on shadow table
ALTER TABLE users_shadow
  ADD COLUMN preferences JSONB DEFAULT '{}',
  ADD COLUMN last_login_at TIMESTAMP;

-- Validate shadow table
DO $$
DECLARE
  original_count INTEGER;
  shadow_count INTEGER;
BEGIN
  SELECT COUNT(*) INTO original_count FROM users;
  SELECT COUNT(*) INTO shadow_count FROM users_shadow;

  IF original_count != shadow_count THEN
    RAISE EXCEPTION 'Row count mismatch: % vs %',
      original_count, shadow_count;
  END IF;

  -- Additional validation queries
  PERFORM 1 FROM users_shadow
  WHERE preferences IS NULL
  LIMIT 1;
  IF FOUND THEN
    RAISE EXCEPTION 'Found NULL preferences';
  END IF;
END $$;

-- If validation passes, apply to production
ALTER TABLE users
  ADD COLUMN preferences JSONB DEFAULT '{}',
  ADD COLUMN last_login_at TIMESTAMP;

-- Clean up
DROP TABLE users_shadow;

Online Schema Changes

Perform schema changes without locking tables.

// MySQL/Percona online schema change wrapper
class OnlineSchemaChange {
  constructor(db, config = {}) {
    this.db = db;
    this.config = {
      maxLagSeconds: 5,
      checkInterval: 1000,
      ...config
    };
    // Logger is configurable; fall back to console
    this.logger = config.logger ?? console;
  }

  async alterTable(tableName, alterStatement, options = {}) {
    const tempTable = `${tableName}_new`;
    const oldTable = `${tableName}_old`;

    try {
      // Step 1: Create new table with desired schema
      await this.createTableLike(tableName, tempTable);
      await this.db.query(`ALTER TABLE ${tempTable} ${alterStatement}`);

      // Step 2: Set up triggers for data sync
      await this.createSyncTriggers(tableName, tempTable);

      // Step 3: Copy existing data in batches
      await this.copyDataInBatches(tableName, tempTable, options);

      // Step 4: Atomic table swap
      await this.db.transaction(async (trx) => {
        await trx.raw(`RENAME TABLE
          ${tableName} TO ${oldTable},
          ${tempTable} TO ${tableName}`);
      });

      // Step 5: Clean up
      await this.dropTriggers(oldTable);
      // Keep old table for rollback option
      this.logger.info(`Migration complete. Old table preserved as ${oldTable}`);
    } catch (error) {
      // Rollback
      await this.cleanup(tempTable, tableName);
      throw error;
    }
  }

  async copyDataInBatches(source, target, options) {
    const batchSize = options.batchSize || 1000;
    let lastId = 0;

    while (true) {
      // Check replication lag before each batch
      const lag = await this.getReplicationLag();
      if (lag > this.config.maxLagSeconds) {
        await this.sleep(this.config.checkInterval);
        continue;
      }

      // Copy batch
      const result = await this.db.query(`
        INSERT INTO ${target}
        SELECT * FROM ${source}
        WHERE id > ?
        ORDER BY id
        LIMIT ?
      `, [lastId, batchSize]);
      if (result.affectedRows === 0) break;

      // Update progress
      lastId = await this.getMaxId(target);

      // Throttle to avoid overwhelming the database
      await this.sleep(100);
    }
  }

  async createSyncTriggers(source, target) {
    // NOTE: "SELECT NEW.*" is shorthand for readability; MySQL requires an explicit
    // column list here, which tools like pt-online-schema-change generate automatically.

    // Insert trigger
    await this.db.query(`
      CREATE TRIGGER ${source}_insert_sync
      AFTER INSERT ON ${source}
      FOR EACH ROW
      BEGIN
        INSERT INTO ${target} SELECT NEW.*;
      END
    `);

    // Update trigger
    await this.db.query(`
      CREATE TRIGGER ${source}_update_sync
      AFTER UPDATE ON ${source}
      FOR EACH ROW
      BEGIN
        REPLACE INTO ${target} SELECT NEW.*;
      END
    `);

    // Delete trigger
    await this.db.query(`
      CREATE TRIGGER ${source}_delete_sync
      AFTER DELETE ON ${source}
      FOR EACH ROW
      BEGIN
        DELETE FROM ${target} WHERE id = OLD.id;
      END
    `);
  }
}
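
The wrapper above leans on several helpers that are not shown (createTableLike, getReplicationLag, getMaxId, dropTriggers, cleanup, sleep). A minimal sketch of what they might look like, assuming MySQL 8+ and a db.query wrapper that returns rows for SELECT statements:

// Minimal sketches of the helpers assumed by OnlineSchemaChange
Object.assign(OnlineSchemaChange.prototype, {
  // MySQL copies columns, indexes, and defaults, but not foreign keys or triggers
  async createTableLike(sourceTable, newTable) {
    await this.db.query(`CREATE TABLE ${newTable} LIKE ${sourceTable}`);
  },

  // Assumes MySQL 8.0.22+; older versions use SHOW SLAVE STATUS / Seconds_Behind_Master
  async getReplicationLag() {
    const rows = await this.db.query('SHOW REPLICA STATUS');
    if (!rows || rows.length === 0) return 0; // no replica configured
    return rows[0].Seconds_Behind_Source ?? 0;
  },

  async getMaxId(tableName) {
    const rows = await this.db.query(`SELECT MAX(id) AS maxId FROM ${tableName}`);
    return rows[0]?.maxId ?? 0;
  },

  // Drop whatever sync triggers are still attached to the (possibly renamed) table
  async dropTriggers(tableName) {
    const triggers = await this.db.query(
      `SELECT TRIGGER_NAME FROM information_schema.TRIGGERS
       WHERE EVENT_OBJECT_SCHEMA = DATABASE() AND EVENT_OBJECT_TABLE = ?`,
      [tableName]
    );
    for (const t of triggers) {
      await this.db.query(`DROP TRIGGER IF EXISTS ${t.TRIGGER_NAME}`);
    }
  },

  // Best-effort rollback: remove sync triggers and the partially built table
  async cleanup(tempTable, tableName) {
    await this.dropTriggers(tableName).catch(() => {});
    await this.db.query(`DROP TABLE IF EXISTS ${tempTable}`);
  },

  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
});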

Feature Flag Migrations

Control migration rollout using feature flags.

// Feature flag controlled migrations
class FeatureFlagMigration {
  constructor(db, featureFlags, logger = console) {
    this.db = db;
    this.featureFlags = featureFlags;
    this.logger = logger;
  }

  async runConditionalMigration(migrationName, migration) {
    const flagName = `migration_${migrationName}`;

    // Check if migration should run
    if (!await this.featureFlags.isEnabled(flagName)) {
      this.logger.info(`Migration ${migrationName} skipped (flag disabled)`);
      return;
    }

    // Check if already applied
    const applied = await this.db('schema_migrations')
      .where('migration', migrationName)
      .first();
    if (applied) {
      this.logger.info(`Migration ${migrationName} already applied`);
      return;
    }

    // Run migration with monitoring
    const start = Date.now();
    try {
      await this.db.transaction(async (trx) => {
        // Execute migration
        await migration(trx);

        // Record completion
        await trx('schema_migrations').insert({
          migration: migrationName,
          applied_at: new Date(),
          execution_time_ms: Date.now() - start
        });
      });

      this.logger.info(`Migration ${migrationName} completed`, {
        duration: Date.now() - start
      });
    } catch (error) {
      this.logger.error(`Migration ${migrationName} failed`, {
        error: error.message,
        duration: Date.now() - start
      });
      // Disable feature flag on failure
      await this.featureFlags.disable(flagName);
      throw error;
    }
  }
}

// Usage
const flagMigrator = new FeatureFlagMigration(db, featureFlags);

await flagMigrator.runConditionalMigration('add_user_preferences', async (trx) => {
  await trx.schema.alterTable('users', (table) => {
    table.jsonb('preferences').defaultTo('{}');
    table.index('preferences', 'idx_users_preferences', 'GIN');
  });
});
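
Migration Testing

Pair every migration with automated tests before it reaches production. The harness below is a minimal framework; it assumes helper methods (setupTestDatabase, teardownTestDatabase, verifyRollback) that provision and dispose of a throwaway test database.
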
// Migration test framework
class MigrationTester {
  constructor() {
    this.testDb = null;
  }

  async testMigration(migration, testCases) {
    // Setup test database
    await this.setupTestDatabase();

    try {
      // Apply migration
      await migration.up(this.testDb);

      // Run test cases
      for (const testCase of testCases) {
        await this.runTestCase(testCase);
      }

      // Test rollback if available
      if (migration.down) {
        await migration.down(this.testDb);
        await this.verifyRollback();
      }
    } finally {
      await this.teardownTestDatabase();
    }
  }

  async runTestCase(testCase) {
    const { name, setup, verify } = testCase;
    console.log(`Running test: ${name}`);

    // Set up test data; keep anything setup returns for the verify step
    const context = setup ? await setup(this.testDb) : {};

    // Verify expectations
    await verify(this.testDb, context);
  }
}
// Example test
const tester = new MigrationTester();

await tester.testMigration(addUserPreferencesMigration, [
  {
    name: 'Should add preferences column with default value',
    setup: async (db) => {
      await db('users').insert({
        email: 'test@example.com',
        username: 'testuser'
      });
    },
    verify: async (db) => {
      const user = await db('users')
        .where('email', 'test@example.com')
        .first();
      expect(user.preferences).toEqual({});
    }
  },
  {
    name: 'Should preserve existing data',
    setup: async (db) => {
      const countBefore = await db('users').count();
      return { countBefore };
    },
    verify: async (db, { countBefore }) => {
      const countAfter = await db('users').count();
      expect(countAfter).toEqual(countBefore);
    }
  }
]);

Version Everything

  • Track all schema changes in version control
  • Use sequential versioning
  • Never modify existing migrations (see the checksum sketch after this list)
  • Document migration purpose and risks
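
Committed migrations should be treated as immutable. A minimal sketch of how to enforce that, assuming schema_migrations also stores a checksum recorded when each file was applied (Flyway does this out of the box):

// Detect edits to already-applied migration files by re-hashing them
const crypto = require('crypto');
const fs = require('fs');
const path = require('path');

async function verifyAppliedMigrations(db, dir = './migrations') {
  // `checksum` is an assumed column holding the hash recorded at apply time
  const applied = await db('schema_migrations').select('migration', 'checksum');

  for (const row of applied) {
    const file = path.join(dir, row.migration);
    const current = crypto
      .createHash('sha256')
      .update(fs.readFileSync(file))
      .digest('hex');
    if (current !== row.checksum) {
      throw new Error(`Migration ${row.migration} was modified after being applied`);
    }
  }
}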

Test Thoroughly

  • Test on production-like data volumes
  • Verify data integrity after migration
  • Test rollback procedures
  • Use shadow tables for validation

Monitor and Measure

  • Track migration execution time
  • Monitor database performance
  • Set up alerts for long-running migrations (see the watchdog sketch below)
  • Log all migration activities
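
One way to alert on long-running migrations is a simple watchdog around the migration call; the sketch below assumes an `alert` callback wired to your paging or chat tooling:

// Raise an alert if a migration runs longer than expected
async function runWithAlert(name, migrationFn, { maxMs = 5 * 60 * 1000, alert } = {}) {
  const timer = setTimeout(() => {
    alert(`Migration ${name} has been running for more than ${maxMs / 1000}s`);
  }, maxMs);
  try {
    return await migrationFn();
  } finally {
    clearTimeout(timer);
  }
}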

Plan for Failure

  • Always have a rollback plan
  • Take backups before major changes
  • Use transactions where possible
  • Implement circuit breakers for migrations (see the sketch below)

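A circuit breaker stops a batched migration early once the error rate crosses a threshold, instead of grinding through bad data. A minimal sketch (the thresholds are illustrative):

// Trip when more than maxErrorRate of processed records have failed
class MigrationCircuitBreaker {
  constructor({ maxErrorRate = 0.05, minSamples = 100 } = {}) {
    this.maxErrorRate = maxErrorRate;
    this.minSamples = minSamples;
    this.total = 0;
    this.failed = 0;
  }

  record(ok) {
    this.total += 1;
    if (!ok) this.failed += 1;
  }

  check() {
    if (this.total >= this.minSamples &&
        this.failed / this.total > this.maxErrorRate) {
      throw new Error(`Circuit breaker tripped: ${this.failed}/${this.total} records failed`);
    }
  }
}

A batch runner such as DataMigrator above could call record() for each row and check() after each batch.
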
Before running any migration in production:

  • Backup created and verified
  • Migration tested on staging with production-like data
  • Rollback plan documented and tested
  • Performance impact assessed
  • Application compatibility verified
  • Monitoring alerts configured
  • Team notified of migration window
  • Feature flags configured (if applicable)
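
Migration Monitoring

Keep migration health visible in one place. The dashboard sketch below assumes a schema_migrations table with an execution_time_ms column, as recorded by the feature-flag runner earlier in this guide.
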
// Migration monitoring dashboard
class MigrationMonitor {
  constructor(db) {
    this.db = db;
  }

  async getDashboardMetrics() {
    return {
      pending: await this.getPendingMigrations(),
      recent: await this.getRecentMigrations(),
      failed: await this.getFailedMigrations(),
      performance: await this.getMigrationPerformance(),
      health: await this.getSchemaHealth()
    };
  }

  async getMigrationPerformance() {
    const metrics = await this.db('schema_migrations')
      .select('migration', 'execution_time_ms', 'applied_at')
      .orderBy('applied_at', 'desc')
      .limit(20);

    const totalTime = metrics.reduce((sum, m) => sum + m.execution_time_ms, 0);

    return {
      averageTime: metrics.length ? totalTime / metrics.length : 0,
      slowest: [...metrics].sort(
        (a, b) => b.execution_time_ms - a.execution_time_ms
      )[0],
      trend: this.calculateTrend(metrics)
    };
  }
}

Master database migrations with: