🚀 快速安装

复制以下命令并运行,立即安装此 Skill:

npx skills add https://skills.sh/claude-office-skills/skills/etl-pipeline

💡 提示:需要 Node.js 和 NPM

ETL 数据管道

设计和自动化数据提取、转换、加载管道的综合技能。

管道架构

核心 ETL 流程

数据管道架构:
┌─────────────────────────────────────────────────────────┐
│                     抽取 (EXTRACT)                       │
├─────────┬─────────┬─────────┬─────────┬─────────────────┤
│Postgres │  MySQL  │ MongoDB │  APIs   │  文件 (CSV/JSON)│
└────┬────┴────┬────┴────┬────┴────┬────┴────────┬────────┘
     │         │         │         │              │
     └─────────┴─────────┴────┬────┴──────────────┘
┌─────────────────────────────────────────────────────────┐
│                     转换 (TRANSFORM)                     │
│  • 清洗与验证    • 聚合与连接                            │
│  • 标准化       • 计算指标                               │
│  • 去重         • 应用业务规则                           │
└────────────────────────┬────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│                     加载 (LOAD)                          │
├─────────────┬─────────────┬─────────────┬───────────────┤
│  BigQuery   │  Snowflake  │  Redshift   │   数据湖      │
└─────────────┴─────────────┴─────────────┴───────────────┘

数据源连接器

数据库连接

数据源:
  postgres:
    类型: postgresql
    主机: db.example.com
    端口: 5432
    数据库: production
    ssl: true
    抽取方式:
      方法: 增量抽取
      键字段: updated_at
      批次大小: 10000

  mysql:
    类型: mysql
    主机: mysql.example.com
    端口: 3306
    数据库: analytics
    抽取方式:
      方法: CDC
      binlog: true

  mongodb:
    类型: mongodb
    连接字符串: mongodb+srv://...
    数据库: app_data
    抽取方式:
      方法: 变更流

API 数据源

API 数据源:
  stripe:
    类型: rest_api
    基础 URL: https://api.stripe.com/v1
    认证方式: bearer_token
    端点:
      - /charges
      - /customers
      - /subscriptions
    分页方式: cursor
    速率限制: 100/秒

  salesforce:
    类型: salesforce
    实例 URL: https://company.salesforce.com
    认证方式: oauth2
    对象:
      - Account
      - Opportunity
      - Contact
    批量 API: true

转换层

常见转换操作

# 数据清洗
转换操作 = {
    "填充空值": {
        "操作": "fill_null",
        "列名": ["email", "phone"],
        "填充值": "unknown"
    },
    
    "日期标准化": {
        "操作": "date_parse",
        "列名": ["created_at", "updated_at"],
        "格式": "ISO8601"
    },
    
    "货币标准化": {
        "操作": "convert_currency",
        "源列": "amount",
        "货币列": "currency",
        "目标货币": "USD"
    },
    
    "去重": {
        "操作": "distinct",
        "键列": ["customer_id", "transaction_id"],
        "保留策略": "latest"
    }
}

聚合规则

-- 每日收入聚合
SELECT 
    DATE(created_at) as 日期,
    product_category as 产品类别,
    COUNT(*) as 交易数量,
    SUM(amount) as 总收入,
    AVG(amount) as 平均订单价值,
    COUNT(DISTINCT customer_id) as 独立客户数
FROM orders
WHERE created_at >= '${开始日期}'
GROUP BY 1, 2

连接操作

连接:
  - 名称: enrich_orders
    左表: orders
    右表: customers
    类型: left
    连接条件:
      - 左表字段: customer_id
        右表字段: id
    选择字段:
      - orders.*
      - customers.email
      - customers.segment
      - customers.lifetime_value

  - 名称: add_product_details
    左表: enriched_orders
    右表: products
    类型: left
    连接条件:
      - 左表字段: product_id
        右表字段: id

加载策略

BigQuery 加载

bigquery_load:
  项目: my-project
  数据集: analytics
  表名: fact_orders
  
  模式:
    - 名称: order_id
      类型: STRING
      模式: REQUIRED
    - 名称: customer_id
      类型: STRING
    - 名称: amount
      类型: NUMERIC
    - 名称: created_at
      类型: TIMESTAMP
  
  加载配置:
    写入策略: WRITE_APPEND
    创建策略: CREATE_IF_NEEDED
    聚簇字段: [customer_id]
    时间分区:
      字段: created_at
      类型: DAY

Snowflake 加载

snowflake_load:
  仓库: ETL_WH
  数据库: ANALYTICS
  模式: PUBLIC
  表名: FACT_ORDERS
  
  临时存储:
    存储位置: '@MY_STAGE'
    文件格式: JSON
  
  复制选项:
    错误处理: CONTINUE
    清理源文件: true
    列名匹配: CASE_INSENSITIVE

管道编排

DAG 定义

管道:
  名称: daily_analytics_etl
  调度时间: "0 2 * * *"  # 每天凌晨2点
  
  任务:
    - id: extract_orders
      类型: 抽取
      数据源: postgres
      查询: "SELECT * FROM orders WHERE date = '${执行日期}'"
      
    - id: extract_customers
      类型: 抽取
      数据源: postgres
      查询: "SELECT * FROM customers"
      
    - id: transform_data
      类型: 转换
      依赖: [extract_orders, extract_customers]
      操作:
        - 连接客户数据
        - 计算指标
        - 应用业务规则
      
    - id: load_warehouse
      类型: 加载
      依赖: [transform_data]
      目标: bigquery
      表名: fact_orders
      
    - id: notify_complete
      类型: 通知
      依赖: [load_warehouse]
      渠道: slack
      消息: "每日 ETL 成功完成"

错误处理

错误处理:
  重试:
    最大尝试次数: 3
    延迟秒数: 300
    指数退避: true
  
  失败时操作:
    - 记录错误日志
    - 发送告警
    - 保存失败记录
  
  死信队列:
    启用: true
    存储位置: s3://etl-errors/
    保留天数: 30

数据质量

验证规则

质量检查:
  - 名称: 空值检查
    列名: customer_id
    规则: not_null
    严重级别: error
    
  - 名称: 范围检查
    列名: amount
    规则: between
    最小值: 0
    最大值: 100000
    严重级别: warning
    
  - 名称: 唯一性检查
    列名: [order_id]
    规则: unique
    严重级别: error
    
  - 名称: 引用完整性
    列名: product_id
    引用表: products
    引用列: id
    严重级别: error
    
  - 名称: 新鲜度检查
    列名: updated_at
    规则: max_age_hours
    阈值: 24
    严重级别: warning

质量指标仪表板

数据质量报告 - ${日期}
═══════════════════════════════════════
处理记录总数:1,250,000
通过验证:       1,247,500 (99.8%)
验证失败:           2,500 (0.2%)

问题类型分布:
┌─────────────────┬────────┬──────────┐
│ 问题类型        │ 数量   │ 严重级别 │
├─────────────────┼────────┼──────────┤
│ 空值            │ 1,200  │ 警告     │
│ 格式无效        │   850  │ 错误     │
│ 超出范围        │   300  │ 警告     │
│ 重复数据        │   150  │ 错误     │
└─────────────────┴────────┴──────────┘

监控与告警

管道指标

指标:
  - 名称: pipeline_duration
    类型: gauge
    标签: [pipeline_name, status]
    
  - 名称: records_processed
    类型: counter
    标签: [pipeline_name, source, destination]
    
  - 名称: error_count
    类型: counter
    标签: [pipeline_name, error_type]
    
  - 名称: data_freshness
    类型: gauge
    标签: [table_name]

告警配置

告警:
  - 名称: 管道失败
    条件: status == 'failed'
    通知渠道: [pagerduty, slack]
    
  - 名称: 错误率过高
    条件: error_rate > 0.05
    通知渠道: [slack]
    
  - 名称: 管道耗时过长
    条件: duration > 2 * avg_duration
    通知渠道: [slack]
    
  - 名称: 数据新鲜度
    条件: freshness_hours > 24
    通知渠道: [email]

最佳实践

  1. 增量加载:尽可能使用增量抽取
  2. 幂等性:确保管道可以安全地重新运行
  3. 分区:按日期对大型表进行分区
  4. 监控:跟踪管道健康指标
  5. 文档:记录所有转换逻辑
  6. 测试:在生产前使用样本数据测试
  7. 版本控制:在 git 中跟踪管道变更

📄 原始文档

完整文档(英文):

https://skills.sh/claude-office-skills/skills/etl-pipeline

💡 提示:点击上方链接查看 skills.sh 原始英文文档,方便对照翻译。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。