Przejdź do głównej zawartości

Budowanie pipeline'u danych z Cursor

Twój zespół analityczny potrzebuje codziennych raportów zbudowanych z danych rozrzuconych po trzech źródłach: produkcyjna baza PostgreSQL, zewnętrzne API zwracające JSON oraz eksporty CSV z systemu legacy, które ktoś wysyła mailem co rano. Obecny proces to 400-liniowy skrypt Pythona, który napisała jedna osoba i nikt inny go nie rozumie. Zawodzi po cichu, gdy zmienia się format CSV, nie obsługuje częściowych awarii, a kiedy się wywala o 3 w nocy, dyżurny inżynier spędza dwie godziny na ustalaniu, gdzie się zatrzymał i co trzeba ponownie uruchomić.

Budowanie porządnego pipeline’u danych to jedno z tych zadań, które wyglądają na proste, dopóki nie trafisz na rzeczywistą złożoność: niezgodność schematów, obsługa nulli, idempotentność, ładowanie przyrostowe i logika ponownych prób. Cursor Agent może przyspieszyć każdy etap — od projektowania schematu po logikę transformacji i obsługę błędów — ponieważ doskonale radzi sobie z generowaniem powtarzalnego, ale precyzyjnego kodu, jakiego wymagają pipeline’y.

  • Ustrukturyzowane podejście do projektowania pipeline’ów danych z trybem Plan w Cursor
  • Prompty do generowania kodu ekstrakcji, transformacji i ładowania z prawidłową obsługą błędów
  • Workflow do obsługi ewolucji schematów i walidacji danych
  • Techniki budowania idempotentnych, wznawialnych etapów pipeline’u
  • Reguły projektu, które utrzymują spójność AI w całym pipeline’ie

Zanim napiszesz jakikolwiek kod transformacji, użyj trybu Plan do zmapowania pipeline’u. Pipeline’y danych mają jasną strukturę (ekstrakcja, transformacja, ładowanie), ale diabeł tkwi w szczegółach — jak etapy się łączą i jak propagują się awarie.

Tryb Plan zada pytania uściślające dotyczące wolumenów danych, wymagań latencji i istniejącej infrastruktury. Wynikiem jest plan, na którym możesz iterować przed napisaniem kodu.

Każde źródło danych potrzebuje własnego ekstraktora ze specyficzną obsługą błędów. Kluczowa zasada: ekstraktory powinny produkować spójny format pośredni niezależnie od źródła.

Utwórz ekstraktor PostgreSQL w src/extractors/postgres.ts, który:
1. Łączy się przy użyciu puli połączeń (pg-pool)
2. Ekstrahuje dane dla podanego zakresu dat na podstawie kolumn created_at/updated_at
3. Wspiera ekstrakcję przyrostową (tylko wiersze zmienione od ostatniego uruchomienia)
4. Przechowuje znacznik czasu ostatniej ekstrakcji w tabeli stanu
5. Zapisuje dane jako pliki JSONL do katalogu staging
6. Obsługuje awarie połączeń z 3 ponowieniami i wykładniczym backoffem
7. Loguje liczbę wierszy i czas trwania ekstrakcji dla każdej tabeli
Tabele do ekstrakcji: orders, customers, products, order_items
Każda tabela powinna mieć własną funkcję ekstrakcji.
Użyj TypeScript ze ścisłymi typami dla wszystkich wierszy z bazy.

Ekstrakcja CSV jest zwodniczo złożona, ponieważ formaty zmieniają się bez ostrzeżenia:

Utwórz ekstraktor CSV w src/extractors/csv.ts, który:
1. Czyta pliki CSV z bucketu S3 (konkretny prefix: inventory/YYYY-MM-DD/)
2. Obsługuje różne formaty CSV:
- Rozdzielane przecinkami z polami w cudzysłowach
- Pola mogą zawierać znaki nowej linii wewnątrz cudzysłowów
- Wiersz nagłówkowy może mieć inne nazwy kolumn niż oczekiwane (zmapuj je)
3. Waliduje każdy wiersz względem schematu (product_id wymagane, quantity numeryczne)
4. Zbiera błędy walidacji per wiersz, ale kontynuuje przetwarzanie
5. Zapisuje poprawne wiersze jako JSONL, niepoprawne do osobnego pliku błędów
6. Obsługuje brakujące pliki gracefully (magazyn czasem pomija weekendy)
Użyj csv-parse do parsowania. Utwórz konfigurację mapowania kolumn,
którą można zaktualizować, gdy zespół magazynowy zmieni format eksportu.

Transformacje to miejsce, gdzie egzekwuje się jakość danych i stosuje logikę biznesową. To warstwa, w której Cursor Agent oszczędza najwięcej czasu, ponieważ transformacje są wysoce powtarzalne: koalescencja nulli, rzutowanie typów, łączenie, deduplikacja i obliczanie pól pochodnych.

Utwórz funkcje transformacji w src/transformers/orders.ts, które pobierają
surowe wyekstrahowane dane i produkują rekordy gotowe do hurtowni:
1. Połącz zamówienia z klientami (dopasowanie po customer_id)
2. Połącz zamówienia z płatnościami Stripe (dopasowanie po order_id w metadata)
3. Oblicz pola pochodne:
- order_total_usd (konwersja z centów na dolary)
- customer_lifetime_value (suma wszystkich ich zamówień)
- days_since_last_order (dla każdego klienta)
- payment_status (uzgodnienie statusu zamówienia ze statusem Stripe)
4. Obsłuż nulle:
- Brakujące dane klienta: użyj "Unknown" dla nazwy, null dla email
- Brakujące dane płatności: ustaw payment_status na "pending"
- Brakujące dane magazynowe: oznacz, ale nie blokuj
5. Deduplikacja: jeśli zamówienie pojawia się zarówno w ekstrakcji z bazy, jak i w
re-ekstrakcji z poprzedniego dnia, zachowaj najnowszą wersję
6. Zwaliduj schemat wyjściowy przed zwróceniem
Ściśle typuj wejście i wyjście za pomocą interfejsów TypeScript.
Dołącz testy jednostkowe dla każdej funkcji transformacji.

Warstwa ładowania zapisuje przetransformowane dane do celu. Idempotentność to krytyczne wymaganie: uruchomienie pipeline’u dwa razy dla tej samej daty musi dać ten sam wynik, a nie zduplikowane wiersze.

Poszczególne etapy potrzebują orkiestratora, który uruchamia je w kolejności, obsługuje awarie i zapewnia widoczność stanu pipeline’u.

Utwórz orkiestrator pipeline'u w src/orchestrator.ts, który:
1. Uruchamia pipeline dla podanej daty (domyślnie: wczoraj)
2. Wykonuje etapy w kolejności: ekstrakcja -> transformacja -> ładowanie
3. Każdy etap może się udać, zakończyć niepowodzeniem lub częściowo udać
4. Przy częściowym sukcesie (np. Stripe niedostępny, ale baza się udała):
- Kontynuuj z dostępnymi danymi
- Oznacz uruchomienie jako "partial"
- Zaplanuj ponowienie dla nieudanego źródła
5. Przy całkowitej awarii: przerwij, zaloguj wszystko, wyślij alert
6. Wspiera uzupełnianie: przyjmuje zakres dat i przetwarza każdą datę
7. Zapobiega równoległym uruchomieniom dla tej samej daty (distributed lock)
8. Loguje postęp pipeline'u zarówno na konsolę, jak i do endpointu monitoringu
Użyj wzorca maszyny stanów do śledzenia statusu pipeline'u.
Uruchamianie przez CLI: npx pipeline run --date 2026-02-07

Kontrole jakości danych to siatka bezpieczeństwa między twoim pipeline’em a analitykami. Bez nich złe dane po cichu wpływają do dashboardów i decyzji.

Utwórz kontrole jakości danych w src/quality/checks.ts, które uruchamiają się po załadowaniu:
1. Kompletność: Czy są zamówienia na każdą godzinę dnia? (wykrywanie luk)
2. Wolumen: Czy dzisiejsza liczba wierszy mieści się w 2 odchyleniach standardowych od średniej z 30 dni?
3. Świeżość: Czy najnowszy rekord mieści się w oczekiwanym oknie czasowym?
4. Integralność referencyjna: Czy wszystkie order_items odwołują się do istniejących zamówień?
5. Reguły biznesowe:
- Brak kwot zamówień poniżej 0$ lub powyżej 100 000$
- Brak adresów e-mail klientów, które nie przechodzą podstawowej walidacji formatu
- Brak zduplikowanych order_id w tym samym dniu
6. Wykrywanie trendów: Alertuj, jeśli jakakolwiek metryka zmienia się o więcej niż 20% dzień do dnia
Zwróć ustrukturyzowany raport z wynikiem pass/fail/warning dla każdej kontroli.
Wysyłaj alerty (przez webhook) dla awarii i ostrzeżeń.

Agent generuje transformacje, które po cichu tracą dane. Najgroźniejszy błąd w pipeline’ach to złączenie, które odrzuca wiersze z powodu klucza null. Zawsze proś Agenta o dodanie asercji na liczbę wierszy: “Po złączeniu zweryfikuj, że liczba wierszy wyjściowych wynosi co najmniej 95% liczby wierszy wejściowych. Jeśli nie, zakończ błędem z detalami pokazującymi, które wiersze zostały odrzucone i dlaczego.”

Pipeline nie jest idempotentny. Jeśli twój loader używa INSERT zamiast MERGE/UPSERT, ponowne uruchomienie pipeline’u tworzy duplikaty. Jawnie określ “idempotentny” w każdym prompcie loadera i dołącz test, który uruchamia loader dwa razy i weryfikuje, że liczba wierszy się zgadza.

Zmiany schematu psują pipeline. Gdy źródło dodaje kolumnę lub zmienia typ, ekstraktor zawodzi. Wbuduj walidację schematów w swoje ekstraktory (walidacja Zod w powyższych promptach) i stwórz regułę projektu: “Wszystkie ekstraktory muszą walidować schematy odpowiedzi. Nieznane pola powinny być logowane i ignorowane, a nie powodować awarii.”

Pipeline zajmuje za dużo czasu. Wraz ze wzrostem wolumenów danych przetwarzanie sekwencyjne staje się wąskim gardłem. Poproś Agenta o dodanie równoległości: “Uruchom trzy ekstraktory jednocześnie za pomocą Promise.all. Każdy ekstraktor jest niezależny i zapisuje do osobnych plików staging.”

Komunikaty błędów są bezużyteczne. Błędy pipeline’u o 3 w nocy muszą być samodiagnozujące. Dołącz kontekst do każdego błędu: który etap, która data, który wiersz, jaka była oczekiwana vs rzeczywista wartość. Poproś Agenta: “Każdy log błędu musi zawierać pipeline_run_id, nazwę etapu, nazwę źródła i wystarczający kontekst, aby odtworzyć problem.”