rogue
Loading...
Searching...
No Matches
ControllerV1.cpp
Go to the documentation of this file.
1
18
19#include <inttypes.h>
20#include <sys/time.h>
21#include <unistd.h>
22
23#include <cmath>
24#include <cstdlib>
25#include <memory>
26
27#include "rogue/GeneralError.h"
28#include "rogue/GilRelease.h"
34
37
39rpp::ControllerV1Ptr rpp::ControllerV1::create(bool enSsi, rpp::TransportPtr tran, rpp::ApplicationPtr* app) {
40 rpp::ControllerV1Ptr r = std::make_shared<rpp::ControllerV1>(enSsi, tran, app);
41 return (r);
42}
43
45rpp::ControllerV1::ControllerV1(bool enSsi, rpp::TransportPtr tran, rpp::ApplicationPtr* app)
46 : rpp::Controller::Controller(tran, app, 8, 1, 8, enSsi) {}
47
49rpp::ControllerV1::~ControllerV1() {}
50
52void rpp::ControllerV1::transportRx(ris::FramePtr frame) {
53 ris::BufferPtr buff;
54 uint32_t size;
55 uint32_t tmpIdx;
56 uint32_t tmpCount;
57 uint8_t tmpFuser;
58 uint8_t tmpLuser;
59 uint8_t tmpDest;
60 uint8_t tmpId;
61 bool tmpEof;
62 uint32_t flags;
63 uint8_t* data;
64
65 if (frame->isEmpty()) log_->warning("Empty frame received At Transport");
66
68 ris::FrameLockPtr flock = frame->lock();
69 std::lock_guard<std::mutex> lock(tranMtx_);
70
71 buff = *(frame->beginBuffer());
72 data = buff->begin();
73 size = buff->getPayload();
74
75 // Drop invalid data
76 if (frame->getError() || // Check for frame ERROR
77 (frame->bufferCount() != 1) || // Incoming frame can only have one buffer
78 (size < 10) || // Check for min. size (64-bit header + 8-bit min. payload + 8-bit tail)
79 ((data[0] & 0xF) != 0)) { // Check for invalid version only
80 log_->warning("Dropping frame due to contents: error=0x%" PRIx8 ", payload=%" PRIu32 ", buffers=%" PRIu32
81 ", Version=0x%" PRIx8,
82 frame->getError(),
83 size,
84 frame->bufferCount(),
85 data[0] & 0xF);
86 dropCount_++;
87 return;
88 }
89
90 tmpIdx = static_cast<uint32_t>(data[0]) >> 4;
91 tmpIdx |= static_cast<uint32_t>(data[1]) << 4;
92
93 tmpCount = static_cast<uint32_t>(data[2]);
94 tmpCount |= static_cast<uint32_t>(data[3]) << 8;
95 tmpCount |= static_cast<uint32_t>(data[4]) << 16;
96
97 tmpDest = data[5];
98 tmpId = data[6];
99 tmpFuser = data[7];
100
101 tmpLuser = data[size - 1] & 0x7F;
102 tmpEof = data[size - 1] & 0x80;
103
104 log_->debug("transportRx: Raw header: 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8
105 ", 0x%" PRIx8 ", 0x%" PRIx8 ", 0x%" PRIx8,
106 data[0],
107 data[1],
108 data[2],
109 data[3],
110 data[4],
111 data[5],
112 data[6],
113 data[7]);
114 log_->debug("transportRx: Raw footer: 0x%" PRIx8, data[size - 1]);
115 log_->debug("transportRx: Got frame: Fuser=0x%" PRIx8 ", Dest=0x%" PRIx8 ", Id=0x%" PRIx32 ", Count=%" PRIx32
116 ", Luser=0x%" PRIx8 ", Eof=%" PRIu8 ", size=%" PRIu32,
117 tmpFuser,
118 tmpDest,
119 tmpIdx,
120 tmpCount,
121 tmpLuser,
122 tmpEof,
123 size);
124
125 // Shorten message by one byte (before adjusting tail)
126 buff->adjustPayload(-1);
127
128 // Adjust header and tail reservations
129 buff->adjustHeader(8);
130 buff->adjustTail(1);
131
132 // Drop frame and reset state if mismatch
133 if (tmpCount > 0 && (tmpIdx != tranIndex_ || tmpCount != tranCount_[0])) {
134 log_->warning("Dropping frame due to state mismatch: expIdx=%" PRIu32 ", gotIdx=%" PRIu32 ", expCount=%" PRIu32
135 ", gotCount=%" PRIu32,
136 tranIndex_,
137 tmpIdx,
138 tranCount_[0],
139 tmpCount);
140 dropCount_++;
141 tranCount_[0] = 0;
142 tranFrame_[0].reset();
143 return;
144 }
145
146 // First frame
147 if (tmpCount == 0) {
148 if (tranCount_[0] != 0)
149 log_->warning("Dropping frame due to new incoming frame: expIdx=%" PRIu32 ", expCount=%" PRIu32,
150 tranIndex_,
151 tranCount_[0]);
152
153 tranFrame_[0] = ris::Frame::create();
154 tranIndex_ = tmpIdx;
155 tranDest_ = tmpDest;
156 tranCount_[0] = 0;
157
158 tranFrame_[0]->setFirstUser(tmpFuser);
159 }
160
161 tranFrame_[0]->appendBuffer(buff);
162 frame->clear(); // Empty old frame
163
164 // Last of transfer
165 if (tmpEof) {
166 tranFrame_[0]->setLastUser(tmpLuser);
167 tranCount_[0] = 0;
168 if (app_[tranDest_]) {
169 app_[tranDest_]->pushFrame(tranFrame_[0]);
170 }
171 tranFrame_[0].reset();
172
173 // Detect SSI error
174 if (enSsi_ & (tmpLuser & 0x1)) tranFrame_[tmpDest]->setError(0x80);
175 } else {
176 tranCount_[0]++;
177 }
178}
179
181void rpp::ControllerV1::applicationRx(ris::FramePtr frame, uint8_t tDest) {
182 ris::Frame::BufferIterator it;
183 uint32_t segment;
184 uint8_t* data;
185 uint32_t size;
186 uint8_t fUser;
187 uint8_t lUser;
188 struct timeval startTime;
189 struct timeval currTime;
190 struct timeval endTime;
191
192 gettimeofday(&startTime, NULL);
193 timeradd(&startTime, &timeout_, &endTime);
194
195 if (frame->isEmpty()) log_->warning("Empty frame received at application");
196
197 if (frame->getError()) return;
198
199 rogue::GilRelease noGil;
200 ris::FrameLockPtr flock = frame->lock();
201 std::lock_guard<std::mutex> lock(appMtx_);
202
203 // Wait while queue is busy
204 while (tranQueue_.busy()) {
205 usleep(10);
206 gettimeofday(&currTime, NULL);
207 if (timercmp(&currTime, &endTime, >)) {
208 log_->critical("ControllerV1::applicationRx: Timeout waiting for outbound queue after %" PRIu32 ".%" PRIu32
209 " seconds! May be caused by outbound backpressure.",
210 timeout_.tv_sec,
211 timeout_.tv_usec);
212 gettimeofday(&startTime, NULL);
213 timeradd(&startTime, &timeout_, &endTime);
214 }
215 }
216
217 // User fields
218 fUser = frame->getFirstUser();
219 lUser = frame->getLastUser();
220
221 // Inject SOF
222 if (enSsi_) fUser |= 0x2;
223
224 segment = 0;
225 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
226 ris::FramePtr tFrame = ris::Frame::create();
227
228 // Adjust header and tail, before setting new size
229 (*it)->adjustHeader(-8);
230 (*it)->adjustTail(-1);
231
232 // Make payload one byte longer
233 (*it)->adjustPayload(1);
234
235 size = (*it)->getPayload();
236 data = (*it)->begin();
237
238 data[0] = (appIndex_ & 0xF) << 4; // Note: BIT3:0 = 0x0 (Version)
239 data[1] = (appIndex_ >> 4) & 0xFF;
240
241 data[2] = (segment >> 0) & 0xFF;
242 data[3] = (segment >> 8) & 0xFF;
243 data[4] = (segment >> 16) & 0xFF;
244
245 data[5] = tDest;
246 data[6] = 0; // TID Unused
247 data[7] = fUser;
248
249 data[size - 1] = lUser & 0x7F;
250
251 if (it == (frame->endBuffer() - 1)) data[size - 1] |= 0x80;
252
253 tFrame->appendBuffer(*it);
254 tranQueue_.push(tFrame);
255 segment++;
256 }
257 appIndex_++;
258 frame->clear(); // Empty old frame
259}
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
Packetizer base controller.
Definition Controller.h:45
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
Definition Buffer.h:270
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::packetizer::ControllerV1 > ControllerV1Ptr
std::shared_ptr< rogue::protocols::packetizer::Transport > TransportPtr
Definition Transport.h:109
std::shared_ptr< rogue::protocols::packetizer::Application > ApplicationPtr