数据质量框架
约 1618 字大约 5 分钟
data-qualitygovernance
2025-09-13
数据质量是数据平台可靠性的基石。"垃圾进,垃圾出"——低质量的数据会导致错误的分析结论和不可靠的 ML 模型。本文系统介绍数据质量的六大维度、主流工具和数据合约方案。
数据质量六大维度
| 维度 | 定义 | 检查示例 |
|---|---|---|
| 准确性 | 数据是否正确反映真实世界 | 邮箱格式是否合法、金额是否合理 |
| 完整性 | 关键字段是否缺失 | NULL 值比例、必填字段检查 |
| 一致性 | 跨系统/跨表数据是否一致 | 订单数在两个表中是否匹配 |
| 时效性 | 数据是否及时到达 | 数据延迟是否在 SLA 内 |
| 唯一性 | 是否存在重复记录 | 主键唯一性检查 |
| 有效性 | 数据是否符合预定义规则 | 枚举值检查、范围检查 |
数据质量检查策略
关键位置的检查内容:
- 摄取后:数据量是否符合预期、格式是否正确、必填字段是否完整
- 转换后:业务规则是否满足、聚合值是否合理、外键关系是否完整
- 加载后:端到端一致性、SLA 满足情况
Great Expectations
Great Expectations 是最流行的开源数据质量框架,提供声明式的数据验证规则定义。
import great_expectations as gx
# 初始化上下文
context = gx.get_context()
# 连接数据源
datasource = context.sources.add_pandas("my_datasource")
asset = datasource.add_dataframe_asset("orders_asset")
batch_request = asset.build_batch_request(dataframe=orders_df)
# 创建 Expectation Suite
suite = context.add_expectation_suite("orders_quality_suite")
# 定义期望(Expectations)
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="orders_quality_suite"
)
# 完整性检查
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("order_date")
# 唯一性检查
validator.expect_column_values_to_be_unique("order_id")
# 有效性检查
validator.expect_column_values_to_be_in_set(
"status", ["pending", "processing", "completed", "cancelled", "refunded"]
)
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=1000000)
# 准确性检查(正则表达式)
validator.expect_column_values_to_match_regex("email", r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
# 数据量检查
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=100000)
# 一致性检查(列间关系)
validator.expect_column_pair_values_a_to_be_greater_than_b(
"shipped_date", "order_date", or_equal=True, ignore_row_if="either_value_is_missing"
)
# 保存并运行验证
validator.save_expectation_suite(discard_failed_expectations=False)
results = validator.validate()
print(f"验证通过: {results.success}")
print(f"通过的期望数: {results.statistics['successful_expectations']}")
print(f"失败的期望数: {results.statistics['unsuccessful_expectations']}")数据文档
Great Expectations 自动生成交互式的数据质量文档:
# 生成数据文档
context.build_data_docs()
context.open_data_docs()Soda
Soda 提供更简洁的 YAML 语法定义数据质量检查。
# checks/orders_checks.yml
checks for orders:
# 数据量检查
- row_count > 0
- row_count between 1000 and 100000
# 完整性检查
- missing_count(order_id) = 0
- missing_percent(customer_id) < 1%
# 唯一性检查
- duplicate_count(order_id) = 0
# 有效性检查
- invalid_count(status) = 0:
valid values: [pending, processing, completed, cancelled]
- min(amount) >= 0
- max(amount) <= 1000000
# 数据新鲜度
- freshness(order_date) < 24h
# Schema 检查
- schema:
fail:
when required column missing: [order_id, customer_id, amount, status]
when wrong type:
order_id: integer
amount: decimal
# 自定义 SQL 检查
- failed rows:
fail condition: shipped_date < order_date
name: "发货日期不应早于订单日期"
# 跨数据源一致性检查
- row_count same as orders in warehouse_db# 使用 Soda Python API
from soda.core.scan import Scan
scan = Scan()
scan.set_data_source_name("my_warehouse")
scan.add_configuration_yaml_file("configuration.yml")
scan.add_sodacl_yaml_file("checks/orders_checks.yml")
scan.execute()
# 获取结果
results = scan.get_scan_results()
print(f"通过: {results['pass']}, 警告: {results['warn']}, 失败: {results['fail']}")dbt Tests
dbt 内置了数据测试能力,可以在数据转换后立即验证质量。
# models/schema.yml
version: 2
models:
- name: fct_orders
tests:
# 自定义 SQL 测试
- dbt_utils.equal_rowcount:
compare_model: ref('stg_orders')
columns:
- name: order_id
tests:
- unique
- not_null
- name: amount
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 1000000
- name: status
tests:
- accepted_values:
values: ['pending', 'processing', 'completed', 'cancelled']# 运行测试
dbt test # 运行所有测试
dbt test --select fct_orders # 只测试特定模型
dbt build # run + test 一起执行数据合约(Data Contracts)
数据合约是数据生产者和消费者之间的正式协议,定义了数据的 schema、质量规则和 SLA。
# data_contracts/orders_contract.yml
dataContractSpecification: 0.9.3
id: orders-v1
info:
title: "订单数据合约"
version: "1.0.0"
owner: "order-service-team"
contact:
email: "order-team@company.com"
schema:
type: object
properties:
order_id:
type: integer
description: "订单唯一标识"
required: true
unique: true
customer_id:
type: integer
description: "客户 ID"
required: true
amount:
type: number
description: "订单金额(元)"
required: true
minimum: 0
status:
type: string
description: "订单状态"
required: true
enum: [pending, processing, completed, cancelled]
order_date:
type: timestamp
description: "下单时间"
required: true
quality:
- type: freshness
column: order_date
threshold: "PT24H" # ISO 8601 持续时间:24小时
- type: row_count
threshold: "> 1000"
- type: null_percentage
column: customer_id
threshold: "< 0.01" # NULL 占比小于 1%
sla:
availability: 99.9%
latency: "PT2H" # 数据在事件发生后 2 小时内可用
support_hours: "24/7"SLA/SLO for Data
| 指标 | 定义 | 示例 |
|---|---|---|
| 数据新鲜度 SLO | 最新数据的最大延迟 | 订单数据延迟 < 2 小时 |
| 数据完整性 SLO | NULL 值/缺失记录的最大比例 | 关键字段 NULL < 0.1% |
| 数据准确性 SLO | 错误记录的最大比例 | 格式错误 < 0.01% |
| 管道可靠性 SLO | 管道成功运行的比例 | 成功率 > 99.5% |
| 处理延迟 SLO | 端到端处理时间 | P99 < 30 分钟 |
# 数据 SLO 监控
class DataSLOMonitor:
def __init__(self, db_conn, alerting_client):
self.db = db_conn
self.alerter = alerting_client
def check_freshness_slo(self, table, timestamp_col, max_delay_hours):
"""检查数据新鲜度 SLO"""
query = f"""
SELECT EXTRACT(EPOCH FROM NOW() - MAX({timestamp_col})) / 3600
AS delay_hours FROM {table}
"""
delay = self.db.execute(query).fetchone()[0]
if delay > max_delay_hours:
self.alerter.send(
severity="critical",
message=f"数据新鲜度 SLO 违反: {table} 延迟 {delay:.1f} 小时 "
f"(SLO: {max_delay_hours} 小时)",
)
return delay <= max_delay_hours
def check_completeness_slo(self, table, column, max_null_pct):
"""检查数据完整性 SLO"""
query = f"""
SELECT COUNT(*) FILTER (WHERE {column} IS NULL) * 100.0 / COUNT(*)
AS null_pct FROM {table}
"""
null_pct = self.db.execute(query).fetchone()[0]
if null_pct > max_null_pct:
self.alerter.send(
severity="warning",
message=f"完整性 SLO 违反: {table}.{column} NULL 占比 {null_pct:.2f}% "
f"(SLO: < {max_null_pct}%)",
)
return null_pct <= max_null_pct数据质量仪表板
总结
数据质量管理需要从六大维度系统化地设计检查规则,在数据管道的关键节点嵌入验证。Great Expectations、Soda 和 dbt tests 是三个主流工具,各有特点:Great Expectations 功能最全面,Soda 语法最简洁,dbt tests 与转换流程集成最紧密。数据合约和 SLO 体系将质量管理从被动检测提升到主动治理,是构建可靠数据平台的关键实践。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于