🚀 快速安装
复制以下命令并运行,立即安装此 Skill:
npx skills add https://skills.sh/claude-office-skills/skills/etl-pipeline
💡 提示:需要 Node.js 和 NPM
ETL 数据管道
设计和自动化数据提取、转换、加载管道的综合技能。
管道架构
核心 ETL 流程
数据管道架构:
┌─────────────────────────────────────────────────────────┐
│ 抽取 (EXTRACT) │
├─────────┬─────────┬─────────┬─────────┬─────────────────┤
│Postgres │ MySQL │ MongoDB │ APIs │ 文件 (CSV/JSON)│
└────┬────┴────┬────┴────┬────┴────┬────┴────────┬────────┘
│ │ │ │ │
└─────────┴─────────┴────┬────┴──────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 转换 (TRANSFORM) │
│ • 清洗与验证 • 聚合与连接 │
│ • 标准化 • 计算指标 │
│ • 去重 • 应用业务规则 │
└────────────────────────┬────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ 加载 (LOAD) │
├─────────────┬─────────────┬─────────────┬───────────────┤
│ BigQuery │ Snowflake │ Redshift │ 数据湖 │
└─────────────┴─────────────┴─────────────┴───────────────┘
数据源连接器
数据库连接
数据源:
postgres:
类型: postgresql
主机: db.example.com
端口: 5432
数据库: production
ssl: true
抽取方式:
方法: 增量抽取
键字段: updated_at
批次大小: 10000
mysql:
类型: mysql
主机: mysql.example.com
端口: 3306
数据库: analytics
抽取方式:
方法: CDC
binlog: true
mongodb:
类型: mongodb
连接字符串: mongodb+srv://...
数据库: app_data
抽取方式:
方法: 变更流
API 数据源
API 数据源:
stripe:
类型: rest_api
基础 URL: https://api.stripe.com/v1
认证方式: bearer_token
端点:
- /charges
- /customers
- /subscriptions
分页方式: cursor
速率限制: 100/秒
salesforce:
类型: salesforce
实例 URL: https://company.salesforce.com
认证方式: oauth2
对象:
- Account
- Opportunity
- Contact
批量 API: true
转换层
常见转换操作
# 数据清洗
转换操作 = {
"填充空值": {
"操作": "fill_null",
"列名": ["email", "phone"],
"填充值": "unknown"
},
"日期标准化": {
"操作": "date_parse",
"列名": ["created_at", "updated_at"],
"格式": "ISO8601"
},
"货币标准化": {
"操作": "convert_currency",
"源列": "amount",
"货币列": "currency",
"目标货币": "USD"
},
"去重": {
"操作": "distinct",
"键列": ["customer_id", "transaction_id"],
"保留策略": "latest"
}
}
聚合规则
-- 每日收入聚合
SELECT
DATE(created_at) as 日期,
product_category as 产品类别,
COUNT(*) as 交易数量,
SUM(amount) as 总收入,
AVG(amount) as 平均订单价值,
COUNT(DISTINCT customer_id) as 独立客户数
FROM orders
WHERE created_at >= '${开始日期}'
GROUP BY 1, 2
连接操作
连接:
- 名称: enrich_orders
左表: orders
右表: customers
类型: left
连接条件:
- 左表字段: customer_id
右表字段: id
选择字段:
- orders.*
- customers.email
- customers.segment
- customers.lifetime_value
- 名称: add_product_details
左表: enriched_orders
右表: products
类型: left
连接条件:
- 左表字段: product_id
右表字段: id
加载策略
BigQuery 加载
bigquery_load:
项目: my-project
数据集: analytics
表名: fact_orders
模式:
- 名称: order_id
类型: STRING
模式: REQUIRED
- 名称: customer_id
类型: STRING
- 名称: amount
类型: NUMERIC
- 名称: created_at
类型: TIMESTAMP
加载配置:
写入策略: WRITE_APPEND
创建策略: CREATE_IF_NEEDED
聚簇字段: [customer_id]
时间分区:
字段: created_at
类型: DAY
Snowflake 加载
snowflake_load:
仓库: ETL_WH
数据库: ANALYTICS
模式: PUBLIC
表名: FACT_ORDERS
临时存储:
存储位置: '@MY_STAGE'
文件格式: JSON
复制选项:
错误处理: CONTINUE
清理源文件: true
列名匹配: CASE_INSENSITIVE
管道编排
DAG 定义
管道:
名称: daily_analytics_etl
调度时间: "0 2 * * *" # 每天凌晨2点
任务:
- id: extract_orders
类型: 抽取
数据源: postgres
查询: "SELECT * FROM orders WHERE date = '${执行日期}'"
- id: extract_customers
类型: 抽取
数据源: postgres
查询: "SELECT * FROM customers"
- id: transform_data
类型: 转换
依赖: [extract_orders, extract_customers]
操作:
- 连接客户数据
- 计算指标
- 应用业务规则
- id: load_warehouse
类型: 加载
依赖: [transform_data]
目标: bigquery
表名: fact_orders
- id: notify_complete
类型: 通知
依赖: [load_warehouse]
渠道: slack
消息: "每日 ETL 成功完成"
错误处理
错误处理:
重试:
最大尝试次数: 3
延迟秒数: 300
指数退避: true
失败时操作:
- 记录错误日志
- 发送告警
- 保存失败记录
死信队列:
启用: true
存储位置: s3://etl-errors/
保留天数: 30
数据质量
验证规则
质量检查:
- 名称: 空值检查
列名: customer_id
规则: not_null
严重级别: error
- 名称: 范围检查
列名: amount
规则: between
最小值: 0
最大值: 100000
严重级别: warning
- 名称: 唯一性检查
列名: [order_id]
规则: unique
严重级别: error
- 名称: 引用完整性
列名: product_id
引用表: products
引用列: id
严重级别: error
- 名称: 新鲜度检查
列名: updated_at
规则: max_age_hours
阈值: 24
严重级别: warning
质量指标仪表板
数据质量报告 - ${日期}
═══════════════════════════════════════
处理记录总数:1,250,000
通过验证: 1,247,500 (99.8%)
验证失败: 2,500 (0.2%)
问题类型分布:
┌─────────────────┬────────┬──────────┐
│ 问题类型 │ 数量 │ 严重级别 │
├─────────────────┼────────┼──────────┤
│ 空值 │ 1,200 │ 警告 │
│ 格式无效 │ 850 │ 错误 │
│ 超出范围 │ 300 │ 警告 │
│ 重复数据 │ 150 │ 错误 │
└─────────────────┴────────┴──────────┘
监控与告警
管道指标
指标:
- 名称: pipeline_duration
类型: gauge
标签: [pipeline_name, status]
- 名称: records_processed
类型: counter
标签: [pipeline_name, source, destination]
- 名称: error_count
类型: counter
标签: [pipeline_name, error_type]
- 名称: data_freshness
类型: gauge
标签: [table_name]
告警配置
告警:
- 名称: 管道失败
条件: status == 'failed'
通知渠道: [pagerduty, slack]
- 名称: 错误率过高
条件: error_rate > 0.05
通知渠道: [slack]
- 名称: 管道耗时过长
条件: duration > 2 * avg_duration
通知渠道: [slack]
- 名称: 数据新鲜度
条件: freshness_hours > 24
通知渠道: [email]
最佳实践
- 增量加载:尽可能使用增量抽取
- 幂等性:确保管道可以安全地重新运行
- 分区:按日期对大型表进行分区
- 监控:跟踪管道健康指标
- 文档:记录所有转换逻辑
- 测试:在生产前使用样本数据测试
- 版本控制:在 git 中跟踪管道变更
📄 原始文档
完整文档(英文):
https://skills.sh/claude-office-skills/skills/etl-pipeline
💡 提示:点击上方链接查看 skills.sh 原始英文文档,方便对照翻译。
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

评论(0)