Python网络安全工具高级开发(三十三):威胁检测之情报关联分析引擎

内容分享3小时前发布
0 0 0

摘要:在本文中,我们将构建威胁情报平台(TIP)的“大脑”——情报关联分析引擎。我们将解决“数据孤岛”问题,将我们内部的EDR遥测数据(如进程、网络日志)与外部的威胁情报(IoCs)进行实时碰撞。我们将设计一个基于Redis Pub/SubElasticsearch的异步处理流水线:我们的EDR传感器(发布者)将内部日志实时发布到Redis;一个Python关联引擎(订阅者)在收到日志后,立即查询我们的Elasticsearch情报数据库,以判断日志中的IP、哈希或域名是否命中“黑名单”。通过这种内部日志与外部情报的实时关联,我们的平台将能够从海量的“可疑”事件中,自动筛选出高置信度的“已知威胁”告警,实现从“被动防御”到“主动狩猎”的飞跃。

关键词:Python, 威胁情报, 关联分析, EDR, SIEM, Elasticsearch, Redis Pub/Sub, 实时检测


正文

1. 关联的价值:从“噪音”到“信号”

一个企业每天会产生数亿条日志。单独来看,每一条都可能是无害的“噪音”:

EDR日志
[2025-11-03 14:30:05] 'svchost.exe' (PID 1234) connected to 185.12.33.4:80
。 (这可能是正常的Windows更新,也可能是C2通信)

威胁情报
[2025-11-03 14:00:00] IoC '185.12.33.4' (ipv4) added to list, associated with 'Emotet C2'
。 (这是一个孤立的情报点)

**关联分析(Correlation)**就是将这两条信息“碰撞”在一起的引擎。

关联结果
[2025-11-03 14:30:05] [!!!] 高危告警: 内部主机(svchost.exe) 正在连接一个已知的'Emotet C2'服务器(185.12.33.4)!

通过关联,一条低置信度的“噪音”瞬间转变成了一条高置信度、可立即采取行动的“信号”。这就是现代SIEM(安全信息和事件管理)和EDR平台的核心价值。

2. 架构设计:实时日志-情报流水线

我们将设计一个解耦的、基于事件的流水线来实现这一目标:

EDR传感器(
edr_sensor.py
:在端点上运行,捕获到所有进程/网络事件,并将其(作为JSON)
PUBLISH
到Redis的
internal_logs
频道。

IoC聚合器(
ioc_aggregator.py
:在后台定时运行,从外部拉取IoC,并将它们存入
Elasticsearch

threat_intel_iocs
索引中。

关联引擎(
correlation_engine.py
(我们今天的主角)一个独立的Python服务。它订阅Redis的
internal_logs
频道。

工作流程: a.
correlation_engine
从Redis收到一条新日志(例如,
connect to 185.12.33.4
)。 b. 它立即
Elasticsearch
发起查询:“
threat_intel_iocs
索引中,是否存在
ioc_value == 185.12.33.4
的文档?” c. 如果命中,它会生成一个“高危告警”事件。 d. 它将这个告警
PUBLISH
到Redis的另一个频道,例如
high_severity_alerts

响应器(
responder.py
:(来自上一章的)订阅
high_severity_alerts
频道,并执行自动化遏制动作。

3. 代码实现:
correlation_engine.py

环境准备

确保
redis-server

elasticsearch
正在运行。

安装Python库:

Bash


pip install redis elasticsearch

Python



# correlation_engine.py
import redis
import json
import sys
from elasticsearch import Elasticsearch
from datetime import datetime
 
# --- 配置 ---
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
ELASTICSEARCH_HOSTS = ["http://localhost:9200"]
IOC_INDEX = "threat_intel_iocs"     # 我们存储IoC的ES索引
LOGS_CHANNEL = "internal_logs"      # 我们订阅的内部日志频道
ALERTS_CHANNEL = "high_severity_alerts" # 我们发布告警的频道
 
class CorrelationEngine:
    def __init__(self):
        try:
            self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
            self.redis_pubsub = self.redis_client.pubsub()
            self.redis_pubsub.subscribe(LOGS_CHANNEL)
            print(f"[*] 已连接到Redis并订阅频道: {LOGS_CHANNEL}")
        except redis.exceptions.ConnectionError as e:
            print(f"[!] 严重错误: 无法连接到Redis! {e}")
            sys.exit(1)
            
        try:
            self.es_client = Elasticsearch(hosts=ELASTICSEARCH_HOSTS)
            if not self.es_client.ping():
                raise ConnectionError("ES Ping 失败")
            print(f"[*] 已连接到Elasticsearch: {ELASTICSEARCH_HOSTS[0]}")
        except Exception as e:
            print(f"[!] 严重错误: 无法连接到Elasticsearch! {e}")
            sys.exit(1)
 
    def check_ioc_in_es(self, ioc_value, ioc_type):
        """在Elasticsearch中查询一个IoC是否存在。"""
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"ioc_value.keyword": ioc_value}},
                        {"term": {"ioc_type.keyword": ioc_type}}
                    ]
                }
            }
        }
        try:
            res = self.es_client.search(index=IOC_INDEX, body=query)
            if res['hits']['total']['value'] > 0:
                # 命中!返回命中的情报详情
                return res['hits']['hits'][0]['_source']
        except Exception:
            # 索引不存在或查询失败
            pass
        return None
 
    def process_log(self, log_data):
        """处理一条来自EDR的日志。"""
        # 这是一个简化的日志解析
        log_type = log_data.get('type')
        ioc_to_check = None
        ioc_type_to_check = None
        
        if log_type == 'network_connection':
            ioc_to_check = log_data.get('remote_ip')
            ioc_type_to_check = 'ipv4'
        elif log_type == 'process_create':
            ioc_to_check = log_data.get('process_hash')
            ioc_type_to_check = 'hash_sha256'
        elif log_type == 'dns_query':
            ioc_to_check = log_data.get('query_name')
            ioc_type_to_check = 'domain'
        
        if not ioc_to_check:
            return # 这条日志没有可供关联的IoC
 
        # --- 核心关联逻辑 ---
        threat_info = self.check_ioc_in_es(ioc_to_check, ioc_type_to_check)
        
        if threat_info:
            # 命中了威胁情报!
            self.generate_alert(log_data, threat_info)
 
    def generate_alert(self, edr_log, threat_info):
        """生成并发布高危告警。"""
        alert = {
            "alert_type": "InternalLog_Matches_ThreatIntel",
            "level": "HIGH",
            "timestamp": datetime.utcnow().isoformat(),
            "internal_event": edr_log,
            "external_threat": threat_info,
            # (为响应器准备的)
            "actionable_pid": edr_log.get("pid"),
            "actionable_ip": edr_log.get("remote_ip") 
        }
        
        print("
" + "="*70)
        print("[!!!] 实时告警: 内部活动命中威胁情报!")
        print(json.dumps(alert, indent=2))
        print("="*70 + "
")
        
        # 发布告警,触发自动化响应
        self.redis_client.publish(ALERTS_CHANNEL, json.dumps(alert))
 
    def start(self):
        """启动关联引擎的监听循环。"""
        try:
            for message in self.redis_pubsub.listen():
                if message['type'] == 'message':
                    log_data = json.loads(message['data'])
                    self.process_log(log_data)
        except KeyboardInterrupt:
            print("
[*] 正在关闭关联引擎...")
        finally:
            self.redis_pubsub.close()
 
if __name__ == "__main__":
    engine = CorrelationEngine()
    engine.start()

4. 总结与下一步

我们成功地构建了整个安全监控平台的“中央神经系统”。这个
correlation_engine.py
服务,就是连接“感知”(EDR传感器)和“反应”(Responder)的关键枢纽。

通过将内部实时日志流外部聚合的IoC数据库进行自动化、实时的“碰撞”,我们建立了一个高置信度的威胁告警系统。它不再是基于单一规则或单一情报源的“盲人摸象”,而是能够真正地“看到”并“理解”正在发生的威胁。

我们现在有了一个强大的情报平台。但这个平台依赖于一个前提:我们的IoC数据库(
ti_iocs
)是完整的。我们如何从我们分析的恶意软件样本(来自沙箱)或网络流量(来自代理)中,自动地提取出新的、未知的IoC,来反哺我们的情报库呢?

© 版权声明

相关文章

暂无评论

none
暂无评论...