🚀 Quick Install

Copy and run the following command to install this Skill:

npx skills add https://skills.sh/claude-office-skills/skills/data-pipeline

💡 Note: requires Node.js and npm

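Data Pipeline

Build data pipelines and ETL workflows that automate data integration, transformation, and analysis. The templates are based on n8n data workflows.

Overview

This skill covers:

  • Extracting data from multiple sources
  • Transforming and cleaning data
  • Loading into target systems
  • Scheduling and monitoring
  • Error handling and alerting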

ETL Patterns

Basic ETL Flow

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Extract   │───▶│  Transform  │───▶│    Load     │
│             │    │             │    │             │
│ • API       │    │ • Clean     │    │ • Database  │
│ • Database  │    │ • Map       │    │ • Warehouse │
│ • File      │    │ • Aggregate │    │ • File      │
│ • Webhook   │    │ • Enrich    │    │ • API       │
└─────────────┘    └─────────────┘    └─────────────┘
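
The three stages in the diagram can be sketched as plain functions chained together. This is a minimal illustration, not an n8n workflow: the sample rows, the field names, and the in-memory `warehouse` array are all hypothetical stand-ins.

```javascript
// Minimal ETL sketch: each stage is a plain function, so stages can be tested alone.
// The sample rows and field names are hypothetical.
function extract() {
  return [
    { id: 1, email: ' A@Example.com ', total_price: '19.90' },
    { id: 2, email: null, total_price: '5.00' },
  ];
}

function transform(rows) {
  return rows
    .filter(row => row.email) // clean: drop rows without an email
    .map(row => ({            // map: rename fields and convert types
      order_id: row.id,
      customer_email: row.email.trim().toLowerCase(),
      total: parseFloat(row.total_price),
    }));
}

function load(rows, target) {
  target.push(...rows); // stand-in for a database or warehouse insert
  return rows.length;
}

function runPipeline(target) {
  return load(transform(extract()), target);
}

const warehouse = [];
const loaded = runPipeline(warehouse);
console.log(`loaded ${loaded} row(s)`);
```

Keeping each stage free of side effects, except for `load`, makes it easier to rerun a single stage when a run fails partway.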

n8n ETL Workflow

workflow: "Daily Sales ETL"
schedule: "Daily at 2:00 AM"

nodes:
  # Extract
  - name: "Extract from Shopify"
    type: shopify
    action: get_orders
    filter: created_at >= yesterday

  - name: "Extract from Stripe"
    type: stripe
    action: get_payments
    filter: created >= yesterday

  # Transform
  - name: "Merge data"
    type: merge
    mode: combine_by_key
    key: order_id

  - name: "Transform"
    type: code
    code: |
      return items.map(item => ({
        date: item.created_at.split('T')[0],
        order_id: item.id,
        customer_email: item.email,
        total: parseFloat(item.total_price),
        currency: item.currency,
        items: item.line_items.length,
        source: item.source_name,
        // Optional chaining: an order without a matched payment yields undefined instead of throwing
        payment_status: item.payment?.status
      }));

  # Load
  - name: "Load to BigQuery"
    type: google_bigquery
    action: insert_rows
    table: sales_daily

  - name: "Update Google Sheets"
    type: google_sheets
    action: append_rows
    spreadsheet: "Daily Sales Report"
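
The `combine_by_key` merge on `order_id` can be pictured as a keyed join. The sketch below is one possible reading, not the n8n Merge node itself. It assumes that each Stripe payment stores the Shopify order ID in `metadata.order_id`; the function name `mergeByKey` and the sample records are hypothetical.

```javascript
// Join two record sets on a shared key, in the spirit of a "combine by key" merge.
// Assumption: each Stripe payment carries the Shopify order ID in metadata.order_id.
function mergeByKey(orders, payments) {
  const paymentsByOrder = new Map(
    payments.map(p => [String(p.metadata.order_id), p])
  );
  return orders.map(order => ({
    ...order,
    // null when no payment matched, so downstream code can handle the gap explicitly
    payment: paymentsByOrder.get(String(order.id)) ?? null,
  }));
}

const orders = [
  { id: 1001, email: 'a@example.com' },
  { id: 1002, email: 'b@example.com' },
];
const payments = [
  { id: 'pay_1', status: 'succeeded', metadata: { order_id: '1001' } },
];

const merged = mergeByKey(orders, payments);
console.log(merged.map(o => o.payment?.status ?? 'missing'));
```

Both keys are converted to strings before comparison because an ID stored in payment metadata is often a string while the order ID may be a number.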

Data Sources

Common Extractors

extractors:
  databases:
    - postgresql:
        connection: connection_string
        query: "SELECT * FROM orders WHERE date >= $1"
        
    - mysql:
        connection: connection_string
        query: custom_sql
        
    - mongodb:
        connection: connection_string
        collection: orders
        filter: {date: {$gte: yesterday}}
        
  apis:
    - rest_api:
        url: "https://api.example.com/data"
        method: GET
        headers: {Authorization: "Bearer {token}"}
        pagination: handle_automatically
        
    - graphql:
        url: "https://api.example.com/graphql"
        query: graphql_query
        
  files:
    - csv:
        source: sftp/s3/google_drive
        delimiter: ","
        encoding: utf-8
        
    - excel:
        source: file_path
        sheet: "Sheet1"
        
    - json:
        source: api/file
        path: "data.items"
        
  saas:
    - salesforce: get_objects
    - hubspot: get_contacts/deals
    - stripe: get_charges
    - shopify: get_orders
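
The `pagination: handle_automatically` setting for REST APIs hides a loop that follows a cursor until the source runs out of pages. A rough sketch of that loop follows. `fetchPage` is a hypothetical function supplied by the caller, and the `{ items, nextCursor }` response shape is an assumption; real APIs differ (page numbers, `Link` headers, offsets).

```javascript
// Collect every page from a paginated source.
// fetchPage is a hypothetical function: given a cursor, it resolves to
// { items, nextCursor }, with nextCursor set to null on the last page.
async function fetchAllPages(fetchPage, maxPages = 1000) {
  const all = [];
  let cursor = null;
  for (let page = 0; page < maxPages; page++) {
    const { items, nextCursor } = await fetchPage(cursor);
    all.push(...items);
    if (nextCursor == null) return all;
    cursor = nextCursor;
  }
  // Guard against an API that never stops returning a cursor
  throw new Error(`stopped after ${maxPages} pages; check the cursor logic`);
}

// In-memory stand-in for an API that returns two items per page.
const data = ['a', 'b', 'c', 'd', 'e'];
const fakeFetchPage = async cursor => {
  const start = cursor ?? 0;
  const end = start + 2;
  return {
    items: data.slice(start, end),
    nextCursor: end < data.length ? end : null,
  };
};

fetchAllPages(fakeFetchPage).then(items => console.log(items));
```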

Transformations

Common Transformations

transformations:
  cleaning:
    - remove_nulls: drop_or_fill
    - trim_whitespace: all_string_fields
    - deduplicate: by_key
    - validate: against_schema
    
  mapping:
    - rename_fields: {old_name: new_name}
    - convert_types: {date_string: date}
    - map_values: {status_code: status_name}
    
  aggregation:
    - group_by: [date, category]
    - sum: [revenue, quantity]
    - count: orders
    - average: order_value
    
  enrichment:
    - lookup: from_reference_table
    - geocode: from_address
    - calculate: derived_fields
    
  filtering:
    - where: condition
    - limit: n_rows
    - sample: percentage
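
Three of the cleaning steps listed above (`trim_whitespace`, `remove_nulls` in its "drop" variant, and `deduplicate`) might look like this in a code node. The helper names and the `id`/`email` fields are hypothetical, and keeping the first row per key is just one possible deduplication policy.

```javascript
// Trim every string field of a row; other values pass through unchanged.
function trimStrings(row) {
  return Object.fromEntries(
    Object.entries(row).map(([k, v]) => [k, typeof v === 'string' ? v.trim() : v])
  );
}

// Drop rows where any required field is null, undefined, or an empty string.
function dropIncomplete(rows, requiredFields) {
  return rows.filter(row =>
    requiredFields.every(f => row[f] !== null && row[f] !== undefined && row[f] !== '')
  );
}

// Keep the first row seen for each key value.
function deduplicateBy(rows, key) {
  const seen = new Set();
  return rows.filter(row => {
    if (seen.has(row[key])) return false;
    seen.add(row[key]);
    return true;
  });
}

const raw = [
  { id: 1, email: ' a@example.com ' },
  { id: 1, email: 'a@example.com' },
  { id: 2, email: null },
  { id: 3, email: 'c@example.com' },
];
const cleaned = deduplicateBy(
  dropIncomplete(raw.map(trimStrings), ['id', 'email']),
  'id'
);
console.log(cleaned);
```

Trimming runs first so that a field containing only whitespace is treated as empty by the later steps.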

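Code Transformation Examples

// Hypothetical lookup table from status codes to names; replace with the real mapping
const statusMap = { 1: 'active', 2: 'pending', 3: 'closed' };

// Clean and normalize data
function transform(items) {
  return items.map(item => ({
    // Clean strings
    name: item.name?.trim().toLowerCase(),

    // Parse dates (toISOString throws a RangeError if created_at is not a valid date)
    date: new Date(item.created_at).toISOString().split('T')[0],

    // Convert types, falling back to 0 when the amount is not numeric
    amount: parseFloat(item.amount) || 0,

    // Map values
    status: statusMap[item.status_code] || 'unknown',

    // Computed fields
    total: item.quantity * item.unit_price,

    // Filter nested data: keep only the names of active tags
    tags: item.tags?.filter(t => t.active).map(t => t.name),

    // Default values
    source: item.source || 'direct'
  }));
}

// Aggregate revenue and order count by date and category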
function aggregate(items) {
  const grouped = {};
  
  items.forEach(item => {
    const key = `${item.date}_${item.category}`;
    if (!grouped[key]) {
      grouped[key] = {
        date: item.date,
        category: item.category,
        total_revenue: 0,
        order_count: 0
      };
    }
    grouped[key].total_revenue += item.amount;
    grouped[key].order_count += 1;
  });
  
  return Object.values(grouped);
}

Data Destinations

Common Loaders

loaders:
  data_warehouses:
    - bigquery:
        project: project_id
        dataset: analytics
        table: sales
        write_mode: append/truncate
        
    - snowflake:
        account: account_id
        warehouse: compute_wh
        database: analytics
        schema: public
        
    - redshift:
        cluster: cluster_id
        database: analytics
        
  databases:
    - postgresql:
        upsert: on_conflict_update
        
    - mysql:
        batch_insert: 1000_rows
        
  files:
    - s3:
        bucket: data-lake
        path: /processed/{date}/
        format: parquet
        
    - google_cloud_storage:
        bucket: data-bucket
        
  spreadsheets:
    - google_sheets:
        mode: append/overwrite
        
    - airtable:
        base: base_id
        table: table_name
        
  apis:
    - webhook:
        url: destination_url
        batch_size: 100
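
Settings such as `batch_insert: 1000_rows` and `batch_size: 100` imply splitting the rows into chunks before sending them. A small sketch of that idea follows. `sendBatch` is a hypothetical callback standing in for the real loader call, for example one webhook POST or one multi-row INSERT per batch.

```javascript
// Split rows into consecutive batches of at most batchSize rows.
function toBatches(rows, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < rows.length; i += batchSize) {
    batches.push(rows.slice(i, i + batchSize));
  }
  return batches;
}

// sendBatch is a hypothetical loader call that receives one batch at a time.
function loadInBatches(rows, batchSize, sendBatch) {
  let sent = 0;
  for (const batch of toBatches(rows, batchSize)) {
    sendBatch(batch);
    sent += batch.length;
  }
  return sent;
}

const rows = Array.from({ length: 250 }, (_, i) => ({ id: i }));
const batchSizes = [];
const total = loadInBatches(rows, 100, batch => batchSizes.push(batch.length));
console.log(batchSizes, total);
```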

Scheduling and Monitoring

Pipeline Scheduling

scheduling:
  patterns:
    hourly:
      cron: "0 * * * *"
      use_for: near-real-time dashboards

    daily:
      cron: "0 2 * * *"
      use_for: daily reports

    weekly:
      cron: "0 3 * * 1"
      use_for: weekly summaries

    on_demand:
      trigger: webhook/manual
      use_for: ad hoc analysis
      
  dependencies:
    - pipeline_a: must_complete_before pipeline_b
    - wait_for: all_extracts_complete
    
  retries:
    max_attempts: 3
    delay: exponential_backoff
    alert_on: final_failure
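
The retry policy above (`max_attempts: 3`, `delay: exponential_backoff`, `alert_on: final_failure`) can be sketched as a small wrapper around a task. The base delay of one second, the 60-second cap, and the `sleep` and `onFinalFailure` hooks are assumptions made for illustration; the source does not specify them.

```javascript
// Delay before the retry that follows failed attempt n (1-based):
// baseMs * 2^(n-1), capped at capMs.
function backoffDelayMs(attempt, baseMs = 1000, capMs = 60000) {
  return Math.min(capMs, baseMs * 2 ** (attempt - 1));
}

// Run task up to maxAttempts times. sleep and onFinalFailure are injectable
// so the function can be exercised without real waiting or a real alert channel.
async function runWithRetries(task, { maxAttempts = 3, sleep, onFinalFailure } = {}) {
  const wait = sleep ?? (ms => new Promise(resolve => setTimeout(resolve, ms)));
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task(attempt);
    } catch (err) {
      if (attempt === maxAttempts) {
        if (onFinalFailure) onFinalFailure(err); // alert only on the final failure
        throw err;
      }
      await wait(backoffDelayMs(attempt));
    }
  }
}

// Usage: the task fails twice and succeeds on the third attempt.
// Delays are recorded instead of slept so the example finishes immediately.
const waits = [];
runWithRetries(
  async attempt => {
    if (attempt < 3) throw new Error(`attempt ${attempt} failed`);
    return 'ok';
  },
  { sleep: async ms => { waits.push(ms); } }
).then(result => console.log(result, waits));
```

Production schedulers often add random jitter to each delay so that many failed pipelines do not retry at the same instant; it is omitted here to keep the delays predictable.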

Monitoring and Alerting

monitoring:
  metrics:
    - rows_processed
    - execution_time
    - error_count
    - data_freshness
    
  alerts:
    pipeline_failed:
      channels: [slack, pagerduty]
      template: |
        🚨 *Pipeline Failed*
        
        Pipeline: {pipeline_name}
        Failed stage: {failed_stage}
        Error: {error_message}
        
        [View logs]({logs_url})
        
    data_quality:
      trigger: anomaly_detected
      conditions:
        - row_count: differs_by > 50%
        - null_rate: exceeds_threshold
        - schema: changed_unexpectedly
        
    stale_data:
      trigger: last_update > threshold
      threshold: 2_hours
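
The alert rules above reduce to a few small checks: filling the `{placeholder}` fields of a template, comparing a row count with a baseline, and comparing the last update time with a threshold. The sketch below illustrates each one. The function names are hypothetical, and reading `differs_by > 50%` as a relative change against a baseline count is an interpretation.

```javascript
// Fill {placeholder} fields in an alert template.
// Placeholders with no matching value are left visible rather than blanked out.
function renderTemplate(template, values) {
  return template.replace(/\{(\w+)\}/g, (match, key) =>
    Object.prototype.hasOwnProperty.call(values, key) ? String(values[key]) : match
  );
}

// True when the current row count differs from the baseline
// by more than maxChange (0.5 means 50%).
function rowCountAnomaly(current, baseline, maxChange = 0.5) {
  if (baseline === 0) return current !== 0;
  return Math.abs(current - baseline) / baseline > maxChange;
}

// True when the last update is older than the threshold (default: 2 hours).
function isStale(lastUpdate, now, thresholdMs = 2 * 60 * 60 * 1000) {
  return now.getTime() - lastUpdate.getTime() > thresholdMs;
}

const message = renderTemplate('Pipeline: {pipeline_name} | Stage: {failed_stage}', {
  pipeline_name: 'daily_sales',
  failed_stage: 'load',
});
console.log(message);
console.log(rowCountAnomaly(40, 100)); // a 60% drop exceeds the 50% limit
```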

Data Quality

Quality Checks

data_quality:
  schema_validation:
    - required_fields: [id, date, amount]
    - field_types:
        id: integer
        date: date
        amount: number
    - allowed_values:
        status: [active, pending, closed]
        
  statistical_checks:
    - null_rate: < 5%
    - duplicate_rate: < 1%
    - value_range:
        amount: [0, 1000000]
        
  business_rules:
    - total_equals_sum_of_line_items
    - dates_are_not_in_future
    - email_format_valid
    
  trend_analysis:
    - row_count: within_2_std_of_mean
    - total_value: within_expected_range
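
The schema and statistical checks above could be implemented along the following lines. The `schema` object mirrors the values in the configuration; the validator, the error message wording, and the choice of `YYYY-MM-DD` as the accepted date format are illustrative assumptions.

```javascript
// Schema values are taken from the configuration above; the validator is a sketch.
const schema = {
  required: ['id', 'date', 'amount'],
  types: { id: 'integer', date: 'date', amount: 'number' },
  allowed: { status: ['active', 'pending', 'closed'] },
  ranges: { amount: [0, 1000000] },
};

const typeChecks = {
  integer: v => Number.isInteger(v),
  number: v => typeof v === 'number' && Number.isFinite(v),
  // Assumed format: a YYYY-MM-DD string that round-trips through Date unchanged
  date: v => {
    if (typeof v !== 'string' || !/^\d{4}-\d{2}-\d{2}$/.test(v)) return false;
    const t = Date.parse(v);
    return !Number.isNaN(t) && new Date(t).toISOString().slice(0, 10) === v;
  },
};

// Return a list of human-readable problems; an empty list means the row passed.
function validateRow(row) {
  const errors = [];
  for (const field of schema.required) {
    if (row[field] === null || row[field] === undefined) errors.push(`${field}: missing`);
  }
  for (const [field, type] of Object.entries(schema.types)) {
    if (row[field] != null && !typeChecks[type](row[field])) errors.push(`${field}: expected ${type}`);
  }
  for (const [field, values] of Object.entries(schema.allowed)) {
    if (row[field] != null && !values.includes(row[field])) errors.push(`${field}: not allowed`);
  }
  for (const [field, [min, max]] of Object.entries(schema.ranges)) {
    if (typeof row[field] === 'number' && (row[field] < min || row[field] > max)) {
      errors.push(`${field}: out of range`);
    }
  }
  return errors;
}

// Share of rows where the field is null or undefined.
function nullRate(rows, field) {
  if (rows.length === 0) return 0;
  return rows.filter(r => r[field] == null).length / rows.length;
}

const sample = [
  { id: 1, date: '2024-01-15', amount: 120.5, status: 'active' },
  { id: 2, date: '2024-01-15', amount: -5, status: 'archived' },
  { id: 3, date: '2024-01-15', amount: null, status: 'pending' },
];
sample.forEach(row => console.log(row.id, validateRow(row)));
console.log('null rate under 5%:', nullRate(sample, 'amount') < 0.05);
```

Returning a list of problems instead of throwing on the first one lets a pipeline report every issue in a batch before deciding whether to halt.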

Output Example

Request: "Create a daily sales data pipeline"

Output

# Daily Sales Data Pipeline

## Pipeline Overview

Shopify + Stripe → Transform → BigQuery + Sheets

## Schedule
- Run time: daily at 2:00 AM
- Time zone: UTC
- Retries: 3

## Extract

### Shopify Orders
```yaml
source: shopify
filter: created_at >= yesterday
fields: [id, email, total_price, line_items, created_at]
```

### Stripe Payments
```yaml
source: stripe
filter: created >= yesterday
fields: [id, amount, status, metadata.order_id]
```

## Transform
```javascript
// Join and clean data: one output row per order
const row = {
  date: order.created_at.split('T')[0],
  order_id: order.id,
  customer: order.email,
  revenue: parseFloat(order.total_price),
  items: order.line_items.length,
  payment_status: payment.status
};
```

## Load

### BigQuery
- Table: analytics.sales_daily
- Mode: append

### Google Sheets
- Spreadsheet: "Daily Sales Dashboard"
- Tab: "Raw Data"

## Quality Checks
- Row count > 0
- No null order_id
- Total revenue matches Stripe

## Alerts
- Slack: #data-alerts
- On failure: @data-team

---

*Data Pipeline Skill - part of the Claude Office Skills suite*

📄 Original Documentation

Full documentation (English):

https://skills.sh/claude-office-skills/skills/data-pipeline
