65 if (frame->isEmpty()) log_->warning(
"Empty frame received At Transport");
69 std::lock_guard<std::mutex> lock(tranMtx_);
71 buff = *(frame->beginBuffer());
73 size = buff->getPayload();
76 if (frame->getError() ||
77 (frame->bufferCount() != 1) ||
79 ((data[0] & 0xF) != 0)) {
80 log_->warning(
"Dropping frame due to contents: error=0x%" PRIx8
", payload=%" PRIu32
", buffers=%" PRIu32
81 ", Version=0x%" PRIx8,
90 tmpIdx =
static_cast<uint32_t
>(data[0]) >> 4;
91 tmpIdx |=
static_cast<uint32_t
>(data[1]) << 4;
93 tmpCount =
static_cast<uint32_t
>(data[2]);
94 tmpCount |=
static_cast<uint32_t
>(data[3]) << 8;
95 tmpCount |=
static_cast<uint32_t
>(data[4]) << 16;
101 tmpLuser = data[size - 1] & 0x7F;
102 tmpEof = data[size - 1] & 0x80;
104 log_->debug(
"transportRx: Raw header: 0x%" PRIx8
", 0x%" PRIx8
", 0x%" PRIx8
", 0x%" PRIx8
", 0x%" PRIx8
105 ", 0x%" PRIx8
", 0x%" PRIx8
", 0x%" PRIx8,
114 log_->debug(
"transportRx: Raw footer: 0x%" PRIx8, data[size - 1]);
115 log_->debug(
"transportRx: Got frame: Fuser=0x%" PRIx8
", Dest=0x%" PRIx8
", Id=0x%" PRIx32
", Count=%" PRIx32
116 ", Luser=0x%" PRIx8
", Eof=%" PRIu8
", size=%" PRIu32,
126 buff->adjustPayload(-1);
129 buff->adjustHeader(8);
133 if (tmpCount > 0 && (tmpIdx != tranIndex_ || tmpCount != tranCount_[0])) {
134 log_->warning(
"Dropping frame due to state mismatch: expIdx=%" PRIu32
", gotIdx=%" PRIu32
", expCount=%" PRIu32
135 ", gotCount=%" PRIu32,
142 tranFrame_[0].reset();
148 if (tranCount_[0] != 0)
149 log_->warning(
"Dropping frame due to new incoming frame: expIdx=%" PRIu32
", expCount=%" PRIu32,
153 tranFrame_[0] = ris::Frame::create();
158 tranFrame_[0]->setFirstUser(tmpFuser);
161 tranFrame_[0]->appendBuffer(buff);
166 tranFrame_[0]->setLastUser(tmpLuser);
168 if (app_[tranDest_]) {
169 app_[tranDest_]->pushFrame(tranFrame_[0]);
171 tranFrame_[0].reset();
174 if (enSsi_ & (tmpLuser & 0x1)) tranFrame_[tmpDest]->setError(0x80);
182 ris::Frame::BufferIterator it;
188 struct timeval startTime;
189 struct timeval currTime;
190 struct timeval endTime;
192 gettimeofday(&startTime, NULL);
193 timeradd(&startTime, &timeout_, &endTime);
195 if (frame->isEmpty()) log_->warning(
"Empty frame received at application");
197 if (frame->getError())
return;
201 std::lock_guard<std::mutex> lock(appMtx_);
204 while (tranQueue_.busy()) {
206 gettimeofday(&currTime, NULL);
207 if (timercmp(&currTime, &endTime, >)) {
208 log_->critical(
"ControllerV1::applicationRx: Timeout waiting for outbound queue after %" PRIu32
".%" PRIu32
209 " seconds! May be caused by outbound backpressure.",
212 gettimeofday(&startTime, NULL);
213 timeradd(&startTime, &timeout_, &endTime);
218 fUser = frame->getFirstUser();
219 lUser = frame->getLastUser();
222 if (enSsi_) fUser |= 0x2;
225 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
229 (*it)->adjustHeader(-8);
230 (*it)->adjustTail(-1);
233 (*it)->adjustPayload(1);
235 size = (*it)->getPayload();
236 data = (*it)->begin();
238 data[0] = (appIndex_ & 0xF) << 4;
239 data[1] = (appIndex_ >> 4) & 0xFF;
241 data[2] = (segment >> 0) & 0xFF;
242 data[3] = (segment >> 8) & 0xFF;
243 data[4] = (segment >> 16) & 0xFF;
249 data[size - 1] = lUser & 0x7F;
251 if (it == (frame->endBuffer() - 1)) data[size - 1] |= 0x80;
253 tFrame->appendBuffer(*it);
254 tranQueue_.push(tFrame);