pipeline-design

Design ETL/ELT pipelines end-to-end: source connectors, extraction strategies, transform logic, load patterns, idempotency, scheduling, and error handling. Use this skill whenever the user is starting a new ingestion job or planning how data moves from a source (REST API, database, file, webhook, message queue) into a data warehouse or data lake. Also trigger when the user asks about pipeline architecture, incremental vs. full loads, backfill strategies, CDC, retry logic, or orchestration choices (Airflow, Prefect, dbt). This skill should feel like pairing with a senior data engineer on day one of a new pipeline project.
Methasit-Pun/data_engineer_claude_skills · ★ 0 · Data & Documents · score 62
Install: claude install-skill Methasit-Pun/data_engineer_claude_skills
# Pipeline Design

## Starting Point: Understand Before You Build

Before writing a single line of ingestion code, answer these four questions. Every architectural decision flows from them.

1. **Source shape**: Is this a REST API, a database (Postgres, MySQL), a file drop (S3/GCS), a message queue (Kafka, Pub/Sub), or a webhook push?
2. **Volume and velocity**: Millions of rows daily? Tens of thousands? Real-time events or nightly batch?
3. **Change pattern**: Does the source expose an `updated_at` timestamp? A CDC stream? Or must you full-scan every time?
4. **Downstream contract**: Who consumes this data and how quickly do they need it? That drives scheduling and latency tolerance.

---

## Extraction Strategies

### Full load

Re-extract everything every run. Simple, always correct, expensive at scale.

Use when:

- The source has no reliable `updated_at` or sequence key
- The table is small (under a few million rows)
- Correctness matters more than cost

### Incremental load (watermark-based)

Track the highest `updated_at` (or auto-increment ID) seen so far. On each run, pull rows where `updated_at > last_watermark`.

```python
last_watermark = read_watermark(pipeline_name)
# Bind the watermark as a query parameter (assumes source.query accepts one; placeholder syntax depends on the driver).
rows = source.query("SELECT * FROM orders WHERE updated_at > %s", (last_watermark,))
if rows:  # max() raises ValueError on an empty result, so skip runs with no new rows
    write_to_warehouse(rows)
    # Advance the watermark only after the write succeeds.
    save_watermark(pipeline_name, max(row["updated_at"] for row in rows))
```

**Risk:** Rows with a backdated `updated_at` (late-arriving data) are silently missed. Add a safety buffer of a few hours if this is a concern.
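One way to apply a safety buffer to the watermark is sketched below, using plain in-memory data so it runs on its own. Everything named here is hypothetical and not part of the skill itself: `extract_incremental`, `load_upsert`, `run_once`, the three-hour `LOOKBACK`, and the `id` primary key are all assumptions for illustration. The sketch re-reads a window of rows behind the watermark and relies on an upsert by key, so the re-read rows overwrite existing ones instead of duplicating them.

```python
from datetime import datetime, timedelta

# Assumed buffer size; tune it to how late rows can realistically arrive.
LOOKBACK = timedelta(hours=3)


def extract_incremental(source_rows, last_watermark, lookback=LOOKBACK):
    """Return rows updated after (last_watermark - lookback)."""
    cutoff = last_watermark - lookback
    return [row for row in source_rows if row["updated_at"] > cutoff]


def load_upsert(warehouse, rows):
    """Upsert by primary key so re-read rows overwrite rather than duplicate."""
    for row in rows:
        warehouse[row["id"]] = row


def run_once(source_rows, warehouse, last_watermark, lookback=LOOKBACK):
    """Run one extract-and-load cycle and return the watermark to persist."""
    rows = extract_incremental(source_rows, last_watermark, lookback)
    load_upsert(warehouse, rows)
    # Buffered re-reads can be older than the watermark, so never move it backwards.
    newest = max((row["updated_at"] for row in rows), default=last_watermark)
    return max(newest, last_watermark)


# First run: two rows exist in the (simulated) source.
source = [
    {"id": 1, "updated_at": datetime(2024, 1, 1, 9, 0), "status": "paid"},
    {"id": 2, "updated_at": datetime(2024, 1, 1, 12, 0), "status": "new"},
]
warehouse = {}
watermark = run_once(source, warehouse, datetime(2024, 1, 1, 0, 0))

# A late-arriving row shows up afterwards, stamped earlier than the watermark.
source.append({"id": 3, "updated_at": datetime(2024, 1, 1, 11, 0), "status": "new"})
watermark = run_once(source, warehouse, watermark)

print(sorted(warehouse))  # ids now present in the warehouse
```

The second run picks up row 3 only because of the lookback window. With `lookback=timedelta(0)` the same row would be skipped. The cost is that every run re-reads a few hours of rows, which is safe only if the load step is idempotent.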