rogue
Loading...
Searching...
No Matches
ControllerV1.cpp
Go to the documentation of this file.
1
18
19#include <inttypes.h>
20#include <sys/time.h>
21#include <unistd.h>
22
23#include <cmath>
24#include <cstdlib>
25#include <memory>
26
27#include "rogue/GeneralError.h"
28#include "rogue/GilRelease.h"
34
37
39rpp::ControllerV1Ptr rpp::ControllerV1::create(bool enSsi, rpp::TransportPtr tran, rpp::ApplicationPtr* app) {
40 rpp::ControllerV1Ptr r = std::make_shared<rpp::ControllerV1>(enSsi, tran, app);
41 return (r);
42}
43
45rpp::ControllerV1::ControllerV1(bool enSsi, rpp::TransportPtr tran, rpp::ApplicationPtr* app)
46 : rpp::Controller::Controller(tran, app, 8, 1, 8, enSsi) {}
47
49rpp::ControllerV1::~ControllerV1() {}
50
52void rpp::ControllerV1::transportRx(ris::FramePtr frame) {
53 ris::BufferPtr buff;
54 uint32_t size;
55 uint32_t tmpIdx;
56 uint32_t tmpCount;
57 uint8_t tmpFuser;
58 uint8_t tmpLuser;
59 uint8_t tmpDest;
60 uint8_t tmpId;
61 bool tmpEof;
62 uint32_t flags;
63 uint8_t* data;
64
65 if (frame->isEmpty()) log_->warning("Empty frame received on transport input");
66
68 ris::FrameLockPtr flock = frame->lock();
69 std::lock_guard<std::mutex> lock(tranMtx_);
70
71 buff = *(frame->beginBuffer());
72 data = buff->begin();
73 size = buff->getPayload();
74
75 // Drop invalid data
76 if (frame->getError() || // Check for frame ERROR
77 (frame->bufferCount() != 1) || // Incoming frame can only have one buffer
78 (size < 10) || // Check for min. size (64-bit header + 8-bit min. payload + 8-bit tail)
79 ((data[0] & 0xF) != 0)) { // Check for invalid version only
80 log_->warning("Dropping frame due to contents: error=0x%" PRIx8 ", payload=%" PRIu32 ", buffers=%" PRIu32
81 ", Version=0x%" PRIx8,
82 frame->getError(),
83 size,
84 frame->bufferCount(),
85 data[0] & 0xF);
86 dropCount_++;
87 return;
88 }
89
90 tmpIdx = static_cast<uint32_t>(data[0]) >> 4;
91 tmpIdx |= static_cast<uint32_t>(data[1]) << 4;
92
93 tmpCount = static_cast<uint32_t>(data[2]);
94 tmpCount |= static_cast<uint32_t>(data[3]) << 8;
95 tmpCount |= static_cast<uint32_t>(data[4]) << 16;
96
97 tmpDest = data[5];
98 tmpId = data[6];
99 tmpFuser = data[7];
100
101 tmpLuser = data[size - 1] & 0x7F;
102 tmpEof = data[size - 1] & 0x80;
103
104 log_->debug("transportRx: Raw header: 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8
105 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8,
106 data[0],
107 data[1],
108 data[2],
109 data[3],
110 data[4],
111 data[5],
112 data[6],
113 data[7]);
114 log_->debug("transportRx: Raw footer: 0x%" PRIx8, data[size - 1]);
115 log_->debug("transportRx: Got frame: Fuser=0x%" PRIx8 ", Dest=0x%" PRIx8 ", Id=0x%" PRIx32 ", Count=%" PRIx32
116 ", Luser=0x%" PRIx8 ", Eof=%" PRIu8 ", size=%" PRIu32,
117 tmpFuser,
118 tmpDest,
119 tmpIdx,
120 tmpCount,
121 tmpLuser,
122 tmpEof,
123 size);
124
125 // Shorten message by one byte (before adjusting tail)
126 buff->adjustPayload(-1);
127
128 // Adjust header and tail reservations
129 buff->adjustHeader(8);
130 buff->adjustTail(1);
131
132 // Drop frame and reset state if mismatch
133 if (tmpCount > 0 && (tmpIdx != tranIndex_ || tmpCount != tranCount_[0])) {
134 log_->warning("Dropping frame due to state mismatch: expIdx=%" PRIu32 ", gotIdx=%" PRIu32 ", expCount=%" PRIu32
135 ", gotCount=%" PRIu32,
136 tranIndex_,
137 tmpIdx,
138 tranCount_[0],
139 tmpCount);
140 dropCount_++;
141 tranCount_[0] = 0;
142 tranFrame_[0].reset();
143 return;
144 }
145
146 // First frame
147 if (tmpCount == 0) {
148 if (tranCount_[0] != 0)
149 log_->warning("Dropping frame due to new incoming frame: expIdx=%" PRIu32 ", expCount=%" PRIu32,
150 tranIndex_,
151 tranCount_[0]);
152
153 tranFrame_[0] = ris::Frame::create();
154 tranIndex_ = tmpIdx;
155 tranDest_ = tmpDest;
156 tranCount_[0] = 0;
157
158 tranFrame_[0]->setFirstUser(tmpFuser);
159 }
160
161 tranFrame_[0]->appendBuffer(buff);
162 frame->clear(); // Empty old frame
163
164 // Last of transfer
165 if (tmpEof) {
166 tranFrame_[0]->setLastUser(tmpLuser);
167 tranCount_[0] = 0;
168
169 // Detect SSI error — must precede pushFrame/reset or the indication is lost.
170 if (enSsi_ & (tmpLuser & 0x1)) tranFrame_[0]->setError(0x80);
171
172 if (app_[tranDest_]) {
173 app_[tranDest_]->pushFrame(tranFrame_[0]);
174 }
175 tranFrame_[0].reset();
176 } else {
177 tranCount_[0]++;
178 }
179}
180
182void rpp::ControllerV1::applicationRx(ris::FramePtr frame, uint8_t tDest) {
183 ris::Frame::BufferIterator it;
184 uint32_t segment;
185 uint8_t* data;
186 uint32_t size;
187 uint8_t fUser;
188 uint8_t lUser;
189 struct timeval startTime;
190 struct timeval currTime;
191 struct timeval endTime;
192
193 gettimeofday(&startTime, NULL);
194 timeradd(&startTime, &timeout_, &endTime);
195
196 if (frame->isEmpty()) log_->warning("Empty frame received on application input");
197
198 if (frame->getError()) return;
199
200 rogue::GilRelease noGil;
201 ris::FrameLockPtr flock = frame->lock();
202 std::lock_guard<std::mutex> lock(appMtx_);
203
204 // Wait while queue is busy
205 while (tranQueue_.busy()) {
206 usleep(10);
207 gettimeofday(&currTime, NULL);
208 if (timercmp(&currTime, &endTime, >)) {
209 log_->critical("ControllerV1::applicationRx: Timeout waiting for outbound queue after %" PRIu32 ".%" PRIu32
210 " seconds! May be caused by outbound backpressure.",
211 timeout_.tv_sec,
212 timeout_.tv_usec);
213 gettimeofday(&startTime, NULL);
214 timeradd(&startTime, &timeout_, &endTime);
215 }
216 }
217
218 // User fields
219 fUser = frame->getFirstUser();
220 lUser = frame->getLastUser();
221
222 // Inject SOF
223 if (enSsi_) fUser |= 0x2;
224
225 segment = 0;
226 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
227 ris::FramePtr tFrame = ris::Frame::create();
228
229 // Adjust header and tail, before setting new size
230 (*it)->adjustHeader(-8);
231 (*it)->adjustTail(-1);
232
233 // Make payload one byte longer
234 (*it)->adjustPayload(1);
235
236 size = (*it)->getPayload();
237 data = (*it)->begin();
238
239 data[0] = (appIndex_ & 0xF) << 4; // Note: BIT3:0 = 0x0 (Version)
240 data[1] = (appIndex_ >> 4) & 0xFF;
241
242 data[2] = (segment >> 0) & 0xFF;
243 data[3] = (segment >> 8) & 0xFF;
244 data[4] = (segment >> 16) & 0xFF;
245
246 data[5] = tDest;
247 data[6] = 0; // TID Unused
248 data[7] = fUser;
249
250 data[size - 1] = lUser & 0x7F;
251
252 if (it == (frame->endBuffer() - 1)) data[size - 1] |= 0x80;
253
254 tFrame->appendBuffer(*it);
255 tranQueue_.push(tFrame);
256 segment++;
257 }
258 appIndex_++;
259 frame->clear(); // Empty old frame
260}
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
Packetizer base controller.
Definition Controller.h:46
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
Definition Buffer.h:270
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::packetizer::ControllerV1 > ControllerV1Ptr
std::shared_ptr< rogue::protocols::packetizer::Transport > TransportPtr
Definition Transport.h:113
std::shared_ptr< rogue::protocols::packetizer::Application > ApplicationPtr