Compute Graph Framework SDK Reference  5.10
Port.hpp
Go to the documentation of this file.
1
2//
3// Notice
4// ALL NVIDIA DESIGN SPECIFICATIONS AND CODE ("MATERIALS") ARE PROVIDED "AS IS" NVIDIA MAKES
5// NO REPRESENTATIONS, WARRANTIES, EXPRESSED, IMPLIED, STATUTORY, OR OTHERWISE WITH RESPECT TO
6// THE MATERIALS, AND EXPRESSLY DISCLAIMS ANY IMPLIED WARRANTIES OF NONINFRINGEMENT,
7// MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
8//
9// NVIDIA CORPORATION & AFFILIATES assumes no responsibility for the consequences of use of such
10// information or for any infringement of patents or other rights of third parties that may
11// result from its use. No license is granted by implication or otherwise under any patent
12// or patent rights of NVIDIA CORPORATION & AFFILIATES. No third party distribution is allowed unless
13// expressly authorized by NVIDIA. Details are subject to change without notice.
14// This code supersedes and replaces all information previously supplied.
15// NVIDIA CORPORATION & AFFILIATES products are not authorized for use as critical
16// components in life support devices or systems without express written approval of
17// NVIDIA CORPORATION & AFFILIATES.
18//
19// SPDX-FileCopyrightText: Copyright (c) 2018-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
20// SPDX-License-Identifier: LicenseRef-NvidiaProprietary
21//
22// NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
23// property and proprietary rights in and to this material, related
24// documentation and any modifications thereto. Any use, reproduction,
25// disclosure or distribution of this material and related documentation
26// without an express license agreement from NVIDIA CORPORATION or
27// its affiliates is strictly prohibited.
28//
30
31#ifndef DW_FRAMEWORK_PORT_H_
32#define DW_FRAMEWORK_PORT_H_
33
35#include <dw/core/base/Exception.hpp>
36#include <dw/core/logger/Logger.hpp>
38
39#include "SyncPortHelper.hpp"
40
41#include <nvscisync.h>
42#include <stdexcept>
43#include <string>
44
45namespace dw
46{
47namespace framework
48{
49
/// Identifies which way data flows through a port.
enum class PortDirection : uint8_t
{
    INPUT  = 0, ///< Port direction: input.
    OUTPUT = 1, ///< Port direction: output.
    COUNT  = 2, ///< Follows the last direction; presumably used as the number of directions - confirm with callers.
};
57
59{
60public:
61 virtual ~PortBase() = default;
62};
63
65class Port : public PortBase
66{
67public:
68 virtual dwStatus bindChannel(ChannelObject* channel) = 0;
69 virtual dwStatus initialize() { return DW_SUCCESS; }
70 virtual bool isBound() = 0;
72 {
73 return m_channel;
74 };
75
76protected:
78};
79
81
88template <typename T>
89class PortOutput : public SyncPortHelperOutput<T>, public Port
90{
91public:
93 using ApiDataTypeT = T;
96
97 static_assert(std::is_copy_constructible<SpecimenT>::value, "SpecimenT is not copy constructible");
98
99 static constexpr char LOG_TAG[] = "PortOutput";
100
101private:
102 ChannelObject::Producer* m_channelProducer{};
103 SpecimenT m_reference{};
104 OnSetSyncAttrs m_waiterAttrs{};
105 OnSetSyncAttrs m_signalerAttrs{};
106 uint32_t m_sendSeqNum{};
107
108public:
109 explicit PortOutput(SpecimenT const& ref)
110 : m_reference(ref)
111 {
112 }
113 explicit PortOutput(SpecimenT&& ref)
114 : m_reference(std::move(ref))
115 {
116 }
117
118 explicit PortOutput(SpecimenT const& ref,
119 OnSetSyncAttrs signalerAttrs,
120 OnSetSyncAttrs waiterAttrs = {})
121 : m_reference(ref)
122 , m_waiterAttrs(waiterAttrs)
123 , m_signalerAttrs(signalerAttrs)
124 {
125 }
126
127 // Channel Bind
128 dwStatus bindChannel(ChannelObject* channel) override
129 {
130 auto ref = make_specimen<T>(&m_reference);
131 return bindChannelWithReference(channel, ref);
132 }
133
135 {
136 return Exception::guard([&] {
137 if (isBound())
138 {
139 // TODO(chale): this should be an Exception but applications are currently
140 // doing this. Those applications should be fixed.
141 DW_LOGE << "PortOutput: bindChannel: attempted to bind the same port twice, ignoring this bind!" << Logger::State::endl;
142 return;
143 }
144 if (channel == nullptr)
145 {
146 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortOutput: bindChannel: expected channel != nullptr");
147 }
148 m_channel = channel;
151 ref.setWaiterAttributes = m_waiterAttrs;
152 ref.setSignalerAttributes = m_signalerAttrs;
153
154 m_channelProducer = channel->getProducer(ref);
155 if (m_channelProducer == nullptr)
156 {
157 throw ExceptionWithStatus(DW_INTERNAL_ERROR, "PortOutput bindChannel: wrong channel implementations returned.");
158 }
159 },
160 dw::core::Logger::Verbosity::DEBUG);
161 }
162
164 {
165 if (!isBound())
166 {
167 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: setOnDataReady: no bound channel");
168 }
169 m_channelProducer->setOnDataReady(opaque, onDataReady);
170 }
171
172 bool isBound() final
173 {
174 return (m_channelProducer != nullptr);
175 }
176
177 dwStatus wait(dwTime_t timeout)
178 {
179 if (!isBound())
180 {
181 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: wait: no bound channel");
182 }
183
184 return m_channelProducer->wait(timeout);
185 }
186
187 // Node accessors
188 // TODO(unknown): This function's prototype needs to change to properly propagate errors
190 {
191 dwStatus status = DW_FAILURE;
192 GenericData genericData{};
193 if (m_channelProducer)
194 {
195 status = m_channelProducer->get(&genericData);
196 }
197
198 if (status != DW_SUCCESS)
199 {
200 return nullptr;
201 }
202
203 extractMetadata(genericData)->header.validFields = 0U;
204 return BaseSyncHelper::extractInternalPacket(genericData);
205 }
206
207 // Tx Operations
208 virtual dwStatus send(T* frame)
209 {
210 if (!m_channelProducer)
211 {
212 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
213 }
214
217 return m_channelProducer->send(payload);
218 }
219
221 {
222 if (!m_channelProducer)
223 {
224 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
225 }
226
228 return payload->header;
229 }
230
232 {
233 if (!m_channelProducer)
234 {
235 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
236 }
237 return m_channelProducer->getSyncSignaler();
238 }
239
240 void setSignalFences(T* frame, dw::core::span<NvSciSyncFence> fences)
241 {
242 m_channelProducer->getSyncSignaler().setSignalFences(BaseSyncHelper::getMetadataPacket(frame), fences);
243 }
244
246 {
247 if (!m_channelProducer)
248 {
249 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
250 }
251 return m_channelProducer->getSyncWaiter();
252 }
253
254 void getWaitFences(T* frame, dw::core::span<NvSciSyncFence> fences)
255 {
256 m_channelProducer->getSyncWaiter().getWaitFences(BaseSyncHelper::getMetadataPacket(frame), fences);
257 }
258
259protected:
261 {
262 header.sequenceNum = m_sendSeqNum;
263 m_sendSeqNum++;
264 header.producerId = 0;
265
267 {
269 header.validFields |= static_cast<uint16_t>(MetadataFlags::METADATA_ITERATION_COUNT);
270 }
271 }
272};
273
274template <typename T>
275constexpr char PortOutput<T>::LOG_TAG[];
276
278
285template <typename T>
286class PortInput : public SyncPortHelperInput<T>, public Port
287{
289 "Channel packet type not declared. Ensure channel packet type "
290 "handling is declared with DWFRAMEWORK_DECLARE_PACKET_TYPE_RELATION");
291
292 static constexpr char LOG_TAG[] = "PortInput";
293
294public:
296 using ApiDataTypeT = T;
299
300 static_assert(std::is_copy_constructible<SpecimenT>::value, "SpecimenT is not copy constructible");
301
302 explicit PortInput(SpecimenT const& ref)
303 : m_reference(ref)
304 {
305 }
306 explicit PortInput(SpecimenT&& ref)
307 : m_reference(std::move(ref))
308 {
309 }
310
312 {
313 }
314
315 explicit PortInput(OnSetSyncAttrs waiterAttrs,
316 OnSetSyncAttrs signalerAttrs = {})
317 : m_waiterAttrs(waiterAttrs)
318 , m_signalerAttrs(signalerAttrs)
319 {
320 }
321
322 explicit PortInput(SpecimenT const& ref,
323 OnSetSyncAttrs waiterAttrs,
324 OnSetSyncAttrs signalerAttrs = {})
325 : m_reference(ref)
326 , m_waiterAttrs(waiterAttrs)
327 , m_signalerAttrs(signalerAttrs)
328 {
329 }
330
331 ~PortInput() override = default;
332
333 // Channel Bind
334 dwStatus bindChannel(ChannelObject* channel) override
335 {
336 return Exception::guard([&] {
337 if (isBound())
338 {
339 // TODO(chale): this should be an Exception but applications are currently
340 // doing this. Those applications should be fixed.
341 DW_LOGE << "PortInput: bindChannel: attempted to bind the same port twice, ignoring this bind!" << Logger::State::endl;
342 return;
343 }
344 if (channel == nullptr)
345 {
346 throw ExceptionWithStatus(DW_INVALID_ARGUMENT, "PortInput: bindChannel: expected channel != nullptr");
347 }
348 m_channel = channel;
349
351 auto ref = make_specimen<T>(nullptr);
352
353 if (m_reference)
354 {
355 ref = make_specimen<T>(&m_reference.value());
356 }
357
358 ref.packetTypeID = BaseSyncHelper::getNewPacketID(ref.packetTypeID);
359 ref.setWaiterAttributes = m_waiterAttrs;
360 ref.setSignalerAttributes = m_signalerAttrs;
361
362 m_channelConsumer = channel->getConsumer(ref);
363 if (m_channelConsumer == nullptr)
364 {
365 throw ExceptionWithStatus(DW_INTERNAL_ERROR, "PortInput bindChannel: wrong channel implementations returned.");
366 }
367 m_reuse = channel->getParams().getReuseEnabled();
368 },
369 dw::core::Logger::Verbosity::DEBUG);
370 }
371
372 bool isBound() override
373 {
374 return !(m_channelConsumer == nullptr);
375 }
376
378 {
379 if (!isBound())
380 {
381 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: setOnDataReady: no bound channel");
382 }
383 m_channelConsumer->setOnDataReady(opaque, onDataReady);
384 }
385
386 // Rx Operations
387 dwStatus wait(dwTime_t timeout)
388 {
389 if (!isBound())
390 {
391 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: wait: no bound channel");
392 }
393
394 // For synced packets, the wait can return DW_NOT_AVAILABLE or DW_SUCCESS
395 // if there are no packets to consume. This is because you need to consume
396 // a packet to make sure it's valid or not.
398 {
399 return DW_SUCCESS;
400 }
402 {
403 return DW_NOT_AVAILABLE;
404 }
406 {
407 timeout = 0;
408 }
409
410 dwTime_t waitTime = m_last ? 0 : timeout;
411 dwStatus status = m_channelConsumer->wait(waitTime);
412 if (m_last && (status == DW_TIME_OUT || status == DW_NOT_AVAILABLE))
413 {
414 return DW_SUCCESS;
415 }
416
417 return status;
418 }
419
420 // TODO(unknown): This function's prototype needs to change to properly propagate errors
421 virtual std::shared_ptr<T> recv()
422 {
423 GenericData data{};
424 std::shared_ptr<T> result;
425 if (!isBound())
426 {
427 return nullptr;
428 }
429
430 T* typedData = nullptr;
431 void* releasePtr = nullptr;
432
434 {
435 // There is a valid packet to consume
437 releasePtr = data.getPointer();
439 }
441 {
442 // There is a buffered packet, but it's not ready to be consumed.
443 return nullptr;
444 }
445 else
446 {
447 dwStatus status = m_channelConsumer->recv(&data);
448 if (status != DW_SUCCESS)
449 {
450 if (m_last != nullptr)
451 {
452 return m_last;
453 }
454 else
455 {
456 return nullptr;
457 }
458 }
459 releasePtr = data.getPointer();
460
462 {
463 typedData = BaseSyncHelper::extractSyncPacket(data);
464 if (!typedData)
465 {
466 return nullptr;
467 }
468 }
469 else
470 {
472 }
473 }
474
475 // don't rely on this class's member when releasing packet
476 auto* channelConsumer = m_channelConsumer;
477 result = std::shared_ptr<T>(typedData, [channelConsumer, releasePtr](T*) {
478 channelConsumer->release(releasePtr);
479 });
480 if (m_reuse)
481 {
482 m_last = result;
483 }
484
485 return result;
486 }
487
489 {
490 if (!m_channelConsumer)
491 {
492 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
493 }
494
496 return payload->header;
497 }
498
500 {
501 if (!m_channelConsumer)
502 {
503 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
504 }
505 return m_channelConsumer->getSyncSignaler();
506 }
507
508 void setSignalFences(T* frame, dw::core::span<NvSciSyncFence> fences)
509 {
510 m_channelConsumer->getSyncSignaler().setSignalFences(BaseSyncHelper::getMetadataPacket(frame), fences);
511 }
512
514 {
515 if (!m_channelConsumer)
516 {
517 throw ExceptionWithStatus(DW_NOT_AVAILABLE, "PortInput: channel not bound");
518 }
519 return m_channelConsumer->getSyncWaiter();
520 }
521
522 void getWaitFences(T* frame, dw::core::span<NvSciSyncFence> fences)
523 {
524 m_channelConsumer->getSyncWaiter().getWaitFences(BaseSyncHelper::getMetadataPacket(frame), fences);
525 }
526
527private:
528 ChannelObject::Consumer* m_channelConsumer{};
529 bool m_reuse{};
530 std::shared_ptr<T> m_last{};
531 dw::core::Optional<SpecimenT> m_reference{};
532 OnSetSyncAttrs m_waiterAttrs{};
533 OnSetSyncAttrs m_signalerAttrs{};
534};
535
536template <typename T>
537constexpr char PortInput<T>::LOG_TAG[];
538
539} // namespace framework
540} // namespace dw
541
542#endif // DW_FRAMEWORK_PORT_H_
virtual dwStatus recv(GenericData *data)=0
virtual dwStatus wait(dwTime_t timeout)=0
virtual void setOnDataReady(void *opaque, OnDataReady onDataReady)=0
dw::core::Function< void()> OnDataReady
Definition: Channel.hpp:134
virtual dwStatus get(GenericData *data)=0
virtual dwStatus send(void *data)=0
virtual void setSignalFences(void *data, dw::core::span< const NvSciSyncFence > postFences)=0
virtual void getWaitFences(void *data, dw::core::span< NvSciSyncFence > &waitFences)=0
virtual Producer * getProducer(GenericDataReference ref)=0
virtual const ChannelParams & getParams() const =0
virtual Consumer * getConsumer(GenericDataReference ref)=0
virtual ~PortBase()=default
PortInput(OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:315
virtual std::shared_ptr< T > recv()
Definition: Port.hpp:421
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:387
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:499
PortInput(SpecimenT const &ref, OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:322
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:334
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:513
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:297
void setSignalFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:508
bool isBound() override
Definition: Port.hpp:372
static constexpr PortDirection DIRECTION
Definition: Port.hpp:295
ChannelMetadata & getMetadata(T *frame)
Definition: Port.hpp:488
void setOnDataReady(void *opaque, ChannelObject::PacketPool::OnDataReady onDataReady)
Definition: Port.hpp:377
~PortInput() override=default
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:522
PortInput(SpecimenT &&ref)
Definition: Port.hpp:306
PortInput(SpecimenT const &ref)
Definition: Port.hpp:302
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:177
static constexpr char LOG_TAG[]
Definition: Port.hpp:99
dwStatus bindChannelWithReference(ChannelObject *channel, GenericDataReference &ref)
Definition: Port.hpp:134
ChannelMetadata & getMetadata(T *frame)
Definition: Port.hpp:220
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:128
void setOnDataReady(void *opaque, ChannelObject::PacketPool::OnDataReady onDataReady)
Definition: Port.hpp:163
PortOutput(SpecimenT const &ref)
Definition: Port.hpp:109
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:94
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:231
void populateDefaultMetadata(ChannelMetadata &header)
Definition: Port.hpp:260
bool isBound() final
Definition: Port.hpp:172
PortOutput(SpecimenT const &ref, OnSetSyncAttrs signalerAttrs, OnSetSyncAttrs waiterAttrs={})
Definition: Port.hpp:118
void setSignalFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:240
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:245
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:254
static constexpr PortDirection DIRECTION
Definition: Port.hpp:92
PortOutput(SpecimenT &&ref)
Definition: Port.hpp:113
virtual dwStatus send(T *frame)
Definition: Port.hpp:208
ChannelObject * m_channel
Definition: Port.hpp:77
virtual dwStatus bindChannel(ChannelObject *channel)=0
virtual bool isBound()=0
virtual dwStatus initialize()
Definition: Port.hpp:69
virtual ChannelObject * getChannel()
Definition: Port.hpp:71
T * extractInternalPacket(GenericData genericData)
T * extractSyncPacket(GenericData genericData)
MetadataPayload * getMetadataPacket(T *frame)
void parseDataSynced(const ChannelParams &params) override
T * extractInternalPacket(GenericData genericData)
void parseDataSynced(const ChannelParams &params) override
MetadataPayload * getMetadataPacket(T *frame)
MetadataPayload * extractMetadata(GenericData packet)
dw::core::Function< void(NvSciSyncAttrList)> OnSetSyncAttrs
OnSetSyncAttrs setSignalerAttributes
lambda to set the signaler attributes of the endpoint.
ChannelPacketTypeID packetTypeID
The ID of the type of the endpoint.
OnSetSyncAttrs setWaiterAttributes
lambda to set the waiter attributes of the endpoint.
Definition: Buffer.hpp:40
ChannelPacketTypeID getNewPacketID(ChannelPacketTypeID packetTypeID)
void stampSyncCount(uint32_t &syncCountOut) const