Compute Graph Framework SDK Reference  5.12
Port.hpp
Go to the documentation of this file.
1
2//
3// Notice
4// ALL NVIDIA DESIGN SPECIFICATIONS AND CODE ("MATERIALS") ARE PROVIDED "AS IS" NVIDIA MAKES
5// NO REPRESENTATIONS, WARRANTIES, EXPRESSED, IMPLIED, STATUTORY, OR OTHERWISE WITH RESPECT TO
6// THE MATERIALS, AND EXPRESSLY DISCLAIMS ANY IMPLIED WARRANTIES OF NONINFRINGEMENT,
7// MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
8//
9// NVIDIA CORPORATION & AFFILIATES assumes no responsibility for the consequences of use of such
10// information or for any infringement of patents or other rights of third parties that may
11// result from its use. No license is granted by implication or otherwise under any patent
12// or patent rights of NVIDIA CORPORATION & AFFILIATES. No third party distribution is allowed unless
13// expressly authorized by NVIDIA. Details are subject to change without notice.
14// This code supersedes and replaces all information previously supplied.
15// NVIDIA CORPORATION & AFFILIATES products are not authorized for use as critical
16// components in life support devices or systems without express written approval of
17// NVIDIA CORPORATION & AFFILIATES.
18//
19// SPDX-FileCopyrightText: Copyright (c) 2018-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
20// SPDX-License-Identifier: LicenseRef-NvidiaProprietary
21//
22// NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
23// property and proprietary rights in and to this material, related
24// documentation and any modifications thereto. Any use, reproduction,
25// disclosure or distribution of this material and related documentation
26// without an express license agreement from NVIDIA CORPORATION or
27// its affiliates is strictly prohibited.
28//
30
31#ifndef DW_FRAMEWORK_PORT_H_
32#define DW_FRAMEWORK_PORT_H_
33
35#include <dwcgf/Exception.hpp>
36#include <dwshared/dwfoundation/dw/core/logger/Logger.hpp>
38
39#include "SyncPortHelper.hpp"
40
41#include <nvscisync.h>
42#include <stdexcept>
43#include <string>
44
45namespace dw
46{
47namespace framework
48{
49
/// Direction of a port, stored in a single byte.
/// COUNT evaluates to the number of enumerators declared before it (2).
enum class PortDirection : uint8_t
{
    INPUT  = 0,
    OUTPUT = 1,
    COUNT  = 2,
};
57
58// coverity[autosar_cpp14_m3_4_1_violation]
60{
61public:
62 virtual ~PortBase() = default;
63};
64
66class Port : public PortBase
67{
68public:
69 virtual dwStatus bindChannel(ChannelObject* channel) = 0;
70 virtual bool isBound() = 0;
72 {
73 return m_channel;
74 };
75
76protected:
78};
79
81
88template <typename T>
89// coverity[autosar_cpp14_a10_1_1_violation]
90class PortOutput : public SyncPortHelperOutput<T>, public Port
91{
92public:
94 // coverity[autosar_cpp14_a0_1_6_violation]
95 using ApiDataTypeT = T;
98
99 static_assert(std::is_copy_constructible<SpecimenT>::value, "SpecimenT is not copy constructible");
100
101 static constexpr char LOG_TAG[]{"PortOutput"};
102
103private:
104 ChannelObject::Producer* m_channelProducer;
105 SpecimenT m_reference;
106 OnSetSyncAttrs m_waiterAttrs;
107 OnSetSyncAttrs m_signalerAttrs;
108 uint32_t m_sendSeqNum;
109
110public:
111 explicit PortOutput(SpecimenT const& ref)
113 , Port()
114 , m_channelProducer(nullptr)
115 , m_reference(ref)
116 , m_sendSeqNum(0U)
117 {
118 }
119 explicit PortOutput(SpecimenT&& ref)
121 , Port()
122 , m_channelProducer(nullptr)
123 , m_reference(std::move(ref))
124 , m_sendSeqNum(0U)
125 {
126 }
127
128 explicit PortOutput(SpecimenT const& ref,
129 OnSetSyncAttrs signalerAttrs,
130 OnSetSyncAttrs waiterAttrs = {})
132 , Port()
133 , m_channelProducer(nullptr)
134 , m_reference(ref)
135 , m_waiterAttrs(waiterAttrs)
136 , m_signalerAttrs(signalerAttrs)
137 , m_sendSeqNum(0U)
138 {
139 }
140
141 // Channel Bind
142 dwStatus bindChannel(ChannelObject* channel) override
143 {
144 GenericDataReference ref{make_specimen<T>(&m_reference)};
145 return bindChannelWithReference(channel, ref);
146 }
147
149 {
150 return ExceptionGuard::guard([&] {
151 if (isBound())
152 {
153 // TODO(chale): this should be an Exception but applications are currently
154 // doing this. Those applications should be fixed.
155 // coverity[autosar_cpp14_a5_1_1_violation]
156 DW_LOGE << "PortOutput: bindChannel: attempted to bind the same port twice, ignoring this bind!" << Logger::State::endl;
157 return;
158 }
159 if (channel == nullptr)
160 {
161 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortOutput: bindChannel: expected channel != nullptr");
162 }
163 m_channel = channel;
166 ref.setWaiterAttributes = m_waiterAttrs;
167 ref.setSignalerAttributes = m_signalerAttrs;
168
169 m_channelProducer = channel->getProducer(ref);
170 if (m_channelProducer == nullptr)
171 {
172 throw ExceptionWithStatus(DW_INTERNAL_ERROR, "PortOutput bindChannel: wrong channel implementations returned.");
173 }
174 },
175 dw::core::Logger::Verbosity::DEBUG);
176 }
177
179 {
180 if (!isBound())
181 {
182 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: setOnDataReady: no bound channel");
183 }
184 m_channelProducer->setOnDataReady(opaque, std::move(onDataReady));
185 }
186
187 bool isBound() final
188 {
189 return (m_channelProducer != nullptr);
190 }
191
192 dwStatus wait(dwTime_t timeout)
193 {
194 if (!isBound())
195 {
196 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: wait: no bound channel");
197 }
198
199 return m_channelProducer->wait(timeout);
200 }
201
202 // Node accessors
203 // TODO(unknown): This function's prototype needs to change to properly propagate errors
205 {
206 dwStatus status{DW_FAILURE};
207 GenericData genericData{};
208 if (m_channelProducer)
209 {
210 status = m_channelProducer->get(&genericData);
211 }
212
213 if (status != DW_SUCCESS)
214 {
215 return nullptr;
216 }
217
218 // coverity[autosar_cpp14_a5_1_1_violation]
219 extractMetadata(genericData)->header.validFields = 0U;
220 return BaseSyncHelper::extractInternalPacket(genericData);
221 }
222
223 // Tx Operations
224 virtual dwStatus send(T* frame)
225 {
226 if (!m_channelProducer)
227 {
228 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
229 }
230
232 populateDefaultMetadata(payload->header);
233 return m_channelProducer->send(payload);
234 }
235
236 // coverity[autosar_cpp14_a2_10_5_violation] RFD Pending: TID-2053
238 {
239 if (!m_channelProducer)
240 {
241 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
242 }
243
245 return payload->header;
246 }
247
249 {
250 if (!m_channelProducer)
251 {
252 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
253 }
254 return m_channelProducer->getSyncSignaler();
255 }
256
257 void setSignalFences(T* frame, dw::core::span<NvSciSyncFence> fences)
258 {
259 m_channelProducer->getSyncSignaler().setSignalFences(BaseSyncHelper::getMetadataPacket(frame), fences);
260 }
261
263 {
264 if (!m_channelProducer)
265 {
266 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
267 }
268 return m_channelProducer->getSyncWaiter();
269 }
270
271 void getWaitFences(T* frame, dw::core::span<NvSciSyncFence> fences)
272 {
273 m_channelProducer->getSyncWaiter().getWaitFences(BaseSyncHelper::getMetadataPacket(frame), fences);
274 }
275
276protected:
278 {
279 header.sequenceNum = m_sendSeqNum;
280 // coverity[autosar_cpp14_a4_7_1_violation]
281 // coverity[cert_int30_c_violation]
282 m_sendSeqNum++;
283 // coverity[autosar_cpp14_a5_1_1_violation] RFD Accepted: TID-2056
284 header.producerId = 0U;
285
287 {
289 header.validFields |= static_cast<uint16_t>(MetadataFlags::METADATA_ITERATION_COUNT);
290 }
291 }
292};
293
294template <typename T>
295constexpr char PortOutput<T>::LOG_TAG[];
296
298
305template <typename T>
306// coverity[autosar_cpp14_a10_1_1_violation]
307class PortInput : public SyncPortHelperInput<T>, public Port
308{
310 "Channel packet type not declared. Ensure channel packet type "
311 "handling is declared with DWFRAMEWORK_DECLARE_PACKET_TYPE_POD "
312 "or DWFRAMEWORK_DECLARE_PACKET_TYPE_RELATION");
313
314 static constexpr char LOG_TAG[]{"PortInput"};
315
316public:
318 // coverity[autosar_cpp14_a0_1_6_violation]
319 using ApiDataTypeT = T;
322
323 static_assert(std::is_copy_constructible<SpecimenT>::value, "SpecimenT is not copy constructible");
324
325 explicit PortInput(SpecimenT const& ref)
327 , Port()
328 , m_channelConsumer(nullptr)
329 , m_reuse(false)
330 , m_reference(ref)
331 {
332 }
333 explicit PortInput(SpecimenT&& ref)
335 , Port()
336 , m_channelConsumer(nullptr)
337 , m_reuse(false)
338 , m_reference(std::move(ref))
339 {
340 }
341
344 , Port()
345 , m_channelConsumer(nullptr)
346 , m_reuse(false)
347 {
348 }
349
350 explicit PortInput(OnSetSyncAttrs waiterAttrs,
351 OnSetSyncAttrs signalerAttrs = {})
353 , Port()
354 , m_channelConsumer(nullptr)
355 , m_reuse(false)
356 , m_waiterAttrs(waiterAttrs)
357 , m_signalerAttrs(signalerAttrs)
358 {
359 }
360
361 explicit PortInput(SpecimenT const& ref,
362 OnSetSyncAttrs waiterAttrs,
363 OnSetSyncAttrs signalerAttrs = {})
365 , Port()
366 , m_channelConsumer(nullptr)
367 , m_reuse(false)
368 , m_reference(ref)
369 , m_waiterAttrs(waiterAttrs)
370 , m_signalerAttrs(signalerAttrs)
371 {
372 }
373
374 ~PortInput() override = default;
375
376 // Channel Bind
377 dwStatus bindChannel(ChannelObject* channel) override
378 {
379 return ExceptionGuard::guard([&] {
380 if (isBound())
381 {
382 // TODO(chale): this should be an Exception but applications are currently
383 // doing this. Those applications should be fixed.
384 // coverity[autosar_cpp14_a5_1_1_violation]
385 DW_LOGE << "PortInput: bindChannel: attempted to bind the same port twice, ignoring this bind!" << Logger::State::endl;
386 return;
387 }
388 if (channel == nullptr)
389 {
390 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortInput: bindChannel: expected channel != nullptr");
391 }
392 m_channel = channel;
393
395 GenericDataReference ref{make_specimen<T>(nullptr)};
396
397 if (m_reference.has_value())
398 {
399 ref = make_specimen<T>(&m_reference.value());
400 }
401
402 ref.packetTypeID = BaseSyncHelper::getNewPacketID(ref.packetTypeID);
403 ref.setWaiterAttributes = m_waiterAttrs;
404 ref.setSignalerAttributes = m_signalerAttrs;
405
406 m_channelConsumer = channel->getConsumer(ref);
407 if (m_channelConsumer == nullptr)
408 {
409 throw ExceptionWithStatus(DW_INTERNAL_ERROR, "PortInput bindChannel: wrong channel implementations returned.");
410 }
411 m_reuse = channel->getParams().getReuseEnabled();
412 },
413 dw::core::Logger::Verbosity::DEBUG);
414 }
415
416 bool isBound() override
417 {
418 return !(m_channelConsumer == nullptr);
419 }
420
422 {
423 if (!isBound())
424 {
425 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: setOnDataReady: no bound channel");
426 }
427 m_channelConsumer->setOnDataReady(opaque, std::move(onDataReady));
428 }
429
430 // Rx Operations
431 dwStatus wait(dwTime_t timeout)
432 {
433 if (!isBound())
434 {
435 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: wait: no bound channel");
436 }
437
438 // For synced packets, the wait can return DW_NOT_AVAILABLE or DW_SUCCESS
439 // if there are no packets to consume. This is because you need to consume
440 // a packet to make sure it's valid or not.
442 {
443 return DW_SUCCESS;
444 }
446 {
447 return DW_NOT_AVAILABLE;
448 }
450 {
451 // coverity[autosar_cpp14_a5_1_1_violation]
452 timeout = 0;
453 }
454
455 dwTime_t waitTime{m_last.get() != nullptr ? 0 : timeout};
456 dwStatus status{m_channelConsumer->wait(waitTime)};
457 if (m_last.get() != nullptr && (status == DW_TIME_OUT || status == DW_NOT_AVAILABLE))
458 {
459 return DW_SUCCESS;
460 }
461
462 return status;
463 }
464
465 // TODO(unknown): This function's prototype needs to change to properly propagate errors
466 virtual std::shared_ptr<T> recv()
467 {
468 GenericData data{};
469 std::shared_ptr<T> result{};
470 if (!isBound())
471 {
472 return nullptr;
473 }
474
475 // coverity[autosar_cpp14_a0_1_1_violation]
476 T* typedData{nullptr};
477 // coverity[autosar_cpp14_a0_1_1_violation]
478 void* releasePtr{nullptr};
479
481 {
482 // There is a valid packet to consume
484 releasePtr = data.getPointer();
486 }
488 {
489 // There is a buffered packet, but it's not ready to be consumed.
490 return nullptr;
491 }
492 else
493 {
494 dwStatus status{m_channelConsumer->recv(&data)};
495 if (status != DW_SUCCESS)
496 {
497 if (m_last != nullptr)
498 {
499 return m_last;
500 }
501 else
502 {
503 return nullptr;
504 }
505 }
507 {
508 typedData = BaseSyncHelper::extractSyncPacket(data);
509 if (!typedData)
510 {
511 return nullptr;
512 }
513 }
514 else
515 {
517 }
518 releasePtr = data.getPointer();
519 }
520
521 // don't rely on this class's member when releasing packet
522 ChannelObject::Consumer* channelConsumer{m_channelConsumer};
523 // coverity[autosar_cpp14_a5_1_9_violation]
524 result = std::shared_ptr<T>(typedData, [channelConsumer, releasePtr](T*) {
525 channelConsumer->release(releasePtr);
526 });
527 if (m_reuse)
528 {
529 m_last = result;
530 }
531
532 return result;
533 }
534
535 // coverity[autosar_cpp14_a2_10_5_violation] RFD Pending: TID-2053
537 {
538 if (!m_channelConsumer)
539 {
540 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
541 }
542
544 return payload->header;
545 }
546
548 {
549 if (!m_channelConsumer)
550 {
551 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
552 }
553 return m_channelConsumer->getSyncSignaler();
554 }
555
556 void setSignalFences(T* frame, dw::core::span<NvSciSyncFence> fences)
557 {
558 m_channelConsumer->getSyncSignaler().setSignalFences(BaseSyncHelper::getMetadataPacket(frame), fences);
559 }
560
562 {
563 if (!m_channelConsumer)
564 {
565 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
566 }
567 return m_channelConsumer->getSyncWaiter();
568 }
569
570 void getWaitFences(T* frame, dw::core::span<NvSciSyncFence> fences)
571 {
572 m_channelConsumer->getSyncWaiter().getWaitFences(BaseSyncHelper::getMetadataPacket(frame), fences);
573 }
574
575private:
576 ChannelObject::Consumer* m_channelConsumer;
577 bool m_reuse;
578 std::shared_ptr<T> m_last;
579 dw::core::Optional<SpecimenT> m_reference;
580 OnSetSyncAttrs m_waiterAttrs;
581 OnSetSyncAttrs m_signalerAttrs;
582};
583
584template <typename T>
585constexpr char PortInput<T>::LOG_TAG[];
586
587} // namespace framework
588} // namespace dw
589
590#endif // DW_FRAMEWORK_PORT_H_
virtual dwStatus recv(GenericData *data)=0
virtual dwStatus wait(dwTime_t timeout)=0
virtual void setOnDataReady(void *opaque, OnDataReady onDataReady)=0
dw::core::Function< void()> OnDataReady
Definition: Channel.hpp:143
virtual dwStatus get(GenericData *data)=0
virtual dwStatus send(void *data)=0
virtual void setSignalFences(void *data, dw::core::span< const NvSciSyncFence > postFences)=0
virtual void getWaitFences(void *data, dw::core::span< NvSciSyncFence > &waitFences)=0
virtual const ChannelParams & getParams() const =0
virtual Consumer * getConsumer(const GenericDataReference &ref)=0
virtual Producer * getProducer(const GenericDataReference &ref)=0
static dwStatus guard(TryBlock const &tryBlock, ::dw::core::Logger::Verbosity verbosity=::dw::core::Logger::Verbosity::ERROR)
Definition: Exception.hpp:177
virtual ~PortBase()=default
PortInput(OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:350
virtual std::shared_ptr< T > recv()
Definition: Port.hpp:466
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:431
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:547
PortInput(SpecimenT const &ref, OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:361
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:377
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:561
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:320
void setSignalFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:556
bool isBound() override
Definition: Port.hpp:416
static constexpr PortDirection DIRECTION
Definition: Port.hpp:317
ChannelMetadata & getMetadata(T *frame)
Definition: Port.hpp:536
void setOnDataReady(void *opaque, ChannelObject::PacketPool::OnDataReady onDataReady)
Definition: Port.hpp:421
~PortInput() override=default
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:570
PortInput(SpecimenT &&ref)
Definition: Port.hpp:333
PortInput(SpecimenT const &ref)
Definition: Port.hpp:325
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:192
static constexpr char LOG_TAG[]
Definition: Port.hpp:101
dwStatus bindChannelWithReference(ChannelObject *channel, GenericDataReference &ref)
Definition: Port.hpp:148
ChannelMetadata & getMetadata(T *frame)
Definition: Port.hpp:237
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:142
void setOnDataReady(void *opaque, ChannelObject::PacketPool::OnDataReady onDataReady)
Definition: Port.hpp:178
PortOutput(SpecimenT const &ref)
Definition: Port.hpp:111
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:96
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:248
void populateDefaultMetadata(ChannelMetadata &header)
Definition: Port.hpp:277
bool isBound() final
Definition: Port.hpp:187
PortOutput(SpecimenT const &ref, OnSetSyncAttrs signalerAttrs, OnSetSyncAttrs waiterAttrs={})
Definition: Port.hpp:128
void setSignalFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:257
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:262
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:271
static constexpr PortDirection DIRECTION
Definition: Port.hpp:93
PortOutput(SpecimenT &&ref)
Definition: Port.hpp:119
virtual dwStatus send(T *frame)
Definition: Port.hpp:224
ChannelObject * m_channel
Definition: Port.hpp:77
virtual dwStatus bindChannel(ChannelObject *channel)=0
virtual bool isBound()=0
virtual ChannelObject * getChannel()
Definition: Port.hpp:71
T * extractInternalPacket(GenericData genericData)
T * extractSyncPacket(GenericData genericData)
MetadataPayload * getMetadataPacket(T *frame)
void parseDataSynced(const ChannelParams &params) override
T * extractInternalPacket(GenericData genericData)
void parseDataSynced(const ChannelParams &params) override
MetadataPayload * getMetadataPacket(T *frame)
MetadataPayload * extractMetadata(GenericData packet)
dw::core::Function< void(NvSciSyncAttrList)> OnSetSyncAttrs
OnSetSyncAttrs setSignalerAttributes
lambda to set the signaler attributes of the endpoint.
ChannelPacketTypeID packetTypeID
The ID of the type of the endpoint.
OnSetSyncAttrs setWaiterAttributes
lambda to set the waiter attributes of the endpoint.
Definition: Buffer.hpp:40
ChannelPacketTypeID getNewPacketID(ChannelPacketTypeID packetTypeID)
void stampSyncCount(uint32_t &syncCountOut) const