Compute Graph Framework SDK Reference  5.8
dwSensorDrainerTemplate.hpp
Go to the documentation of this file.
1
2// This code contains NVIDIA Confidential Information and is disclosed
3// under the Mutual Non-Disclosure Agreement.
4//
5// Notice
6// ALL NVIDIA DESIGN SPECIFICATIONS AND CODE ("MATERIALS") ARE PROVIDED "AS IS" NVIDIA MAKES
7// NO REPRESENTATIONS, WARRANTIES, EXPRESSED, IMPLIED, STATUTORY, OR OTHERWISE WITH RESPECT TO
8// THE MATERIALS, AND EXPRESSLY DISCLAIMS ANY IMPLIED WARRANTIES OF NONINFRINGEMENT,
9// MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
10//
11// NVIDIA Corporation assumes no responsibility for the consequences of use of such
12// information or for any infringement of patents or other rights of third parties that may
13// result from its use. No license is granted by implication or otherwise under any patent
14// or patent rights of NVIDIA Corporation. No third party distribution is allowed unless
15// expressly authorized by NVIDIA. Details are subject to change without notice.
16// This code supersedes and replaces all information previously supplied.
17// NVIDIA Corporation products are not authorized for use as critical
18// components in life support devices or systems without express written approval of
19// NVIDIA Corporation.
20//
21// Copyright (c) 2020-2022 NVIDIA Corporation. All rights reserved.
22//
23// NVIDIA Corporation and its licensors retain all intellectual property and proprietary
24// rights in and to this software and related documentation and any modifications thereto.
25// Any use, reproduction, disclosure or distribution of this software and related
26// documentation without an express license agreement from NVIDIA Corporation is
27// strictly prohibited.
28//
30#ifndef DW_FRAMEWORK_SENSOR_DRAINER_TEMPLATE_HPP_
31#define DW_FRAMEWORK_SENSOR_DRAINER_TEMPLATE_HPP_
32
33#include <dw/sensors/Sensors.h>
34#include <dw/sensors/radar/Radar.h>
35#include <dw/sensors/canbus/CAN.h>
36#include <dw/core/base/Types.h>
37
38#if defined(DW_SDK_BUILD_EXPERIMENTAL) && defined(LINUX)
39#include <dwexperimental/sensors/lockstep/Lockstep.h>
40#include <dwexperimental/sensors/Sensors.h>
41#endif
42
43#include <dwcgf/Exception.hpp>
44#include <dwcgf/node/Node.hpp>
46
47#include <unistd.h>
48#include <memory>
49
50namespace dw
51{
52namespace framework
53{
54
56{
57 const char* nodeName;
58 dwTime_t blockingTimeout; // Blocking timeout must be > 1/f_sensor
59 dwTime_t nonBlockingTimeout; // Nonblocking timeout must be < 1/f_sensor
60 bool isVirtual; // true if this sensor reads from dataset
61 bool drainStaleData; // true if drainer should read from sensor until DW SAL is empty
62 bool waitForNewData; // true if drainer should block until new data is available
64};
65
66template <typename ProcessedDataType, typename ReadProcessedDataFunc, typename DataSourceType>
68{
69public:
70 static constexpr char LOG_TAG[] = "dwBaseDrainerTemplate";
71
72 // WAR: This is a guard for drain out sensor data, avoid huge sensor data make too long loop time, eg: drain out can msg > 100 ms, current find 50 is good enough
73 static constexpr int32_t DRAIN_SENSOR_DATA_COUNT_MAX = 50;
74 using OnDataDropped = dw::core::Function<void(dwTime_t const)>;
75
76 explicit dwBaseDrainerTemplate(dwSensorDrainerParams params, std::unique_ptr<ReadProcessedDataFunc> readProcessedDataFunc, DataSourceType dataSource)
77 : m_dataSource(dataSource)
78 , m_name(params.nodeName)
79 , m_blockingTimeout(params.blockingTimeout)
80 , m_nonBlockingTimeout(params.nonBlockingTimeout)
81 , m_isVirtual(params.isVirtual)
82 , m_dropStaleData(params.drainStaleData)
83 , m_waitForNewData(params.waitForNewData)
84 , m_readProcessedDataFunc(std::move(readProcessedDataFunc))
85 {
86 }
87
88 void setOnDataDropped(OnDataDropped onDataDropped)
89 {
90 m_onDataDropped = onDataDropped;
91 }
92
93 virtual dwStatus getNextTimestamp(dwTime_t& timestamp, dwTime_t timeout)
94 {
95 // If dwSensor_getNextTimestamp API not available, get timestamp by looking ahead in dataset
96 if (!m_nextDataReady)
97 {
98 dwStatus status = m_readProcessedDataFunc->readNextData(timeout, m_dataSource);
99 if (status != DW_SUCCESS)
100 {
101 return status;
102 }
103 m_nextDataReady = true;
104 }
105
106 return m_readProcessedDataFunc->getNextDataTimestamp(timestamp);
107 }
108
109 virtual dwStatus getNextData(ProcessedDataType* outFrame, dwTime_t timeout)
110 {
111 if (!m_nextDataReady)
112 {
113 // Do the read here if look ahead did not happen
114 dwStatus status = m_readProcessedDataFunc->readNextData(timeout, m_dataSource);
115 if (status != DW_SUCCESS)
116 {
117 FRWK_LOGE << "getNextData: readNextData error!" << Logger::State::endl;
118 return status;
119 }
120 m_nextDataReady = true;
121 }
122 return m_readProcessedDataFunc->getNextData(outFrame, m_dataSource);
123 }
124
125 virtual dwStatus tryRead(ProcessedDataType* outFrame,
126 dwTime_t& latestTimestamp,
127 dwTime_t timeout,
128 bool isDroppingData = false)
129 {
130 dwTime_t nextTime{};
131 dwStatus status = getNextTimestamp(nextTime, timeout);
132 if (status != DW_SUCCESS)
133 {
134 return status;
135 }
136
137 // Check if next data is ready. If not, mimic Live Sensor DW SAL buffer empty with a DW_TIME_OUT
138 if (m_isVirtual && !isVirtualDataReady(nextTime, timeout))
139 {
140 return DW_TIME_OUT;
141 }
142
143 status = getNextData(outFrame, timeout);
144 if (status != DW_SUCCESS)
145 {
146 FRWK_LOGE << "tryRead: getNextData error!" << Logger::State::endl;
147 return status;
148 }
149
150 if (isDroppingData)
151 {
152 FRWK_LOGW << m_name << " Dropping data" << Logger::State::endl;
153 if (m_onDataDropped)
154 {
155 m_onDataDropped(latestTimestamp);
156 }
157 }
158
159 latestTimestamp = nextTime;
160 m_nextDataReady = false;
161 m_outputAvailable = true;
162
163 return status;
164 }
165
166 virtual dwStatus reset()
167 {
169 m_nextDataReady = false;
171 return DW_SUCCESS;
172 }
173
174 virtual dwTime_t getReadTimeout() const
175 {
177 }
178
179 virtual void setVirtualSyncTime(dwTime_t virtualSyncTime)
180 {
181 m_virtualSyncTime = virtualSyncTime;
182 }
183
184 // This function is original designed for mimicing real time live sensors for non-determinisitc mode.
185 // But now removed the sleep before return as deterministic mode is the only supported dataset mode.
186 // Returns true if the next sensor data should be available
187 virtual bool isVirtualDataReady(dwTime_t dataTime, dwTime_t timeout)
188 {
189 dwTime_t timeToNextData = dataTime - m_virtualSyncTime;
190
191 // For dataset, the first frame's m_virtualSyncTime is 0. Return true directly and set m_virtualSyncTime = -1
192 // so that the second round tryRead will return DW_TIME_OUT and use the first frame.
193 if (m_virtualSyncTime == 0)
194 {
195 // WAR, fix this
197 return true;
198 }
199 if (timeToNextData >= timeout)
200 {
201 // Next sensor data is far in the future. Timeout specified not sufficient to reach next sensor data
202 // Technically, we should still usleep(timeout) to mimic live sensor, but can skip this for perf
203 // TODO(Oven): WAR to guarantee some data must be read when timeout is m_blockingTimeout,
204 // otherwise we may send out uninitialized sensor data to downstreams
205 if (timeout == m_blockingTimeout)
206 {
207 FRWK_LOGD << "isVirtualDataReady: timeToNextData is larger than m_blockingTimeout, must read some data. "
208 << "timeToNextData: " << timeToNextData << ", dataTime: " << dataTime << Logger::State::endl;
209 }
210 else
211 {
212 return false;
213 }
214 }
215 if (timeToNextData <= 0)
216 {
217 // Next sensor data is in the past. Should be available.
218 return true;
219 }
220 return true;
221 }
222
223 // Utility to read Sensor data from DW SAL in a consistent way across live/virtual sensor
224 // May block on new sensor data, or drop stale data. See dwSensorDrainerParams
225 // Retuns DW_SUCCESS if processedOutput and timestampOutput is valid
226 // nextTimestampOutput will always be valid for virtual case
227 //
228 // [out] processedOutput the latest valid sensor frame
229 // [out] timestampOutput timestamp associated with the latest valid sensor frame
230 // [out] nextTimestampOutput timestamp associated with the next sensor frame. Only available in virtual case
231 // [in] virtualSyncTime current timestamp to decide whether virtual data is ready to be sent
232 // [in] sensor handle to sensor
233 virtual dwStatus drainProcessedData(ProcessedDataType* processedOutput,
234 dwTime_t& timestampOutput,
235 dwTime_t& nextTimestampOutput,
236 dwTime_t virtualSyncTime)
237 {
238 m_virtualSyncTime = virtualSyncTime;
239
240 dwStatus status = DW_SUCCESS;
241 m_outputAvailable = false;
242
243 // Blocking read guarantees some data is read, and slows processing down to data rate
244 dwTime_t readTimeout = getReadTimeout();
245 int32_t drainCount = 0;
246
247 // For determinisitic mode, camera/radar/imugps will always return available frame at the first loop with longer readTimeout.
248 // They will return DW_TIME_OUT with shorter m_nonBlockingTimeout to guarentee deterministic for each frame at second loop.
249 // Note: This do-while has different behavior when reach EndOfStream for flag m_dropStaleData. Assume dataset has 0~100 frames.
250 // If true, the first loop will read frame 100 successfully. Then enter the second loop and hit EndOfStream. It will reset
251 // the sensor and tryRead again. Which means it will return frame 0 and drop frame 100. Later, getNextTimestamp will return
252 // frame 1's timestamp. So the final result is: processedOutput = frame 0, nextTimestampOutput = frame 1' timestamp
253 // If false, the first loop will read frame 100 successfully and exit do-while loop directly. Later, nextTimestampOutput is
254 // reset to 0 and failed getNextTimestamp. So the final result is: processedOutput = frame 100, nextTimestampOutput = 0
255 do
256 {
257 status = readProcessedData(processedOutput, timestampOutput, readTimeout, drainCount > 0);
258 // Non-blocking read and drop stale data to drain the DW SAL buffer, send newest available data
259 readTimeout = m_nonBlockingTimeout;
260
261#if defined(DW_SDK_BUILD_EXPERIMENTAL) && defined(LINUX)
262 isLockstepDataAvailable(status, readTimeout);
263#endif
264 } while (status == DW_SUCCESS && m_dropStaleData && (drainCount++ < DRAIN_SENSOR_DATA_COUNT_MAX));
265
267 {
268 FRWK_LOGE << m_name << " blockingTimeout not sufficient to successfully waitForNewData. No data available!" << Logger::State::endl;
269 }
270 else if (status == DW_TIME_OUT && m_outputAvailable)
271 {
272 // DW_TIME_OUT indicates that DW SAL has been successfully drained
273 status = DW_SUCCESS;
274 }
275
276 if (m_isVirtual)
277 {
278 // In virtual case, send out the timestamp of next sensor data
279 nextTimestampOutput = 0;
280 const dwStatus nextTimeStatus = getNextTimestamp(nextTimestampOutput, m_blockingTimeout);
281 if (nextTimeStatus != DW_SUCCESS)
282 {
283 FRWK_LOGD << m_name << " Failed to get next timestamp: " << dwGetStatusName(nextTimeStatus);
284 }
285 }
286
287 return status;
288 }
289
290 virtual dwStatus readProcessedData(ProcessedDataType* outFrame, dwTime_t& latestTimestamp, dwTime_t timeout, bool isDroppingData = false) = 0;
291
292 virtual void isLockstepDataAvailable(dwStatus& status, dwTime_t& readTimeout) {}
293
294protected:
295 DataSourceType m_dataSource;
296
297 std::unique_ptr<ReadProcessedDataFunc> m_readProcessedDataFunc;
298
299 dwTime_t m_blockingTimeout = 60000;
300 dwTime_t m_nonBlockingTimeout = 100;
301 dwTime_t m_virtualSyncTime = 0;
302
304 bool m_outputAvailable = false;
305 bool m_nextDataReady = false;
306 bool m_isVirtual = false;
307 bool m_dropStaleData = false;
308 bool m_waitForNewData = false;
309
311
312 static constexpr dwTime_t REPLAY_SENSOR_READ_TIMEOUT = 10000000;
313};
314
315template <typename ProcessedDataType, typename ReadProcessedDataFunc>
316class dwSensorDrainerTemplate : public dwBaseDrainerTemplate<ProcessedDataType, ReadProcessedDataFunc, dwSensorHandle_t>
317{
318public:
319 static constexpr char LOG_TAG[] = "dwSensorDrainerTemplate";
321
322 explicit dwSensorDrainerTemplate(dwSensorDrainerParams params, std::unique_ptr<ReadProcessedDataFunc> readProcessedDataFunc, dwSensorHandle_t hsensor)
323 : dwBaseDrainerTemplate<ProcessedDataType, ReadProcessedDataFunc, dwSensorHandle_t>(params, std::move(readProcessedDataFunc), hsensor)
324 , m_isLockstep(params.isLockstep)
325 {
326 }
327
328 virtual ~dwSensorDrainerTemplate() = default;
329
330 dwStatus getNextTimestamp(dwTime_t& timestamp,
331 dwTime_t timeout) override
332 {
333#if defined(DW_SDK_BUILD_EXPERIMENTAL) && defined(LINUX)
335 {
336 dwStatus nextStatus = dwSensor_getNextTimestamp(&timestamp, Base::m_dataSource);
337 if (nextStatus == DW_SUCCESS || nextStatus == DW_END_OF_STREAM)
338 {
339 return nextStatus;
340 }
341 }
342#endif
343
344 return Base::getNextTimestamp(timestamp, timeout);
345 }
346
347 virtual dwStatus reset() override
348 {
349 dwStatus res = Base::reset();
350 if (res != DW_SUCCESS)
351 {
352 return res;
353 }
354 return dwSensor_reset(Base::m_dataSource);
355 }
356
357 // Reads a piece of sensor data from sensor
358 // In virtual sensor case, uses m_virtualSyncTime and timeout to determine
359 // whether next piece of sensor data should be available
360 //
361 // [out] outFrame output sensor frame
362 // [in/out] latestTimestamp timestamp associated with the sensor frame
363 // [in] timeout sensor reading timeout in microseconds
364 // [in] isDroppingData indicates if data is being dropped
365 dwStatus readProcessedData(ProcessedDataType* outFrame,
366 dwTime_t& latestTimestamp,
367 dwTime_t timeout,
368 bool isDroppingData = false) override
369 {
370 dwStatus status = Base::tryRead(outFrame, latestTimestamp, timeout, isDroppingData);
371 if (status == DW_END_OF_STREAM)
372 {
373 // Reset the sensor and try to read again, but propagate the DW_END_OF_STREAM
374 // Use blocking read after reset since camera prefetch may not be ready
375 reset();
376 timeout = Base::m_blockingTimeout;
377 Base::tryRead(outFrame, latestTimestamp, timeout);
378 }
379 return status;
380 }
381
382 // Utility to drop sensor frames according to the events to be replayed
383 // until the next non-drop event is read or there are no more replay events
384 // @param [output] processedOutput the latest frame read from the sensor
385 // @param [output] dataEvent the next non-drop event to be replayed.
386 // @param [input] sensor the sensor to replay frames from
387 // @param [input] readCb the callback to read replay events
388 dwStatus replayDroppedFrames(ProcessedDataType* processedOutput,
389 SensorNode::DataEvent& dataEvent,
391 {
392 // Process any number of drop events.
393 bool isDropEvent;
394 do
395 {
396 // Read next data event
397 if (!readCb(dataEvent)) // no more events to be replayed
398 {
399 FRWK_LOGW << "replayDroppedFrames: Trace cannot be read." << Logger::State::endl;
400 return DW_END_OF_STREAM;
401 }
402
403 // Drop the sensor frame (if applicable)
404 isDropEvent = dataEvent.dataEventType == SensorNode::DataEventType::DROP;
405 if (isDropEvent)
406 {
407 dwTime_t timestampOutput{};
408 dwStatus status = readProcessedData(processedOutput, timestampOutput, Base::REPLAY_SENSOR_READ_TIMEOUT);
409 if (status == DW_END_OF_STREAM)
410 {
411 return status;
412 }
413 else if (status == DW_SUCCESS)
414 {
415 // For now we assume that the timestamps read from the sensor need to match the trace exactly
416 // so if there isn't a match then there is no valid way to continue, throw an exception.
417 if (timestampOutput != dataEvent.timestamp)
418 {
419 FRWK_LOGE << "replayDroppedFrames: Data/trace mismatch: current: " << timestampOutput << " recorded: " << dataEvent.timestamp << Logger::State::endl;
420 throw Exception(DW_FAILURE, "dwSensorDrainerTemplate: replayDroppedFrames: data/trace mismatch.");
421 }
422 }
423 else
424 {
425 FRWK_LOGE << "replayDroppedFrames: Cannot read next data." << Logger::State::endl;
426 throw Exception(DW_FAILURE, "dwSensorDrainerTemplate: replayDroppedFrames: cannot read next data.");
427 }
428 }
429 } while (isDropEvent);
430
431 return DW_SUCCESS;
432 }
433
434 // Replay the next event for the sensor node-run.
435 // @param [output] processedOutput the frame replayed from the sensor
436 // @param [output] timestampOutput the timestamp of the frame replayed from the sensor
437 // @param [input] sensor the sensor to replay frames from
438 // @param [input] readCb the callback to read replay events
439 virtual dwStatus replayProcessedData(ProcessedDataType* processedOutput,
440 dwTime_t& timestampOutput,
442 {
443 // First drop frames according to the replay event
444 // also, retrieve the first non-drop event.
446 dwStatus status = replayDroppedFrames(processedOutput, de, readCb);
447
448 // When end of stream was reached, just read the next frame and ignore the replay event.
449 if (status == DW_END_OF_STREAM)
450 {
451 readProcessedData(processedOutput, timestampOutput, Base::REPLAY_SENSOR_READ_TIMEOUT);
452 return status;
453 }
454
455 // If some unexpected status was returned from the sensor just return the status as this is an error.
456 if (status != DW_SUCCESS)
457 {
458 return status;
459 }
460
461 // otherwise, if no data was produced for this event, then just return the status that was recorded
462 // (usually this is DW_NOT_AVAILABLE or DW_TIME_OUT)
463 if (de.dataEventType == SensorNode::DataEventType::NONE)
464 {
465 status = de.status;
466 }
467 // if data was produced, read the next data from the sensor and make sure it has the timetamp we expect.
468 else if (de.dataEventType == SensorNode::DataEventType::PRODUCE)
469 {
470 status = readProcessedData(processedOutput, timestampOutput, Base::REPLAY_SENSOR_READ_TIMEOUT);
471 if (status != DW_SUCCESS && status != DW_END_OF_STREAM)
472 {
473 FRWK_LOGE << "replayProcessedData: Cannot read next data." << Logger::State::endl;
474 throw Exception(DW_FAILURE, "dwSensorDrainerTemplate: replayProcessedData: cannot read next data.");
475 }
476 // For now we assume that the timestamps read from the sensor need to match the trace exactly
477 // so if there isn't a match then there is no valid way to continue, throw an exception.
478 if (timestampOutput != de.timestamp)
479 {
480 FRWK_LOGE << "replayProcessedData: Data/trace mismatch." << Logger::State::endl;
481 throw Exception(DW_FAILURE, "dwSensorDrainerTemplate: replayProcessedData: data/trace mismatch.");
482 }
483 }
484 else
485 {
486 FRWK_LOGE << "replayProcessedData: UnhandledEventType." << Logger::State::endl;
487 throw Exception(DW_FAILURE, "dwSensorDrainerTemplate: unhandled event type");
488 }
489
490 return status;
491 }
492
493#if defined(DW_SDK_BUILD_EXPERIMENTAL) && defined(LINUX)
494 void isLockstepDataAvailable(dwStatus& status, dwTime_t& readTimeout) override
495 {
496 if (m_isLockstep)
497 {
498 bool isDataAvailable = false;
499 if (dwLockstep_isDataAvailable(&isDataAvailable, Base::m_dataSource) != DW_SUCCESS)
500 {
501 char8_t const* protocol;
502 dwSensor_getProtocol(&protocol, Base::m_dataSource);
503 throw Exception(DW_FAILURE, "dwSensorDrainerTemplate: can't get the data available flag from:", protocol, " sensor");
504 }
505
506 if (!isDataAvailable && status != DW_SUCCESS)
507 {
508 status = DW_TIME_OUT;
509 }
510 else
511 {
512 status = DW_SUCCESS;
513 readTimeout = Base::m_blockingTimeout;
514 }
515 }
516 }
517#endif
518
519protected:
520 bool m_isLockstep = false;
521};
522
523template <typename ProcessedDataType, typename ReadProcessedDataFunc, typename DataSourceType>
525
526template <typename ProcessedDataType, typename ReadProcessedDataFunc>
528} // namespace framework
529} // namespace dw
530
531#endif // DW_FRAMEWORK_SENSOR_DRAINER_TEMPLATE_HPP_
#define FRWK_LOGE
Definition: Logger.hpp:41
#define FRWK_LOGW
Definition: Logger.hpp:40
#define FRWK_LOGD
Definition: Logger.hpp:38
FixedString< MAX_NAME_LEN > Name_t
Definition: Node.hpp:67
ISensorNode::DataEventReadCallback DataEventReadCallback
Definition: Node.hpp:339
virtual dwStatus tryRead(ProcessedDataType *outFrame, dwTime_t &latestTimestamp, dwTime_t timeout, bool isDroppingData=false)
static constexpr int32_t DRAIN_SENSOR_DATA_COUNT_MAX
void setOnDataDropped(OnDataDropped onDataDropped)
dw::core::Function< void(dwTime_t const)> OnDataDropped
virtual dwStatus getNextTimestamp(dwTime_t &timestamp, dwTime_t timeout)
virtual dwStatus getNextData(ProcessedDataType *outFrame, dwTime_t timeout)
dwBaseDrainerTemplate(dwSensorDrainerParams params, std::unique_ptr< ReadProcessedDataFunc > readProcessedDataFunc, DataSourceType dataSource)
virtual dwStatus drainProcessedData(ProcessedDataType *processedOutput, dwTime_t &timestampOutput, dwTime_t &nextTimestampOutput, dwTime_t virtualSyncTime)
virtual void setVirtualSyncTime(dwTime_t virtualSyncTime)
virtual bool isVirtualDataReady(dwTime_t dataTime, dwTime_t timeout)
virtual void isLockstepDataAvailable(dwStatus &status, dwTime_t &readTimeout)
std::unique_ptr< ReadProcessedDataFunc > m_readProcessedDataFunc
virtual dwStatus readProcessedData(ProcessedDataType *outFrame, dwTime_t &latestTimestamp, dwTime_t timeout, bool isDroppingData=false)=0
static constexpr dwTime_t REPLAY_SENSOR_READ_TIMEOUT
dwStatus readProcessedData(ProcessedDataType *outFrame, dwTime_t &latestTimestamp, dwTime_t timeout, bool isDroppingData=false) override
dwSensorDrainerTemplate(dwSensorDrainerParams params, std::unique_ptr< ReadProcessedDataFunc > readProcessedDataFunc, dwSensorHandle_t hsensor)
virtual dwStatus replayProcessedData(ProcessedDataType *processedOutput, dwTime_t &timestampOutput, SensorNode::DataEventReadCallback readCb)
dwStatus getNextTimestamp(dwTime_t &timestamp, dwTime_t timeout) override
dwStatus replayDroppedFrames(ProcessedDataType *processedOutput, SensorNode::DataEvent &dataEvent, SensorNode::DataEventReadCallback readCb)
dwCalibrationStatus status
Definition: Exception.hpp:47