Compute Graph Framework SDK Reference  5.6
Port.hpp
Go to the documentation of this file.
1
2//
3// Notice
4// ALL NVIDIA DESIGN SPECIFICATIONS AND CODE ("MATERIALS") ARE PROVIDED "AS IS" NVIDIA MAKES
5// NO REPRESENTATIONS, WARRANTIES, EXPRESSED, IMPLIED, STATUTORY, OR OTHERWISE WITH RESPECT TO
6// THE MATERIALS, AND EXPRESSLY DISCLAIMS ANY IMPLIED WARRANTIES OF NONINFRINGEMENT,
7// MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
8//
9// NVIDIA CORPORATION & AFFILIATES assumes no responsibility for the consequences of use of such
10// information or for any infringement of patents or other rights of third parties that may
11// result from its use. No license is granted by implication or otherwise under any patent
12// or patent rights of NVIDIA CORPORATION & AFFILIATES. No third party distribution is allowed unless
13// expressly authorized by NVIDIA. Details are subject to change without notice.
14// This code supersedes and replaces all information previously supplied.
15// NVIDIA CORPORATION & AFFILIATES products are not authorized for use as critical
16// components in life support devices or systems without express written approval of
17// NVIDIA CORPORATION & AFFILIATES.
18//
19// SPDX-FileCopyrightText: Copyright (c) 2018-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
20// SPDX-License-Identifier: LicenseRef-NvidiaProprietary
21//
22// NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
23// property and proprietary rights in and to this material, related
24// documentation and any modifications thereto. Any use, reproduction,
25// disclosure or distribution of this material and related documentation
26// without an express license agreement from NVIDIA CORPORATION or
27// its affiliates is strictly prohibited.
28//
30
31#ifndef DW_FRAMEWORK_PORT_H_
32#define DW_FRAMEWORK_PORT_H_
33
35
36#include "SyncPortHelper.hpp"
37
38#include <stdexcept>
39#include <string>
40
41namespace dw
42{
43namespace framework
44{
45
47enum PortDirection : uint8_t
48{
49 INPUT = 0,
52};
53
55{
56public:
57 virtual ~PortBase() = default;
58};
59
61class Port : public PortBase
62{
63public:
64 virtual dwStatus bindChannel(ChannelObject* channel) = 0;
65 virtual dwStatus initialize() { return DW_SUCCESS; }
66 virtual bool isBound() = 0;
67};
68
70
77template <typename T>
78class PortOutput : public SyncPortHelperOutput<T>, public Port
79{
80public:
82 using ApiDataTypeT = T;
85
86 static constexpr char LOG_TAG[] = "PortOutput";
87
88private:
89 ChannelObject::Producer* m_channelProducer{};
90 SpecimenT m_reference{};
91 OnSetSyncAttrs m_waiterAttrs{};
92 OnSetSyncAttrs m_signalerAttrs{};
93
94public:
95 explicit PortOutput(SpecimenT const& ref)
96 : m_reference(ref)
97 {
98 }
99 explicit PortOutput(SpecimenT&& ref)
100 : m_reference(std::move(ref))
101 {
102 }
103
104 explicit PortOutput(SpecimenT const& ref,
105 OnSetSyncAttrs signalerAttrs,
106 OnSetSyncAttrs waiterAttrs = {})
107 : m_reference(ref)
108 , m_waiterAttrs(waiterAttrs)
109 , m_signalerAttrs(signalerAttrs)
110 {
111 }
112
113 // Channel Bind
114 dwStatus bindChannel(ChannelObject* channel) override
115 {
116 auto ref = make_specimen<T>(&m_reference);
117 return bindChannelWithReference(channel, ref);
118 }
119
121 {
122 return Exception::guard([&] {
123 if (isBound())
124 {
125 // TODO(chale): this should be an Exception but applications are currently
126 // doing this. Those applications should be fixed.
127 FRWK_LOGE << "PortOutput: bindChannel: attempted to bind the same port twice, ignoring this bind!" << Logger::State::endl;
128 return;
129 }
130 if (channel == nullptr)
131 {
132 throw Exception(DW_INVALID_ARGUMENT, "PortOutput: bindChannel: expected channel != nullptr");
133 }
134
136
138 {
140 }
141
142 ref.setWaiterAttributes = m_waiterAttrs;
143 ref.setSignalerAttributes = m_signalerAttrs;
144
145 m_channelProducer = channel->getProducer(ref);
146 if (m_channelProducer == nullptr)
147 {
148 throw Exception(DW_INTERNAL_ERROR, "PortOutput bindChannel: wrong channel implementations returned.");
149 }
150 });
151 }
152
154 {
155 if (!isBound())
156 {
157 throw Exception(DW_NOT_AVAILABLE, "PortOutput: setOnDataReady: no bound channel");
158 }
159 m_channelProducer->setOnDataReady(opaque, onDataReady);
160 }
161
162 bool isBound() final
163 {
164 return (m_channelProducer != nullptr);
165 }
166
167 dwStatus wait(dwTime_t timeout)
168 {
169 if (!isBound())
170 {
171 throw Exception(DW_NOT_AVAILABLE, "PortInput: wait: no bound channel");
172 }
173
174 return m_channelProducer->wait(timeout);
175 }
176
177 // Node accessors
178 // TODO(unknown): This function's prototype needs to change to properly propagate errors
180 {
181 dwStatus status = DW_FAILURE;
182 GenericData genericData{};
183 if (m_channelProducer)
184 {
185 status = m_channelProducer->get(&genericData);
186 }
187
188 if (status != DW_SUCCESS)
189 {
190 return nullptr;
191 }
192
194 {
195 return BaseSyncHelper::extractInternalPacket(genericData);
196 }
197
198 return genericData.template getData<T>();
199 }
200
201 // Tx Operations
// Sends `frame` through the bound channel producer and returns the producer's status.
// Throws Exception(DW_NOT_AVAILABLE) when no producer has been bound.
202 virtual dwStatus send(T* frame)
203 {
204 if (!m_channelProducer)
205 {
206 throw Exception(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
207 }
208
// NOTE(review): the condition guarding the next braces (listing line 209) is missing from
// this extract; presumably it selects the synced-data path - confirm against the real header.
// In that branch the packet returned by BaseSyncHelper::getSyncPacket(frame) is sent.
210 {
211 return m_channelProducer->send(BaseSyncHelper::getSyncPacket(frame));
212 }
213
// Otherwise the frame pointer itself is handed to the producer.
214 return m_channelProducer->send(frame);
215 }
216
218 {
219 if (!m_channelProducer)
220 {
221 throw Exception(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
222 }
223 return m_channelProducer->getSyncSignaler();
224 }
225
// Forwards `fences` for `frame` to the producer's sync signaler.
// NOTE(review): no null check on m_channelProducer is visible in this method, unlike
// send(); verify that callers only use it on a bound port.
226 void setSignalFences(T* frame, dw::core::span<NvSciSyncFence> fences)
227 {
// NOTE(review): the `if` condition for this branch (listing line 228) is missing from this
// extract; presumably the synced-data check - confirm. This branch passes the packet
// returned by BaseSyncHelper::getSyncPacket(frame) instead of the frame.
229 {
230 m_channelProducer->getSyncSignaler().setSignalFences(BaseSyncHelper::getSyncPacket(frame), fences);
231 }
232 else
233 {
234 m_channelProducer->getSyncSignaler().setSignalFences(frame, fences);
235 }
236 }
237
239 {
240 if (!m_channelProducer)
241 {
242 throw Exception(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
243 }
244 return m_channelProducer->getSyncWaiter();
245 }
246
// Asks the producer's sync waiter for the wait fences associated with `frame`.
// NOTE(review): `fences` is a span taken by value, so results are presumably written
// through its elements - confirm. No null check on m_channelProducer is visible here.
247 void getWaitFences(T* frame, dw::core::span<NvSciSyncFence> fences)
248 {
// NOTE(review): the `if` condition for this branch (listing line 249) is missing from this
// extract; presumably the synced-data check - confirm. This branch passes the packet
// returned by BaseSyncHelper::getSyncPacket(frame) instead of the frame.
250 {
251 m_channelProducer->getSyncWaiter().getWaitFences(BaseSyncHelper::getSyncPacket(frame), fences);
252 }
253 else
254 {
255 m_channelProducer->getSyncWaiter().getWaitFences(frame, fences);
256 }
257 }
258};
259
260template <typename T>
261constexpr char PortOutput<T>::LOG_TAG[];
262
264
271template <typename T>
272class PortInput : public SyncPortHelperInput<T>, public Port
273{
274 static constexpr char LOG_TAG[] = "PortInput";
275
276public:
278 using ApiDataTypeT = T;
281
282 explicit PortInput(SpecimenT const& ref)
283 : m_reference(ref)
284 {
285 }
286 explicit PortInput(SpecimenT&& ref)
287 : m_reference(std::move(ref))
288 {
289 }
290
292 {
293 }
294
295 explicit PortInput(OnSetSyncAttrs waiterAttrs,
296 OnSetSyncAttrs signalerAttrs = {})
297 : m_waiterAttrs(waiterAttrs)
298 , m_signalerAttrs(signalerAttrs)
299 {
300 }
301
302 explicit PortInput(SpecimenT const& ref,
303 OnSetSyncAttrs waiterAttrs,
304 OnSetSyncAttrs signalerAttrs = {})
305 : m_reference(ref)
306 , m_waiterAttrs(waiterAttrs)
307 , m_signalerAttrs(signalerAttrs)
308 {
309 }
310
311 ~PortInput() override = default;
312
313 // Channel Bind
// Binds this input port to `channel` by requesting a consumer from it.
// The whole body runs inside Exception::guard, whose dwStatus result is returned
// (presumably exceptions thrown inside are turned into a status - confirm in Exception.hpp).
314 dwStatus bindChannel(ChannelObject* channel) override
315 {
316 return Exception::guard([&] {
// A second bind is logged and ignored instead of being treated as an error.
317 if (isBound())
318 {
319 // TODO(chale): this should be an Exception but applications are currently
320 // doing this. Those applications should be fixed.
321 FRWK_LOGE << "PortInput: bindChannel: attempted to bind the same port twice, ignoring this bind!" << Logger::State::endl;
322 return;
323 }
324 if (channel == nullptr)
325 {
326 throw Exception(DW_INVALID_ARGUMENT, "PortInput: bindChannel: expected channel != nullptr");
327 }
328
// Start from a reference built without a specimen; use the stored one when it was provided.
330 auto ref = make_specimen<T>(nullptr);
331
332 if (m_reference)
333 {
334 ref = make_specimen<T>(&m_reference.value());
335 }
336
// NOTE(review): the condition guarding the next braces (listing line 337) is missing from
// this extract; presumably the synced-data check - confirm. Inside, the packet type id is
// replaced by the value from BaseSyncHelper::getNewPacketID.
338 {
339 ref.packetTypeID = BaseSyncHelper::getNewPacketID(ref.packetTypeID);
340 }
341
// Hand the sync-attribute callbacks given at construction to the channel via the reference.
342 ref.setWaiterAttributes = m_waiterAttrs;
343 ref.setSignalerAttributes = m_signalerAttrs;
344
345 m_channelConsumer = channel->getConsumer(ref);
346 if (m_channelConsumer == nullptr)
347 {
348 throw Exception(DW_INTERNAL_ERROR, "PortInput bindChannel: wrong channel implementations returned.");
349 }
// Cache the channel's reuse flag; recv() consults it to decide whether to keep the last packet.
350 m_reuse = channel->getParams().getReuseEnabled();
351 });
352 }
353
354 bool isBound() override
355 {
356 return !(m_channelConsumer == nullptr);
357 }
358
360 {
361 if (!isBound())
362 {
363 throw Exception(DW_NOT_AVAILABLE, "PortInput: setOnDataReady: no bound channel");
364 }
365 m_channelConsumer->setOnDataReady(opaque, onDataReady);
366 }
367
368 // Rx Operations
// Waits for data on the bound channel consumer.
// Throws Exception(DW_NOT_AVAILABLE) when no channel is bound; otherwise returns a dwStatus.
369 dwStatus wait(dwTime_t timeout)
370 {
371 if (!isBound())
372 {
373 throw Exception(DW_NOT_AVAILABLE, "PortInput: wait: no bound channel");
374 }
375
376 // For synced packets, the wait can return DW_NOT_AVAILABLE or DW_SUCCESS
377 // if there are no packets to consume. This is because you need to consume
378 // a packet to make sure it's valid or not.
// NOTE(review): the conditions for the next three brace groups (listing lines 379, 383, 387)
// are missing from this extract. What is visible: the first returns DW_SUCCESS, the second
// returns DW_NOT_AVAILABLE, and the third forces a zero timeout.
380 {
381 return DW_SUCCESS;
382 }
384 {
385 return DW_NOT_AVAILABLE;
386 }
388 {
389 timeout = 0;
390 }
391
// When a previously received packet is held in m_last, do not block: wait with a zero
// timeout and report a timeout / not-available result as success, since recv() can
// still hand out m_last.
392 dwTime_t waitTime = m_last ? 0 : timeout;
393 dwStatus status = m_channelConsumer->wait(waitTime);
394 if (m_last && (status == DW_TIME_OUT || status == DW_NOT_AVAILABLE))
395 {
396 return DW_SUCCESS;
397 }
398
399 return status;
400 }
401
402 // TODO(unknown): This function's prototype needs to change to properly propagate errors
// Receives the next packet from the bound consumer and returns it as a shared_ptr whose
// deleter releases the packet back to that consumer. Returns nullptr when the port is not
// bound or nothing usable is available; may return the previously received packet (m_last)
// when the consumer's recv() fails.
403 virtual std::shared_ptr<T> recv()
404 {
405 GenericData data{};
406 std::shared_ptr<T> result;
407 if (!isBound())
408 {
409 return nullptr;
410 }
411
412 T* typedData = nullptr;
413 void* releasePtr = nullptr;
414
// NOTE(review): several lines of this if / else-if chain (listing lines 415, 418, 420, 422)
// are missing from this extract, including both conditions and the assignments that
// would set typedData in the first branch - consult the real header.
416 {
417 // There is a valid packet to consume
419 releasePtr = data.getPointer();
421 }
423 {
424 // There is a buffered packet, but it's not ready to be consumed.
425 return nullptr;
426 }
427 else
428 {
429 dwStatus status = m_channelConsumer->recv(&data);
430 if (status != DW_SUCCESS)
431 {
// Nothing new was received: fall back to the last packet if one is held.
432 if (m_last != nullptr)
433 {
434 return m_last;
435 }
436 else
437 {
438 return nullptr;
439 }
440 }
441 releasePtr = data.getPointer();
442
// NOTE(review): the condition (listing line 443) and the statement that assigns typedData
// (line 445) are missing here. Also note this early return happens after a successful
// recv(); whether the packet is released elsewhere is not visible - confirm.
444 {
446 if (!typedData)
447 {
448 return nullptr;
449 }
450 }
451 else
452 {
453 typedData = data.template getData<T>();
454 }
455 }
456
457 // don't rely on this class's member when releasing packet
// The deleter captures copies of the consumer pointer and the release pointer. It ignores
// its T* argument and releases releasePtr instead.
458 auto* channelConsumer = m_channelConsumer;
459 result = std::shared_ptr<T>(typedData, [channelConsumer, releasePtr](T*) {
460 channelConsumer->release(releasePtr);
461 });
// With reuse enabled the packet is remembered so later calls can return it again.
462 if (m_reuse)
463 {
464 m_last = result;
465 }
466
467 return result;
468 }
469
471 {
472 if (!m_channelConsumer)
473 {
474 throw Exception(DW_NOT_AVAILABLE, "PortInput: channel not bound");
475 }
476 return m_channelConsumer->getSyncSignaler();
477 }
478
// Forwards `fences` for `frame` to the consumer's sync signaler, or throws
// Exception(DW_NOT_SUPPORTED) in the branch whose condition is not visible.
// NOTE(review): no null check on m_channelConsumer is visible in this method; verify
// that callers only use it on a bound port.
479 void setSignalFences(T* frame, dw::core::span<NvSciSyncFence> fences)
480 {
// NOTE(review): the `if` condition for this branch (listing line 481) is missing from this
// extract; presumably the synced-data check - confirm.
482 {
483 throw Exception(DW_NOT_SUPPORTED, "PortInput: not supported");
484 }
485 else
486 {
487 m_channelConsumer->getSyncSignaler().setSignalFences(frame, fences);
488 }
489 }
490
492 {
493 if (!m_channelConsumer)
494 {
495 throw Exception(DW_NOT_AVAILABLE, "PortInput: channel not bound");
496 }
497 return m_channelConsumer->getSyncWaiter();
498 }
499
// Asks the consumer's sync waiter for the wait fences associated with `frame`, or throws
// Exception(DW_NOT_SUPPORTED) in the branch whose condition is not visible.
// NOTE(review): no null check on m_channelConsumer is visible in this method; verify
// that callers only use it on a bound port.
500 void getWaitFences(T* frame, dw::core::span<NvSciSyncFence> fences)
501 {
// NOTE(review): the `if` condition for this branch (listing line 502) is missing from this
// extract; presumably the synced-data check - confirm.
503 {
504 throw Exception(DW_NOT_SUPPORTED, "PortInput: not supported");
505 }
506 else
507 {
508 m_channelConsumer->getSyncWaiter().getWaitFences(frame, fences);
509 }
510 }
511
512private:
513 ChannelObject::Consumer* m_channelConsumer{};
514 bool m_reuse{};
515 std::shared_ptr<T> m_last{};
516 dw::core::Optional<SpecimenT> m_reference{};
517 OnSetSyncAttrs m_waiterAttrs{};
518 OnSetSyncAttrs m_signalerAttrs{};
519};
520
521template <typename T>
522constexpr char PortInput<T>::LOG_TAG[];
523
524} // namespace framework
525} // namespace dw
526
527#endif // DW_FRAMEWORK_PORT_H_
#define FRWK_LOGE
Definition: Logger.hpp:41
virtual dwStatus recv(GenericData *data)=0
virtual dwStatus wait(dwTime_t timeout)=0
virtual void setOnDataReady(void *opaque, OnDataReady onDataReady)=0
dw::core::Function< void()> OnDataReady
Definition: Channel.hpp:136
virtual dwStatus get(GenericData *data)=0
virtual dwStatus send(void *data)=0
virtual void setSignalFences(void *data, dw::core::span< const NvSciSyncFence > postFences)=0
virtual void getWaitFences(void *data, dw::core::span< NvSciSyncFence > &waitFences)=0
virtual Producer * getProducer(GenericDataReference ref)=0
virtual const ChannelParams & getParams() const =0
virtual Consumer * getConsumer(GenericDataReference ref)=0
static dwStatus guard(TryBlock tryBlock, dw::core::Logger::Verbosity verbosity=dw::core::Logger::Verbosity::DEBUG)
Definition: Exception.hpp:228
virtual ~PortBase()=default
PortInput(OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:295
virtual std::shared_ptr< T > recv()
Definition: Port.hpp:403
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:369
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:470
PortInput(SpecimenT const &ref, OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:302
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:314
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:491
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:279
void setSignalFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:479
bool isBound() override
Definition: Port.hpp:354
static constexpr PortDirection DIRECTION
Definition: Port.hpp:277
void setOnDataReady(void *opaque, ChannelObject::PacketPool::OnDataReady onDataReady)
Definition: Port.hpp:359
~PortInput() override=default
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:500
PortInput(SpecimenT &&ref)
Definition: Port.hpp:286
PortInput(SpecimenT const &ref)
Definition: Port.hpp:282
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:167
static constexpr char LOG_TAG[]
Definition: Port.hpp:86
dwStatus bindChannelWithReference(ChannelObject *channel, GenericDataReference &ref)
Definition: Port.hpp:120
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:114
void setOnDataReady(void *opaque, ChannelObject::PacketPool::OnDataReady onDataReady)
Definition: Port.hpp:153
PortOutput(SpecimenT const &ref)
Definition: Port.hpp:95
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:83
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:217
bool isBound() final
Definition: Port.hpp:162
PortOutput(SpecimenT const &ref, OnSetSyncAttrs signalerAttrs, OnSetSyncAttrs waiterAttrs={})
Definition: Port.hpp:104
void setSignalFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:226
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:238
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:247
static constexpr PortDirection DIRECTION
Definition: Port.hpp:81
PortOutput(SpecimenT &&ref)
Definition: Port.hpp:99
virtual dwStatus send(T *frame)
Definition: Port.hpp:202
virtual dwStatus bindChannel(ChannelObject *channel)=0
virtual bool isBound()=0
virtual dwStatus initialize()
Definition: Port.hpp:65
T * extractInternalPacket(GenericData genericData)
T * extractInternalPacket(GenericData genericData)
SyncedPacketPayload * getSyncPacket(T *frame)
void parseDataSynced(const ChannelParams &params) override
dw::core::Function< void(NvSciSyncAttrList)> OnSetSyncAttrs
Definition: Exception.hpp:47
ChannelPacketTypeID getNewPacketID(ChannelPacketTypeID packetTypeID)
virtual void parseDataSynced(const ChannelParams &params)