rogue
Loading...
Searching...
No Matches
ControllerV2.cpp
Go to the documentation of this file.
1
18
19#include <inttypes.h>
20#include <sys/time.h>
21#include <unistd.h>
22
23#include <cmath>
24#include <cstdlib>
25#include <memory>
26
27#include "rogue/GeneralError.h"
28#include "rogue/GilRelease.h"
34
37
38// Local CRC library
40
41// CRC Lookup table for later use
43
// Factory helper: constructs a ControllerV2 with std::make_shared, forwarding
// the three enable flags plus `tran` and `app`, and returns the shared pointer.
// NOTE(review): listing lines 48-49 are absent from this extract; they
// presumably declare the `tran` and `app` parameters that the body forwards —
// confirm against the original file.
45rpp::ControllerV2Ptr rpp::ControllerV2::create(bool enIbCrc,
46 bool enObCrc,
47 bool enSsi,
50 rpp::ControllerV2Ptr r = std::make_shared<rpp::ControllerV2>(enIbCrc, enObCrc, enSsi, tran, app);
51 return (r);
52}
53
// Constructor: hands `tran`, `app`, three literal 8s and `enSsi` to the base
// Controller, then records whether CRC is checked on inbound segments
// (enIbCrc_) and generated on outbound segments (enObCrc_).
// NOTE(review): the three 8s are presumably the header size, tail size and
// alignment in bytes (transportRx/applicationRx use an 8-byte header, an
// 8-byte tail and 64-bit alignment) — confirm against the base class.
// NOTE(review): listing lines 58-59 are absent from this extract; they
// presumably declare the `tran` and `app` parameters — confirm against the
// original file.
55rpp::ControllerV2::ControllerV2(bool enIbCrc,
56 bool enObCrc,
57 bool enSsi,
60 : rpp::Controller::Controller(tran, app, 8, 8, 8, enSsi) {
61 enIbCrc_ = enIbCrc;
62 enObCrc_ = enObCrc;
63}
64
66rpp::ControllerV2::~ControllerV2() {}
67
69void rpp::ControllerV2::transportRx(ris::FramePtr frame) {
70 ris::BufferPtr buff;
71 uint32_t size;
72 uint32_t tmpCount;
73 uint8_t tmpFuser;
74 uint8_t tmpLuser;
75 uint8_t tmpDest;
76 uint8_t tmpId;
77 bool tmpEof;
78 bool tmpSof;
79 bool crcErr;
80 uint32_t flags;
81 uint32_t last;
82 uint32_t crc;
83 uint32_t tmpCrc;
84 uint8_t* data;
85
86 if (frame->isEmpty()) {
87 log_->warning("Empty frame received on transport input");
88 return;
89 }
90
92 ris::FrameLockPtr flock = frame->lock();
93 std::lock_guard<std::mutex> lock(tranMtx_);
94
95 buff = *(frame->beginBuffer());
96 data = buff->begin();
97 size = buff->getPayload();
98
99 // Drop invalid data
100 if (frame->getError() || // Check for frame ERROR
101 (frame->bufferCount() != 1) || // Incoming frame can only have one buffer
102 (size < 24) || // Check for min. size (64-bit header + 64-bit min. payload + 64-bit tail)
103 ((size & 0x7) > 0) || // Check for non 64-bit alignment
104 ((data[0] & 0xF) != 0x2)) { // Check for invalid version only (ignore the CRC mode flag)
105 log_->warning("Dropping frame due to contents: error=0x%" PRIx8 ", payload=%" PRIu32 ", buffers=%" PRIu32
106 ", Version=0x%" PRIx8,
107 frame->getError(),
108 size,
109 frame->bufferCount(),
110 data[0] & 0xF);
111 dropCount_++;
112 return;
113 }
114
115 // Header word 0
116 tmpFuser = data[1];
117 tmpDest = data[2];
118 tmpId = data[3];
119
120 // Header word 1
121 tmpCount = static_cast<uint32_t>(data[4]) << 0;
122 tmpCount |= static_cast<uint32_t>(data[5]) << 8;
123 tmpSof = ((data[7] & 0x80) ? true : false); // SOF (PACKETIZER2_HDR_SOF_BIT_C = 63)
124
125 // Tail word 0
126 tmpLuser = data[size - 8];
127 tmpEof = ((data[size - 7] & 0x1) ? true : false);
128 last = static_cast<uint32_t>(data[size - 6]);
129
130 if (enIbCrc_) {
131 // Tail word 1
132 tmpCrc = static_cast<uint32_t>(data[size - 1]) << 0;
133 tmpCrc |= static_cast<uint32_t>(data[size - 2]) << 8;
134 tmpCrc |= static_cast<uint32_t>(data[size - 3]) << 16;
135 tmpCrc |= static_cast<uint32_t>(data[size - 4]) << 24;
136
137 // Compute CRC
138 if (tmpSof)
139 crc_[tmpDest] = CRC::Calculate(data, size - 4, crcTable_);
140 else
141 crc_[tmpDest] = CRC::Calculate(data, size - 4, crcTable_, crc_[tmpDest]);
142
143 crcErr = (tmpCrc != crc_[tmpDest]);
144 } else {
145 crcErr = false;
146 }
147
148 log_->debug("transportRx: Raw header: 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8
149 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8,
150 data[0],
151 data[1],
152 data[2],
153 data[3],
154 data[4],
155 data[5],
156 data[6],
157 data[7]);
158 log_->debug("transportRx: Raw footer: 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8
159 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8,
160 data[size - 8],
161 data[size - 7],
162 data[size - 6],
163 data[size - 5],
164 data[size - 4],
165 data[size - 3],
166 data[size - 2],
167 data[size - 1]);
168 log_->debug("transportRx: Got frame: Fuser=0x%" PRIx8 ", Dest=0x%" PRIx8 ", Id=0x%" PRIx8 ", Count=%" PRIu32
169 ", Sof=%" PRIu8 ", Luser=0x%" PRIx8 ", Eof=%" PRIu8 ", Last=%" PRIu32 ", crcErr=%" PRIu8,
170 tmpFuser,
171 tmpDest,
172 tmpId,
173 tmpCount,
174 tmpSof,
175 tmpLuser,
176 tmpEof,
177 last,
178 crcErr);
179
180 // Shorten message by removing tail and adjusting for last value
181 // Do this before adjusting tail reservation
182 buff->adjustPayload(-8 + (static_cast<int32_t>(last) - 8));
183
184 // Add 8 bytes to headroom and tail reservation
185 buff->adjustHeader(8);
186 buff->adjustTail(8);
187
188 // Drop frame and reset state if mismatch
189 if ((transSof_[tmpDest] != tmpSof) || crcErr || tmpCount != tranCount_[tmpDest]) {
190 log_->warning("Dropping frame: gotDest=%" PRIu8 ", gotSof=%" PRIu8 ", crcErr=%" PRIu8 ", expCount=%" PRIu32
191 ", gotCount=%" PRIu32,
192 tmpDest,
193 tmpSof,
194 crcErr,
195 tranCount_[tmpDest],
196 tmpCount);
197 dropCount_++;
198 transSof_[tmpDest] = true;
199 tranCount_[tmpDest] = 0;
200 tranFrame_[tmpDest].reset();
201 return;
202 }
203
204 // First frame
205 if (transSof_[tmpDest]) {
206 transSof_[tmpDest] = false;
207 if ((tranCount_[tmpDest] != 0) || !tmpSof || crcErr) {
208 log_->warning("Dropping frame: gotDest=%" PRIu8 ", gotSof=%" PRIu8 ", crcErr=%" PRIu8 ", expCount=%" PRIu32
209 ", gotCount=%" PRIu32,
210 tmpDest,
211 tmpSof,
212 crcErr,
213 tranCount_[tmpDest],
214 tmpCount);
215 dropCount_++;
216 transSof_[tmpDest] = true;
217 tranCount_[tmpDest] = 0;
218 tranFrame_[tmpDest].reset();
219 return;
220 }
221
222 tranFrame_[tmpDest] = ris::Frame::create();
223 tranCount_[tmpDest] = 0;
224
225 tranFrame_[tmpDest]->setFirstUser(tmpFuser);
226 }
227
228 tranFrame_[tmpDest]->appendBuffer(buff);
229 frame->clear(); // Empty old frame
230
231 // Last of transfer
232 if (tmpEof) {
233 tranFrame_[tmpDest]->setLastUser(tmpLuser);
234 transSof_[tmpDest] = true;
235 tranCount_[tmpDest] = 0;
236
237 // Detect SSI error — must precede pushFrame/reset or it's a null deref.
238 if (enSsi_ & (tmpLuser & 0x1)) tranFrame_[tmpDest]->setError(0x80);
239
240 if (app_[tmpDest]) {
241 app_[tmpDest]->pushFrame(tranFrame_[tmpDest]);
242 }
243 tranFrame_[tmpDest].reset();
244 } else {
245 tranCount_[tmpDest] = (tranCount_[tmpDest] + 1) & 0xFFFF;
246 }
247}
248
250void rpp::ControllerV2::applicationRx(ris::FramePtr frame, uint8_t tDest) {
251 ris::Frame::BufferIterator it;
252 uint32_t segment;
253 uint8_t* data;
254 uint32_t size;
255 uint8_t fUser;
256 uint8_t lUser;
257 uint32_t crc;
258 uint32_t last;
259 struct timeval startTime;
260 struct timeval currTime;
261 struct timeval endTime;
262
263 gettimeofday(&startTime, NULL);
264 timeradd(&startTime, &timeout_, &endTime);
265
266 if (frame->isEmpty()) {
267 log_->warning("Empty frame received on application input");
268 return;
269 }
270
271 if (frame->getError()) return;
272
273 rogue::GilRelease noGil;
274 ris::FrameLockPtr flock = frame->lock();
275 std::lock_guard<std::mutex> lock(appMtx_);
276
277 // Wait while queue is busy
278 while (tranQueue_.busy()) {
279 usleep(10);
280 gettimeofday(&currTime, NULL);
281 if (timercmp(&currTime, &endTime, >)) {
282 log_->critical("ControllerV2::applicationRx: Timeout waiting for outbound queue after %" PRIu32 ".%" PRIu32
283 " seconds! May be caused by outbound backpressure.",
284 timeout_.tv_sec,
285 timeout_.tv_usec);
286 gettimeofday(&startTime, NULL);
287 timeradd(&startTime, &timeout_, &endTime);
288 }
289 }
290
291 fUser = frame->getFirstUser();
292 lUser = frame->getLastUser();
293
294 // Inject SOF
295 if (enSsi_) fUser |= 0x2;
296
297 segment = 0;
298 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
299 ris::FramePtr tFrame = ris::Frame::create();
300
301 // Compute last, and aligned payload to 64-bits
302 last = (*it)->getPayload() % 8;
303 if (last == 0) last = 8;
304 (*it)->adjustPayload(8 - last);
305
306 // Rem 8 bytes head and tail reservation before setting new size
307 (*it)->adjustHeader(-8);
308 (*it)->adjustTail(-8);
309
310 // Add tail to payload, set new size (header reduction added 8)
311 (*it)->adjustPayload(8);
312
313 // Get data pointer and new size
314 data = (*it)->begin();
315 size = (*it)->getPayload();
316
317 // Header word 0
318 data[0] = 0x2; // (Version=0x2)
319 if (enObCrc_) data[0] |= 0x20; // Enable CRC
320 data[1] = fUser;
321 data[2] = tDest;
322 data[3] = 0; // TID Unused
323
324 // Header word 1
325 data[4] = segment & 0xFF;
326 data[5] = (segment >> 8) & 0xFF;
327 data[6] = 0;
328 data[7] = (segment == 0) ? 0x80 : 0x0; // SOF (PACKETIZER2_HDR_SOF_BIT_C = 63)
329
330 // Tail word 0
331 data[size - 8] = lUser;
332 data[size - 7] = (it == (frame->endBuffer() - 1)) ? 0x1 : 0x0; // EOF
333 data[size - 6] = last;
334 data[size - 5] = 0;
335
336 if (enObCrc_) {
337 // Compute CRC
338 if (segment == 0)
339 crc = CRC::Calculate(data, size - 4, crcTable_);
340 else
341 crc = CRC::Calculate(data, size - 4, crcTable_, crc);
342
343 // Tail word 1
344 data[size - 1] = (crc >> 0) & 0xFF;
345 data[size - 2] = (crc >> 8) & 0xFF;
346 data[size - 3] = (crc >> 16) & 0xFF;
347 data[size - 4] = (crc >> 24) & 0xFF;
348
349 } else {
350 data[size - 1] = 0;
351 data[size - 2] = 0;
352 data[size - 3] = 0;
353 data[size - 4] = 0;
354 }
355
356 log_->debug("applicationRx: Gen frame: Size=%" PRIu32 ", Fuser=0x%" PRIu8 ", Dest=0x%" PRIu8 ", Count=%" PRIu32
357 ", Sof=%" PRIu8 ", Luser=0x%" PRIu8 ", Eof=%" PRIu8 ", Last=%" PRIu32,
358 (*it)->getPayload(),
359 fUser,
360 tDest,
361 segment,
362 data[7] >> 7,
363 lUser,
364 data[size - 7],
365 last);
366 log_->debug("applicationRx: Raw header: 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8
367 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8,
368 data[0],
369 data[1],
370 data[2],
371 data[3],
372 data[4],
373 data[5],
374 data[6],
375 data[7]);
376 log_->debug("applicationRx: Raw footer: 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8
377 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8,
378 data[size - 8],
379 data[size - 7],
380 data[size - 6],
381 data[size - 5],
382 data[size - 4],
383 data[size - 3],
384 data[size - 2],
385 data[size - 1]);
386
387 tFrame->appendBuffer(*it);
388 tranQueue_.push(tFrame);
389 segment++;
390 }
391 appIndex_++;
392 frame->clear(); // Empty old frame
393}
static const CRC::Table< uint32_t, 32 > crcTable_(CRC::CRC_32())
static CRCType Calculate(const void *data, crcpp_size size, const Parameters< CRCType, CRCWidth > &parameters)
Computes a CRC.
Definition CRC.h:433
static const Parameters< crcpp_uint32, 32 > & CRC_32()
Returns a set of parameters for CRC-32 (aka CRC-32 ADCCP, CRC-32 PKZip).
Definition CRC.h:1532
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
Packetizer base controller.
Definition Controller.h:46
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
Definition Buffer.h:270
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::packetizer::Transport > TransportPtr
Definition Transport.h:113
std::shared_ptr< rogue::protocols::packetizer::ControllerV2 > ControllerV2Ptr
std::shared_ptr< rogue::protocols::packetizer::Application > ApplicationPtr
CRC lookup table. After construction, the CRC parameters are fixed.
Definition CRC.h:166