SIEM
SIEM(Security Information and Event Management,安全信息和事件管理系统)是一种用于实时监控、检测和响应安全事件的综合安全解决方案。它结合了SIM(Security Information Management,安全信息管理)和SEM(Security Event Management,安全事件管理)的功能,帮助组织管理海量日志数据、识别潜在威胁,并自动化响应流程。SIEM 系统广泛应用于企业安全运维中,尤其在检测异常登录、入侵行为等方面
核心架构
1、数据采集层
- 从各个来源收集日志和时间数据
- 支持多种协议,如Syslog、SNMP、API、文件传输等。采集器部署在数据源端或集中式服务器上,确保数据传输
机制
通常采用push-pull混合:
Push模式由数据源主动发送日志,适合实时性|
|Pull模式由SIEM服务器定期拉取,适合云环境
在异常登录场景,采集Windows Event Logs 或 Linux auth.log 时,需要处理日志轮转(log rotation)以避免数据丢失
缺点:数据量庞大,需要压缩传输和过滤无关日志。Python中,可以利用logging模块或pywin32处理Windows日志
2、数据标准化与解析层
- 把不同格式的原始日志转化为统一的结构,便于后续解析
- 提取关键字段,如时间戳、IP地址、用户ID、事件类型等
- 过滤噪声日志,进行数据清洗,防止系统过载
机制
由于日志格式多样化(.json、CEF、Syslog),解析用正则或解析器。提取字段后,添加元数据(如事件ID、严重度、频率)
这一层和分析关联层高度绑定,同时会牵扯到一个危险度高的潜在攻击——APT
没有经过标准化的日志无法关联,高频的事件报告可以和APT关联起来,防止后续APT渗透内网造成严重财产损失
Python实现:用re或grok-py库解析,统一schema如(“timestamp”:“…”;“event_type”:”…”;),方便团队查看和辨认
3、存储与索引层
- 使用数据库存储日志数据,支持快速查询和归档
- 数据可以按照时间序列存储,并添加索引方便快速检索
4、分析与关联层——核心引擎
- 可以接入大模型,利用规则引擎、进行机器学习或行为分析检测异常
- 规则关联:将多次登录失败和异常IP访问关联起来,判定为暴力破解
- 事实分析:设置监控阈值,短时间内登录失败连续超过五次自动发送警报
- 高级SIEM集成UEBA(User and Entity Behavior Analytics,用户和实体行为分析)来检测异常模式
规则引擎
简单规则用if-then;复杂用CEP(Complex Event Processing)如Esper
CEP(Complex Event Processing)
概念
Complex Event Processing(复杂事件处理)是一种实时处理技术,它从多个事件流(event streams)中检测复杂模式、时间相关性、因果关系或趋势,而不是只看单个事件。 它关注“事件的组合与时序”,能识别出简单规则引擎无法发现的高阶威胁
典型代表工具:Esper(开源 Java CEP 引擎)、Apache Flink CEP、Drools Fusion、IBM Operational Decision Manager 等
主要作用
- 实时产生“复杂事件”(synthesized event),用于触发高级告警
- 检测多事件关联的复杂场景(例如:5 分钟内 3 次失败登录 + 来自同一 IP 的端口扫描 + 异常文件访问)
- 支持时间窗口(sliding window、tumbling window)、序列模式(A 后跟 B 但不跟 C)、聚合(平均、最大、最小)、否定模式(某段时间内没有发生某事件)
示例场景
用户 A 正常工作时间从东京登录,突然在凌晨从俄罗斯 IP 登录,且 2 分钟内尝试访问 10 个敏感服务器 → CEP 可以用一条 EPL(Event Processing Language)语句检测这种跨时间、跨来源的模式
关联示例:192.168.xxx.xxx在五分钟内登录失败超过10次+端口扫描=高危告警
ML集成
用 UEBA 检测异常(如用户平时从 US 登录,突然从 RU,需基线模型)
概念 UEBA = User and Entity Behavior Analytics(用户与实体行为分析)。 它利用机器学习(而非固定规则)为每个用户、主机、IP、应用等实体建立动态行为基线(baseline),然后持续监控当前行为与基线的偏离程度,检测异常
核心机制
- 学习阶段:收集历史数据(登录时间、地点、访问资源、操作频率等),用 ML(如聚类、Isolation Forest、Autoencoder、统计模型)构建“正常”画像
- 检测阶段:实时打分(risk score),偏离基线越远分数越高
- 动态更新:基线会随时间缓慢演化(例如用户换了工作地点)
主要作用
- 发现未知威胁(zero-day、慢速 APT、低速数据窃取、内部威胁)
- 弥补规则-based 检测的盲区(规则写不全所有可能攻击)
- 典型异常示例:
- 用户平时只在美国工作时间从公司 VPN 登录,突然从俄罗斯 IP 凌晨登录
- 财务人员突然大量下载 HR 目录文件
- 服务器平时只跑 Web 服务,突然启动挖矿进程
实例
用户平时 9:00–18:00 从东京 IP 登录公司邮箱,突然 03:00 从俄罗斯 IP 登录 → UEBA 会给该行为打高风险分,生成异常事件,供第 5 层告警。
5、告警与响应层
- 生成告警通知,例如发送邮件、SMS、后台票务系统
- 支持自动化响应,例如封禁IP或脚本利用
- 提供可视化界面报告——连接仪表盘,展示事件、历史趋势并合成相关报告
机制
告警分级(高危/中危/低危/无风险),集成SOAR自动化响应
概念 SOAR = Security Orchestration, Automation and Response(安全编排、自动化与响应)。 它是一个流程自动化平台,把安全工具(SIEM、EDR、防火墙、票务系统、邮件等)串联起来,通过预定义的playbook(剧本)自动或半自动执行响应动作
典型代表工具
- 开源:TheHive + Cortex(分析器)、Shuffle、Demisto(现 Palo Alto XSOAR 的前身开源部分)
- 商用:Splunk SOAR、Swimlane、IBM Resilient、ServiceNow SecOps 等
主要作用
- Response:提供标准化的响应流程,减少人为错误,提高响应速度
- Orchestration:把分散工具连成工作流
- Automation:自动执行重复、低价值动作(查 VirusTotal、封 IP、创建 Jira ticket、隔离主机)
示例场景(TheHive 集成) SIEM 检测到暴力破解 → 自动触发 Shuffle/TheHive playbook:
- 通知 Slack/邮件 + 创建工单
- 查 AbuseIPDB / VirusTotal 评分
- 如果分数高 → 自动创建 TheHive Case
- 自动调用 Cortex Analyzer 分析 IP
- 自动通过 API 让防火墙封禁 IP
集成
Python可调用外部API(Slack for通知、iptables for封禁)
6、管理与合规层
- 配置规则、用户访问控制、审计日志
- 确保符合法规,法规包括且不限于GDPR、HIPAA
工作机制
SIEM采用闭环流程的工作机制:
- 采集与传输:数据源生成日志,采集器推送或拉取数据到中央服务器
- 处理与分析:日志进入管道,经过分析、标准化后进入规则引擎
- 触发规则匹配时间会识别事件
- 触发相关联事件会提升优先级
- 检测异常:针对异常登录,机制包括:
- 阈值检测:1分钟内3次登录失败
- 地理位置检查:登录IP不匹配历史位置
- 行为基线:接入大模型学习历史浏览行为,比对当前行为和历史行为差异
- 告警生成:一旦检测到威胁,生成事件票据(Ticket),通知安全团队。严重事件可触发SOAR(Security Orchestration, Automation and Response)自动化响应。
- 回顾和优化:通过forensic分析历史数据,优化检测,减少误报(False Positives)
graph TB A((采集与传输)) B[处理与分析] C[检测异常] D[告警生成] E((回顾和优化)) A--推送到服务器-->B B--日志进入管道-->C C--规则匹配、阈值检测、行为匹配-->D D--生成票据,告知团队-->E E--模型学习、分析历史-->A
Python模块示例
现假设需要开发一个模块来监控Linux系统下的/var/log/auth.log文件,实时采集日志,检测异常登录,并通过邮件发送告警
这个实例使用Python标准库和少量内置模块(如re用于解析、smtplib用于邮件)采用文件尾随(tailing)机制模拟实时采集(类似linux的tail -f命令),避免依赖外部库
代码示例:
import re
import time
import smtplib
from email.mime.text import MIMEText # 用于构建邮件内容
from collections import defaultdict # 用于存储登录失败的时间戳列表
from datetime import datetime, timedelta # 用于处理时间窗口
# 配置参数
LOG_FILE = '/var/log/auth.log' # 替换为实际日志路径(需读权限)
ALERT_THRESHOLD = 3 # 失败登录阈值
TIME_WINDOW = timedelta(minutes=1) # 时间窗口
SMTP_SERVER = 'smtp.example.com' # 替换为你的 SMTP 服务器
SMTP_PORT = 587 # 替换为你的 SMTP 端口
SENDER_EMAIL = 'alert@example.com' # 替换为你的发送邮箱
SENDER_PASSWORD = 'your_password' # 替换为你的发送邮箱密码
RECIPIENT_EMAIL = 'admin@example.com' # 替换为你的接收邮箱
# 正则表达式解析 sshd 登录日志(示例:Failed password for user from IP)
FAILED_LOGIN_PATTERN = re.compile(r'Failed password for (\w+) from ([\d.]+) port \d+ ssh2')
SUCCESS_LOGIN_PATTERN = re.compile(r'Accepted password for (\w+) from ([\d.]+) port \d+ ssh2')
class LoginTracker: # 追踪登录失败的类
def __init__(self):
self.failures = defaultdict(list) # IP -> list of failure timestamps
def process_log_line(self, line):
failed_match = FAILED_LOGIN_PATTERN.search(line)
if failed_match: # 如果匹配到失败登录
user, ip = failed_match.groups()
now = datetime.now() # 当前时间
self.failures[ip].append(now) # 记录失败时间
# 清理过期记录
self.failures[ip] = [t for t in self.failures[ip] if now - t < TIME_WINDOW] # 保留时间窗口内的记录
# 检查阈值
if len(self.failures[ip]) >= ALERT_THRESHOLD:
self.send_alert(ip, user, len(self.failures[ip]))
self.failures[ip] = [] # 重置计数以避免重复告警
success_match = SUCCESS_LOGIN_PATTERN.search(line)
if success_match:
user, ip = success_match.groups()
print(f"Successful login: User {user} from {ip}")
def send_alert(self, ip, user, count): # 发送告警邮件
msg = MIMEText(f"Abnormal login detected: {count} failed attempts for user {user} from IP {ip}") # 邮件内容
msg['Subject'] = 'Security Alert: Abnormal Login'
msg['From'] = SENDER_EMAIL # 邮件发送者
msg['To'] = RECIPIENT_EMAIL # 邮件接收者
try: # 连接 SMTP 服务器并发送邮件
server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT) # 连接 SMTP 服务器
server.starttls() # 启用 TLS
server.login(SENDER_EMAIL, SENDER_PASSWORD) # 登录 SMTP 服务器
server.sendmail(SENDER_EMAIL, RECIPIENT_EMAIL, msg.as_string())
server.quit()
print("Alert sent successfully")
except Exception as e:
print(f"Failed to send alert: {e}")
def tail_log_file(file_path): # 实时读取日志文件
with open(file_path, 'r') as f:
# 移动到文件末尾
f.seek(0, 2)
while True:
line = f.readline()
if not line:
time.sleep(0.1) # 轮询间隔
continue
yield line.strip() # 返回新行
# 主函数
if __name__ == '__main__':
tracker = LoginTracker()
print("Starting log monitoring...")
for line in tail_log_file(LOG_FILE): # 处理每一行日志
tracker.process_log_line(line) # 解析日志行并更新登录状态
Python代码片段示例
SIEM的核心架构通常被描述为分层管道(pipeline)式结构,逻辑上看,六层架构每层实现的模块分别独立运行后交给下一层,经过闭环流程实现记录、统一、存储、分析、响应、优化。但从实际系统运行中,这些层都是高度并行、异步、流式处理,即同一时刻成千上万各事件在不同层、不同节点同时被处理。下面将针对各层单独用代码举例解释
1、数据采集层(Data Collection / Ingestion)
负责从各种源头实时/批量拉取或接收日志,典型的实现方式有:
文件tail、Syslog UDP/TCP接收、Beats/Filebeat、API polling等
Python示例(文件tail+简单Syslog UDP服务器)
# collector.py - 采集层示例
import socket
import threading
from pathlib import Path
import time
def tail_file(log_path: str, callback): # 定义一个函数来模拟 tail -f 功能,实时读取日志文件并调用回调函数处理新行
"""模拟 tail -f"""
path = Path(log_path)
last_size = path.stat().st_size # 获取文件初始大小,以便后续读取新增内容
while True:
time.sleep(0.2)
current_size = path.stat().st_size # 获取当前文件大小,如果有新增内容,则读取新增部分并调用回调函数处理
if current_size > last_size: # 如果文件有新增内容,则读取新增部分并调用回调函数处理
with open(path, 'r') as f:
f.seek(last_size) # 移动文件指针到上次读取的位置
new_lines = f.readlines()
for line in new_lines: # 处理每一行新增日志,调用回调函数
callback(line.strip()) # 去除行末的换行符并调用回调函数
last_size = current_size
def start_syslog_udp_server(port=514, callback=None): # 定义一个函数来启动一个简单的 UDP Syslog 服务器,监听指定端口并调用回调函数处理接收到的日志消息
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 创建一个 UDP 套接字
sock.bind(('0.0.0.0', port))
print(f"Syslog UDP listening on :{port}")
while True:
data, addr = sock.recvfrom(4096) # 接收 UDP 消息
line = data.decode('utf-8', errors='ignore').strip() # 解码消息并去除行末的换行符
if callback:
callback(line) # 调用回调函数处理接收到的日志消息
# 使用示例
if __name__ == "__main__": # 定义一个简单的回调函数来打印采集到的日志行
def print_line(line):
print(f"[采集] {line}")
# 启动文件 tail
threading.Thread(target=tail_file, args=("/var/log/auth.log", print_line), daemon=True).start()
# 启动 UDP Syslog 接收(可选)
# threading.Thread(target=start_syslog_udp_server, args=(514, print_line), daemon=True).start()
while True:
time.sleep(10) # 主线程保持运行,采集线程在后台工作 # 可以根据需要添加更多的采集方式,如 TCP Syslog、API 采集等
2、数据标准化与解析层(Normalization & Parsing)
把原始日志转化成同一结构,提取关键字段
典型实现方式:正则/Grok/CEF/JSON schema映射
Python实例(简单正则 + CEF-like 输出)
# parser.py
import re
import json
from datetime import datetime
# 示例:sshd 失败登录正则
FAIL_PATTERN = re.compile(r'Failed password for (\S+) from ([\d.]+) port \d+ ssh2')
def parse_line(raw_line: str) -> dict | None: # 解析日志行,提取事件信息
ts = datetime.utcnow().isoformat() + "Z" # 实际应从日志提取时间
event = {
"timestamp": ts,
"raw": raw_line,
"source": "auth.log",
"event_type": "unknown"
}
m = FAIL_PATTERN.search(raw_line) # 匹配失败登录事件
if m:
user, src_ip = m.groups()
event.update({
"event_type": "authentication_failure",
"user": user,
"src_ip": src_ip,
"severity": "medium",
"category": "authentication"
})
return event
# 可继续添加成功登录、sudo、cron 等多种模式...
return None # 忽略不关心的日志
# 使用示例
if __name__ == "__main__": # 测试解析功能
test_lines = [
'Sep 4 10:15:23 server sshd[1234]: Failed password for invalid user admin from 192.168.1.100 port 12345 ssh2',
'Sep 4 10:16:01 server sshd[5678]: Accepted password for rain from 203.0.113.50 port 54321 ssh2'
]
for line in test_lines:
parsed = parse_line(line) # 解析日志行,输出结果
if parsed:
print(json.dumps(parsed, indent=2, ensure_ascii=False))
3、存储与索引层(Storage & Indexing)
把解析后的事件存下来,支持快速检索。
典型实现方式:Elasticsearch、OpenSearch、ClickHouse、PostgreSQL + TimescaleDB、SQLite(原型)。
Python 示例(使用 SQLite + 简单时间分区表)
# storage.py
import sqlite3
import json
from parser import parse_line # 从 parser 模块导入解析函数
from datetime import datetime
DB_PATH = "siem_events.db"
def init_db(): # 初始化数据库,创建事件表
conn = sqlite3.connect(DB_PATH)
conn.execute("""
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
event_type TEXT,
src_ip TEXT,
user TEXT,
severity TEXT,
raw TEXT,
json_data TEXT
)
""")
conn.commit()
return conn
def store_event(event: dict): # 存储事件到数据库
conn = sqlite3.connect(DB_PATH) # 连接数据库
conn.execute("""
INSERT INTO events (timestamp, event_type, src_ip, user, severity, raw, json_data)
VALUES (?, ?, ?, ?, ?, ?, ?) # 使用参数化查询防止 SQL 注入
""", (
event["timestamp"],
event.get("event_type"),
event.get("src_ip"),
event.get("user"),
event.get("severity"),
event["raw"],
json.dumps(event)
)) # 将事件字典转换为 JSON 存储
conn.commit()
conn.close()
# 使用示例:结合 parser
if __name__ == "__main__":
init_db()
sample_event = parse_line("Failed password for test from 1.2.3.4 port 22 ssh2") # 从 parser 导入解析函数,解析示例日志行
if sample_event:
store_event(sample_event)
4、分析与关联层(Analysis & Correlation)
核心检测引擎:规则匹配、阈值、时间窗口关联、简单 ML。
典型实现方式:规则引擎(Sigma)、CEP、状态机、ML 异常检测。
Python 示例(简单滑动窗口 + 规则)
# analyzer.py
from collections import defaultdict, deque
from datetime import datetime, timedelta
class FailureWindow: # 监控短时间内的失败事件
def __init__(self, threshold=5, window_sec=300): # threshold: 失败次数阈值, window_sec: 时间窗口长度(秒)
self.failures = defaultdict(lambda: deque(maxlen=100)) # 存储每个IP的失败事件时间戳,使用deque自动丢弃过旧的记录
self.threshold = threshold
self.window = timedelta(seconds=window_sec) # 转换为timedelta对象,方便时间比较
def add_failure(self, ip: str, ts: datetime): # 添加失败事件
self.failures[ip].append(ts) # 添加当前失败事件的时间戳
# 清理过期
while self.failures[ip] and ts - self.failures[ip][0] > self.window: # 移除窗口外的旧记录
self.failures[ip].popleft()
def is_brute_force(self, ip: str) -> bool: # 判断是否达到暴力攻击的条件
if len(self.failures[ip]) >= self.threshold: # 如果当前IP的失败事件数量超过阈值,认为可能是暴力攻击
return True
return False
# 规则示例:短时间多次失败登录
analyzer = FailureWindow(threshold=4, window_sec=120)
def analyze_event(event: dict): # 分析事件,判断是否触发暴力攻击规则
if event.get("event_type") == "authentication_failure":
ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00")) # 解析时间戳,处理UTC格式
ip = event["src_ip"]
analyzer.add_failure(ip, ts)
if analyzer.is_brute_force(ip):
return {
"alert": True,
"type": "brute_force_attempt",
"ip": ip,
"count": len(analyzer.failures[ip]),
"time_window": "2分钟"
}
return None
5、告警与响应层(Alerting & Response)
产生告警、通知、自动化动作。
典型实现方式:邮件、Slack/Webhook、SOAR 脚本、防火墙 API。
Python 示例(邮件 + 简单 iptables 封禁)
#alerter.py
import smtplib
from email.mime.text import MIMEText
import subprocess
def send_email_alert(alert_data: dict, to_email="admin@example.com"): # 发送告警邮件
msg = MIMEText(f"高危告警:{alert_data}")
msg['Subject'] = f"SIEM Alert - {alert_data.get('type', '未知')}"
msg['From'] = "siem@example.com"
msg['To'] = to_email
with smtplib.SMTP("smtp.example.com", 587) as server: # 连接SMTP服务器
server.starttls()
server.login("siem@example.com", "password")
server.send_message(msg)
def auto_block_ip(ip: str): # 自动封禁IP地址
try:
# 注意:生产环境需谨慎,最好用 nftables 或专用防火墙接口
subprocess.run(["sudo", "iptables", "-A", "INPUT", "-s", ip, "-j", "DROP"], check=True, timeout=10) # 使用iptables封禁IP,设置超时避免挂起
print(f"已自动封禁 IP: {ip}")
except Exception as e:
print(f"封禁失败: {e}")
# 使用示例
if __name__ == "__main__": # 模拟接收到一个高危告警
sample_alert = {"type": "brute_force_attempt", "ip": "1.2.3.4", "count": 7} # 模拟一个暴力攻击告警
send_email_alert(sample_alert) # 发送告警邮件
auto_block_ip(sample_alert["ip"]) # 自动封禁攻击IP
6、展示、管理与合规层(Visualization, Management & Compliance)
仪表盘、查询、报告、用户权限、审计、归档。
典型实现方式:Kibana/ Grafana、自定义 Flask/Dash 界面、RBAC。
Python 示例(简单 Flask + SQLite 查询 dashboard)
# dashboard.py - 极简 Web 展示
from flask import Flask, render_template_string
import sqlite3
app = Flask(__name__)
HTML = """
<!doctype html>
<title>SIEM Mini Dashboard</title>
<h1>最近告警事件</h1>
<table border=1>
<tr><th>时间</th><th>类型</th><th>IP</th><th>用户</th><th>原始日志</th></tr>
{% for row in events %}
<tr><td>{{ row[0] }}</td><td>{{ row[1] }}</td><td>{{ row[2] }}</td><td>{{ row[3] }}</td><td>{{ row[6][:100] }}</td></tr>
{% endfor %}
</table>
"""
@app.route("/") # 首页显示最近的告警事件
def dashboard(): # 从数据库读取最近的事件并展示
conn = sqlite3.connect("siem_events.db")
cur = conn.cursor() # 查询最近的事件
cur.execute("SELECT timestamp, event_type, src_ip, user, severity, raw FROM events ORDER BY timestamp DESC LIMIT 20")
events = cur.fetchall() # 关闭数据库连接
conn.close()
return render_template_string(HTML, events=events) # 渲染 HTML 模板并返回
if __name__ == "__main__": # 启动 Flask 应用
app.run(debug=True, port=5005)
之后组合,形成一个小型管道:
# main.py 示例组合
from collector import tail_file
from parser import parse_line
from storage import store_event
from analyzer import analyze_event
from alerter import send_email_alert # 或其他动作
def process_line(raw): # 处理每一行日志
event = parse_line(raw)
if event:
store_event(event)
alert = analyze_event(event)
if alert and alert.get("alert"):
send_email_alert(alert)
tail_file("/var/log/auth.log", process_line) # 监控日志文件并处理新行








