rogue
Loading...
Searching...
No Matches
ControllerV2.cpp
Go to the documentation of this file.
1
18
19#include <inttypes.h>
20#include <sys/time.h>
21#include <unistd.h>
22
23#include <cmath>
24#include <cstdlib>
25#include <memory>
26
27#include "rogue/GeneralError.h"
28#include "rogue/GilRelease.h"
34
37
38// Local CRC library
40
41// CRC Lookup table for later use
43
45rpp::ControllerV2Ptr rpp::ControllerV2::create(bool enIbCrc,
46 bool enObCrc,
47 bool enSsi,
50 rpp::ControllerV2Ptr r = std::make_shared<rpp::ControllerV2>(enIbCrc, enObCrc, enSsi, tran, app);
51 return (r);
52}
53
55rpp::ControllerV2::ControllerV2(bool enIbCrc,
56 bool enObCrc,
57 bool enSsi,
60 : rpp::Controller::Controller(tran, app, 8, 8, 8, enSsi) {
61 enIbCrc_ = enIbCrc;
62 enObCrc_ = enObCrc;
63}
64
66rpp::ControllerV2::~ControllerV2() {}
67
69void rpp::ControllerV2::transportRx(ris::FramePtr frame) {
70 ris::BufferPtr buff;
71 uint32_t size;
72 uint32_t tmpCount;
73 uint8_t tmpFuser;
74 uint8_t tmpLuser;
75 uint8_t tmpDest;
76 uint8_t tmpId;
77 bool tmpEof;
78 bool tmpSof;
79 bool crcErr;
80 uint32_t flags;
81 uint32_t last;
82 uint32_t crc;
83 uint32_t tmpCrc;
84 uint8_t* data;
85
86 if (frame->isEmpty()) {
87 log_->warning("Bad incoming transportRx frame, size=0");
88 return;
89 }
90
92 ris::FrameLockPtr flock = frame->lock();
93 std::lock_guard<std::mutex> lock(tranMtx_);
94
95 buff = *(frame->beginBuffer());
96 data = buff->begin();
97 size = buff->getPayload();
98
99 // Drop invalid data
100 if (frame->getError() || // Check for frame ERROR
101 (frame->bufferCount() != 1) || // Incoming frame can only have one buffer
102 (size < 24) || // Check for min. size (64-bit header + 64-bit min. payload + 64-bit tail)
103 ((size & 0x7) > 0) || // Check for non 64-bit alignment
104 ((data[0] & 0xF) != 0x2)) { // Check for invalid version only (ignore the CRC mode flag)
105 log_->warning("Dropping frame due to contents: error=0x%" PRIx8 ", payload=%" PRIu32 ", buffers=%" PRIu32
106 ", Version=0x%" PRIx8,
107 frame->getError(),
108 size,
109 frame->bufferCount(),
110 data[0] & 0xF);
111 dropCount_++;
112 return;
113 }
114
115 // Header word 0
116 tmpFuser = data[1];
117 tmpDest = data[2];
118 tmpId = data[3];
119
120 // Header word 1
121 tmpCount = static_cast<uint32_t>(data[4]) << 0;
122 tmpCount |= static_cast<uint32_t>(data[5]) << 8;
123 tmpSof = ((data[7] & 0x80) ? true : false); // SOF (PACKETIZER2_HDR_SOF_BIT_C = 63)
124
125 // Tail word 0
126 tmpLuser = data[size - 8];
127 tmpEof = ((data[size - 7] & 0x1) ? true : false);
128 last = static_cast<uint32_t>(data[size - 6]);
129
130 if (enIbCrc_) {
131 // Tail word 1
132 tmpCrc = static_cast<uint32_t>(data[size - 1]) << 0;
133 tmpCrc |= static_cast<uint32_t>(data[size - 2]) << 8;
134 tmpCrc |= static_cast<uint32_t>(data[size - 3]) << 16;
135 tmpCrc |= static_cast<uint32_t>(data[size - 4]) << 24;
136
137 // Compute CRC
138 if (tmpSof)
139 crc_[tmpDest] = CRC::Calculate(data, size - 4, crcTable_);
140 else
141 crc_[tmpDest] = CRC::Calculate(data, size - 4, crcTable_, crc_[tmpDest]);
142
143 crcErr = (tmpCrc != crc_[tmpDest]);
144 } else {
145 crcErr = false;
146 }
147
148 log_->debug("transportRx: Raw header: 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8
149 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8,
150 data[0],
151 data[1],
152 data[2],
153 data[3],
154 data[4],
155 data[5],
156 data[6],
157 data[7]);
158 log_->debug("transportRx: Raw footer: 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8
159 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8,
160 data[size - 8],
161 data[size - 7],
162 data[size - 6],
163 data[size - 5],
164 data[size - 4],
165 data[size - 3],
166 data[size - 2],
167 data[size - 1]);
168 log_->debug("transportRx: Got frame: Fuser=0x%" PRIx8 ", Dest=0x%" PRIx8 ", Id=0x%" PRIx8 ", Count=%" PRIu32
169 ", Sof=%" PRIu8 ", Luser=0x%" PRIx8 ", Eof=%" PRIu8 ", Last=%" PRIu32 ", crcErr=%" PRIu8,
170 tmpFuser,
171 tmpDest,
172 tmpId,
173 tmpCount,
174 tmpSof,
175 tmpLuser,
176 tmpEof,
177 last,
178 crcErr);
179
180 // Shorten message by removing tail and adjusting for last value
181 // Do this before adjusting tail reservation
182 buff->adjustPayload(-8 + (static_cast<int32_t>(last) - 8));
183
184 // Add 8 bytes to headroom and tail reservation
185 buff->adjustHeader(8);
186 buff->adjustTail(8);
187
188 // Drop frame and reset state if mismatch
189 if ((transSof_[tmpDest] != tmpSof) || crcErr || tmpCount != tranCount_[tmpDest]) {
190 log_->warning("Dropping frame: gotDest=%" PRIu8 ", gotSof=%" PRIu8 ", crcErr=%" PRIu8 ", expCount=%" PRIu32
191 ", gotCount=%" PRIu32,
192 tmpDest,
193 tmpSof,
194 crcErr,
195 tranCount_[tmpDest],
196 tmpCount);
197 dropCount_++;
198 transSof_[tmpDest] = true;
199 tranCount_[tmpDest] = 0;
200 tranFrame_[tmpDest].reset();
201 return;
202 }
203
204 // First frame
205 if (transSof_[tmpDest]) {
206 transSof_[tmpDest] = false;
207 if ((tranCount_[tmpDest] != 0) || !tmpSof || crcErr) {
208 log_->warning("Dropping frame: gotDest=%" PRIu8 ", gotSof=%" PRIu8 ", crcErr=%" PRIu8 ", expCount=%" PRIu32
209 ", gotCount=%" PRIu32,
210 tmpDest,
211 tmpSof,
212 crcErr,
213 tranCount_[tmpDest],
214 tmpCount);
215 dropCount_++;
216 transSof_[tmpDest] = true;
217 tranCount_[tmpDest] = 0;
218 tranFrame_[tmpDest].reset();
219 return;
220 }
221
222 tranFrame_[tmpDest] = ris::Frame::create();
223 tranCount_[tmpDest] = 0;
224
225 tranFrame_[tmpDest]->setFirstUser(tmpFuser);
226 }
227
228 tranFrame_[tmpDest]->appendBuffer(buff);
229 frame->clear(); // Empty old frame
230
231 // Last of transfer
232 if (tmpEof) {
233 tranFrame_[tmpDest]->setLastUser(tmpLuser);
234 transSof_[tmpDest] = true;
235 tranCount_[tmpDest] = 0;
236 if (app_[tmpDest]) {
237 app_[tmpDest]->pushFrame(tranFrame_[tmpDest]);
238 }
239 tranFrame_[tmpDest].reset();
240
241 // Detect SSI error
242 if (enSsi_ & (tmpLuser & 0x1)) tranFrame_[tmpDest]->setError(0x80);
243 } else {
244 tranCount_[tmpDest] = (tranCount_[tmpDest] + 1) & 0xFFFF;
245 }
246}
247
249void rpp::ControllerV2::applicationRx(ris::FramePtr frame, uint8_t tDest) {
250 ris::Frame::BufferIterator it;
251 uint32_t segment;
252 uint8_t* data;
253 uint32_t size;
254 uint8_t fUser;
255 uint8_t lUser;
256 uint32_t crc;
257 uint32_t last;
258 struct timeval startTime;
259 struct timeval currTime;
260 struct timeval endTime;
261
262 gettimeofday(&startTime, NULL);
263 timeradd(&startTime, &timeout_, &endTime);
264
265 if (frame->isEmpty()) {
266 log_->warning("Bad incoming applicationRx frame, size=0");
267 return;
268 }
269
270 if (frame->getError()) return;
271
272 rogue::GilRelease noGil;
273 ris::FrameLockPtr flock = frame->lock();
274 std::lock_guard<std::mutex> lock(appMtx_);
275
276 // Wait while queue is busy
277 while (tranQueue_.busy()) {
278 usleep(10);
279 gettimeofday(&currTime, NULL);
280 if (timercmp(&currTime, &endTime, >)) {
281 log_->critical("ControllerV2::applicationRx: Timeout waiting for outbound queue after %" PRIu32 ".%" PRIu32
282 " seconds! May be caused by outbound backpressure.",
283 timeout_.tv_sec,
284 timeout_.tv_usec);
285 gettimeofday(&startTime, NULL);
286 timeradd(&startTime, &timeout_, &endTime);
287 }
288 }
289
290 fUser = frame->getFirstUser();
291 lUser = frame->getLastUser();
292
293 // Inject SOF
294 if (enSsi_) fUser |= 0x2;
295
296 segment = 0;
297 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
298 ris::FramePtr tFrame = ris::Frame::create();
299
300 // Compute last, and aligned payload to 64-bits
301 last = (*it)->getPayload() % 8;
302 if (last == 0) last = 8;
303 (*it)->adjustPayload(8 - last);
304
305 // Rem 8 bytes head and tail reservation before setting new size
306 (*it)->adjustHeader(-8);
307 (*it)->adjustTail(-8);
308
309 // Add tail to payload, set new size (header reduction added 8)
310 (*it)->adjustPayload(8);
311
312 // Get data pointer and new size
313 data = (*it)->begin();
314 size = (*it)->getPayload();
315
316 // Header word 0
317 data[0] = 0x2; // (Version=0x2)
318 if (enObCrc_) data[0] |= 0x20; // Enable CRC
319 data[1] = fUser;
320 data[2] = tDest;
321 data[3] = 0; // TID Unused
322
323 // Header word 1
324 data[4] = segment & 0xFF;
325 data[5] = (segment >> 8) & 0xFF;
326 data[6] = 0;
327 data[7] = (segment == 0) ? 0x80 : 0x0; // SOF (PACKETIZER2_HDR_SOF_BIT_C = 63)
328
329 // Tail word 0
330 data[size - 8] = lUser;
331 data[size - 7] = (it == (frame->endBuffer() - 1)) ? 0x1 : 0x0; // EOF
332 data[size - 6] = last;
333 data[size - 5] = 0;
334
335 if (enObCrc_) {
336 // Compute CRC
337 if (segment == 0)
338 crc = CRC::Calculate(data, size - 4, crcTable_);
339 else
340 crc = CRC::Calculate(data, size - 4, crcTable_, crc);
341
342 // Tail word 1
343 data[size - 1] = (crc >> 0) & 0xFF;
344 data[size - 2] = (crc >> 8) & 0xFF;
345 data[size - 3] = (crc >> 16) & 0xFF;
346 data[size - 4] = (crc >> 24) & 0xFF;
347
348 } else {
349 data[size - 1] = 0;
350 data[size - 2] = 0;
351 data[size - 3] = 0;
352 data[size - 4] = 0;
353 }
354
355 log_->debug("applicationRx: Gen frame: Size=%" PRIu32 ", Fuser=0x%" PRIu8 ", Dest=0x%" PRIu8 ", Count=%" PRIu32
356 ", Sof=%" PRIu8 ", Luser=0x%" PRIu8 ", Eof=%" PRIu8 ", Last=%" PRIu32,
357 (*it)->getPayload(),
358 fUser,
359 tDest,
360 segment,
361 data[7] >> 7,
362 lUser,
363 data[size - 7],
364 last);
365 log_->debug("applicationRx: Raw header: 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8
366 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8,
367 data[0],
368 data[1],
369 data[2],
370 data[3],
371 data[4],
372 data[5],
373 data[6],
374 data[7]);
375 log_->debug("applicationRx: Raw footer: 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8
376 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8,
377 data[size - 8],
378 data[size - 7],
379 data[size - 6],
380 data[size - 5],
381 data[size - 4],
382 data[size - 3],
383 data[size - 2],
384 data[size - 1]);
385
386 tFrame->appendBuffer(*it);
387 tranQueue_.push(tFrame);
388 segment++;
389 }
390 appIndex_++;
391 frame->clear(); // Empty old frame
392}
static const CRC::Table< uint32_t, 32 > crcTable_(CRC::CRC_32())
static CRCType Calculate(const void *data, crcpp_size size, const Parameters< CRCType, CRCWidth > &parameters)
Computes a CRC.
Definition CRC.h:433
static const Parameters< crcpp_uint32, 32 > & CRC_32()
Returns a set of parameters for CRC-32 (aka CRC-32 ADCCP, CRC-32 PKZip).
Definition CRC.h:1532
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
Packetizer base controller.
Definition Controller.h:45
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
Definition Buffer.h:270
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::packetizer::Transport > TransportPtr
Definition Transport.h:109
std::shared_ptr< rogue::protocols::packetizer::ControllerV2 > ControllerV2Ptr
std::shared_ptr< rogue::protocols::packetizer::Application > ApplicationPtr
CRC lookup table. After construction, the CRC parameters are fixed.
Definition CRC.h:166