🚀 Quick install
Copy and run the following command to install this skill:
npx skills add https://skills.sh/claude-office-skills/skills/data-pipeline
💡 Note: requires Node.js and npm
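Data Pipeline
Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n data workflow templates.
Overview
This skill covers:
- Extracting data from multiple sources
- Transforming and cleaning data
- Loading into target systems
- Scheduling and monitoring
- Error handling and alerting
ETL Patterns
Basic ETL Flow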
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Extract   │───▶│  Transform  │───▶│    Load     │
│             │    │             │    │             │
│ • API       │    │ • Clean     │    │ • Database  │
│ • Database  │    │ • Map       │    │ • Warehouse │
│ • Files     │    │ • Aggregate │    │ • Files     │
│ • Webhook   │    │ • Enrich    │    │ • API       │
└─────────────┘    └─────────────┘    └─────────────┘
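The three stages in the diagram can be pictured as plain functions composed into a single run. The sketch below is only an illustration, not n8n's API: `runPipeline`, the in-memory `source` array, and the `sink` array are hypothetical stand-ins for a real source and target system.

```javascript
// Minimal ETL sketch. All names are illustrative assumptions;
// real extract and load steps would call an API, database, or file store.

// Extract: read raw records from a source (an in-memory array here).
function extract(source) {
  return source.slice();
}

// Transform: drop rows without an id, then clean and map the rest.
function transform(rows) {
  return rows
    .filter(row => row.id != null)
    .map(row => ({
      id: row.id,
      email: String(row.email || '').trim().toLowerCase(),
      total: Number.parseFloat(row.total) || 0,
    }));
}

// Load: append rows to the target and report how many were written.
function load(rows, sink) {
  sink.push(...rows);
  return rows.length;
}

// Run the stages in order: extract -> transform -> load.
function runPipeline(source, sink) {
  return load(transform(extract(source)), sink);
}

const source = [
  { id: 1, email: '  Ana@Example.com ', total: '19.90' },
  { id: null, email: 'skip@example.com', total: '5' },
  { id: 2, email: 'bo@example.com', total: 'n/a' },
];
const sink = [];
const written = runPipeline(source, sink);
console.log(written, sink);
```

Keeping each stage a pure function of its input makes it possible to test the transform step without touching a real source or target.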
n8n ETL Workflow
workflow: "Daily Sales ETL"
schedule: "Daily at 2 AM"
nodes:
  # Extract
  - name: "Extract from Shopify"
    type: shopify
    action: get_orders
    filter: created_at >= yesterday
  - name: "Extract from Stripe"
    type: stripe
    action: get_payments
    filter: created >= yesterday
  # Transform
  - name: "Merge Data"
    type: merge
    mode: combine_by_key
    key: order_id
  - name: "Transform"
    type: code
    code: |
      return items.map(item => ({
        date: item.created_at.split('T')[0],
        order_id: item.id,
        customer_email: item.email,
        total: parseFloat(item.total_price),
        currency: item.currency,
        items: item.line_items.length,
        source: item.source_name,
        // null when no Stripe payment was merged onto the order
        payment_status: item.payment?.status ?? null
      }));
  # Load
  - name: "Load to BigQuery"
    type: google_bigquery
    action: insert_rows
    table: sales_daily
  - name: "Update Google Sheets"
    type: google_sheets
    action: append_rows
    spreadsheet: "Daily Sales Report"
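The `combine_by_key` merge step joins Shopify orders with Stripe payments before the transform runs. One way to picture that join is a keyed lookup, sketched below. The function name `mergeByOrderId` and the assumption that each payment carries the order id in `metadata.order_id` are illustrative; in n8n the Merge node does this declaratively.

```javascript
// Sketch of a key-based merge: attach the matching payment to each order.
// Assumes (hypothetically) that payments store the order id in metadata.order_id.
function mergeByOrderId(orders, payments) {
  // Index payments by order id so each lookup is O(1).
  const paymentsByOrder = new Map();
  for (const payment of payments) {
    const orderId = payment.metadata?.order_id;
    if (orderId == null) continue; // skip payments that cannot be matched
    paymentsByOrder.set(String(orderId), payment);
  }
  // Keep every order; payment is null when nothing matched.
  return orders.map(order => ({
    ...order,
    payment: paymentsByOrder.get(String(order.id)) ?? null,
  }));
}

const orders = [{ id: 1001, email: 'a@example.com' }, { id: 1002, email: 'b@example.com' }];
const payments = [
  { id: 'pay_1', status: 'succeeded', metadata: { order_id: '1001' } },
  { id: 'pay_2', status: 'succeeded', metadata: {} },
];
const merged = mergeByOrderId(orders, payments);
console.log(merged);
```

Comparing keys as strings avoids mismatches when one system returns numeric ids and the other returns text.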
Data Sources
Common Extractors
extractors:
  databases:
    - postgresql:
        connection: connection_string
        query: "SELECT * FROM orders WHERE date >= $1"
    - mysql:
        connection: connection_string
        query: custom_sql
    - mongodb:
        connection: connection_string
        collection: orders
        filter: {date: {$gte: yesterday}}
  apis:
    - rest_api:
        url: "https://api.example.com/data"
        method: GET
        headers: {Authorization: "Bearer {token}"}
        pagination: handle_automatically
    - graphql:
        url: "https://api.example.com/graphql"
        query: graphql_query
  files:
    - csv:
        source: sftp/s3/google_drive
        delimiter: ","
        encoding: utf-8
    - excel:
        source: file_path
        sheet: "Sheet1"
    - json:
        source: api/file
        path: "data.items"
  saas:
    - salesforce: get_objects
    - hubspot: get_contacts/deals
    - stripe: get_charges
    - shopify: get_orders
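The `pagination: handle_automatically` setting in the REST extractor hides a loop that keeps requesting pages until the API reports there are no more. A minimal sketch of such a loop follows, assuming a cursor-style response shaped like `{ items, next_cursor }`. That response shape, `fetchAllPages`, and `fakeFetchPage` are hypothetical; real APIs paginate in different ways. The page fetcher is passed in so the loop can be exercised without a network call.

```javascript
// Collect all items from a cursor-paginated source.
// fetchPage(cursor) must resolve to { items: [...], next_cursor: string | null }.
async function fetchAllPages(fetchPage, maxPages = 100) {
  const all = [];
  let cursor = null;
  for (let page = 0; page < maxPages; page++) {
    const { items, next_cursor } = await fetchPage(cursor);
    all.push(...items);
    if (!next_cursor) return all; // last page reached
    cursor = next_cursor;
  }
  // Guard against an API that never stops returning a cursor.
  throw new Error(`Stopped after ${maxPages} pages without reaching the end`);
}

// Fake three-page API used only for this demonstration.
const pages = {
  start: { items: [1, 2], next_cursor: 'p2' },
  p2: { items: [3, 4], next_cursor: 'p3' },
  p3: { items: [5], next_cursor: null },
};
const fakeFetchPage = async cursor => pages[cursor ?? 'start'];

fetchAllPages(fakeFetchPage).then(rows => console.log(rows));
```

The `maxPages` cap is a design choice: it turns a misbehaving API into a visible failure instead of an endless extract.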
Transformations
Common Transformations
transformations:
  cleaning:
    - remove_nulls: drop_or_fill
    - trim_whitespace: all_string_fields
    - deduplicate: by_key
    - validate: against_schema
  mapping:
    - rename_fields: {old_name: new_name}
    - convert_types: {date_string: date}
    - map_values: {status_code: status_name}
  aggregation:
    - group_by: [date, category]
    - sum: [revenue, quantity]
    - count: orders
    - average: order_value
  enrichment:
    - lookup: from_reference_table
    - geocode: from_address
    - calculate: derived_fields
  filtering:
    - where: condition
    - limit: n_rows
    - sample: percentage
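Two of the cleaning steps listed above, `trim_whitespace` and `deduplicate: by_key`, are small enough to sketch directly. The helper names `trimStrings` and `deduplicateBy` are illustrative, and keeping the last occurrence of each key is an assumption; a pipeline may just as reasonably keep the first.

```javascript
// trim_whitespace: trim every string field of a row, leave other types alone.
function trimStrings(row) {
  return Object.fromEntries(
    Object.entries(row).map(([field, value]) => [
      field,
      typeof value === 'string' ? value.trim() : value,
    ])
  );
}

// deduplicate by key: the last row seen for a key wins (an assumption),
// and results come back in order of each key's first appearance.
function deduplicateBy(rows, key) {
  const byKey = new Map();
  for (const row of rows) {
    byKey.set(row[key], row);
  }
  return [...byKey.values()];
}

const rawRows = [
  { id: 1, name: ' a ' },
  { id: 2, name: 'b' },
  { id: 1, name: 'c ' },
];
const cleaned = deduplicateBy(rawRows.map(trimStrings), 'id');
console.log(cleaned);
```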
Code Transform Examples
// Hypothetical lookup table for the status mapping below; replace with real codes.
const statusMap = { 1: 'active', 2: 'pending', 3: 'closed' };

// Clean and normalize data
function transform(items) {
  return items.map(item => ({
    // Clean strings
    name: item.name?.trim().toLowerCase(),
    // Parse dates (toISOString throws a RangeError if created_at is not a valid date)
    date: new Date(item.created_at).toISOString().split('T')[0],
    // Convert types
    amount: parseFloat(item.amount) || 0,
    // Map values
    status: statusMap[item.status_code] || 'unknown',
    // Calculated fields
    total: item.quantity * item.unit_price,
    // Filter nested data
    tags: item.tags?.filter(t => t.active).map(t => t.name),
    // Default values
    source: item.source || 'direct'
  }));
}

// Aggregate data by date and category
function aggregate(items) {
  const grouped = {};
  items.forEach(item => {
    const key = `${item.date}_${item.category}`;
    if (!grouped[key]) {
      grouped[key] = {
        date: item.date,
        category: item.category,
        total_revenue: 0,
        order_count: 0
      };
    }
    grouped[key].total_revenue += item.amount;
    grouped[key].order_count += 1;
  });
  return Object.values(grouped);
}
Data Destinations
Common Loaders
loaders:
  data_warehouses:
    - bigquery:
        project: project_id
        dataset: analytics
        table: sales
        write_mode: append/truncate
    - snowflake:
        account: account_id
        warehouse: compute_wh
        database: analytics
        schema: public
    - redshift:
        cluster: cluster_id
        database: analytics
  databases:
    - postgresql:
        upsert: on_conflict_update
    - mysql:
        batch_insert: 1000_rows
  files:
    - s3:
        bucket: data-lake
        path: /processed/{date}/
        format: parquet
    - google_cloud_storage:
        bucket: data-bucket
  spreadsheets:
    - google_sheets:
        mode: append/overwrite
    - airtable:
        base: base_id
        table: table_name
  apis:
    - webhook:
        url: destination_url
        batch_size: 100
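Settings such as `batch_size: 100` and `batch_insert: 1000_rows` imply that rows are split into fixed-size chunks before they are sent. A minimal sketch of that chunking step is shown here; `toBatches` is an illustrative helper name, not part of any loader's API.

```javascript
// Split rows into consecutive batches of at most batchSize rows.
function toBatches(rows, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches = [];
  for (let start = 0; start < rows.length; start += batchSize) {
    batches.push(rows.slice(start, start + batchSize));
  }
  return batches;
}

// 250 rows with a batch size of 100 yield batches of 100, 100, and 50 rows.
const rows = Array.from({ length: 250 }, (_, i) => ({ id: i }));
const batches = toBatches(rows, 100);
console.log(batches.map(batch => batch.length)); // [ 100, 100, 50 ]
```

Each batch can then be sent as one insert or one webhook call, which keeps request sizes bounded and makes a failed batch easy to retry on its own.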
Scheduling and Monitoring
Pipeline Scheduling
scheduling:
  patterns:
    hourly:
      cron: "0 * * * *"
      use_for: near-real-time dashboards
    daily:
      cron: "0 2 * * *"
      use_for: daily reports
    weekly:
      cron: "0 3 * * 1"
      use_for: weekly summaries
    on_demand:
      trigger: webhook/manual
      use_for: ad-hoc analysis
  dependencies:
    - pipeline_a: must_complete_before pipeline_b
    - wait_for: all_extracts_complete
  retries:
    max_attempts: 3
    delay: exponential_backoff
    alert_on: final_failure
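The `retries` settings above (`max_attempts: 3`, `delay: exponential_backoff`, `alert_on: final_failure`) can be sketched as a small wrapper around any pipeline step. The names `backoffDelays` and `withRetries`, the 1000 ms base delay, and the doubling factor are assumptions for illustration; the source does not specify them. The `sleep` function is injectable so the example runs without real waiting.

```javascript
// Delays before each retry: baseMs, 2*baseMs, 4*baseMs, ...
// With maxAttempts attempts there are maxAttempts - 1 retries.
function backoffDelays(maxAttempts, baseMs = 1000) {
  return Array.from(
    { length: Math.max(0, maxAttempts - 1) },
    (_, retry) => baseMs * 2 ** retry
  );
}

// Run task up to maxAttempts times; call onFinalFailure only after the last failure.
async function withRetries(task, options = {}) {
  const {
    maxAttempts = 3,
    baseMs = 1000,
    sleep = ms => new Promise(resolve => setTimeout(resolve, ms)),
    onFinalFailure = () => {},
  } = options;
  const delays = backoffDelays(maxAttempts, baseMs);
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) await sleep(delays[attempt - 1]);
    }
  }
  onFinalFailure(lastError); // alert_on: final_failure
  throw lastError;
}

// Demo: a task that fails twice, then succeeds on the third attempt.
const flakyTask = async attempt => {
  if (attempt < 3) throw new Error(`attempt ${attempt} failed`);
  return 'loaded';
};
withRetries(flakyTask, { sleep: async () => {} }).then(result => console.log(result));
```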
Monitoring and Alerts
monitoring:
  metrics:
    - rows_processed
    - execution_time
    - error_count
    - data_freshness
  alerts:
    pipeline_failed:
      channels: [slack, pagerduty]
      template: |
        🚨 *Pipeline Failed*
        Pipeline: {pipeline_name}
        Stage: {failed_stage}
        Error: {error_message}
        [View Logs]({logs_url})
    data_quality:
      trigger: anomaly_detected
      conditions:
        - row_count: differs_by > 50%
        - null_rate: exceeds_threshold
        - schema: changed_unexpectedly
    stale_data:
      trigger: last_update > threshold
      threshold: 2_hours
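Two of the alert conditions above reduce to simple comparisons: `stale_data` checks the age of the last update against a threshold, and `row_count: differs_by > 50%` compares a run against a baseline. The sketch below assumes the baseline is the previous run's row count, which the configuration does not state; `isStale` and `rowCountDiffers` are illustrative names.

```javascript
const TWO_HOURS_MS = 2 * 60 * 60 * 1000;

// stale_data: true when the last update is older than the threshold.
function isStale(lastUpdate, now, thresholdMs = TWO_HOURS_MS) {
  return now.getTime() - lastUpdate.getTime() > thresholdMs;
}

// row_count: true when the count moved by more than maxRelativeChange
// relative to the previous run (the baseline is an assumption).
function rowCountDiffers(current, previous, maxRelativeChange = 0.5) {
  if (previous === 0) return current !== 0;
  return Math.abs(current - previous) / previous > maxRelativeChange;
}

const lastUpdate = new Date('2024-01-01T00:00:00Z');
console.log(isStale(lastUpdate, new Date('2024-01-01T03:00:00Z'))); // true
console.log(isStale(lastUpdate, new Date('2024-01-01T01:00:00Z'))); // false
console.log(rowCountDiffers(40, 100)); // true (60% drop)
console.log(rowCountDiffers(60, 100)); // false (40% drop)
```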
Data Quality
Quality Checks
data_quality:
  schema_validation:
    - required_fields: [id, date, amount]
    - field_types:
        id: integer
        date: date
        amount: number
    - allowed_values:
        status: [active, pending, closed]
  statistical_checks:
    - null_rate: < 5%
    - duplicate_rate: < 1%
    - value_range:
        amount: [0, 1000000]
  business_rules:
    - total_equals_sum_of_line_items
    - dates_are_not_in_future
    - email_format_valid
  trend_analysis:
    - row_count: within_2_std_of_mean
    - total_value: within_expected_range
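The schema and statistical checks above translate fairly directly into code. The following sketch validates a row against the required fields, field types, allowed values, and value range from the configuration, and computes a null rate for one field. The `schema` object layout, `validateRow`, `nullRate`, and the choice to treat dates as `YYYY-MM-DD` strings are assumptions made for illustration.

```javascript
// Rules taken from the data_quality config above; the object layout is illustrative.
const schema = {
  required: ['id', 'date', 'amount'],
  types: { id: 'integer', date: 'date', amount: 'number' },
  allowed: { status: ['active', 'pending', 'closed'] },
  ranges: { amount: [0, 1000000] },
};

// Type predicates; dates are assumed to be YYYY-MM-DD strings.
const typeChecks = {
  integer: v => Number.isInteger(v),
  number: v => typeof v === 'number' && Number.isFinite(v),
  date: v => typeof v === 'string' && /^\d{4}-\d{2}-\d{2}$/.test(v) && !Number.isNaN(Date.parse(v)),
};

// Return a list of human-readable problems; an empty list means the row passed.
function validateRow(row, rules) {
  const errors = [];
  for (const field of rules.required) {
    if (row[field] == null) errors.push(`missing ${field}`);
  }
  for (const [field, type] of Object.entries(rules.types)) {
    if (row[field] != null && !typeChecks[type](row[field])) errors.push(`${field} is not ${type}`);
  }
  for (const [field, values] of Object.entries(rules.allowed)) {
    if (row[field] != null && !values.includes(row[field])) errors.push(`${field} not allowed`);
  }
  for (const [field, [min, max]] of Object.entries(rules.ranges)) {
    const value = row[field];
    if (typeof value === 'number' && (value < min || value > max)) errors.push(`${field} out of range`);
  }
  return errors;
}

// Fraction of rows where the field is null or undefined.
function nullRate(rows, field) {
  if (rows.length === 0) return 0;
  return rows.filter(row => row[field] == null).length / rows.length;
}

console.log(validateRow({ id: 1, date: '2024-05-01', amount: 10, status: 'active' }, schema));
console.log(validateRow({ id: 1.5, date: '2024-05-01', amount: -5, status: 'archived' }, schema));
console.log(nullRate([{ a: 1 }, { a: null }, {}, { a: 2 }], 'a'));
```

Returning a list of problems rather than throwing on the first one lets a pipeline report every issue in a bad row, or count failures across a batch before deciding whether to alert.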
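Output Example
Request: "Create a daily sales data pipeline"
Output:
# Daily Sales Data Pipeline
## Pipeline Overview
Shopify + Stripe → Transform → BigQuery + Sheets
## Schedule
- Runs: daily at 2 AM
- Timezone: UTC
- Retries: 3
## Extract
### Shopify Orders
```yaml
source: shopify
filter: created_at >= yesterday
fields: [id, email, total_price, line_items, created_at]
```
### Stripe Payments
```yaml
source: stripe
filter: created >= yesterday
fields: [id, amount, status, metadata.order_id]
```
## Transform
```javascript
// Join and clean data: build one output row from an order and its payment
const toRow = (order, payment) => ({
  date: order.created_at.split('T')[0],
  order_id: order.id,
  customer: order.email,
  revenue: parseFloat(order.total_price),
  items: order.line_items.length,
  payment_status: payment.status
});
```
## Load
### BigQuery
- Table: `analytics.sales_daily`
- Mode: Append
### Google Sheets
- Spreadsheet: "Daily Sales Dashboard"
- Tab: "Raw Data"
## Quality Checks
- Row count > 0
- No null order_id values
- Total revenue matches Stripe
## Alerts
- Slack: #data-alerts
- On failure: @data-team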
---
*Data Pipeline Skill - part of the Claude Office Skills collection*
📄 Original documentation
Full documentation (English):
https://skills.sh/claude-office-skills/skills/data-pipeline