Compute Graph Framework SDK Reference  5.22
Port.hpp
Go to the documentation of this file.
1
2//
3// Notice
4// ALL NVIDIA DESIGN SPECIFICATIONS AND CODE ("MATERIALS") ARE PROVIDED "AS IS" NVIDIA MAKES
5// NO REPRESENTATIONS, WARRANTIES, EXPRESSED, IMPLIED, STATUTORY, OR OTHERWISE WITH RESPECT TO
6// THE MATERIALS, AND EXPRESSLY DISCLAIMS ANY IMPLIED WARRANTIES OF NONINFRINGEMENT,
7// MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
8//
9// NVIDIA CORPORATION & AFFILIATES assumes no responsibility for the consequences of use of such
10// information or for any infringement of patents or other rights of third parties that may
11// result from its use. No license is granted by implication or otherwise under any patent
12// or patent rights of NVIDIA CORPORATION & AFFILIATES. No third party distribution is allowed unless
13// expressly authorized by NVIDIA. Details are subject to change without notice.
14// This code supersedes and replaces all information previously supplied.
15// NVIDIA CORPORATION & AFFILIATES products are not authorized for use as critical
16// components in life support devices or systems without express written approval of
17// NVIDIA CORPORATION & AFFILIATES.
18//
19// SPDX-FileCopyrightText: Copyright (c) 2018-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
20// SPDX-License-Identifier: LicenseRef-NvidiaProprietary
21//
22// NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
23// property and proprietary rights in and to this material, related
24// documentation and any modifications thereto. Any use, reproduction,
25// disclosure or distribution of this material and related documentation
26// without an express license agreement from NVIDIA CORPORATION or
27// its affiliates is strictly prohibited.
28//
30
31#ifndef DW_FRAMEWORK_PORT_H_
32#define DW_FRAMEWORK_PORT_H_
33
35#include <dwcgf/Exception.hpp>
36#include <dwshared/dwfoundation/dw/core/container/StringView.hpp>
37#include <dwshared/dwfoundation/dw/core/logger/Logger.hpp>
39
40#include "SyncPortHelper.hpp"
41
42#include <nvscisync.h>
43#include <stdexcept>
44#include <string>
45
46namespace dw
47{
48namespace framework
49{
50
51namespace detail
52{
53
54template <typename T>
55// coverity[autosar_cpp14_a2_10_5_violation] FP: nvbugs/3907242
56T* getBufferTyped(GenericData buffer)
57{
58 MetadataPayload* metadataPacket{extractMetadata(buffer)};
59 T* ptr{metadataPacket->data.template getData<T>()};
60
61 if (nullptr == ptr)
62 {
63 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "getBufferTyped: type mismatch");
64 }
65 return ptr;
66}
67
68template <typename T>
69struct vectorIterable
70{
71 static_assert(std::is_constructible<T>::value, "T must be constructible");
72
73 explicit vectorIterable(dw::core::VectorFixed<GenericData> allBuffers)
74 : m_allBuffers(std::move(allBuffers))
75 {
76 }
77
79 template <class TT>
80 class iterator : public dw::core::VectorFixed<GenericData>::iterator
81 {
82 static_assert(std::is_constructible<TT>::value, "TT must be constructible");
83
84 public:
85 using Base = dw::core::VectorFixed<GenericData>::iterator;
86 // Same naming is used in dwshared, hence keeping the iterator name and its accessors for now
87 iterator(Base&& base)
88 : Base(std::move(base))
89 {
90 }
91
92 const Base& baseFromThis() const
93 {
94 return *this;
95 }
96
97 TT* operator*() const
98 {
99 GenericData buffer{*baseFromThis()};
100 return getBufferTyped<TT>(buffer);
101 }
102 };
103
104 iterator<T> begin() { return iterator<T>(m_allBuffers.begin()); }
105
106 iterator<T> end() { return iterator<T>(m_allBuffers.end()); }
107
108private:
109 dw::core::VectorFixed<GenericData> m_allBuffers;
110};
111
112} // namespace detail
113
/// Identifies whether a port is an input or an output.
/// Stored in a single byte; the numeric values are fixed.
enum class PortDirection : uint8_t
{
    INPUT  = 0, ///< Input port.
    OUTPUT = 1, ///< Output port.
};
120
121// coverity[autosar_cpp14_m3_4_1_violation] RFD Pending: TID-2586
123{
124public:
125 virtual ~PortBase() = default;
126};
127
129class Port : public PortBase
130{
131public:
132 virtual dwStatus bindChannel(ChannelObject* channel) = 0;
133 virtual bool isBound() = 0;
135 {
136 return m_channel;
137 };
138
139protected:
141};
142
144
151template <typename T>
152// coverity[autosar_cpp14_a10_1_1_violation]
153class PortOutput : public SyncPortHelperOutput<T>, public Port
154{
155 static_assert(std::is_constructible<T>::value, "T must be constructible");
156
157public:
159 // coverity[autosar_cpp14_a0_1_6_violation]
160 using ApiDataTypeT = T;
163
164 static_assert(std::is_copy_constructible<SpecimenT>::value, "SpecimenT is not copy constructible");
165
166 // coverity[autosar_cpp14_a2_10_5_violation]
167 static constexpr char LOG_TAG[]{"PortOutput"};
168
169private:
170 ChannelObject::Producer* m_channelProducer;
171 SpecimenT m_reference;
172 OnSetSyncAttrs m_waiterAttrs;
173 OnSetSyncAttrs m_signalerAttrs;
174 void* m_onDataReadyOpaque;
175 OnDataReady m_onDataReady;
176 uint32_t m_sendSeqNum;
177
178public:
179 explicit PortOutput(SpecimenT const& ref)
180 : PortOutput(ref, {})
181 {
182 }
183 explicit PortOutput(SpecimenT&& ref)
185 , Port()
186 , m_channelProducer(nullptr)
187 , m_reference(std::move(ref))
188 , m_onDataReadyOpaque()
189 , m_onDataReady()
190 , m_sendSeqNum(0U)
191 {
192 }
193
194 explicit PortOutput(SpecimenT const& ref,
195 OnSetSyncAttrs signalerAttrs,
196 OnSetSyncAttrs waiterAttrs = {})
198 , Port()
199 , m_channelProducer(nullptr)
200 , m_reference(ref)
201 , m_waiterAttrs(std::move(waiterAttrs))
202 , m_signalerAttrs(std::move(signalerAttrs))
203 , m_onDataReadyOpaque()
204 , m_onDataReady()
205 , m_sendSeqNum(0U)
206 {
207 }
208
209 // Channel Bind
210 dwStatus bindChannel(ChannelObject* channel) override
211 {
212 GenericDataReference ref{make_specimen<T>(&m_reference)};
213 return bindChannelWithReference(channel, ref);
214 }
215
217 {
219 [&] {
220 if (isBound())
221 {
222 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortOutput: bindChannel: port already bound");
223 }
224 if (nullptr == channel)
225 {
226 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortOutput: bindChannel: expected channel != nullptr");
227 }
228 m_channel = channel;
231 ref.setWaiterAttributes = m_waiterAttrs;
232 ref.setSignalerAttributes = m_signalerAttrs;
233 ref.onDataReadyOpaque = m_onDataReadyOpaque;
234 ref.onDataReady = m_onDataReady;
235
236 m_channelProducer = channel->getProducer(ref);
237 if (nullptr == m_channelProducer)
238 {
239 throw ExceptionWithStatus(DW_INTERNAL_ERROR, "PortOutput bindChannel: wrong channel implementations returned.");
240 }
241 },
242 dw::core::Logger::Verbosity::DEBUG);
243 }
244
251 {
253 [&] {
254 if (isBound())
255 {
256 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortOutput: bindCbindChannelForPODTypePackethannel: port already bound");
257 }
258 if (nullptr == channel)
259 {
260 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortOutput: bindChannelForPODTypePacket: expected channel != nullptr");
261 }
262 if (ChannelType::SHMEM_LOCAL != channel->getParams().getType())
263 {
264 throw dw::core::ExceptionWithStatus(DW_CALL_NOT_ALLOWED, "PortOutput: bindChannelForPODTypePacket: setting channel to use POD type only allowed for local channels.");
265 }
266
268
269 // coverity[autosar_cpp14_a20_8_4_violation] FP: nvbugs/4552679
272 ref.typeSize = sizeof(T);
273 ref.data = GenericData(static_cast<T*>(nullptr));
274 ref.setWaiterAttributes = m_waiterAttrs;
275 ref.setSignalerAttributes = m_signalerAttrs;
276 ref.onDataReadyOpaque = m_onDataReadyOpaque;
277 ref.onDataReady = m_onDataReady;
278
279 m_channelProducer = channel->getProducer(ref);
280 if (nullptr == m_channelProducer)
281 {
282 throw ExceptionWithStatus(DW_INTERNAL_ERROR, "PortOutput bindChannelForPODTypePacket: wrong channel implementations returned.");
283 }
284 },
285 dw::core::Logger::Verbosity::DEBUG);
286 }
287
288 void setOnDataReady(void* opaque, OnDataReady onDataReady)
289 {
290 if (isBound())
291 {
292 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: setOnDataReady: channel already bound");
293 }
294 m_onDataReadyOpaque = opaque;
295 m_onDataReady = std::move(onDataReady);
296 }
297
298 bool isBound() final
299 {
300 return (nullptr != m_channelProducer);
301 }
302
303 dwStatus wait(dwTime_t timeout)
304 {
305 if (!isBound())
306 {
307 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: wait: no bound channel");
308 }
309
310 return m_channelProducer->wait(timeout);
311 }
312
313 // Node accessors
314 // TODO(unknown): This function's prototype needs to change to properly propagate errors
316 {
317 dwStatus status{DW_FAILURE};
318 GenericData genericData{};
319 if (m_channelProducer)
320 {
321 status = m_channelProducer->get(&genericData);
322 }
323
324 if (DW_SUCCESS != status)
325 {
326 return nullptr;
327 }
328
329 // coverity[autosar_cpp14_a5_1_1_violation] RFD Accepted: TID-2056
330 extractMetadata(genericData)->header.validFields = 0U;
331 return BaseSyncHelper::extractInternalPacket(genericData);
332 }
333
334 // Tx Operations
335 virtual dwStatus send(T* frame, const dwTime_t* publishTimestamp = nullptr)
336 {
337 if (!m_channelProducer)
338 {
339 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
340 }
341
343 populateDefaultMetadata(payload->header, publishTimestamp);
344 return m_channelProducer->send(payload);
345 }
346
347 // coverity[autosar_cpp14_a2_10_5_violation]
349 {
350 if (!m_channelProducer)
351 {
352 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
353 }
354
356 return payload->header;
357 }
358
360 {
361 if (!m_channelProducer)
362 {
363 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
364 }
365 return m_channelProducer->getSyncSignaler();
366 }
367
368 void setSignalFences(T* frame, dw::core::span<const NvSciSyncFence> fences)
369 {
371 }
372
374 {
375 if (!m_channelProducer)
376 {
377 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
378 }
379 return m_channelProducer->getSyncWaiter();
380 }
381
382 void getWaitFences(T* frame, dw::core::span<NvSciSyncFence>& fences)
383 {
385 }
386
390 detail::vectorIterable<T> getAllBufferIter()
391 {
392 return detail::vectorIterable<T>(m_channelProducer->getAllBuffers());
393 }
394
395protected:
396 void populateDefaultMetadata(ChannelMetadata& header, const dwTime_t* publishTimestamp)
397 {
398 setSequenceNumber(header, m_sendSeqNum);
399 if (m_sendSeqNum < std::numeric_limits<decltype(m_sendSeqNum)>::max())
400 {
401 m_sendSeqNum++;
402 }
403 else
404 {
405 m_sendSeqNum = std::numeric_limits<decltype(m_sendSeqNum)>::min();
406 }
407 setTimestamp(header, nullptr != publishTimestamp ? *publishTimestamp : m_channelProducer->getCurrentTime());
408 // coverity[autosar_cpp14_a5_1_1_violation] RFD Accepted: TID-2056
409 header.producerId = 0U;
410
412 {
414 header.validFields |= static_cast<uint16_t>(MetadataFlags::METADATA_ITERATION_COUNT);
415 }
416 }
417};
418
419template <typename T>
420constexpr char PortOutput<T>::LOG_TAG[];
421
423
430template <typename T>
431// coverity[autosar_cpp14_a10_1_1_violation]
432class PortInput : public SyncPortHelperInput<T>, public Port
433{
434 static_assert(std::is_constructible<T>::value, "T must be constructible");
436 "Channel packet type not declared. Ensure channel packet type "
437 "handling is declared with DWFRAMEWORK_DECLARE_PACKET_TYPE_POD "
438 "or DWFRAMEWORK_DECLARE_PACKET_TYPE_RELATION");
439 // coverity[autosar_cpp14_a2_10_5_violation]
440 static constexpr char LOG_TAG[]{"PortInput"};
441
442public:
444 // coverity[autosar_cpp14_a0_1_6_violation]
445 using ApiDataTypeT = T;
448
449 static_assert(std::is_copy_constructible<SpecimenT>::value, "SpecimenT is not copy constructible");
450
451 explicit PortInput(SpecimenT const& ref)
452 : PortInput(ref, OnSetSyncAttrs())
453 {
454 }
455 explicit PortInput(SpecimenT&& ref)
457 , Port()
458 , m_channelConsumer(nullptr)
459 , m_reuse(false)
460 , m_calledRecvImpl(RECV_API_CALLED_NONE)
461 , m_lastTypedData(nullptr)
462 , m_lastReleasePtr(nullptr)
463 , m_existingUniquePtr(false)
464 , m_reference(std::move(ref))
465 , m_waiterAttrs()
466 , m_signalerAttrs()
467 , m_onDataReadyOpaque()
468 , m_onDataReady()
469 {
470 }
471
474 {
475 }
476
477 explicit PortInput(OnSetSyncAttrs waiterAttrs,
478 OnSetSyncAttrs signalerAttrs = {})
480 , Port()
481 , m_channelConsumer(nullptr)
482 , m_reuse(false)
483 , m_calledRecvImpl(RECV_API_CALLED_NONE)
484 , m_lastTypedData(nullptr)
485 , m_lastReleasePtr(nullptr)
486 , m_existingUniquePtr(false)
487 , m_waiterAttrs(std::move(waiterAttrs))
488 , m_signalerAttrs(std::move(signalerAttrs))
489 , m_onDataReadyOpaque()
490 , m_onDataReady()
491 {
492 }
493
494 explicit PortInput(SpecimenT const& ref,
495 OnSetSyncAttrs waiterAttrs,
496 OnSetSyncAttrs signalerAttrs = {})
498 , Port()
499 , m_channelConsumer(nullptr)
500 , m_reuse(false)
501 , m_calledRecvImpl(RECV_API_CALLED_NONE)
502 , m_lastTypedData(nullptr)
503 , m_lastReleasePtr(nullptr)
504 , m_existingUniquePtr(false)
505 , m_reference(ref)
506 , m_waiterAttrs(std::move(waiterAttrs))
507 , m_signalerAttrs(std::move(signalerAttrs))
508 , m_onDataReadyOpaque()
509 , m_onDataReady()
510 {
511 }
512
513 ~PortInput() override
514 {
515 // release data cached for reuse
516 if (nullptr != m_channelConsumer && nullptr != m_lastReleasePtr)
517 {
518 if (m_existingUniquePtr.load())
519 {
520 DW_LOGE << dw::core::StringView{"~PortInput: Cannot release reused packet since the unique_ptr has not been returned by caller yet"} << Logger::State::endl;
521 }
522 else
523 {
524 static_cast<void>(m_channelConsumer->release(m_lastReleasePtr));
525 }
526 }
527 }
528
529 // Channel Bind
530 dwStatus bindChannel(ChannelObject* channel) override
531 {
533 [&] {
534 if (isBound())
535 {
536 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortInput: bindChannel: port already bound");
537 }
538 if (nullptr == channel)
539 {
540 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortInput: bindChannel: expected channel != nullptr");
541 }
542 m_channel = channel;
543
545 GenericDataReference ref{make_specimen<T>(nullptr)};
546
547 if (m_reference.has_value())
548 {
549 ref = make_specimen<T>(&m_reference.value());
550 }
551
552 ref.packetTypeID = BaseSyncHelper::getNewPacketID(ref.packetTypeID);
553 ref.setWaiterAttributes = m_waiterAttrs;
554 ref.setSignalerAttributes = m_signalerAttrs;
555 ref.onDataReadyOpaque = m_onDataReadyOpaque;
556 ref.onDataReady = m_onDataReady;
557
558 m_channelConsumer = channel->getConsumer(ref);
559 if (nullptr == m_channelConsumer)
560 {
561 throw ExceptionWithStatus(DW_INTERNAL_ERROR, "PortInput bindChannel: wrong channel implementations returned.");
562 }
563 m_reuse = channel->getParams().getReuseEnabled();
564 },
565 dw::core::Logger::Verbosity::DEBUG);
566 }
567
568 bool isBound() override
569 {
570 return !(nullptr == m_channelConsumer);
571 }
572
573 void setOnDataReady(void* opaque, OnDataReady onDataReady)
574 {
575 if (isBound())
576 {
577 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: setOnDataReady: channel already bound");
578 }
579 m_onDataReadyOpaque = opaque;
580 m_onDataReady = std::move(onDataReady);
581 }
582
583 // Rx Operations
584 dwStatus wait(dwTime_t timeout)
585 {
586 if (!isBound())
587 {
588 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: wait: no bound channel");
589 }
590
591 // For synced packets, the wait can return DW_NOT_AVAILABLE or DW_SUCCESS
592 // if there are no packets to consume. This is because you need to consume
593 // a packet to make sure it's valid or not.
595 {
596 return DW_SUCCESS;
597 }
599 {
600 return DW_NOT_AVAILABLE;
601 }
603 {
604 // coverity[autosar_cpp14_a5_1_1_violation] RFD Accepted: TID-2056
605 timeout = 0;
606 }
607
608 bool hasLast{(nullptr != m_last.get()) || (nullptr != m_lastTypedData)};
609 dwTime_t waitTime{hasLast ? 0 : timeout};
610 dwStatus status{m_channelConsumer->wait(waitTime)};
611 if (hasLast && (DW_TIME_OUT == status || DW_NOT_AVAILABLE == status))
612 {
613 return DW_SUCCESS;
614 }
615
616 return status;
617 }
618
620 virtual std::shared_ptr<T> recv()
621 {
622 if (RECV_API_CALLED_RECV_UNIQUE == m_calledRecvImpl)
623 {
624 throw ExceptionWithStatus(DW_CALL_NOT_ALLOWED, "PortInput: recv() can't be called after calling recvUnique() before");
625 }
626 m_calledRecvImpl = RECV_API_CALLED_RECV;
627
628 GenericData data{};
629 std::shared_ptr<T> result{};
630 if (!isBound())
631 {
632 return nullptr;
633 }
634
635 // coverity[autosar_cpp14_a0_1_1_violation]
636 T* typedData{nullptr};
637 // coverity[autosar_cpp14_a0_1_1_violation]
638 void* releasePtr{nullptr};
639
641 {
642 // There is a valid packet to consume
644 releasePtr = data.getPointer();
646 }
648 {
649 // There is a buffered packet, but it's not ready to be consumed.
650 return nullptr;
651 }
652 else
653 {
654 dwStatus status{m_channelConsumer->recv(&data)};
655 if (DW_SUCCESS != status)
656 {
657 if (nullptr != m_last)
658 {
659 return m_last;
660 }
661 else
662 {
663 return nullptr;
664 }
665 }
667 {
668 typedData = BaseSyncHelper::extractSyncPacket(data);
669 if (!typedData)
670 {
671 return nullptr;
672 }
673 }
674 else
675 {
677 }
678 releasePtr = data.getPointer();
679 }
680
681 // don't rely on this class's member when releasing packet
682 ChannelObject::Consumer* channelConsumer{m_channelConsumer};
683 // coverity[autosar_cpp14_a5_1_9_violation] FP: nvbugs/4347682
684 result = std::shared_ptr<T>(typedData, [channelConsumer, releasePtr](T*) {
685 static_cast<void>(channelConsumer->release(releasePtr));
686 });
687 if (m_reuse)
688 {
689 m_last = result;
690 }
691
692 return result;
693 }
694
696 {
697 void operator()(T* p) const
698 {
699 if (nullptr == port)
700 {
701 // coverity[autosar_cpp14_a18_5_2_violation] RFD Accepted: TID-2417
702 delete p;
703 }
704 else
705 {
706 if (!port->m_reuse)
707 {
708 static_cast<void>(port->m_channelConsumer->release(releasePtr));
709 }
710 else
711 {
712 port->m_existingUniquePtr = false;
713 }
714 }
715 }
718 };
719
720 using UniquePacketPtr = std::unique_ptr<T, PacketDeleter>;
721
723 {
724 if (RECV_API_CALLED_RECV == m_calledRecvImpl)
725 {
726 throw ExceptionWithStatus(DW_CALL_NOT_ALLOWED, "PortInput: recvUnique() can't be called after calling recv() before");
727 }
728 m_calledRecvImpl = RECV_API_CALLED_RECV_UNIQUE;
729
730 if (!isBound())
731 {
732 return nullptr;
733 }
734 GenericData data{};
735
736 // coverity[autosar_cpp14_a0_1_1_violation]
737 T* typedData{nullptr};
738 // coverity[autosar_cpp14_a0_1_1_violation]
739 void* releasePtr{nullptr};
741 {
742 // There is a valid packet to consume
744 releasePtr = data.getPointer();
746 }
748 {
749 // There is a buffered packet, but it's not ready to be consumed.
750 return nullptr;
751 }
752 else
753 {
754 dwStatus status{m_channelConsumer->recv(&data)};
755 if (DW_SUCCESS != status)
756 {
757 return makeUniquePtr();
758 }
760 {
761 typedData = BaseSyncHelper::extractSyncPacket(data);
762 if (!typedData)
763 {
764 return nullptr;
765 }
766 }
767 else
768 {
770 }
771 releasePtr = data.getPointer();
772 }
773
774 return makeUniquePtr(typedData, releasePtr);
775 }
776
777 // coverity[autosar_cpp14_a2_10_5_violation]
779 {
780 if (!m_channelConsumer)
781 {
782 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
783 }
784
786 return payload->header;
787 }
788
790 {
791 if (!m_channelConsumer)
792 {
793 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
794 }
795 return m_channelConsumer->getSyncSignaler();
796 }
797
798 void setSignalFences(T* frame, dw::core::span<const NvSciSyncFence> fences)
799 {
801 }
802
804 {
805 if (!m_channelConsumer)
806 {
807 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
808 }
809 return m_channelConsumer->getSyncWaiter();
810 }
811
812 void getWaitFences(T* frame, dw::core::span<NvSciSyncFence>& fences)
813 {
815 }
816
820 detail::vectorIterable<T> getAllBufferIter()
821 {
822 return detail::vectorIterable<T>(m_channelConsumer->getAllBuffers());
823 }
824
825private:
826 UniquePacketPtr makeUniquePtr(T* typedData = nullptr, void* releasePtr = nullptr)
827 {
828 if (!m_reuse)
829 {
830 return UniquePacketPtr(typedData, PacketDeleter{this, releasePtr});
831 }
832
833 if (m_existingUniquePtr.load())
834 {
835 // never hand out more than one shared_ptr when reuse is enabled
836 // the caller must release the previous shared_ptr before requesting a new one with recv()
837 // indenpendent if the new unique_ptr refers to the same reused packet or a new one
838 throw ExceptionWithStatus(DW_CALL_NOT_ALLOWED, "Cannot return unique_ptr of reused packet since previous unique_ptr has not been returned");
839 }
840
841 if (nullptr == typedData && nullptr == m_lastTypedData)
842 {
843 return nullptr;
844 }
845
846 if (nullptr != typedData)
847 {
848 if (nullptr != m_lastTypedData)
849 {
850 // release previous data when new data has been received
851 static_cast<void>(m_channelConsumer->release(m_lastReleasePtr));
852 }
853 m_lastTypedData = typedData;
854 m_lastReleasePtr = releasePtr;
855 }
856
857 m_existingUniquePtr = true;
858 return std::move(UniquePacketPtr(m_lastTypedData, PacketDeleter{this, releasePtr}));
859 }
860
862 static constexpr uint8_t RECV_API_CALLED_NONE{0U};
864 static constexpr uint8_t RECV_API_CALLED_RECV{1U};
866 static constexpr uint8_t RECV_API_CALLED_RECV_UNIQUE{2U};
867
868 ChannelObject::Consumer* m_channelConsumer;
869 bool m_reuse;
871 uint8_t m_calledRecvImpl;
873 std::shared_ptr<T> m_last;
875 T* m_lastTypedData;
877 void* m_lastReleasePtr;
879 std::atomic<bool> m_existingUniquePtr;
880 dw::core::Optional<SpecimenT> m_reference;
881 OnSetSyncAttrs m_waiterAttrs;
882 OnSetSyncAttrs m_signalerAttrs;
883 void* m_onDataReadyOpaque;
884 OnDataReady m_onDataReady;
885};
886
887template <typename T>
888constexpr char PortInput<T>::LOG_TAG[];
889
890} // namespace framework
891} // namespace dw
892
893#endif // DW_FRAMEWORK_PORT_H_
virtual dwStatus recv(GenericData *data)=0
virtual dwStatus release(void *data)=0
virtual dwStatus wait(dwTime_t timeout)=0
virtual dw::core::VectorFixed< GenericData > getAllBuffers()=0
virtual dwStatus get(GenericData *data)=0
virtual dwStatus send(void *data)=0
virtual void setSignalFences(void *data, dw::core::span< const NvSciSyncFence > postFences)=0
virtual void getWaitFences(void *data, dw::core::span< NvSciSyncFence > &waitFences)=0
virtual const ChannelParams & getParams() const =0
virtual Consumer * getConsumer(const GenericDataReference &ref)=0
virtual Producer * getProducer(const GenericDataReference &ref)=0
static dwStatus guard(TryBlock const &tryBlock, ::dw::core::Logger::Verbosity verbosity=::dw::core::Logger::Verbosity::ERROR)
Definition: Exception.hpp:167
virtual ~PortBase()=default
PortInput(OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:477
virtual std::shared_ptr< T > recv()
Definition: Port.hpp:620
virtual UniquePacketPtr recvUnique()
Definition: Port.hpp:722
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:584
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:789
PortInput(SpecimenT const &ref, OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:494
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:530
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:803
std::unique_ptr< T, PacketDeleter > UniquePacketPtr
Definition: Port.hpp:720
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:446
void setSignalFences(T *frame, dw::core::span< const NvSciSyncFence > fences)
Definition: Port.hpp:798
~PortInput() override
Definition: Port.hpp:513
bool isBound() override
Definition: Port.hpp:568
static constexpr PortDirection DIRECTION
Definition: Port.hpp:443
detail::vectorIterable< T > getAllBufferIter()
Definition: Port.hpp:820
void setOnDataReady(void *opaque, OnDataReady onDataReady)
Definition: Port.hpp:573
ChannelMetadata & getMetadata(T *frame)
Definition: Port.hpp:778
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > &fences)
Definition: Port.hpp:812
PortInput(SpecimenT &&ref)
Definition: Port.hpp:455
PortInput(SpecimenT const &ref)
Definition: Port.hpp:451
detail::vectorIterable< T > getAllBufferIter()
Definition: Port.hpp:390
virtual dwStatus send(T *frame, const dwTime_t *publishTimestamp=nullptr)
Definition: Port.hpp:335
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:303
static constexpr char LOG_TAG[]
Definition: Port.hpp:167
dwStatus bindChannelWithReference(ChannelObject *channel, GenericDataReference &ref)
Definition: Port.hpp:216
ChannelMetadata & getMetadata(T *frame)
Definition: Port.hpp:348
dwStatus bindChannelForPODTypePacket(ChannelObject *channel)
Definition: Port.hpp:250
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:210
void setOnDataReady(void *opaque, OnDataReady onDataReady)
Definition: Port.hpp:288
PortOutput(SpecimenT const &ref)
Definition: Port.hpp:179
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:161
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:359
bool isBound() final
Definition: Port.hpp:298
PortOutput(SpecimenT const &ref, OnSetSyncAttrs signalerAttrs, OnSetSyncAttrs waiterAttrs={})
Definition: Port.hpp:194
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:373
static constexpr PortDirection DIRECTION
Definition: Port.hpp:158
void populateDefaultMetadata(ChannelMetadata &header, const dwTime_t *publishTimestamp)
Definition: Port.hpp:396
PortOutput(SpecimenT &&ref)
Definition: Port.hpp:183
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > &fences)
Definition: Port.hpp:382
void setSignalFences(T *frame, dw::core::span< const NvSciSyncFence > fences)
Definition: Port.hpp:368
ChannelObject * m_channel
Definition: Port.hpp:140
virtual dwStatus bindChannel(ChannelObject *channel)=0
virtual bool isBound()=0
virtual ChannelObject * getChannel()
Definition: Port.hpp:134
T * extractInternalPacket(GenericData genericData)
T * extractSyncPacket(GenericData genericData)
MetadataPayload * getMetadataPacket(T *frame)
void parseDataSynced(const ChannelParams &params) override
T * extractInternalPacket(GenericData genericData)
void parseDataSynced(const ChannelParams &params) override
MetadataPayload * getMetadataPacket(T *frame)
OnDataReady onDataReady
lambda to handle data ready
static constexpr const uint32_t DWFRAMEWORK_METADATA_PACKET_TYPE_ID_OFFSET
MetadataPayload * extractMetadata(GenericData packet)
dw::core::Function< void(NvSciSyncAttrList)> OnSetSyncAttrs
void setTimestamp(ChannelMetadata &header, dwTime_t const &timestamp)
OnSetSyncAttrs setSignalerAttributes
lambda to set the signaler attributes of the endpoint.
ChannelPacketTypeID packetTypeID
The ID of the type of the endpoint.
uint16_t validFields
Bit map defining which ChannelMetadata fields are set. See MetadataFlags.
dw::core::Function< void()> OnDataReady
uint32_t ChannelPacketTypeID
constexpr ChannelPacketTypeID DWFRAMEWORK_PACKET_ID_DEFAULT
uint32_t producerId
Id of the producer channel.
typename ManagedPortInput< T >::UniquePacketPtr UniquePacketPtr
uint32_t iterationCount
Producer iteration count.
void setSequenceNumber(ChannelMetadata &header, uint32_t const &sequenceNum)
OnSetSyncAttrs setWaiterAttributes
lambda to set the waiter attributes of the endpoint.
@ METADATA_ITERATION_COUNT
Producer iteration count is set.
void * onDataReadyOpaque
pointer hint for data ready
@ SHMEM_LOCAL
local shared memory
Definition: Buffer.hpp:41
ChannelPacketTypeID getNewPacketID(ChannelPacketTypeID packetTypeID)
void stampSyncCount(uint32_t &syncCountOut) const