摘要:在本文中,我们将为EDR系统装上“利剑”,实现其“R”(Response,响应)的功能。我们将超越被动的告警,探讨SOAR(安全编排、自动化与响应)的核心理念,即如何将安全告警自动化地转化为遏制(Containment)动作。我们将设计一个解耦的响应架构:利用Redis Pub/Sub作为告警总线,让我们之前构建的“检测器”发布高危告警。然后,我们将构建一个独立的Python**“响应器”(Responder)**服务,它会订阅这些告警,并根据预设的“剧本(Playbook)”自动执行响应动作,例如:使用终止恶意进程,并使用
psutil模块调用系统防火墙(如
subprocess或
iptables)隔离受感染主机的网络连接。这标志着我们的工具从“监控平台”进化为“主动防御系统”。
netsh
关键词:** Python, EDR, SOAR, 自动化响应, Playbook, ,
psutil, Redis Pub/Sub, 威胁遏制
subprocess
正文
⚠️ 终极警告:高风险自动化!
自动化响应(如
进程、修改防火墙)是极其危险的操作。一个“误报(False Positive)”可能导致你的核心生产服务被自动终止或T-T,造成比攻击本身更严重的损失。本文所有代码必须在完全隔离的、100%可承受破坏的测试环境中实验。绝不能在生产环境或任何重要系统上运行!
kill
1. 为什么“响应”必须自动化?
我们之前构建的“规则引擎”和“异常检测”工具,现在能源源不断地产生告警。但在现代攻击(如勒索软件)面前,从“告警”到“安全分析师看到邮件”再到“管理员登录服务器处理”,这个以分钟甚至小时为单位的响应时间,已经太慢了。攻击者可能在30秒内就已经完成了横向移动和数据加密。
自动化响应的目标,就是将这个响应时间缩短到毫秒级别。在告警产生的瞬间,系统就自动执行预设的“剧本”,在攻击者造成实质性破坏前,将其“遏制(Contain)”。
2. 架构设计:解耦“检测”与“响应”
我们不应该在(检测器)中直接调用
edr_sensor.py。这会使检测和响应紧密耦合,难以维护。
os.system("kill ...")
正确的架构(基于我们在企业级架构中学到的事件驱动思想):
告警总线 (Alert Bus):使用Redis Pub/Sub(或Kafka)创建一个专用的“告警频道”(例如)。
high_severity_alerts
检测器 (Detector):我们之前的(规则引擎)或
edr_sensor.py(异常引擎)作为发布者(Publisher)。当它们发现一个高置信度(High-Fidelity)的威胁时(例如,100%确定的
anomaly_detector.py),它们就向
winword.exe -> powershell.exe频道发布一个JSON格式的告警。
high_severity_alerts
响应器 (Responder):一个全新的、独立的Python服务,作为订阅者(Subscriber)。它监听这个频道,一旦收到告警,就解析它并执行相应的“剧本”。
优点:
解耦:检测器不关心谁来响应,响应器也不关心告警是怎么产生的。
可扩展:我们可以轻松添加更多的“响应器”(如一个用于“自动取证”的,一个用于“邮件通知”的),它们都监听同一个告警频道。
安全:响应逻辑被集中管理,更易于审计和控制。
3. 剧本(Playbook):我们的响应预案
对于发现的“可疑进程创建链”告警,我们的剧本是:
edr_sensor.py
Trigger: 收到频道的消息。
high_severity_alerts
Parse: 解析JSON,获取恶意进程的和
pid(如果存在)。
source_ip
Action 1 (Host Containment): 使用,立即终止该恶意进程。
psutil.Process(pid).terminate()
Action 2 (Network Containment): (可选,如果告警包含IP)使用调用
subprocess或
iptables,在防火墙上拉黑该进程的父进程(如Word)或其通信的IP。
netsh advfirewall
Action 3 (Log): 记录所有已执行的自动化响应动作。
4. 代码实现
a) 修改(检测器),使其成为“发布者” (基于第29篇的代码)
edr_sensor.py
Python
# edr_sensor.py (核心修改: check_rules 和 log_alert 函数)
import redis
import json
# ... (psutil, time, etc.) ...
# 连接到Redis
try:
redis_client = redis.Redis()
redis_client.ping()
print("[*] EDR传感器已连接到Redis。")
except Exception as e:
print(f"[!] 无法连接到Redis: {e}。告警将无法发布!")
redis_client = None
# ... (SUSPICIOUS_PARENT_CHILD_MAP 规则库不变) ...
def check_rules(proc_info: dict):
if not proc_info: return
p_name = proc_info['name'].lower()
parent_name = proc_info['parent_name'].lower()
if parent_name in SUSPICIOUS_PARENT_CHILD_MAP:
if p_name in SUSPICIOUS_PARENT_CHILD_MAP[parent_name]:
# 准备告警数据
alert_data = {
"level": "HIGH",
"title": "可疑的进程创建链 (LotL)!",
"description": f"父进程 '{proc_info['parent_name']}' (PID: {proc_info['ppid']}) 启动了子进程 '{proc_info['name']}' (PID: {proc_info['pid']})",
"details": f"命令行: {proc_info['cmdline']}",
"timestamp": datetime.now().isoformat(),
# --- 关键的响应信息 ---
"actionable_pid": proc_info['pid']
}
log_alert(alert_data) # 本地打印
# --- 发布到Redis频道 ---
if redis_client:
redis_client.publish("high_severity_alerts", json.dumps(alert_data))
# ... (log_alert 函数现在只负责打印) ...
# ... (main_loop 和 main 函数不变) ...
b) 创建 (响应器/订阅者)
responder.py
Python
# responder.py
import redis
import json
import sys
import psutil
import subprocess
class Responder:
def __init__(self):
try:
self.redis_client = redis.Redis(decode_responses=True)
self.pubsub = self.redis_client.pubsub()
self.pubsub.subscribe("high_severity_alerts")
print("[*] 自动响应器已启动,正在订阅 'high_severity_alerts' 频道...")
except Exception as e:
print(f"[!] 严重错误: 无法连接到Redis! {e}")
sys.exit(1)
def contain_process(self, pid: int):
"""剧本动作1:终止进程"""
try:
if psutil.pid_exists(pid):
proc = psutil.Process(pid)
proc_name = proc.name()
print(f" [ACTION-HOST] 正在终止进程: {proc_name} (PID: {pid})")
proc.terminate() # 尝试优雅终止
proc.wait(timeout=1) # 等待1秒
else:
print(f" [ACTION-HOST] 进程 {pid} 已不存在。")
except psutil.NoSuchProcess:
print(f" [ACTION-HOST] 进程 {pid} 已不存在。")
except psutil.AccessDenied:
print(f"[!] 权限错误: 无法终止PID {pid}。请使用root/管理员权限运行本脚本。")
except Exception as e:
print(f"[!] 终止进程 {pid} 时出错: {e}")
def contain_network_ip(self, ip: str):
"""剧本动作2:隔离IP (Linux示例)"""
if sys.platform == "linux":
command = ["iptables", "-A", "INPUT", "-s", ip, "-j", "DROP"]
elif sys.platform == "win32":
# Windows防火墙命令更复杂
command = ["netsh", "advfirewall", "firewall", "add", "rule",
f"name=Block_{ip}", "dir=in", "action=block", f"remoteip={ip}"]
else:
print(f" [ACTION-NET] 不支持的平台: {sys.platform}")
return
try:
print(f" [ACTION-NET] 正在执行防火墙命令,拉黑IP: {ip}")
subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except subprocess.CalledProcessError:
print(f"[!] 权限错误: 无法执行防火墙命令。请使用root/管理员权限运行。")
except Exception as e:
print(f"[!] 执行防火墙命令时出错: {e}")
def run_playbook(self, alert: dict):
"""执行响应剧本"""
print(f"
[!!!] 收到新告警: {alert.get('title')}")
# --- 剧本执行 ---
# 1. 解析可执行的动作
pid_to_kill = alert.get("actionable_pid")
ip_to_block = alert.get("source_ip") # 假设我们的告警也包含了IP
# 2. 执行主机遏制
if pid_to_kill:
self.contain_process(pid_to_kill)
# 3. 执行网络遏制
if ip_to_block:
self.contain_network_ip(ip_to_block)
print("[*] 剧本执行完毕。")
def start_listening(self):
"""启动监听循环"""
try:
for message in self.pubsub.listen():
if message['type'] == 'message':
alert_data = json.loads(message['data'])
self.run_playbook(alert_data)
except KeyboardInterrupt:
print("
[*] 正在关闭自动响应器...")
finally:
self.pubsub.close()
if __name__ == "__main__":
if (sys.platform == "linux" and os.geteuid() != 0) or
(sys.platform == "win32" and not 'ADMIN' in os.environ.get('USERNAME', '').upper()):
print("[!] 错误: 本工具需要root或管理员权限来终止进程和修改防火墙。")
sys.exit(1)
responder = Responder()
responder.start_listening()
5. 总结
我们成功地将EDR(端点检测与响应)的“检测(D)”与“响应(R)”连接起来,构建了一个微型的SOAR(安全编排、自动化与响应)系统。通过Redis Pub/Sub事件总线,我们的检测器(Sensor)和响应器(Responder)实现了完全解耦,使得整个系统变得高度可扩展和可维护。
现在,当一个已知的恶意行为(如Word启动PowerShell)在端点上被检测到的毫秒内,我们的就会自动收到告警,并立即执行遏制剧本,终止进程、拉黑IP,在威胁造成大规模破坏前将其“扼杀在摇篮里”。
responder.py
我们已经构建了EDR的核心。但EDR(以及我们之前所有的工具)产生的大量告警和数据,如果都分散在各个孤岛上,其价值将大打折扣。
