[3D Annotation] World Model Sample Annotation (Complete Source Code Included)


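Background

The requirement is a scheme for capturing behaviour data in immersive virtual environments. Goal: on Windows, listen for mouse and keyboard input in the specified windows, convert it to JSON in a fixed format, and store it as the annotation file describing the actions performed in the recorded video.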
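To make the target output concrete, here is a minimal sketch of one possible annotation record. It assumes the field names used by the recorder later in this post (`timestamp`, `mouse_event`, `keyboard_event`, `action`); the helper names `make_action_event` and `to_annotation_json` are hypothetical and only serve the illustration.

```python
import json


def make_action_event(timestamp, keyboard_event="", mouse_event="", action=""):
    """Build one action record; timestamp is seconds since recording start."""
    return {
        "timestamp": round(timestamp, 1),
        "mouse_event": mouse_event,
        "keyboard_event": keyboard_event,
        "action": action,
    }


def to_annotation_json(video_name, action_events):
    """Serialise the events recorded for one video into a JSON string."""
    payload = {
        "game_metadata": {"video_name": video_name},
        "action_events": list(action_events),
    }
    return json.dumps(payload, indent=2, ensure_ascii=False)


# Hypothetical sample data, not taken from a real recording
events = [
    make_action_event(1.23, keyboard_event="E", action="press E"),
    make_action_event(2.5, mouse_event="Button.left_press", action="left click"),
]
print(to_annotation_json("game_video_example", events))
```

The real recorder additionally keeps movement and view intervals; this sketch only shows the point-in-time action events.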


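This scheme aims to build a multimodal interaction-behaviour database, which requires collecting high-quality behaviour-environment pairs from dynamic 3D virtual worlds. (My guess is that the data will serve as training samples for a 3D world model.) Data sources should be virtual platforms that offer rich interaction possibilities. The specific requirements are as follows:

Environment diversity principle

Virtual scenes must cover a variety of ecological and architectural types, including but not limited to urban landscapes, natural terrain, and man-made building complexes. Each recorded clip should show distinct environmental features and avoid repeating scenes of the same kind. Staying too long in a single environment, or capturing it repeatedly, makes the data invalid.

Richness of interaction

Dynamic interaction between the virtual character and the environment must be captured, for example manipulating objects, using vehicles, or triggering mechanisms. Mere character movement or standing still does not count as effective interaction, and such low-value clips must be excluded.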
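One way to screen out clips that contain only movement is sketched below. It assumes the session layout produced by the recorder later in this post, where mouse moves are stored in `action_events` with a `mouse_event` value starting with `move_to_`. The function names and the `min_interactions` threshold are hypothetical, and a key press or click is only a rough proxy for a real in-game interaction.

```python
def count_interactions(session):
    """Count action events that are not plain mouse movement."""
    return sum(
        1
        for event in session.get("action_events", [])
        if not event.get("mouse_event", "").startswith("move_to_")
    )


def is_effective_segment(session, min_interactions=1):
    """Treat a clip as effective only if it has enough non-movement actions."""
    return count_interactions(session) >= min_interactions


# Hypothetical clip: the player only walked and looked around
walking_only = {
    "movement_events": [{"direction": "W", "start_time": 0.0, "end_time": 5.0}],
    "action_events": [
        {"timestamp": 1.0, "mouse_event": "move_to_10_20", "keyboard_event": "", "action": ""},
    ],
}
print(is_effective_segment(walking_only))
```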

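Precise synchronisation of operations

Record every input-device signal in full (for example keyboard and mouse traces), and ensure that the time offset between each input command and its corresponding video frame is less than 100 milliseconds. Establish a precise timestamp mapping so that behaviour and visual data are strictly aligned.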
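The 100 ms requirement can be checked offline if a capture timestamp is stored for every frame. The sketch below assumes such a sorted, non-empty list of frame timestamps in seconds (the recorder in this post does not store one, so this is an assumption), and the names `nearest_frame` and `is_synchronised` are hypothetical.

```python
import bisect


def nearest_frame(frame_times, event_time):
    """Return (frame index, offset in ms) of the frame closest to the event.

    frame_times must be sorted in ascending order and non-empty.
    """
    i = bisect.bisect_left(frame_times, event_time)
    candidates = [j for j in (i - 1, i) if 0 <= j < len(frame_times)]
    best = min(candidates, key=lambda j: abs(frame_times[j] - event_time))
    return best, abs(frame_times[best] - event_time) * 1000.0


def is_synchronised(frame_times, event_time, max_offset_ms=100.0):
    """True if the event lies within max_offset_ms of some video frame."""
    _, offset_ms = nearest_frame(frame_times, event_time)
    return offset_ms < max_offset_ms


# Hypothetical 3-second clip captured at an ideal 30 fps
frame_times = [i / 30 for i in range(90)]
print(nearest_frame(frame_times, 1.234))
```

At a steady 30 fps the nearest frame is never more than about 17 ms away, so the check mainly catches dropped frames and capture stalls.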

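Visual cleanliness standard

Minimise occlusion of the scene by interface elements, including HUD components, prompts, and other non-scene content. The effective picture area (after removing interface occlusion) must keep at least 70% of the original frame area visible. Interface elements should be shown as minimally as possible so that the scene remains the visual focus.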
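The 70% rule is simple arithmetic once the HUD regions are known. The sketch below assumes the HUD is described by axis-aligned rectangles `(left, top, right, bottom)` in pixels, marked by hand or by some detector that is outside the scope of this post; `visible_area_ratio` is a hypothetical helper. Overlapping rectangles are counted only once.

```python
def visible_area_ratio(width, height, hud_rects):
    """Fraction of the frame that is not covered by any HUD rectangle."""
    # Clip every rectangle to the frame and drop empty ones
    rects = []
    for left, top, right, bottom in hud_rects:
        left, right = max(0, left), min(width, right)
        top, bottom = max(0, top), min(height, bottom)
        if left < right and top < bottom:
            rects.append((left, top, right, bottom))

    # Coordinate compression: the rectangle edges cut the frame into grid
    # cells, each of which is either fully covered or fully visible
    xs = sorted({0, width, *(x for r in rects for x in (r[0], r[2]))})
    ys = sorted({0, height, *(y for r in rects for y in (r[1], r[3]))})

    covered = 0
    for x0, x1 in zip(xs, xs[1:]):
        for y0, y1 in zip(ys, ys[1:]):
            if any(l <= x0 and x1 <= r and t <= y0 and y1 <= b
                   for l, t, r, b in rects):
                covered += (x1 - x0) * (y1 - y0)
    return 1.0 - covered / (width * height)


# Hypothetical layout: a minimap and a bottom action bar on a 2560x1440 frame
hud = [(2200, 40, 2520, 360), (640, 1300, 1920, 1440)]
ratio = visible_area_ratio(2560, 1440, hud)
print(f"visible: {ratio:.1%}, passes 70% rule: {ratio >= 0.7}")
```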

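Natural experience constraint

Keep the virtual world physically plausible; avoid overcrowded entity distributions and exaggerated visual effects. The scale of interaction should stay within a reasonable range so that the behaviour data remains realistic and analysable.

Visual continuity requirement

Viewpoint changes must be smooth and natural; abrupt view jumps or shaky camera work are not allowed. Camera motion should follow ergonomic principles.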
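A crude proxy for abrupt view jumps is the pointer speed between consecutive mouse samples. The sketch below assumes samples of the form `(time_s, x, y)` sorted by time; the function name and the `max_speed_px_per_s` threshold are hypothetical and would need tuning per game. Many 3D games capture the cursor, in which case absolute positions may not reflect camera rotation at all, so treat this only as a first filter.

```python
import math


def find_view_jumps(samples, max_speed_px_per_s=5000.0):
    """Return indices of samples reached at more than the allowed speed."""
    jumps = []
    for i in range(1, len(samples)):
        t0, x0, y0 = samples[i - 1]
        t1, x1, y1 = samples[i]
        dt = t1 - t0
        if dt <= 0:
            continue  # skip duplicate or out-of-order timestamps
        speed = math.hypot(x1 - x0, y1 - y0) / dt
        if speed > max_speed_px_per_s:
            jumps.append(i)
    return jumps


# Hypothetical trace: a slow pan followed by a sudden flick
trace = [(0.00, 100, 100), (0.01, 102, 100), (0.02, 104, 101), (0.03, 900, 400)]
print(find_view_jumps(trace))
```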

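Content screening rules

Exclude pre-rendered animation sequences and 2D interactive programs (such as card and board games). Prefer virtual environments with genuine 3D spatial interaction.

Lighting and visibility conditions

Make sure the scene is well lit and clearly visible. Avoid dark environments (night-vision scenes, underground spaces, and the like), which reduce the accuracy of behaviour recognition.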
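A simple screen for dark footage is the mean luma of each frame. The sketch below assumes BGR frames as `numpy` arrays of shape `(height, width, 3)`, which is what OpenCV produces, and uses the Rec. 601 luma weights. The threshold of 40 on a 0-255 scale is a hypothetical starting point, not a value from the requirements.

```python
import numpy as np


def mean_luma(frame_bgr):
    """Mean Rec. 601 luma of a BGR frame, on a 0-255 scale."""
    b = frame_bgr[..., 0].astype(np.float64)
    g = frame_bgr[..., 1].astype(np.float64)
    r = frame_bgr[..., 2].astype(np.float64)
    return float((0.114 * b + 0.587 * g + 0.299 * r).mean())


def is_too_dark(frame_bgr, threshold=40.0):
    """Flag frames whose average brightness falls below the threshold."""
    return mean_luma(frame_bgr) < threshold


# Hypothetical frames: a night scene and a daylight scene
night = np.full((1440, 2560, 3), 12, dtype=np.uint8)
day = np.full((1440, 2560, 3), 150, dtype=np.uint8)
print(is_too_dark(night), is_too_dark(day))
```

In practice it is probably better to reject a clip only when a large share of its frames is flagged, since brief dark moments (entering a tunnel, for example) are normal.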

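Resource selection scope

Draw on the past and current popular interactive titles on mainstream digital distribution platforms (for example, the Steam Top 500 currently popular titles and the Top 2000 all-time classics) to ensure that the sample sources are diverse and representative.

Note: this scheme focuses on building behaviour-environment datasets with spatio-temporal continuity, and all collected content must meet machine learning's requirement for consistent data quality.

Research

For the Steam Top 500 currently popular titles and Top 2000 all-time classics, my first idea was to traverse them with deep reinforcement learning and record everything with Python. The task is to find the 3D games among them, in particular games that reflect the laws of the physical world.

Finding usable 3D games via deep reinforcement learning platforms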


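I surveyed the games available on the following four open-source platforms:
Unity ML-Agents
DeepMind Lab
OpenAI Universe
OpenAI-Unity

Note that commercial AAA titles with truly top-tier graphics are usually not freely available for AI training because of strict copyright protection. The workhorses of AI training today are open-source simulation platforms designed for research (such as those listed above), plus some indie games that support mods or expose dedicated interfaces. When choosing, be sure to read the licence carefully and confirm that it permits non-commercial or research AI training.

1. Unity ML-Agents

First impression: these are small games, some of them purely experimental, so they may not be suitable.

2. DeepMind Lab

3. OpenAI Universe

4. OpenAI-Unity

Also, the purpose of this survey is to get the technical pipeline working end to end; for now it has little to do with reinforcement learning. It counts as a success as long as my Python code can capture keyboard and mouse operations and build the corresponding JSON file.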


Game installation and Python testing

1. Log in to Steam, then download, install, and launch the game

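For now I want a game without night-time scenes, because my machine's graphics are too weak to display them properly.

Code

Run the code and it lists the windows available for monitoring. (Adding a check on whether the chosen window is currently active would filter out input aimed at other windows; that is not implemented yet, though it would not be hard.)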
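The active-window filter mentioned above could look roughly like this. The sketch assumes each raw event carries a `window_info` dictionary with a `title` field, as in the listing below, and it mirrors the case-insensitive substring match used there. The function names are hypothetical.

```python
def is_target_window(active_title, target_title):
    """True if the foreground window title matches the monitored game.

    With no target title configured, every window is accepted.
    """
    if not target_title:
        return True
    return target_title.lower() in active_title.lower()


def keep_target_events(events, target_title):
    """Drop raw events that were captured while another window had focus."""
    kept = []
    for event in events:
        title = event.get("window_info", {}).get("title", "")
        if is_target_window(title, target_title):
            kept.append(event)
    return kept


# Hypothetical raw events: one in the game, one typed into a browser
raw = [
    {"type": "key_press", "key": "w", "window_info": {"title": "My Game - DX11"}},
    {"type": "key_press", "key": "a", "window_info": {"title": "Web Browser"}},
]
print(keep_target_events(raw, "my game"))
```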


import json
import time
import threading
import cv2
import numpy as np
import pyautogui
from pynput import keyboard, mouse
import win32gui
import win32process
import psutil
from datetime import datetime
from collections import deque
import os
import sys

class GameRecordingSystem:
    def __init__(self, game_window_title=None, output_dir="recordings"):
        """
        Integrated game screen-recording and input-logging system.

        Args:
            game_window_title: title of the game window
            output_dir: output directory
        """
        self.game_window_title = game_window_title
        self.output_dir = output_dir
        self.is_recording = False
        self.is_game_active = False

        # Create the output directory
        os.makedirs(output_dir, exist_ok=True)

        # Generate the session ID from the current time
        self.session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.video_name = f"game_video_{self.session_id}"

        # File paths
        self.video_file = os.path.join(output_dir, f"{self.video_name}.mp4")
        self.input_file = os.path.join(output_dir, f"game_inputs_{self.session_id}.json")
        self.manual_file = os.path.join(output_dir, f"operation_manual_{self.session_id}.json")

        # Initialise the data structure (following the format in Document 1)
        self.session_data = {
            "game_metadata": {
                "game_title": game_window_title or "Unknown Game",
                "session_id": self.session_id,
                "video_name": self.video_name,
                "start_time": None,
                "duration_seconds": 0,
                "video_file": self.video_file,
                "input_file": self.input_file
            },
            "movement_events": [],
            "view_events": [],
            "action_events": []
        }
        
        # Video recording parameters (per the quality requirements in Document 1)
        self.fps = 30  # ≥24 fps
        # Target resolution (≥2560x1440); the size actually recorded follows
        # the captured window or screen, see setup_video_recorder()
        self.resolution = (2560, 1440)
        self.video_writer = None

        # Input-recording state
        self.event_buffer = deque(maxlen=1000)
        self.buffer_lock = threading.Lock()
        self.active_movement_keys = set()
        self.active_view_keys = set()
        self.current_movement = "Neutral"
        self.current_view = "Neutral"
        self.movement_start_time = None
        self.view_start_time = None

        # Key mappings
        self.movement_key_map = {'w': 'W', 's': 'B', 'a': 'L', 'd': 'R'}
        self.view_key_map = {'up': 'U', 'down': 'D', 'left': 'L', 'right': 'R'}

        # Synchronisation control
        self.start_time = None
        self.frame_count = 0

        print(f"🎮 Game recording system initialised")
        print(f"📁 Output directory: {output_dir}")
        print(f"🎯 Target game: {game_window_title or 'auto-detect'}")
        print(f"📹 Target video parameters: {self.resolution[0]}x{self.resolution[1]} @ {self.fps}fps")

    def get_game_window_rect(self):
        """Return the game window rectangle as (left, top, right, bottom)."""
        try:
            if self.game_window_title:
                hwnd = win32gui.FindWindow(None, self.game_window_title)
                if hwnd:
                    rect = win32gui.GetWindowRect(hwnd)
                    # Only accept windows of at least 800x600
                    if rect[2] - rect[0] >= 800 and rect[3] - rect[1] >= 600:
                        return rect
            return None  # None means: record the full screen
        except Exception as e:
            print(f"❌ Failed to get the window position: {e}")
            return None

    def setup_video_recorder(self):
        """Set up the video recorder."""
        try:
            # Get the game window area
            game_rect = self.get_game_window_rect()

            if game_rect:
                # Record a specific window
                width = game_rect[2] - game_rect[0]
                height = game_rect[3] - game_rect[1]
                print(f"🎯 Recording game window: {width}x{height}")
            else:
                # Record the full screen
                width, height = pyautogui.size()
                print(f"🖥️ Recording full screen: {width}x{height}")

            # Set the video codec (MP4V)
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            self.video_writer = cv2.VideoWriter(
                self.video_file,
                fourcc,
                self.fps,
                (width, height)
            )

            return game_rect
        except Exception as e:
            print(f"❌ Failed to set up the video recorder: {e}")
            return None

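    def record_video(self, game_rect):
        """Video recording thread."""
        print("📹 Starting video recording...")

        # pyautogui expects region=(left, top, width, height), whereas
        # GetWindowRect returns (left, top, right, bottom)
        region = None
        if game_rect:
            left, top, right, bottom = game_rect
            region = (left, top, right - left, bottom - top)

        frame_interval = 1 / self.fps
        next_frame_time = time.time()

        while self.is_recording:
            try:
                # Capture the game window, or the full screen if region is None
                screenshot = pyautogui.screenshot(region=region)

                # Convert to OpenCV's BGR format
                frame = cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR)

                # Write the video frame
                self.video_writer.write(frame)
                self.frame_count += 1

                # Pace against a fixed schedule so that the capture time is
                # not added on top of every frame interval. If capturing is
                # slower than the interval, the loop simply runs flat out.
                next_frame_time += frame_interval
                time.sleep(max(0.0, next_frame_time - time.time()))

            except Exception as e:
                print(f"❌ Video recording error: {e}")
                time.sleep(0.1)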

    def update_movement_direction(self, current_time):
        """Update the movement direction and close the previous interval."""
        directions = [self.movement_key_map[k] for k in self.active_movement_keys if k in self.movement_key_map]
        new_direction = ''.join(sorted(directions)) if directions else "Neutral"
        
        if new_direction != self.current_movement:
            if self.current_movement != "Neutral" and self.movement_start_time is not None:
                event = {
                    "direction": self.current_movement,
                    "start_time": round(self.movement_start_time - self.start_time, 1),
                    "end_time": round(current_time - self.start_time, 1)
                }
                self.session_data["movement_events"].append(event)
            
            self.current_movement = new_direction
            self.movement_start_time = current_time

    def update_view_direction(self, current_time):
        """Update the view direction and close the previous interval."""
        directions = [self.view_key_map[k] for k in self.active_view_keys if k in self.view_key_map]
        new_direction = ''.join(sorted(directions)) if directions else "Neutral"
        
        if new_direction != self.current_view:
            if self.current_view != "Neutral" and self.view_start_time is not None:
                event = {
                    "direction": self.current_view,
                    "start_time": round(self.view_start_time - self.start_time, 1),
                    "end_time": round(current_time - self.start_time, 1)
                }
                self.session_data["view_events"].append(event)
            
            self.current_view = new_direction
            self.view_start_time = current_time

    def on_key_press(self, key):
        """Keyboard press event."""
        if not self.is_recording or not self.is_game_active:
            return

        try:
            current_time = time.time()
            key_str = str(key).replace("'", "").lower()
            # pynput renders special keys as "Key.up", "Key.space", ...;
            # drop the prefix so that arrow keys match view_key_map
            if key_str.startswith("key."):
                key_str = key_str[4:]

            # Movement keys
            if key_str in self.movement_key_map:
                self.active_movement_keys.add(key_str)
                self.update_movement_direction(current_time)

            # View keys
            elif key_str in self.view_key_map:
                self.active_view_keys.add(key_str)
                self.update_view_direction(current_time)

            # Any other key is logged as an action event
            else:
                action_event = {
                    "timestamp": round(current_time - self.start_time, 1),
                    "mouse_event": "",
                    "keyboard_event": key_str.upper(),
                    "action": f"Press {key_str.upper()} key"
                }
                self.session_data["action_events"].append(action_event)

            # Buffer the raw event
            event = {
                "timestamp": current_time,
                "type": "key_press",
                "key": key_str,
                "window_info": self.get_active_window_info()
            }

            with self.buffer_lock:
                self.event_buffer.append(event)

        except Exception as e:
            print(f"❌ Key press error: {e}")

    def on_key_release(self, key):
        """Keyboard release event."""
        if not self.is_recording or not self.is_game_active:
            return

        try:
            current_time = time.time()
            key_str = str(key).replace("'", "").lower()
            # pynput renders special keys as "Key.up", "Key.space", ...;
            # drop the prefix so that arrow keys match view_key_map
            if key_str.startswith("key."):
                key_str = key_str[4:]

            # Movement key released
            if key_str in self.movement_key_map and key_str in self.active_movement_keys:
                self.active_movement_keys.remove(key_str)
                self.update_movement_direction(current_time)

            # View key released
            if key_str in self.view_key_map and key_str in self.active_view_keys:
                self.active_view_keys.remove(key_str)
                self.update_view_direction(current_time)

            # Buffer the raw event
            event = {
                "timestamp": current_time,
                "type": "key_release",
                "key": key_str,
                "window_info": self.get_active_window_info()
            }

            with self.buffer_lock:
                self.event_buffer.append(event)

        except Exception as e:
            print(f"❌ Key release error: {e}")

    def on_mouse_click(self, x, y, button, pressed):
        """Mouse click event."""
        if not self.is_recording or not self.is_game_active:
            return

        try:
            current_time = time.time()
            button_str = str(button)

            # A mouse press is logged as an action event
            if pressed:
                action_event = {
                    "timestamp": round(current_time - self.start_time, 1),
                    "mouse_event": f"{button_str}_press",
                    "keyboard_event": "",
                    "action": f"Mouse {button_str} pressed"
                }
                self.session_data["action_events"].append(action_event)

            # Buffer the raw event
            event_type = "mouse_press" if pressed else "mouse_release"
            event = {
                "timestamp": current_time,
                "type": event_type,
                "button": button_str,
                "x": x,
                "y": y,
                "window_info": self.get_active_window_info()
            }

            with self.buffer_lock:
                self.event_buffer.append(event)

        except Exception as e:
            print(f"❌ Mouse click error: {e}")

    def on_mouse_move(self, x, y):
        """Mouse move event."""
        if not self.is_recording or not self.is_game_active:
            return

        # Throttle sampling to at most one sample every 10 ms
        current_time = time.time()
        if hasattr(self, 'last_mouse_time'):
            if current_time - self.last_mouse_time < 0.01:
                return
        self.last_mouse_time = current_time

        try:
            # Mouse movement is treated as a view change; note that it is
            # stored in action_events, not in view_events
            view_event = {
                "timestamp": round(current_time - self.start_time, 1),
                "mouse_event": f"move_to_{x}_{y}",
                "keyboard_event": "",
                "action": "View movement"
            }
            self.session_data["action_events"].append(view_event)

            # Buffer the raw event
            event = {
                "timestamp": current_time,
                "type": "mouse_move",
                "x": x,
                "y": y,
                "window_info": self.get_active_window_info()
            }

            with self.buffer_lock:
                self.event_buffer.append(event)

        except Exception as e:
            print(f"❌ Mouse move error: {e}")

    def get_active_window_info(self):
        """Return information about the foreground window."""
        try:
            hwnd = win32gui.GetForegroundWindow()
            window_title = win32gui.GetWindowText(hwnd)
            _, pid = win32process.GetWindowThreadProcessId(hwnd)

            try:
                process = psutil.Process(pid)
                process_name = process.name()
            except Exception:
                # The process may have exited or may not be accessible
                process_name = "Unknown"

            return {
                "title": window_title,
                "pid": pid,
                "process_name": process_name,
                "is_game": self.game_window_title.lower() in window_title.lower() if self.game_window_title else True
            }
        except Exception as e:
            return {"title": "Unknown", "pid": 0, "process_name": "Unknown", "is_game": False}

    def window_monitor(self):
        """Window monitor thread: tracks whether the game window has focus."""
        while self.is_recording:
            window_info = self.get_active_window_info()
            self.is_game_active = window_info["is_game"]
            time.sleep(0.1)

    def generate_operation_manual(self):
        """Generate the operation manual (key-to-action legend)."""
        manual = {
            "W": {"type": "keyboard", "action": "Move forward"},
            "S": {"type": "keyboard", "action": "Move backward"},
            "A": {"type": "keyboard", "action": "Move left"},
            "D": {"type": "keyboard", "action": "Move right"},
            "UP": {"type": "keyboard", "action": "Look up"},
            "DOWN": {"type": "keyboard", "action": "Look down"},
            "LEFT": {"type": "keyboard", "action": "Look left"},
            "RIGHT": {"type": "keyboard", "action": "Look right"},
            "Button.left": {"type": "mouse", "action": "Left click"},
            "Button.right": {"type": "mouse", "action": "Right click"},
            "Button.middle": {"type": "mouse", "action": "Middle click"}
        }

        try:
            with open(self.manual_file, 'w', encoding='utf-8') as f:
                json.dump(manual, f, indent=2, ensure_ascii=False)
            print(f"📖 Operation manual generated: {self.manual_file}")
        except Exception as e:
            print(f"❌ Failed to generate the operation manual: {e}")

    def save_input_data(self):
        """Periodically back up the raw input events."""
        while self.is_recording:
            time.sleep(5)  # Save every 5 seconds

            with self.buffer_lock:
                if self.event_buffer:
                    # Write the backup file. Note: each save overwrites the
                    # previous backup, so the file only ever holds the events
                    # buffered since the last save.
                    backup_file = self.input_file.replace('.json', '_backup.json')
                    try:
                        with open(backup_file, 'w', encoding='utf-8') as f:
                            json.dump({
                                "backup_events": list(self.event_buffer),
                                "last_save": datetime.now().isoformat()
                            }, f, indent=2, ensure_ascii=False)
                        self.event_buffer.clear()
                    except Exception as e:
                        print(f"❌ Backup save failed: {e}")

    def start_recording(self):
        """Start recording and block until ESC is pressed."""
        print("🚀 Starting the game recording and logging system...")

        # Set up the video recorder
        game_rect = self.setup_video_recorder()
        if not self.video_writer:
            print("❌ Failed to initialise the video recorder")
            return False

        # Set the shared time origin used for synchronisation
        self.start_time = time.time()
        self.session_data["game_metadata"]["start_time"] = datetime.now().isoformat()
        self.movement_start_time = self.start_time
        self.view_start_time = self.start_time

        # Start recording
        self.is_recording = True

        # Start the video recording thread
        video_thread = threading.Thread(target=self.record_video, args=(game_rect,))
        video_thread.daemon = True
        video_thread.start()

        # Start the window monitor thread
        window_thread = threading.Thread(target=self.window_monitor)
        window_thread.daemon = True
        window_thread.start()

        # Start the backup thread
        save_thread = threading.Thread(target=self.save_input_data)
        save_thread.daemon = True
        save_thread.start()

        # Set up the input listeners
        keyboard_listener = keyboard.Listener(
            on_press=self.on_key_press,
            on_release=self.on_key_release
        )

        mouse_listener = mouse.Listener(
            on_move=self.on_mouse_move,
            on_click=self.on_mouse_click
        )

        # Start the listeners
        keyboard_listener.start()
        mouse_listener.start()

        print("✅ System started; press ESC to stop recording")

        # Wait for the stop signal. ESC sets an Event that the main thread
        # polls, instead of relying on the hotkey callback's return value
        # to end the listener; polling also keeps Ctrl+C responsive.
        stop_event = threading.Event()
        hotkeys = keyboard.GlobalHotKeys({'<esc>': stop_event.set})
        hotkeys.start()
        while not stop_event.wait(0.2):
            pass

        # Stop all listeners, then finalise the output files
        hotkeys.stop()
        keyboard_listener.stop()
        mouse_listener.stop()
        self.stop_recording()

        return True

    def stop_recording(self):
        """Stop recording and write the output files."""
        print("\n🛑 Stopping recording...")
        self.is_recording = False

        # Wait briefly so that all threads see the stop flag
        time.sleep(0.5)

        # Compute the duration
        end_time = time.time()
        duration = end_time - self.start_time
        self.session_data["game_metadata"]["duration_seconds"] = round(duration, 1)
        self.session_data["game_metadata"]["recording_end"] = datetime.now().isoformat()

        # Close the last open movement and view intervals
        if self.current_movement != "Neutral" and self.movement_start_time is not None:
            event = {
                "direction": self.current_movement,
                "start_time": round(self.movement_start_time - self.start_time, 1),
                "end_time": round(end_time - self.start_time, 1)
            }
            self.session_data["movement_events"].append(event)

        if self.current_view != "Neutral" and self.view_start_time is not None:
            event = {
                "direction": self.current_view,
                "start_time": round(self.view_start_time - self.start_time, 1),
                "end_time": round(end_time - self.start_time, 1)
            }
            self.session_data["view_events"].append(event)

        # Release the video writer
        if self.video_writer:
            self.video_writer.release()
            print(f"✅ Video file saved: {self.video_file}")
            print(f"📊 Total frames: {self.frame_count}")

        # Generate the operation manual
        self.generate_operation_manual()

        # Save the final input data
        try:
            with open(self.input_file, 'w', encoding='utf-8') as f:
                json.dump(self.session_data, f, indent=2, ensure_ascii=False)
            print(f"✅ Input log saved: {self.input_file}")
            print(f"📊 Movement events: {len(self.session_data['movement_events'])}")
            print(f"📊 View events: {len(self.session_data['view_events'])}")
            print(f"📊 Action events: {len(self.session_data['action_events'])}")
        except Exception as e:
            print(f"❌ Failed to save the input log: {e}")

        print("🎉 Recording finished!")


def find_game_windows():
    """Find candidate game windows."""
    windows = []

    def enum_windows_proc(hwnd, _):
        if win32gui.IsWindowVisible(hwnd):
            title = win32gui.GetWindowText(hwnd)
            if title:
                # Defaults, so both names exist even if the lookup fails
                pid = 0
                process_name = "Unknown"
                try:
                    _, pid = win32process.GetWindowThreadProcessId(hwnd)
                    process_name = psutil.Process(pid).name()
                except Exception:
                    pass

                windows.append({
                    "hwnd": hwnd,
                    "title": title,
                    "process_name": process_name,
                    "pid": pid
                })

    win32gui.EnumWindows(enum_windows_proc, None)

    # Filter for game windows
    game_keywords = ['steam', 'game', 'unity', 'unreal', 'directx', 'opengl']
    game_windows = []

    for window in windows:
        title_lower = window['title'].lower()
        process_lower = window['process_name'].lower()

        is_game = any(keyword in title_lower or keyword in process_lower for keyword in game_keywords)
        # Deliberately permissive: any window whose title is longer than
        # 5 characters is also listed, so the user makes the final choice
        is_game = is_game or len(title_lower) > 5

        if is_game:
            game_windows.append(window)

    game_windows.sort(key=lambda w: len(w["title"]), reverse=True)
    return game_windows


def main():
    """Entry point."""
    print("=" * 60)
    print("🎮 Integrated game screen-recording and input-logging system")
    print("=" * 60)

    # Detect game windows
    print("🔍 Detecting game windows...")
    game_windows = find_game_windows()

    if game_windows:
        print("Found the following game windows:")
        for i, window in enumerate(game_windows):
            print(f"{i+1}. {window['title']} (process: {window['process_name']})")

        choice = input("Enter the number of the window to record (press Enter to record all windows): ").strip()
        if choice and choice.isdigit() and 0 < int(choice) <= len(game_windows):
            game_title = game_windows[int(choice)-1]["title"]
            print(f"🎯 Selected: {game_title}")
        else:
            game_title = None
            print("🔍 Recording all windows")
    else:
        print("No game window found; recording all windows")
        game_title = None

    # Set the output directory
    output_dir = input("Enter the output directory (press Enter for the default 'recordings'): ").strip()
    if not output_dir:
        output_dir = "recordings"

    # Create the recording system
    recorder = GameRecordingSystem(
        game_window_title=game_title,
        output_dir=output_dir
    )

    try:
        # Start recording
        recorder.start_recording()
    except KeyboardInterrupt:
        print("\n⏹ Recording interrupted by the user")
        recorder.stop_recording()
    except Exception as e:
        print(f"❌ System error: {e}")
        recorder.stop_recording()


if __name__ == "__main__":
    # Check dependencies
    try:
        import cv2
        import pyautogui
        import pynput
        import win32gui
        import psutil
    except ImportError as e:
        print(f"❌ Missing dependency: {e}")
        print("Please install the following libraries:")
        print("pip install opencv-python pyautogui pynput pywin32 psutil")
        sys.exit(1)
    
    main()
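For training, the interval-style `movement_events` written by the recorder usually have to be expanded into one label per video frame. The sketch below assumes a constant frame rate, so that frame `i` sits at time `i / fps`, and the interval format `direction` / `start_time` / `end_time` from the listing above. The function name `frame_labels` is hypothetical. If the capture loop cannot keep up with the nominal frame rate, the mapping will drift, so check the recorded frame count against `duration_seconds * fps` first.

```python
import math


def frame_labels(movement_events, fps, duration, default="Neutral"):
    """Expand movement intervals into one direction label per frame.

    Frame i is labelled with an event if start_time <= i / fps < end_time.
    """
    n_frames = int(duration * fps)
    labels = [default] * n_frames
    for event in movement_events:
        first = max(0, math.ceil(event["start_time"] * fps))
        last = min(n_frames, math.ceil(event["end_time"] * fps))
        for i in range(first, last):
            labels[i] = event["direction"]
    return labels


# Hypothetical session: forward for half a second, then right
events = [
    {"direction": "W", "start_time": 0.5, "end_time": 1.0},
    {"direction": "R", "start_time": 1.0, "end_time": 1.5},
]
print(frame_labels(events, fps=4, duration=2))
```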