Data pipelines form the backbone of modern data-driven applications. This lesson demonstrates how Cursor IDE’s AI capabilities transform complex ETL processes, stream processing, and data engineering tasks into manageable, efficient workflows.
Traditional data pipeline development involves juggling multiple technologies, handling complex transformations, and ensuring data quality. AI assistance makes these tasks more approachable by generating boilerplate code, suggesting optimizations, and helping debug data flow issues.
Complex Transformations
AI helps write and optimize data transformation logic across formats
Error Handling
AI generates robust error handling and recovery mechanisms
Performance Tuning
AI suggests optimizations for large-scale data processing
Schema Evolution
AI assists with schema versioning and migration strategies
Define Pipeline Requirements
# Ask AI to design pipeline architecture
"Design an ETL pipeline that:
- Ingests data from PostgreSQL, REST APIs, and CSV files
- Transforms and validates data
- Loads into a data warehouse (Snowflake)
- Handles incremental updates
- Includes error recovery and monitoring"
Generate Pipeline Scaffold
# AI creates pipeline structure
from dataclasses import dataclass
from typing import Any, Dict, List
import asyncio
from abc import ABC, abstractmethod

@dataclass
class PipelineConfig:
    source_configs: Dict[str, Dict[str, Any]]
    transformation_rules: List[Dict[str, Any]]
    destination_config: Dict[str, Any]
    error_handling: Dict[str, Any]

class DataPipeline(ABC):
    def __init__(self, config: PipelineConfig):
        self.config = config
        self.setup_monitoring()

    def setup_monitoring(self):
        # Hook for metrics/logging; concrete pipelines override as needed
        pass

    @abstractmethod
    async def extract(self) -> Any:
        pass

    @abstractmethod
    async def transform(self, data: Any) -> Any:
        pass

    @abstractmethod
    async def load(self, data: Any) -> None:
        pass
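To see how the scaffold is meant to be used, here is a minimal sketch of a concrete subclass; the CsvToWarehousePipeline name, the stubbed stage bodies, and the run() orchestration method are illustrative assumptions rather than part of the generated scaffold.

# A minimal, hypothetical subclass showing how the abstract stages fit together
class CsvToWarehousePipeline(DataPipeline):
    async def extract(self) -> Any:
        # Read raw records from the configured source (stubbed here)
        return [{"id": 1, "value": 42}]

    async def transform(self, data: Any) -> Any:
        # Apply the configured transformation rules (stubbed here)
        return [{**row, "value": row["value"] * 2} for row in data]

    async def load(self, data: Any) -> None:
        # Write to the destination (stubbed here)
        print(f"Loaded {len(data)} rows")

    async def run(self) -> None:
        # Orchestrate extract -> transform -> load
        await self.load(await self.transform(await self.extract()))

# Example: asyncio.run(CsvToWarehousePipeline(config).run()) with a PipelineConfig instance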
Implement Data Sources
# AI implements various data source connectors
"Create connectors for:
- PostgreSQL with connection pooling
- REST API with rate limiting
- CSV with schema inference
- Include retry logic and error handling"
# AI creates database extractor
"Implement PostgreSQL extractor with:
- Connection pooling
- Incremental extraction using timestamps
- Parallel table extraction
- Memory-efficient streaming"

import asyncpg
from contextlib import asynccontextmanager
from datetime import datetime
from typing import AsyncGenerator

class PostgreSQLExtractor:
    def __init__(self, connection_string: str, batch_size: int = 10000):
        self.connection_string = connection_string
        self.batch_size = batch_size
        self.pool = None

    async def initialize(self):
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=5,
            max_size=20,
            command_timeout=60
        )

    async def extract_incremental(self, table: str, timestamp_column: str,
                                  last_extracted: datetime) -> AsyncGenerator:
        async with self.pool.acquire() as conn:
            # Table and column names come from trusted pipeline config,
            # not user input, so f-string interpolation is acceptable here
            query = f"""
                SELECT * FROM {table}
                WHERE {timestamp_column} > $1
                ORDER BY {timestamp_column}
            """
            async with conn.transaction():
                async for row in conn.cursor(query, last_extracted):
                    yield dict(row)
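A short usage sketch for the extractor, assuming a hypothetical orders table with an updated_at column; it also shows one way to honour batch_size, which the class stores but the snippet above never uses.

# Hypothetical usage: pull incremental rows and hand them off in batches
async def load_batch(rows):
    # Stand-in for the real warehouse loader
    print(f"Loading {len(rows)} rows")

async def sync_orders(extractor: PostgreSQLExtractor, since: datetime):
    await extractor.initialize()
    batch = []
    async for row in extractor.extract_incremental("orders", "updated_at", since):
        batch.append(row)
        if len(batch) >= extractor.batch_size:
            await load_batch(batch)
            batch = []
    if batch:
        await load_batch(batch)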
# AI implements API extractor
"Create REST API extractor with:
- Rate limiting and backoff
- Pagination handling
- OAuth authentication
- Response caching"

import aiohttp
from aiolimiter import AsyncLimiter
from tenacity import retry, stop_after_attempt, wait_exponential
from typing import Any, Dict

class APIExtractor:
    def __init__(self, base_url: str, auth_config: Dict[str, Any]):
        self.base_url = base_url
        self.auth_config = auth_config
        self.rate_limiter = AsyncLimiter(10, 60)  # 10 requests per minute

    async def _get_auth_headers(self) -> Dict[str, str]:
        # Exchange auth_config for an access token in a real implementation;
        # a simple bearer token is shown here
        return {"Authorization": f"Bearer {self.auth_config.get('token', '')}"}

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def fetch_paginated(self, endpoint: str, params: Dict = None):
        async with aiohttp.ClientSession() as session:
            all_data = []
            next_page = f"{self.base_url}/{endpoint}"

            while next_page:
                async with self.rate_limiter:
                    async with session.get(
                        next_page,
                        params=params,
                        headers=await self._get_auth_headers()
                    ) as response:
                        response.raise_for_status()
                        data = await response.json()

                all_data.extend(data.get('results', []))
                next_page = data.get('next')

            return all_data
# AI creates file extractor
"Implement file extractor for:
- Large CSV files with streaming
- Schema inference and validation
- Multiple file formats (CSV, JSON, Parquet)
- S3/Azure/GCS support"

import pandas as pd
from typing import Dict, Iterator

class FileExtractor:
    def __init__(self, chunk_size: int = 10000):
        self.chunk_size = chunk_size
        self.schema_cache = {}

    def extract_csv_streaming(self, filepath: str,
                              infer_schema: bool = True) -> Iterator[pd.DataFrame]:
        # Infer schema from first chunk if needed
        if infer_schema and filepath not in self.schema_cache:
            sample = pd.read_csv(filepath, nrows=1000)
            self.schema_cache[filepath] = self._infer_schema(sample)

        # Stream file in chunks
        for chunk in pd.read_csv(filepath,
                                 chunksize=self.chunk_size,
                                 dtype=self.schema_cache.get(filepath)):
            yield self._validate_chunk(chunk)

    def _infer_schema(self, df: pd.DataFrame) -> Dict[str, str]:
        # AI helps with intelligent schema inference
        return {col: str(dtype) for col, dtype in df.dtypes.items()}

    def _validate_chunk(self, chunk: pd.DataFrame) -> pd.DataFrame:
        # Placeholder for row-level validation; returns the chunk unchanged
        return chunk
# AI implements transformation framework
"Create a transformation framework that supports:
- Column mapping and renaming
- Data type conversions
- Calculated fields
- Data validation rules
- Error record handling"

class TransformationEngine:
    def __init__(self, rules: List[TransformationRule]):
        self.rules = rules
        self.error_handler = ErrorHandler()

    async def transform_batch(self, data: pd.DataFrame) -> TransformResult:
        transformed = data.copy()
        errors = []

        for rule in self.rules:
            try:
                transformed = await rule.apply(transformed)
            except TransformationError as e:
                errors.extend(self.error_handler.handle(e, data))

        return TransformResult(
            success_data=transformed,
            error_records=errors,
            stats=self._calculate_stats(data, transformed)
        )

# AI generates specific transformations
class DateTimeTransformation(TransformationRule):
    def __init__(self, columns: List[str], target_format: str):
        self.columns = columns
        self.target_format = target_format

    async def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        for col in self.columns:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col]).dt.strftime(self.target_format)
        return df
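The engine above leans on several supporting types that the snippet never defines. A minimal sketch of what they might look like, assuming TransformationRule is an async-apply base class and TransformResult is a plain result container (these shapes are assumptions and would sit above the engine in a real module):

# Assumed supporting types for the transformation framework (illustrative only)
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List
import pandas as pd

class TransformationError(Exception):
    pass

class TransformationRule(ABC):
    @abstractmethod
    async def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        ...

@dataclass
class TransformResult:
    success_data: pd.DataFrame
    error_records: List[Dict[str, Any]]
    stats: Dict[str, Any] = field(default_factory=dict)

class ErrorHandler:
    def handle(self, error: Exception, data: pd.DataFrame) -> List[Dict[str, Any]]:
        # Attach the failing batch's size to the error for later triage
        return [{"error": str(error), "rows": len(data)}]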
# AI implements Kafka stream processor
"Create Kafka stream processor with:
- Consumer groups for scaling
- Exactly-once semantics
- Windowed aggregations
- State management"

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.helpers import create_ssl_context
import json

class KafkaStreamProcessor:
    def __init__(self, config: KafkaConfig):
        self.config = config
        self.consumer = None
        self.producer = None
        self.state_store = StateStore()

    async def start(self):
        self.consumer = AIOKafkaConsumer(
            *self.config.topics,
            bootstrap_servers=self.config.brokers,
            group_id=self.config.consumer_group,
            enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.config.brokers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

        await self.consumer.start()
        await self.producer.start()

    async def process_messages(self):
        async for msg in self.consumer:
            try:
                # Process message
                result = await self.transform_message(msg.value)

                # Send to output topic
                await self.producer.send(
                    self.config.output_topic,
                    value=result,
                    key=msg.key
                )

                # Commit offset only after successful processing
                await self.consumer.commit()

            except Exception as e:
                await self.handle_error(msg, e)
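The processor assumes a KafkaConfig object and a StateStore that are not shown. A minimal sketch of the config, with field names inferred from how the class uses them:

# Assumed configuration container for the Kafka processor (field names are illustrative)
from dataclasses import dataclass
from typing import List

@dataclass
class KafkaConfig:
    topics: List[str]        # input topics to subscribe to
    brokers: str             # e.g. "localhost:9092"
    consumer_group: str      # consumer group id for horizontal scaling
    output_topic: str        # where transformed messages are published

# config = KafkaConfig(["events"], "localhost:9092", "etl-workers", "events.enriched")

Note that committing offsets only after a successful produce, as above, gives at-least-once rather than exactly-once delivery; true exactly-once processing would additionally require Kafka's transactional producer APIs.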
# AI creates Spark streaming job
"Implement Spark structured streaming for:
- Real-time aggregations
- Join with static data
- Watermarking for late data
- Checkpointing for fault tolerance"

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, count, avg, from_json

class SparkStreamProcessor:
    def __init__(self, app_name: str):
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint") \
            .getOrCreate()

    def create_streaming_pipeline(self):
        # Read from Kafka
        df = self.spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "events") \
            .load()

        # Parse JSON and transform
        events = df.select(
            col("key").cast("string"),
            from_json(col("value").cast("string"), self.schema).alias("data")
        ).select("key", "data.*")

        # Windowed aggregation with watermark
        aggregated = events \
            .withWatermark("timestamp", "10 minutes") \
            .groupBy(
                window(col("timestamp"), "5 minutes", "1 minute"),
                col("category")
            ) \
            .agg(
                count("*").alias("event_count"),
                avg("value").alias("avg_value")
            )

        # Write results
        query = aggregated \
            .writeStream \
            .outputMode("append") \
            .format("parquet") \
            .option("path", "/data/aggregated") \
            .trigger(processingTime='30 seconds') \
            .start()

        return query
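create_streaming_pipeline reads self.schema, which the class never defines. One plausible definition, assuming events carry the timestamp, category, and value fields used by the aggregation above:

# Assumed event schema for the from_json parse step (field names mirror the aggregation)
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, DoubleType

EVENT_SCHEMA = StructType([
    StructField("timestamp", TimestampType()),  # used for watermarking and windowing
    StructField("category", StringType()),      # grouping key
    StructField("value", DoubleType()),         # averaged per window
])

# e.g. in SparkStreamProcessor.__init__: self.schema = EVENT_SCHEMA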
# AI implements custom stream processor
"Build custom stream processor with:
- Multiple input sources
- Complex event processing
- Stateful transformations
- Output to multiple sinks"

import asyncio
from collections import defaultdict
from datetime import datetime, timedelta

class CustomStreamProcessor:
    def __init__(self):
        self.sources = []
        self.sinks = []
        self.transformations = []
        self.state = defaultdict(dict)
        self.windows = defaultdict(list)

    async def process_stream(self):
        # Create tasks for all sources
        source_tasks = [
            self.consume_source(source)
            for source in self.sources
        ]

        # Process events concurrently
        await asyncio.gather(*source_tasks)

    async def consume_source(self, source):
        async for event in source.stream():
            # Apply transformations
            transformed = event
            for transform in self.transformations:
                transformed = await transform.apply(transformed, self.state)

            # Handle windowing
            if transformed.requires_windowing:
                await self.add_to_window(transformed)

            # Send to sinks
            await self.emit_to_sinks(transformed)

    async def add_to_window(self, event):
        window_key = self.get_window_key(event)
        self.windows[window_key].append(event)

        # Check if window is complete
        if self.is_window_complete(window_key):
            aggregated = await self.aggregate_window(window_key)
            await self.emit_to_sinks(aggregated)
            del self.windows[window_key]
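The windowing helpers are referenced but not defined above. A minimal sketch, assuming fixed one-minute tumbling windows keyed by event type and closed once the window's end time has passed; the event attributes and window size are assumptions.

# Illustrative tumbling-window helpers (free functions for brevity; in the class
# they would be methods). Assumes events expose .event_type and .timestamp attributes.
from datetime import datetime, timedelta

WINDOW_SIZE = timedelta(minutes=1)

def get_window_key(event):
    # Truncate the event timestamp down to the start of its one-minute window
    window_start = event.timestamp.replace(second=0, microsecond=0)
    return (event.event_type, window_start)

def is_window_complete(window_key):
    # A window can be closed once the current time is past its end boundary
    _, window_start = window_key
    return datetime.utcnow() >= window_start + WINDOW_SIZE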
# AI creates comprehensive data quality framework
"Design data quality framework with:
- Schema validation
- Business rule validation
- Anomaly detection
- Data profiling
- Quality metrics and reporting"

class DataQualityEngine:
    def __init__(self):
        self.validators = []
        self.profiler = DataProfiler()
        self.anomaly_detector = AnomalyDetector()

    def add_validator(self, validator: DataValidator):
        self.validators.append(validator)

    async def validate_batch(self, data: pd.DataFrame) -> QualityReport:
        report = QualityReport()

        # Run all validators
        for validator in self.validators:
            result = await validator.validate(data)
            report.add_result(result)

        # Profile data
        profile = await self.profiler.profile(data)
        report.add_profile(profile)

        # Detect anomalies
        anomalies = await self.anomaly_detector.detect(data)
        report.add_anomalies(anomalies)

        return report

# AI implements specific validators
class SchemaValidator(DataValidator):
    def __init__(self, expected_schema: Dict[str, type]):
        self.expected_schema = expected_schema

    async def validate(self, data: pd.DataFrame) -> ValidationResult:
        errors = []

        # Check columns
        missing_cols = set(self.expected_schema.keys()) - set(data.columns)
        if missing_cols:
            errors.append(f"Missing columns: {missing_cols}")

        # Check data types
        for col, expected_type in self.expected_schema.items():
            if col in data.columns:
                actual_type = data[col].dtype
                if not self._compatible_types(actual_type, expected_type):
                    errors.append(f"Column {col}: expected {expected_type}, got {actual_type}")

        return ValidationResult(
            passed=len(errors) == 0,
            errors=errors,
            stats=self._calculate_stats(data)
        )
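SchemaValidator inherits from a DataValidator base class and returns a ValidationResult, neither of which is shown. A minimal sketch with field names matching how the validator uses them (shapes are assumptions and would be defined before the engine):

# Assumed validator base class and result container (illustrative only)
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List
import pandas as pd

class DataValidator(ABC):
    @abstractmethod
    async def validate(self, data: pd.DataFrame) -> "ValidationResult":
        ...

@dataclass
class ValidationResult:
    passed: bool
    errors: List[str]
    stats: Dict[str, Any] = field(default_factory=dict)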
Lineage Tracking
Track data flow from source to destination with full audit trail
Performance Metrics
Monitor pipeline throughput, latency, and resource usage
Error Tracking
Capture and analyze pipeline errors with root cause analysis
Data Freshness
Monitor data timeliness and alert on stale data (a minimal check is sketched after these cards)
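As a concrete example of the freshness check mentioned in the cards above, here is a minimal sketch assuming an asyncpg-style connection, a hypothetical orders table with a timestamptz updated_at column, and a two-hour staleness threshold:

# Illustrative freshness check (table name, column, and threshold are assumptions)
from datetime import datetime, timedelta, timezone

STALENESS_THRESHOLD = timedelta(hours=2)

async def check_freshness(conn, table: str = "orders", column: str = "updated_at"):
    # asyncpg-style query returning the most recent update timestamp (timezone-aware)
    last_update = await conn.fetchval(f"SELECT MAX({column}) FROM {table}")
    if last_update is None or datetime.now(timezone.utc) - last_update > STALENESS_THRESHOLD:
        # Hook this into the pipeline's alerting channel (Slack, PagerDuty, etc.)
        raise RuntimeError(f"{table} is stale: last update {last_update}")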
# AI implements CDC pipeline
"Create CDC pipeline that:
- Captures database changes in real-time
- Handles schema evolution
- Maintains ordering guarantees
- Supports multiple CDC sources"

class CDCPipeline:
    def __init__(self, source_config: CDCSourceConfig):
        self.source_config = source_config
        self.schema_registry = SchemaRegistry()
        self.state_store = CDCStateStore()

    async def start_cdc(self):
        # Initialize CDC connection
        cdc_stream = await self.connect_to_source()

        async for change_event in cdc_stream:
            try:
                # Handle schema changes
                if change_event.is_ddl:
                    await self.handle_schema_change(change_event)

                # Process data changes
                else:
                    transformed = await self.transform_change_event(change_event)
                    await self.emit_change(transformed)

                # Update checkpoint
                await self.state_store.update_position(change_event.position)

            except Exception as e:
                await self.handle_cdc_error(change_event, e)
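The loop assumes change events expose a few common CDC attributes. A plausible event shape, loosely modelled on what log-based CDC connectors emit; the field names here are assumptions:

# Assumed change-event structure consumed by the CDC loop (illustrative)
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class ChangeEvent:
    table: str                         # source table the change came from
    operation: str                     # "insert", "update", "delete", or "ddl"
    before: Optional[Dict[str, Any]]   # row image before the change (None for inserts)
    after: Optional[Dict[str, Any]]    # row image after the change (None for deletes)
    position: str                      # log position / LSN used for checkpointing

    @property
    def is_ddl(self) -> bool:
        return self.operation == "ddl"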
# AI creates batch processing layer
"Implement batch layer for data lake:
- Partitioned storage (year/month/day)
- Format optimization (Parquet/ORC)
- Compression strategies
- Metadata management"

class BatchLayer:
    def __init__(self, storage_config: StorageConfig):
        self.storage = self._create_storage_client(storage_config)
        self.metadata_store = MetadataStore()

    async def process_batch(self, data: pd.DataFrame,
                            partition_keys: List[str]) -> BatchResult:
        # Optimize data format
        optimized = self.optimize_for_storage(data)

        # Calculate partitions
        partitions = self.calculate_partitions(optimized, partition_keys)

        # Write partitioned data
        write_results = []
        for partition, partition_data in partitions.items():
            path = self.generate_path(partition)
            result = await self.write_optimized(
                partition_data,
                path,
                compression='snappy'
            )
            write_results.append(result)

        # Update metadata
        await self.metadata_store.register_batch(write_results)

        return BatchResult(write_results)
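The prompt asks for year/month/day partitioning, but generate_path is left undefined. A minimal sketch, assuming partitions are keyed by a date and written as Hive-style directories:

# Illustrative Hive-style partition path builder (base path and key shape are assumptions)
from datetime import date

def generate_path(base_path: str, partition_date: date, dataset: str = "events") -> str:
    # Produces e.g. s3://lake/events/year=2024/month=01/day=15/
    return (
        f"{base_path}/{dataset}/"
        f"year={partition_date:%Y}/month={partition_date:%m}/day={partition_date:%d}/"
    )

# generate_path("s3://lake", date(2024, 1, 15))
# -> "s3://lake/events/year=2024/month=01/day=15/"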
# AI implements speed layer
"Create speed layer for real-time analytics:
- In-memory caching
- Real-time aggregations
- Low-latency queries
- Merge with batch results"

class SpeedLayer:
    def __init__(self):
        self.cache = RedisCache()
        self.aggregation_engine = AggregationEngine()
        self.query_engine = QueryEngine()

    async def process_real_time(self, event: Event):
        # Update real-time aggregations
        await self.aggregation_engine.update(event)

        # Cache recent data
        await self.cache.add(event, ttl=3600)

        # Update materialized views
        await self.update_views(event)

    async def query(self, query: Query) -> QueryResult:
        # Check if query needs real-time data
        if query.requires_real_time:
            # Merge batch and real-time results
            batch_data = await self.query_batch_layer(query)
            real_time_data = await self.query_cache(query)
            return self.merge_results(batch_data, real_time_data)

        return await self.query_batch_layer(query)
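merge_results is the crux of a Lambda-style speed layer: fresher real-time data has to win over the batch view. A minimal sketch, assuming both sides return dictionaries of records keyed by entity id, each carrying an updated_at timestamp:

# Illustrative merge of batch and real-time views (record shape is an assumption)
from typing import Any, Dict

def merge_results(batch: Dict[str, Dict[str, Any]],
                  real_time: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    merged = dict(batch)
    for key, record in real_time.items():
        existing = merged.get(key)
        # Real-time data overrides batch data when it is newer, or when the key is new
        if existing is None or record["updated_at"] >= existing["updated_at"]:
            merged[key] = record
    return merged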
# AI creates Airflow DAG
"Generate Airflow DAG for:
- Daily ETL pipeline
- Dependencies between tasks
- Error handling and retries
- SLA monitoring
- Dynamic task generation"

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'data_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule_interval='@daily',
    catchup=False,
    tags=['etl', 'production']
)

# AI generates task definitions
def extract_data(**context):
    # Extract logic here
    pass

def validate_data(**context):
    # Validation logic here
    pass

def transform_data(**context):
    # Transformation logic here
    pass

# Define tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag
)

validate_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag
)

transform_task = DatabricksSubmitRunOperator(
    task_id='transform_data',
    databricks_conn_id='databricks_default',
    notebook_task={
        'notebook_path': '/Repos/etl/transform',
        'base_parameters': {
            'date': '{{ ds }}'
        }
    },
    dag=dag
)

# Set dependencies
extract_task >> validate_task >> transform_task
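The prompt above asks for SLA monitoring and dynamic task generation, which the generated DAG does not yet show. A minimal sketch of both, where the table list and the one-hour SLA are assumptions:

# Illustrative additions: per-task SLA and dynamically generated extract tasks
# (the table list and sla value are assumptions)
SOURCE_TABLES = ['orders', 'customers', 'products']

extract_tasks = []
for table in SOURCE_TABLES:
    task = PythonOperator(
        task_id=f'extract_{table}',
        python_callable=extract_data,
        op_kwargs={'table': table},   # passed through to the callable
        sla=timedelta(hours=1),       # Airflow records an SLA miss if the task runs late
        dag=dag
    )
    extract_tasks.append(task)

# All per-table extracts must finish before validation runs
extract_tasks >> validate_task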
Idempotency
Ensure pipelines can be safely re-run without duplicating data (see the merge sketch after these cards)
Incremental Processing
Process only new or changed data to improve efficiency
Error Recovery
Implement comprehensive error handling and recovery mechanisms
Monitoring First
Build monitoring and observability from the start
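To make the idempotency practice concrete, a common approach is to load each batch into a staging table and merge it into the target on a business key, so re-running the batch updates rows instead of duplicating them. A minimal Snowflake-flavoured sketch, with table and column names as assumptions:

# Illustrative idempotent load: MERGE from staging on a business key
# (table/column names and the cursor usage are assumptions)
MERGE_SQL = """
MERGE INTO analytics.orders AS target
USING staging.orders AS source
    ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
    status = source.status,
    updated_at = source.updated_at
WHEN NOT MATCHED THEN INSERT (order_id, status, updated_at)
    VALUES (source.order_id, source.status, source.updated_at)
"""

def load_idempotent(cursor):
    # Safe to re-run: matched rows are updated in place, new rows are inserted once
    cursor.execute(MERGE_SQL)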
DevOps for Data
Learn infrastructure automation for data pipelines
Monitoring Pipelines
Implement comprehensive pipeline monitoring
Architecture Patterns
Explore advanced data architecture patterns