交通监控系统、医疗保健和零售业都从智能视频分析( IVA )中受益匪浅。 DeepStream 是一个 IVA SDK 。 DeepStream 使您能够在运行时附加和分离视频流,而不会影响整个部署。
这篇文章讨论了使用 DeepStream 添加和删除流的细节。我还介绍了如何跨多个孤立的数据中心集中管理大型部署,使用来自多个摄像头的流服务于多个用例。
NVIDIA DeepStream SDK 是一种用于多传感器处理的流分析工具包。流式数据分析用例正在你眼前发生变化。 IVA 在更智能的空间中有着巨大的帮助。DeepStream 运行在离散的 GPU ,如NVIDIA T4 , Nvidia 安培架构和系统芯片上的平台,如 NVIDIA Jetson 系列的设备。
DeepStream 具有灵活性,使您能够使用以下任一功能构建复杂的应用程序:
- 多种深度学习框架
- 多流
- 多个模型串联或并联组合形成一个整体
- 多个模型协同工作
- 以不同的精度计算
- 自定义预处理和后处理
- 与库伯内特斯的配器
DeepStream 应用程序可以有多个插件,如图 1 所示。根据功能,每个插件可以使用 GPU 、 DLA 或专用硬件。
DeepStream 的基本功能是允许大规模部署,确保在任何给定时间的吞吐量和准确性。任何 IVA 管道的规模取决于两个主要因素:
- 流管理
- 计算能力
流管理是任何具有多个摄像头的大型部署的重要方面。任何大型部署都不能用于添加/删除流。如此大规模的部署必须进行故障保护,以便在运行时处理虚假流。此外,部署预计将处理用例到使用特定模型运行的管道的运行时连接/分离。
这篇文章帮助您了解流管理的以下方面:
- 使用 DeepStream Python API 的河流消耗量
- 在运行时添加和删除流
- 在运行时将特定流附加到具有特定模型的管道
- 涉及多个数据中心的大规模部署中的流管理
随着应用程序复杂性的增加,更改变得越来越困难。一个深思熟虑的发展战略从一开始就可以起到很大的作用。在下一节中,我将简要讨论开发 DeepStream 应用程序的不同方法。我还讨论了如何管理流/用例分配和解除分配,并考虑一些最佳实践。
DeepStream 应用程序开发
DeepStream 使您能够为基于 AI 的视频、音频和图像分析创建无缝流媒体管道。 DeepStream 为您提供了使用 C 或 Python 进行开发的选择,为它们提供了更大的灵活性。 DeepStream 附带了几个硬件加速插件。 DeepStream 源于 Gstreamer ,提供 Python 和 C 语言之间的统一 API 。
Python 和用于 DeepStream 的 C API 是统一的。这意味着在 Python 中开发的任何应用程序都可以轻松地转换为 C 和 C 。 Python 和 C 为开发人员提供了所有级别的自由。使用 DeepStream Python 和 C API ,可以设计在运行时处理流和用例的动态应用程序。一些示例 Python 应用程序位于: NVIDIA-AI-IOT/deepstream_python_apps 。
DeepStream SDK 基于 GStreamer 多媒体框架,包括一个 GPU 加速插件管道。 SDK 中包含用于视频输入、视频解码、图像预处理、基于 NVIDIA TensorRT 的推理、对象跟踪和显示的插件,以简化应用程序开发过程。这些功能可用于创建适应性强的多流视频分析解决方案。
插件是制作管道的核心构建块。输入(即管道输入,例如相机和视频文件)和输出(例如屏幕显示)之间的每个数据缓冲区都通过插件传递。视频解码和编码、神经网络推理以及在视频流顶部显示文本就是 plug-ins 的示例。连接的插件构成 pipeline 。
PAD 是插件之间的接口。当数据在管道中从一个插件流向另一个插件时,它从一个插件的源板流向另一个插件的接收器板。每个插件可能有零个、一个或多个源/接收器组件。
前面的示例应用程序由以下插件组成:
GstUriDecodebin
:将 URI 中的数据解码为原始媒体。它选择一个可以处理给定方案的源插件,并将其连接到decodebin
。Nvstreammux
:Gst-nvstreammux
插件从多个输入源形成一批帧。Nvinfer
:Gst-nvinfer
插件使用 TensorRT 对输入数据进行推断。Nvmultistream-tiler
:Gst-nvmultistreamtiler
插件从批处理缓冲区合成 2D 磁贴。Nvvideoconvert
:Gst-nvvideoconvert
执行缩放、裁剪和视频颜色格式转换。NvDsosd
:Gst-nvdsosd
绘制边界框、文本和感兴趣区域( ROI )多边形。GstEglGles
:EglGlesSink
在 EGL 表面上渲染视频帧( xOverlay 界面和本机显示)。
每个插件可以有一个或多个源和接收器焊盘。在这种情况下,当添加流时,Gst-Uridecodebin
插件被添加到管道中,每个流一个插件。每个Gst-Uridecodebin
插件的源组件连接到单个Nv-streammux
插件上的每个接收器组件。Nv-streammux
从来自所有以前插件的帧创建批,并将它们推送到管道中的下一个插件。图 3 显示了如何将多个摄影机流添加到管道中。
缓冲区通过管道传输数据。缓冲区带有时间戳,包含由各种 DeepStream 插件附加的元数据。缓冲区携带诸如有多少插件在使用它、标志和指向内存中对象的指针等信息。
DeepStream 应用程序可以看作是由单个组件插件组成的管道。每个插件使用 TensorRT 或多流解码表示一个类似于功能块的推理。在适用的情况下,使用底层硬件加速插件,以提供最佳性能。 DeepStream 的关键价值在于使视频深度学习易于访问,让您能够集中精力快速构建和定制高效、可扩展的视频分析应用程序。
运行时流添加/删除应用程序
DeepStream 以 Python 和 C 语言提供了运行时添加/删除功能的示例实现。样本位于以下位置:
- DeepStream 源添加和删除( C ): https://github.com/NVIDIA-AI-IOT/deepstream_reference_apps/tree/master/runtime_source_add_delete
- DeepStream 源代码添加和删除( Python ): https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/tree/master/apps/runtime_source_add_delete
这些应用程序的设计考虑到了简单性。这些应用程序接受一个输入流,同一个流在设定的时间间隔后多次添加到正在运行的管道中。这就是在不重新启动应用程序的情况下将指定数量的流添加到管道的方式。最终,在每个时间间隔删除每个流。删除最后一个流后,应用程序将正常停止。
要从示例应用程序开始,请执行以下步骤。
创建基于 Python 的应用程序
- 从 ngc 拔出 DeepStream Docker 映像 。NVIDIA 。通用域名格式。
- 在服务器上运行 git clone Python 应用程序存储库 在 Docker 容器中。
- 转到 Docker 容器中的以下位置: deepstream \ u python \ u apps / apps / runtime \ u source \ u add \ u delete
- 设置 Python 先决条件 .
- 转到应用程序/运行时\源\添加\删除并按如下方式执行应用程序:
python3 deepstream-test-rt-src-add-del.py <uri> python3 deepstream_rt_src_add_del.py file:///opt/nvidia/deepstream/deepstream-<VER>/samples/streams/sample_720p.mp4
创建基于 C 的应用程序
- 从 ngc 拔出 DeepStream Docker 映像 。NVIDIA 。通用域名格式。 :
- 在 Docker 容器内
/opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/
的 C 应用程序存储库 上运行 git clone 。 - 转到
deepstream_reference_apps/runtime_source_add_delete
,编译并运行应用程序,如下所示:
make ./deepstream-test-rt-src-add-del <uri> ./deepstream-test-rt-src-add-del file:///opt/nvidia/deepstream/deepstream-<VER>/samples/streams/sample_720p.mp4
应用程序方面:运行时摄影机添加和删除
DeepStream Python 或 C 应用程序在运行脚本时通常将输入流作为参数列表。代码执行后,会发生一系列事件,最终将流添加到正在运行的管道中。
在这里,您使用uridecodebin
插件将 URI 中的数据解码为原始媒体。它选择一个可以处理给定方案的源插件,并将其连接到解码箱。
以下是注册任何流时发生的序列列表:
- 源 bin 是通过函数
create_uridecode_bin
从Curidecodebin
插件创建的。函数create_uridecode_bin
接受第一个参数source_id
,它是一个整数,第二个参数是rtsp_url
。在本例中,此整数是流的顺序,从 1 …。。 N 。此整数用于创建唯一可识别的source-bin
名称,如source-bin-1
、source-bin-2
、…source-bin-
N 。 g_source_bin_list
字典在source-bin
和id
值之间映射。- 创建源 bin 后,程序参数中的 RTSP 流 URL 将附加到此源 bin 。
- 稍后,
uridecodebin
的源 bin 值链接到下一个插件streammux
的接收器 bin 。 - 创建多个
uridecodebin
插件,每个插件对应一个流,并连接到streammux
插件。
下面的代码示例显示了 Python 中用于将多个流连接到 DeepStream 管道的最小代码。
g_source_bin_list = [] for i in range(num_sources): print("Creating source_bin ",i," ") uri_name=argv[i] if uri_name.find("rtsp://") == 0 : is_live = True #Create source bin and add to pipeline source_bin=create_uridecode_bin(i, uri_name) g_source_bin_list[rtsp[i]] = source_bin pipeline.add(source_bin)
在更有组织的应用程序中,负责流添加的这些代码行被转移到一个函数,该函数使用两个参数来附加流:stream_id
和rtsp_url
。您可以随时调用此类函数,并将更多流附加到正在运行的应用程序中。
类似地,当流必须与应用程序分离时,会发生以下事件:
- 已连接流的
source_id
被赋予函数stop_release_source
。 - 连接到要释放的
source_id
的streammux
的sink-pad
与uridecodebin
的source bin
分离。 - 然后将 uridecodebin 的源 bin 从管道中移除。
- 活动源计数减少 1 。
下面的代码示例显示了 Python 和 C 从 DeepStream 管道分离流的最小代码。
def stop_release_source(source_id): pad_name = "sink_%u" % source_id print(pad_name) #Retrieve sink pad to be released sinkpad = streammux.get_static_pad(pad_name) #Send flush stop event to the sink pad, then release from the streammux sinkpad.send_event(Gst.Event.new_flush_stop(False)) streammux.release_request_pad(sinkpad) #Remove the source bin from the pipeline pipeline.remove(g_source_bin_list[source_id]) source_id -= 1 g_num_sources -= 1
部署方面:运行时摄影机和用例管理
前面,我讨论了如何在代码中添加和删除流。考虑到部署方面,还有一些因素。
以前,您使用命令行参数获取所有输入流。但是,在程序执行之后,当它处于部署中时,您不能向它提供任何附加参数。如何向正在运行的程序传递要附加或分离哪个流的指令?
部署需要额外的代码,用于定期检查是否有必须附加的新流可用。应删除以下流:
- 流不再需要监视。
- 摄像头问题导致没有流。
- 先前附加的流必须用于另一个用例。
在多个数据中心进行流处理的情况下,优先考虑距离数据中心最近的流源。
DeepStream 管道在主螺纹中运行。需要一个单独的线程来检查要添加或删除的流。谢天谢地, Glib 有一个名为g_timeout_add_seconds
的函数。 Glib 是 gnuclibrary 项目,它为 GNU 系统和 GNU / Linux 系统以及许多其他使用 Linux 作为内核的系统提供核心库。
g_timeout_add_seconds
( set )是管道运行时定期调用的函数。重复调用该函数,直到返回 FALSE ,此时超时将自动销毁,并且不会再次调用该函数。
guint g_timeout_add_seconds (guint interval, GSourceFunc function, gpointer data);
g_timeout_add_seconds
接受三个输入:
Interval
:调用函数之间的时间,以秒为单位。function
:要调用的函数。data
:要传递给函数的数据和参数。
例如,调用函数watchDog
时需要GSourceBinList
。streamURL
和streamId
之间的字典映射。streamId
是将流添加到管道后生成的内部 ID (整数)。最后一个调用方函数类似于以下代码示例:
guint interval = 10; guint g_timeout_add_seconds (interval, watchDog, GSourceBinList, argv);
根据当前间隔设置,watchDog
函数每 10 秒调用一次。必须维护一个数据库来管理和跟踪多个流。表 1 显示了这样一个示例数据库表。函数watchDog
可用于查询数据库,其中根据当前状态和用例维护了所有可用流的列表。
Source ID | RTSP URL | Stream State | Use case | Camera Location | Taken |
1 | Rtsp://123/1.mp4 |
ON | License Plate Detection | Loc1 | True |
2 | Rtsp://123/2.mp4 |
BAD STREAM | License Plate Detection | Loc2 | False |
3 | Rtsp://123/3.mp4 |
OFF | Motion Detection | Loc2 | False |
n | Rtsp://123/n.mp4 |
OFF | Social Distance | Loc3 | False |
下面是一个同时管理多个流所需的最小数据库结构( SQL / no SQL )示例:
- Source ID: 一个唯一的 ID ,也是
nvstreammux
连接到的接收器板 ID 。source_id
对于监视nv-gst
事件非常有用,例如, pad 为每个流添加了已删除的 EOS 。请记住,在前面的简单应用程序中,您考虑过按参数输入顺序将源 bin 设置为source-bin-1
、source-bin-2
、…source-bin-
N 。对许多摄影机使用相同的方法,并跟踪应用程序范围内的所有活动源存储箱。 - RTSP URL: 源插件应使用的 URL 。
- Stream state: 有助于管理流的状态,如打开或关闭。数据库客户机还必须能够根据客户机感知到的情况更改摄像头,例如坏流、无 STREAMm 摄像头故障等。这有助于即时维护。
- Use case: 为相机分配一个用例。将选中此用例,并且仅连接模型当前处于活动状态的摄影机。
- Camera Location: 有助于根据摄像机的位置定位计算机。此检查可避免从位于远处的摄影机进行不必要的捕获,并且可以更好地分配给附近的其他计算群集。
- Taken: 假设部署是具有多个节点的多 GPU 。当在任何机器和任何 GPU 上运行的 DeepStream 应用程序添加任何源时,它会将标志设置为
True
。这可以防止另一个实例再次重复添加相同的源。
如前所述维护一个模式可以从一个中心位置轻松创建和监控仪表板。
回到watchDog
函数,下面是检查流状态并根据位置和用例附加新视频流的伪代码:
FUNCTION watchDog (Dict: GSourceBinList) INITIALIZE streamURL ⟵ List ▸ Dynamic list of stream URLs INITIALIZE streamState ⟵ List ▸ Dynamic list of state corresponding to stream URLs INITIALIZE streamId ⟵ Integer ▸ variable to store id for new stream streamURL, streamState := getCurrentCameraState() FOR X = 1 in length(streamState) IF ((streamURL[X] IN gSourceBinList.keys()) & (streamState[X] == "OFF")) stopReleaseSource(streamURL[X]) ▸ Detach stream streamURL, streamState := getAllStreamByLocationAndUsecase() FOR Y = 1 in length(streamState) IF ((streamURL[Y] NOT IN gSourceBinList.keys()) & (streamState [Y] == "ON") streamId := addSource(streamURL[Y]) ▸ Add new stream GSourceBinList(streamURL, streamId) ▸ update mappings RETURN GSourceBinList
- 在模块加载和全局变量初始化之后,应用程序进入主功能。
- 在主函数中,初始化本地模块和变量。
- 当应用程序第一次启动时,它会在应用位置和用例过滤器后从数据库请求流列表。
- 收到流列表后, DeepStream 管道的所有插件都将初始化、链接并设置为
PLAY
状态。此时,应用程序正在使用提供的所有流运行。 - 在每个设置的时间间隔之后,一个单独的线程检查数据库中当前流的状态。如果数据库中任何已添加流的状态更改为
OFF
,则该流将被释放。该线程还检查数据库中是否列出了状态为ON
的新摄像头,在应用位置和用例过滤器后,该流将添加到 DeepStream 管道中。 - 添加流后,数据库的
Taken
列中的标志必须设置为True
,以便其他进程无法再次添加相同的流。
图 4 显示了有效地添加、删除摄像机流并连接到使用适当模型运行的服务器所需的功能调用的总体流程。
仅仅更改源的数量没有帮助,因为源的下游组件必须能够根据流的数量更改其属性。为此,已经对 DeepStream 应用程序的组件进行了优化,以在运行时更改属性。
然而,许多插件在初始化期间使用批大小作为参数来分配计算/内存资源。在这种情况下,建议在执行应用程序时指定最大批量大小。表 2 显示了一些这样的插件示例:
Plug-ins | Functionality | Runtime changes |
Gst- nvstreammux | Forms a batch of frames from multiple input sources. | The muxer supports the addition and deletion of sources at run time. |
Gst-nvdsanalytics | Performs analytics on metadata attached by nvinfer (primary detector) and nvtracker . |
If the runtime stream resolution is different from the configuration resolution. The plug-in handles the resolution change and scales the rules for the runtime resolution. |
Gst-nvinfer | Performs inferencing on input data using TensorRT. | Enables the reconfiguration of batch size according to number of the stream at runtime. |
Gst-nvinferserver | Performs inferencing on input data using NVIDIA Triton Inference Server. | Enables the reconfiguration of batch size according to number of the stream at runtime. |
Gst-nvmultistreamtiler | Composites a 2D tile from batched buffers. | Reconfigures 2D tile for new sources added at runtime. |
Gst- nvtracker | Enables the DS pipeline to use a low-level tracker to track the detected objects with unique IDs. | Supports tracking on new sources added at runtime and cleanup of resources when sources are removed. |
当检测到流的数量时,可以显式更改属性。要在运行时手动调整插件的属性,请使用 set_property
在 Python 和 C 或 g_object_set
函数在 C 。
最佳做法
- 在添加到管道之前,请始终检查流属性。 可以使用
gst-discoverer-1.0
命令行实用程序检查流属性。它从命令行接受 URI ,并打印有关流的所有信息。了解用于生成媒体的容器和编解码器,以及必须在管道中放入哪些插件才能播放媒体,这非常有用。 Gst Discover 可通过使用各自的 API 与 Python 和 C 一起使用。 - 在开发 DeepStream 应用程序时对其进行概要分析。 这是优化和调整应用程序的第一步。分析有助于理解应用程序的性能特征,并可以轻松识别代码中有改进机会的部分。查找应用程序中的热点和瓶颈,帮助您决定优化工作的重点。
- 通过分析应用程序,计算可在 GPU 上运行的最大流数。在运行时,确保将最大流保持在支持的最大值以下,以便应用程序性能保持稳定。
要提高性能,请参阅 DeepStream 故障排除手册 。
有关更多信息,请参阅以下参考资料: