Python网络安全工具高级开发(三十一):威胁检测之自动化响应机制 (SOAR)

内容分享2个月前发布
0 0 0

摘要:在本文中,我们将为EDR系统装上“利剑”,实现其“R”(Response,响应)的功能。我们将超越被动的告警,探讨SOAR(安全编排、自动化与响应)的核心理念,即如何将安全告警自动化地转化为遏制(Containment)动作。我们将设计一个解耦的响应架构:利用Redis Pub/Sub作为告警总线,让我们之前构建的“检测器”发布高危告警。然后,我们将构建一个独立的Python**“响应器”(Responder)**服务,它会订阅这些告警,并根据预设的“剧本(Playbook)”自动执行响应动作,例如:使用
psutil
终止恶意进程,并使用
subprocess
模块调用系统防火墙(如
iptables

netsh
隔离受感染主机的网络连接。这标志着我们的工具从“监控平台”进化为“主动防御系统”。

关键词:** Python, EDR, SOAR, 自动化响应, Playbook,
psutil
,
subprocess
, Redis Pub/Sub, 威胁遏制


正文

⚠️ 终极警告:高风险自动化!

自动化响应(如
kill
进程、修改防火墙)是极其危险的操作。一个“误报(False Positive)”可能导致你的核心生产服务被自动终止或T-T,造成比攻击本身更严重的损失。本文所有代码必须在完全隔离的、100%可承受破坏的测试环境中实验。绝不能在生产环境或任何重要系统上运行!

1. 为什么“响应”必须自动化?

我们之前构建的“规则引擎”和“异常检测”工具,现在能源源不断地产生告警。但在现代攻击(如勒索软件)面前,从“告警”到“安全分析师看到邮件”再到“管理员登录服务器处理”,这个以分钟甚至小时为单位的响应时间,已经太慢了。攻击者可能在30秒内就已经完成了横向移动和数据加密。

自动化响应的目标,就是将这个响应时间缩短到毫秒级别。在告警产生的瞬间,系统就自动执行预设的“剧本”,在攻击者造成实质性破坏前,将其“遏制(Contain)”。

2. 架构设计:解耦“检测”与“响应”

我们不应该
edr_sensor.py
(检测器)中直接调用
os.system("kill ...")
。这会使检测和响应紧密耦合,难以维护。

正确的架构(基于我们在企业级架构中学到的事件驱动思想):

告警总线 (Alert Bus):使用Redis Pub/Sub(或Kafka)创建一个专用的“告警频道”(例如
high_severity_alerts
)。

检测器 (Detector):我们之前的
edr_sensor.py
(规则引擎)或
anomaly_detector.py
(异常引擎)作为发布者(Publisher)。当它们发现一个高置信度(High-Fidelity)的威胁时(例如,100%确定的
winword.exe -> powershell.exe
),它们就向
high_severity_alerts
频道发布一个JSON格式的告警。

响应器 (Responder):一个全新的、独立的Python服务,作为订阅者(Subscriber)。它监听这个频道,一旦收到告警,就解析它并执行相应的“剧本”。

优点

解耦:检测器不关心谁来响应,响应器也不关心告警是怎么产生的。

可扩展:我们可以轻松添加更多的“响应器”(如一个用于“自动取证”的,一个用于“邮件通知”的),它们都监听同一个告警频道。

安全:响应逻辑被集中管理,更易于审计和控制。

3. 剧本(Playbook):我们的响应预案

对于
edr_sensor.py
发现的“可疑进程创建链”告警,我们的剧本是:

Trigger: 收到
high_severity_alerts
频道的消息。

Parse: 解析JSON,获取恶意进程的
pid

source_ip
(如果存在)。

Action 1 (Host Containment): 使用
psutil.Process(pid).terminate()
,立即终止该恶意进程。

Action 2 (Network Containment): (可选,如果告警包含IP)使用
subprocess
调用
iptables

netsh advfirewall
在防火墙上拉黑该进程的父进程(如Word)或其通信的IP。

Action 3 (Log): 记录所有已执行的自动化响应动作。

4. 代码实现

a) 修改
edr_sensor.py
(检测器),使其成为“发布者”
(基于第29篇的代码)

Python



# edr_sensor.py (核心修改: check_rules 和 log_alert 函数)
import redis
import json
# ... (psutil, time, etc.) ...
 
# 连接到Redis
try:
    redis_client = redis.Redis()
    redis_client.ping()
    print("[*] EDR传感器已连接到Redis。")
except Exception as e:
    print(f"[!] 无法连接到Redis: {e}。告警将无法发布!")
    redis_client = None
 
# ... (SUSPICIOUS_PARENT_CHILD_MAP 规则库不变) ...
 
def check_rules(proc_info: dict):
    if not proc_info: return
    p_name = proc_info['name'].lower()
    parent_name = proc_info['parent_name'].lower()
    
    if parent_name in SUSPICIOUS_PARENT_CHILD_MAP:
        if p_name in SUSPICIOUS_PARENT_CHILD_MAP[parent_name]:
            # 准备告警数据
            alert_data = {
                "level": "HIGH",
                "title": "可疑的进程创建链 (LotL)!",
                "description": f"父进程 '{proc_info['parent_name']}' (PID: {proc_info['ppid']}) 启动了子进程 '{proc_info['name']}' (PID: {proc_info['pid']})",
                "details": f"命令行: {proc_info['cmdline']}",
                "timestamp": datetime.now().isoformat(),
                # --- 关键的响应信息 ---
                "actionable_pid": proc_info['pid'] 
            }
            log_alert(alert_data) # 本地打印
            # --- 发布到Redis频道 ---
            if redis_client:
                redis_client.publish("high_severity_alerts", json.dumps(alert_data))
 
# ... (log_alert 函数现在只负责打印) ...
# ... (main_loop 和 main 函数不变) ...

b) 创建
responder.py
(响应器/订阅者)

Python



# responder.py
import redis
import json
import sys
import psutil
import subprocess
 
class Responder:
    def __init__(self):
        try:
            self.redis_client = redis.Redis(decode_responses=True)
            self.pubsub = self.redis_client.pubsub()
            self.pubsub.subscribe("high_severity_alerts")
            print("[*] 自动响应器已启动,正在订阅 'high_severity_alerts' 频道...")
        except Exception as e:
            print(f"[!] 严重错误: 无法连接到Redis! {e}")
            sys.exit(1)
 
    def contain_process(self, pid: int):
        """剧本动作1:终止进程"""
        try:
            if psutil.pid_exists(pid):
                proc = psutil.Process(pid)
                proc_name = proc.name()
                print(f"  [ACTION-HOST] 正在终止进程: {proc_name} (PID: {pid})")
                proc.terminate() # 尝试优雅终止
                proc.wait(timeout=1) # 等待1秒
            else:
                 print(f"  [ACTION-HOST] 进程 {pid} 已不存在。")
        except psutil.NoSuchProcess:
            print(f"  [ACTION-HOST] 进程 {pid} 已不存在。")
        except psutil.AccessDenied:
             print(f"[!] 权限错误: 无法终止PID {pid}。请使用root/管理员权限运行本脚本。")
        except Exception as e:
            print(f"[!] 终止进程 {pid} 时出错: {e}")
 
    def contain_network_ip(self, ip: str):
        """剧本动作2:隔离IP (Linux示例)"""
        if sys.platform == "linux":
            command = ["iptables", "-A", "INPUT", "-s", ip, "-j", "DROP"]
        elif sys.platform == "win32":
            # Windows防火墙命令更复杂
            command = ["netsh", "advfirewall", "firewall", "add", "rule", 
                       f"name=Block_{ip}", "dir=in", "action=block", f"remoteip={ip}"]
        else:
            print(f"  [ACTION-NET] 不支持的平台: {sys.platform}")
            return
            
        try:
            print(f"  [ACTION-NET] 正在执行防火墙命令,拉黑IP: {ip}")
            subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError:
            print(f"[!] 权限错误: 无法执行防火墙命令。请使用root/管理员权限运行。")
        except Exception as e:
            print(f"[!] 执行防火墙命令时出错: {e}")
 
    def run_playbook(self, alert: dict):
        """执行响应剧本"""
        print(f"
[!!!] 收到新告警: {alert.get('title')}")
        
        # --- 剧本执行 ---
        # 1. 解析可执行的动作
        pid_to_kill = alert.get("actionable_pid")
        ip_to_block = alert.get("source_ip") # 假设我们的告警也包含了IP
        
        # 2. 执行主机遏制
        if pid_to_kill:
            self.contain_process(pid_to_kill)
        
        # 3. 执行网络遏制
        if ip_to_block:
            self.contain_network_ip(ip_to_block)
            
        print("[*] 剧本执行完毕。")
 
    def start_listening(self):
        """启动监听循环"""
        try:
            for message in self.pubsub.listen():
                if message['type'] == 'message':
                    alert_data = json.loads(message['data'])
                    self.run_playbook(alert_data)
        except KeyboardInterrupt:
            print("
[*] 正在关闭自动响应器...")
        finally:
            self.pubsub.close()
 
if __name__ == "__main__":
    if (sys.platform == "linux" and os.geteuid() != 0) or 
       (sys.platform == "win32" and not 'ADMIN' in os.environ.get('USERNAME', '').upper()):
        print("[!] 错误: 本工具需要root或管理员权限来终止进程和修改防火墙。")
        sys.exit(1)
        
    responder = Responder()
    responder.start_listening()

5. 总结

我们成功地将EDR(端点检测与响应)的“检测(D)”与“响应(R)”连接起来,构建了一个微型的SOAR(安全编排、自动化与响应)系统。通过Redis Pub/Sub事件总线,我们的检测器(Sensor)和响应器(Responder)实现了完全解耦,使得整个系统变得高度可扩展和可维护。

现在,当一个已知的恶意行为(如Word启动PowerShell)在端点上被检测到的毫秒内,我们的
responder.py
就会自动收到告警,并立即执行遏制剧本,终止进程、拉黑IP,在威胁造成大规模破坏前将其“扼杀在摇篮里”。

我们已经构建了EDR的核心。但EDR(以及我们之前所有的工具)产生的大量告警和数据,如果都分散在各个孤岛上,其价值将大打折扣。

© 版权声明

相关文章

暂无评论

none
暂无评论...