前言
如果你也是无法通过opencv 的官方接口 VideoCapture() 接口直接调用海康工业相机的小伙伴,那么我相信这篇文章可以帮助你解决问题。当然,本文只提供了实时显示的解决方案,海康相机的SDK包具有丰富的功能,可以利用SDK包实现GUI界面的设计和图像处理功能的实现
采用VideoCapture() 接口调用可以参考这篇文章:https://blog.csdn.net/qq_39570716/article/details/117073640?spm=1001.2014.3001.5501
话不多说,干货下面开始!
一、准备工作
1.安装海康威视官方MVS软件(包含python SDK包)
海康威视MVS客户端官方下载地址:https://www.hikrobotics.com/cn/machinevision/service/download/?module=0

建议直接安装在C盘(在其他帖子看到安装到其他盘出错的问题),目录地址保持默认,保持安装时默认的勾选选项,直接安装即可。
2.打开设备管理器,保证系统能够识别到USB驱动
本人是通过USB3.0的线将相机与上位机连接的,这里建议上位机的接口也是支持USB3.0协议的(通常为蓝色口),这样能保证相机采集到的画面不会发生卡顿

3.将MvImport文件夹中的库文件导入到pycharm的项目文件夹中
海康威视 SDK 中的文件夹通常包含 SDK 运行所需的核心库文件,这些库文件是实现相机控制、图像采集、参数配置等功能的底层支撑,一定要确保将这些库文件复制粘贴到pycharm的项目文件夹中
MvImport

4.修改MvCameraControl_class.py程序中DLL的加载逻辑
通过“绝对路径”加载DLL文件,理由很简单Python 3.8 及以上版本修改了 DLL 加载机制,默认不再搜索环境变量,防止程序运行时报错
Path

注意文件路径与MvImport文件夹路径不同
二、代码实现
做好以上铺垫工作,基本能保证代码运行过程中不会报错,具体的代码实现可以参考文章:https://blog.csdn.net/qq_39570716/article/details/114066097?spm=1001.2014.3001.5501
本文只是将文中的代码进行了封装,提高代码的灵活性
1.设备枚举器
import sys
import logging
from ctypes import *
from ctypes.wintypes import *
sys.path.append("../MvImport")
from MvCameraControl_class import *
class DeviceEnumerator:
def __init__(self):
self.device_list = None
self.logger = logging.getLogger(__name__)
def enum_devices(self, device_type=0):
"""
枚举设备
Args:
device_type: 0-枚举网口、USB口、未知设备、cameralink设备
"""
try:
tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE | MV_UNKNOW_DEVICE | MV_1394_DEVICE | MV_CAMERALINK_DEVICE
self.device_list = MV_CC_DEVICE_INFO_LIST()
ret = MvCamera.MV_CC_EnumDevices(tlayerType, self.device_list)
if ret != 0:
self.logger.error(f"枚举设备失败! 错误码: 0x{ret:x}")
return False
if self.device_list.nDeviceNum == 0:
self.logger.warning("未找到任何设备!")
return False
self.logger.info(f"找到 {self.device_list.nDeviceNum} 个设备")
return True
except Exception as e:
self.logger.error(f"枚举设备时发生异常: {e}")
return False
def get_device_count(self):
"""获取设备数量"""
if self.device_list:
return self.device_list.nDeviceNum
return 0
def get_device_info(self, index):
"""获取指定索引的设备信息"""
if not self.device_list or index >= self.device_list.nDeviceNum:
return None
device_info = cast(self.device_list.pDeviceInfo[index], POINTER(MV_CC_DEVICE_INFO)).contents
return self._parse_device_info(device_info, index)
def _parse_device_info(self, device_info, index):
"""解析设备信息"""
info_dict = {
'index': index,
'type': self._get_device_type(device_info.nTLayerType),
'model_name': '',
'serial_number': '',
'manufacturer': '',
'ip_address': '',
'mac_address': ''
}
if device_info.nTLayerType == MV_GIGE_DEVICE:
self._parse_gige_info(device_info, info_dict)
elif device_info.nTLayerType == MV_USB_DEVICE:
self._parse_usb_info(device_info, info_dict)
elif device_info.nTLayerType == MV_CAMERALINK_DEVICE:
self._parse_cameralink_info(device_info, info_dict)
return info_dict
def _get_device_type(self, layer_type):
"""获取设备类型字符串"""
type_map = {
MV_GIGE_DEVICE: "GigE",
MV_USB_DEVICE: "USB",
MV_1394_DEVICE: "1394",
MV_CAMERALINK_DEVICE: "CameraLink"
}
return type_map.get(layer_type, "Unknown")
def _parse_gige_info(self, device_info, info_dict):
"""解析GigE设备信息"""
info = device_info.SpecialInfo.stGigEInfo
info_dict['model_name'] = "".join(chr(per) for per in info.chModelName if per != 0)
info_dict['serial_number'] = "".join(chr(per) for per in info.chSerialNumber if per != 0)
info_dict['manufacturer'] = "".join(chr(per) for per in info.chManufacturerName if per != 0)
info_dict['ip_address'] = self._format_ip(info.nCurrentIp)
info_dict['mac_address'] = self._format_mac(info.chMACAddr)
def _parse_usb_info(self, device_info, info_dict):
"""解析USB设备信息"""
info = device_info.SpecialInfo.stUsb3VInfo
info_dict['model_name'] = "".join(chr(per) for per in info.chModelName if per != 0)
info_dict['serial_number'] = "".join(chr(per) for per in info.chSerialNumber if per != 0)
info_dict['manufacturer'] = "".join(chr(per) for per in info.chVendorName if per != 0)
def _parse_cameralink_info(self, device_info, info_dict):
"""解析CameraLink设备信息"""
info = device_info.SpecialInfo.stCamLInfo
info_dict['model_name'] = "".join(chr(per) for per in info.chModelName if per != 0)
info_dict['serial_number'] = "".join(chr(per) for per in info.chSerialNumber if per != 0)
info_dict['manufacturer'] = "".join(chr(per) for per in info.chVendorName if per != 0)
def _format_ip(self, ip_int):
"""格式化IP地址"""
return f"{(ip_int & 0xff000000) >> 24}.{(ip_int & 0x00ff0000) >> 16}.{(ip_int & 0x0000ff00) >> 8}.{ip_int & 0x000000ff}"
def _format_mac(self, mac_array):
"""格式化MAC地址"""
return ":".join(f"{b:02x}" for b in mac_array[:6])
def print_device_list(self):
"""打印设备列表"""
if not self.device_list:
print("没有找到设备")
return
print(f"
找到 {self.device_list.nDeviceNum} 个设备:")
for i in range(self.device_list.nDeviceNum):
device_info = self.get_device_info(i)
if device_info:
print(
f"[{device_info['index']}] {device_info['type']} - {device_info['model_name']} - SN: {device_info['serial_number']}")
2.相机连接器
import os
import sys
import logging
from ctypes import *
from device_enumerator import DeviceEnumerator
sys.path.append("../MvImport")
from MvCameraControl_class import *
class CameraConnector:
def __init__(self, log_enabled=False, log_path=None):
self.cam = None
self.is_connected = False
self.log_enabled = log_enabled
self.log_path = log_path or os.getcwd()
self.logger = logging.getLogger(__name__)
def connect(self, device_list, device_index=0):
"""
连接指定设备
Args:
device_list: 设备列表
device_index: 设备索引
"""
if not device_list or device_index >= device_list.nDeviceNum:
self.logger.error("设备索引无效")
return False
try:
# 创建相机实例
self.cam = MvCamera()
stDeviceList = cast(device_list.pDeviceInfo[device_index], POINTER(MV_CC_DEVICE_INFO)).contents
# 设置日志
if self.log_enabled:
ret = self.cam.MV_CC_SetSDKLogPath(self.log_path)
if ret != 0:
self.logger.warning(f"设置日志路径失败! 错误码: 0x{ret:x}")
ret = self.cam.MV_CC_CreateHandle(stDeviceList)
else:
ret = self.cam.MV_CC_CreateHandleWithoutLog(stDeviceList)
if ret != 0:
self.logger.error(f"创建句柄失败! 错误码: 0x{ret:x}")
return False
# 打开设备
ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
if ret != 0:
self.logger.error(f"打开设备失败! 错误码: 0x{ret:x}")
return False
self.is_connected = True
self.logger.info(f"成功连接设备 {device_index}")
return True
except Exception as e:
self.logger.error(f"连接设备时发生异常: {e}")
return False
def disconnect(self):
"""断开设备连接"""
if not self.is_connected or not self.cam:
return
try:
# 关闭设备
ret = self.cam.MV_CC_CloseDevice()
if ret != 0:
self.logger.warning(f"关闭设备失败! 错误码: 0x{ret:x}")
# 销毁句柄
ret = self.cam.MV_CC_DestroyHandle()
if ret != 0:
self.logger.warning(f"销毁句柄失败! 错误码: 0x{ret:x}")
self.is_connected = False
self.logger.info("设备已断开连接")
except Exception as e:
self.logger.error(f"断开连接时发生异常: {e}")
def get_camera_instance(self):
"""获取相机实例"""
return self.cam
def is_connected(self):
"""检查是否已连接"""
return self.is_connected
def __del__(self):
"""析构函数"""
self.disconnect()
def test():
# 需要先枚举设备
enumerator = DeviceEnumerator()
if not enumerator.enum_devices():
print("没有找到设备,无法测试连接")
return
connector = CameraConnector()
if connector.connect(enumerator.device_list, 0):
print("连接成功")
# 等待一段时间后断开
import time
time.sleep(2)
connector.disconnect()
else:
print("连接失败")
if __name__ == "__main__":
test()
3.图像抓取器
import numpy as np
import cv2
import sys
import threading
import logging
from device_enumerator import DeviceEnumerator
from camera_connector import CameraConnector
from ctypes import *
from ctypes.wintypes import *
sys.path.append("../MvImport")
from MvCameraControl_class import *
class ImageGrabber:
def __init__(self):
self.cam = None
self.is_grabbing = False
self.current_frame = None
self.frame_lock = threading.Lock()
self.callback_initialized = False
self.logger = logging.getLogger(__name__)
# 设置回调函数
self._setup_callbacks()
def set_camera(self, camera_instance):
"""设置相机实例"""
self.cam = camera_instance
def _setup_callbacks(self):
"""设置回调函数"""
if self.callback_initialized:
return
# 图像回调
stFrameInfo = POINTER(MV_FRAME_OUT_INFO_EX)
pData = POINTER(c_ubyte)
FrameInfoCallBack = WINFUNCTYPE(None, pData, stFrameInfo, c_void_p)
def image_callback(pData, pFrameInfo, pUser):
stFrameInfo = cast(pFrameInfo, POINTER(MV_FRAME_OUT_INFO_EX)).contents
try:
image = self._process_image_data(pData, stFrameInfo)
if image is not None:
with self.frame_lock:
self.current_frame = image
except Exception as e:
self.logger.error(f"图像回调处理失败: {e}")
self.image_callback_func = FrameInfoCallBack(image_callback)
self.callback_initialized = True
def start_grabbing(self, grab_mode="callback"):
"""
开始取流
Args:
grab_mode: 取流模式,'callback' 或 'active'
"""
if not self.cam:
self.logger.error("相机未设置")
return False
try:
if grab_mode == "callback":
# 注册回调函数
ret = self.cam.MV_CC_RegisterImageCallBackEx(self.image_callback_func, None)
if ret != 0:
self.logger.error(f"注册图像回调失败! 错误码: 0x{ret:x}")
return False
# 开始取流
ret = self.cam.MV_CC_StartGrabbing()
if ret != 0:
self.logger.error(f"开始取流失败! 错误码: 0x{ret:x}")
return False
self.is_grabbing = True
self.logger.info("开始取流")
return True
except Exception as e:
self.logger.error(f"开始取流时发生异常: {e}")
return False
def stop_grabbing(self):
"""停止取流"""
if not self.is_grabbing or not self.cam:
return
try:
ret = self.cam.MV_CC_StopGrabbing()
if ret != 0:
self.logger.warning(f"停止取流失败! 错误码: 0x{ret:x}")
else:
self.is_grabbing = False
self.logger.info("停止取流")
except Exception as e:
self.logger.error(f"停止取流时发生异常: {e}")
def get_frame(self):
"""
获取当前帧
Returns:
numpy.ndarray: 当前图像帧,如果没有帧则返回None
"""
with self.frame_lock:
frame = self.current_frame
self.current_frame = None
return frame
def _process_image_data(self, pData, stFrameInfo):
"""处理图像数据"""
try:
if stFrameInfo.enPixelType == 17301505:
# 单通道
img_buff = (c_ubyte * (stFrameInfo.nWidth * stFrameInfo.nHeight))()
cdll.msvcrt.memcpy(byref(img_buff), pData, stFrameInfo.nWidth * stFrameInfo.nHeight)
data = np.frombuffer(img_buff, dtype=np.uint8)
image = data.reshape((stFrameInfo.nHeight, stFrameInfo.nWidth))
return image
elif stFrameInfo.enPixelType == 17301514:
# Bayer GB
img_buff = (c_ubyte * (stFrameInfo.nWidth * stFrameInfo.nHeight))()
cdll.msvcrt.memcpy(byref(img_buff), pData, stFrameInfo.nWidth * stFrameInfo.nHeight)
data = np.frombuffer(img_buff, dtype=np.uint8)
data = data.reshape(stFrameInfo.nHeight, stFrameInfo.nWidth, -1)
image = cv2.cvtColor(data, cv2.COLOR_BAYER_GB2RGB)
return image
elif stFrameInfo.enPixelType == 17301513:
# Bayer RG
img_buff = (c_ubyte * (stFrameInfo.nWidth * stFrameInfo.nHeight))()
cdll.msvcrt.memcpy(byref(img_buff), pData, stFrameInfo.nWidth * stFrameInfo.nHeight)
data = np.frombuffer(img_buff, dtype=np.uint8)
data = data.reshape(stFrameInfo.nHeight, stFrameInfo.nWidth, -1)
image = cv2.cvtColor(data, cv2.COLOR_BAYER_RG2RGB)
return image
elif stFrameInfo.enPixelType == 35127316:
# RGB
img_buff = (c_ubyte * (stFrameInfo.nWidth * stFrameInfo.nHeight * 3))()
cdll.msvcrt.memcpy(byref(img_buff), pData, stFrameInfo.nWidth * stFrameInfo.nHeight * 3)
data = np.frombuffer(img_buff, dtype=np.uint8)
data = data.reshape(stFrameInfo.nHeight, stFrameInfo.nWidth, -1)
image = cv2.cvtColor(data, cv2.COLOR_RGB2BGR)
return image
elif stFrameInfo.enPixelType == 34603039:
# YUV422
img_buff = (c_ubyte * (stFrameInfo.nWidth * stFrameInfo.nHeight * 2))()
cdll.msvcrt.memcpy(byref(img_buff), pData, stFrameInfo.nWidth * stFrameInfo.nHeight * 2)
data = np.frombuffer(img_buff, dtype=np.uint8)
data = data.reshape(stFrameInfo.nHeight, stFrameInfo.nWidth, -1)
image = cv2.cvtColor(data, cv2.COLOR_YUV2BGR_Y422)
return image
else:
self.logger.warning(f"未处理的像素类型: {stFrameInfo.enPixelType}")
return None
except Exception as e:
self.logger.error(f"处理图像数据时发生异常: {e}")
return None
def is_grabbing(self):
"""检查是否正在取流"""
return self.is_grabbing
def test():
# 需要先枚举设备并连接
enumerator = DeviceEnumerator()
if not enumerator.enum_devices():
print("没有找到设备,无法测试取流")
return
connector = CameraConnector()
if not connector.connect(enumerator.device_list, 0):
print("连接失败,无法测试取流")
return
grabber = ImageGrabber()
grabber.set_camera(connector.get_camera_instance())
# 开始取流(回调方式)
if grabber.start_grabbing():
print("开始取流,持续3秒")
import time
start_time = time.time()
while time.time() - start_time < 3:
frame = grabber.get_frame()
if frame is not None:
print(f"获取到一帧,形状: {frame.shape}")
time.sleep(0.1)
grabber.stop_grabbing()
else:
print("取流失败")
connector.disconnect()
if __name__ == "__main__":
test()
4.图像显示器
import cv2
import numpy as np
import time
class ImageDisplayer:
def __init__(self, window_name="Camera Feed", window_size=(600, 400)):
self.window_name = window_name
self.window_size = window_size
self.fps = 0
self.frame_count = 0
self.start_time = time.time()
def show_frame(self, frame, show_fps=True):
"""
显示帧
Args:
frame: 要显示的图像帧
show_fps: 是否显示FPS
Returns:
int: 按键值
"""
if frame is None:
return 0xFF
# 调整图像大小
resized_frame = self._resize_frame(frame)
# 显示FPS
if show_fps:
resized_frame = self._add_fps_overlay(resized_frame)
# 显示图像
cv2.imshow(self.window_name, resized_frame)
# 更新FPS计数
self._update_fps()
# 返回按键值
return cv2.waitKey(1) & 0xFF
def _resize_frame(self, frame):
"""调整帧大小"""
return cv2.resize(frame, self.window_size, interpolation=cv2.INTER_AREA)
def _update_fps(self):
"""更新FPS计算"""
self.frame_count += 1
current_time = time.time()
elapsed_time = current_time - self.start_time
if elapsed_time > 1.0: # 每秒更新一次FPS
self.fps = self.frame_count / elapsed_time
self.frame_count = 0
self.start_time = current_time
def _add_fps_overlay(self, frame):
"""添加FPS叠加层"""
fps_text = f"FPS: {self.fps:.1f}"
cv2.putText(frame, fps_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
return frame
def save_frame(self, frame, filename=None):
"""
保存帧为图像文件
Args:
frame: 要保存的图像帧
filename: 文件名,默认为时间戳
Returns:
bool: 保存是否成功
"""
if frame is None:
return False
if filename is None:
timestamp = int(time.time())
filename = f"captured_frame_{timestamp}.bmp"
try:
cv2.imwrite(filename, frame)
print(f"图像已保存为: {filename}")
return True
except Exception as e:
print(f"保存图像失败: {e}")
return False
def close_window(self):
"""关闭显示窗口"""
cv2.destroyWindow(self.window_name)
def close_all_windows(self):
"""关闭所有窗口"""
cv2.destroyAllWindows()
5.主函数
import cv2
import logging
from device_enumerator import DeviceEnumerator
from camera_connector import CameraConnector
from image_grabber import ImageGrabber
from image_displayer import ImageDisplayer
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def main():
# 创建各个组件实例
device_enumerator = DeviceEnumerator()
camera_connector = CameraConnector(log_enabled=False)
image_grabber = ImageGrabber()
image_displayer = ImageDisplayer("Camera Feed", (800, 600))
try:
# 1. 枚举设备
print("正在枚举设备...")
if not device_enumerator.enum_devices():
print("未找到任何设备,程序退出")
return
device_enumerator.print_device_list()
# 2. 连接设备(默认连接第一个设备)
print("
正在连接设备...")
if not camera_connector.connect(device_enumerator.device_list, device_index=0):
print("设备连接失败,程序退出")
return
# 3. 设置图像采集器
image_grabber.set_camera(camera_connector.get_camera_instance())
# 4. 开始取流(默认使用回调方式)
print("开始取流...")
if not image_grabber.start_grabbing(grab_mode="callback"):
print("开始取流失败,程序退出")
return
print("按 'q' 键退出程序")
print("按 's' 键保存当前帧")
print("按 'p' 键暂停/继续显示")
paused = False
# 主循环
while True:
if not paused:
# 获取当前帧
frame = image_grabber.get_frame()
if frame is not None:
# 显示图像
key = image_displayer.show_frame(frame, show_fps=True)
# 按键处理
if key == ord('q'):
break
elif key == ord('s'):
image_displayer.save_frame(frame)
elif key == ord('p'):
paused = True
print("显示已暂停,按 'p' 继续")
else:
# 如果没有帧,等待一小段时间
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
else:
# 暂停状态下只检查按键
key = cv2.waitKey(100) & 0xFF
if key == ord('q'):
break
elif key == ord('p'):
paused = False
print("显示继续")
except KeyboardInterrupt:
print("
程序被用户中断")
except Exception as e:
print(f"程序运行出错: {e}")
finally:
# 清理资源
print("正在清理资源...")
image_grabber.stop_grabbing()
camera_connector.disconnect()
image_displayer.close_all_windows()
print("程序退出")
if __name__ == "__main__":
main()
三、总结
以上就是通过python调用海康工业相机SDK包实现图像显示的所有内容了。这里顺便补充一下为什么没办法通过VideoCapture() 接口直接调用海康工业相机的原因:海康工业相机不包含UVC虚拟协议,所以上位机没法将海康工业相机的驱动视为外置相机驱动。

