Compute Graph Framework SDK Reference  5.14
Port.hpp
Go to the documentation of this file.
1
2//
3// Notice
4// ALL NVIDIA DESIGN SPECIFICATIONS AND CODE ("MATERIALS") ARE PROVIDED "AS IS" NVIDIA MAKES
5// NO REPRESENTATIONS, WARRANTIES, EXPRESSED, IMPLIED, STATUTORY, OR OTHERWISE WITH RESPECT TO
6// THE MATERIALS, AND EXPRESSLY DISCLAIMS ANY IMPLIED WARRANTIES OF NONINFRINGEMENT,
7// MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
8//
9// NVIDIA CORPORATION & AFFILIATES assumes no responsibility for the consequences of use of such
10// information or for any infringement of patents or other rights of third parties that may
11// result from its use. No license is granted by implication or otherwise under any patent
12// or patent rights of NVIDIA CORPORATION & AFFILIATES. No third party distribution is allowed unless
13// expressly authorized by NVIDIA. Details are subject to change without notice.
14// This code supersedes and replaces all information previously supplied.
15// NVIDIA CORPORATION & AFFILIATES products are not authorized for use as critical
16// components in life support devices or systems without express written approval of
17// NVIDIA CORPORATION & AFFILIATES.
18//
19// SPDX-FileCopyrightText: Copyright (c) 2018-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
20// SPDX-License-Identifier: LicenseRef-NvidiaProprietary
21//
22// NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
23// property and proprietary rights in and to this material, related
24// documentation and any modifications thereto. Any use, reproduction,
25// disclosure or distribution of this material and related documentation
26// without an express license agreement from NVIDIA CORPORATION or
27// its affiliates is strictly prohibited.
28//
30
31#ifndef DW_FRAMEWORK_PORT_H_
32#define DW_FRAMEWORK_PORT_H_
33
35#include <dwcgf/Exception.hpp>
36#include <dwshared/dwfoundation/dw/core/container/StringView.hpp>
37#include <dwshared/dwfoundation/dw/core/logger/Logger.hpp>
39
40#include "SyncPortHelper.hpp"
41
42#include <nvscisync.h>
43#include <stdexcept>
44#include <string>
45
46namespace dw
47{
48namespace framework
49{
50
52enum class PortDirection : uint8_t
53{
54 INPUT = 0,
55 OUTPUT,
56};
57
58// coverity[autosar_cpp14_m3_4_1_violation]
60{
61public:
62 virtual ~PortBase() = default;
63};
64
66class Port : public PortBase
67{
68public:
69 virtual dwStatus bindChannel(ChannelObject* channel) = 0;
70 virtual bool isBound() = 0;
72 {
73 return m_channel;
74 };
75
76protected:
78};
79
81
88template <typename T>
89// coverity[autosar_cpp14_a10_1_1_violation]
90class PortOutput : public SyncPortHelperOutput<T>, public Port
91{
92public:
94 // coverity[autosar_cpp14_a0_1_6_violation]
95 using ApiDataTypeT = T;
98
99 static_assert(std::is_copy_constructible<SpecimenT>::value, "SpecimenT is not copy constructible");
100
101 // coverity[autosar_cpp14_a2_10_5_violation]
102 static constexpr char LOG_TAG[]{"PortOutput"};
103
104private:
105 ChannelObject::Producer* m_channelProducer;
106 SpecimenT m_reference;
107 OnSetSyncAttrs m_waiterAttrs;
108 OnSetSyncAttrs m_signalerAttrs;
109 uint32_t m_sendSeqNum;
110
111public:
112 explicit PortOutput(SpecimenT const& ref)
114 , Port()
115 , m_channelProducer(nullptr)
116 , m_reference(ref)
117 , m_sendSeqNum(0U)
118 {
119 }
120 explicit PortOutput(SpecimenT&& ref)
122 , Port()
123 , m_channelProducer(nullptr)
124 , m_reference(std::move(ref))
125 , m_sendSeqNum(0U)
126 {
127 }
128
129 explicit PortOutput(SpecimenT const& ref,
130 OnSetSyncAttrs signalerAttrs,
131 OnSetSyncAttrs waiterAttrs = {})
133 , Port()
134 , m_channelProducer(nullptr)
135 , m_reference(ref)
136 , m_waiterAttrs(waiterAttrs)
137 , m_signalerAttrs(signalerAttrs)
138 , m_sendSeqNum(0U)
139 {
140 }
141
142 // Channel Bind
143 dwStatus bindChannel(ChannelObject* channel) override
144 {
145 GenericDataReference ref{make_specimen<T>(&m_reference)};
146 return bindChannelWithReference(channel, ref);
147 }
148
150 {
151 return ExceptionGuard::guard([&] {
152 if (isBound())
153 {
154 // TODO(chale): this should be an Exception but applications are currently
155 // doing this. Those applications should be fixed.
156 DW_LOGE << dw::core::StringView{"PortOutput: bindChannel: attempted to bind the same port twice, ignoring this bind!"} << Logger::State::endl;
157 return;
158 }
159 if (channel == nullptr)
160 {
161 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortOutput: bindChannel: expected channel != nullptr");
162 }
163 m_channel = channel;
166 ref.setWaiterAttributes = m_waiterAttrs;
167 ref.setSignalerAttributes = m_signalerAttrs;
168
169 m_channelProducer = channel->getProducer(ref);
170 if (m_channelProducer == nullptr)
171 {
172 throw ExceptionWithStatus(DW_INTERNAL_ERROR, "PortOutput bindChannel: wrong channel implementations returned.");
173 }
174 },
175 dw::core::Logger::Verbosity::DEBUG);
176 }
177
179 {
180 if (!isBound())
181 {
182 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: setOnDataReady: no bound channel");
183 }
184 m_channelProducer->setOnDataReady(opaque, std::move(onDataReady));
185 }
186
187 bool isBound() final
188 {
189 return (m_channelProducer != nullptr);
190 }
191
192 dwStatus wait(dwTime_t timeout)
193 {
194 if (!isBound())
195 {
196 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: wait: no bound channel");
197 }
198
199 return m_channelProducer->wait(timeout);
200 }
201
202 // Node accessors
203 // TODO(unknown): This function's prototype needs to change to properly propagate errors
205 {
206 dwStatus status{DW_FAILURE};
207 GenericData genericData{};
208 if (m_channelProducer)
209 {
210 status = m_channelProducer->get(&genericData);
211 }
212
213 if (status != DW_SUCCESS)
214 {
215 return nullptr;
216 }
217
218 // coverity[autosar_cpp14_a5_1_1_violation] RFD Accepted: TID-2056
219 extractMetadata(genericData)->header.validFields = 0U;
220 return BaseSyncHelper::extractInternalPacket(genericData);
221 }
222
223 // Tx Operations
224 virtual dwStatus send(T* frame)
225 {
226 if (!m_channelProducer)
227 {
228 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
229 }
230
232 populateDefaultMetadata(payload->header);
233 return m_channelProducer->send(payload);
234 }
235
236 // coverity[autosar_cpp14_a2_10_5_violation] RFD Pending: TID-2053
238 {
239 if (!m_channelProducer)
240 {
241 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
242 }
243
245 return payload->header;
246 }
247
249 {
250 if (!m_channelProducer)
251 {
252 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
253 }
254 return m_channelProducer->getSyncSignaler();
255 }
256
257 void setSignalFences(T* frame, dw::core::span<NvSciSyncFence> fences)
258 {
259 m_channelProducer->getSyncSignaler().setSignalFences(BaseSyncHelper::getMetadataPacket(frame), fences);
260 }
261
263 {
264 if (!m_channelProducer)
265 {
266 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
267 }
268 return m_channelProducer->getSyncWaiter();
269 }
270
271 void getWaitFences(T* frame, dw::core::span<NvSciSyncFence> fences)
272 {
273 m_channelProducer->getSyncWaiter().getWaitFences(BaseSyncHelper::getMetadataPacket(frame), fences);
274 }
275
276protected:
278 {
279 setSequenceNumber(header, m_sendSeqNum);
280 // coverity[autosar_cpp14_a5_1_1_violation] RFD Accepted: TID-2056
281 dw::core::safeIncrement(m_sendSeqNum, 1U);
282 // coverity[autosar_cpp14_a5_1_1_violation] RFD Accepted: TID-2056
283 header.producerId = 0U;
284
286 {
288 header.validFields |= static_cast<uint16_t>(MetadataFlags::METADATA_ITERATION_COUNT);
289 }
290 }
291};
292
293template <typename T>
294constexpr char PortOutput<T>::LOG_TAG[];
295
297
304template <typename T>
305// coverity[autosar_cpp14_a10_1_1_violation]
306class PortInput : public SyncPortHelperInput<T>, public Port
307{
309 "Channel packet type not declared. Ensure channel packet type "
310 "handling is declared with DWFRAMEWORK_DECLARE_PACKET_TYPE_POD "
311 "or DWFRAMEWORK_DECLARE_PACKET_TYPE_RELATION");
312 // coverity[autosar_cpp14_a2_10_5_violation]
313 static constexpr char LOG_TAG[]{"PortInput"};
314
315public:
317 // coverity[autosar_cpp14_a0_1_6_violation]
318 using ApiDataTypeT = T;
321
322 static_assert(std::is_copy_constructible<SpecimenT>::value, "SpecimenT is not copy constructible");
323
324 explicit PortInput(SpecimenT const& ref)
326 , Port()
327 , m_channelConsumer(nullptr)
328 , m_reuse(false)
329 , m_reference(ref)
330 {
331 }
332 explicit PortInput(SpecimenT&& ref)
334 , Port()
335 , m_channelConsumer(nullptr)
336 , m_reuse(false)
337 , m_reference(std::move(ref))
338 {
339 }
340
343 , Port()
344 , m_channelConsumer(nullptr)
345 , m_reuse(false)
346 {
347 }
348
349 explicit PortInput(OnSetSyncAttrs waiterAttrs,
350 OnSetSyncAttrs signalerAttrs = {})
352 , Port()
353 , m_channelConsumer(nullptr)
354 , m_reuse(false)
355 , m_waiterAttrs(waiterAttrs)
356 , m_signalerAttrs(signalerAttrs)
357 {
358 }
359
360 explicit PortInput(SpecimenT const& ref,
361 OnSetSyncAttrs waiterAttrs,
362 OnSetSyncAttrs signalerAttrs = {})
364 , Port()
365 , m_channelConsumer(nullptr)
366 , m_reuse(false)
367 , m_reference(ref)
368 , m_waiterAttrs(waiterAttrs)
369 , m_signalerAttrs(signalerAttrs)
370 {
371 }
372
373 ~PortInput() override = default;
374
375 // Channel Bind
376 dwStatus bindChannel(ChannelObject* channel) override
377 {
378 return ExceptionGuard::guard([&] {
379 if (isBound())
380 {
381 // TODO(chale): this should be an Exception but applications are currently
382 // doing this. Those applications should be fixed.
383 DW_LOGE << dw::core::StringView{"PortInput: bindChannel: attempted to bind the same port twice, ignoring this bind!"} << Logger::State::endl;
384 return;
385 }
386 if (channel == nullptr)
387 {
388 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortInput: bindChannel: expected channel != nullptr");
389 }
390 m_channel = channel;
391
393 GenericDataReference ref{make_specimen<T>(nullptr)};
394
395 if (m_reference.has_value())
396 {
397 ref = make_specimen<T>(&m_reference.value());
398 }
399
400 ref.packetTypeID = BaseSyncHelper::getNewPacketID(ref.packetTypeID);
401 ref.setWaiterAttributes = m_waiterAttrs;
402 ref.setSignalerAttributes = m_signalerAttrs;
403
404 m_channelConsumer = channel->getConsumer(ref);
405 if (m_channelConsumer == nullptr)
406 {
407 throw ExceptionWithStatus(DW_INTERNAL_ERROR, "PortInput bindChannel: wrong channel implementations returned.");
408 }
409 m_reuse = channel->getParams().getReuseEnabled();
410 },
411 dw::core::Logger::Verbosity::DEBUG);
412 }
413
414 bool isBound() override
415 {
416 return !(m_channelConsumer == nullptr);
417 }
418
420 {
421 if (!isBound())
422 {
423 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: setOnDataReady: no bound channel");
424 }
425 m_channelConsumer->setOnDataReady(opaque, std::move(onDataReady));
426 }
427
428 // Rx Operations
429 dwStatus wait(dwTime_t timeout)
430 {
431 if (!isBound())
432 {
433 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: wait: no bound channel");
434 }
435
436 // For synced packets, the wait can return DW_NOT_AVAILABLE or DW_SUCCESS
437 // if there are no packets to consume. This is because you need to consume
438 // a packet to make sure it's valid or not.
440 {
441 return DW_SUCCESS;
442 }
444 {
445 return DW_NOT_AVAILABLE;
446 }
448 {
449 // coverity[autosar_cpp14_a5_1_1_violation] RFD Accepted: TID-2056
450 timeout = 0;
451 }
452
453 dwTime_t waitTime{m_last.get() != nullptr ? 0 : timeout};
454 dwStatus status{m_channelConsumer->wait(waitTime)};
455 if (m_last.get() != nullptr && (status == DW_TIME_OUT || status == DW_NOT_AVAILABLE))
456 {
457 return DW_SUCCESS;
458 }
459
460 return status;
461 }
462
463 // TODO(unknown): This function's prototype needs to change to properly propagate errors
464 virtual std::shared_ptr<T> recv()
465 {
466 GenericData data{};
467 std::shared_ptr<T> result{};
468 if (!isBound())
469 {
470 return nullptr;
471 }
472
473 // coverity[autosar_cpp14_a0_1_1_violation]
474 T* typedData{nullptr};
475 // coverity[autosar_cpp14_a0_1_1_violation]
476 void* releasePtr{nullptr};
477
479 {
480 // There is a valid packet to consume
482 releasePtr = data.getPointer();
484 }
486 {
487 // There is a buffered packet, but it's not ready to be consumed.
488 return nullptr;
489 }
490 else
491 {
492 dwStatus status{m_channelConsumer->recv(&data)};
493 if (status != DW_SUCCESS)
494 {
495 if (m_last != nullptr)
496 {
497 return m_last;
498 }
499 else
500 {
501 return nullptr;
502 }
503 }
505 {
506 typedData = BaseSyncHelper::extractSyncPacket(data);
507 if (!typedData)
508 {
509 return nullptr;
510 }
511 }
512 else
513 {
515 }
516 releasePtr = data.getPointer();
517 }
518
519 // don't rely on this class's member when releasing packet
520 ChannelObject::Consumer* channelConsumer{m_channelConsumer};
521 // coverity[autosar_cpp14_a5_1_9_violation]
522 result = std::shared_ptr<T>(typedData, [channelConsumer, releasePtr](T*) {
523 channelConsumer->release(releasePtr);
524 });
525 if (m_reuse)
526 {
527 m_last = result;
528 }
529
530 return result;
531 }
532
533 // coverity[autosar_cpp14_a2_10_5_violation] RFD Pending: TID-2053
535 {
536 if (!m_channelConsumer)
537 {
538 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
539 }
540
542 return payload->header;
543 }
544
546 {
547 if (!m_channelConsumer)
548 {
549 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
550 }
551 return m_channelConsumer->getSyncSignaler();
552 }
553
554 void setSignalFences(T* frame, dw::core::span<NvSciSyncFence> fences)
555 {
556 m_channelConsumer->getSyncSignaler().setSignalFences(BaseSyncHelper::getMetadataPacket(frame), fences);
557 }
558
560 {
561 if (!m_channelConsumer)
562 {
563 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
564 }
565 return m_channelConsumer->getSyncWaiter();
566 }
567
568 void getWaitFences(T* frame, dw::core::span<NvSciSyncFence> fences)
569 {
570 m_channelConsumer->getSyncWaiter().getWaitFences(BaseSyncHelper::getMetadataPacket(frame), fences);
571 }
572
573private:
574 ChannelObject::Consumer* m_channelConsumer;
575 bool m_reuse;
576 std::shared_ptr<T> m_last;
577 dw::core::Optional<SpecimenT> m_reference;
578 OnSetSyncAttrs m_waiterAttrs;
579 OnSetSyncAttrs m_signalerAttrs;
580};
581
582template <typename T>
583constexpr char PortInput<T>::LOG_TAG[];
584
585} // namespace framework
586} // namespace dw
587
588#endif // DW_FRAMEWORK_PORT_H_
virtual dwStatus recv(GenericData *data)=0
virtual dwStatus wait(dwTime_t timeout)=0
virtual void setOnDataReady(void *opaque, OnDataReady onDataReady)=0
dw::core::Function< void()> OnDataReady
Definition: Channel.hpp:143
virtual dwStatus get(GenericData *data)=0
virtual dwStatus send(void *data)=0
virtual void setSignalFences(void *data, dw::core::span< const NvSciSyncFence > postFences)=0
virtual void getWaitFences(void *data, dw::core::span< NvSciSyncFence > &waitFences)=0
virtual const ChannelParams & getParams() const =0
virtual Consumer * getConsumer(const GenericDataReference &ref)=0
virtual Producer * getProducer(const GenericDataReference &ref)=0
static dwStatus guard(TryBlock const &tryBlock, ::dw::core::Logger::Verbosity verbosity=::dw::core::Logger::Verbosity::ERROR)
Definition: Exception.hpp:167
virtual ~PortBase()=default
PortInput(OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:349
virtual std::shared_ptr< T > recv()
Definition: Port.hpp:464
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:429
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:545
PortInput(SpecimenT const &ref, OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:360
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:376
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:559
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:319
void setSignalFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:554
bool isBound() override
Definition: Port.hpp:414
static constexpr PortDirection DIRECTION
Definition: Port.hpp:316
ChannelMetadata & getMetadata(T *frame)
Definition: Port.hpp:534
void setOnDataReady(void *opaque, ChannelObject::PacketPool::OnDataReady onDataReady)
Definition: Port.hpp:419
~PortInput() override=default
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:568
PortInput(SpecimenT &&ref)
Definition: Port.hpp:332
PortInput(SpecimenT const &ref)
Definition: Port.hpp:324
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:192
static constexpr char LOG_TAG[]
Definition: Port.hpp:102
dwStatus bindChannelWithReference(ChannelObject *channel, GenericDataReference &ref)
Definition: Port.hpp:149
ChannelMetadata & getMetadata(T *frame)
Definition: Port.hpp:237
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:143
void setOnDataReady(void *opaque, ChannelObject::PacketPool::OnDataReady onDataReady)
Definition: Port.hpp:178
PortOutput(SpecimenT const &ref)
Definition: Port.hpp:112
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:96
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:248
void populateDefaultMetadata(ChannelMetadata &header)
Definition: Port.hpp:277
bool isBound() final
Definition: Port.hpp:187
PortOutput(SpecimenT const &ref, OnSetSyncAttrs signalerAttrs, OnSetSyncAttrs waiterAttrs={})
Definition: Port.hpp:129
void setSignalFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:257
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:262
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:271
static constexpr PortDirection DIRECTION
Definition: Port.hpp:93
PortOutput(SpecimenT &&ref)
Definition: Port.hpp:120
virtual dwStatus send(T *frame)
Definition: Port.hpp:224
ChannelObject * m_channel
Definition: Port.hpp:77
virtual dwStatus bindChannel(ChannelObject *channel)=0
virtual bool isBound()=0
virtual ChannelObject * getChannel()
Definition: Port.hpp:71
T * extractInternalPacket(GenericData genericData)
T * extractSyncPacket(GenericData genericData)
MetadataPayload * getMetadataPacket(T *frame)
void parseDataSynced(const ChannelParams &params) override
T * extractInternalPacket(GenericData genericData)
void parseDataSynced(const ChannelParams &params) override
MetadataPayload * getMetadataPacket(T *frame)
MetadataPayload * extractMetadata(GenericData packet)
dw::core::Function< void(NvSciSyncAttrList)> OnSetSyncAttrs
OnSetSyncAttrs setSignalerAttributes
lambda to set the signaler attributes of the endpoint.
ChannelPacketTypeID packetTypeID
The ID of the type of the endpoint.
uint16_t validFields
Bit map defining which ChannelMetadata fields are set. See MetadataFlags.
uint32_t producerId
Id of the producer channel.
uint32_t iterationCount
Producer iteration count.
void setSequenceNumber(ChannelMetadata &header, uint32_t const &sequenceNum)
OnSetSyncAttrs setWaiterAttributes
lambda to set the waiter attributes of the endpoint.
@ METADATA_ITERATION_COUNT
Producer iteration count is set.
Definition: Buffer.hpp:40
ChannelPacketTypeID getNewPacketID(ChannelPacketTypeID packetTypeID)
void stampSyncCount(uint32_t &syncCountOut) const