Przejdź do głównej zawartości

Budowanie pipeline'ów danych z AI

Pipeline’y danych stanowią szkielet nowoczesnych aplikacji opartych na danych. Ta lekcja pokazuje, jak możliwości AI w Cursor przekształcają złożone procesy ETL, przetwarzanie strumieni i zadania inżynierii danych w zarządzalne, wydajne przepływy pracy.

Tradycyjne tworzenie pipeline’ów danych wymaga żonglowania wieloma technologiami, obsługi złożonych transformacji i zapewniania jakości danych. Wsparcie AI czyni te zadania bardziej przystępnymi, generując kod podstawowy, sugerując optymalizacje i pomagając debugować problemy z przepływem danych.

Złożone transformacje

AI pomaga pisać i optymalizować logikę transformacji danych w różnych formatach

Obsługa błędów

AI generuje solidne mechanizmy obsługi błędów i odzyskiwania

Dostrajanie wydajności

AI sugeruje optymalizacje dla przetwarzania danych na dużą skalę

Ewolucja schematów

AI pomaga w strategiach wersjonowania i migracji schematów

  1. Definiowanie wymagań pipeline’u

    # Poproś AI o zaprojektowanie architektury pipeline'u
    "Zaprojektuj pipeline ETL, który:
    - Pobiera dane z PostgreSQL, REST API i plików CSV
    - Transformuje i waliduje dane
    - Ładuje do hurtowni danych (Snowflake)
    - Obsługuje aktualizacje przyrostowe
    - Zawiera odzyskiwanie błędów i monitoring"
  2. Generowanie szkieletu pipeline’u

    # AI tworzy strukturę pipeline'u
    from dataclasses import dataclass
    from typing import Any, Dict, List
    import asyncio
    from abc import ABC, abstractmethod
    @dataclass
    class PipelineConfig:
    source_configs: Dict[str, Dict[str, Any]]
    transformation_rules: List[Dict[str, Any]]
    destination_config: Dict[str, Any]
    error_handling: Dict[str, Any]
    class DataPipeline(ABC):
    def __init__(self, config: PipelineConfig):
    self.config = config
    self.setup_monitoring()
    @abstractmethod
    async def extract(self) -> Any:
    pass
    @abstractmethod
    async def transform(self, data: Any) -> Any:
    pass
    @abstractmethod
    async def load(self, data: Any) -> None:
    pass
  3. Implementacja źródeł danych

    # AI implementuje różne łączniki źródeł danych
    "Stwórz łączniki dla:
    - PostgreSQL z poolingiem połączeń
    - REST API z ograniczaniem szybkości
    - CSV z wnioskowaniem schematów
    - Zawrzyj logikę ponownych prób i obsługę błędów"
# AI tworzy ekstraktor bazy danych
"Implementuj ekstraktor PostgreSQL z:
- Poolingiem połączeń
- Ekstrakcją przyrostową używając znaczników czasu
- Równoległą ekstrakcją tabel
- Efektywnym streamingiem pamięci"
import asyncpg
from contextlib import asynccontextmanager
class PostgreSQLExtractor:
def __init__(self, connection_string: str, batch_size: int = 10000):
self.connection_string = connection_string
self.batch_size = batch_size
self.pool = None
async def initialize(self):
self.pool = await asyncpg.create_pool(
self.connection_string,
min_size=5,
max_size=20,
command_timeout=60
)
async def extract_incremental(self, table: str, timestamp_column: str,
last_extracted: datetime) -> AsyncGenerator:
async with self.pool.acquire() as conn:
query = f"""
SELECT * FROM {table}
WHERE {timestamp_column} > $1
ORDER BY {timestamp_column}
"""
async with conn.transaction():
async for row in conn.cursor(query, last_extracted):
yield dict(row)
# AI implementuje framework transformacji
"Stwórz framework transformacji, który wspiera:
- Mapowanie i przemianowanie kolumn
- Konwersje typów danych
- Pola obliczane
- Reguły walidacji danych
- Obsługę rekordów błędnych"
class TransformationEngine:
def __init__(self, rules: List[TransformationRule]):
self.rules = rules
self.error_handler = ErrorHandler()
async def transform_batch(self, data: pd.DataFrame) -> TransformResult:
transformed = data.copy()
errors = []
for rule in self.rules:
try:
transformed = await rule.apply(transformed)
except TransformationError as e:
errors.extend(self.error_handler.handle(e, data))
return TransformResult(
success_data=transformed,
error_records=errors,
stats=self._calculate_stats(data, transformed)
)
# AI generuje konkretne transformacje
class DateTimeTransformation(TransformationRule):
def __init__(self, columns: List[str], target_format: str):
self.columns = columns
self.target_format = target_format
async def apply(self, df: pd.DataFrame) -> pd.DataFrame:
for col in self.columns:
if col in df.columns:
df[col] = pd.to_datetime(df[col]).dt.strftime(self.target_format)
return df
# AI implementuje procesor strumieni Kafka
"Stwórz procesor strumieni Kafka z:
- Grupami konsumentów do skalowania
- Semantyką exactly-once
- Agregacjami okienkowymi
- Zarządzaniem stanem"
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.helpers import create_ssl_context
import json
class KafkaStreamProcessor:
def __init__(self, config: KafkaConfig):
self.config = config
self.consumer = None
self.producer = None
self.state_store = StateStore()
async def start(self):
self.consumer = AIOKafkaConsumer(
*self.config.topics,
bootstrap_servers=self.config.brokers,
group_id=self.config.consumer_group,
enable_auto_commit=False,
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
self.producer = AIOKafkaProducer(
bootstrap_servers=self.config.brokers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
await self.consumer.start()
await self.producer.start()
async def process_messages(self):
async for msg in self.consumer:
try:
# Przetwórz wiadomość
result = await self.transform_message(msg.value)
# Wyślij do topicu wyjściowego
await self.producer.send(
self.config.output_topic,
value=result,
key=msg.key
)
# Zatwierdź offset tylko po udanym przetworzeniu
await self.consumer.commit()
except Exception as e:
await self.handle_error(msg, e)
# AI tworzy kompleksowy framework jakości danych
"Zaprojektuj framework jakości danych z:
- Walidacją schematów
- Walidacją reguł biznesowych
- Wykrywaniem anomalii
- Profilowaniem danych
- Metrykami jakości i raportowaniem"
class DataQualityEngine:
def __init__(self):
self.validators = []
self.profiler = DataProfiler()
self.anomaly_detector = AnomalyDetector()
def add_validator(self, validator: DataValidator):
self.validators.append(validator)
async def validate_batch(self, data: pd.DataFrame) -> QualityReport:
report = QualityReport()
# Uruchom wszystkie walidatory
for validator in self.validators:
result = await validator.validate(data)
report.add_result(result)
# Profiluj dane
profile = await self.profiler.profile(data)
report.add_profile(profile)
# Wykryj anomalie
anomalies = await self.anomaly_detector.detect(data)
report.add_anomalies(anomalies)
return report
# AI implementuje konkretne walidatory
class SchemaValidator(DataValidator):
def __init__(self, expected_schema: Dict[str, type]):
self.expected_schema = expected_schema
async def validate(self, data: pd.DataFrame) -> ValidationResult:
errors = []
# Sprawdź kolumny
missing_cols = set(self.expected_schema.keys()) - set(data.columns)
if missing_cols:
errors.append(f"Brakujące kolumny: {missing_cols}")
# Sprawdź typy danych
for col, expected_type in self.expected_schema.items():
if col in data.columns:
actual_type = data[col].dtype
if not self._compatible_types(actual_type, expected_type):
errors.append(f"Kolumna {col}: oczekiwano {expected_type}, otrzymano {actual_type}")
return ValidationResult(
passed=len(errors) == 0,
errors=errors,
stats=self._calculate_stats(data)
)

Śledzenie lineage

Śledź przepływ danych od źródła do miejsca przeznaczenia z pełnym audytem

Metryki wydajności

Monitoruj przepustowość pipeline’u, opóźnienia i wykorzystanie zasobów

Śledzenie błędów

Przechwytuj i analizuj błędy pipeline’u z analizą podstawowych przyczyn

Świeżość danych

Monitoruj aktualność danych i alerty o przestarzałych danych

# AI implementuje pipeline CDC
"Stwórz pipeline CDC, który:
- Przechwytuje zmiany w bazie danych w czasie rzeczywistym
- Obsługuje ewolucję schematów
- Utrzymuje gwarancje kolejności
- Wspiera wiele źródeł CDC"
class CDCPipeline:
def __init__(self, source_config: CDCSourceConfig):
self.source_config = source_config
self.schema_registry = SchemaRegistry()
self.state_store = CDCStateStore()
async def start_cdc(self):
# Zainicjalizuj połączenie CDC
cdc_stream = await self.connect_to_source()
async for change_event in cdc_stream:
try:
# Obsłuż zmiany schematów
if change_event.is_ddl:
await self.handle_schema_change(change_event)
# Przetwórz zmiany danych
else:
transformed = await self.transform_change_event(change_event)
await self.emit_change(transformed)
# Zaktualizuj checkpoint
await self.state_store.update_position(change_event.position)
except Exception as e:
await self.handle_cdc_error(change_event, e)
# AI tworzy warstwę przetwarzania batch
"Implementuj warstwę batch dla data lake:
- Partycjonowane przechowywanie (rok/miesiąc/dzień)
- Optymalizacja formatu (Parquet/ORC)
- Strategie kompresji
- Zarządzanie metadanymi"
class BatchLayer:
def __init__(self, storage_config: StorageConfig):
self.storage = self._create_storage_client(storage_config)
self.metadata_store = MetadataStore()
async def process_batch(self, data: pd.DataFrame,
partition_keys: List[str]) -> BatchResult:
# Optymalizuj format danych
optimized = self.optimize_for_storage(data)
# Oblicz partycje
partitions = self.calculate_partitions(optimized, partition_keys)
# Zapisz partycjonowane dane
write_results = []
for partition, partition_data in partitions.items():
path = self.generate_path(partition)
result = await self.write_optimized(
partition_data,
path,
compression='snappy'
)
write_results.append(result)
# Zaktualizuj metadane
await self.metadata_store.register_batch(write_results)
return BatchResult(write_results)
# AI tworzy DAG Airflow
"Generuj DAG Airflow dla:
- Dziennego pipeline'u ETL
- Zależności między zadaniami
- Obsługi błędów i ponownych prób
- Monitorowania SLA
- Dynamicznego generowania zadań"
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'zespół-danych',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'pipeline_danych',
default_args=default_args,
description='Dzienny pipeline ETL',
schedule_interval='@daily',
catchup=False,
tags=['etl', 'produkcja']
)
# AI generuje definicje zadań
def extract_data(**context):
# Logika ekstrakcji tutaj
pass
def validate_data(**context):
# Logika walidacji tutaj
pass
def transform_data(**context):
# Logika transformacji tutaj
pass
# Definiuj zadania
extract_task = PythonOperator(
task_id='wyciągnij_dane',
python_callable=extract_data,
dag=dag
)
validate_task = PythonOperator(
task_id='waliduj_dane',
python_callable=validate_data,
dag=dag
)
transform_task = DatabricksSubmitRunOperator(
task_id='transformuj_dane',
databricks_conn_id='databricks_default',
notebook_task={
'notebook_path': '/Repos/etl/transform',
'base_parameters': {
'date': '{{ ds }}'
}
},
dag=dag
)
# Ustaw zależności
extract_task >> validate_task >> transform_task
  1. Zaprojektuj architekturę pipeline’u z pomocą AI
  2. Implementuj ekstraktory dla wielu źródeł
  3. Stwórz logikę transformacji z walidacją
  4. Załaduj dane do hurtowni
  5. Dodaj monitoring i alerty

Ćwiczenie 2: Przetwarzanie strumieni w czasie rzeczywistym

Dział zatytułowany „Ćwiczenie 2: Przetwarzanie strumieni w czasie rzeczywistym”
  1. Skonfiguruj Kafka lub podobną platformę streamingową
  2. Stwórz procesor strumieni z okienkowaniem
  3. Implementuj przetwarzanie exactly-once
  4. Dodaj zarządzanie stanem
  5. Monitoruj przepustowość i opóźnienia
  1. Zaprojektuj reguły jakości dla swoich danych
  2. Implementuj walidatory z pomocą AI
  3. Stwórz system wykrywania anomalii
  4. Zbuduj dashboard jakości
  5. Skonfiguruj alerty dla naruszeń

Idempotentność

Zapewnij, że pipeline’y można bezpiecznie uruchamiać ponownie bez duplikowania danych

Przetwarzanie przyrostowe

Przetwarzaj tylko nowe lub zmienione dane dla poprawy wydajności

Odzyskiwanie błędów

Implementuj kompleksową obsługę błędów i mechanizmy odzyskiwania

Monitoring pierwsza

Buduj monitoring i obserwowalność od początku

DevOps dla danych

Naucz się automatyzacji infrastruktury dla pipeline’ów danych

Monitoring pipeline'ów

Implementuj kompleksowy monitoring pipeline’ów

Wzorce architektoniczne

Eksploruj zaawansowane wzorce architektury danych