SIEM与安全日志分析
约 1764 字大约 6 分钟
siemlogging
2025-08-26
概述
安全信息与事件管理(Security Information and Event Management, SIEM)是将来自不同来源的安全日志进行集中收集、关联分析和实时告警的平台。SIEM 是安全运营中心(SOC)的核心工具,它将海量的日志数据转化为可操作的安全情报。
SIEM 架构
日志收集
Syslog
# rsyslog configuration — receive remote syslog
# /etc/rsyslog.conf
# Enable UDP and TCP reception on the standard syslog port 514
module(load="imudp")
input(type="imudp" port="514")
module(load="imtcp")
input(type="imtcp" port="514")
# Store remote logs in per-host, per-program files
template(name="RemoteLog" type="string"
string="/var/log/remote/%HOSTNAME%/%PROGRAMNAME%.log")
# For any non-local sender: write via the dynamic-file template, then stop
# processing the message (so it is not duplicated into local log files)
if $fromhost-ip != '127.0.0.1' then {
action(type="omfile" dynaFile="RemoteLog")
stop
}
Filebeat(Elastic Agent)
# filebeat.yml — log collection configuration
filebeat.inputs:
  # Nginx access logs
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    fields:
      log_type: nginx_access
    # Lift parsed JSON keys to the top level of the event
    json.keys_under_root: true
  # Application logs (multiline: a new entry starts with a YYYY-MM-DD date,
  # continuation lines are appended to the previous entry)
  - type: log
    enabled: true
    paths:
      - /var/log/myapp/*.log
    fields:
      log_type: application
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after
  # System authentication logs
  - type: log
    enabled: true
    paths:
      - /var/log/auth.log
    fields:
      log_type: auth

output.elasticsearch:
  hosts: ["https://elasticsearch:9200"]
  username: "elastic"
  password: "${ES_PASSWORD}"
  ssl.certificate_authorities: ["/etc/filebeat/ca.crt"]
# Or output to Logstash instead — NOTE: Filebeat allows only ONE active
# output at a time; enable either elasticsearch or logstash, not both.
output.logstash:
  hosts: ["logstash:5044"]
结构化日志输出
import structlog
import json

# Configure structlog to emit JSON log lines carrying an ISO-8601 timestamp
# and the log level — a format SIEM pipelines can parse directly.
_PROCESSORS = [
    structlog.processors.TimeStamper(fmt="iso"),
    structlog.processors.add_log_level,
    structlog.processors.JSONRenderer(),
]
structlog.configure(processors=_PROCESSORS)

logger = structlog.get_logger()
# Security-relevant log events
def log_auth_event(event_type: str, user: str, ip: str, success: bool, **extra):
    """Log a structured authentication event.

    event_type is one of: login, logout, password_change, mfa_verify.
    Optional context (``user_agent``, ``geo``) is taken from **extra.
    """
    fields = {
        "event_type": event_type,
        "user": user,
        "source_ip": ip,
        "success": success,
        "user_agent": extra.get("user_agent"),
        "geo_location": extra.get("geo"),
    }
    logger.info("auth_event", **fields)
def log_access_event(user: str, resource: str, action: str, allowed: bool):
    """Log a structured authorization decision for a resource access."""
    fields = {
        "user": user,
        "resource": resource,
        "action": action,
        "allowed": allowed,
    }
    logger.info("access_event", **fields)
# 输出示例(JSON 格式,便于 SIEM 解析)
# {"event": "auth_event", "event_type": "login", "user": "admin",
# "source_ip": "203.0.113.50", "success": false,
# "timestamp": "2024-01-15T10:30:00Z", "level": "info"}
关联规则
基于签名的检测
<!-- Wazuh rule example -->
<!-- Brute-force detection: 5 failed logins from the same IP within 5 minutes (300 s) -->
<group name="authentication">
  <rule id="100001" level="10" frequency="5" timeframe="300">
    <!-- SID 5710: SSH authentication failure -->
    <if_matched_sid>5710</if_matched_sid>
    <same_source_ip />
    <!-- Wazuh dynamic fields use $(field) syntax, not %(field) -->
    <description>Brute force attack detected from $(srcip)</description>
    <mitre>
      <!-- MITRE ATT&CK technique: Brute Force -->
      <id>T1110</id>
    </mitre>
  </rule>
</group>
# 使用 Elasticsearch 查询实现关联规则
# Detect brute force: >= 5 failed logins from a single IP in the last 5 minutes
_FAILED_LOGIN_FILTER = {
    "bool": {
        "must": [
            {"term": {"event_type": "login"}},
            {"term": {"success": False}},
            {"range": {"@timestamp": {"gte": "now-5m"}}},
        ]
    }
}
brute_force_query = {
    "query": _FAILED_LOGIN_FILTER,
    "aggs": {
        "by_ip": {
            # Only surface source IPs with at least 5 matching failures
            "terms": {"field": "source_ip", "min_doc_count": 5},
            # Break each offending IP down by targeted user
            "aggs": {"by_user": {"terms": {"field": "user"}}},
        }
    },
}
# Detect anomalous logins (successful login from a new geographic location)
anomalous_login_query = {
"query": {
"bool": {
"must": [
{"term": {"event_type": "login"}},
{"term": {"success": True}},
],
"must_not": [
# Exclude known-normal login locations
{"terms": {"geo.country": ["CN", "US"]}}
]
}
}
}
基于异常的检测
# 简单的统计异常检测
import numpy as np
from collections import defaultdict
class AnomalyDetector:
def __init__(self, window_size=7, threshold=3.0):
self.window_size = window_size # 基线窗口(天数)
self.threshold = threshold # 标准差倍数
self.history = defaultdict(list)
def check(self, metric_name: str, current_value: float) -> bool:
"""检查当前值是否异常"""
history = self.history[metric_name]
if len(history) < self.window_size:
history.append(current_value)
return False
mean = np.mean(history[-self.window_size:])
std = np.std(history[-self.window_size:])
if std == 0:
is_anomaly = current_value != mean
else:
z_score = abs(current_value - mean) / std
is_anomaly = z_score > self.threshold
history.append(current_value)
return is_anomaly
# Usage example
detector = AnomalyDetector()
# Check whether this user's login-failure count is anomalous
# (user_id, failure_count and alert() are assumed to be defined by the caller)
if detector.check(f"login_failures:{user_id}", failure_count):
    alert("Anomalous login failure count", user_id=user_id, count=failure_count)
ELK Stack 安全分析
# docker-compose.yml — ELK deployment for security analytics
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=true
      # Demo credential only — inject a real secret in production
      - ELASTIC_PASSWORD=changeme
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data
  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=changeme
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline
    depends_on:
      - elasticsearch
volumes:
  es_data:
# Logstash 安全日志解析管道
# logstash/pipeline/security.conf
input {
beats {
port => 5044
}
}
filter {
if [fields][log_type] == "nginx_access" {
grok {
match => { "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:bytes}' }
}
# GeoIP enrichment: attach the client's geographic location
geoip {
source => "client_ip"
target => "geo"
}
# Tag denied (403) / unauthenticated (401) requests as security events
if [status] == "403" or [status] == "401" {
mutate { add_tag => ["security_event"] }
}
# Detect common attack patterns (path traversal, SQL injection, XSS)
if [request] =~ /(\.\.\/)|(union\s+select)|(script>)/i {
mutate {
add_tag => ["attack_detected"]
add_field => { "alert_type" => "web_attack" }
}
}
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "security-logs-%{+YYYY.MM.dd}"
user => "elastic"
password => "changeme"
}
# Forward high-priority alerts to the external alerting webhook
if "attack_detected" in [tags] {
http {
url => "https://alerts.example.com/webhook"
http_method => "post"
format => "json"
}
}
}
Wazuh(开源 SIEM)
# Wazuh deployment (Docker)
# docker-compose.yml
# NOTE(review): the named volumes below also need a top-level `volumes:`
# declaration in the full compose file.
version: '3.8'
services:
  wazuh-manager:
    image: wazuh/wazuh-manager:4.7.0
    hostname: wazuh-manager
    ports:
      - "1514:1514"    # agent communication
      - "55000:55000"  # API
    volumes:
      - wazuh_api_configuration:/var/ossec/api/configuration
      - wazuh_etc:/var/ossec/etc
      - wazuh_logs:/var/ossec/logs
  wazuh-dashboard:
    image: wazuh/wazuh-dashboard:4.7.0
    ports:
      - "443:5601"
    environment:
      - WAZUH_API_URL=https://wazuh-manager:55000
威胁狩猎(Threat Hunting)
# Threat-hunting query examples
# Hunt 1: anomalous DNS queries (possible C2 traffic or data exfiltration)
_DNS_MATCH = {
    "bool": {
        "must": [
            {"term": {"event.category": "dns"}},
            {"range": {"@timestamp": {"gte": "now-24h"}}},
        ],
        "filter": [
            # Unusually long query names suggest DNS tunnelling
            {"script": {"script": "doc['dns.question.name'].value.length() > 50"}}
        ],
    }
}
dns_hunting = {
    "query": _DNS_MATCH,
    "aggs": {
        "by_domain": {
            "terms": {"field": "dns.question.registered_domain", "size": 20}
        }
    },
}
# Hunt 2: administrator activity outside working hours (before 06:00 / after 22:00)
_OFF_HOURS_SCRIPT = (
    "doc['@timestamp'].value.getHour() < 6 || doc['@timestamp'].value.getHour() > 22"
)
admin_hunting = {
    "query": {
        "bool": {
            "must": [
                {"term": {"user.roles": "admin"}},
                {"range": {"@timestamp": {"gte": "now-7d"}}},
            ],
            "filter": [{"script": {"script": _OFF_HOURS_SCRIPT}}],
        }
    }
}
# Hunt 3: large outbound data transfer (possible exfiltration)
data_exfil_hunting = {
"query": {
"bool": {
"must": [
{"range": {"network.bytes_out": {"gte": 104857600}}}, # >= 100 MB (104857600 bytes)
{"range": {"@timestamp": {"gte": "now-24h"}}}
]
}
},
"aggs": {
# Group matching flows by destination IP to spot the receiving endpoint
"by_destination": {
"terms": {"field": "destination.ip", "size": 10}
}
}
}
关键安全日志事件
| 事件类型 | 检测内容 | 优先级 |
|---|---|---|
| 认证失败累积 | 暴力破解 | 高 |
| 非常规时间登录 | 账户被盗 | 中 |
| 权限提升 | 内部威胁 | 高 |
| 大量数据访问 | 数据泄露 | 高 |
| 新进程/服务 | 恶意软件 | 中 |
| 防火墙规则变更 | 后门植入 | 严重 |
| DNS 异常查询 | C2 通信 | 高 |
| 用户创建/删除 | 持久化 | 中 |
最佳实践
- 集中收集所有安全相关日志,包括网络、主机、应用和身份认证
- 使用结构化日志格式(JSON),便于 SIEM 解析和查询
- 日志包含必要上下文:时间戳、源 IP、用户、操作、结果、请求 ID
- 建立关联规则覆盖常见攻击:暴力破解、横向移动、权限提升
- 结合签名检测和异常检测,签名检测已知威胁,异常检测未知威胁
- 日志保留至少 90 天(在线),1 年(归档),满足合规要求
- 定期进行威胁狩猎,主动搜索潜伏的威胁
- 测试告警规则的有效性,定期模拟攻击验证检测能力
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于