← ClaudeAtlas

data-pipelinelisted

【数据管道】ETL 管道设计、Airflow/dbt 模式、数据验证、监控告警。 触发时机: - 用户要求"设计数据管道"、"ETL流程" - 需要搭建 Airflow DAG - 数据转换和验证 提供完整的数据管道设计方案。
afine907/skills · ★ 0 · Data & Documents · score 75
Install: claude install-skill afine907/skills
# Data Pipeline — 数据管道设计与实现 ETL 管道设计 + Airflow DAG + dbt 转换 + 数据验证 + 监控告警,完整的数据工程方案。 不适用:实时流处理(用 Flink/Kafka Streams);BI 报表制作;数据库运维操作(用 database-ops)。 ## Goal ETL 管道设计、Airflow/dbt 模式、数据验证、监控告警 ## Trigger - 用户要求"设计数据管道"、"ETL流程" - 需要搭建 Airflow DAG - 数据转换和验证 ## 工作流程 ``` 收集需求 → 设计管道架构 → 选择工具 → 实现 ETL → 配置验证 → 设置监控 → 输出方案 ``` ### Step 1: 收集需求 从用户描述中提取: - **数据源**: 数据库、API、文件(CSV/JSON/Parquet)、消息队列 - **目标存储**: 数据仓库(Snowflake/BigQuery/Redshift)、数据湖(S3/GCS) - **数据量**: 日增量、全量大小 - **时效性**: 批处理(T+1)、近实时(分钟级)、实时(秒级) - **转换逻辑**: 清洗、聚合、关联、特征工程 - **调度频率**: 每小时、每日、每周、事件驱动 - **已有技术栈**: Python/Spark/dbt/Airflow 如果信息不足,询问 1-2 个关键问题,不要过度追问。 ### Step 2: 设计管道架构 根据需求选择架构模式: **批处理架构(最常见)**: ``` 数据源 → 提取(Extract) → 暂存区(Staging) → 转换(Transform) → 加载(Load) → 数据仓库 ↓ 数据验证(GE) ``` **Lambda 架构**: ``` 数据源 → 批处理层(Batch Layer) → 服务层 → 查询 ↘ 速度层(Speed Layer) ↗ ``` **Kappa 架构(纯流式)**: ``` 数据源 → Kafka → 流处理(Flink/Spark Streaming) → 服务层 → 查询 ``` **Medallion 架构(湖仓一体)**: ``` 原始数据 → Bronze(原始层) → Silver(清洗层) → Gold(聚合层) ``` ### Step 3: Airflow DAG 实现 读取 [references/airflow-dag.md](references/airflow-dag.md) 获取 DAG 模板和最佳实践。 **基础 DAG 结构**: ```python from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.postg