Compute Graph Framework SDK Reference
5.4.5418 Release
For Test and Development only

Port.hpp
Go to the documentation of this file.
1 //
3 // Notice
4 // ALL NVIDIA DESIGN SPECIFICATIONS AND CODE ("MATERIALS") ARE PROVIDED "AS IS" NVIDIA MAKES
5 // NO REPRESENTATIONS, WARRANTIES, EXPRESSED, IMPLIED, STATUTORY, OR OTHERWISE WITH RESPECT TO
6 // THE MATERIALS, AND EXPRESSLY DISCLAIMS ANY IMPLIED WARRANTIES OF NONINFRINGEMENT,
7 // MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
8 //
9 // NVIDIA CORPORATION & AFFILIATES assumes no responsibility for the consequences of use of such
10 // information or for any infringement of patents or other rights of third parties that may
11 // result from its use. No license is granted by implication or otherwise under any patent
12 // or patent rights of NVIDIA CORPORATION & AFFILIATES. No third party distribution is allowed unless
13 // expressly authorized by NVIDIA. Details are subject to change without notice.
14 // This code supersedes and replaces all information previously supplied.
15 // NVIDIA CORPORATION & AFFILIATES products are not authorized for use as critical
16 // components in life support devices or systems without express written approval of
17 // NVIDIA CORPORATION & AFFILIATES.
18 //
19 // SPDX-FileCopyrightText: Copyright (c) 2018-2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
20 // SPDX-License-Identifier: LicenseRef-NvidiaProprietary
21 //
22 // NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
23 // property and proprietary rights in and to this material, related
24 // documentation and any modifications thereto. Any use, reproduction,
25 // disclosure or distribution of this material and related documentation
26 // without an express license agreement from NVIDIA CORPORATION or
27 // its affiliates is strictly prohibited.
28 //
30 
31 #ifndef DW_FRAMEWORK_PORT_H_
32 #define DW_FRAMEWORK_PORT_H_
33 
35 
36 #include "SyncPortHelper.hpp"
37 
38 #include <stdexcept>
39 #include <string>
40 
41 namespace dw
42 {
43 namespace framework
44 {
45 
47 enum PortDirection : uint8_t
48 {
49  INPUT = 0,
52 };
53 
54 class PortBase
55 {
56 public:
57  virtual ~PortBase() = default;
58 };
59 
61 class Port : public PortBase
62 {
63 public:
64  virtual dwStatus bindChannel(ChannelObject* channel) = 0;
65  virtual dwStatus initialize() { return DW_SUCCESS; }
66  virtual bool isBound() = 0;
67 };
68 
70 
77 template <typename T>
78 class PortOutput : public SyncPortHelperOutput<T>, public Port
79 {
80 public:
81  static constexpr PortDirection DIRECTION = PortDirection::OUTPUT;
82  using ApiDataTypeT = T;
85 
86  static constexpr char LOG_TAG[] = "PortOutput";
87 
88 private:
89  ChannelObject::Producer* m_channelProducer{};
90  SpecimenT m_reference{};
91  OnSetSyncAttrs m_waiterAttrs{};
92  OnSetSyncAttrs m_signalerAttrs{};
93 
94 public:
95  explicit PortOutput(SpecimenT const& ref)
96  : m_reference(ref)
97  {
98  }
99  explicit PortOutput(SpecimenT&& ref)
100  : m_reference(std::move(ref))
101  {
102  }
103 
104  explicit PortOutput(SpecimenT const& ref,
105  OnSetSyncAttrs signalerAttrs,
106  OnSetSyncAttrs waiterAttrs = {})
107  : m_reference(ref)
108  , m_waiterAttrs(waiterAttrs)
109  , m_signalerAttrs(signalerAttrs)
110  {
111  }
112 
113  // Channel Bind
114  dwStatus bindChannel(ChannelObject* channel) override
115  {
116  auto ref = make_specimen<T>(&m_reference);
117  return bindChannelWithReference(channel, ref);
118  }
119 
121  {
122  return Exception::guard([&] {
123  if (isBound())
124  {
125  // TODO(chale): this should be an Exception but applications are currently
126  // doing this. Those applications should be fixed.
127  FRWK_LOGE << "PortOutput: bindChannel: attempted to bind the same port twice, ignoring this bind!" << Logger::State::endl;
128  return;
129  }
130  if (channel == nullptr)
131  {
132  throw Exception(DW_INVALID_ARGUMENT, "PortOutput: bindChannel: expected channel != nullptr");
133  }
134 
135  BaseSyncHelper::parseDataSynced(channel->getParams());
136 
137  if (BaseSyncHelper::isDataSynced())
138  {
139  ref.packetTypeID = BaseSyncHelper::getNewPacketID(ref.packetTypeID);
140  }
141 
142  ref.setWaiterAttributes = m_waiterAttrs;
143  ref.setSignalerAttributes = m_signalerAttrs;
144 
145  m_channelProducer = channel->getProducer(ref);
146  if (m_channelProducer == nullptr)
147  {
148  throw Exception(DW_INTERNAL_ERROR, "PortOutput bindChannel: wrong channel implementations returned.");
149  }
150  });
151  }
152 
153  void setOnDataReady(void* opaque, IChannelPacketFactory::OnDataReady onDataReady)
154  {
155  if (!isBound())
156  {
157  throw Exception(DW_NOT_AVAILABLE, "PortOutput: setOnDataReady: no bound channel");
158  }
159  m_channelProducer->setOnDataReady(opaque, onDataReady);
160  }
161 
162  bool isBound() final
163  {
164  return (m_channelProducer != nullptr);
165  }
166 
167  dwStatus wait(dwTime_t timeout)
168  {
169  if (!isBound())
170  {
171  throw Exception(DW_NOT_AVAILABLE, "PortInput: wait: no bound channel");
172  }
173 
174  return m_channelProducer->wait(timeout);
175  }
176 
177  // Node accessors
178  // TODO(unknown): This function's prototype needs to change to properly propagate errors
180  {
181  dwStatus status = DW_FAILURE;
182  GenericData genericData{};
183  if (m_channelProducer)
184  {
185  status = m_channelProducer->get(&genericData);
186  }
187 
188  if (status != DW_SUCCESS)
189  {
190  return nullptr;
191  }
192 
193  if (BaseSyncHelper::isDataSynced())
194  {
195  return BaseSyncHelper::extractInternalPacket(genericData);
196  }
197 
198  return genericData.template getData<T>();
199  }
200 
201  // Tx Operations
202  virtual dwStatus send(T* frame)
203  {
204  if (!m_channelProducer)
205  {
206  throw Exception(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
207  }
208 
209  if (BaseSyncHelper::isDataSynced())
210  {
211  return m_channelProducer->send(BaseSyncHelper::getSyncPacket(frame));
212  }
213 
214  return m_channelProducer->send(frame);
215  }
216 
218  {
219  if (!m_channelProducer)
220  {
221  throw Exception(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
222  }
223  return m_channelProducer->getSyncSignaler();
224  }
225 
226  void setSignalFences(T* frame, dw::core::span<NvSciSyncFence> fences)
227  {
228  if (BaseSyncHelper::isDataSynced())
229  {
230  m_channelProducer->getSyncSignaler().setSignalFences(BaseSyncHelper::getSyncPacket(frame), fences);
231  }
232  else
233  {
234  m_channelProducer->getSyncSignaler().setSignalFences(frame, fences);
235  }
236  }
237 
239  {
240  if (!m_channelProducer)
241  {
242  throw Exception(DW_NOT_AVAILABLE, "PortOutput: channel not bound");
243  }
244  return m_channelProducer->getSyncWaiter();
245  }
246 
247  void getWaitFences(T* frame, dw::core::span<NvSciSyncFence> fences)
248  {
249  if (BaseSyncHelper::isDataSynced())
250  {
251  m_channelProducer->getSyncWaiter().getWaitFences(BaseSyncHelper::getSyncPacket(frame), fences);
252  }
253  else
254  {
255  m_channelProducer->getSyncWaiter().getWaitFences(frame, fences);
256  }
257  }
258 };
259 
260 template <typename T>
261 constexpr char PortOutput<T>::LOG_TAG[];
262 
264 
271 template <typename T>
272 class PortInput : public SyncPortHelperInput<T>, public Port
273 {
274  static constexpr char LOG_TAG[] = "PortInput";
275 
276 public:
277  static constexpr PortDirection DIRECTION = PortDirection::INPUT;
278  using ApiDataTypeT = T;
281 
282  explicit PortInput(SpecimenT const& ref)
283  : m_reference(ref)
284  {
285  }
286  explicit PortInput(SpecimenT&& ref)
287  : m_reference(std::move(ref))
288  {
289  }
290 
292  {
293  }
294 
295  explicit PortInput(OnSetSyncAttrs waiterAttrs,
296  OnSetSyncAttrs signalerAttrs = {})
297  : m_waiterAttrs(waiterAttrs)
298  , m_signalerAttrs(signalerAttrs)
299  {
300  }
301 
302  explicit PortInput(SpecimenT const& ref,
303  OnSetSyncAttrs waiterAttrs,
304  OnSetSyncAttrs signalerAttrs = {})
305  : m_reference(ref)
306  , m_waiterAttrs(waiterAttrs)
307  , m_signalerAttrs(signalerAttrs)
308  {
309  }
310 
311  ~PortInput() override = default;
312 
313  // Channel Bind
314  dwStatus bindChannel(ChannelObject* channel) override
315  {
316  return Exception::guard([&] {
317  if (isBound())
318  {
319  // TODO(chale): this should be an Exception but applications are currently
320  // doing this. Those applications should be fixed.
321  FRWK_LOGE << "PortInput: bindChannel: attempted to bind the same port twice, ignoring this bind!" << Logger::State::endl;
322  return;
323  }
324  if (channel == nullptr)
325  {
326  throw Exception(DW_INVALID_ARGUMENT, "PortInput: bindChannel: expected channel != nullptr");
327  }
328 
329  BaseSyncHelper::parseDataSynced(channel->getParams());
330  auto ref = make_specimen<T>(nullptr);
331 
332  if (m_reference)
333  {
334  ref = make_specimen<T>(&m_reference.value());
335  }
336 
337  if (BaseSyncHelper::isDataSynced())
338  {
339  ref.packetTypeID = BaseSyncHelper::getNewPacketID(ref.packetTypeID);
340  }
341 
342  ref.setWaiterAttributes = m_waiterAttrs;
343  ref.setSignalerAttributes = m_signalerAttrs;
344 
345  m_channelConsumer = channel->getConsumer(ref);
346  if (m_channelConsumer == nullptr)
347  {
348  throw Exception(DW_INTERNAL_ERROR, "PortInput bindChannel: wrong channel implementations returned.");
349  }
350  m_reuse = channel->getParams().getReuseEnabled();
351  });
352  }
353 
354  bool isBound() override
355  {
356  return !(m_channelConsumer == nullptr);
357  }
358 
359  void setOnDataReady(void* opaque, IChannelPacketFactory::OnDataReady onDataReady)
360  {
361  if (!isBound())
362  {
363  throw Exception(DW_NOT_AVAILABLE, "PortInput: setOnDataReady: no bound channel");
364  }
365  m_channelConsumer->setOnDataReady(opaque, onDataReady);
366  }
367 
368  // Rx Operations
369  dwStatus wait(dwTime_t timeout)
370  {
371  if (!isBound())
372  {
373  throw Exception(DW_NOT_AVAILABLE, "PortInput: wait: no bound channel");
374  }
375 
376  // For synced packets, the wait can return DW_NOT_AVAILABLE or DW_SUCCESS
377  // if there are no packets to consume. This is because you need to consume
378  // a packet to make sure it's valid or not.
379  if (BaseSyncHelper::isValidPacketBuffered())
380  {
381  return DW_SUCCESS;
382  }
383  else if (BaseSyncHelper::isPacketBuffered())
384  {
385  return DW_NOT_AVAILABLE;
386  }
387  else if (BaseSyncHelper::isDataSynced())
388  {
389  timeout = 0;
390  }
391 
392  dwTime_t waitTime = m_last ? 0 : timeout;
393  dwStatus status = m_channelConsumer->wait(waitTime);
394  if (m_last && (status == DW_TIME_OUT || status == DW_NOT_AVAILABLE))
395  {
396  return DW_SUCCESS;
397  }
398 
399  return status;
400  }
401 
402  // TODO(unknown): This function's prototype needs to change to properly propagate errors
403  virtual std::shared_ptr<T> recv()
404  {
405  GenericData data{};
406  std::shared_ptr<T> result;
407  if (!isBound())
408  {
409  return nullptr;
410  }
411 
412  T* typedData = nullptr;
413  void* releasePtr = nullptr;
414 
415  if (BaseSyncHelper::isValidPacketBuffered())
416  {
417  // There is a valid packet to consume
418  data = BaseSyncHelper::getBufferedPacket();
419  releasePtr = data.getPointer();
420  typedData = BaseSyncHelper::extractInternalPacket(data);
421  }
422  else if (BaseSyncHelper::isPacketBuffered())
423  {
424  // There is a buffered packet, but it's not ready to be consumed.
425  return nullptr;
426  }
427  else
428  {
429  dwStatus status = m_channelConsumer->recv(&data);
430  if (status != DW_SUCCESS)
431  {
432  if (m_last != nullptr)
433  {
434  return m_last;
435  }
436  else
437  {
438  return nullptr;
439  }
440  }
441  releasePtr = data.getPointer();
442 
443  if (BaseSyncHelper::isDataSynced())
444  {
445  typedData = BaseSyncHelper::extractInternalPacket(data);
446  if (!typedData)
447  {
448  return nullptr;
449  }
450  }
451  else
452  {
453  typedData = data.template getData<T>();
454  }
455  }
456 
457  // don't rely on this class's member when releasing packet
458  auto* channelConsumer = m_channelConsumer;
459  result = std::shared_ptr<T>(typedData, [channelConsumer, releasePtr](T*) {
460  channelConsumer->release(releasePtr);
461  });
462  if (m_reuse)
463  {
464  m_last = result;
465  }
466 
467  return result;
468  }
469 
471  {
472  if (!m_channelConsumer)
473  {
474  throw Exception(DW_NOT_AVAILABLE, "PortInput: channel not bound");
475  }
476  return m_channelConsumer->getSyncSignaler();
477  }
478 
479  void setSignalFences(T* frame, dw::core::span<NvSciSyncFence> fences)
480  {
481  if (BaseSyncHelper::isDataSynced())
482  {
483  throw Exception(DW_NOT_SUPPORTED, "PortInput: not supported");
484  }
485  else
486  {
487  m_channelConsumer->getSyncSignaler().setSignalFences(frame, fences);
488  }
489  }
490 
492  {
493  if (!m_channelConsumer)
494  {
495  throw Exception(DW_NOT_AVAILABLE, "PortInput: channel not bound");
496  }
497  return m_channelConsumer->getSyncWaiter();
498  }
499 
500  void getWaitFences(T* frame, dw::core::span<NvSciSyncFence> fences)
501  {
502  if (BaseSyncHelper::isDataSynced())
503  {
504  throw Exception(DW_NOT_SUPPORTED, "PortInput: not supported");
505  }
506  else
507  {
508  m_channelConsumer->getSyncWaiter().getWaitFences(frame, fences);
509  }
510  }
511 
512 private:
513  ChannelObject::Consumer* m_channelConsumer{};
514  bool m_reuse{};
515  std::shared_ptr<T> m_last{};
516  dw::core::Optional<SpecimenT> m_reference{};
517  OnSetSyncAttrs m_waiterAttrs{};
518  OnSetSyncAttrs m_signalerAttrs{};
519 };
520 
521 template <typename T>
522 constexpr char PortInput<T>::LOG_TAG[];
523 
524 } // namespace framework
525 } // namespace dw
526 
527 #endif // DW_FRAMEWORK_PORT_H_
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:314
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:167
PortOutput mimics an Output Block.
Definition: Port.hpp:78
virtual dwStatus send(T *frame)
Definition: Port.hpp:202
PortInput(SpecimenT const &ref)
Definition: Port.hpp:282
virtual Producer * getProducer(GenericDataReference ref)=0
Register a producer client.
void setOnDataReady(void *opaque, IChannelPacketFactory::OnDataReady onDataReady)
Definition: Port.hpp:153
static dwStatus guard(TryBlock tryBlock)
Same as previous guard but with a simpler tryBlock with signature &#39;void tryBlock()&#39; Always returns DW...
Definition: Exception.hpp:228
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:217
Child interface to consume packets on the channel.
Definition: Channel.hpp:173
virtual Consumer * getConsumer(GenericDataReference ref)=0
Register a consumer client.
PortOutput(SpecimenT const &ref, OnSetSyncAttrs signalerAttrs, OnSetSyncAttrs waiterAttrs={})
Definition: Port.hpp:104
dwStatus bindChannel(ChannelObject *channel) override
Definition: Port.hpp:114
void setSignalFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:226
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:83
PortOutput(SpecimenT &&ref)
Definition: Port.hpp:99
PortInput(SpecimenT const &ref, OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:302
dwStatus bindChannelWithReference(ChannelObject *channel, GenericDataReference &ref)
Definition: Port.hpp:120
dw::core::Function< void(NvSciSyncAttrList)> OnSetSyncAttrs
Function signature to call back to the application to set the sync attributes when needed...
PortInput mimics an Output Block.
Definition: Port.hpp:272
void setOnDataReady(void *opaque, IChannelPacketFactory::OnDataReady onDataReady)
Definition: Port.hpp:359
void setSignalFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:479
PortInput(SpecimenT &&ref)
Definition: Port.hpp:286
#define FRWK_LOGE
Definition: Logger.hpp:41
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:247
bool isBound() override
Definition: Port.hpp:354
Definition: Exception.hpp:46
typename parameter_traits< T >::SpecimenT SpecimenT
Definition: Port.hpp:279
bool isBound() final
Definition: Port.hpp:162
virtual std::shared_ptr< T > recv()
Definition: Port.hpp:403
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:491
dw::core::Function< void()> OnDataReady
virtual const ChannelParams & getParams() const =0
Get the parameters for this channel.
virtual ~PortBase()=default
Child interface to produce packets on the channel.
Definition: Channel.hpp:148
PortInput(OnSetSyncAttrs waiterAttrs, OnSetSyncAttrs signalerAttrs={})
Definition: Port.hpp:295
ChannelObject::SyncWaiter & getSyncWaiter()
Definition: Port.hpp:238
ChannelObject::SyncSignaler & getSyncSignaler()
Definition: Port.hpp:470
PortOutput(SpecimenT const &ref)
Definition: Port.hpp:95
virtual dwStatus initialize()
Definition: Port.hpp:65
void getWaitFences(T *frame, dw::core::span< NvSciSyncFence > fences)
Definition: Port.hpp:500
dwStatus wait(dwTime_t timeout)
Definition: Port.hpp:369