data-pipelinelisted
Install: claude install-skill afine907/skills
# Data Pipeline — 数据管道设计与实现
ETL 管道设计 + Airflow DAG + dbt 转换 + 数据验证 + 监控告警,完整的数据工程方案。
不适用:实时流处理(用 Flink/Kafka Streams);BI 报表制作;数据库运维操作(用 database-ops)。
## Goal
ETL 管道设计、Airflow/dbt 模式、数据验证、监控告警
## Trigger
- 用户要求"设计数据管道"、"ETL流程"
- 需要搭建 Airflow DAG
- 数据转换和验证
## 工作流程
```
收集需求 → 设计管道架构 → 选择工具 → 实现 ETL → 配置验证 → 设置监控 → 输出方案
```
### Step 1: 收集需求
从用户描述中提取:
- **数据源**: 数据库、API、文件(CSV/JSON/Parquet)、消息队列
- **目标存储**: 数据仓库(Snowflake/BigQuery/Redshift)、数据湖(S3/GCS)
- **数据量**: 日增量、全量大小
- **时效性**: 批处理(T+1)、近实时(分钟级)、实时(秒级)
- **转换逻辑**: 清洗、聚合、关联、特征工程
- **调度频率**: 每小时、每日、每周、事件驱动
- **已有技术栈**: Python/Spark/dbt/Airflow
如果信息不足,询问 1-2 个关键问题,不要过度追问。
### Step 2: 设计管道架构
根据需求选择架构模式:
**批处理架构(最常见)**:
```
数据源 → 提取(Extract) → 暂存区(Staging) → 转换(Transform) → 加载(Load) → 数据仓库
↓
数据验证(GE)
```
**Lambda 架构**:
```
数据源 → 批处理层(Batch Layer) → 服务层 → 查询
↘ 速度层(Speed Layer) ↗
```
**Kappa 架构(纯流式)**:
```
数据源 → Kafka → 流处理(Flink/Spark Streaming) → 服务层 → 查询
```
**Medallion 架构(湖仓一体)**:
```
原始数据 → Bronze(原始层) → Silver(清洗层) → Gold(聚合层)
```
### Step 3: Airflow DAG 实现
读取 [references/airflow-dag.md](references/airflow-dag.md) 获取 DAG 模板和最佳实践。
**基础 DAG 结构**:
```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postg