Python调用海康工业相机SDK实现图像实时显示

内容分享2小时前发布
0 0 0

前言

  如果你也是无法通过opencv 的官方接口 VideoCapture() 接口直接调用海康工业相机的小伙伴,那么我相信这篇文章可以帮助你解决问题。当然,本文只提供了实时显示的解决方案,海康相机的SDK包具有丰富的功能,可以利用SDK包实现GUI界面的设计和图像处理功能的实现

  采用VideoCapture() 接口调用可以参考这篇文章:https://blog.csdn.net/qq_39570716/article/details/117073640?spm=1001.2014.3001.5501

  话不多说,干货下面开始!

一、准备工作

1.安装海康威视官方MVS软件(包含python SDK包)

海康威视MVS客户端官方下载地址:https://www.hikrobotics.com/cn/machinevision/service/download/?module=0

Python调用海康工业相机SDK实现图像实时显示

建议直接安装在C盘(在其他帖子看到安装到其他盘出错的问题),目录地址保持默认,保持安装时默认的勾选选项,直接安装即可。

2.打开设备管理器,保证系统能够识别到USB驱动

  本人是通过USB3.0的线将相机与上位机连接的,这里建议上位机的接口也是支持USB3.0协议的(通常为蓝色口),这样能保证相机采集到的画面不会发生卡顿

Python调用海康工业相机SDK实现图像实时显示

3.将MvImport文件夹中的库文件导入到pycharm的项目文件夹中

  海康威视 SDK 中的
MvImport
文件夹通常包含 SDK 运行所需的核心库文件,这些库文件是实现相机控制、图像采集、参数配置等功能的底层支撑,一定要确保将这些库文件复制粘贴到pycharm的项目文件夹中

Python调用海康工业相机SDK实现图像实时显示

4.修改MvCameraControl_class.py程序中DLL的加载逻辑

通过“绝对路径”加载DLL文件,理由很简单Python 3.8 及以上版本修改了 DLL 加载机制,默认不再搜索
Path
环境变量,防止程序运行时报错

Python调用海康工业相机SDK实现图像实时显示

注意文件路径与MvImport文件夹路径不同

二、代码实现

做好以上铺垫工作,基本能保证代码运行过程中不会报错,具体的代码实现可以参考文章:https://blog.csdn.net/qq_39570716/article/details/114066097?spm=1001.2014.3001.5501

本文只是将文中的代码进行了封装,提高代码的灵活性

1.设备枚举器



import sys
import logging
from ctypes import *
from ctypes.wintypes import *
 
sys.path.append("../MvImport")
from MvCameraControl_class import *
 
 
class DeviceEnumerator:
    def __init__(self):
        self.device_list = None
        self.logger = logging.getLogger(__name__)
 
    def enum_devices(self, device_type=0):
        """
        枚举设备

        Args:
            device_type: 0-枚举网口、USB口、未知设备、cameralink设备
        """
        try:
            tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_UNKNOW_DEVICE | MV_1394_DEVICE | MV_CAMERALINK_DEVICE
            self.device_list = MV_CC_DEVICE_INFO_LIST()
 
            ret = MvCamera.MV_CC_EnumDevices(tlayerType, self.device_list)
            if ret != 0:
                self.logger.error(f"枚举设备失败! 错误码: 0x{ret:x}")
                return False
 
            if self.device_list.nDeviceNum == 0:
                self.logger.warning("未找到任何设备!")
                return False
 
            self.logger.info(f"找到 {self.device_list.nDeviceNum} 个设备")
            return True
 
        except Exception as e:
            self.logger.error(f"枚举设备时发生异常: {e}")
            return False
 
    def get_device_count(self):
        """获取设备数量"""
        if self.device_list:
            return self.device_list.nDeviceNum
        return 0
 
    def get_device_info(self, index):
        """获取指定索引的设备信息"""
        if not self.device_list or index >= self.device_list.nDeviceNum:
            return None
 
        device_info = cast(self.device_list.pDeviceInfo[index], POINTER(MV_CC_DEVICE_INFO)).contents
        return self._parse_device_info(device_info, index)
 
    def _parse_device_info(self, device_info, index):
        """解析设备信息"""
        info_dict = {
            'index': index,
            'type': self._get_device_type(device_info.nTLayerType),
            'model_name': '',
            'serial_number': '',
            'manufacturer': '',
            'ip_address': '',
            'mac_address': ''
        }
 
        if device_info.nTLayerType == MV_GIGE_DEVICE:
            self._parse_gige_info(device_info, info_dict)
        elif device_info.nTLayerType == MV_USB_DEVICE:
            self._parse_usb_info(device_info, info_dict)
        elif device_info.nTLayerType == MV_CAMERALINK_DEVICE:
            self._parse_cameralink_info(device_info, info_dict)
 
        return info_dict
 
    def _get_device_type(self, layer_type):
        """获取设备类型字符串"""
        type_map = {
            MV_GIGE_DEVICE: "GigE",
            MV_USB_DEVICE: "USB",
            MV_1394_DEVICE: "1394",
            MV_CAMERALINK_DEVICE: "CameraLink"
        }
        return type_map.get(layer_type, "Unknown")
 
    def _parse_gige_info(self, device_info, info_dict):
        """解析GigE设备信息"""
        info = device_info.SpecialInfo.stGigEInfo
 
        info_dict['model_name'] = "".join(chr(per) for per in info.chModelName if per != 0)
        info_dict['serial_number'] = "".join(chr(per) for per in info.chSerialNumber if per != 0)
        info_dict['manufacturer'] = "".join(chr(per) for per in info.chManufacturerName if per != 0)
        info_dict['ip_address'] = self._format_ip(info.nCurrentIp)
        info_dict['mac_address'] = self._format_mac(info.chMACAddr)
 
    def _parse_usb_info(self, device_info, info_dict):
        """解析USB设备信息"""
        info = device_info.SpecialInfo.stUsb3VInfo
 
        info_dict['model_name'] = "".join(chr(per) for per in info.chModelName if per != 0)
        info_dict['serial_number'] = "".join(chr(per) for per in info.chSerialNumber if per != 0)
        info_dict['manufacturer'] = "".join(chr(per) for per in info.chVendorName if per != 0)
 
    def _parse_cameralink_info(self, device_info, info_dict):
        """解析CameraLink设备信息"""
        info = device_info.SpecialInfo.stCamLInfo
 
        info_dict['model_name'] = "".join(chr(per) for per in info.chModelName if per != 0)
        info_dict['serial_number'] = "".join(chr(per) for per in info.chSerialNumber if per != 0)
        info_dict['manufacturer'] = "".join(chr(per) for per in info.chVendorName if per != 0)
 
    def _format_ip(self, ip_int):
        """格式化IP地址"""
        return f"{(ip_int & 0xff000000) >> 24}.{(ip_int & 0x00ff0000) >> 16}.{(ip_int & 0x0000ff00) >> 8}.{ip_int & 0x000000ff}"
 
    def _format_mac(self, mac_array):
        """格式化MAC地址"""
        return ":".join(f"{b:02x}" for b in mac_array[:6])
 
    def print_device_list(self):
        """打印设备列表"""
        if not self.device_list:
            print("没有找到设备")
            return
 
        print(f"
找到 {self.device_list.nDeviceNum} 个设备:")
        for i in range(self.device_list.nDeviceNum):
            device_info = self.get_device_info(i)
            if device_info:
                print(
                    f"[{device_info['index']}] {device_info['type']} - {device_info['model_name']} - SN: {device_info['serial_number']}")

2.相机连接器



import os
import sys
import logging
from ctypes import *
from device_enumerator import DeviceEnumerator
 
 
sys.path.append("../MvImport")
from MvCameraControl_class import *
 
 
class CameraConnector:
    def __init__(self, log_enabled=False, log_path=None):
        self.cam = None
        self.is_connected = False
        self.log_enabled = log_enabled
        self.log_path = log_path or os.getcwd()
        self.logger = logging.getLogger(__name__)
 
    def connect(self, device_list, device_index=0):
        """
        连接指定设备

        Args:
            device_list: 设备列表
            device_index: 设备索引
        """
        if not device_list or device_index >= device_list.nDeviceNum:
            self.logger.error("设备索引无效")
            return False
 
        try:
            # 创建相机实例
            self.cam = MvCamera()
            stDeviceList = cast(device_list.pDeviceInfo[device_index], POINTER(MV_CC_DEVICE_INFO)).contents
 
            # 设置日志
            if self.log_enabled:
                ret = self.cam.MV_CC_SetSDKLogPath(self.log_path)
                if ret != 0:
                    self.logger.warning(f"设置日志路径失败! 错误码: 0x{ret:x}")
 
                ret = self.cam.MV_CC_CreateHandle(stDeviceList)
            else:
                ret = self.cam.MV_CC_CreateHandleWithoutLog(stDeviceList)
 
            if ret != 0:
                self.logger.error(f"创建句柄失败! 错误码: 0x{ret:x}")
                return False
 
            # 打开设备
            ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
            if ret != 0:
                self.logger.error(f"打开设备失败! 错误码: 0x{ret:x}")
                return False
 
            self.is_connected = True
            self.logger.info(f"成功连接设备 {device_index}")
            return True
 
        except Exception as e:
            self.logger.error(f"连接设备时发生异常: {e}")
            return False
 
    def disconnect(self):
        """断开设备连接"""
        if not self.is_connected or not self.cam:
            return
 
        try:
            # 关闭设备
            ret = self.cam.MV_CC_CloseDevice()
            if ret != 0:
                self.logger.warning(f"关闭设备失败! 错误码: 0x{ret:x}")
 
            # 销毁句柄
            ret = self.cam.MV_CC_DestroyHandle()
            if ret != 0:
                self.logger.warning(f"销毁句柄失败! 错误码: 0x{ret:x}")
 
            self.is_connected = False
            self.logger.info("设备已断开连接")
 
        except Exception as e:
            self.logger.error(f"断开连接时发生异常: {e}")
 
    def get_camera_instance(self):
        """获取相机实例"""
        return self.cam
 
    def is_connected(self):
        """检查是否已连接"""
        return self.is_connected
 
    def __del__(self):
        """析构函数"""
        self.disconnect()
 
 
def test():
    # 需要先枚举设备
    enumerator = DeviceEnumerator()
    if not enumerator.enum_devices():
        print("没有找到设备,无法测试连接")
        return
 
    connector = CameraConnector()
    if connector.connect(enumerator.device_list, 0):
        print("连接成功")
        # 等待一段时间后断开
        import time
        time.sleep(2)
        connector.disconnect()
    else:
        print("连接失败")
 
if __name__ == "__main__":
    test()

3.图像抓取器



import numpy as np
import cv2
import sys
import threading
import logging
from device_enumerator import DeviceEnumerator
from camera_connector import CameraConnector
from ctypes import *
from ctypes.wintypes import *
 
 
sys.path.append("../MvImport")
from MvCameraControl_class import *
 
 
class ImageGrabber:
    def __init__(self):
        self.cam = None
        self.is_grabbing = False
        self.current_frame = None
        self.frame_lock = threading.Lock()
        self.callback_initialized = False
        self.logger = logging.getLogger(__name__)
 
        # 设置回调函数
        self._setup_callbacks()
 
    def set_camera(self, camera_instance):
        """设置相机实例"""
        self.cam = camera_instance
 
    def _setup_callbacks(self):
        """设置回调函数"""
        if self.callback_initialized:
            return
 
        # 图像回调
        stFrameInfo = POINTER(MV_FRAME_OUT_INFO_EX)
        pData = POINTER(c_ubyte)
        FrameInfoCallBack = WINFUNCTYPE(None, pData, stFrameInfo, c_void_p)
 
        def image_callback(pData, pFrameInfo, pUser):
            stFrameInfo = cast(pFrameInfo, POINTER(MV_FRAME_OUT_INFO_EX)).contents
            try:
                image = self._process_image_data(pData, stFrameInfo)
                if image is not None:
                    with self.frame_lock:
                        self.current_frame = image
            except Exception as e:
                self.logger.error(f"图像回调处理失败: {e}")
 
        self.image_callback_func = FrameInfoCallBack(image_callback)
        self.callback_initialized = True
 
    def start_grabbing(self, grab_mode="callback"):
        """
        开始取流

        Args:
            grab_mode: 取流模式,'callback' 或 'active'
        """
        if not self.cam:
            self.logger.error("相机未设置")
            return False
 
        try:
            if grab_mode == "callback":
                # 注册回调函数
                ret = self.cam.MV_CC_RegisterImageCallBackEx(self.image_callback_func, None)
                if ret != 0:
                    self.logger.error(f"注册图像回调失败! 错误码: 0x{ret:x}")
                    return False
 
            # 开始取流
            ret = self.cam.MV_CC_StartGrabbing()
            if ret != 0:
                self.logger.error(f"开始取流失败! 错误码: 0x{ret:x}")
                return False
 
            self.is_grabbing = True
            self.logger.info("开始取流")
            return True
 
        except Exception as e:
            self.logger.error(f"开始取流时发生异常: {e}")
            return False
 
    def stop_grabbing(self):
        """停止取流"""
        if not self.is_grabbing or not self.cam:
            return
 
        try:
            ret = self.cam.MV_CC_StopGrabbing()
            if ret != 0:
                self.logger.warning(f"停止取流失败! 错误码: 0x{ret:x}")
            else:
                self.is_grabbing = False
                self.logger.info("停止取流")
 
        except Exception as e:
            self.logger.error(f"停止取流时发生异常: {e}")
 
    def get_frame(self):
        """
        获取当前帧

        Returns:
            numpy.ndarray: 当前图像帧,如果没有帧则返回None
        """
        with self.frame_lock:
            frame = self.current_frame
            self.current_frame = None
        return frame
 
    def _process_image_data(self, pData, stFrameInfo):
        """处理图像数据"""
        try:
            if stFrameInfo.enPixelType == 17301505:
                # 单通道
                img_buff = (c_ubyte * (stFrameInfo.nWidth * stFrameInfo.nHeight))()
                cdll.msvcrt.memcpy(byref(img_buff), pData, stFrameInfo.nWidth * stFrameInfo.nHeight)
                data = np.frombuffer(img_buff, dtype=np.uint8)
                image = data.reshape((stFrameInfo.nHeight, stFrameInfo.nWidth))
                return image
 
            elif stFrameInfo.enPixelType == 17301514:
                # Bayer GB
                img_buff = (c_ubyte * (stFrameInfo.nWidth * stFrameInfo.nHeight))()
                cdll.msvcrt.memcpy(byref(img_buff), pData, stFrameInfo.nWidth * stFrameInfo.nHeight)
                data = np.frombuffer(img_buff, dtype=np.uint8)
                data = data.reshape(stFrameInfo.nHeight, stFrameInfo.nWidth, -1)
                image = cv2.cvtColor(data, cv2.COLOR_BAYER_GB2RGB)
                return image
 
            elif stFrameInfo.enPixelType == 17301513:
                # Bayer RG
                img_buff = (c_ubyte * (stFrameInfo.nWidth * stFrameInfo.nHeight))()
                cdll.msvcrt.memcpy(byref(img_buff), pData, stFrameInfo.nWidth * stFrameInfo.nHeight)
                data = np.frombuffer(img_buff, dtype=np.uint8)
                data = data.reshape(stFrameInfo.nHeight, stFrameInfo.nWidth, -1)
                image = cv2.cvtColor(data, cv2.COLOR_BAYER_RG2RGB)
                return image
 
            elif stFrameInfo.enPixelType == 35127316:
                # RGB
                img_buff = (c_ubyte * (stFrameInfo.nWidth * stFrameInfo.nHeight * 3))()
                cdll.msvcrt.memcpy(byref(img_buff), pData, stFrameInfo.nWidth * stFrameInfo.nHeight * 3)
                data = np.frombuffer(img_buff, dtype=np.uint8)
                data = data.reshape(stFrameInfo.nHeight, stFrameInfo.nWidth, -1)
                image = cv2.cvtColor(data, cv2.COLOR_RGB2BGR)
                return image
 
            elif stFrameInfo.enPixelType == 34603039:
                # YUV422
                img_buff = (c_ubyte * (stFrameInfo.nWidth * stFrameInfo.nHeight * 2))()
                cdll.msvcrt.memcpy(byref(img_buff), pData, stFrameInfo.nWidth * stFrameInfo.nHeight * 2)
                data = np.frombuffer(img_buff, dtype=np.uint8)
                data = data.reshape(stFrameInfo.nHeight, stFrameInfo.nWidth, -1)
                image = cv2.cvtColor(data, cv2.COLOR_YUV2BGR_Y422)
                return image
 
            else:
                self.logger.warning(f"未处理的像素类型: {stFrameInfo.enPixelType}")
                return None
 
        except Exception as e:
            self.logger.error(f"处理图像数据时发生异常: {e}")
            return None
 
    def is_grabbing(self):
        """检查是否正在取流"""
        return self.is_grabbing
def test():
    # 需要先枚举设备并连接
    enumerator = DeviceEnumerator()
    if not enumerator.enum_devices():
        print("没有找到设备,无法测试取流")
        return
 
    connector = CameraConnector()
    if not connector.connect(enumerator.device_list, 0):
        print("连接失败,无法测试取流")
        return
 
    grabber = ImageGrabber()
    grabber.set_camera(connector.get_camera_instance())
 
    # 开始取流(回调方式)
    if grabber.start_grabbing():
        print("开始取流,持续3秒")
        import time
        start_time = time.time()
        while time.time() - start_time < 3:
            frame = grabber.get_frame()
            if frame is not None:
                print(f"获取到一帧,形状: {frame.shape}")
            time.sleep(0.1)
        grabber.stop_grabbing()
    else:
        print("取流失败")
 
    connector.disconnect()
 
if __name__ == "__main__":
    test()

4.图像显示器



import cv2
import numpy as np
import time
 
 
class ImageDisplayer:
    def __init__(self, window_name="Camera Feed", window_size=(600, 400)):
        self.window_name = window_name
        self.window_size = window_size
        self.fps = 0
        self.frame_count = 0
        self.start_time = time.time()
 
    def show_frame(self, frame, show_fps=True):
        """
        显示帧

        Args:
            frame: 要显示的图像帧
            show_fps: 是否显示FPS

        Returns:
            int: 按键值
        """
        if frame is None:
            return 0xFF
 
        # 调整图像大小
        resized_frame = self._resize_frame(frame)
 
        # 显示FPS
        if show_fps:
            resized_frame = self._add_fps_overlay(resized_frame)
 
        # 显示图像
        cv2.imshow(self.window_name, resized_frame)
 
        # 更新FPS计数
        self._update_fps()
 
        # 返回按键值
        return cv2.waitKey(1) & 0xFF
 
    def _resize_frame(self, frame):
        """调整帧大小"""
        return cv2.resize(frame, self.window_size, interpolation=cv2.INTER_AREA)
 
    def _update_fps(self):
        """更新FPS计算"""
        self.frame_count += 1
        current_time = time.time()
        elapsed_time = current_time - self.start_time
 
        if elapsed_time > 1.0:  # 每秒更新一次FPS
            self.fps = self.frame_count / elapsed_time
            self.frame_count = 0
            self.start_time = current_time
 
    def _add_fps_overlay(self, frame):
        """添加FPS叠加层"""
        fps_text = f"FPS: {self.fps:.1f}"
        cv2.putText(frame, fps_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        return frame
 
    def save_frame(self, frame, filename=None):
        """
        保存帧为图像文件

        Args:
            frame: 要保存的图像帧
            filename: 文件名,默认为时间戳

        Returns:
            bool: 保存是否成功
        """
        if frame is None:
            return False
 
        if filename is None:
            timestamp = int(time.time())
            filename = f"captured_frame_{timestamp}.bmp"
 
        try:
            cv2.imwrite(filename, frame)
            print(f"图像已保存为: {filename}")
            return True
        except Exception as e:
            print(f"保存图像失败: {e}")
            return False
 
    def close_window(self):
        """关闭显示窗口"""
        cv2.destroyWindow(self.window_name)
 
    def close_all_windows(self):
        """关闭所有窗口"""
        cv2.destroyAllWindows()

5.主函数



import cv2
import logging
from device_enumerator import DeviceEnumerator
from camera_connector import CameraConnector
from image_grabber import ImageGrabber
from image_displayer import ImageDisplayer
 
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
 
def main():
    # 创建各个组件实例
    device_enumerator = DeviceEnumerator()
    camera_connector = CameraConnector(log_enabled=False)
    image_grabber = ImageGrabber()
    image_displayer = ImageDisplayer("Camera Feed", (800, 600))
 
    try:
        # 1. 枚举设备
        print("正在枚举设备...")
        if not device_enumerator.enum_devices():
            print("未找到任何设备,程序退出")
            return
 
        device_enumerator.print_device_list()
 
        # 2. 连接设备(默认连接第一个设备)
        print("
正在连接设备...")
        if not camera_connector.connect(device_enumerator.device_list, device_index=0):
            print("设备连接失败,程序退出")
            return
 
        # 3. 设置图像采集器
        image_grabber.set_camera(camera_connector.get_camera_instance())
 
        # 4. 开始取流(默认使用回调方式)
        print("开始取流...")
        if not image_grabber.start_grabbing(grab_mode="callback"):
            print("开始取流失败,程序退出")
            return
 
        print("按 'q' 键退出程序")
        print("按 's' 键保存当前帧")
        print("按 'p' 键暂停/继续显示")
 
        paused = False
 
        # 主循环
        while True:
            if not paused:
                # 获取当前帧
                frame = image_grabber.get_frame()
 
                if frame is not None:
                    # 显示图像
                    key = image_displayer.show_frame(frame, show_fps=True)
 
                    # 按键处理
                    if key == ord('q'):
                        break
                    elif key == ord('s'):
                        image_displayer.save_frame(frame)
                    elif key == ord('p'):
                        paused = True
                        print("显示已暂停,按 'p' 继续")
                else:
                    # 如果没有帧,等待一小段时间
                    key = cv2.waitKey(1) & 0xFF
                    if key == ord('q'):
                        break
            else:
                # 暂停状态下只检查按键
                key = cv2.waitKey(100) & 0xFF
                if key == ord('q'):
                    break
                elif key == ord('p'):
                    paused = False
                    print("显示继续")
 
    except KeyboardInterrupt:
        print("
程序被用户中断")
    except Exception as e:
        print(f"程序运行出错: {e}")
    finally:
        # 清理资源
        print("正在清理资源...")
        image_grabber.stop_grabbing()
        camera_connector.disconnect()
        image_displayer.close_all_windows()
        print("程序退出")
 
 
if __name__ == "__main__":
    main()

三、总结

  以上就是通过python调用海康工业相机SDK包实现图像显示的所有内容了。这里顺便补充一下为什么没办法通过VideoCapture() 接口直接调用海康工业相机的原因:海康工业相机不包含UVC虚拟协议,所以上位机没法将海康工业相机的驱动视为外置相机驱动。

© 版权声明

相关文章

暂无评论

none
暂无评论...