62 appQueue_.setThold(2);
74 gettimeofday(&stTime_, NULL);
81 gettimeofday(&txTime_, NULL);
84 locMaxSegment_ = segSize;
92 curMaxSegment_ = segSize;
99 locConnId_ = 0x12345678;
102 convTime(tryPeriodD1_, locTryPeriod_);
103 convTime(tryPeriodD4_, locTryPeriod_ / 4);
104 convTime(retranToutD1_, curRetranTout_);
105 convTime(nullToutD3_, curNullTout_ / 3);
106 convTime(cumAckToutD1_, curCumAckTout_);
107 convTime(cumAckToutD2_, curCumAckTout_ / 2);
109 memset(&zeroTme_, 0,
sizeof(
struct timeval));
122rpr::Controller::~Controller() {
127void rpr::Controller::stopQueue() {
132void rpr::Controller::stop() {
133 if (thread_ != NULL) {
144void rpr::Controller::start() {
145 if (thread_ == NULL) {
148 thread_ =
new std::thread(&rpr::Controller::runThread,
this);
152 pthread_setname_np(thread_->native_handle(),
"RssiControler");
166 nSize = size + rpr::Header::HeaderSize;
167 if (nSize > curMaxSegment_ && curMaxSegment_ > 0) nSize = curMaxSegment_;
168 if (nSize > locMaxSegment_) nSize = locMaxSegment_;
171 frame = tran_->reqFrame(nSize,
false);
172 buffer = *(frame->beginBuffer());
175 if (buffer->getAvailable() < rpr::Header::HeaderSize)
177 "Buffer size %" PRId32
" is less than min header size %" PRIu32,
178 rpr::Header::HeaderSize,
179 buffer->getAvailable()));
182 buffer->adjustHeader(rpr::Header::HeaderSize);
185 frame = ris::Frame::create();
186 frame->appendBuffer(buffer);
194 std::map<uint8_t, rpr::HeaderPtr>::iterator it;
201 if (frame->getError() || frame->isEmpty() || !head->verify()) {
202 log_->warning(
"Dumping bad frame state=%" PRIu32
" server=%" PRIu32, state_, server_);
207 log_->debug(
"RX frame: state=%" PRIu32
" server=%" PRIu32
" size=%" PRIu32
" syn=%" PRIu32
" ack=%" PRIu32
208 " nul=%" PRIu32
", bst=%" PRIu32
", rst=%" PRIu32
", ack#=%" PRIu32
" seq=%" PRIu32
", nxt=%" PRIu32,
222 if (head->ack && (head->acknowledge != lastAckRx_)) {
223 std::unique_lock<std::mutex> lock(txMtx_);
226 txList_[++lastAckRx_].reset();
227 if (txListCount_ != 0) txListCount_--;
228 }
while (lastAckRx_ != head->acknowledge);
232 if (!remBusy_ && head->busy) remBusyCnt_++;
235 remBusy_ = head->busy;
239 if (state_ == StOpen || state_ == StWaitSyn) {
245 }
else if (head->syn) {
246 if (state_ == StOpen || state_ == StWaitSyn) {
247 lastSeqRx_ = head->sequence;
248 nextSeqRx_ = lastSeqRx_ + 1;
253 }
else if (state_ == StOpen && (head->nul || frame->getPayload() > rpr::Header::HeaderSize)) {
254 if (head->sequence == nextSeqRx_) {
258 lastSeqRx_ = nextSeqRx_;
259 nextSeqRx_ = nextSeqRx_ + 1;
260 appQueue_.push(head);
263 if (!oooQueue_.empty()) {
265 if ((it = oooQueue_.find(head->sequence)) != oooQueue_.end()) {
266 log_->warning(
"Removed duplicate frame. server=%" PRIu8
", head->sequence=%" PRIu32
267 ", next sequence=%" PRIu32,
278 while ((it = oooQueue_.find(nextSeqRx_)) != oooQueue_.end()) {
279 lastSeqRx_ = nextSeqRx_;
280 nextSeqRx_ = nextSeqRx_ + 1;
282 appQueue_.push(it->second);
283 log_->info(
"Using frame from ooo queue. server=%" PRIu8
", head->sequence=%" PRIu32,
285 (it->second)->sequence);
291 stCond_.notify_all();
294 }
else if ((it = oooQueue_.find(head->sequence)) != oooQueue_.end()) {
295 log_->warning(
"Dropped duplicate frame. server=%" PRIu8
", head->sequence=%" PRIu32
296 ", next sequence=%" PRIu32,
306 uint8_t x = nextSeqRx_;
307 uint8_t windowEnd = (nextSeqRx_ + curMaxBuffers_ + 1);
309 while (++x != windowEnd) {
310 if (head->sequence == x) {
311 oooQueue_.insert(std::make_pair(head->sequence, head));
312 log_->info(
"Adding frame to ooo queue. server=%" PRIu8
", head->sequence=%" PRIu32
313 ", nextSeqRx_=% " PRIu8
", windowsEnd=%" PRIu32,
322 if (x == windowEnd) {
323 log_->warning(
"Dropping out of window frame. server=%" PRIu8
", head->sequence=%" PRIu32
324 ", nextSeqRx_=%" PRIu8
", windowsEnd=%" PRIu32,
344 if ((head = appQueue_.pop()) == NULL)
return (frame);
345 stCond_.notify_all();
347 frame = head->getFrame();
350 ackSeqRx_ = head->sequence;
357 (*(frame->beginBuffer()))->adjustHeader(rpr::Header::HeaderSize);
367 struct timeval startTime;
369 gettimeofday(&startTime, NULL);
374 if (frame->isEmpty()) {
375 log_->warning(
"Dumping empty application frame");
379 if (frame->getError()) {
380 log_->warning(
"Dumping errored frame");
385 (*(frame->beginBuffer()))->adjustHeader(-rpr::Header::HeaderSize);
393 if (state_ != StOpen) {
398 while (txListCount_ >= curMaxBuffers_) {
400 if (timePassed(startTime, timeout_)) {
401 gettimeofday(&startTime, NULL);
402 log_->critical(
"Controller::applicationRx: Timeout waiting for outbound queue after %" PRIu32
".%" PRIu32
403 " seconds! May be caused by outbound backpressure.",
410 transportTx(head,
true,
false);
411 stCond_.notify_all();
415bool rpr::Controller::getOpen() {
416 return (state_ == StOpen);
420uint32_t rpr::Controller::getDownCount() {
425uint32_t rpr::Controller::getDropCount() {
430uint32_t rpr::Controller::getRetranCount() {
431 return (retranCount_);
435bool rpr::Controller::getLocBusy() {
436 bool queueBusy = appQueue_.busy();
437 if (!locBusy_ && queueBusy) locBusyCnt_++;
438 locBusy_ = queueBusy;
443uint32_t rpr::Controller::getLocBusyCnt() {
444 return (locBusyCnt_);
448bool rpr::Controller::getRemBusy() {
453uint32_t rpr::Controller::getRemBusyCnt() {
454 return (remBusyCnt_);
457void rpr::Controller::setLocTryPeriod(uint32_t val) {
460 "Invalid LocTryPeriod Value = %" PRIu32,
463 convTime(tryPeriodD1_, locTryPeriod_);
464 convTime(tryPeriodD4_, locTryPeriod_ / 4);
467uint32_t rpr::Controller::getLocTryPeriod() {
468 return locTryPeriod_;
471void rpr::Controller::setLocMaxBuffers(uint8_t val) {
474 "Invalid LocMaxBuffers Value = %" PRIu8,
477 locMaxBuffers_ = val;
480uint8_t rpr::Controller::getLocMaxBuffers() {
481 return locMaxBuffers_;
484void rpr::Controller::setLocMaxSegment(uint16_t val) {
487 "Invalid LocMaxSegment Value = %" PRIu16,
489 locMaxSegment_ = val;
492uint16_t rpr::Controller::getLocMaxSegment() {
493 return locMaxSegment_;
496void rpr::Controller::setLocCumAckTout(uint16_t val) {
499 "Invalid LocCumAckTout Value = %" PRIu16,
501 locCumAckTout_ = val;
504uint16_t rpr::Controller::getLocCumAckTout() {
505 return locCumAckTout_;
508void rpr::Controller::setLocRetranTout(uint16_t val) {
511 "Invalid LocRetranTout Value = %" PRIu16,
513 locRetranTout_ = val;
516uint16_t rpr::Controller::getLocRetranTout() {
517 return locRetranTout_;
520void rpr::Controller::setLocNullTout(uint16_t val) {
523 "Invalid LocNullTout Value = %" PRIu16,
528uint16_t rpr::Controller::getLocNullTout() {
532void rpr::Controller::setLocMaxRetran(uint8_t val) {
535 "Invalid LocMaxRetran Value = %" PRIu8,
540uint8_t rpr::Controller::getLocMaxRetran() {
541 return locMaxRetran_;
544void rpr::Controller::setLocMaxCumAck(uint8_t val) {
550uint8_t rpr::Controller::getLocMaxCumAck() {
551 return locMaxCumAck_;
554uint8_t rpr::Controller::curMaxBuffers() {
555 return curMaxBuffers_;
558uint16_t rpr::Controller::curMaxSegment() {
559 return curMaxSegment_;
562uint16_t rpr::Controller::curCumAckTout() {
563 return curCumAckTout_;
566uint16_t rpr::Controller::curRetranTout() {
567 return curRetranTout_;
570uint16_t rpr::Controller::curNullTout() {
574uint8_t rpr::Controller::curMaxRetran() {
575 return curMaxRetran_;
578uint8_t rpr::Controller::curMaxCumAck() {
579 return curMaxCumAck_;
582void rpr::Controller::resetCounters() {
591void rpr::Controller::transportTx(
rpr::HeaderPtr head,
bool seqUpdate,
bool txReset) {
592 std::unique_lock<std::mutex> lock(txMtx_);
594 head->sequence = locSequence_;
598 txList_[locSequence_] = head;
605 for (uint32_t x = 0; x < 256; x++) txList_[x].reset();
610 head->acknowledge = lastAckTx_;
613 head->acknowledge = ackSeqRx_;
614 lastAckTx_ = ackSeqRx_;
619 gettimeofday(&txTime_, NULL);
625 "TX frame: state=%" PRIu32
" server=%" PRIu8
" size=%" PRIu32
" syn=%" PRIu8
" ack=%" PRIu8
" nul=%" PRIu8
626 ", bsy=%" PRIu8
", rst=%" PRIu8
", ack#=%" PRIu8
", seq=%" PRIu8
", recount=%" PRIu32
", ptr=%" PRIu32,
629 head->getFrame()->getPayload(),
638 head->getFrame().get());
644 tran_->sendFrame(head->getFrame());
648int8_t rpr::Controller::retransmit(uint8_t
id) {
649 std::unique_lock<std::mutex> lock(txMtx_);
652 if (head == NULL)
return 0;
655 if (!timePassed(head->getTime(), retranToutD1_))
return 0;
658 if (head->count() >= curMaxRetran_)
return -1;
663 head->acknowledge = lastAckTx_;
666 head->acknowledge = ackSeqRx_;
667 lastAckTx_ = ackSeqRx_;
672 gettimeofday(&txTime_, NULL);
675 "Retran frame: state=%" PRIu8
" server=%" PRIu8
" size=%" PRIu32
" syn=%" PRIu8
" ack=%" PRIu8
676 " nul=%" PRIu8
", rst=%" PRIu8
", ack#=%" PRIu8
", seq=%" PRIu8
", recount=%" PRIu32
", ptr=%" PRIu8,
679 head->getFrame()->getPayload(),
687 head->getFrame().get());
695 tran_->sendFrame(head->getFrame());
700void rpr::Controller::convTime(
struct timeval& tme, uint32_t rssiTime) {
701 float units = std::pow(10, -TimeoutUnit);
702 float value = units *
static_cast<float>(rssiTime);
704 uint32_t usec =
static_cast<uint32_t
>(value / 1e-6);
706 div_t divResult = div(usec, 1000000);
707 tme.tv_sec = divResult.quot;
708 tme.tv_usec = divResult.rem;
712bool rpr::Controller::timePassed(
struct timeval& lastTime,
struct timeval& tme) {
713 struct timeval endTime;
714 struct timeval currTime;
716 gettimeofday(&currTime, NULL);
717 timeradd(&lastTime, &tme, &endTime);
718 return (timercmp(&currTime, &endTime, >=));
722void rpr::Controller::runThread() {
731 if (wait.tv_sec != 0 || wait.tv_usec != 0) {
733 std::unique_lock<std::mutex> lock(stMtx_);
736 stCond_.wait_for(lock, std::chrono::microseconds(wait.tv_usec) + std::chrono::seconds(wait.tv_sec));
742 wait = stateClosedWait();
746 wait = stateSendSynAck();
750 wait = stateSendSeqAck();
770struct timeval& rpr::Controller::stateClosedWait() {
774 if (!stQueue_.empty()) {
775 head = stQueue_.pop();
780 log_->warning(
"Closing link. Server=%" PRIu8, server_);
783 }
else if (head->syn && (head->ack || server_)) {
784 curMaxBuffers_ = head->maxOutstandingSegments;
785 curMaxSegment_ = head->maxSegmentSize;
786 curCumAckTout_ = head->cumulativeAckTimeout;
787 curRetranTout_ = head->retransmissionTimeout;
788 curNullTout_ = head->nullTimeout;
789 curMaxRetran_ = head->maxRetransmissions;
790 curMaxCumAck_ = head->maxCumulativeAck;
791 lastAckRx_ = head->acknowledge;
794 convTime(retranToutD1_, curRetranTout_);
795 convTime(cumAckToutD1_, curCumAckTout_);
796 convTime(cumAckToutD2_, curCumAckTout_ / 2);
797 convTime(nullToutD3_, curNullTout_ / 3);
800 state_ = StSendSynAck;
803 state_ = StSendSeqAck;
805 gettimeofday(&stTime_, NULL);
809 curMaxBuffers_ = locMaxBuffers_;
810 curMaxSegment_ = locMaxSegment_;
811 curCumAckTout_ = locCumAckTout_;
812 curRetranTout_ = locRetranTout_;
813 curNullTout_ = locNullTout_;
814 curMaxRetran_ = locMaxRetran_;
815 curMaxCumAck_ = locMaxCumAck_;
819 }
else if ((!server_) && timePassed(stTime_, tryPeriodD1_)) {
821 head = rpr::Header::create(tran_->reqFrame(rpr::Header::SynSize,
false));
825 head->version = Version;
827 head->maxOutstandingSegments = locMaxBuffers_;
828 head->maxSegmentSize = locMaxSegment_;
829 head->retransmissionTimeout = locRetranTout_;
830 head->cumulativeAckTimeout = locCumAckTout_;
831 head->nullTimeout = locNullTout_;
832 head->maxRetransmissions = locMaxRetran_;
833 head->maxCumulativeAck = locMaxCumAck_;
834 head->timeoutUnit = TimeoutUnit;
835 head->connectionId = locConnId_;
837 transportTx(head,
true,
false);
840 gettimeofday(&stTime_, NULL);
842 }
else if (server_) {
846 return (tryPeriodD4_);
850struct timeval& rpr::Controller::stateSendSynAck() {
854 rpr::HeaderPtr head = rpr::Header::create(tran_->reqFrame(rpr::Header::SynSize,
false));
859 head->version = Version;
861 head->maxOutstandingSegments = curMaxBuffers_;
862 head->maxSegmentSize = curMaxSegment_;
863 head->retransmissionTimeout = curRetranTout_;
864 head->cumulativeAckTimeout = curCumAckTout_;
865 head->nullTimeout = curNullTout_;
866 head->maxRetransmissions = curMaxRetran_;
867 head->maxCumulativeAck = curMaxCumAck_;
868 head->timeoutUnit = TimeoutUnit;
869 head->connectionId = locConnId_;
871 transportTx(head,
true,
true);
874 log_->warning(
"State is open. Server=%" PRIu8, server_);
876 return (cumAckToutD2_);
880struct timeval& rpr::Controller::stateSendSeqAck() {
884 rpr::HeaderPtr ack = rpr::Header::create(tran_->reqFrame(rpr::Header::HeaderSize,
false));
889 ackSeqRx_ = lastSeqRx_;
891 transportTx(ack,
false,
true);
895 log_->warning(
"State is open. Server=%" PRIu8, server_);
896 return (cumAckToutD2_);
900struct timeval& rpr::Controller::stateOpen() {
905 struct timeval locTime;
908 while (!stQueue_.empty()) {
909 head = stQueue_.pop();
912 if ((head->rst) || (head->syn && (!head->ack))) {
914 gettimeofday(&stTime_, NULL);
921 std::unique_lock<std::mutex> lock(txMtx_);
923 ackPend = ackSeqRx_ - lastAckTx_;
927 if (timePassed(locTime, nullToutD3_))
933 if ((doNull || ((!getLocBusy()) && ackPend >= curMaxCumAck_) ||
934 ((ackPend > 0 || getLocBusy()) && timePassed(locTime, cumAckToutD1_)))) {
935 head = rpr::Header::create(tran_->reqFrame(rpr::Header::HeaderSize,
false));
938 transportTx(head, doNull,
false);
943 while ((!remBusy_) && (idx != locSequence_)) {
944 if (retransmit(idx++) < 0) {
946 gettimeofday(&stTime_, NULL);
951 return (cumAckToutD2_);
955struct timeval& rpr::Controller::stateError() {
959 log_->warning(
"Entering reset state. Server=%" PRIu8, server_);
961 rst = rpr::Header::create(tran_->reqFrame(rpr::Header::HeaderSize,
false));
964 transportTx(rst,
true,
true);
967 log_->warning(
"Entering closed state. Server=%" PRIu8, server_);
975 gettimeofday(&stTime_, NULL);
976 return (tryPeriodD1_);
980void rpr::Controller::setTimeout(uint32_t timeout) {
981 div_t divResult = div(timeout, 1000000);
982 timeout_.tv_sec = divResult.quot;
983 timeout_.tv_usec = divResult.rem;
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
static const uint32_t Debug
Debug severity level constant.
static const uint32_t Warning
Warning severity level constant.
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
void defaultTimeout(struct timeval &tout)
Returns Rogue default timeout as a timeval.