65 if (frame->isEmpty()) log_->warning(
"Empty frame received on transport input");
69 std::lock_guard<std::mutex> lock(tranMtx_);
71 buff = *(frame->beginBuffer());
73 size = buff->getPayload();
76 if (frame->getError() ||
77 (frame->bufferCount() != 1) ||
79 ((data[0] & 0xF) != 0)) {
80 log_->warning(
"Dropping frame due to contents: error=0x%" PRIx8
", payload=%" PRIu32
", buffers=%" PRIu32
81 ", Version=0x%" PRIx8,
90 tmpIdx =
static_cast<uint32_t
>(data[0]) >> 4;
91 tmpIdx |=
static_cast<uint32_t
>(data[1]) << 4;
93 tmpCount =
static_cast<uint32_t
>(data[2]);
94 tmpCount |=
static_cast<uint32_t
>(data[3]) << 8;
95 tmpCount |=
static_cast<uint32_t
>(data[4]) << 16;
101 tmpLuser = data[size - 1] & 0x7F;
102 tmpEof = data[size - 1] & 0x80;
104 log_->debug(
"transportRx: Raw header: 0x%" PRIx8
", 0x%" PRIx8
", 0x%" PRIx8
", 0x%" PRIx8
", 0x%" PRIx8
105 ", 0x%" PRIx8
", 0x%" PRIx8
", 0x%" PRIx8,
114 log_->debug(
"transportRx: Raw footer: 0x%" PRIx8, data[size - 1]);
115 log_->debug(
"transportRx: Got frame: Fuser=0x%" PRIx8
", Dest=0x%" PRIx8
", Id=0x%" PRIx32
", Count=%" PRIx32
116 ", Luser=0x%" PRIx8
", Eof=%" PRIu8
", size=%" PRIu32,
126 buff->adjustPayload(-1);
129 buff->adjustHeader(8);
133 if (tmpCount > 0 && (tmpIdx != tranIndex_ || tmpCount != tranCount_[0])) {
134 log_->warning(
"Dropping frame due to state mismatch: expIdx=%" PRIu32
", gotIdx=%" PRIu32
", expCount=%" PRIu32
135 ", gotCount=%" PRIu32,
142 tranFrame_[0].reset();
148 if (tranCount_[0] != 0)
149 log_->warning(
"Dropping frame due to new incoming frame: expIdx=%" PRIu32
", expCount=%" PRIu32,
153 tranFrame_[0] = ris::Frame::create();
158 tranFrame_[0]->setFirstUser(tmpFuser);
161 tranFrame_[0]->appendBuffer(buff);
166 tranFrame_[0]->setLastUser(tmpLuser);
170 if (enSsi_ & (tmpLuser & 0x1)) tranFrame_[0]->setError(0x80);
172 if (app_[tranDest_]) {
173 app_[tranDest_]->pushFrame(tranFrame_[0]);
175 tranFrame_[0].reset();
183 ris::Frame::BufferIterator it;
189 struct timeval startTime;
190 struct timeval currTime;
191 struct timeval endTime;
193 gettimeofday(&startTime, NULL);
194 timeradd(&startTime, &timeout_, &endTime);
196 if (frame->isEmpty()) log_->warning(
"Empty frame received on application input");
198 if (frame->getError())
return;
202 std::lock_guard<std::mutex> lock(appMtx_);
205 while (tranQueue_.busy()) {
207 gettimeofday(&currTime, NULL);
208 if (timercmp(&currTime, &endTime, >)) {
209 log_->critical(
"ControllerV1::applicationRx: Timeout waiting for outbound queue after %" PRIu32
".%" PRIu32
210 " seconds! May be caused by outbound backpressure.",
213 gettimeofday(&startTime, NULL);
214 timeradd(&startTime, &timeout_, &endTime);
219 fUser = frame->getFirstUser();
220 lUser = frame->getLastUser();
223 if (enSsi_) fUser |= 0x2;
226 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
230 (*it)->adjustHeader(-8);
231 (*it)->adjustTail(-1);
234 (*it)->adjustPayload(1);
236 size = (*it)->getPayload();
237 data = (*it)->begin();
239 data[0] = (appIndex_ & 0xF) << 4;
240 data[1] = (appIndex_ >> 4) & 0xFF;
242 data[2] = (segment >> 0) & 0xFF;
243 data[3] = (segment >> 8) & 0xFF;
244 data[4] = (segment >> 16) & 0xFF;
250 data[size - 1] = lUser & 0x7F;
252 if (it == (frame->endBuffer() - 1)) data[size - 1] |= 0x80;
254 tFrame->appendBuffer(*it);
255 tranQueue_.push(tFrame);