dbt: a data transformation tool
2025-09-12
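dbt (data build tool) is the T (Transform) of the modern data stack. It lets data analysts use SQL to build transformation pipelines that are testable, documented, and version-controlled. This article walks through dbt's project structure, model layering, materialization strategies, testing, and advanced features.
Where dbt sits in the data stack
Core ideas behind dbt:
- SQL first: transformations are written as SELECT statements, and dbt generates the DDL/DML
- Version control: all code lives in Git
- Testable: a data testing framework is built in
- Documented: a data documentation website is generated automatically
- Dependency management: dependencies between models are inferred automatically from their `ref()` calls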
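The dependency-management idea can be sketched outside dbt. The snippet below is a minimal illustration, not how dbt is implemented: it scans SQL text for `{{ ref('...') }}` calls with a regular expression and sorts the models so that each one runs after the models it references. The model names and SQL bodies in `MODELS` are made up for the example.

```python
import re
from graphlib import TopologicalSorter

# Toy stand-ins for model files; names and SQL bodies are illustrative only.
MODELS = {
    "stg_orders": "select * from {{ source('raw', 'orders') }}",
    "stg_customers": "select * from {{ source('raw', 'customers') }}",
    "int_orders_enriched": (
        "select * from {{ ref('stg_orders') }} o "
        "join {{ ref('stg_customers') }} c on o.customer_id = c.customer_id"
    ),
    "fct_revenue": "select * from {{ ref('int_orders_enriched') }}",
}

# Matches {{ ref('model_name') }} with single or double quotes.
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")


def find_refs(sql: str) -> set[str]:
    """Return the model names referenced through {{ ref('...') }}."""
    return set(REF_PATTERN.findall(sql))


def build_order(models: dict[str, str]) -> list[str]:
    """Order models so that every model comes after the models it refs."""
    graph = {name: find_refs(sql) for name, sql in models.items()}
    return list(TopologicalSorter(graph).static_order())


order = build_order(MODELS)
print(order)
```

Because the graph comes from the `ref()` calls themselves, adding a model never requires editing a separate run-order list.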
Project structure

    dbt_project/
    ├── dbt_project.yml              # project configuration
    ├── profiles.yml                 # connection configuration (usually in ~/.dbt/)
    ├── models/
    │   ├── staging/                 # standardized views over raw data
    │   │   ├── stg_orders.sql
    │   │   ├── stg_customers.sql
    │   │   └── _stg_models.yml      # schema + test definitions
    │   ├── intermediate/            # intermediate transformation layer
    │   │   └── int_order_items_joined.sql
    │   └── marts/                   # final, business-facing models
    │       ├── finance/
    │       │   └── fct_revenue.sql
    │       └── marketing/
    │           └── dim_customers.sql
    ├── tests/                       # custom tests
    │   └── assert_positive_revenue.sql
    ├── macros/                      # reusable SQL macros
    │   └── generate_schema_name.sql
    ├── seeds/                       # static CSV data
    │   └── country_codes.csv
    ├── snapshots/                   # SCD Type 2 snapshots
    │   └── snap_customers.sql
    └── packages.yml                 # dbt package dependencies

    # dbt_project.yml
    name: 'my_analytics'
    version: '1.0.0'
    profile: 'my_warehouse'

    model-paths: ["models"]
    test-paths: ["tests"]
    macro-paths: ["macros"]
    seed-paths: ["seeds"]
    snapshot-paths: ["snapshots"]

    models:
      my_analytics:
        staging:
          +materialized: view
          +schema: staging
        intermediate:
          +materialized: ephemeral
        marts:
          +materialized: table
          +schema: analytics

Model layers
Staging models
Staging models standardize raw data: they rename columns, cast types, and apply simple filters. Each source table maps to one staging model.

    -- models/staging/stg_orders.sql
    with source as (
        select * from {{ source('raw', 'orders') }}
    ),

    renamed as (
        select
            id as order_id,
            user_id as customer_id,
            cast(created_at as timestamp) as order_date,
            cast(amount as decimal(10, 2)) as order_amount,
            status as order_status,
            cast(_loaded_at as timestamp) as loaded_at
        from source
        where id is not null
    )

    select * from renamed

Intermediate models
Intermediate models hold the more complex business logic and multi-table joins, bridging the staging and marts layers.

    -- models/intermediate/int_order_items_joined.sql
    with orders as (
        select * from {{ ref('stg_orders') }}
    ),

    products as (
        select * from {{ ref('stg_products') }}
    ),

    order_items as (
        select * from {{ ref('stg_order_items') }}
    )

    select
        orders.order_id,
        orders.customer_id,
        orders.order_date,
        order_items.product_id,
        products.product_name,
        products.category,
        order_items.quantity,
        order_items.unit_price,
        order_items.quantity * order_items.unit_price as line_total
    from orders
    inner join order_items on orders.order_id = order_items.order_id
    inner join products on order_items.product_id = products.product_id

Marts models
Marts are the final models built for a specific business domain. They are usually split into fact tables (fct_) and dimension tables (dim_).

    -- models/marts/finance/fct_revenue.sql
    with order_items as (
        select * from {{ ref('int_order_items_joined') }}
    ),

    daily_revenue as (
        select
            date_trunc('day', order_date) as revenue_date,
            category,
            count(distinct order_id) as order_count,
            sum(line_total) as total_revenue,
            -- revenue per order; avg(line_total) would average order lines instead
            sum(line_total) / count(distinct order_id) as avg_order_value
        from order_items
        group by 1, 2
    )

    select * from daily_revenue

Materializations
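| Materialization | SQL behavior | Storage | When to use |
|---|---|---|---|
| view | CREATE VIEW | No extra storage | Staging layer, small data volumes |
| table | CREATE TABLE | Full copy stored | Marts layer, frequently queried |
| incremental | INSERT/MERGE | Stored; each run writes only new or changed rows | Large data volumes, frequent updates |
| ephemeral | CTE (inlined) | No storage | Intermediate layer |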
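To make the "SQL behavior" column concrete, here is a deliberately simplified sketch of how the same SELECT could be wrapped for each materialization. The function `wrap_select` and its `relation_exists` flag are hypothetical names for this illustration. The statements dbt actually generates differ by adapter and involve more steps (temporary relations, swaps, merge statements), so treat the output as a mental model rather than dbt's real SQL.

```python
def wrap_select(materialized: str, relation: str, select_sql: str,
                relation_exists: bool = False) -> str:
    """Return simplified SQL for one materialization of a model's SELECT."""
    if materialized == "view":
        return f"create view {relation} as\n{select_sql}"
    if materialized == "table":
        return f"create table {relation} as\n{select_sql}"
    if materialized == "incremental":
        if not relation_exists:
            # First run: nothing to append to yet, so build the full table.
            return f"create table {relation} as\n{select_sql}"
        # Later runs: write only the rows the model's SELECT returns.
        return f"insert into {relation}\n{select_sql}"
    if materialized == "ephemeral":
        # No object is created; downstream models get the query as a CTE.
        return f"with {relation} as (\n{select_sql}\n)"
    raise ValueError(f"unknown materialization: {materialized}")


model_sql = "select order_id, order_amount from raw_orders"
for kind in ("view", "table", "incremental", "ephemeral"):
    print(f"-- {kind}\n{wrap_select(kind, 'stg_orders', model_sql)}\n")
```

The model file is identical in all four cases. Only the configured materialization changes what gets executed around it.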
Incremental models
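The model in this section combines a high-water-mark filter (`event_time > max(event_time)`) with a `unique_key`. The sketch below imitates that behavior on toy data with Python's built-in `sqlite3`, using SQLite's upsert as a stand-in for a warehouse MERGE. Table contents are invented for the example, and the helper names `target_exists` and `run_model` are hypothetical.

```python
import sqlite3

# Insert new keys, overwrite existing ones (a stand-in for MERGE on unique_key).
UPSERT = """
insert into fct_events (event_id, event_type, event_time)
select event_id, event_type, event_time
from stg_events
where {predicate}
on conflict (event_id) do update set
    event_type = excluded.event_type,
    event_time = excluded.event_time
"""


def target_exists(conn: sqlite3.Connection) -> bool:
    """Rough analogue of is_incremental(): has the target been built before?"""
    row = conn.execute(
        "select 1 from sqlite_master where type = 'table' and name = 'fct_events'"
    ).fetchone()
    return row is not None


def run_model(conn: sqlite3.Connection) -> None:
    if not target_exists(conn):
        # First run: build the table from every source row.
        conn.execute(
            "create table fct_events ("
            "event_id integer primary key, event_type text, event_time text)"
        )
        conn.execute(UPSERT.format(predicate="1 = 1"))
    else:
        # Later runs: only rows newer than what the target already holds.
        high_water = conn.execute(
            "select max(event_time) from fct_events"
        ).fetchone()[0]
        conn.execute(UPSERT.format(predicate="event_time > ?"), (high_water,))
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "create table stg_events (event_id integer, event_type text, event_time text)"
)
conn.executemany("insert into stg_events values (?, ?, ?)", [
    (1, "view", "2025-01-01 10:00:00"),
    (2, "click", "2025-01-01 11:00:00"),
])
run_model(conn)  # first run loads both rows

# Between runs: event 2 is corrected, event 3 is new, event 4 arrives late.
conn.execute(
    "update stg_events set event_type = 'click_fixed', "
    "event_time = '2025-01-01 12:30:00' where event_id = 2"
)
conn.executemany("insert into stg_events values (?, ?, ?)", [
    (3, "purchase", "2025-01-01 12:00:00"),
    (4, "view", "2025-01-01 09:00:00"),
])
run_model(conn)  # second run processes only rows past the high-water mark

rows = conn.execute(
    "select event_id, event_type, event_time from fct_events order by event_id"
).fetchall()
print(rows)
```

After the second run, event 2 is updated and event 3 is inserted, but event 4 never reaches the target because its timestamp is older than the high-water mark. Late-arriving data is the usual caveat of this filter, and a common mitigation is to reprocess a lookback window instead of filtering on the exact maximum.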
    -- models/marts/fct_events.sql
    -- Note: the available incremental strategies depend on the warehouse adapter.
    {{
        config(
            materialized='incremental',
            unique_key='event_id',
            incremental_strategy='merge',
            on_schema_change='append_new_columns'
        )
    }}

    with events as (
        select * from {{ ref('stg_events') }}
        {% if is_incremental() %}
        -- process only rows newer than what the target table already holds
        where event_time > (select max(event_time) from {{ this }})
        {% endif %}
    )

    select
        event_id,
        user_id,
        event_type,
        event_time,
        properties,
        -- the current-timestamp function syntax varies by warehouse
        current_timestamp() as dbt_loaded_at
    from events

Tests
Built-in tests
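A dbt data test is a query that selects the rows violating an expectation, and it passes when that query returns no rows. The sketch below writes rough SQL equivalents of the four built-in tests (`unique`, `not_null`, `accepted_values`, `relationships`) and runs them against toy tables in SQLite. The queries approximate the idea and are not the exact SQL dbt compiles. The sample data is invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
create table stg_customers (customer_id integer);
create table stg_orders (order_id integer, customer_id integer, order_status text);
insert into stg_customers values (1), (2);
insert into stg_orders values
    (100, 1, 'pending'),
    (101, 2, 'completed'),
    (101, 2, 'completed'),
    (102, 9, 'shipped');
""")

# Each check selects the offending rows; zero rows means the test passes.
CHECKS = {
    "unique(order_id)": """
        select order_id from stg_orders
        where order_id is not null
        group by order_id
        having count(*) > 1
    """,
    "not_null(customer_id)": """
        select * from stg_orders where customer_id is null
    """,
    "accepted_values(order_status)": """
        select order_status from stg_orders
        where order_status not in ('pending', 'processing', 'completed', 'cancelled')
    """,
    "relationships(customer_id)": """
        select o.customer_id
        from stg_orders o
        left join stg_customers c on o.customer_id = c.customer_id
        where o.customer_id is not null and c.customer_id is null
    """,
}

results = {name: len(conn.execute(sql).fetchall()) for name, sql in CHECKS.items()}
for name, failures in results.items():
    status = "PASS" if failures == 0 else "FAIL"
    print(f"{name}: {status} ({failures} failing rows)")
```

In this toy data the duplicated order 101, the unknown customer 9, and the unexpected status `shipped` each make one check fail, while `not_null` passes.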
    # models/staging/_stg_models.yml
    # Note: dbt 1.8+ also accepts `data_tests:` as the key name for `tests:`.
    version: 2

    models:
      - name: stg_orders
        description: "Standardized order data"
        columns:
          - name: order_id
            description: "Unique order identifier"
            tests:
              - unique
              - not_null
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: ref('stg_customers')
                  field: customer_id
          - name: order_status
            tests:
              - accepted_values:
                  values: ['pending', 'processing', 'completed', 'cancelled']
          - name: order_amount
            tests:
              - not_null

Custom (singular) tests
    -- tests/assert_positive_revenue.sql
    -- Ensure daily revenue is never negative; any row returned is a failure
    select
        revenue_date,
        total_revenue
    from {{ ref('fct_revenue') }}
    where total_revenue < 0

Generic tests
    -- macros/tests/test_not_negative.sql
    {% test not_negative(model, column_name) %}

    select
        {{ column_name }} as invalid_value
    from {{ model }}
    where {{ column_name }} < 0

    {% endtest %}

Sources
Sources declare the external data that models read from, and they let dbt check how fresh that data is.
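Conceptually, a freshness check compares the newest value of the `loaded_at_field` column with the current time and classifies the gap against two thresholds. The sketch below illustrates that classification with the 12-hour warn and 24-hour error thresholds used in this section's configuration. The function name `classify_freshness` is hypothetical, and the exact boundary handling in dbt may differ.

```python
from datetime import datetime, timedelta, timezone


def classify_freshness(
    max_loaded_at: datetime,
    now: datetime,
    warn_after: timedelta = timedelta(hours=12),
    error_after: timedelta = timedelta(hours=24),
) -> str:
    """Classify a source as 'pass', 'warn', or 'error' by the age of its newest row."""
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"


now = datetime(2025, 9, 12, 12, 0, tzinfo=timezone.utc)
for hours in (6, 18, 30):
    newest = now - timedelta(hours=hours)
    print(f"newest row is {hours}h old -> {classify_freshness(newest, now)}")
```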
    # models/staging/_sources.yml
    version: 2

    sources:
      - name: raw
        database: raw_database
        schema: public
        tables:
          - name: orders
            description: "Raw orders table"
            loaded_at_field: _loaded_at
            freshness:
              warn_after: {count: 12, period: hour}
              error_after: {count: 24, period: hour}
          - name: customers
            description: "Raw customers table"

    # Check data freshness
    dbt source freshness

Macros
    -- macros/cents_to_dollars.sql
    {% macro cents_to_dollars(column_name, precision=2) %}
        round(cast({{ column_name }} as decimal(18, {{ precision }})) / 100, {{ precision }})
    {% endmacro %}

    -- Using the macro in a model
    -- (assumes the referenced model exposes an amount_cents column)
    select
        order_id,
        {{ cents_to_dollars('amount_cents') }} as amount_dollars
    from {{ ref('stg_orders') }}

Packages
    # packages.yml
    packages:
      - package: dbt-labs/dbt_utils
        version: "1.1.1"
      - package: dbt-labs/codegen
        version: "0.12.1"
      - package: calogica/dbt_expectations
        version: "0.10.1"

    # Install the packages
    dbt deps

    -- Example 1: union relations with dbt_utils.
    -- The macro renders a complete query, so it is used on its own
    -- rather than inside "select * from ...".
    {{ dbt_utils.union_relations(
        relations=[ref('stg_orders_us'), ref('stg_orders_eu')]
    ) }}

    -- Example 2 (a separate model): build a surrogate key
    select
        {{ dbt_utils.generate_surrogate_key(['customer_id', 'order_date']) }} as sk,
        customer_id,
        order_date
    from {{ ref('stg_orders') }}

Documentation
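    # Add descriptions in schema.yml
    models:
      - name: fct_revenue
        description: >
          Daily revenue fact table, aggregated by product category.
          Sources: stg_orders, stg_order_items, stg_products (via int_order_items_joined)
          Refresh frequency: daily
          Owner: data-team
        columns:
          - name: revenue_date
            description: "Revenue date"
          - name: total_revenue
            description: "Total revenue (USD)"

    # Generate and serve the documentation site
    dbt docs generate
    dbt docs serve --port 8080

Summary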
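dbt turns data transformation into an engineering discipline by combining SQL with Jinja and bringing in software engineering practices: version control, testing, documentation, and modularity. The three-layer staging/intermediate/marts design is the recommended way to organize a dbt project. Incremental materialization handles large data volumes, built-in tests and source freshness checks guard data quality, and the package ecosystem supplies a wide range of reusable functionality.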