摘要:在本文中,我们将构建威胁情报平台(TIP)的“大脑”——情报关联分析引擎。我们将解决“数据孤岛”问题,将我们内部的EDR遥测数据(如进程、网络日志)与外部的威胁情报(IoCs)进行实时碰撞。我们将设计一个基于Redis Pub/Sub和Elasticsearch的异步处理流水线:我们的EDR传感器(发布者)将内部日志实时发布到Redis;一个Python关联引擎(订阅者)在收到日志后,立即查询我们的Elasticsearch情报数据库,以判断日志中的IP、哈希或域名是否命中“黑名单”。通过这种内部日志与外部情报的实时关联,我们的平台将能够从海量的“可疑”事件中,自动筛选出高置信度的“已知威胁”告警,实现从“被动防御”到“主动狩猎”的飞跃。
关键词:Python, 威胁情报, 关联分析, EDR, SIEM, Elasticsearch, Redis Pub/Sub, 实时检测
正文
1. 关联的价值:从“噪音”到“信号”
一个企业每天会产生数亿条日志。单独来看,每一条都可能是无害的“噪音”:
EDR日志:。 (这可能是正常的Windows更新,也可能是C2通信)
[2025-11-03 14:30:05] 'svchost.exe' (PID 1234) connected to 185.12.33.4:80
威胁情报:。 (这是一个孤立的情报点)
[2025-11-03 14:00:00] IoC '185.12.33.4' (ipv4) added to list, associated with 'Emotet C2'
**关联分析(Correlation)**就是将这两条信息“碰撞”在一起的引擎。
关联结果:
[2025-11-03 14:30:05] [!!!] 高危告警: 内部主机(svchost.exe) 正在连接一个已知的'Emotet C2'服务器(185.12.33.4)!
通过关联,一条低置信度的“噪音”瞬间转变成了一条高置信度、可立即采取行动的“信号”。这就是现代SIEM(安全信息和事件管理)和EDR平台的核心价值。
2. 架构设计:实时日志-情报流水线
我们将设计一个解耦的、基于事件的流水线来实现这一目标:
EDR传感器():在端点上运行,捕获到所有进程/网络事件,并将其(作为JSON)
edr_sensor.py到Redis的
PUBLISH频道。
internal_logs
IoC聚合器():在后台定时运行,从外部拉取IoC,并将它们存入
ioc_aggregator.py的
Elasticsearch索引中。
threat_intel_iocs
关联引擎():(我们今天的主角)一个独立的Python服务。它订阅Redis的
correlation_engine.py频道。
internal_logs
工作流程: a. 从Redis收到一条新日志(例如,
correlation_engine)。 b. 它立即向
connect to 185.12.33.4发起查询:“
Elasticsearch索引中,是否存在
threat_intel_iocs的文档?” c. 如果命中,它会生成一个“高危告警”事件。 d. 它将这个告警
ioc_value == 185.12.33.4到Redis的另一个频道,例如
PUBLISH。
high_severity_alerts
响应器():(来自上一章的)订阅
responder.py频道,并执行自动化遏制动作。
high_severity_alerts
3. 代码实现:
correlation_engine.py
correlation_engine.py
环境准备:
确保和
redis-server正在运行。
elasticsearch
安装Python库:
Bash
pip install redis elasticsearch
Python
# correlation_engine.py
import redis
import json
import sys
from elasticsearch import Elasticsearch
from datetime import datetime
# --- 配置 ---
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
ELASTICSEARCH_HOSTS = ["http://localhost:9200"]
IOC_INDEX = "threat_intel_iocs" # 我们存储IoC的ES索引
LOGS_CHANNEL = "internal_logs" # 我们订阅的内部日志频道
ALERTS_CHANNEL = "high_severity_alerts" # 我们发布告警的频道
class CorrelationEngine:
def __init__(self):
try:
self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
self.redis_pubsub = self.redis_client.pubsub()
self.redis_pubsub.subscribe(LOGS_CHANNEL)
print(f"[*] 已连接到Redis并订阅频道: {LOGS_CHANNEL}")
except redis.exceptions.ConnectionError as e:
print(f"[!] 严重错误: 无法连接到Redis! {e}")
sys.exit(1)
try:
self.es_client = Elasticsearch(hosts=ELASTICSEARCH_HOSTS)
if not self.es_client.ping():
raise ConnectionError("ES Ping 失败")
print(f"[*] 已连接到Elasticsearch: {ELASTICSEARCH_HOSTS[0]}")
except Exception as e:
print(f"[!] 严重错误: 无法连接到Elasticsearch! {e}")
sys.exit(1)
def check_ioc_in_es(self, ioc_value, ioc_type):
"""在Elasticsearch中查询一个IoC是否存在。"""
query = {
"query": {
"bool": {
"must": [
{"term": {"ioc_value.keyword": ioc_value}},
{"term": {"ioc_type.keyword": ioc_type}}
]
}
}
}
try:
res = self.es_client.search(index=IOC_INDEX, body=query)
if res['hits']['total']['value'] > 0:
# 命中!返回命中的情报详情
return res['hits']['hits'][0]['_source']
except Exception:
# 索引不存在或查询失败
pass
return None
def process_log(self, log_data):
"""处理一条来自EDR的日志。"""
# 这是一个简化的日志解析
log_type = log_data.get('type')
ioc_to_check = None
ioc_type_to_check = None
if log_type == 'network_connection':
ioc_to_check = log_data.get('remote_ip')
ioc_type_to_check = 'ipv4'
elif log_type == 'process_create':
ioc_to_check = log_data.get('process_hash')
ioc_type_to_check = 'hash_sha256'
elif log_type == 'dns_query':
ioc_to_check = log_data.get('query_name')
ioc_type_to_check = 'domain'
if not ioc_to_check:
return # 这条日志没有可供关联的IoC
# --- 核心关联逻辑 ---
threat_info = self.check_ioc_in_es(ioc_to_check, ioc_type_to_check)
if threat_info:
# 命中了威胁情报!
self.generate_alert(log_data, threat_info)
def generate_alert(self, edr_log, threat_info):
"""生成并发布高危告警。"""
alert = {
"alert_type": "InternalLog_Matches_ThreatIntel",
"level": "HIGH",
"timestamp": datetime.utcnow().isoformat(),
"internal_event": edr_log,
"external_threat": threat_info,
# (为响应器准备的)
"actionable_pid": edr_log.get("pid"),
"actionable_ip": edr_log.get("remote_ip")
}
print("
" + "="*70)
print("[!!!] 实时告警: 内部活动命中威胁情报!")
print(json.dumps(alert, indent=2))
print("="*70 + "
")
# 发布告警,触发自动化响应
self.redis_client.publish(ALERTS_CHANNEL, json.dumps(alert))
def start(self):
"""启动关联引擎的监听循环。"""
try:
for message in self.redis_pubsub.listen():
if message['type'] == 'message':
log_data = json.loads(message['data'])
self.process_log(log_data)
except KeyboardInterrupt:
print("
[*] 正在关闭关联引擎...")
finally:
self.redis_pubsub.close()
if __name__ == "__main__":
engine = CorrelationEngine()
engine.start()
4. 总结与下一步
我们成功地构建了整个安全监控平台的“中央神经系统”。这个服务,就是连接“感知”(EDR传感器)和“反应”(Responder)的关键枢纽。
correlation_engine.py
通过将内部实时日志流与外部聚合的IoC数据库进行自动化、实时的“碰撞”,我们建立了一个高置信度的威胁告警系统。它不再是基于单一规则或单一情报源的“盲人摸象”,而是能够真正地“看到”并“理解”正在发生的威胁。
我们现在有了一个强大的情报平台。但这个平台依赖于一个前提:我们的IoC数据库()是完整的。我们如何从我们分析的恶意软件样本(来自沙箱)或网络流量(来自代理)中,自动地提取出新的、未知的IoC,来反哺我们的情报库呢?
ti_iocs