Przejdź do głównej zawartości

Operacje bazodanowe

Bazy danych są pamięcią twoich aplikacji. Niezależnie od tego, czy projektujesz schematy, piszesz złożone zapytania, czy orkiestrujesz migracje, Claude Code przekształca pracę z bazami danych z podatnego na błędy zmagania się z SQL w inteligentną inżynierię danych. Ta lekcja eksploruje jak wykorzystać asystę AI do wszystkich aspektów operacji bazodanowych.

Scenariusz: Baza danych twojej platformy e-commerce ma problemy. Zapytania, które zajmowały milisekundy, teraz trwają sekundy. Schemat to bałagan zdenormalizowanych tabel. Migracje to rosyjska ruletka. Musisz przeskalować się z 10K do 1M użytkowników. Tradycyjne podejście: zatrudnij DBA. Z Claude Code: zostań nim.

Tydzień 1: Analiza
- Ręczne profilowanie wolnych zapytań
- Rysowanie diagramów ER ręcznie
- Zgadywanie brakujących indeksów
Tydzień 2-3: Przeprojektowanie schematu
- Pisanie skryptów migracji
- Testowanie na staging (liczenie na najlepsze)
- Naprawianie zepsutych ograniczeń
Tydzień 4: Optymalizacja
- Dodawanie indeksów jeden po drugim
- Przepisywanie zapytań ręcznie
- Monitorowanie wydajności
Rezultat: Marginalne poprawy, wysokie ryzyko
> Zaprojektuj schemat bazy danych dla platformy e-commerce z:
> - Obsługą wielu sprzedawców
> - Wariantami produktów (rozmiar, kolor)
> - Śledzeniem zapasów
> - Zarządzaniem zamówieniami
> - Recenzjami klientów
> - Funkcjonalnością listy życzeń
> Przestrzegaj najlepszych praktyk normalizacji i wydajności

Claude generuje kompleksowy schemat:

Tabele podstawowe

-- Tabela użytkowników (klienci i sprzedawcy)
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
user_type ENUM('customer', 'vendor', 'admin') NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Tabela sprzedawców (rozszerza users)
CREATE TABLE vendors (
user_id UUID PRIMARY KEY REFERENCES users(id),
business_name VARCHAR(255) NOT NULL,
description TEXT,
logo_url VARCHAR(500),
verified BOOLEAN DEFAULT FALSE,
commission_rate DECIMAL(5,2) DEFAULT 10.00
);
-- Tabela produktów
CREATE TABLE products (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
vendor_id UUID NOT NULL REFERENCES vendors(user_id),
name VARCHAR(255) NOT NULL,
description TEXT,
base_price DECIMAL(10,2) NOT NULL,
category_id INTEGER REFERENCES categories(id),
status ENUM('active', 'inactive', 'out_of_stock') DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_vendor_status (vendor_id, status),
FULLTEXT INDEX idx_search (name, description)
);

Zapasy i warianty

-- Warianty produktów (rozmiar, kolor, itp.)
CREATE TABLE product_variants (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
product_id UUID NOT NULL REFERENCES products(id),
sku VARCHAR(100) UNIQUE NOT NULL,
attributes JSONB NOT NULL, -- {"size": "L", "color": "Blue"}
price_modifier DECIMAL(10,2) DEFAULT 0,
weight_grams INTEGER,
INDEX idx_product_attributes (product_id, attributes)
);
-- Śledzenie zapasów
CREATE TABLE inventory (
variant_id UUID PRIMARY KEY REFERENCES product_variants(id),
quantity INTEGER NOT NULL DEFAULT 0,
reserved_quantity INTEGER NOT NULL DEFAULT 0,
reorder_point INTEGER DEFAULT 10,
reorder_quantity INTEGER DEFAULT 100,
last_restocked TIMESTAMP,
CHECK (quantity >= reserved_quantity)
);
-- Ruchy zapasów do audytu
CREATE TABLE inventory_movements (
id BIGSERIAL PRIMARY KEY,
variant_id UUID NOT NULL REFERENCES product_variants(id),
movement_type ENUM('restock', 'sale', 'return', 'adjustment') NOT NULL,
quantity INTEGER NOT NULL,
reference_type VARCHAR(50), -- 'order', 'return', 'manual'
reference_id UUID,
notes TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_by UUID REFERENCES users(id)
);

Zamówienia i transakcje

-- Tabela zamówień
CREATE TABLE orders (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
customer_id UUID NOT NULL REFERENCES users(id),
status ENUM('pending', 'processing', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
subtotal DECIMAL(10,2) NOT NULL,
tax DECIMAL(10,2) NOT NULL DEFAULT 0,
shipping DECIMAL(10,2) NOT NULL DEFAULT 0,
total DECIMAL(10,2) NOT NULL,
shipping_address JSONB NOT NULL,
billing_address JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_customer_status (customer_id, status),
INDEX idx_created (created_at DESC)
);
-- Pozycje zamówień
CREATE TABLE order_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
order_id UUID NOT NULL REFERENCES orders(id),
variant_id UUID NOT NULL REFERENCES product_variants(id),
vendor_id UUID NOT NULL REFERENCES vendors(user_id),
quantity INTEGER NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
total_price DECIMAL(10,2) NOT NULL,
commission DECIMAL(10,2) NOT NULL,
status ENUM('pending', 'confirmed', 'shipped', 'delivered', 'returned') DEFAULT 'pending',
INDEX idx_order (order_id),
INDEX idx_vendor_status (vendor_id, status)
);

Recenzje i listy życzeń

-- Recenzje produktów
CREATE TABLE reviews (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
product_id UUID NOT NULL REFERENCES products(id),
customer_id UUID NOT NULL REFERENCES users(id),
order_item_id UUID REFERENCES order_items(id),
rating INTEGER NOT NULL CHECK (rating >= 1 AND rating <= 5),
title VARCHAR(255),
comment TEXT,
verified_purchase BOOLEAN DEFAULT FALSE,
helpful_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY unique_review (product_id, customer_id),
INDEX idx_product_rating (product_id, rating)
);
-- Listy życzeń
CREATE TABLE wishlists (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
customer_id UUID NOT NULL REFERENCES users(id),
name VARCHAR(255) NOT NULL DEFAULT 'My Wishlist',
is_public BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_customer (customer_id)
);
-- Pozycje list życzeń
CREATE TABLE wishlist_items (
wishlist_id UUID NOT NULL REFERENCES wishlists(id) ON DELETE CASCADE,
product_id UUID NOT NULL REFERENCES products(id),
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (wishlist_id, product_id)
);
> Przeanalizuj schemat i zasugeruj optymalne indeksy dla:
> - Częstych wzorców zapytań
> - Optymalizacji wydajności
> - Pokrywających indeksów gdzie korzystne
> - Unikania nadmiernego indeksowania
-- Indeksy krytyczne dla wydajności
CREATE INDEX idx_orders_customer_created
ON orders(customer_id, created_at DESC)
INCLUDE (status, total);
CREATE INDEX idx_products_vendor_category
ON products(vendor_id, category_id)
WHERE status = 'active';
CREATE INDEX idx_inventory_low_stock
ON inventory(variant_id)
WHERE quantity <= reorder_point;
-- Indeksy złożone dla częstych joinów
CREATE INDEX idx_order_items_complete
ON order_items(order_id, vendor_id, status)
INCLUDE (total_price, commission);
-- Indeksy częściowe dla specyficznych zapytań
CREATE INDEX idx_reviews_high_rating
ON reviews(product_id, rating)
WHERE rating >= 4;
-- Indeksy JSON dla elastycznych zapytań
CREATE INDEX idx_variant_attributes_gin
ON product_variants USING GIN (attributes);
> Napisz zoptymalizowane zapytanie aby znaleźć:
> - Top 10 najlepiej sprzedających się produktów w tym miesiącu
> - Uwzględnij informacje o sprzedawcy i średnią ocenę
> - Pokaż obecne poziomy zapasów
> - Oblicz przychody i marże zysku
WITH monthly_sales AS (
SELECT
pv.product_id,
SUM(oi.quantity) as units_sold,
SUM(oi.total_price) as revenue,
SUM(oi.total_price - oi.commission) as vendor_revenue
FROM order_items oi
JOIN orders o ON oi.order_id = o.id
JOIN product_variants pv ON oi.variant_id = pv.id
WHERE o.created_at >= DATE_TRUNC('month', CURRENT_DATE)
AND o.status NOT IN ('cancelled', 'pending')
GROUP BY pv.product_id
),
product_ratings AS (
SELECT
product_id,
AVG(rating) as avg_rating,
COUNT(*) as review_count
FROM reviews
GROUP BY product_id
),
current_inventory AS (
SELECT
pv.product_id,
SUM(i.quantity) as total_stock,
SUM(i.quantity - i.reserved_quantity) as available_stock
FROM inventory i
JOIN product_variants pv ON i.variant_id = pv.id
GROUP BY pv.product_id
)
SELECT
p.id,
p.name,
p.base_price,
v.business_name as vendor_name,
COALESCE(ms.units_sold, 0) as units_sold,
COALESCE(ms.revenue, 0) as revenue,
COALESCE(ms.vendor_revenue, 0) as vendor_revenue,
ROUND(COALESCE(pr.avg_rating, 0), 2) as avg_rating,
COALESCE(pr.review_count, 0) as review_count,
COALESCE(ci.total_stock, 0) as total_stock,
COALESCE(ci.available_stock, 0) as available_stock,
CASE
WHEN ms.revenue > 0 THEN
ROUND((ms.revenue - ms.vendor_revenue) / ms.revenue * 100, 2)
ELSE 0
END as profit_margin_pct
FROM products p
JOIN vendors v ON p.vendor_id = v.user_id
LEFT JOIN monthly_sales ms ON p.id = ms.product_id
LEFT JOIN product_ratings pr ON p.id = pr.product_id
LEFT JOIN current_inventory ci ON p.id = ci.product_id
WHERE p.status = 'active'
ORDER BY COALESCE(ms.units_sold, 0) DESC
LIMIT 10;
> Przeanalizuj to wolne zapytanie i zoptymalizuj je:
> [wklej problematyczne zapytanie]

Claude analizuje i optymalizuje:

-- Wolne zapytanie (zajmuje 5+ sekund)
SELECT DISTINCT
c.id,
c.email,
COUNT(DISTINCT o.id) as order_count,
SUM(o.total) as lifetime_value,
(
SELECT STRING_AGG(p.name, ', ')
FROM order_items oi
JOIN product_variants pv ON oi.variant_id = pv.id
JOIN products p ON pv.product_id = p.id
WHERE oi.order_id IN (
SELECT id FROM orders WHERE customer_id = c.id
)
) as purchased_products
FROM users c
LEFT JOIN orders o ON c.id = o.customer_id
WHERE c.user_type = 'customer'
AND c.created_at >= '2024-01-01'
GROUP BY c.id, c.email
HAVING COUNT(DISTINCT o.id) > 0
ORDER BY lifetime_value DESC;
> Stwórz migrację aby:
> - Dodać obsługę wielu walut do naszych zamówień
> - Musi być zero-downtime
> - Uwzględnić plan rollback
> - Obsłużyć istniejące dane
  1. Faza 1: Dodanie nowych kolumn (wstecznie kompatybilne)

    -- Migracja: add_currency_support_phase1.sql
    BEGIN;
    -- Dodaj kolumny walut z domyślnymi wartościami
    ALTER TABLE products
    ADD COLUMN currency_code CHAR(3) DEFAULT 'USD' NOT NULL,
    ADD COLUMN base_price_cents BIGINT;
    ALTER TABLE order_items
    ADD COLUMN currency_code CHAR(3) DEFAULT 'USD' NOT NULL,
    ADD COLUMN unit_price_cents BIGINT,
    ADD COLUMN total_price_cents BIGINT;
    ALTER TOKEN orders
    ADD COLUMN currency_code CHAR(3) DEFAULT 'USD' NOT NULL,
    ADD COLUMN subtotal_cents BIGINT,
    ADD COLUMN tax_cents BIGINT,
    ADD COLUMN shipping_cents BIGINT,
    ADD COLUMN total_cents BIGINT;
    -- Stwórz tabelę konwersji walut
    CREATE TABLE currency_rates (
    from_currency CHAR(3) NOT NULL,
    to_currency CHAR(3) NOT NULL,
    rate DECIMAL(10,6) NOT NULL,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (from_currency, to_currency)
    );
    -- Wypełnij początkowymi kursami
    INSERT INTO currency_rates VALUES
    ('USD', 'EUR', 0.85),
    ('USD', 'GBP', 0.73),
    ('EUR', 'USD', 1.18),
    ('EUR', 'GBP', 0.86),
    ('GBP', 'USD', 1.37),
    ('GBP', 'EUR', 1.16);
    COMMIT;
  2. Faza 2: Kod aplikacji dual-write

    # Kod aplikacji pisze do starych i nowych kolumn
    class OrderService:
    def create_order(self, items, currency='USD'):
    # Pisz do obu kolumn dziesiętnych i całkowitych
    order = Order(
    # Stare kolumny (dla kompatybilności wstecznej)
    subtotal=subtotal_decimal,
    total=total_decimal,
    # Nowe kolumny
    currency_code=currency,
    subtotal_cents=int(subtotal_decimal * 100),
    total_cents=int(total_decimal * 100)
    )
    return order
  3. Faza 3: Backfill istniejących danych

    -- Migracja: backfill_currency_data.sql
    BEGIN;
    -- Backfill produktów
    UPDATE products
    SET base_price_cents = ROUND(base_price * 100)
    WHERE base_price_cents IS NULL;
    -- Backfill w partiach aby uniknąć blokowania
    DO $$
    DECLARE
    batch_size INTEGER := 10000;
    offset_val INTEGER := 0;
    total_rows INTEGER;
    BEGIN
    SELECT COUNT(*) INTO total_rows FROM orders WHERE total_cents IS NULL;
    WHILE offset_val < total_rows LOOP
    UPDATE orders
    SET
    subtotal_cents = ROUND(subtotal * 100),
    tax_cents = ROUND(tax * 100),
    shipping_cents = ROUND(shipping * 100),
    total_cents = ROUND(total * 100)
    WHERE id IN (
    SELECT id FROM orders
    WHERE total_cents IS NULL
    LIMIT batch_size
    );
    offset_val := offset_val + batch_size;
    PERFORM pg_sleep(0.1); -- Krótka pauza między partiami
    END LOOP;
    END $$;
    -- Podobnie dla order_items...
    COMMIT;
  4. Faza 4: Przełączenie odczytów na nowe kolumny

    # Aplikacja teraz czyta z nowych kolumn
    class OrderService:
    def get_order_total(self, order):
    # Użyj nowych kolumn, konwertuj centy na dziesiętne do wyświetlenia
    return Decimal(order.total_cents) / 100
  5. Faza 5: Usuń stare kolumny

    -- Migracja: drop_old_currency_columns.sql
    -- Uruchom tylko po weryfikacji że wszystkie systemy używają nowych kolumn
    BEGIN;
    ALTER TABLE products DROP COLUMN base_price;
    ALTER TABLE order_items
    DROP COLUMN unit_price,
    DROP COLUMN total_price;
    ALTER TABLE orders
    DROP COLUMN subtotal,
    DROP COLUMN tax,
    DROP COLUMN shipping,
    DROP COLUMN total;
    -- Dodaj ograniczenia NOT NULL teraz gdy backfill jest ukończony
    ALTER TABLE products
    ALTER COLUMN base_price_cents SET NOT NULL;
    COMMIT;
-- Rollback dla każdej fazy
-- Rollback fazy 1
BEGIN;
DROP TABLE IF EXISTS currency_rates;
ALTER TABLE products DROP COLUMN IF EXISTS currency_code, DROP COLUMN IF EXISTS base_price_cents;
-- ... podobnie dla innych tabel
COMMIT;
-- Rollback fazy 3 (jeśli backfill się nie powiedzie)
UPDATE products SET base_price_cents = NULL WHERE base_price_cents IS NOT NULL;
-- ... podobnie dla innych tabel
-- Rollback fazy 5 (jeśli potrzebny po drop)
-- Musi przywrócić z backup lub użyć tabel audytu
> Konwertuj nasz schemat SQL na schemat Prisma
> Uwzględnij wszystkie relacje i indeksy
schema.prisma
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
model User {
id String @id @default(uuid())
email String @unique
passwordHash String @map("password_hash")
userType UserType
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
vendor Vendor?
orders Order[]
reviews Review[]
wishlists Wishlist[]
@@map("users")
}
model Vendor {
userId String @id @map("user_id")
businessName String @map("business_name")
description String?
logoUrl String? @map("logo_url")
verified Boolean @default(false)
commissionRate Decimal @default(10.00) @map("commission_rate")
user User @relation(fields: [userId], references: [id])
products Product[]
orderItems OrderItem[]
@@map("vendors")
}
model Product {
id String @id @default(uuid())
vendorId String @map("vendor_id")
name String
description String?
basePrice Decimal @map("base_price")
categoryId Int? @map("category_id")
status ProductStatus @default(ACTIVE)
createdAt DateTime @default(now()) @map("created_at")
vendor Vendor @relation(fields: [vendorId], references: [userId])
category Category? @relation(fields: [categoryId], references: [id])
variants ProductVariant[]
reviews Review[]
wishlistItems WishlistItem[]
@@index([vendorId, status])
@@fulltext([name, description])
@@map("products")
}
// Złożone zapytania Prisma
const bestSellers = await prisma.product.findMany({
where: {
status: 'ACTIVE',
variants: {
some: {
orderItems: {
some: {
order: {
createdAt: {
gte: startOfMonth(new Date())
},
status: {
notIn: ['CANCELLED', 'PENDING']
}
}
}
}
}
}
},
include: {
vendor: true,
_count: {
select: {
reviews: true
}
},
variants: {
include: {
inventory: true,
orderItems: {
where: {
order: {
createdAt: {
gte: startOfMonth(new Date())
}
}
},
select: {
quantity: true,
totalPrice: true
}
}
}
}
},
orderBy: {
variants: {
_count: 'desc'
}
},
take: 10
});
> Stwórz modele SQLAlchemy dla naszego schematu
> Uwzględnij relacje i pomocniki zapytań
models.py
from sqlalchemy import Column, String, Decimal, Integer, ForeignKey, DateTime, Enum, JSON, Index
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship, validates
from sqlalchemy.ext.hybrid import hybrid_property
from datetime import datetime
import uuid
class User(Base):
__tablename__ = 'users'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
email = Column(String(255), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False)
user_type = Column(Enum('customer', 'vendor', 'admin'), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relacje
vendor = relationship("Vendor", back_populates="user", uselist=False)
orders = relationship("Order", back_populates="customer")
reviews = relationship("Review", back_populates="customer")
@validates('email')
def validate_email(self, key, email):
assert '@' in email, "Invalid email"
return email.lower()
class Product(Base):
__tablename__ = 'products'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
vendor_id = Column(UUID(as_uuid=True), ForeignKey('vendors.user_id'), nullable=False)
name = Column(String(255), nullable=False)
description = Column(Text)
base_price = Column(Decimal(10, 2), nullable=False)
status = Column(Enum('active', 'inactive', 'out_of_stock'), default='active')
# Relacje
vendor = relationship("Vendor", back_populates="products")
variants = relationship("ProductVariant", back_populates="product")
reviews = relationship("Review", back_populates="product")
# Indeksy
__table_args__ = (
Index('idx_vendor_status', 'vendor_id', 'status'),
Index('idx_search', 'name', 'description', postgresql_using='gin'),
)
@hybrid_property
def average_rating(self):
if not self.reviews:
return 0
return sum(r.rating for r in self.reviews) / len(self.reviews)
@average_rating.expression
def average_rating(cls):
return select(func.avg(Review.rating)).where(Review.product_id == cls.id).scalar_subquery()
# Pomocniki zapytań
class ProductRepository:
def __init__(self, session):
self.session = session
def get_best_sellers(self, limit=10):
subquery = (
self.session.query(
ProductVariant.product_id,
func.sum(OrderItem.quantity).label('units_sold'),
func.sum(OrderItem.total_price).label('revenue')
)
.join(OrderItem)
.join(Order)
.filter(
Order.created_at >= func.date_trunc('month', func.current_date()),
Order.status.notin_(['cancelled', 'pending'])
)
.group_by(ProductVariant.product_id)
.subquery()
)
return (
self.session.query(Product, subquery.c.units_sold, subquery.c.revenue)
.join(subquery, Product.id == subquery.c.product_id)
.order_by(subquery.c.units_sold.desc())
.limit(limit)
.all()
)
> Przeanalizuj i zoptymalizuj wydajność naszej bazy danych:
> - Zidentyfikuj wolne zapytania
> - Zasugeruj poprawy indeksów
> - Zarekomenduj przepisanie zapytań
> - Zaimplementuj strategie cache'owania
-- Włącz logowanie wydajności zapytań
ALTER SYSTEM SET log_min_duration_statement = 1000; -- Loguj zapytania powyżej 1 sekundy
ALTER SYSTEM SET shared_preload_libraries = 'pg_stat_statements';
-- Analizuj wydajność zapytań
SELECT
query,
calls,
total_time,
mean_time,
stddev_time,
rows,
100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0) AS hit_percent
FROM pg_stat_statements
WHERE query NOT LIKE '%pg_stat_statements%'
ORDER BY mean_time DESC
LIMIT 20;
-- Znajdź brakujące indeksy
SELECT
schemaname,
tablename,
attname,
n_distinct,
most_common_vals
FROM pg_stats
WHERE schemaname = 'public'
AND n_distinct > 100
AND attname NOT IN (
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid
WHERE a.attnum = ANY(i.indkey)
);
-- Implementuj cache'owanie wyników zapytań
CREATE TABLE query_cache (
cache_key VARCHAR(255) PRIMARY KEY,
query_hash VARCHAR(64) NOT NULL,
result JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL,
hit_count INTEGER DEFAULT 0
);
CREATE INDEX idx_cache_expires ON query_cache(expires_at);
-- Implementacja cache w aplikacji
class CachedQuery:
def __init__(self, db, cache_ttl=3600):
self.db = db
self.cache_ttl = cache_ttl
def execute(self, query, params=None):
cache_key = self._generate_cache_key(query, params)
# Sprawdź cache
cached = self.db.execute(
"SELECT result FROM query_cache WHERE cache_key = %s AND expires_at > NOW()",
(cache_key,)
).fetchone()
if cached:
self.db.execute(
"UPDATE query_cache SET hit_count = hit_count + 1 WHERE cache_key = %s",
(cache_key,)
)
return json.loads(cached[0])
# Wykonaj zapytanie
result = self.db.execute(query, params).fetchall()
# Cache'uj wynik
self.db.execute(
"""INSERT INTO query_cache (cache_key, query_hash, result, expires_at)
VALUES (%s, %s, %s, NOW() + INTERVAL '%s seconds')
ON CONFLICT (cache_key) DO UPDATE
SET result = EXCLUDED.result, expires_at = EXCLUDED.expires_at""",
(cache_key, hashlib.md5(query.encode()).hexdigest(),
json.dumps(result), self.cache_ttl)
)
return result
# Konfiguracja puli połączeń bazy danych
from sqlalchemy.pool import QueuePool
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=20, # Liczba trwałych połączeń
max_overflow=40, # Maksymalne połączenia overflow
pool_timeout=30, # Timeout dla uzyskania połączenia
pool_recycle=1800, # Recyklinguj połączenia po 30 minutach
pool_pre_ping=True, # Testuj połączenia przed użyciem
echo_pool=True # Loguj checkout/checkin puli
)
# Implementuj repliki odczytu
class DatabaseRouter:
def __init__(self, primary_url, replica_urls):
self.primary = create_engine(primary_url)
self.replicas = [create_engine(url) for url in replica_urls]
self.current_replica = 0
def get_session(self, readonly=False):
if readonly:
# Round-robin między replikami
engine = self.replicas[self.current_replica]
self.current_replica = (self.current_replica + 1) % len(self.replicas)
else:
engine = self.primary
return sessionmaker(bind=engine)()
> Zaprojektuj schemat MongoDB dla aplikacji czatu real-time z:
> - Użytkownikami i konwersacjami
> - Wiadomościami z potwierdzeniami odczytu
> - Załącznikami plików
> - Funkcjonalnością wyszukiwania
// Projektowanie schematu MongoDB
// Kolekcja users
{
_id: ObjectId(),
username: "john_doe",
email: "john@example.com",
profile: {
displayName: "John Doe",
avatar: "https://...",
status: "online",
lastSeen: ISODate()
},
settings: {
notifications: true,
theme: "dark"
},
blockedUsers: [ObjectId(), ...],
createdAt: ISODate()
}
// Kolekcja conversations (dla metadanych grup)
{
_id: ObjectId(),
type: "group", // "direct" lub "group"
participants: [
{
userId: ObjectId(),
role: "admin", // "admin", "member"
joinedAt: ISODate(),
lastRead: ISODate(),
unreadCount: 0
}
],
name: "Project Team",
avatar: "https://...",
createdBy: ObjectId(),
createdAt: ISODate(),
lastMessage: {
text: "Latest message preview",
senderId: ObjectId(),
timestamp: ISODate()
}
}
// Kolekcja messages (zoptymalizowana do wydajności zapisu)
{
_id: ObjectId(),
conversationId: ObjectId(),
senderId: ObjectId(),
type: "text", // "text", "image", "file", "system"
content: {
text: "Hello world!",
mentions: [ObjectId(), ...]
},
attachments: [{
type: "image",
url: "https://...",
size: 1024000,
mimeType: "image/jpeg",
thumbnail: "https://..."
}],
reactions: [{
userId: ObjectId(),
emoji: "👍",
timestamp: ISODate()
}],
readBy: [{
userId: ObjectId(),
timestamp: ISODate()
}],
editedAt: ISODate(),
deletedAt: null,
createdAt: ISODate()
}
// Indeksy dla wydajności
db.messages.createIndex({ conversationId: 1, createdAt: -1 })
db.messages.createIndex({ "content.text": "text" }) // Do wyszukiwania
db.messages.createIndex({ senderId: 1, createdAt: -1 })
db.conversations.createIndex({ "participants.userId": 1 })
db.conversations.createIndex({ "lastMessage.timestamp": -1 })
// Agregacja dla liczników nieprzeczytanych
db.messages.aggregate([
{
$match: {
conversationId: ObjectId("..."),
createdAt: { $gt: lastReadTimestamp },
senderId: { $ne: currentUserId }
}
},
{
$count: "unreadCount"
}
])
// Wyszukiwanie wiadomości z indeksem tekstowym
db.messages.find({
conversationId: ObjectId("..."),
$text: { $search: "project deadline" }
}).sort({ score: { $meta: "textScore" } })
> Zaimplementuj środki bezpieczeństwa bazy danych:
> - Row-level security
> - Szyfrowanie w spoczynku
> - Logowanie audytu
> - Zapobieganie wstrzykiwaniu SQL
-- Implementacja row-level security
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
-- Klienci mogą widzieć tylko swoje zamówienia
CREATE POLICY customer_orders_policy ON orders
FOR ALL
TO application_role
USING (
current_setting('app.current_user_id')::uuid = customer_id
OR
current_setting('app.current_user_type') = 'admin'
);
-- Sprzedawcy mogą widzieć zamówienia zawierające ich produkty
CREATE POLICY vendor_orders_policy ON order_items
FOR SELECT
TO application_role
USING (
vendor_id = current_setting('app.current_user_id')::uuid
OR
current_setting('app.current_user_type') = 'admin'
);
-- Logowanie audytu
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
table_name VARCHAR(50) NOT NULL,
operation VARCHAR(10) NOT NULL,
user_id UUID,
row_id UUID,
old_data JSONB,
new_data JSONB,
ip_address INET,
user_agent TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Generyczna funkcja triggera audytu
CREATE OR REPLACE FUNCTION audit_trigger_function()
RETURNS trigger AS $$
BEGIN
INSERT INTO audit_log(
table_name, operation, user_id, row_id,
old_data, new_data, ip_address, user_agent
)
VALUES (
TG_TABLE_NAME,
TG_OP,
current_setting('app.current_user_id', true)::uuid,
COALESCE(NEW.id, OLD.id),
to_jsonb(OLD),
to_jsonb(NEW),
inet_client_addr(),
current_setting('app.user_agent', true)
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Zastosuj trigger audytu do wrażliwych tabel
CREATE TRIGGER orders_audit_trigger
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_trigger_function();
-- Szyfrowanie wrażliwych danych
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Szyfruj wrażliwe kolumny
ALTER TABLE users ADD COLUMN ssn_encrypted BYTEA;
-- Funkcje szyfrowania
CREATE OR REPLACE FUNCTION encrypt_sensitive(text_value TEXT)
RETURNS BYTEA AS $$
BEGIN
RETURN pgp_sym_encrypt(
text_value,
current_setting('app.encryption_key')
);
END;
$$ LANGUAGE plpgsql;
-- Zapobieganie wstrzykiwaniu SQL w aplikacji
from psycopg2.sql import SQL, Identifier, Literal
def safe_dynamic_query(table_name, columns, where_clause):
# Bezpieczne komponowanie dynamicznego SQL
query = SQL("SELECT {} FROM {} WHERE {}").format(
SQL(', ').join(map(Identifier, columns)),
Identifier(table_name),
where_clause
)
return query
# Nigdy nie rób tego:
# query = f"SELECT * FROM {table} WHERE id = {user_input}"
# Zawsze używaj sparametryzowanych zapytań:
cursor.execute(
"SELECT * FROM users WHERE email = %s AND status = %s",
(email, status)
)

Nauczyłeś się jak wykorzystać Claude Code do kompleksowych operacji bazodanowych - od projektowania schematu po złożone zapytania po optymalizację wydajności. Kluczem jest traktowanie bazy danych jako żywego systemu, który potrzebuje starannego projektowania, ciągłej optymalizacji i przemyślanej ewolucji.

Pamiętaj: Dobrze zaprojektowana baza danych to fundament skalowalnej aplikacji. Użyj Claude Code do obsługi złożoności nowoczesnej trwałości danych, podczas gdy ty skupiasz się na modelowaniu domeny biznesowej. Z asystą AI możesz budować bazy danych, które są nie tylko funkcjonalne, ale zoptymalizowane, bezpieczne i gotowe do skalowania.