NoSQL Patterns and Strategies

Leverage AI coding assistants to design, implement, and optimize NoSQL database solutions across document stores, key-value stores, and graph databases.

Document Store Patterns (MongoDB, DynamoDB)

AI-Driven Document Modeling

The Pattern: Use AI to design flexible document schemas that balance denormalization with data consistency.

// Prompt to AI Assistant
"Design a MongoDB schema for an e-commerce platform with:
- User profiles with addresses and payment methods
- Product catalog with variants and inventory
- Orders with real-time status tracking
- Shopping cart with session management
Consider read/write patterns and denormalization trade-offs"

// AI generates comprehensive schema:
// User collection with embedded documents
const userSchema = {
  _id: ObjectId(),
  email: "user@example.com",
  profile: {
    firstName: "John",
    lastName: "Doe",
    phone: "+1234567890"
  },
  addresses: [
    {
      _id: ObjectId(),
      type: "shipping",
      street: "123 Main St",
      city: "New York",
      country: "USA",
      isDefault: true
    }
  ],
  paymentMethods: [
    {
      _id: ObjectId(),
      type: "card",
      last4: "1234",
      expiryMonth: 12,
      expiryYear: 2025,
      isDefault: true
    }
  ],
  cart: {
    items: [
      {
        productId: ObjectId(),
        variantId: ObjectId(),
        quantity: 2,
        price: 29.99,
        addedAt: ISODate()
      }
    ],
    updatedAt: ISODate()
  }
};

// Product collection with rich attributes
const productSchema = {
  _id: ObjectId(),
  sku: "PROD-001",
  name: "Wireless Headphones",
  slug: "wireless-headphones-black",
  description: "Premium noise-cancelling headphones",
  category: ["Electronics", "Audio", "Headphones"],
  // Denormalized for performance
  brand: {
    _id: ObjectId(),
    name: "AudioTech",
    logo: "https://..."
  },
  // Variants as subdocuments
  variants: [
    {
      _id: ObjectId(),
      color: "Black",
      price: 299.99,
      compareAtPrice: 349.99,
      inventory: {
        available: 150,
        reserved: 10,
        warehouse: {
          "NYC": 50,
          "LA": 100
        }
      },
      images: ["url1", "url2"]
    }
  ],
  // Aggregated review data
  reviews: {
    count: 245,
    average: 4.5,
    distribution: {
      5: 150,
      4: 60,
      3: 20,
      2: 10,
      1: 5
    }
  }
};

Benefits:

  • Optimized for read-heavy operations
  • Minimizes joins through strategic denormalization
  • Supports atomic updates on embedded documents (see the sketch after this list)
  • Scales horizontally with sharding
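
As a concrete example of the atomic-update point, a single update_one can move stock between fields of one embedded variant without a transaction. A minimal pymongo sketch, assuming the products collection and variant layout shown above (the connection string is a placeholder):

from pymongo import MongoClient
from bson import ObjectId

products = MongoClient("mongodb://localhost:27017").ecommerce.products

def reserve_variant(product_id: ObjectId, variant_id: ObjectId, qty: int) -> bool:
    """Atomically move qty units from available to reserved on one variant."""
    result = products.update_one(
        {
            "_id": product_id,
            # match only when this variant has enough available stock
            "variants": {"$elemMatch": {"_id": variant_id,
                                        "inventory.available": {"$gte": qty}}},
        },
        {
            # the positional $ targets the matched variant
            "$inc": {
                "variants.$.inventory.available": -qty,
                "variants.$.inventory.reserved": qty,
            }
        },
    )
    return result.modified_count == 1  # False means insufficient stock

Because the whole product is one document, the check-and-decrement happens atomically at the document level.
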
// Ask AI to generate aggregation pipeline
"Create MongoDB aggregation pipeline to:
1. Calculate monthly revenue by category
2. Include year-over-year growth
3. Filter for active products only
4. Sort by revenue descending"

// AI generates optimized pipeline:
const pipeline = [
  {
    $match: {
      status: "active",
      "orders.createdAt": {
        $gte: ISODate("2024-01-01"),
        $lt: ISODate("2025-01-01")
      }
    }
  },
  { $unwind: "$orders" },
  {
    $group: {
      _id: {
        category: "$category",
        month: { $month: "$orders.createdAt" },
        year: { $year: "$orders.createdAt" }
      },
      revenue: { $sum: "$orders.total" },
      orderCount: { $sum: 1 }
    }
  },
  {
    $lookup: {
      from: "orders",
      let: {
        category: "$_id.category",
        prevYear: { $subtract: ["$_id.year", 1] },
        month: "$_id.month"
      },
      pipeline: [
        {
          $match: {
            $expr: {
              $and: [
                { $eq: ["$category", "$$category"] },
                { $eq: [{ $year: "$createdAt" }, "$$prevYear"] },
                { $eq: [{ $month: "$createdAt" }, "$$month"] }
              ]
            }
          }
        },
        {
          $group: {
            _id: null,
            prevRevenue: { $sum: "$total" }
          }
        }
      ],
      as: "previousYear"
    }
  },
  {
    $addFields: {
      yoyGrowth: {
        $cond: {
          if: { $gt: [{ $size: "$previousYear" }, 0] },
          then: {
            $multiply: [
              {
                $divide: [
                  { $subtract: ["$revenue", { $first: "$previousYear.prevRevenue" }] },
                  { $first: "$previousYear.prevRevenue" }
                ]
              },
              100
            ]
          },
          else: null
        }
      }
    }
  },
  { $sort: { revenue: -1 } }
];
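
Before shipping an AI-generated pipeline, it is worth confirming that its leading $match is served by an index. A short pymongo sketch using the explain command, assuming the pipeline above has been transcribed into a Python list and client is a connected MongoClient (output shape varies by server version):

explain = client.ecommerce.command({
    "explain": {"aggregate": "products", "pipeline": pipeline, "cursor": {}},
    "verbosity": "executionStats",
})
# The first stage's plan should show IXSCAN rather than COLLSCAN; a collection
# scan here usually means a missing index on status / orders.createdAt.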

Key-Value Store Patterns (Redis, DynamoDB)

Intelligent Cache Management

# Request caching pattern from AI
"Implement Redis caching layer with:
- Cache-aside pattern for user sessions
- Write-through for product catalog
- TTL management based on data type
- Cache warming strategies
- Invalidation on updates"

# AI generates comprehensive caching solution:
import hashlib
from functools import wraps

import redis.asyncio as redis  # async client, since the calls below are awaited
from bson import json_util  # handles ObjectId/datetime that plain json cannot
from pymongo import ReturnDocument


class IntelligentCache:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl_config = {
            'user_session': 3600,    # 1 hour
            'product_detail': 300,   # 5 minutes
            'category_list': 86400,  # 24 hours
            'search_results': 60,    # 1 minute
            'user_cart': 7200        # 2 hours
        }

    def cache_aside(self, key_prefix, ttl_type='default'):
        """Decorator for cache-aside pattern"""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Generate cache key
                cache_key = self._generate_key(key_prefix, args, kwargs)
                # Try to get from cache
                cached = await self.redis.get(cache_key)
                if cached:
                    return json_util.loads(cached)
                # Get from source
                result = await func(*args, **kwargs)
                # Store in cache with appropriate TTL
                ttl = self.ttl_config.get(ttl_type, 300)
                await self.redis.setex(cache_key, ttl, json_util.dumps(result))
                return result
            return wrapper
        return decorator

    def write_through(self, key_prefix, ttl_type='default'):
        """Decorator for write-through caching"""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Execute the write operation
                result = await func(*args, **kwargs)
                # Update cache immediately
                cache_key = self._generate_key(key_prefix, args, kwargs)
                ttl = self.ttl_config.get(ttl_type, 300)
                await self.redis.setex(cache_key, ttl, json_util.dumps(result))
                # Invalidate related caches
                await self._invalidate_related(key_prefix, result)
                return result
            return wrapper
        return decorator

    async def warm_cache(self, warming_queries):
        """Pre-populate cache with frequently accessed data"""
        for query in warming_queries:
            key = query['key']
            fetcher = query['fetcher']
            ttl_type = query.get('ttl_type', 'default')
            try:
                data = await fetcher()
                ttl = self.ttl_config.get(ttl_type, 300)
                await self.redis.setex(key, ttl, json_util.dumps(data))
            except Exception as e:
                print(f"Cache warming failed for {key}: {e}")

    def _generate_key(self, prefix, args, kwargs):
        """Generate consistent cache keys"""
        key_data = f"{prefix}:{args}:{sorted(kwargs.items())}"
        return hashlib.md5(key_data.encode()).hexdigest()

    async def _invalidate_related(self, prefix, data):
        """Invalidate related cache entries"""
        patterns = {
            'product_update': ['category:*', 'search:*'],
            'user_update': ['session:*', 'cart:*'],
            'order_create': ['inventory:*', 'user_orders:*']
        }
        if prefix in patterns:
            for pattern in patterns[prefix]:
                # KEYS is O(n); prefer SCAN for large production keyspaces
                keys = await self.redis.keys(pattern)
                if keys:
                    await self.redis.delete(*keys)


# Usage example with AI-suggested patterns
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
cache = IntelligentCache(redis_client)


@cache.cache_aside('product', ttl_type='product_detail')
async def get_product(product_id):
    # Fetch from the database (db is an assumed async handle such as Motor)
    return await db.products.find_one({'_id': product_id})


@cache.write_through('product', ttl_type='product_detail')
async def update_product(product_id, updates):
    # Return the updated document so the decorator caches data, not an UpdateResult
    return await db.products.find_one_and_update(
        {'_id': product_id},
        {'$set': updates},
        return_document=ReturnDocument.AFTER
    )
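
A brief usage sketch for warm_cache, run once at startup so hot keys exist before traffic arrives; fetch_top_categories is a hypothetical async fetcher and db an assumed Motor handle:

import asyncio

async def fetch_top_categories():
    # hypothetical fetcher; any awaitable returning serializable data works
    return await db.categories.find({"active": True}).to_list(100)

async def startup():
    await cache.warm_cache([
        {"key": "category:top", "fetcher": fetch_top_categories,
         "ttl_type": "category_list"},
    ])

asyncio.run(startup())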

Single Table Design

// Request DynamoDB design from AI
"Design single-table DynamoDB schema for multi-tenant SaaS:
- Users, Organizations, Projects, Tasks
- Access patterns: by user, by org, by project
- GSIs for query flexibility
- Efficient pagination"

// AI provides comprehensive design:
const tableDesign = {
  TableName: 'SaaSPlatform',
  // Primary key design
  PartitionKey: 'PK',
  SortKey: 'SK',
  // Entity patterns
  entities: {
    Organization: {
      PK: 'ORG#${orgId}',
      SK: 'ORG#${orgId}',
      GSI1PK: 'ORG#${orgId}',
      GSI1SK: 'METADATA'
    },
    User: {
      PK: 'USER#${userId}',
      SK: 'USER#${userId}',
      GSI1PK: 'ORG#${orgId}',
      GSI1SK: 'USER#${userId}',
      GSI2PK: 'EMAIL#${email}',
      GSI2SK: 'USER#${userId}'
    },
    Project: {
      PK: 'ORG#${orgId}',
      SK: 'PROJECT#${projectId}',
      GSI1PK: 'PROJECT#${projectId}',
      GSI1SK: 'METADATA',
      GSI2PK: 'USER#${ownerId}',
      GSI2SK: 'PROJECT#${projectId}'
    },
    Task: {
      PK: 'PROJECT#${projectId}',
      SK: 'TASK#${taskId}',
      GSI1PK: 'USER#${assigneeId}',
      GSI1SK: 'TASK#${dueDate}#${taskId}',
      GSI2PK: 'ORG#${orgId}',
      GSI2SK: 'TASK#${status}#${taskId}'
    }
  },
  // Access patterns implementation
  accessPatterns: {
    // Get organization with all users
    getOrgWithUsers: {
      operation: 'Query',
      index: 'GSI1',
      keyCondition: 'GSI1PK = :orgId AND begins_with(GSI1SK, "USER#")',
      example: `
        const params = {
          TableName: 'SaaSPlatform',
          IndexName: 'GSI1',
          KeyConditionExpression: 'GSI1PK = :pk AND begins_with(GSI1SK, :sk)',
          ExpressionAttributeValues: {
            ':pk': 'ORG#123',
            ':sk': 'USER#'
          }
        };
      `
    },
    // Get user's tasks across all projects
    getUserTasks: {
      operation: 'Query',
      index: 'GSI1',
      keyCondition: 'GSI1PK = :userId AND begins_with(GSI1SK, "TASK#")',
      example: `
        const params = {
          TableName: 'SaaSPlatform',
          IndexName: 'GSI1',
          KeyConditionExpression: 'GSI1PK = :pk AND GSI1SK >= :start',
          ExpressionAttributeValues: {
            ':pk': 'USER#456',
            ':start': 'TASK#2024-01-01'
          },
          ScanIndexForward: true // Sort by due date
        };
      `
    }
  }
};

// Batch operations helper
class DynamoDBHelper {
  constructor(dynamoDb) {
    // AWS SDK v2 DocumentClient (the .promise() calls below assume v2)
    this.dynamoDb = dynamoDb;
  }

  chunkArray(items, size) {
    const chunks = [];
    for (let i = 0; i < items.length; i += size) {
      chunks.push(items.slice(i, i + size));
    }
    return chunks;
  }

  async batchWrite(items) {
    const chunks = this.chunkArray(items, 25); // DynamoDB limit per BatchWriteItem
    for (const chunk of chunks) {
      const params = {
        RequestItems: {
          'SaaSPlatform': chunk.map(item => ({
            PutRequest: { Item: item }
          }))
        }
      };
      await this.dynamoDb.batchWrite(params).promise();
    }
  }

  async pagedQuery(params, pageSize = 100) {
    const results = [];
    let lastEvaluatedKey = null;
    do {
      const queryParams = {
        ...params,
        Limit: pageSize,
        ...(lastEvaluatedKey && { ExclusiveStartKey: lastEvaluatedKey })
      };
      const response = await this.dynamoDb.query(queryParams).promise();
      results.push(...response.Items);
      lastEvaluatedKey = response.LastEvaluatedKey;
    } while (lastEvaluatedKey);
    return results;
  }
}
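
The same getUserTasks access pattern works from Python with boto3, following LastEvaluatedKey by hand instead of the helper above. A hedged sketch against the assumed table and GSI names:

import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb").Table("SaaSPlatform")

def user_tasks(user_id: str, start_date: str):
    """Yield a user's tasks in due-date order, page by page."""
    kwargs = {
        "IndexName": "GSI1",
        "KeyConditionExpression": (
            Key("GSI1PK").eq(f"USER#{user_id}")
            & Key("GSI1SK").gte(f"TASK#{start_date}")
        ),
        "ScanIndexForward": True,  # ascending by GSI1SK, i.e. due date
    }
    while True:
        page = table.query(**kwargs)
        yield from page["Items"]
        if "LastEvaluatedKey" not in page:
            break
        kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]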

Graph Database Patterns (Neo4j, Amazon Neptune)

AI-Powered Graph Design

// Request graph model from AI
"Design Neo4j graph model for social network with:
- Users, Posts, Comments, Reactions
- Friend relationships with request status
- Content recommendations based on connections
- Fraud detection patterns"

// AI generates comprehensive Cypher schemas:

// Node constraints (Neo4j 5 syntax; Neo4j 4.x uses ON ... ASSERT)
CREATE CONSTRAINT user_id IF NOT EXISTS
FOR (u:User) REQUIRE u.id IS UNIQUE;

CREATE CONSTRAINT post_id IF NOT EXISTS
FOR (p:Post) REQUIRE p.id IS UNIQUE;

// User nodes with properties
CREATE (u:User {
  id: 'user123',
  username: 'johndoe',
  email: 'john@example.com',
  created: datetime(),
  reputation: 100,
  verified: true
})

// Relationship patterns
// Friendship with status
CREATE (u1:User)-[:FRIEND_REQUEST {
  sent: datetime(),
  status: 'pending',
  message: 'Hey, let\'s connect!'
}]->(u2:User)

// After acceptance
MATCH (u1:User)-[r:FRIEND_REQUEST]->(u2:User)
WHERE r.status = 'pending'
CREATE (u1)-[:FRIENDS {since: datetime()}]->(u2)
CREATE (u2)-[:FRIENDS {since: datetime()}]->(u1)
DELETE r

// Content creation and interactions
CREATE (u:User)-[:POSTED {
  timestamp: datetime(),
  device: 'mobile',
  location: point({latitude: 40.7128, longitude: -74.0060})
}]->(p:Post {
  id: 'post456',
  content: 'Check out this graph database!',
  tags: ['neo4j', 'graphdb', 'nosql'],
  visibility: 'public'
})

// Complex recommendation query
MATCH (user:User {id: $userId})-[:FRIENDS]-(friend:User)
MATCH (friend)-[:LIKED|SHARED]->(content:Post)
WHERE NOT (user)-[:VIEWED]->(content)
  AND content.created > datetime() - duration('P7D')
WITH content, COUNT(DISTINCT friend) AS friendInteractions
MATCH (content)<-[:POSTED]-(author:User)
OPTIONAL MATCH (content)-[:TAGGED_WITH]->(tag:Tag)<-[:INTERESTED_IN]-(user)
RETURN content, author,
       friendInteractions,
       COUNT(tag) AS relevantTags
ORDER BY friendInteractions DESC, relevantTags DESC
LIMIT 10

// AI-assisted community detection
"Implement community detection for:
- Finding user groups with similar interests
- Detecting echo chambers
- Identifying influencers
Using Neo4j Graph Data Science"

// AI generates GDS implementation:

// Create in-memory graph projection
CALL gds.graph.project(
  'social-network',
  ['User', 'Post', 'Tag'],
  {
    FRIENDS: {orientation: 'UNDIRECTED'},
    LIKED: {orientation: 'NATURAL'},
    POSTED: {orientation: 'NATURAL'},
    INTERESTED_IN: {orientation: 'NATURAL'}
  }
)

// Run Louvain community detection
CALL gds.louvain.stream('social-network', {
  relationshipWeightProperty: 'weight',
  includeIntermediateCommunities: true,
  concurrency: 4
})
YIELD nodeId, communityId, intermediateCommunityIds
WITH gds.util.asNode(nodeId) AS user,
     communityId,
     intermediateCommunityIds
WHERE user:User
SET user.communityId = communityId

// Analyze communities
MATCH (u:User)
WITH u.communityId AS community, COUNT(u) AS size
WHERE size > 10
MATCH (member:User {communityId: community})
OPTIONAL MATCH (member)-[:INTERESTED_IN]->(tag:Tag)
WITH community, size, COLLECT(DISTINCT tag.name) AS interests
RETURN community, size, interests[0..5] AS topInterests
ORDER BY size DESC
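
To run the recommendation query from application code, the official Python driver wraps it in a managed read transaction. A minimal sketch, assuming a local Bolt endpoint and the parameterized query above (shortened here):

from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687",
                              auth=("neo4j", "password"))  # placeholder credentials

RECOMMEND = """
MATCH (user:User {id: $userId})-[:FRIENDS]-(friend:User)
MATCH (friend)-[:LIKED|SHARED]->(content:Post)
WHERE NOT (user)-[:VIEWED]->(content)
RETURN content, COUNT(DISTINCT friend) AS friendInteractions
ORDER BY friendInteractions DESC LIMIT 10
"""

def recommendations(user_id: str):
    with driver.session() as session:
        # execute_read retries automatically on transient cluster errors
        return session.execute_read(
            lambda tx: [record.data() for record in tx.run(RECOMMEND, userId=user_id)]
        )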

AI-Enhanced Vector Search

# Request vector search implementation
"Implement vector search system using MongoDB Atlas with:
- Multimodal embeddings (text + image)
- Hybrid search (vector + metadata filtering)
- Semantic product recommendations
- Similar user detection"

# AI generates comprehensive solution:
from datetime import datetime
from io import BytesIO

import numpy as np
import requests
from motor.motor_asyncio import AsyncIOMotorClient  # async driver, since calls below are awaited
from PIL import Image
from sentence_transformers import SentenceTransformer


class VectorSearchSystem:
    def __init__(self, connection_string):
        self.client = AsyncIOMotorClient(connection_string)
        self.db = self.client.ecommerce
        self.text_model = SentenceTransformer('all-MiniLM-L6-v2')  # 384-dim embeddings
        self.image_model = SentenceTransformer('clip-ViT-B-32')

    async def index_product(self, product):
        """Generate and store multimodal embeddings"""
        # Text embedding from description
        text_embedding = self.text_model.encode(
            f"{product['name']} {product['description']} {' '.join(product['categories'])}"
        ).tolist()

        # Image embeddings (images are fetched by URL here)
        image_embeddings = []
        for image_url in product['images']:
            image = Image.open(BytesIO(requests.get(image_url).content))
            img_embedding = self.image_model.encode(image).tolist()
            image_embeddings.append(img_embedding)

        # Average image embeddings
        if image_embeddings:
            avg_image_embedding = np.mean(image_embeddings, axis=0).tolist()
        else:
            avg_image_embedding = None

        # Update product with embeddings
        await self.db.products.update_one(
            {'_id': product['_id']},
            {
                '$set': {
                    'text_embedding': text_embedding,
                    'image_embedding': avg_image_embedding,
                    'indexed_at': datetime.utcnow()
                }
            }
        )

    async def hybrid_search(self, query, filters=None, limit=10):
        """Combine vector search with metadata filtering"""
        # Generate query embedding
        query_embedding = self.text_model.encode(query).tolist()

        # $vectorSearch must be the first pipeline stage, so metadata filters
        # go into its 'filter' option rather than a preceding $match
        vector_stage = {
            '$vectorSearch': {
                'index': 'product_embeddings',
                'path': 'text_embedding',
                'queryVector': query_embedding,
                'numCandidates': limit * 10,
                'limit': limit
            }
        }
        if filters:
            vector_stage['$vectorSearch']['filter'] = filters

        pipeline = [
            vector_stage,
            {
                '$addFields': {
                    'search_score': {'$meta': 'vectorSearchScore'}
                }
            }
        ]

        # Post-processing
        pipeline.extend([
            {
                '$lookup': {
                    'from': 'reviews',
                    'localField': '_id',
                    'foreignField': 'productId',
                    'as': 'reviews'
                }
            },
            {
                '$addFields': {
                    'avgRating': {'$avg': '$reviews.rating'},
                    'reviewCount': {'$size': '$reviews'}
                }
            },
            {
                '$project': {
                    'text_embedding': 0,
                    'image_embedding': 0,
                    'reviews': 0
                }
            }
        ])

        return await self.db.products.aggregate(pipeline).to_list(limit)

    async def find_similar_users(self, user_id, threshold=0.8):
        """Find users with similar preferences using vector similarity"""
        # Get user's interaction embeddings
        user = await self.db.users.find_one({'_id': user_id})
        if not user.get('preference_embedding'):
            # Generate preference embedding from user's interactions
            preference_embedding = await self._calculate_user_preferences(user_id)
            await self.db.users.update_one(
                {'_id': user_id},
                {'$set': {'preference_embedding': preference_embedding}}
            )
        else:
            preference_embedding = user['preference_embedding']

        # Find similar users; the score must be materialized with $addFields
        # before it can be filtered in $match
        similar_users = await self.db.users.aggregate([
            {
                '$vectorSearch': {
                    'index': 'user_preferences',
                    'path': 'preference_embedding',
                    'queryVector': preference_embedding,
                    'numCandidates': 100,
                    'limit': 20
                }
            },
            {
                '$addFields': {
                    'search_score': {'$meta': 'vectorSearchScore'}
                }
            },
            {
                '$match': {
                    '_id': {'$ne': user_id},
                    'search_score': {'$gte': threshold}
                }
            },
            {
                '$project': {
                    'username': 1,
                    'similarity': '$search_score',
                    'common_interests': 1
                }
            }
        ]).to_list(20)
        return similar_users


# Create vector search indexes
async def setup_indexes(db):
    # Product text embedding index (createSearchIndexes command shape)
    await db.command({
        'createSearchIndexes': 'products',
        'indexes': [{
            'name': 'product_embeddings',
            'definition': {
                'mappings': {
                    'dynamic': True,
                    'fields': {
                        'text_embedding': {
                            'type': 'knnVector',
                            'dimensions': 384,
                            'similarity': 'cosine'
                        },
                        'categories': {
                            'type': 'string',
                            'analyzer': 'lucene.standard'
                        },
                        'price': {
                            'type': 'number'
                        }
                    }
                }
            }
        }]
    })
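
A short usage sketch for the hybrid search above; note that when the filter option is used, the filtered fields (here category and price) must also be mapped in the search index. The Atlas URI is a placeholder:

async def demo():
    system = VectorSearchSystem("mongodb+srv://...")  # placeholder Atlas URI
    results = await system.hybrid_search(
        "noise cancelling headphones for travel",
        filters={"category": "Electronics", "price": {"$lte": 350}},
        limit=5,
    )
    for product in results:
        print(product["name"], round(product["search_score"], 3))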

Index Strategy

"Optimize MongoDB indexes for:
- High-cardinality queries
- Compound index selection
- Index intersection
- Covered queries"
// AI suggests optimal indexes:
// Analyze query patterns first
db.setProfilingLevel(2);
// Review slow queries
db.system.profile.aggregate([
{ $match: { millis: { $gt: 100 } } },
{ $group: {
_id: "$command.filter",
count: { $sum: 1 },
avgMillis: { $avg: "$millis" }
}},
{ $sort: { avgMillis: -1 } }
]);
// Create strategic indexes
db.orders.createIndex(
{ userId: 1, status: 1, createdAt: -1 },
{ name: "user_orders_idx" }
);
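
With that compound index in place, a query that filters on userId and status and projects only indexed fields can be answered from the index alone (a covered query). A pymongo sketch, where client is a connected MongoClient and user_id is illustrative:

cursor = client.ecommerce.orders.find(
    {"userId": user_id, "status": "shipped"},
    # exclude _id and project only indexed fields so no documents are fetched
    {"_id": 0, "userId": 1, "status": 1, "createdAt": 1},
).sort("createdAt", -1)

plan = cursor.explain()
# For a covered query, executionStats reports totalDocsExamined: 0.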

Sharding Strategy

"Design sharding strategy for:
- 100M+ documents
- Geographic distribution
- Time-series data
- Even distribution"
// AI recommends:
sh.enableSharding("analytics");
// Hashed sharding for even distribution
sh.shardCollection(
"analytics.events",
{ userId: "hashed" }
);
// Range sharding for time-series
sh.shardCollection(
"analytics.metrics",
{ timestamp: 1, source: 1 }
);
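
For the geographic-distribution requirement, one option is zone sharding: tag regional shards and pin region-prefixed key ranges to them. A hedged pymongo sketch, where client is a connected MongoClient; the shard names and the events_by_region collection (shard key { region: 1, userId: 1 }) are assumptions:

from bson.min_key import MinKey
from bson.max_key import MaxKey

admin = client.admin

# Tag shards with zones (shard names are deployment-specific)
admin.command("addShardToZone", "shard-us-east", zone="US")
admin.command("addShardToZone", "shard-eu-west", zone="EU")

# Pin each region's key range to its zone
admin.command(
    "updateZoneKeyRange", "analytics.events_by_region",
    min={"region": "US", "userId": MinKey()},
    max={"region": "US", "userId": MaxKey()},
    zone="US",
)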

Connection Pooling

"Optimize connection pooling for:
- High concurrency
- Failover handling
- Read preference
- Connection limits"
# AI configures optimal pooling:
client = MongoClient(
connection_string,
maxPoolSize=100,
minPoolSize=10,
maxIdleTimeMS=30000,
waitQueueTimeoutMS=5000,
retryWrites=True,
retryReads=True,
readPreference='secondaryPreferred',
readConcernLevel='majority',
w='majority',
wtimeout=5000
)

Memory Management

"Configure WiredTiger cache:
- Working set analysis
- Cache size optimization
- Compression settings
- Checkpoint intervals"
// AI provides configuration:
// mongod.conf
storage:
wiredTiger:
engineConfig:
cacheSizeGB: 8
journalCompressor: snappy
directoryForIndexes: true
collectionConfig:
blockCompressor: zstd
indexConfig:
prefixCompression: true
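
To validate the cacheSizeGB choice against the actual working set, the wiredTiger section of serverStatus exposes the relevant counters. A pymongo sketch, where client is a connected MongoClient (the metric names are the server's own):

status = client.admin.command("serverStatus")
cache = status["wiredTiger"]["cache"]

configured = cache["maximum bytes configured"]
used = cache["bytes currently in the cache"]
dirty = cache["tracked dirty bytes in the cache"]

# Sustained usage near 95% or steadily climbing dirty bytes suggests the
# working set no longer fits and cacheSizeGB (or RAM) should grow.
print(f"cache used: {used / configured:.1%}, dirty: {dirty / configured:.1%}")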

RDBMS to NoSQL Migration

// Request migration strategy from AI
"Migrate relational database to MongoDB:
- Denormalize foreign keys
- Handle transactions
- Preserve data integrity
- Optimize for NoSQL patterns"

// AI provides migration framework:
class RDBMSToNoSQLMigrator {
  async migrateSchema(sqlSchema) {
    const collections = {};
    // Analyze foreign keys for embedding vs referencing
    // (analyzeRelationships is an elided helper that inspects FK cardinality)
    for (const table of sqlSchema.tables) {
      const relationships = this.analyzeRelationships(table);
      if (relationships.oneToFew) {
        // Embed related documents
        collections[table.name] = {
          ...table.columns,
          [relationships.related]: {
            type: 'embedded',
            source: relationships.relatedTable
          }
        };
      } else if (relationships.oneToMany) {
        // Use references with denormalized data
        collections[table.name] = {
          ...table.columns,
          [relationships.related]: {
            type: 'reference',
            denormalized: ['name', 'status']
          }
        };
      }
    }
    return collections;
  }

  async migrateData(source, target, batchSize = 1000) {
    // Stream data with transformations
    // (source.query is assumed to return an async-iterable cursor;
    // transformRow is an elided row-to-document mapper)
    const cursor = source.query('SELECT * FROM users');
    const batch = [];
    for await (const row of cursor) {
      const doc = await this.transformRow(row);
      batch.push(doc);
      if (batch.length >= batchSize) {
        await target.insertMany(batch);
        batch.length = 0;
      }
    }
    // Insert remaining
    if (batch.length > 0) {
      await target.insertMany(batch);
    }
  }

  async verifyMigration(source, target) {
    // Compare counts
    const sourceCount = await source.count();
    const targetCount = await target.countDocuments();
    if (sourceCount !== targetCount) {
      throw new Error(`Count mismatch: ${sourceCount} vs ${targetCount}`);
    }
    // Sample data verification (validateTransformation is an elided checker)
    const samples = await source.query(
      'SELECT * FROM users ORDER BY RANDOM() LIMIT 100'
    );
    for (const sample of samples) {
      const doc = await target.findOne({ _id: sample.id });
      this.validateTransformation(sample, doc);
    }
  }
}

NoSQL Development with AI

Key Takeaways:

  1. Schema Design: Let AI help you balance between embedding and referencing based on access patterns
  2. Query Optimization: Use AI to generate complex aggregation pipelines and optimize existing queries
  3. Scaling Strategies: Leverage AI for sharding key selection and index recommendations
  4. Migration Planning: AI can analyze relational schemas and suggest optimal NoSQL structures
  5. Performance Tuning: Use AI to identify bottlenecks and suggest configuration improvements

Remember:

  • NoSQL flexibility doesn’t mean no planning: use AI to design thoughtfully
  • Monitor query patterns and let AI suggest optimizations
  • Test AI-generated queries with production-like data volumes
  • Keep security in mind - validate all AI-suggested queries
  • Document your schema decisions for team understanding