Złożone transformacje
AI pomaga pisać i optymalizować logikę transformacji danych w różnych formatach
Pipeline’y danych stanowią szkielet nowoczesnych aplikacji opartych na danych. Ta lekcja pokazuje, jak możliwości AI w Cursor przekształcają złożone procesy ETL, przetwarzanie strumieni i zadania inżynierii danych w zarządzalne, wydajne przepływy pracy.
Tradycyjne tworzenie pipeline’ów danych wymaga żonglowania wieloma technologiami, obsługi złożonych transformacji i zapewniania jakości danych. Wsparcie AI czyni te zadania bardziej przystępnymi, generując kod podstawowy, sugerując optymalizacje i pomagając debugować problemy z przepływem danych.
Złożone transformacje
AI pomaga pisać i optymalizować logikę transformacji danych w różnych formatach
Obsługa błędów
AI generuje solidne mechanizmy obsługi błędów i odzyskiwania
Dostrajanie wydajności
AI sugeruje optymalizacje dla przetwarzania danych na dużą skalę
Ewolucja schematów
AI pomaga w strategiach wersjonowania i migracji schematów
Definiowanie wymagań pipeline’u
# Poproś AI o zaprojektowanie architektury pipeline'u"Zaprojektuj pipeline ETL, który:- Pobiera dane z PostgreSQL, REST API i plików CSV- Transformuje i waliduje dane- Ładuje do hurtowni danych (Snowflake)- Obsługuje aktualizacje przyrostowe- Zawiera odzyskiwanie błędów i monitoring"
Generowanie szkieletu pipeline’u
# AI tworzy strukturę pipeline'ufrom dataclasses import dataclassfrom typing import Any, Dict, Listimport asynciofrom abc import ABC, abstractmethod
@dataclassclass PipelineConfig: source_configs: Dict[str, Dict[str, Any]] transformation_rules: List[Dict[str, Any]] destination_config: Dict[str, Any] error_handling: Dict[str, Any]
class DataPipeline(ABC): def __init__(self, config: PipelineConfig): self.config = config self.setup_monitoring()
@abstractmethod async def extract(self) -> Any: pass
@abstractmethod async def transform(self, data: Any) -> Any: pass
@abstractmethod async def load(self, data: Any) -> None: pass
Implementacja źródeł danych
# AI implementuje różne łączniki źródeł danych"Stwórz łączniki dla:- PostgreSQL z poolingiem połączeń- REST API z ograniczaniem szybkości- CSV z wnioskowaniem schematów- Zawrzyj logikę ponownych prób i obsługę błędów"
# AI tworzy ekstraktor bazy danych"Implementuj ekstraktor PostgreSQL z:- Poolingiem połączeń- Ekstrakcją przyrostową używając znaczników czasu- Równoległą ekstrakcją tabel- Efektywnym streamingiem pamięci"
import asyncpgfrom contextlib import asynccontextmanager
class PostgreSQLExtractor: def __init__(self, connection_string: str, batch_size: int = 10000): self.connection_string = connection_string self.batch_size = batch_size self.pool = None
async def initialize(self): self.pool = await asyncpg.create_pool( self.connection_string, min_size=5, max_size=20, command_timeout=60 )
async def extract_incremental(self, table: str, timestamp_column: str, last_extracted: datetime) -> AsyncGenerator: async with self.pool.acquire() as conn: query = f""" SELECT * FROM {table} WHERE {timestamp_column} > $1 ORDER BY {timestamp_column} """
async with conn.transaction(): async for row in conn.cursor(query, last_extracted): yield dict(row)
# AI implementuje ekstraktor API"Stwórz ekstraktor REST API z:- Ograniczaniem szybkości i backoff- Obsługą paginacji- Uwierzytelnianiem OAuth- Cachowaniem odpowiedzi"
import aiohttpfrom tenacity import retry, stop_after_attempt, wait_exponential
class APIExtractor: def __init__(self, base_url: str, auth_config: Dict[str, Any]): self.base_url = base_url self.auth_config = auth_config self.rate_limiter = AsyncLimiter(10, 60) # 10 żądań na minutę
@retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10) ) async def fetch_paginated(self, endpoint: str, params: Dict = None): async with aiohttp.ClientSession() as session: all_data = [] next_page = f"{self.base_url}/{endpoint}"
while next_page: async with self.rate_limiter: async with session.get( next_page, params=params, headers=await self._get_auth_headers() ) as response: response.raise_for_status() data = await response.json()
all_data.extend(data.get('results', [])) next_page = data.get('next')
return all_data
# AI tworzy ekstraktor plików"Implementuj ekstraktor plików dla:- Dużych plików CSV ze streamingiem- Wnioskowania i walidacji schematów- Wielu formatów plików (CSV, JSON, Parquet)- Wsparcia S3/Azure/GCS"
import pandas as pdfrom typing import Iterator
class FileExtractor: def __init__(self, chunk_size: int = 10000): self.chunk_size = chunk_size self.schema_cache = {}
def extract_csv_streaming(self, filepath: str, infer_schema: bool = True) -> Iterator[pd.DataFrame]: # Wnioskuj schemat z pierwszego fragmentu jeśli potrzeba if infer_schema and filepath not in self.schema_cache: sample = pd.read_csv(filepath, nrows=1000) self.schema_cache[filepath] = self._infer_schema(sample)
# Streamuj plik w fragmentach for chunk in pd.read_csv(filepath, chunksize=self.chunk_size, dtype=self.schema_cache.get(filepath)): yield self._validate_chunk(chunk)
def _infer_schema(self, df: pd.DataFrame) -> Dict[str, str]: # AI pomaga z inteligentnym wnioskowaniem schematów return {col: str(dtype) for col, dtype in df.dtypes.items()}
# AI implementuje framework transformacji"Stwórz framework transformacji, który wspiera:- Mapowanie i przemianowanie kolumn- Konwersje typów danych- Pola obliczane- Reguły walidacji danych- Obsługę rekordów błędnych"
class TransformationEngine: def __init__(self, rules: List[TransformationRule]): self.rules = rules self.error_handler = ErrorHandler()
async def transform_batch(self, data: pd.DataFrame) -> TransformResult: transformed = data.copy() errors = []
for rule in self.rules: try: transformed = await rule.apply(transformed) except TransformationError as e: errors.extend(self.error_handler.handle(e, data))
return TransformResult( success_data=transformed, error_records=errors, stats=self._calculate_stats(data, transformed) )
# AI generuje konkretne transformacjeclass DateTimeTransformation(TransformationRule): def __init__(self, columns: List[str], target_format: str): self.columns = columns self.target_format = target_format
async def apply(self, df: pd.DataFrame) -> pd.DataFrame: for col in self.columns: if col in df.columns: df[col] = pd.to_datetime(df[col]).dt.strftime(self.target_format) return df
# AI implementuje procesor strumieni Kafka"Stwórz procesor strumieni Kafka z:- Grupami konsumentów do skalowania- Semantyką exactly-once- Agregacjami okienkowymi- Zarządzaniem stanem"
from aiokafka import AIOKafkaConsumer, AIOKafkaProducerfrom aiokafka.helpers import create_ssl_contextimport json
class KafkaStreamProcessor: def __init__(self, config: KafkaConfig): self.config = config self.consumer = None self.producer = None self.state_store = StateStore()
async def start(self): self.consumer = AIOKafkaConsumer( *self.config.topics, bootstrap_servers=self.config.brokers, group_id=self.config.consumer_group, enable_auto_commit=False, value_deserializer=lambda m: json.loads(m.decode('utf-8')) )
self.producer = AIOKafkaProducer( bootstrap_servers=self.config.brokers, value_serializer=lambda v: json.dumps(v).encode('utf-8') )
await self.consumer.start() await self.producer.start()
async def process_messages(self): async for msg in self.consumer: try: # Przetwórz wiadomość result = await self.transform_message(msg.value)
# Wyślij do topicu wyjściowego await self.producer.send( self.config.output_topic, value=result, key=msg.key )
# Zatwierdź offset tylko po udanym przetworzeniu await self.consumer.commit()
except Exception as e: await self.handle_error(msg, e)
# AI tworzy zadanie Spark streaming"Implementuj Spark structured streaming dla:- Agregacji w czasie rzeczywistym- Łączenia ze statycznymi danymi- Watermarking dla spóźnionych danych- Checkpointingu dla tolerancji błędów"
from pyspark.sql import SparkSessionfrom pyspark.sql.functions import window, col, count, avg
class SparkStreamProcessor: def __init__(self, app_name: str): self.spark = SparkSession.builder \ .appName(app_name) \ .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint") \ .getOrCreate()
def create_streaming_pipeline(self): # Czytaj z Kafka df = self.spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "events") \ .load()
# Parsuj JSON i transformuj events = df.select( col("key").cast("string"), from_json(col("value").cast("string"), self.schema).alias("data") ).select("key", "data.*")
# Agregacja okienkowa z watermark aggregated = events \ .withWatermark("timestamp", "10 minutes") \ .groupBy( window(col("timestamp"), "5 minutes", "1 minute"), col("category") ) \ .agg( count("*").alias("event_count"), avg("value").alias("avg_value") )
# Zapisz wyniki query = aggregated \ .writeStream \ .outputMode("append") \ .format("parquet") \ .option("path", "/data/aggregated") \ .trigger(processingTime='30 seconds') \ .start()
return query
# AI implementuje niestandardowy procesor strumieni"Zbuduj niestandardowy procesor strumieni z:- Wieloma źródłami wejściowymi- Złożonym przetwarzaniem zdarzeń- Transformacjami stanowymi- Wyjściem do wielu sink'ów"
import asynciofrom collections import defaultdictfrom datetime import datetime, timedelta
class CustomStreamProcessor: def __init__(self): self.sources = [] self.sinks = [] self.transformations = [] self.state = defaultdict(dict) self.windows = defaultdict(list)
async def process_stream(self): # Stwórz zadania dla wszystkich źródeł source_tasks = [ self.consume_source(source) for source in self.sources ]
# Przetwarzaj zdarzenia równocześnie await asyncio.gather(*source_tasks)
async def consume_source(self, source): async for event in source.stream(): # Zastosuj transformacje transformed = event for transform in self.transformations: transformed = await transform.apply(transformed, self.state)
# Obsłuż okienkowanie if transformed.requires_windowing: await self.add_to_window(transformed)
# Wyślij do sink'ów await self.emit_to_sinks(transformed)
async def add_to_window(self, event): window_key = self.get_window_key(event) self.windows[window_key].append(event)
# Sprawdź czy okno jest kompletne if self.is_window_complete(window_key): aggregated = await self.aggregate_window(window_key) await self.emit_to_sinks(aggregated) del self.windows[window_key]
# AI tworzy kompleksowy framework jakości danych"Zaprojektuj framework jakości danych z:- Walidacją schematów- Walidacją reguł biznesowych- Wykrywaniem anomalii- Profilowaniem danych- Metrykami jakości i raportowaniem"
class DataQualityEngine: def __init__(self): self.validators = [] self.profiler = DataProfiler() self.anomaly_detector = AnomalyDetector()
def add_validator(self, validator: DataValidator): self.validators.append(validator)
async def validate_batch(self, data: pd.DataFrame) -> QualityReport: report = QualityReport()
# Uruchom wszystkie walidatory for validator in self.validators: result = await validator.validate(data) report.add_result(result)
# Profiluj dane profile = await self.profiler.profile(data) report.add_profile(profile)
# Wykryj anomalie anomalies = await self.anomaly_detector.detect(data) report.add_anomalies(anomalies)
return report
# AI implementuje konkretne walidatoryclass SchemaValidator(DataValidator): def __init__(self, expected_schema: Dict[str, type]): self.expected_schema = expected_schema
async def validate(self, data: pd.DataFrame) -> ValidationResult: errors = []
# Sprawdź kolumny missing_cols = set(self.expected_schema.keys()) - set(data.columns) if missing_cols: errors.append(f"Brakujące kolumny: {missing_cols}")
# Sprawdź typy danych for col, expected_type in self.expected_schema.items(): if col in data.columns: actual_type = data[col].dtype if not self._compatible_types(actual_type, expected_type): errors.append(f"Kolumna {col}: oczekiwano {expected_type}, otrzymano {actual_type}")
return ValidationResult( passed=len(errors) == 0, errors=errors, stats=self._calculate_stats(data) )
Śledzenie lineage
Śledź przepływ danych od źródła do miejsca przeznaczenia z pełnym audytem
Metryki wydajności
Monitoruj przepustowość pipeline’u, opóźnienia i wykorzystanie zasobów
Śledzenie błędów
Przechwytuj i analizuj błędy pipeline’u z analizą podstawowych przyczyn
Świeżość danych
Monitoruj aktualność danych i alerty o przestarzałych danych
# AI implementuje pipeline CDC"Stwórz pipeline CDC, który:- Przechwytuje zmiany w bazie danych w czasie rzeczywistym- Obsługuje ewolucję schematów- Utrzymuje gwarancje kolejności- Wspiera wiele źródeł CDC"
class CDCPipeline: def __init__(self, source_config: CDCSourceConfig): self.source_config = source_config self.schema_registry = SchemaRegistry() self.state_store = CDCStateStore()
async def start_cdc(self): # Zainicjalizuj połączenie CDC cdc_stream = await self.connect_to_source()
async for change_event in cdc_stream: try: # Obsłuż zmiany schematów if change_event.is_ddl: await self.handle_schema_change(change_event)
# Przetwórz zmiany danych else: transformed = await self.transform_change_event(change_event) await self.emit_change(transformed)
# Zaktualizuj checkpoint await self.state_store.update_position(change_event.position)
except Exception as e: await self.handle_cdc_error(change_event, e)
# AI tworzy warstwę przetwarzania batch"Implementuj warstwę batch dla data lake:- Partycjonowane przechowywanie (rok/miesiąc/dzień)- Optymalizacja formatu (Parquet/ORC)- Strategie kompresji- Zarządzanie metadanymi"
class BatchLayer: def __init__(self, storage_config: StorageConfig): self.storage = self._create_storage_client(storage_config) self.metadata_store = MetadataStore()
async def process_batch(self, data: pd.DataFrame, partition_keys: List[str]) -> BatchResult: # Optymalizuj format danych optimized = self.optimize_for_storage(data)
# Oblicz partycje partitions = self.calculate_partitions(optimized, partition_keys)
# Zapisz partycjonowane dane write_results = [] for partition, partition_data in partitions.items(): path = self.generate_path(partition) result = await self.write_optimized( partition_data, path, compression='snappy' ) write_results.append(result)
# Zaktualizuj metadane await self.metadata_store.register_batch(write_results)
return BatchResult(write_results)
# AI implementuje warstwę prędkości"Stwórz warstwę prędkości dla analityki w czasie rzeczywistym:- Cachowanie w pamięci- Agregacje w czasie rzeczywistym- Zapytania o niskim opóżnieniu- Łączenie z wynikami batch"
class SpeedLayer: def __init__(self): self.cache = RedisCache() self.aggregation_engine = AggregationEngine() self.query_engine = QueryEngine()
async def process_real_time(self, event: Event): # Zaktualizuj agregacje w czasie rzeczywistym await self.aggregation_engine.update(event)
# Cachuj najnowsze dane await self.cache.add(event, ttl=3600)
# Zaktualizuj zmaterializowane widoki await self.update_views(event)
async def query(self, query: Query) -> QueryResult: # Sprawdź czy zapytanie wymaga danych w czasie rzeczywistym if query.requires_real_time: # Połącz wyniki batch i czasu rzeczywistego batch_data = await self.query_batch_layer(query) real_time_data = await self.query_cache(query) return self.merge_results(batch_data, real_time_data)
return await self.query_batch_layer(query)
# AI tworzy DAG Airflow"Generuj DAG Airflow dla:- Dziennego pipeline'u ETL- Zależności między zadaniami- Obsługi błędów i ponownych prób- Monitorowania SLA- Dynamicznego generowania zadań"
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperatorfrom datetime import datetime, timedelta
default_args = { 'owner': 'zespół-danych', 'depends_on_past': False, 'start_date': datetime(2024, 1, 1), 'email_on_failure': True, 'email_on_retry': False, 'retries': 2, 'retry_delay': timedelta(minutes=5)}
dag = DAG( 'pipeline_danych', default_args=default_args, description='Dzienny pipeline ETL', schedule_interval='@daily', catchup=False, tags=['etl', 'produkcja'])
# AI generuje definicje zadańdef extract_data(**context): # Logika ekstrakcji tutaj pass
def validate_data(**context): # Logika walidacji tutaj pass
def transform_data(**context): # Logika transformacji tutaj pass
# Definiuj zadaniaextract_task = PythonOperator( task_id='wyciągnij_dane', python_callable=extract_data, dag=dag)
validate_task = PythonOperator( task_id='waliduj_dane', python_callable=validate_data, dag=dag)
transform_task = DatabricksSubmitRunOperator( task_id='transformuj_dane', databricks_conn_id='databricks_default', notebook_task={ 'notebook_path': '/Repos/etl/transform', 'base_parameters': { 'date': '{{ ds }}' } }, dag=dag)
# Ustaw zależnościextract_task >> validate_task >> transform_task
Idempotentność
Zapewnij, że pipeline’y można bezpiecznie uruchamiać ponownie bez duplikowania danych
Przetwarzanie przyrostowe
Przetwarzaj tylko nowe lub zmienione dane dla poprawy wydajności
Odzyskiwanie błędów
Implementuj kompleksową obsługę błędów i mechanizmy odzyskiwania
Monitoring pierwsza
Buduj monitoring i obserwowalność od początku
DevOps dla danych
Naucz się automatyzacji infrastruktury dla pipeline’ów danych
Monitoring pipeline'ów
Implementuj kompleksowy monitoring pipeline’ów
Wzorce architektoniczne
Eksploruj zaawansowane wzorce architektury danych