Fluentd Log Collection Architecture
2025-06-24
Fluentd is a CNCF graduated project that provides a unified log collection and routing layer. Its plugin-based architecture supports hundreds of data sources and destinations, making it one of the mainstream log collection solutions for Kubernetes.
Architecture Overview
Core Configuration Structure
A Fluentd configuration is built from three main directives: <source>, <filter>, and <match>:
# Input: listen for logs over HTTP
<source>
@type http
port 9880
bind 0.0.0.0
</source>
# Input: read from files
<source>
@type tail
path /var/log/app/*.log
pos_file /var/log/fluentd/app.log.pos
tag app.*
read_from_head true
<parse>
@type json
time_key timestamp
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
# Filter: add fields
<filter app.**>
@type record_transformer
<record>
hostname "#{Socket.gethostname}"
environment production
</record>
</filter>
# Filter: drop health-check logs
<filter app.**>
@type grep
<exclude>
key path
pattern /^\/healthz/
</exclude>
</filter>
# Output: send to Elasticsearch
<match app.**>
@type elasticsearch
host elasticsearch.monitoring.svc
port 9200
logstash_format true
logstash_prefix app-logs
<buffer>
@type file
path /var/log/fluentd/buffer/es
flush_interval 5s
flush_thread_count 4
chunk_limit_size 8MB
total_limit_size 2GB
retry_max_interval 30s
retry_forever true
</buffer>
</match>
Input Plugins
tail - File Monitoring
<source>
@type tail
path /var/log/containers/*.log
pos_file /var/log/fluentd/containers.log.pos
tag kubernetes.*
read_from_head true
follow_inodes true
refresh_interval 5
<parse>
@type regexp
expression /^(?<time>[^ ]+) (?<stream>stdout|stderr) (?<logtag>[^ ]*) (?<log>.*)$/
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
forward - Fluentd-to-Fluentd Forwarding
<source>
@type forward
port 24224
bind 0.0.0.0
<transport tls>
cert_path /etc/fluentd/tls/cert.pem
private_key_path /etc/fluentd/tls/key.pem
</transport>
<security>
shared_key my-shared-key
self_hostname fluentd-aggregator
</security>
</source>
syslog - System Logs
<source>
@type syslog
port 5140
tag system
<parse>
@type syslog
with_priority true
</parse>
</source>
Filter Plugins
record_transformer - Record Transformation
<filter kubernetes.**>
@type record_transformer
enable_ruby true
<record>
# Add static fields
cluster_name "production-cluster"
# Ruby expression
message ${record["log"].strip rescue record["log"]}
# Conditional field
severity ${record["stream"] == "stderr" ? "ERROR" : "INFO"}
</record>
remove_keys $.kubernetes.labels.pod-template-hash
</filter>
parser - Log Parsing
<filter app.**>
@type parser
key_name log
reserve_data true
<parse>
@type json
time_key timestamp
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</filter>
grep - Log Filtering
<filter app.**>
@type grep
# Include condition
<regexp>
key severity
pattern /^(ERROR|WARN|FATAL)$/
</regexp>
# Exclude condition
<exclude>
key message
pattern /^DEBUG/
</exclude>
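# Multiple <regexp> sections are ANDed together by default; filter_grep
# also provides <and> and <or> sections for explicit grouping.
# A sketch for illustration only -- the keys and patterns are examples:
<or>
<regexp>
key severity
pattern /^FATAL$/
</regexp>
<regexp>
key message
pattern /timeout/
</regexp>
</or>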
</filter>
Routing
Tag-based Routing
Fluentd routes events by matching their tags:
# Exact match
<match app.web>
@type elasticsearch
# ...
</match>
# Wildcard match
<match app.**>
@type elasticsearch
# ...
</match>
# Matching multiple tags
<match app.web app.api>
@type elasticsearch
# ...
</match>
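# One stream can also be duplicated to several destinations with the
# copy output plugin (a sketch; the tag and stores are examples):
<match app.metrics>
@type copy
<store>
@type elasticsearch
# ...
</store>
<store>
@type s3
# ...
</store>
</match>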
# Catch-all (matches everything; <match> directives are evaluated in order, so this must come last)
<match **>
@type s3
# ...
</match>
Label Routing
Labels provide more flexible routing control and avoid tag conflicts:
<source>
@type tail
path /var/log/app/*.log
tag app.log
@label @MAIN_PIPELINE
</source>
<source>
@type tail
path /var/log/audit/*.log
tag audit.log
@label @AUDIT_PIPELINE
</source>
# Main processing pipeline
<label @MAIN_PIPELINE>
<filter **>
@type record_transformer
<record>
pipeline main
</record>
</filter>
<match **>
@type elasticsearch
host es-main:9200
</match>
</label>
# Audit pipeline
<label @AUDIT_PIPELINE>
<filter **>
@type record_transformer
<record>
pipeline audit
</record>
</filter>
<match **>
@type s3
s3_bucket audit-logs
</match>
</label>
Buffer Mechanism
Buffers are the core of Fluentd's reliability, staging data locally when an output destination fails:
<match **>
@type elasticsearch
host elasticsearch:9200
<buffer tag, time>
@type file
path /var/log/fluentd/buffer/es
# Chunking strategy
timekey 1h # one chunk per hour
timekey_wait 5m # wait 5 minutes for late-arriving data
chunk_limit_size 16MB # max size per chunk
chunk_limit_records 10000 # max records per chunk
# Flush strategy
flush_mode interval
flush_interval 30s
flush_thread_count 4
flush_at_shutdown true
# Retry strategy
retry_type exponential_backoff
retry_wait 1s
retry_max_interval 60s
retry_max_times 30
retry_forever false
# Capacity limit
total_limit_size 4GB
overflow_action drop_oldest_chunk
# Compression
compress gzip
</buffer>
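# Optional fallback: once retry_max_times is exhausted, write unsendable
# chunks to local files instead of discarding them (a sketch using the
# built-in secondary_file output; the directory is an example):
<secondary>
@type secondary_file
directory /var/log/fluentd/error
basename dump.${chunk_id}
</secondary>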
</match>
Fluentd vs Fluent Bit
| Feature | Fluentd | Fluent Bit |
|---|---|---|
| Language | Ruby + C | C |
| Memory footprint | ~40MB | ~450KB |
| Plugin count | 1000+ | 100+ |
| Typical role | Aggregator | Agent/Forwarder |
| Configuration complexity | Medium | Low |
| Processing power | Strong (Ruby flexibility) | Medium |
| Container deployment | Deployment | DaemonSet |
Recommended architecture: Fluent Bit (DaemonSet) -> Fluentd (Aggregator) -> backend storage.
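At the agent tier of this architecture, a minimal Fluent Bit configuration tails container logs and forwards them to the Fluentd aggregator. A sketch, assuming a fluentd-aggregator.logging.svc Service and the shared key configured on the aggregator's forward <source>:

[INPUT]
    Name    tail
    Path    /var/log/containers/*.log
    Tag     kube.*

[OUTPUT]
    Name            forward
    Match           *
    Host            fluentd-aggregator.logging.svc
    Port            24224
    Shared_Key      my-shared-key
    Self_Hostname   fluent-bit-agent
    tls             on

Shared_Key and Self_Hostname correspond to the <security> section of the aggregator's forward input shown earlier.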
Kubernetes DaemonSet Deployment
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: logging
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
    spec:
      serviceAccountName: fluentd
      tolerations:
      - key: node-role.kubernetes.io/control-plane
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8-1
        env:
        - name: FLUENT_ELASTICSEARCH_HOST
          value: "elasticsearch.monitoring.svc"
        - name: FLUENT_ELASTICSEARCH_PORT
          value: "9200"
        resources:
          limits:
            memory: 512Mi
            cpu: 500m
          requests:
            memory: 256Mi
            cpu: 100m
        volumeMounts:
        - name: varlog
          mountPath: /var/log
          readOnly: true
        - name: containers
          mountPath: /var/log/containers
          readOnly: true
        - name: config
          mountPath: /fluentd/etc/conf.d
          readOnly: true
        - name: buffer
          mountPath: /var/log/fluentd/buffer
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: containers
        hostPath:
          path: /var/log/containers
      - name: config
        configMap:
          name: fluentd-config
      - name: buffer
        emptyDir:
          sizeLimit: 2Gi
Structured Logging Best Practices
Emit application logs as structured JSON so that Fluentd can parse, route, and enrich individual fields directly:
{
  "timestamp": "2025-06-24T10:30:00.123Z",
  "level": "ERROR",
  "service": "order-service",
  "trace_id": "abc123def456",
  "span_id": "789ghi",
  "message": "Failed to process order",
  "error": {
    "type": "PaymentFailedException",
    "message": "Insufficient funds",
    "stack_trace": "..."
  },
  "context": {
    "order_id": "ORD-12345",
    "user_id": "USR-67890",
    "amount": 99.99
  }
}
Performance Tuning
- Use file buffers rather than memory buffers to avoid data loss on restart
- Tune flush_thread_count to match the throughput of the output destination
- Size chunk_limit_size carefully: too large increases memory usage, too small increases I/O
- Enable gzip compression to reduce network bandwidth and storage
- Use grep filters to drop unneeded logs as early as possible
- Avoid complex computation in Ruby expressions, which hurts throughput
- Monitor Fluentd's own metrics: /api/plugins.json reports plugin status
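The /api/plugins.json endpoint is served by Fluentd's built-in monitor_agent input plugin, which must be enabled explicitly (24220 is its default port):

<source>
@type monitor_agent
bind 0.0.0.0
port 24220
</source>

Querying http://localhost:24220/api/plugins.json then returns per-plugin state such as buffer queue length and retry counts, which can feed dashboards and alerts.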
Summary
Fluentd is a reliable choice for enterprise-grade log collection. In Kubernetes, the recommended pattern is to run Fluent Bit as a node-level agent that collects logs and Fluentd as a cluster-level aggregator that handles routing and transformation, shipping the results to Elasticsearch, Loki, or object storage. Properly configured buffering and retry policies are the key to not losing logs.