百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

使用 OpenCV、Kafka 和 Spark 技术的视频流分析

bigegpt 2025-02-07 16:38 7 浏览

关键要点

  • 对于大规模视频流数据的可靠处理和高效处理,需要可扩展、容错和松散耦合的分布式系统。
  • 本文中的示例应用程序使用开源技术——OpenCV、Kafka 和 Spark——来构建这样一个系统。Amazon S3 或 HDFS 可用于存储.
  • 该系统包括三个主要组件——视频流收集器、流数据缓冲器和视频流处理器。
  • 视频流收集器工作带有一组 IP 摄像机,它们提供视频内容流数据的实时馈送,并使用 OpenCV 视频处理库将视频流转换为帧,将 JSON 中的数据传递给用于 Stream Data Buffer 组件的 Kafka Broker。
  • Video Stream Processor 组件基于 Apache Spark 构建,并再次使用 OpenCV 处理视频流数据。

技术给非结构化数据带来了前所未有的爆炸式增长。移动设备、网站、社交媒体、科学仪器、卫星、物联网设备和监控摄像头等来源每秒都会生成大量图像和视频。

管理和有效分析这些数据是一项挑战。考虑一个城市的视频监控摄像头网络。监视每个摄像机的视频流以发现任何感兴趣的对象或事件是不切实际且低效的。相反,计算机视觉 (CV) 库处理这些视频流并提供智能视频分析和对象检测。

然而,传统的 CV 系统有局限性。在传统的视频分析系统中,带有 CV 库的服务器同时收集和处理数据,因此服务器故障会丢失视频流数据。检测节点故障并将处理切换到另一个节点可能会导致数据碎片化。

许多任务推动了大数据技术在视频流分析中的使用:大规模视频流的并行和按需处理,从视频帧中提取不同的信息集,使用不同的机器学习库分析数据,管道传输分析数据到应用程序的不同组件进行进一步处理,并以不同格式输出处理后的数据。


视频流分析 - 运动检测

为了可靠地处理和高效处理大规模视频流数据,需要一个可扩展的、容错的、松散耦合的分布式系统。本文中讨论的视频流分析就是根据这些原则设计的。

视频流分析的类型包括:

  • 对象跟踪,
  • 运动检测,
  • 人脸识别,
  • 手势识别,
  • 增强现实,以及
  • 图像分割。

本文示例应用程序的用例是视频流中的运动检测。

运动检测是发现物体(通常是人)相对于其周围环境的位置变化的过程。它主要用于持续监控特定区域的视频监控系统。CV 库提供的算法分析此类摄像机发送的视频源并查找任何运动。检测运动会触发一个事件,该事件可以向应用程序发送消息或提醒用户。

本文的视频流分析应用程序具有三个主要组件:

  • 视频流收集器,
  • 流数据缓冲区,
  • 视频流处理器。

视频流收集器从一组 IP 摄像机接收视频流数据。该组件将视频帧序列化到流数据缓冲区,这是一个用于流视频数据的容错数据队列。视频流处理器消耗来自缓冲区的流数据并对其进行处理。该组件将应用视频处理算法来检测视频流数据中的运动。最后,处理后的数据或图像文件将存储在S3 存储桶或HDFS目录中。该视频流处理系统是使用OpenCV、Apache Kafka和 Apache Spark框架设计的。


OpenCV、Kafka和Spark的简要细节

以下是相关框架的一些细节。

OpenCV

OpenCV(开源计算机视觉库)是一个开源的 BSD 许可库。这个库是用 C++ 编写的,但也提供了 Java API。OpenCV 包含数百种 CV 算法,可用于处理和分析图像和视频文件。请查看此文档 以获取更多详细信息。

Apache Kafka

Apache Kafka 是一个分布式流媒体平台,它提供了用于发布和订阅记录流的系统。这些记录可以以容错的方式存储,消费者可以处理数据。

Apache Spark

Apache Spark 是一个快速、通用的集群计算系统。它提供了用于 SQL 和结构化数据处理的模块、用于机器学习的 MLlib、用于图形处理的 GraphX 和 Spark Streaming。


系统架构

视频流分析系统的架构图如下图1所示。

设计与实施

以下部分提供了示例应用程序中视频流收集器、流数据缓冲区和视频流处理器的设计和实现细节。

视频流收集器

视频流收集器与一组提供实时视频馈送的 IP 摄像机配合使用。该组件必须从每个摄像头读取提要并将视频流转换为一系列视频帧。为了区分每个 IP 摄像头,采集器维护了摄像头 ID 和 URL 的映射关系相机网址和相机.ida中的属性流收集器.properties文件。这些属性可以具有以逗号分隔的相机 URL 和 ID 列表。不同的相机可以提供不同规格的数据,例如编解码器、分辨率或每秒帧数。收集器在从视频流创建帧时必须保留这些细节。

视频流收集器使用 OpenCV 视频处理库将视频流转换为帧。每个帧都被调整为所需的处理分辨率(例如 640x480)。OpenCV 将每个帧或图像存储为一个Mat对象。Mat 需要通过保持帧的详细信息(即行、列和类型)完整地转换为可串行化(字节数组)形式。视频流收集器使用以下 JSON 消息结构来存储这些详细信息。

{"cameraId":"cam-01","timestamp":1488627991133,"rows":12,"cols":15,"type":16,"data":"asdfh"}

cameraId是相机的唯一 ID。timestamp是生成帧的时间。rows, cols,并且type是 OpenCV Mat 特定的细节。data是帧的字节数组的 ba?se-64 编码字符串。

视频流收集器使用Gson库将数据转换为 JSON 消息,在video-stream-event主题中发布。它使用KafkaProducer客户端将 JSON 消息发送到 Kafka 代理。KafkaProducer 将每个 key 的数据发送到同一个分区,并保证这些消息的顺序。

Kafka 主要是为小尺寸的文本消息设计的,但是包含视频帧的字节数组的 JSON 消息会很大(例如 1.5 MB),因此 Kafka 需要更改配置才能处理这些较大的消息。需要调整以下 KafkaProducer 属性:

  • batch.size
  • max.request.size
  • compression.type

流数据缓冲区

为了不丢失地处理大量视频流数据,需要将流数据存储在临时存储器中。Kafka 代理充当视频流收集器生成的数据的缓冲队列。Kafka 使用文件系统来存储消息,并且保留这??些消息的时间长度是可配置的。

在处理之前将数据保存在存储中可确保其持久性并提高系统的整体性能,因为处理器可以根据负载在不同时间以不同速度处理数据。当数据生产速度超过数据处理速度时,这提高了系统的可靠性。

Kafka 保证给定主题的单个分区中的消息顺序。当数据的顺序很重要时,这对于处理数据非常有帮助。要存储大消息,可能需要在server.propertiesKafka 服务器的文件中调整以下配置:

  • message.max.bytes
  • replica.fetch.max.bytes

视频流处理器

视频流处理器执行三个步骤:

  1. 以数据集的形式从 Kafka 代理读取 JSON 消息VideoEventData
  2. 按摄像机 ID对数据集进行分组VideoEventData并将其传递给视频流处理器。
  3. 从 JSON 数据创建一个 Mat 对象并处理视频流数据。

视频流处理器基于 Apache Spark 构建。Spark 提供了一个Spark Streaming API,它使用离散化流或 DStream,以及一个新的Structured Streaming基于数据集的 API。此应用程序的视频流处理器使用结构化流 API 来使用和处理来自 Kafka 的 JSON 消息。请注意,此应用程序以 JSON 消息的形式处理结构化数据,非结构化视频数据是视频流处理器将处理的这些 JSON 消息的属性。Spark 文档指出“结构化流式处理提供了快速、可扩展、容错、端到端的一次性流处理,而用户无需对流式处理进行推理。” 这就是为什么视频流处理器是围绕 Spark 的结构化流设计的。结构化流引擎为结构化文本数据和聚合查询的状态管理提供内置支持。

要处理大消息,必须将以下 Kafka 消费者配置传递给 Spark 引擎:

  • max.partition.fetch.bytes
  • max.poll.records

该组件的主要类是VideoStreamProcessor. 此类首先创建一个SparkSession对象,该对象是使用 Spark SQL 引擎的入口点。下一步是为传入的 JSON 消息定义一个模式,以便 Spark 可以使用该模式将消息的字符串格式解析为 JSON 格式。Spark 的 bean 编码器可以将其转换为Dataset. VideoEventData是一个保存 JSON 消息数据的 Java bean 类。

下一个,groupByKey按相机 ID 对数据集进行分组以获取KeyValueGroupedDataset. 它使用一个mapGroupsWithState转变为一组工作视频事件数据(迭代器) 用于按摄像机 ID 分组的当前批次视频帧。此转换首先检查最后处理的视频事件数据(视频帧)存在并将其传递给视频处理器进行下一步处理。视频处理后,最后处理的视频事件数据(视频帧)从视频处理器返回并更新状态。要启动流媒体应用程序,写流使用控制台接收器和更新输出模式在数据集上调用方法。

视频流处理器使用 OpenCV 库来处理视频流数据。我们的应用程序旨在检测运动;视频运动检测器是具有用于检测一系列帧中的运动的逻辑的类。这个过程的第一步是对列表进行排序视频事件数据(迭代器) 通过给定摄像机 ID 的时间戳来按顺序比较视频帧。下一步是迭代排序的列表视频事件数据对象并将它们转换为 OpenCV垫目的。如果最后处理的视频帧可用,则它将其用作处理当前帧系列的第一个视频帧。视频运动检测器比较两个连续的帧并使用 OpenCV 库提供的 API 检测差异。如果它发现超出定义标准的差异,则将其视为运动。视频运动检测器将以图像文件的形式将此检测到的运动保存到预配置的 S3 存储桶或 HDFS 目录。此图像文件可以由另一个应用程序进行进一步处理,或者视频运动检测器可以触发事件以通知用户或应用程序已检测到运动。


技术和工具

下表显示了用于此视频流分析系统的技术和工具。

工具和技术

版本

下载网址

JDK

1.8

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

Maven

3.3.9

https://maven.apache.org/download.cgi

ZooKeeper

3.4.8

https://zookeeper.apache.org/releases.html

Kafka

2.11-0.10.2.0

http://kafka.apache.org/downloads.html

Spark

2.2.0

http://spark.apache.org/downloads.html

OpenCV

3.2.0

http://opencv.org/releases.html

请参阅有关安装和配置这些工具的文档。Kafka 文档和Spark 文档提供了有关如何在独立模式或集群模式下设置和运行应用程序的详细信息。要安装 OpenCV,请参阅OpenCV 文档。


构建和部署

本节详细介绍如何构建和运行示例应用程序的视频流收集器和视频流处理器组件。此应用程序可用于处理离线视频文件和实时摄像机源,但此处配置为分析离线示例视频文件。请按照以下步骤构建和运行此应用程序:

1.下载并安装上表中列出的工具。确保 ZooKeeper 和 Kafka 服务器已启动并正在运行。

2.此应用程序使用 OpenCV 本地库(.dll 或 .so)并使用该System.loadLibrary()方法加载它们。在系统环境变量中设置这些本机库的目录路径或将此路径作为命令行参数传递。例如,对于 64 位 Windows 机器,本机库文件(opencv_java320.dll 和 opencv_ffmpeg320_64.dll)的路径将为 {OpenCV 安装目录}\build\java\x64。

3.stream-collector.properties文件的 Kafka 主题为video-stream-event. 在 Kafka 中创建此主题和分区。使用该kafka-topic命令创建主题和分区。

kafka-topics.sh --create --zookeeper localhost:2181 --topic video-stream-event --replication-factor 1 --partitions 3

4.stream-processor.properties文件有一个processed.output.dir属性,就是保存处理过的图片的目录路径。创建并设置此属性的目录路径。

5.stream-collector.properties文件具有camera.url保存视频文件或视频源的路径或 URL 的属性。确保路径或 URL 正确。

6.检查log4j.properties文件VideoStreamCollector和组件,并VideoStreamProcessor设置文件的目录路径。检查这些日志文件以获取应用程序生成的日志消息,这有助于在运行应用程序时出现错误。stream-collector.logstream-processor.log

7.此应用程序使用 OpenCV JAR 文件中的 OpenCV API,但 OpenCV JAR 文件在 Maven 中央存储库中不可用。此应用程序与可安装到本地 Maven 存储库的 OpenCV JAR 文件捆绑在一起。在 pom.xml 文件中,maven-install-plugin已配置并与用于安装此 JAR 文件的 clean 阶段相关联。要在本地 Maven 存储库中安装 OpenCV JAR,请转到 video-stream-processor 文件夹并执行此命令。

mvn clean

8.为保持应用程序逻辑简单,VideoStreamProcessor 只处理新消息。在启动组件之前,该VideoStreamProcessor组件应该已启动并运行VideoStreamCollector。要VideoStreamProcessor使用 Maven 运行,请转到 video-stream-processor 文件夹并执行此命令。

mvn clean package exec:java -Dexec.mainClass="com.iot.video.app.spark.processor.VideoStreamProcessor"

9.VideoStreamProcessor启动后,启动VideoStreamCollector组件。转到 video-stream-collector 文件夹并执行此命令。

mvn clean package exec:java -Dexec.mainClass="com.iot.video.app.kafka.collector.VideoStreamCollector" -Dexec.cleanupDaemonThreads=false

GitHub 项目捆绑了一个 sample.mp4 视频文件。此视频文件的 URL 和 ID 已配置为stream-collector.properties 文件的属性camera.urlcamera.id处理完视频文件后,图像将保存在预先配置的目录中(步骤 4)。图 4 显示了此应用程序的示例输出。

此应用程序可以配置和处理多个视频源(离线和实时源)。例如,要与 sample.mp4 一起添加网络摄像头供稿,请编辑
stream-collector.properties 文件并在属性中添加整数(第一个网络摄像头为 0,第二个网络摄像头为 1,依此类推),
camera.url并添加相应的摄像头ID(cam-01、cam-02 等)在camera.id属性中用逗号分隔。这是一个例子:

camera.url=../sample-video/sample.mp4,0

camera.id=vid-01,cam-01


结论

视频流的大规模视频分析需要一个由大数据技术支持的强大系统。OpenCV、Kafka 和 Spark 等开源技术可用于构建用于视频流分析的容错分布式系统。我们使用 OpenCV 和 Kafka 构建了一个视频流收集器组件,用于接收来自不同来源的视频流并将它们发送到流数据缓冲区组件。Kafka 充当流数据缓冲区组件,提供流数据的持久存储。视频流处理器组件是使用 OpenCV 和 Spark 的 Structured Streaming 开发的。该组件从流数据缓冲区接收流数据并分析该数据。处理后的文件存储在预配置的 HDFS 或 S3 存储桶中。我们使用运动检测作为用例来演示视频流分析示例应用程序。



相关推荐

【Docker 新手入门指南】第十章:Dockerfile

Dockerfile是Docker镜像构建的核心配置文件,通过预定义的指令集实现镜像的自动化构建。以下从核心概念、指令详解、最佳实践三方面展开说明,帮助你系统掌握Dockerfile的使用逻...

Windows下最简单的ESP8266_ROTS_ESP-IDF环境搭建与腾讯云SDK编译

前言其实也没啥可说的,只是我感觉ESP-IDF对新手来说很不友好,很容易踩坑,尤其是对业余DIY爱好者搭建环境非常困难,即使有官方文档,或者网上的其他文档,但是还是很容易踩坑,多研究,记住两点就行了,...

python虚拟环境迁移(python虚拟环境conda)

主机A的虚拟环境向主机B迁移。前提条件:主机A和主机B已经安装了virtualenv1.主机A操作如下虚拟环境目录:venv进入虚拟环境:sourcevenv/bin/active(1)记录虚拟环...

Python爬虫进阶教程(二):线程、协程

简介线程线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能...

基于网络安全的Docker逃逸(docker)

如何判断当前机器是否为Docker容器环境Metasploit中的checkcontainer模块、(判断是否为虚拟机,checkvm模块)搭配学习教程1.检查根目录下是否存在.dockerenv文...

Python编程语言被纳入浙江高考,小学生都开始学了

今年9月份开始的新学期,浙江省三到九年级信息技术课将同步替换新教材。其中,新初二将新增Python编程课程内容。新高一信息技术编程语言由VB替换为Python,大数据、人工智能、程序设计与算法按照教材...

CentOS 7下安装Python 3.10的完整过程

1.安装相应的编译工具yum-ygroupinstall"Developmenttools"yum-yinstallzlib-develbzip2-develope...

如何在Ubuntu 20.04上部署Odoo 14

Odoo是世界上最受欢迎的多合一商务软件。它提供了一系列业务应用程序,包括CRM,网站,电子商务,计费,会计,制造,仓库,项目管理,库存等等,所有这些都无缝集成在一起。Odoo可以通过几种不同的方式进...

Ubuntu 系统安装 PyTorch 全流程指南

当前环境:Ubuntu22.04,显卡为GeForceRTX3080Ti1、下载显卡驱动驱动网站:https://www.nvidia.com/en-us/drivers/根据自己的显卡型号和...

spark+python环境搭建(python 环境搭建)

最近项目需要用到spark大数据相关技术,周末有空spark环境搭起来...目标spark,python运行环境部署在linux服务器个人通过vscode开发通过远程python解释器执行代码准备...

centos7.9安装最新python-3.11.1(centos安装python环境)

centos7.9安装最新python-3.11.1centos7.9默认安装的是python-2.7.5版本,安全扫描时会有很多漏洞,比如:Python命令注入漏洞(CVE-2015-2010...

Linux系统下,五大步骤安装Python

一、下载Python包网上教程大多是通过官方地址进行下载Python的,但由于国内网络环境问题,会导致下载很慢,所以这里建议通过国内镜像进行下载例如:淘宝镜像http://npm.taobao.or...

centos7上安装python3(centos7安装python3.7.2一键脚本)

centos7上默认安装的是python2,要使用python3则需要自行下载源码编译安装。1.安装依赖yum-ygroupinstall"Developmenttools"...

利用本地数据通过微调方式训练 本地DeepSeek-R1 蒸馏模型

网络上相应的教程基本都基于LLaMA-Factory进行,本文章主要顺着相应的教程一步步实现大模型的微调和训练。训练环境:可自行定义,mac、linux或者window之类的均可以,本文以ma...

【法器篇】天啦噜,库崩了没备份(天啦噜是什么意思?)

背景数据库没有做备份,一天突然由于断电或其他原因导致无法启动了,且设置了innodb_force_recovery=6都无法启动,里面的数据怎么才能恢复出来?本例采用解析建表语句+表空间传输的方式进行...