
Database Operations

Databases are the memory of your applications. Whether you’re designing schemas, writing complex queries, or orchestrating migrations, Claude Code transforms database work from error-prone SQL wrestling into intelligent data engineering. This lesson explores how to leverage AI assistance for all aspects of database operations.

Scenario: Your e-commerce platform’s database is struggling. Queries that took milliseconds now take seconds. The schema is a mess of denormalized tables. Migrations are Russian roulette. You need to scale from 10K to 1M users. Traditional approach: hire a DBA. With Claude Code: become one.

Week 1: Analysis
- Manually profile slow queries
- Draw ER diagrams by hand
- Guess at missing indexes
Week 2-3: Schema Redesign
- Write migration scripts
- Test on staging (hope for the best)
- Fix broken constraints
Week 4: Optimization
- Add indexes one by one
- Rewrite queries manually
- Monitor performance
Result: Marginal improvements, high risk. Week 1 alone means profiling queries by hand, one EXPLAIN ANALYZE at a time, as sketched below.
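A minimal sketch of that manual loop (plain PostgreSQL; the query and customer ID are hypothetical):

-- Run each suspect query under EXPLAIN ANALYZE and read the plan by hand
EXPLAIN (ANALYZE, BUFFERS)
SELECT o.id, o.total
FROM orders o
WHERE o.customer_id = '...'  -- hypothetical customer UUID
ORDER BY o.created_at DESC
LIMIT 20;
-- Look for Seq Scans on large tables, high actual times, and low buffer
-- hit rates, then guess an index, create it, and re-run, one query at a time.

With Claude Code, the same month of work starts from a single prompt: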
> Design a database schema for an e-commerce platform with:
> - Multi-vendor support
> - Product variants (size, color)
> - Inventory tracking
> - Order management
> - Customer reviews
> - Wishlist functionality
> Follow best practices for normalization and performance

Claude generates a comprehensive schema:

Core Tables

-- Enum types (PostgreSQL has no inline ENUM column syntax)
CREATE TYPE user_type AS ENUM ('customer', 'vendor', 'admin');
CREATE TYPE product_status AS ENUM ('active', 'inactive', 'out_of_stock');

-- Users table (customers and vendors)
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    user_type user_type NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Vendors table (extends users)
CREATE TABLE vendors (
    user_id UUID PRIMARY KEY REFERENCES users(id),
    business_name VARCHAR(255) NOT NULL,
    description TEXT,
    logo_url VARCHAR(500),
    verified BOOLEAN DEFAULT FALSE,
    commission_rate DECIMAL(5,2) DEFAULT 10.00
);

-- Products table (assumes a categories table defined elsewhere)
CREATE TABLE products (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    vendor_id UUID NOT NULL REFERENCES vendors(user_id),
    name VARCHAR(255) NOT NULL,
    description TEXT,
    base_price DECIMAL(10,2) NOT NULL,
    category_id INTEGER REFERENCES categories(id),
    status product_status DEFAULT 'active',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes are separate statements in PostgreSQL (no inline INDEX clause);
-- names are table-prefixed because index names share a schema-wide namespace
CREATE INDEX idx_products_vendor_status ON products (vendor_id, status);
-- Full-text search via a GIN index over a tsvector expression
CREATE INDEX idx_products_search ON products
    USING GIN (to_tsvector('english', name || ' ' || COALESCE(description, '')));

Inventory & Variants

-- Product variants (size, color, etc.)
CREATE TABLE product_variants (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    product_id UUID NOT NULL REFERENCES products(id),
    sku VARCHAR(100) UNIQUE NOT NULL,
    attributes JSONB NOT NULL, -- {"size": "L", "color": "Blue"}
    price_modifier DECIMAL(10,2) DEFAULT 0,
    weight_grams INTEGER
);
CREATE INDEX idx_variants_product ON product_variants (product_id);
-- JSONB attribute lookups use the GIN index added in the optimization pass below
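A quick illustration of how the JSONB attributes column gets queried; the containment operator @> matches variants whose attributes include all the given keys, and it is exactly what the GIN index suggested later accelerates:

-- Find the Large, Blue variants of a product (hypothetical product UUID)
SELECT id, sku, price_modifier
FROM product_variants
WHERE product_id = '...'
  AND attributes @> '{"size": "L", "color": "Blue"}'::jsonb;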
-- Inventory tracking
CREATE TABLE inventory (
    variant_id UUID PRIMARY KEY REFERENCES product_variants(id),
    quantity INTEGER NOT NULL DEFAULT 0,
    reserved_quantity INTEGER NOT NULL DEFAULT 0,
    reorder_point INTEGER DEFAULT 10,
    reorder_quantity INTEGER DEFAULT 100,
    last_restocked TIMESTAMP,
    CHECK (quantity >= reserved_quantity)
);
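The reserved_quantity column plus the CHECK constraint enable atomic reservation at checkout: a single guarded UPDATE either reserves the stock or touches zero rows, with no explicit locking:

-- Reserve 3 units for a pending order; fails cleanly if stock is short
UPDATE inventory
SET reserved_quantity = reserved_quantity + 3
WHERE variant_id = '...' -- hypothetical variant UUID
  AND quantity - reserved_quantity >= 3;
-- Check the statement's row count in the application:
-- 0 rows updated means insufficient stock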
-- Inventory movements for audit trail
CREATE TYPE movement_type AS ENUM ('restock', 'sale', 'return', 'adjustment');
CREATE TABLE inventory_movements (
    id BIGSERIAL PRIMARY KEY,
    variant_id UUID NOT NULL REFERENCES product_variants(id),
    movement_type movement_type NOT NULL,
    quantity INTEGER NOT NULL,
    reference_type VARCHAR(50), -- 'order', 'return', 'manual'
    reference_id UUID,
    notes TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    created_by UUID REFERENCES users(id)
);
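A sale might be recorded like this; negative quantities for outbound stock is one common convention, though nothing in the schema enforces it:

INSERT INTO inventory_movements
    (variant_id, movement_type, quantity, reference_type, reference_id, created_by)
VALUES
    ('...', 'sale', -2, 'order', '...', '...'); -- hypothetical UUIDs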

Orders & Transactions

-- Orders table
CREATE TYPE order_status AS ENUM ('pending', 'processing', 'shipped', 'delivered', 'cancelled');
CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id UUID NOT NULL REFERENCES users(id),
    status order_status DEFAULT 'pending',
    subtotal DECIMAL(10,2) NOT NULL,
    tax DECIMAL(10,2) NOT NULL DEFAULT 0,
    shipping DECIMAL(10,2) NOT NULL DEFAULT 0,
    total DECIMAL(10,2) NOT NULL,
    shipping_address JSONB NOT NULL,
    billing_address JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_orders_customer_status ON orders (customer_id, status);
CREATE INDEX idx_orders_created ON orders (created_at DESC);

-- Order items
CREATE TYPE order_item_status AS ENUM ('pending', 'confirmed', 'shipped', 'delivered', 'returned');
CREATE TABLE order_items (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    order_id UUID NOT NULL REFERENCES orders(id),
    variant_id UUID NOT NULL REFERENCES product_variants(id),
    vendor_id UUID NOT NULL REFERENCES vendors(user_id),
    quantity INTEGER NOT NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    total_price DECIMAL(10,2) NOT NULL,
    commission DECIMAL(10,2) NOT NULL,
    status order_item_status DEFAULT 'pending'
);
CREATE INDEX idx_order_items_order ON order_items (order_id);
CREATE INDEX idx_order_items_vendor_status ON order_items (vendor_id, status);
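These tables cooperate in a single checkout transaction; a simplified sketch (all UUIDs hypothetical, one line item, commission assumed to follow the vendor's 10% commission_rate):

BEGIN;
INSERT INTO orders
    (id, customer_id, subtotal, tax, shipping, total, shipping_address, billing_address)
VALUES
    ('o-1...', 'c-1...', 59.98, 4.80, 5.00, 69.78,
     '{"line1": "..."}'::jsonb, '{"line1": "..."}'::jsonb);

INSERT INTO order_items
    (order_id, variant_id, vendor_id, quantity, unit_price, total_price, commission)
VALUES
    ('o-1...', 'v-1...', 'ven-1...', 2, 29.99, 59.98, 6.00); -- 10% of 59.98, rounded

-- Reserve the stock; the CHECK constraint guards against overselling
UPDATE inventory
SET reserved_quantity = reserved_quantity + 2
WHERE variant_id = 'v-1...'
  AND quantity - reserved_quantity >= 2;
COMMIT;

The matching 'sale' row in inventory_movements is typically written later, when the order ships and quantity is actually decremented.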

Reviews & Wishlists

-- Product reviews
CREATE TABLE reviews (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    product_id UUID NOT NULL REFERENCES products(id),
    customer_id UUID NOT NULL REFERENCES users(id),
    order_item_id UUID REFERENCES order_items(id),
    rating INTEGER NOT NULL CHECK (rating >= 1 AND rating <= 5),
    title VARCHAR(255),
    comment TEXT,
    verified_purchase BOOLEAN DEFAULT FALSE,
    helpful_count INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (product_id, customer_id) -- one review per customer per product
);
CREATE INDEX idx_reviews_product_rating ON reviews (product_id, rating);

-- Wishlists
CREATE TABLE wishlists (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id UUID NOT NULL REFERENCES users(id),
    name VARCHAR(255) NOT NULL DEFAULT 'My Wishlist',
    is_public BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_wishlists_customer ON wishlists (customer_id);

-- Wishlist items
CREATE TABLE wishlist_items (
    wishlist_id UUID NOT NULL REFERENCES wishlists(id) ON DELETE CASCADE,
    product_id UUID NOT NULL REFERENCES products(id),
    added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (wishlist_id, product_id)
);
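Because wishlist_items uses a composite primary key, adding a product is naturally idempotent:

-- Re-adding an existing product is a harmless no-op
INSERT INTO wishlist_items (wishlist_id, product_id)
VALUES ('...', '...') -- hypothetical UUIDs
ON CONFLICT (wishlist_id, product_id) DO NOTHING;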
> Analyze the schema and suggest optimal indexes for:
> - Common query patterns
> - Performance optimization
> - Covering indexes where beneficial
> - Avoiding over-indexing
-- Performance-critical indexes (INCLUDE requires PostgreSQL 11+)
CREATE INDEX idx_orders_customer_created
    ON orders (customer_id, created_at DESC)
    INCLUDE (status, total);

CREATE INDEX idx_products_vendor_category
    ON products (vendor_id, category_id)
    WHERE status = 'active';

CREATE INDEX idx_inventory_low_stock
    ON inventory (variant_id)
    WHERE quantity <= reorder_point;

-- Composite index covering the common order/vendor join
CREATE INDEX idx_order_items_complete
    ON order_items (order_id, vendor_id, status)
    INCLUDE (total_price, commission);

-- Partial index for "top-rated" queries
CREATE INDEX idx_reviews_high_rating
    ON reviews (product_id, rating)
    WHERE rating >= 4;

-- GIN index for flexible JSONB attribute queries
CREATE INDEX idx_variant_attributes_gin
    ON product_variants USING GIN (attributes);
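It is worth confirming the planner actually uses these; for example, the low-stock partial index should show up in the reorder report's plan:

EXPLAIN (ANALYZE)
SELECT variant_id, quantity, reorder_point
FROM inventory
WHERE quantity <= reorder_point;
-- Expect an Index Scan or Bitmap Index Scan on idx_inventory_low_stock
-- rather than a Seq Scan once the table has real volume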
> Write an optimized query to find:
> - Top 10 best-selling products this month
> - Include vendor info and average rating
> - Show current inventory levels
> - Calculate revenue and profit margins
WITH monthly_sales AS (
    SELECT
        pv.product_id,
        SUM(oi.quantity) AS units_sold,
        SUM(oi.total_price) AS revenue,
        SUM(oi.total_price - oi.commission) AS vendor_revenue
    FROM order_items oi
    JOIN orders o ON oi.order_id = o.id
    JOIN product_variants pv ON oi.variant_id = pv.id
    WHERE o.created_at >= DATE_TRUNC('month', CURRENT_DATE)
      AND o.status NOT IN ('cancelled', 'pending')
    GROUP BY pv.product_id
),
product_ratings AS (
    SELECT
        product_id,
        AVG(rating) AS avg_rating,
        COUNT(*) AS review_count
    FROM reviews
    GROUP BY product_id
),
current_inventory AS (
    SELECT
        pv.product_id,
        SUM(i.quantity) AS total_stock,
        SUM(i.quantity - i.reserved_quantity) AS available_stock
    FROM inventory i
    JOIN product_variants pv ON i.variant_id = pv.id
    GROUP BY pv.product_id
)
SELECT
    p.id,
    p.name,
    p.base_price,
    v.business_name AS vendor_name,
    COALESCE(ms.units_sold, 0) AS units_sold,
    COALESCE(ms.revenue, 0) AS revenue,
    COALESCE(ms.vendor_revenue, 0) AS vendor_revenue,
    ROUND(COALESCE(pr.avg_rating, 0), 2) AS avg_rating,
    COALESCE(pr.review_count, 0) AS review_count,
    COALESCE(ci.total_stock, 0) AS total_stock,
    COALESCE(ci.available_stock, 0) AS available_stock,
    CASE
        WHEN ms.revenue > 0 THEN
            ROUND((ms.revenue - ms.vendor_revenue) / ms.revenue * 100, 2)
        ELSE 0
    END AS profit_margin_pct
FROM products p
JOIN vendors v ON p.vendor_id = v.user_id
LEFT JOIN monthly_sales ms ON p.id = ms.product_id
LEFT JOIN product_ratings pr ON p.id = pr.product_id
LEFT JOIN current_inventory ci ON p.id = ci.product_id
WHERE p.status = 'active'
ORDER BY COALESCE(ms.units_sold, 0) DESC
LIMIT 10;
> Analyze this slow query and optimize it:
> [paste problematic query]

Claude analyzes and optimizes:

-- Slow query (takes 5+ seconds)
SELECT DISTINCT
    c.id,
    c.email,
    COUNT(DISTINCT o.id) AS order_count,
    SUM(o.total) AS lifetime_value,
    (
        SELECT STRING_AGG(p.name, ', ')
        FROM order_items oi
        JOIN product_variants pv ON oi.variant_id = pv.id
        JOIN products p ON pv.product_id = p.id
        WHERE oi.order_id IN (
            SELECT id FROM orders WHERE customer_id = c.id
        )
    ) AS purchased_products
FROM users c
LEFT JOIN orders o ON c.id = o.customer_id
WHERE c.user_type = 'customer'
  AND c.created_at >= '2024-01-01'
GROUP BY c.id, c.email
HAVING COUNT(DISTINCT o.id) > 0
ORDER BY lifetime_value DESC;
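The excerpt ends before Claude's rewrite, but the diagnosis is clear: the correlated subquery re-runs for every customer, and DISTINCT on top of GROUP BY is redundant. A plausible optimized rewrite (a sketch, not necessarily Claude's exact output) aggregates each relation once:

-- Optimized: one aggregation pass per relation, no correlated subquery
WITH customer_orders AS (
    SELECT customer_id,
           COUNT(*) AS order_count,
           SUM(total) AS lifetime_value
    FROM orders
    GROUP BY customer_id
),
customer_products AS (
    SELECT o.customer_id,
           STRING_AGG(DISTINCT p.name, ', ') AS purchased_products
    FROM orders o
    JOIN order_items oi ON oi.order_id = o.id
    JOIN product_variants pv ON pv.id = oi.variant_id
    JOIN products p ON p.id = pv.product_id
    GROUP BY o.customer_id
)
SELECT
    c.id,
    c.email,
    co.order_count,
    co.lifetime_value,
    cp.purchased_products
FROM users c
JOIN customer_orders co ON co.customer_id = c.id  -- inner join replaces HAVING > 0
LEFT JOIN customer_products cp ON cp.customer_id = c.id
WHERE c.user_type = 'customer'
  AND c.created_at >= '2024-01-01'
ORDER BY co.lifetime_value DESC;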
> Create a migration to:
> - Add multi-currency support to our orders
> - Must be zero-downtime
> - Include rollback plan
> - Handle existing data
  1. Phase 1: Add new columns (backwards compatible)

    -- Migration: add_currency_support_phase1.sql
    BEGIN;
    -- Add currency columns with defaults
    -- (on PostgreSQL 11+ a NOT NULL column with a constant default is
    -- added without rewriting the table, so this stays fast)
    ALTER TABLE products
        ADD COLUMN currency_code CHAR(3) DEFAULT 'USD' NOT NULL,
        ADD COLUMN base_price_cents BIGINT;
    ALTER TABLE order_items
        ADD COLUMN currency_code CHAR(3) DEFAULT 'USD' NOT NULL,
        ADD COLUMN unit_price_cents BIGINT,
        ADD COLUMN total_price_cents BIGINT;
    ALTER TABLE orders
        ADD COLUMN currency_code CHAR(3) DEFAULT 'USD' NOT NULL,
        ADD COLUMN subtotal_cents BIGINT,
        ADD COLUMN tax_cents BIGINT,
        ADD COLUMN shipping_cents BIGINT,
        ADD COLUMN total_cents BIGINT;
    -- Create currency conversion table
    CREATE TABLE currency_rates (
        from_currency CHAR(3) NOT NULL,
        to_currency CHAR(3) NOT NULL,
        rate DECIMAL(10,6) NOT NULL,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (from_currency, to_currency)
    );
    -- Populate with initial rates
    INSERT INTO currency_rates VALUES
        ('USD', 'EUR', 0.85),
        ('USD', 'GBP', 0.73),
        ('EUR', 'USD', 1.18),
        ('EUR', 'GBP', 0.86),
        ('GBP', 'USD', 1.37),
        ('GBP', 'EUR', 1.16);
    COMMIT;
  2. Phase 2: Dual-write application code

    # Application code writes to both old and new columns
    class OrderService:
        def create_order(self, items, currency='USD'):
            # Illustrative totals; real code adds tax, shipping, etc.
            subtotal_decimal = sum(item.unit_price * item.quantity for item in items)
            total_decimal = subtotal_decimal
            # Write to both decimal and integer columns
            order = Order(
                # Old columns (for backwards compatibility)
                subtotal=subtotal_decimal,
                total=total_decimal,
                # New columns
                currency_code=currency,
                subtotal_cents=int(subtotal_decimal * 100),
                total_cents=int(total_decimal * 100),
            )
            return order
  3. Phase 3: Backfill existing data

    -- Migration: backfill_currency_data.sql
    -- Backfill products (single statement; fine for modest tables)
    UPDATE products
    SET base_price_cents = ROUND(base_price * 100)
    WHERE base_price_cents IS NULL;

    -- Backfill orders in batches to limit row locks per statement.
    -- Note: a DO block runs as one transaction; for very large tables,
    -- drive the batches from the application or a procedure so each
    -- batch commits separately.
    DO $$
    DECLARE
        batch_size INTEGER := 10000;
        rows_updated INTEGER;
    BEGIN
        LOOP
            UPDATE orders
            SET subtotal_cents = ROUND(subtotal * 100),
                tax_cents = ROUND(tax * 100),
                shipping_cents = ROUND(shipping * 100),
                total_cents = ROUND(total * 100)
            WHERE id IN (
                SELECT id FROM orders
                WHERE total_cents IS NULL
                LIMIT batch_size
            );
            GET DIAGNOSTICS rows_updated = ROW_COUNT;
            EXIT WHEN rows_updated = 0;
            PERFORM pg_sleep(0.1); -- brief pause between batches
        END LOOP;
    END $$;
    -- Similar for order_items...
  4. Phase 4: Switch reads to new columns

    # Application now reads from new columns
    class OrderService:
        def get_order_total(self, order):
            # Use new columns, convert cents to decimal for display
            return Decimal(order.total_cents) / 100
  5. Phase 5: Drop old columns

    -- Migration: drop_old_currency_columns.sql
    -- Only run after verifying all systems use new columns
    BEGIN;
    ALTER TABLE products DROP COLUMN base_price;
    ALTER TABLE order_items
        DROP COLUMN unit_price,
        DROP COLUMN total_price;
    ALTER TABLE orders
        DROP COLUMN subtotal,
        DROP COLUMN tax,
        DROP COLUMN shipping,
        DROP COLUMN total;
    -- Add NOT NULL constraints now that backfill is complete
    ALTER TABLE products
        ALTER COLUMN base_price_cents SET NOT NULL;
    COMMIT;
-- Rollback for each phase
-- Phase 1 rollback
BEGIN;
DROP TABLE IF EXISTS currency_rates;
ALTER TABLE products DROP COLUMN IF EXISTS currency_code, DROP COLUMN IF EXISTS base_price_cents;
-- ... similar for other tables
COMMIT;
-- Phase 3 rollback (if backfill fails)
UPDATE products SET base_price_cents = NULL WHERE base_price_cents IS NOT NULL;
-- ... similar for other tables
-- Phase 5 rollback (if needed after drop)
-- Must restore from backup or use audit tables
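Before Phase 5, a consistency check between old and new columns is cheap insurance; a minimal sketch for orders (repeat for products and order_items):

-- Should return zero rows before the old columns are dropped
SELECT id, total, total_cents
FROM orders
WHERE total_cents IS NULL
   OR total_cents <> ROUND(total * 100);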
> Convert our SQL schema to Prisma schema
> Include all relationships and indexes
schema.prisma
generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

enum UserType {
  CUSTOMER @map("customer")
  VENDOR   @map("vendor")
  ADMIN    @map("admin")
}

enum ProductStatus {
  ACTIVE       @map("active")
  INACTIVE     @map("inactive")
  OUT_OF_STOCK @map("out_of_stock")
}

model User {
  id           String     @id @default(uuid())
  email        String     @unique
  passwordHash String     @map("password_hash")
  userType     UserType   @map("user_type")
  createdAt    DateTime   @default(now()) @map("created_at")
  updatedAt    DateTime   @updatedAt @map("updated_at")
  vendor       Vendor?
  orders       Order[]
  reviews      Review[]
  wishlists    Wishlist[]

  @@map("users")
}

model Vendor {
  userId         String      @id @map("user_id")
  businessName   String      @map("business_name")
  description    String?
  logoUrl        String?     @map("logo_url")
  verified       Boolean     @default(false)
  commissionRate Decimal     @default(10.00) @map("commission_rate")
  user           User        @relation(fields: [userId], references: [id])
  products       Product[]
  orderItems     OrderItem[]

  @@map("vendors")
}

model Product {
  id            String           @id @default(uuid())
  vendorId      String           @map("vendor_id")
  name          String
  description   String?
  basePrice     Decimal          @map("base_price")
  categoryId    Int?             @map("category_id")
  status        ProductStatus    @default(ACTIVE)
  createdAt     DateTime         @default(now()) @map("created_at")
  vendor        Vendor           @relation(fields: [vendorId], references: [userId])
  category      Category?        @relation(fields: [categoryId], references: [id])
  variants      ProductVariant[]
  reviews       Review[]
  wishlistItems WishlistItem[]

  @@index([vendorId, status])
  // @@fulltext is MySQL/MongoDB-only; on PostgreSQL, keep the tsvector
  // GIN index from the raw SQL migration instead
  @@map("products")
}

// ... remaining models (Category, ProductVariant, Order, OrderItem,
// Review, Wishlist, WishlistItem) follow the same pattern
// Complex Prisma queries
// (assumes: import { startOfMonth } from 'date-fns' and an instantiated PrismaClient)
const bestSellers = await prisma.product.findMany({
  where: {
    status: 'ACTIVE',
    variants: {
      some: {
        orderItems: {
          some: {
            order: {
              createdAt: { gte: startOfMonth(new Date()) },
              status: { notIn: ['CANCELLED', 'PENDING'] }
            }
          }
        }
      }
    }
  },
  include: {
    vendor: true,
    _count: {
      select: { reviews: true }
    },
    variants: {
      include: {
        inventory: true,
        orderItems: {
          where: {
            order: {
              createdAt: { gte: startOfMonth(new Date()) }
            }
          },
          select: {
            quantity: true,
            totalPrice: true
          }
        }
      }
    }
  },
  // Note: this sorts by variant count as a proxy; an exact units-sold
  // ranking needs SQL-level aggregation (e.g. prisma.$queryRaw)
  orderBy: {
    variants: { _count: 'desc' }
  },
  take: 10
});
> Create SQLAlchemy models for our schema
> Include relationships and query helpers
models.py
from sqlalchemy import (
    Column, String, Numeric, Integer, Text, ForeignKey,
    DateTime, Enum, Index, select, func,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship, validates, declarative_base
from sqlalchemy.ext.hybrid import hybrid_property
from datetime import datetime
import uuid

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    email = Column(String(255), unique=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    user_type = Column(Enum('customer', 'vendor', 'admin', name='user_type'), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    vendor = relationship("Vendor", back_populates="user", uselist=False)
    orders = relationship("Order", back_populates="customer")
    reviews = relationship("Review", back_populates="customer")

    @validates('email')
    def validate_email(self, key, email):
        assert '@' in email, "Invalid email"
        return email.lower()

class Product(Base):
    __tablename__ = 'products'

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    vendor_id = Column(UUID(as_uuid=True), ForeignKey('vendors.user_id'), nullable=False)
    name = Column(String(255), nullable=False)
    description = Column(Text)
    base_price = Column(Numeric(10, 2), nullable=False)  # SQLAlchemy's type is Numeric
    status = Column(Enum('active', 'inactive', 'out_of_stock', name='product_status'), default='active')

    # Relationships
    vendor = relationship("Vendor", back_populates="products")
    variants = relationship("ProductVariant", back_populates="product")
    reviews = relationship("Review", back_populates="product")

    # Indexes (the full-text GIN index needs a tsvector expression,
    # so it is best defined in the raw SQL migration)
    __table_args__ = (
        Index('idx_products_vendor_status', 'vendor_id', 'status'),
    )

    @hybrid_property
    def average_rating(self):
        if not self.reviews:
            return 0
        return sum(r.rating for r in self.reviews) / len(self.reviews)

    @average_rating.expression
    def average_rating(cls):
        return select(func.avg(Review.rating)).where(Review.product_id == cls.id).scalar_subquery()
# Query helpers
class ProductRepository:
    def __init__(self, session):
        self.session = session

    def get_best_sellers(self, limit=10):
        subquery = (
            self.session.query(
                ProductVariant.product_id,
                func.sum(OrderItem.quantity).label('units_sold'),
                func.sum(OrderItem.total_price).label('revenue')
            )
            .join(OrderItem)
            .join(Order)
            .filter(
                Order.created_at >= func.date_trunc('month', func.current_date()),
                Order.status.notin_(['cancelled', 'pending'])
            )
            .group_by(ProductVariant.product_id)
            .subquery()
        )
        return (
            self.session.query(Product, subquery.c.units_sold, subquery.c.revenue)
            .join(subquery, Product.id == subquery.c.product_id)
            .order_by(subquery.c.units_sold.desc())
            .limit(limit)
            .all()
        )
> Analyze and optimize our database performance:
> - Identify slow queries
> - Suggest index improvements
> - Recommend query rewrites
> - Implement caching strategies
-- Enable query performance logging
ALTER SYSTEM SET log_min_duration_statement = 1000; -- log queries over 1 second
-- Requires a server restart to take effect:
ALTER SYSTEM SET shared_preload_libraries = 'pg_stat_statements';

-- Analyze query performance
-- (on PostgreSQL 13+ the columns are total_exec_time / mean_exec_time / stddev_exec_time)
SELECT
    query,
    calls,
    total_time,
    mean_time,
    stddev_time,
    rows,
    100.0 * shared_blks_hit / NULLIF(shared_blks_hit + shared_blks_read, 0) AS hit_percent
FROM pg_stat_statements
WHERE query NOT LIKE '%pg_stat_statements%'
ORDER BY mean_time DESC
LIMIT 20;

-- Find candidate columns for indexing (a rough heuristic: high-cardinality
-- columns that are not part of any index anywhere in the schema)
SELECT
    schemaname,
    tablename,
    attname,
    n_distinct,
    most_common_vals
FROM pg_stats
WHERE schemaname = 'public'
  AND n_distinct > 100
  AND attname NOT IN (
      SELECT a.attname
      FROM pg_index i
      JOIN pg_attribute a ON a.attrelid = i.indrelid
      WHERE a.attnum = ANY(i.indkey)
  );
-- Implement query result caching
CREATE TABLE query_cache (
    cache_key VARCHAR(255) PRIMARY KEY,
    query_hash VARCHAR(64) NOT NULL,
    result JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NOT NULL,
    hit_count INTEGER DEFAULT 0
);
CREATE INDEX idx_cache_expires ON query_cache(expires_at);
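Expired entries need periodic eviction or the cache table grows without bound; a minimal sketch, run from cron, pg_cron, or an application scheduler:

-- Cheap thanks to idx_cache_expires
DELETE FROM query_cache WHERE expires_at <= NOW();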
# Cache implementation in application
import hashlib
import json

class CachedQuery:
    def __init__(self, db, cache_ttl=3600):
        self.db = db
        self.cache_ttl = cache_ttl

    def _generate_cache_key(self, query, params):
        return hashlib.sha256(f"{query}|{params}".encode()).hexdigest()

    def execute(self, query, params=None):
        cache_key = self._generate_cache_key(query, params)
        # Check cache
        cached = self.db.execute(
            "SELECT result FROM query_cache WHERE cache_key = %s AND expires_at > NOW()",
            (cache_key,)
        ).fetchone()
        if cached:
            self.db.execute(
                "UPDATE query_cache SET hit_count = hit_count + 1 WHERE cache_key = %s",
                (cache_key,)
            )
            return cached[0]  # psycopg2 deserializes JSONB automatically
        # Execute query
        result = self.db.execute(query, params).fetchall()
        # Cache result (row values must be JSON-serializable)
        self.db.execute(
            """INSERT INTO query_cache (cache_key, query_hash, result, expires_at)
               VALUES (%s, %s, %s, NOW() + make_interval(secs => %s))
               ON CONFLICT (cache_key) DO UPDATE
               SET result = EXCLUDED.result, expires_at = EXCLUDED.expires_at""",
            (cache_key, hashlib.md5(query.encode()).hexdigest(),
             json.dumps([list(row) for row in result]), self.cache_ttl)
        )
        return result
# Database connection pool configuration
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,        # number of persistent connections
    max_overflow=40,     # maximum overflow connections
    pool_timeout=30,     # timeout for getting a connection
    pool_recycle=1800,   # recycle connections after 30 minutes
    pool_pre_ping=True,  # test connections before use
    echo_pool=True,      # log pool checkouts/checkins
)
# Implement read replicas
from sqlalchemy.orm import sessionmaker

class DatabaseRouter:
    def __init__(self, primary_url, replica_urls):
        self.primary = create_engine(primary_url)
        self.replicas = [create_engine(url) for url in replica_urls]
        self.current_replica = 0

    def get_session(self, readonly=False):
        if readonly:
            # Round-robin between replicas
            engine = self.replicas[self.current_replica]
            self.current_replica = (self.current_replica + 1) % len(self.replicas)
        else:
            engine = self.primary
        return sessionmaker(bind=engine)()
> Design a MongoDB schema for a real-time chat application with:
> - Users and conversations
> - Messages with read receipts
> - File attachments
> - Search functionality
// MongoDB schema design

// Users collection
{
  _id: ObjectId(),
  username: "john_doe",
  email: "john@example.com",
  profile: {
    displayName: "John Doe",
    avatar: "https://...",
    status: "online",
    lastSeen: ISODate()
  },
  settings: {
    notifications: true,
    theme: "dark"
  },
  blockedUsers: [ObjectId(), ...],
  createdAt: ISODate()
}

// Conversations collection (for group metadata)
{
  _id: ObjectId(),
  type: "group", // "direct" or "group"
  participants: [
    {
      userId: ObjectId(),
      role: "admin", // "admin", "member"
      joinedAt: ISODate(),
      lastRead: ISODate(),
      unreadCount: 0
    }
  ],
  name: "Project Team",
  avatar: "https://...",
  createdBy: ObjectId(),
  createdAt: ISODate(),
  lastMessage: {
    text: "Latest message preview",
    senderId: ObjectId(),
    timestamp: ISODate()
  }
}

// Messages collection (optimized for write performance)
{
  _id: ObjectId(),
  conversationId: ObjectId(),
  senderId: ObjectId(),
  type: "text", // "text", "image", "file", "system"
  content: {
    text: "Hello world!",
    mentions: [ObjectId(), ...]
  },
  attachments: [{
    type: "image",
    url: "https://...",
    size: 1024000,
    mimeType: "image/jpeg",
    thumbnail: "https://..."
  }],
  reactions: [{
    userId: ObjectId(),
    emoji: "👍",
    timestamp: ISODate()
  }],
  readBy: [{
    userId: ObjectId(),
    timestamp: ISODate()
  }],
  editedAt: ISODate(),
  deletedAt: null,
  createdAt: ISODate()
}

// Indexes for performance
db.messages.createIndex({ conversationId: 1, createdAt: -1 })
db.messages.createIndex({ "content.text": "text" }) // for search
db.messages.createIndex({ senderId: 1, createdAt: -1 })
db.conversations.createIndex({ "participants.userId": 1 })
db.conversations.createIndex({ "lastMessage.timestamp": -1 })

// Aggregation for unread counts
db.messages.aggregate([
  {
    $match: {
      conversationId: ObjectId("..."),
      createdAt: { $gt: lastReadTimestamp },
      senderId: { $ne: currentUserId }
    }
  },
  { $count: "unreadCount" }
])

// Search messages with the text index
db.messages.find({
  conversationId: ObjectId("..."),
  $text: { $search: "project deadline" }
}).sort({ score: { $meta: "textScore" } })
> Implement database security measures:
> - Row-level security
> - Encryption at rest
> - Audit logging
> - SQL injection prevention
-- Row-level security implementation
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;

-- Customers can only see their own orders
CREATE POLICY customer_orders_policy ON orders
    FOR ALL
    TO application_role
    USING (
        current_setting('app.current_user_id')::uuid = customer_id
        OR current_setting('app.current_user_type') = 'admin'
    );

-- Vendors can see order items containing their products
-- (policies have no effect until RLS is enabled on the table)
ALTER TABLE order_items ENABLE ROW LEVEL SECURITY;
CREATE POLICY vendor_orders_policy ON order_items
    FOR SELECT
    TO application_role
    USING (
        vendor_id = current_setting('app.current_user_id')::uuid
        OR current_setting('app.current_user_type') = 'admin'
    );
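These policies read custom session settings, so the application must set them on every connection it checks out; a minimal sketch (the UUID is hypothetical):

-- At the start of each request's transaction
BEGIN;
SET LOCAL app.current_user_id = '5f1f0c5e-0000-0000-0000-000000000000';
SET LOCAL app.current_user_type = 'customer';
-- Every query in this transaction now sees only permitted rows
COMMIT;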
-- Audit logging
CREATE TABLE audit_log (
    id BIGSERIAL PRIMARY KEY,
    table_name VARCHAR(50) NOT NULL,
    operation VARCHAR(10) NOT NULL,
    user_id UUID,
    row_id UUID,
    old_data JSONB,
    new_data JSONB,
    ip_address INET,
    user_agent TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Generic audit trigger function
-- (OLD is unassigned on INSERT and NEW on DELETE, so branch on TG_OP)
CREATE OR REPLACE FUNCTION audit_trigger_function()
RETURNS trigger AS $$
DECLARE
    v_old JSONB;
    v_new JSONB;
BEGIN
    IF TG_OP IN ('UPDATE', 'DELETE') THEN
        v_old := to_jsonb(OLD);
    END IF;
    IF TG_OP IN ('INSERT', 'UPDATE') THEN
        v_new := to_jsonb(NEW);
    END IF;
    INSERT INTO audit_log(
        table_name, operation, user_id, row_id,
        old_data, new_data, ip_address, user_agent
    )
    VALUES (
        TG_TABLE_NAME,
        TG_OP,
        current_setting('app.current_user_id', true)::uuid,
        COALESCE(v_new->>'id', v_old->>'id')::uuid,
        v_old,
        v_new,
        inet_client_addr(),
        current_setting('app.user_agent', true)
    );
    IF TG_OP = 'DELETE' THEN
        RETURN OLD;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Apply audit trigger to sensitive tables
CREATE TRIGGER orders_audit_trigger
    AFTER INSERT OR UPDATE OR DELETE ON orders
    FOR EACH ROW EXECUTE FUNCTION audit_trigger_function();
-- Encryption for sensitive data
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- Encrypt sensitive columns
ALTER TABLE users ADD COLUMN ssn_encrypted BYTEA;

-- Encryption functions
CREATE OR REPLACE FUNCTION encrypt_sensitive(text_value TEXT)
RETURNS BYTEA AS $$
BEGIN
    RETURN pgp_sym_encrypt(
        text_value,
        current_setting('app.encryption_key')
    );
END;
$$ LANGUAGE plpgsql;
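Usage mirrors the helper above; reads go through pgcrypto's pgp_sym_decrypt with the same session-provided key (IDs hypothetical):

-- Write: encrypt on the way in
UPDATE users
SET ssn_encrypted = encrypt_sensitive('123-45-6789')
WHERE id = '...';

-- Read: decrypt only where genuinely needed, never in bulk exports
SELECT pgp_sym_decrypt(ssn_encrypted, current_setting('app.encryption_key'))
FROM users
WHERE id = '...';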
# SQL injection prevention in application
from psycopg2.sql import SQL, Identifier

def safe_dynamic_query(table_name, columns, where_clause):
    # Safe dynamic SQL composition; where_clause must itself be a
    # psycopg2.sql.Composable, never a raw user-supplied string
    query = SQL("SELECT {} FROM {} WHERE {}").format(
        SQL(', ').join(map(Identifier, columns)),
        Identifier(table_name),
        where_clause
    )
    return query

# Never do this:
# query = f"SELECT * FROM {table} WHERE id = {user_input}"

# Always use parameterized queries:
cursor.execute(
    "SELECT * FROM users WHERE email = %s AND status = %s",
    (email, status)
)

You’ve learned how to leverage Claude Code for comprehensive database operations, from schema design to complex queries to performance optimization. The key is treating your database as a living system that needs careful design, continuous optimization, and thoughtful evolution.

Remember: A well-designed database is the foundation of a scalable application. Use Claude Code to handle the complexity of modern data persistence while you focus on modeling your business domain. With AI assistance, you can build databases that are not just functional, but optimized, secure, and ready to scale.