Data Processing Pipeline with Cursor
Your analytics team needs daily reports built from data scattered across three sources: a PostgreSQL production database, a third-party API that returns JSON, and CSV exports from a legacy system that someone emails every morning. The current process is a 400-line Python script that one person wrote and nobody else understands. It fails silently when the CSV format changes, it does not handle partial failures, and when it crashes at 3 AM, the on-call engineer spends two hours figuring out where it stopped and what to re-run.
Building a proper data pipeline is one of those tasks that seems straightforward until you hit the real-world complexity: schema mismatches, null handling, idempotency, incremental loads, and retry logic. Cursor Agent can accelerate every stage of this — from schema design to transformation logic to error handling — because it excels at generating the kind of repetitive-but-precise code that pipelines demand.
What You’ll Walk Away With
- A structured approach to designing data pipelines with Cursor’s Plan mode
- Prompts for generating extraction, transformation, and loading code with proper error handling
- A workflow for handling schema evolution and data validation
- Techniques for building idempotent, restartable pipeline stages
- Project rules that keep the AI consistent across pipeline components
Designing the Pipeline Architecture
Before writing any transformation code, use Plan mode to map out the pipeline. Data pipelines have a clear structure (extract, transform, load), but the devil is in the details of how stages connect and how failures propagate.
Plan mode will ask clarifying questions about your data volumes, latency requirements, and existing infrastructure. The output is a plan you can iterate on before writing code.
Building the Extraction Layer
Each data source needs its own extractor with source-specific error handling. The key principle: extractors should produce a consistent intermediate format regardless of the source.
Database Extraction
Create a PostgreSQL extractor at src/extractors/postgres.ts that:
1. Connects using connection pooling (pg-pool)
2. Extracts data for a given date range using created_at/updated_at columns
3. Supports incremental extraction (only rows changed since last run)
4. Stores the last extraction timestamp in a state table
5. Outputs data as JSONL files to a staging directory
6. Handles connection failures with 3 retries and exponential backoff
7. Logs row counts and extraction duration for each table
Tables to extract: orders, customers, products, order_items
Each table should have its own extraction function.
Use TypeScript with strict types for all database rows.
API Extraction
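The API extractor follows the same shape as the database one: fetch, retry, write to staging. A minimal sketch of the core loop, with retry and exponential backoff, assuming a generic cursor-paginated endpoint. The `Page`/`FetchPage` shapes, page structure, and retry parameters are illustrative assumptions, not the real API's contract:

```typescript
// Sketch of a paginated API extractor with retry + exponential backoff.
// The Page/FetchPage shapes are assumptions; adapt to the real client.
interface Page<T> {
  items: T[];
  nextCursor: string | null;
}

type FetchPage<T> = (cursor: string | null) => Promise<Page<T>>;

// Retry a single page fetch up to `retries` times, doubling the delay
// between attempts (100ms, 200ms, 400ms by default).
async function withRetry<T>(
  fn: () => Promise<T>,
  retries = 3,
  baseDelayMs = 100,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < retries) {
        await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** attempt));
      }
    }
  }
  throw lastError;
}

// Walk every page, collecting rows (JSONL staging output omitted here).
async function extractAll<T>(fetchPage: FetchPage<T>): Promise<T[]> {
  const rows: T[] = [];
  let cursor: string | null = null;
  do {
    const page = await withRetry(() => fetchPage(cursor));
    rows.push(...page.items);
    cursor = page.nextCursor;
  } while (cursor !== null);
  return rows;
}
```

Because the retry wraps each page fetch individually, a transient failure mid-extraction resumes from the current cursor rather than restarting from page one.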
CSV Extraction
CSV extraction is deceptively complex because formats change without warning:
Create a CSV extractor at src/extractors/csv.ts that:
1. Reads CSV files from an S3 bucket (specific prefix: inventory/YYYY-MM-DD/)
2. Handles multiple CSV formats:
   - Comma-separated with quoted fields
   - Fields may contain newlines within quotes
   - Header row may have different column names than expected (map them)
3. Validates each row against a schema (product_id required, quantity numeric)
4. Collects validation errors per row but continues processing
5. Outputs valid rows as JSONL, invalid rows to a separate error file
6. Handles missing files gracefully (the warehouse sometimes skips weekends)
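The header mapping in item 2 is worth keeping in config rather than code, so a format change is a one-line edit. A sketch of that config and its application; the raw header variants and canonical names here are made-up examples:

```typescript
// Maps raw CSV headers (which the warehouse renames without warning)
// to the canonical column names the pipeline expects. The specific
// header variants below are illustrative, not real.
const COLUMN_MAP: Record<string, string> = {
  "Product ID": "product_id",
  "prod_id": "product_id",
  "Qty": "quantity",
  "quantity_on_hand": "quantity",
};

// Rename known headers; pass unknown headers through unchanged so a
// new column shows up downstream instead of being silently dropped.
function normalizeRow(raw: Record<string, string>): Record<string, string> {
  const out: Record<string, string> = {};
  for (const [key, value] of Object.entries(raw)) {
    out[COLUMN_MAP[key] ?? key] = value;
  }
  return out;
}
```

When the warehouse team renames a column, only `COLUMN_MAP` changes; the transformation layer keeps seeing canonical names.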
Use csv-parse for parsing. Create a column mapping config that can be updated when the warehouse team changes their export format.
Building the Transformation Layer
Transformations are where data quality is enforced and business logic is applied. This is the layer where Cursor Agent saves the most time because transformations are highly repetitive: null coalescing, type casting, joining, deduplicating, and computing derived fields.
Create transformation functions at src/transformers/orders.ts that take raw extracted data and produce warehouse-ready records:
1. Join orders with customers (match on customer_id)
2. Join orders with Stripe payments (match on order_id in metadata)
3. Compute derived fields:
   - order_total_usd (convert from cents to dollars)
   - customer_lifetime_value (sum of all their orders)
   - days_since_last_order (for each customer)
   - payment_status (reconcile order status with Stripe status)
4. Handle nulls:
   - Missing customer data: use "Unknown" for name, null for email
   - Missing payment data: set payment_status to "pending"
   - Missing inventory data: flag but don't block
5. Deduplicate: if an order appears in both the database extract and a previous day's re-extract, keep the most recent version
6. Validate the output schema before returning
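Item 5 (keep the most recent version of a duplicated order) is easy to get subtly wrong. A minimal sketch, assuming each record carries an ISO 8601 `updated_at` timestamp (ISO strings in the same zone compare correctly as plain strings):

```typescript
interface OrderRecord {
  order_id: string;
  updated_at: string; // ISO 8601, e.g. "2026-02-07T00:00:00Z"
}

// Keep exactly one record per order_id: the one with the latest
// updated_at. ISO timestamps in a single zone sort lexicographically,
// so string comparison is safe here.
function dedupeByLatest<T extends OrderRecord>(records: T[]): T[] {
  const latest = new Map<string, T>();
  for (const rec of records) {
    const existing = latest.get(rec.order_id);
    if (!existing || rec.updated_at > existing.updated_at) {
      latest.set(rec.order_id, rec);
    }
  }
  return [...latest.values()];
}
```

A single pass with a `Map` keyed on `order_id` avoids the O(n²) filter-inside-loop version that Agent sometimes produces.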
Type the input and output strictly with TypeScript interfaces.
Include unit tests for each transformation function.
Building the Loading Layer
The loading layer writes transformed data to the destination. Idempotency is the critical requirement: running the pipeline twice for the same date must produce the same result, not duplicate rows.
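In PostgreSQL terms, idempotency usually means `INSERT ... ON CONFLICT DO UPDATE` rather than a bare `INSERT`. A sketch of the SQL a loader might generate; the table and column names are assumptions for illustration:

```typescript
// Builds an idempotent upsert statement. Re-running it for the same
// key updates the existing row in place instead of inserting a
// duplicate. Table/column names are illustrative.
function buildUpsert(
  table: string,
  keyColumn: string,
  columns: string[],
): string {
  const cols = columns.join(", ");
  const placeholders = columns.map((_, i) => `$${i + 1}`).join(", ");
  // Every non-key column is overwritten with the incoming value.
  const updates = columns
    .filter((c) => c !== keyColumn)
    .map((c) => `${c} = EXCLUDED.${c}`)
    .join(", ");
  return (
    `INSERT INTO ${table} (${cols}) VALUES (${placeholders}) ` +
    `ON CONFLICT (${keyColumn}) DO UPDATE SET ${updates}`
  );
}
```

The `ON CONFLICT` target requires a unique index on the key column, which is worth stating explicitly in the loader prompt so Agent generates the migration too.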
Orchestrating the Pipeline
Individual stages need an orchestrator that runs them in order, handles failures, and provides visibility into pipeline state.
Create a pipeline orchestrator at src/orchestrator.ts that:
1. Runs the pipeline for a given date (default: yesterday)
2. Executes stages in order: extract -> transform -> load
3. Each stage can succeed, fail, or partially succeed
4. On partial success (e.g., Stripe unavailable but DB succeeded):
   - Continue with available data
   - Mark the run as "partial"
   - Schedule a retry for the failed source
5. On complete failure: abort, log everything, send alert
6. Supports backfilling: accepts a date range and processes each date
7. Prevents concurrent runs for the same date (distributed lock)
8. Logs pipeline progress to both console and a monitoring endpoint
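The state machine for tracking run status can be as small as a transition table that rejects illegal moves. A sketch with assumed state names (the real set depends on how you model partial success):

```typescript
type RunState =
  | "pending"
  | "extracting"
  | "transforming"
  | "loading"
  | "succeeded"
  | "partial"
  | "failed";

// Legal transitions; anything not listed is a bug, not a retry path.
// State names are assumptions for illustration.
const TRANSITIONS: Record<RunState, RunState[]> = {
  pending: ["extracting"],
  extracting: ["transforming", "failed"],
  transforming: ["loading", "failed"],
  loading: ["succeeded", "partial", "failed"],
  succeeded: [],
  partial: ["extracting"], // a scheduled retry re-runs the failed source
  failed: [],
};

function transition(current: RunState, next: RunState): RunState {
  if (!TRANSITIONS[current].includes(next)) {
    throw new Error(`illegal pipeline transition: ${current} -> ${next}`);
  }
  return next;
}
```

Making illegal transitions throw turns "the orchestrator got confused" into an immediate, attributable error instead of a silently wrong status row.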
Use a state machine pattern for tracking pipeline status.
Make it runnable via CLI: npx pipeline run --date 2026-02-07
Data Quality Checks
Data quality checks are the safety net between your pipeline and your analysts. Without them, bad data flows silently into dashboards and decisions.
Create data quality checks at src/quality/checks.ts that run after loading:
1. Completeness: Are there orders for every hour of the day? (gap detection)
2. Volume: Is today's row count within 2 standard deviations of the 30-day average?
3. Freshness: Is the most recent record within the expected time window?
4. Referential integrity: Do all order_items reference valid orders?
5. Business rules:
   - No order totals below $0 or above $100,000
   - No customer emails that fail basic format validation
   - No duplicate order_ids within the same day
6. Trend detection: Alert if any metric changes by more than 20% day-over-day
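Check 2 is worth spelling out, because the naive version forgets that a perfectly flat history has a standard deviation of zero. A dependency-free sketch:

```typescript
// Flags today's row count if it falls outside mean ± N standard
// deviations of the historical counts. Returns "pass" | "warn" so the
// caller can assemble the structured report.
function volumeCheck(
  history: number[],
  today: number,
  sigmas = 2,
): "pass" | "warn" {
  const mean = history.reduce((a, b) => a + b, 0) / history.length;
  const variance =
    history.reduce((a, b) => a + (b - mean) ** 2, 0) / history.length;
  const stddev = Math.sqrt(variance);
  // With a flat history (stddev 0), any deviation at all is a warning.
  if (stddev === 0) return today === mean ? "pass" : "warn";
  return Math.abs(today - mean) <= sigmas * stddev ? "pass" : "warn";
}
```

Returning a status rather than throwing keeps quality checks advisory: the report decides what escalates to an alert.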
Return a structured report with pass/fail/warning for each check.
Send alerts (via webhook) for failures and warnings.
When This Breaks
Agent generates transformations that lose data silently. The most dangerous pipeline bug is a join that drops rows because of a null key. Always ask Agent to add row count assertions: “After the join, verify that the output row count is at least 95% of the input row count. If not, fail with a detailed error showing which rows were dropped and why.”
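That assertion is cheap to standardize. A sketch of the shape such a guard can take, with the 95% threshold from the prompt above as the default; the function name and signature are illustrative:

```typescript
// Fails loudly when a join drops more rows than the allowed fraction,
// and reports which keys disappeared so debugging starts with data.
function assertJoinRetention(
  inputKeys: string[],
  outputKeys: string[],
  minRetention = 0.95,
): void {
  const retained = outputKeys.length / inputKeys.length;
  if (retained >= minRetention) return;
  const outputSet = new Set(outputKeys);
  const dropped = inputKeys.filter((k) => !outputSet.has(k));
  throw new Error(
    `join retained ${(retained * 100).toFixed(1)}% of rows ` +
      `(minimum ${minRetention * 100}%); dropped keys: ` +
      dropped.slice(0, 10).join(", "),
  );
}
```

Capping the listed keys at ten keeps the error readable when a join drops half a million rows.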
The pipeline is not idempotent. If your loader uses INSERT instead of MERGE/UPSERT, re-running the pipeline creates duplicates. Explicitly state “idempotent” in every loader prompt and include a test that runs the loader twice and verifies row counts match.
Schema changes break the pipeline. When a source adds a column or changes a type, the extractor fails. Build schema validation into your extractors (the Zod validation in the prompts above), and create a project rule: “All extractors must validate response schemas. Unknown fields should be logged and ignored, not cause failures.”
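The “log and ignore unknown fields” rule does not require a library; a dependency-free sketch of the same idea (the `FieldSpec` shape is an assumption):

```typescript
interface FieldSpec {
  name: string;
  required: boolean;
}

// Validates known fields, logs unknown ones, and never fails on
// extras, so a new upstream column degrades to a log line instead of
// a 3 AM page.
function validateRecord(
  record: Record<string, unknown>,
  spec: FieldSpec[],
  log: (msg: string) => void = console.warn,
): { ok: boolean; errors: string[] } {
  const known = new Set(spec.map((f) => f.name));
  for (const key of Object.keys(record)) {
    if (!known.has(key)) log(`unknown field ignored: ${key}`);
  }
  const errors = spec
    .filter((f) => f.required && record[f.name] === undefined)
    .map((f) => `missing required field: ${f.name}`);
  return { ok: errors.length === 0, errors };
}
```

The injectable `log` parameter also makes the “unknown fields are logged” behavior directly unit-testable.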
The pipeline takes too long. As data volumes grow, sequential processing becomes a bottleneck. Ask Agent to add parallelism: “Run the three extractors concurrently using Promise.all. Each extractor is independent and writes to separate staging files.”
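The concurrent version is nearly a one-liner, though `Promise.allSettled` is worth considering over `Promise.all` here: `all` rejects on the first failure and discards the other sources' results, while `allSettled` preserves them, which matches the orchestrator's partial-success behavior. A sketch (the `Extractor` shape is an assumption):

```typescript
// Each extractor is independent and reports a row count on success.
type Extractor = { name: string; run: () => Promise<number> };

// Runs all extractors concurrently. allSettled keeps the successful
// sources' results even when one source is down.
async function extractConcurrently(extractors: Extractor[]) {
  const settled = await Promise.allSettled(extractors.map((e) => e.run()));
  return settled.map((result, i) => ({
    source: extractors[i].name,
    ok: result.status === "fulfilled",
    rows: result.status === "fulfilled" ? result.value : 0,
  }));
}
```

The orchestrator can then mark the run “partial” whenever any entry has `ok: false`, instead of failing the whole extract stage.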
Error messages are useless. Pipeline errors at 3 AM need to be self-diagnosing. Include context in every error: which stage, which date, which row, what the expected vs actual value was. Ask Agent: “Every error log must include the pipeline_run_id, stage name, source name, and enough context to reproduce the issue.”
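One way to make that a hard guarantee rather than a convention is a dedicated error type that cannot be constructed without the context. A sketch, with field names taken from the prompt above:

```typescript
// Every field here is mandatory, so an error without context is a
// compile error, not a 3 AM surprise. The `detail` carries expected
// vs. actual values and row identifiers.
interface PipelineErrorContext {
  pipeline_run_id: string;
  stage: string;
  source: string;
  date: string;
  detail: string;
}

class PipelineError extends Error {
  constructor(public readonly context: PipelineErrorContext) {
    super(
      `[run=${context.pipeline_run_id} stage=${context.stage} ` +
        `source=${context.source} date=${context.date}] ${context.detail}`,
    );
    this.name = "PipelineError";
  }
}
```

Keeping the structured `context` alongside the formatted message means the monitoring endpoint can index on `pipeline_run_id` and `stage` while humans read the string.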