rogue
Loading...
Searching...
No Matches
Controller.cpp
Go to the documentation of this file.
1
18
19#include <inttypes.h>
20#include <sys/time.h>
21#include <unistd.h>
22
23#include <cmath>
24#include <cstdlib>
25#include <cstring>
26#include <map>
27#include <memory>
28#include <utility>
29
30#include "rogue/GeneralError.h"
31#include "rogue/GilRelease.h"
32#include "rogue/Helpers.h"
33#include "rogue/Logging.h"
40
41namespace rpr = rogue::protocols::rssi;
43
45rpr::ControllerPtr rpr::Controller::create(uint32_t segSize,
48 bool server) {
49 rpr::ControllerPtr r = std::make_shared<rpr::Controller>(segSize, tran, app, server);
50 return (r);
51}
52
54rpr::Controller::Controller(uint32_t segSize, rpr::TransportPtr tran, rpr::ApplicationPtr app, bool server) {
55 app_ = app;
56 tran_ = tran;
57 server_ = server;
58
59 locTryPeriod_ = 100;
60
61 // Busy after two entries
62 appQueue_.setThold(2);
63
64 dropCount_ = 0;
65 nextSeqRx_ = 0;
66 lastAckRx_ = 0;
67 locBusy_ = false;
68 remBusy_ = false;
69
70 lastSeqRx_ = 0;
71 ackSeqRx_ = 0;
72
73 state_ = StClosed;
74 gettimeofday(&stTime_, NULL);
75 downCount_ = 0;
76 retranCount_ = 0;
77
78 txListCount_ = 0;
79 lastAckTx_ = 0;
80 locSequence_ = 100;
81 gettimeofday(&txTime_, NULL);
82
83 locMaxBuffers_ = 32; // MAX_NUM_OUTS_SEG_G in FW
84 locMaxSegment_ = segSize;
85 locCumAckTout_ = 5; // ACK_TOUT_G in FW, 5mS
86 locRetranTout_ = 20; // RETRANS_TOUT_G in FW, 2hmS
87 locNullTout_ = 1000; // NULL_TOUT_G in FW, 1S
88 locMaxRetran_ = 15; // MAX_RETRANS_CNT_G in FW
89 locMaxCumAck_ = 2; // MAX_CUM_ACK_CNT_G in FW
90
91 curMaxBuffers_ = 32; // MAX_NUM_OUTS_SEG_G in FW
92 curMaxSegment_ = segSize;
93 curCumAckTout_ = 5; // ACK_TOUT_G in FW, 5mS
94 curRetranTout_ = 20; // RETRANS_TOUT_G in FW, 2hmS
95 curNullTout_ = 1000; // NULL_TOUT_G in FW, 1S
96 curMaxRetran_ = 15; // MAX_RETRANS_CNT_G in FW
97 curMaxCumAck_ = 2; // MAX_CUM_ACK_CNT_G in FW
98
99 locConnId_ = 0x12345678;
100 remConnId_ = 0;
101
102 convTime(tryPeriodD1_, locTryPeriod_);
103 convTime(tryPeriodD4_, locTryPeriod_ / 4);
104 convTime(retranToutD1_, curRetranTout_);
105 convTime(nullToutD3_, curNullTout_ / 3);
106 convTime(cumAckToutD1_, curCumAckTout_);
107 convTime(cumAckToutD2_, curCumAckTout_ / 2);
108
109 memset(&zeroTme_, 0, sizeof(struct timeval));
110
111 rogue::defaultTimeout(timeout_);
112
113 locBusyCnt_ = 0;
114 remBusyCnt_ = 0;
115
116 log_ = rogue::Logging::create("rssi.controller");
117
118 thread_ = NULL;
119}
120
122rpr::Controller::~Controller() {
123 stop();
124}
125
127void rpr::Controller::stopQueue() {
128 appQueue_.stop();
129}
130
132void rpr::Controller::stop() {
133 if (thread_ != NULL) {
134 rogue::GilRelease noGil;
135 threadEn_ = false;
136 thread_->join();
137 delete thread_;
138 thread_ = NULL;
139 state_ = StClosed;
140 }
141}
142
144void rpr::Controller::start() {
145 if (thread_ == NULL) {
146 state_ = StClosed;
147 threadEn_ = true;
148 thread_ = new std::thread(&rpr::Controller::runThread, this);
149
150 // Set a thread name
151#ifndef __MACH__
152 pthread_setname_np(thread_->native_handle(), "RssiControler");
153#endif
154 }
155}
156
158ris::FramePtr rpr::Controller::reqFrame(uint32_t size) {
159 ris::FramePtr frame;
160 ris::BufferPtr buffer;
161 uint32_t nSize;
162
163 // Request only single buffer frames.
164 // Frame size returned is never greater than remote max size
165 // or local segment size
166 nSize = size + rpr::Header::HeaderSize;
167 if (nSize > curMaxSegment_ && curMaxSegment_ > 0) nSize = curMaxSegment_;
168 if (nSize > locMaxSegment_) nSize = locMaxSegment_;
169
170 // Forward frame request to transport slave
171 frame = tran_->reqFrame(nSize, false);
172 buffer = *(frame->beginBuffer());
173
174 // Make sure there is enough room the buffer for our header
175 if (buffer->getAvailable() < rpr::Header::HeaderSize)
176 throw(rogue::GeneralError::create("rssi::Controller::reqFrame",
177 "Buffer size %" PRId32 " is less than min header size %" PRIu32,
178 rpr::Header::HeaderSize,
179 buffer->getAvailable()));
180
181 // Update buffer to include our header space.
182 buffer->adjustHeader(rpr::Header::HeaderSize);
183
184 // Recreate frame to ensure outbound only has a single buffer
185 frame = ris::Frame::create();
186 frame->appendBuffer(buffer);
187
188 // Return frame
189 return (frame);
190}
191
193void rpr::Controller::transportRx(ris::FramePtr frame) {
194 std::map<uint8_t, rpr::HeaderPtr>::iterator it;
195
196 rpr::HeaderPtr head = rpr::Header::create(frame);
197
198 rogue::GilRelease noGil;
199 ris::FrameLockPtr flock = frame->lock();
200
201 if (frame->getError() || frame->isEmpty() || !head->verify()) {
202 log_->warning("Dumping bad frame state=%" PRIu32 " server=%" PRIu32, state_, server_);
203 dropCount_++;
204 return;
205 }
206
207 log_->debug("RX frame: state=%" PRIu32 " server=%" PRIu32 " size=%" PRIu32 " syn=%" PRIu32 " ack=%" PRIu32
208 " nul=%" PRIu32 ", bst=%" PRIu32 ", rst=%" PRIu32 ", ack#=%" PRIu32 " seq=%" PRIu32 ", nxt=%" PRIu32,
209 state_,
210 server_,
211 frame->getPayload(),
212 head->syn,
213 head->ack,
214 head->nul,
215 head->busy,
216 head->rst,
217 head->acknowledge,
218 head->sequence,
219 nextSeqRx_);
220
221 // Ack set
222 if (head->ack && (head->acknowledge != lastAckRx_)) {
223 std::unique_lock<std::mutex> lock(txMtx_);
224
225 do {
226 txList_[++lastAckRx_].reset();
227 if (txListCount_ != 0) txListCount_--;
228 } while (lastAckRx_ != head->acknowledge);
229 }
230
231 // Check for busy state transition
232 if (!remBusy_ && head->busy) remBusyCnt_++;
233
234 // Update busy bit
235 remBusy_ = head->busy;
236
237 // Reset
238 if (head->rst) {
239 if (state_ == StOpen || state_ == StWaitSyn) {
240 stQueue_.push(head);
241 }
242
243 // Syn frame goes to state machine if state = open
244 // or we are waiting for ack replay
245 } else if (head->syn) {
246 if (state_ == StOpen || state_ == StWaitSyn) {
247 lastSeqRx_ = head->sequence;
248 nextSeqRx_ = lastSeqRx_ + 1;
249 stQueue_.push(head);
250 }
251
252 // Data or NULL in the correct sequence go to application
253 } else if (state_ == StOpen && (head->nul || frame->getPayload() > rpr::Header::HeaderSize)) {
254 if (head->sequence == nextSeqRx_) {
255 // log_->warning("Data or NULL in the correct sequence go to application: nextSeqRx_=0x%" PRIx8,
256 // nextSeqRx_);
257
258 lastSeqRx_ = nextSeqRx_;
259 nextSeqRx_ = nextSeqRx_ + 1;
260 appQueue_.push(head);
261
262 // There are elements in ooo (out-of-order) queue
263 if (!oooQueue_.empty()) {
264 // First remove received sequence number from queue to avoid duplicates
265 if ((it = oooQueue_.find(head->sequence)) != oooQueue_.end()) {
266 log_->warning("Removed duplicate frame. server=%" PRIu8 ", head->sequence=%" PRIu32
267 ", next sequence=%" PRIu32,
268 server_,
269 head->sequence,
270 nextSeqRx_);
271 dropCount_++;
272 oooQueue_.erase(it);
273 }
274
275 // Get next entries from ooo (out-of-order) queue if they exist
276 // This works because max outstanding will never be the full range of ids
277 // otherwise this could be stale data from previous ids
278 while ((it = oooQueue_.find(nextSeqRx_)) != oooQueue_.end()) {
279 lastSeqRx_ = nextSeqRx_;
280 nextSeqRx_ = nextSeqRx_ + 1;
281
282 appQueue_.push(it->second);
283 log_->info("Using frame from ooo queue. server=%" PRIu8 ", head->sequence=%" PRIu32,
284 server_,
285 (it->second)->sequence);
286 oooQueue_.erase(it);
287 }
288 }
289
290 // Notify after the last sequence update
291 stCond_.notify_all();
292
293 // Check if received frame is already in out of order queue
294 } else if ((it = oooQueue_.find(head->sequence)) != oooQueue_.end()) {
295 log_->warning("Dropped duplicate frame. server=%" PRIu8 ", head->sequence=%" PRIu32
296 ", next sequence=%" PRIu32,
297 server_,
298 head->sequence,
299 nextSeqRx_);
300 dropCount_++;
301
302 // Add to out of order queue in case things arrive out of order
303 // Make sure received sequence is in window. There may be a better way
304 // to do this while handling the 8 bit rollover
305 } else {
306 uint8_t x = nextSeqRx_;
307 uint8_t windowEnd = (nextSeqRx_ + curMaxBuffers_ + 1);
308
309 while (++x != windowEnd) {
310 if (head->sequence == x) {
311 oooQueue_.insert(std::make_pair(head->sequence, head));
312 log_->info("Adding frame to ooo queue. server=%" PRIu8 ", head->sequence=%" PRIu32
313 ", nextSeqRx_=% " PRIu8 ", windowsEnd=%" PRIu32,
314 server_,
315 head->sequence,
316 nextSeqRx_,
317 windowEnd);
318 break;
319 }
320 }
321
322 if (x == windowEnd) {
323 log_->warning("Dropping out of window frame. server=%" PRIu8 ", head->sequence=%" PRIu32
324 ", nextSeqRx_=%" PRIu8 ", windowsEnd=%" PRIu32,
325 server_,
326 head->sequence,
327 nextSeqRx_,
328 windowEnd);
329 dropCount_++;
330 }
331 }
332 }
333}
334
336// Called by application class thread
337ris::FramePtr rpr::Controller::applicationTx() {
338 ris::FramePtr frame;
339 rpr::HeaderPtr head;
340
341 rogue::GilRelease noGil;
342
343 do {
344 if ((head = appQueue_.pop()) == NULL) return (frame);
345 stCond_.notify_all();
346
347 frame = head->getFrame();
348 ris::FrameLockPtr flock = frame->lock();
349
350 ackSeqRx_ = head->sequence;
351
352 // Drop NULL frames
353 if (head->nul) {
354 head.reset();
355 frame.reset();
356 } else {
357 (*(frame->beginBuffer()))->adjustHeader(rpr::Header::HeaderSize);
358 }
359 } while (!frame);
360
361 return (frame);
362}
363
365void rpr::Controller::applicationRx(ris::FramePtr frame) {
366 ris::FramePtr tranFrame;
367 struct timeval startTime;
368
369 gettimeofday(&startTime, NULL);
370
371 rogue::GilRelease noGil;
372 ris::FrameLockPtr flock = frame->lock();
373
374 if (frame->isEmpty()) {
375 log_->warning("Dumping empty application frame");
376 return;
377 }
378
379 if (frame->getError()) {
380 log_->warning("Dumping errored frame");
381 return;
382 }
383
384 // Adjust header in first buffer
385 (*(frame->beginBuffer()))->adjustHeader(-rpr::Header::HeaderSize);
386
387 // Map to RSSI
388 rpr::HeaderPtr head = rpr::Header::create(frame);
389 head->ack = true;
390 flock->unlock();
391
392 // Connection is closed
393 if (state_ != StOpen) {
394 return;
395 }
396
397 // Wait while busy either by flow control or buffer starvation
398 while (txListCount_ >= curMaxBuffers_) {
399 usleep(10);
400 if (timePassed(startTime, timeout_)) {
401 gettimeofday(&startTime, NULL);
402 log_->critical("Controller::applicationRx: Timeout waiting for outbound queue after %" PRIu32 ".%" PRIu32
403 " seconds! May be caused by outbound backpressure.",
404 timeout_.tv_sec,
405 timeout_.tv_usec);
406 }
407 }
408
409 // Transmit
410 transportTx(head, true, false);
411 stCond_.notify_all();
412}
413
415bool rpr::Controller::getOpen() {
416 return (state_ == StOpen);
417}
418
420uint32_t rpr::Controller::getDownCount() {
421 return (downCount_);
422}
423
425uint32_t rpr::Controller::getDropCount() {
426 return (dropCount_);
427}
428
430uint32_t rpr::Controller::getRetranCount() {
431 return (retranCount_);
432}
433
435bool rpr::Controller::getLocBusy() {
436 bool queueBusy = appQueue_.busy();
437 if (!locBusy_ && queueBusy) locBusyCnt_++;
438 locBusy_ = queueBusy;
439 return (locBusy_);
440}
441
443uint32_t rpr::Controller::getLocBusyCnt() {
444 return (locBusyCnt_);
445}
446
448bool rpr::Controller::getRemBusy() {
449 return (remBusy_);
450}
451
453uint32_t rpr::Controller::getRemBusyCnt() {
454 return (remBusyCnt_);
455}
456
457void rpr::Controller::setLocTryPeriod(uint32_t val) {
458 if (val == 0)
459 throw rogue::GeneralError::create("Rssi::Controller::setLocTryPeriod",
460 "Invalid LocTryPeriod Value = %" PRIu32,
461 val);
462 locTryPeriod_ = val;
463 convTime(tryPeriodD1_, locTryPeriod_);
464 convTime(tryPeriodD4_, locTryPeriod_ / 4);
465}
466
467uint32_t rpr::Controller::getLocTryPeriod() {
468 return locTryPeriod_;
469}
470
471void rpr::Controller::setLocMaxBuffers(uint8_t val) {
472 if (val == 0)
473 throw rogue::GeneralError::create("Rssi::Controller::setLocMaxBuffers",
474 "Invalid LocMaxBuffers Value = %" PRIu8,
475 val);
476
477 locMaxBuffers_ = val;
478}
479
480uint8_t rpr::Controller::getLocMaxBuffers() {
481 return locMaxBuffers_;
482}
483
484void rpr::Controller::setLocMaxSegment(uint16_t val) {
485 if (val == 0)
486 throw rogue::GeneralError::create("Rssi::Controller::setLocMaxSegment",
487 "Invalid LocMaxSegment Value = %" PRIu16,
488 val);
489 locMaxSegment_ = val;
490}
491
492uint16_t rpr::Controller::getLocMaxSegment() {
493 return locMaxSegment_;
494}
495
496void rpr::Controller::setLocCumAckTout(uint16_t val) {
497 if (val == 0)
498 throw rogue::GeneralError::create("Rssi::Controller::setLocCumAckTout",
499 "Invalid LocCumAckTout Value = %" PRIu16,
500 val);
501 locCumAckTout_ = val;
502}
503
504uint16_t rpr::Controller::getLocCumAckTout() {
505 return locCumAckTout_;
506}
507
508void rpr::Controller::setLocRetranTout(uint16_t val) {
509 if (val == 0)
510 throw rogue::GeneralError::create("Rssi::Controller::setLocRetranTout",
511 "Invalid LocRetranTout Value = %" PRIu16,
512 val);
513 locRetranTout_ = val;
514}
515
516uint16_t rpr::Controller::getLocRetranTout() {
517 return locRetranTout_;
518}
519
520void rpr::Controller::setLocNullTout(uint16_t val) {
521 if (val == 0)
522 throw rogue::GeneralError::create("Rssi::Controller::setLocNullTout",
523 "Invalid LocNullTout Value = %" PRIu16,
524 val);
525 locNullTout_ = val;
526}
527
528uint16_t rpr::Controller::getLocNullTout() {
529 return locNullTout_;
530}
531
532void rpr::Controller::setLocMaxRetran(uint8_t val) {
533 if (val == 0)
534 throw rogue::GeneralError::create("Rssi::Controller::setLocMaxRetran",
535 "Invalid LocMaxRetran Value = %" PRIu8,
536 val);
537 locMaxRetran_ = val;
538}
539
540uint8_t rpr::Controller::getLocMaxRetran() {
541 return locMaxRetran_;
542}
543
544void rpr::Controller::setLocMaxCumAck(uint8_t val) {
545 if (val == 0)
546 throw rogue::GeneralError::create("Rssi::Controller::setLocMaxAck", "Invalid LocMaxAck Value = %" PRIu8, val);
547 locMaxCumAck_ = val;
548}
549
550uint8_t rpr::Controller::getLocMaxCumAck() {
551 return locMaxCumAck_;
552}
553
554uint8_t rpr::Controller::curMaxBuffers() {
555 return curMaxBuffers_;
556}
557
558uint16_t rpr::Controller::curMaxSegment() {
559 return curMaxSegment_;
560}
561
562uint16_t rpr::Controller::curCumAckTout() {
563 return curCumAckTout_;
564}
565
566uint16_t rpr::Controller::curRetranTout() {
567 return curRetranTout_;
568}
569
570uint16_t rpr::Controller::curNullTout() {
571 return curNullTout_;
572}
573
574uint8_t rpr::Controller::curMaxRetran() {
575 return curMaxRetran_;
576}
577
578uint8_t rpr::Controller::curMaxCumAck() {
579 return curMaxCumAck_;
580}
581
582void rpr::Controller::resetCounters() {
583 dropCount_ = 0;
584 downCount_ = 0;
585 retranCount_ = 0;
586 locBusyCnt_ = 0;
587 remBusyCnt_ = 0;
588}
589
590// Method to transit a frame with proper updates
591void rpr::Controller::transportTx(rpr::HeaderPtr head, bool seqUpdate, bool txReset) {
592 std::unique_lock<std::mutex> lock(txMtx_);
593
594 head->sequence = locSequence_;
595
596 // Update sequence numbers
597 if (seqUpdate) {
598 txList_[locSequence_] = head;
599 txListCount_++;
600 locSequence_++;
601 }
602
603 // Reset tx list
604 if (txReset) {
605 for (uint32_t x = 0; x < 256; x++) txList_[x].reset();
606 txListCount_ = 0;
607 }
608
609 if (getLocBusy()) {
610 head->acknowledge = lastAckTx_;
611 head->busy = true;
612 } else {
613 head->acknowledge = ackSeqRx_;
614 lastAckTx_ = ackSeqRx_;
615 head->busy = false;
616 }
617
618 // Track last tx time
619 gettimeofday(&txTime_, NULL);
620
621 ris::FrameLockPtr flock = head->getFrame()->lock();
622 head->update();
623
624 log_->log(rogue::Logging::Debug,
625 "TX frame: state=%" PRIu32 " server=%" PRIu8 " size=%" PRIu32 " syn=%" PRIu8 " ack=%" PRIu8 " nul=%" PRIu8
626 ", bsy=%" PRIu8 ", rst=%" PRIu8 ", ack#=%" PRIu8 ", seq=%" PRIu8 ", recount=%" PRIu32 ", ptr=%" PRIu32,
627 state_,
628 server_,
629 head->getFrame()->getPayload(),
630 head->syn,
631 head->ack,
632 head->nul,
633 head->busy,
634 head->rst,
635 head->acknowledge,
636 head->sequence,
637 retranCount_,
638 head->getFrame().get());
639
640 flock->unlock();
641 lock.unlock();
642
643 // Send frame
644 tran_->sendFrame(head->getFrame());
645}
646
647// Method to retransmit a frame
648int8_t rpr::Controller::retransmit(uint8_t id) {
649 std::unique_lock<std::mutex> lock(txMtx_);
650
651 rpr::HeaderPtr head = txList_[id];
652 if (head == NULL) return 0;
653
654 // retransmit timer has not expired
655 if (!timePassed(head->getTime(), retranToutD1_)) return 0;
656
657 // max retransmission count has been reached
658 if (head->count() >= curMaxRetran_) return -1;
659
660 retranCount_++;
661
662 if (getLocBusy()) {
663 head->acknowledge = lastAckTx_;
664 head->busy = true;
665 } else {
666 head->acknowledge = ackSeqRx_;
667 lastAckTx_ = ackSeqRx_;
668 head->busy = false;
669 }
670
671 // Track last tx time
672 gettimeofday(&txTime_, NULL);
673
674 log_->log(rogue::Logging::Warning,
675 "Retran frame: state=%" PRIu8 " server=%" PRIu8 " size=%" PRIu32 " syn=%" PRIu8 " ack=%" PRIu8
676 " nul=%" PRIu8 ", rst=%" PRIu8 ", ack#=%" PRIu8 ", seq=%" PRIu8 ", recount=%" PRIu32 ", ptr=%" PRIu8,
677 state_,
678 server_,
679 head->getFrame()->getPayload(),
680 head->syn,
681 head->ack,
682 head->nul,
683 head->rst,
684 head->acknowledge,
685 head->sequence,
686 retranCount_,
687 head->getFrame().get());
688
689 ris::FrameLockPtr flock = head->getFrame()->lock();
690 head->update();
691 flock->unlock();
692 lock.unlock();
693
694 // Send frame
695 tran_->sendFrame(head->getFrame());
696 return 1;
697}
698
700void rpr::Controller::convTime(struct timeval& tme, uint32_t rssiTime) {
701 float units = std::pow(10, -TimeoutUnit);
702 float value = units * static_cast<float>(rssiTime);
703
704 uint32_t usec = static_cast<uint32_t>(value / 1e-6);
705
706 div_t divResult = div(usec, 1000000);
707 tme.tv_sec = divResult.quot;
708 tme.tv_usec = divResult.rem;
709}
710
712bool rpr::Controller::timePassed(struct timeval& lastTime, struct timeval& tme) {
713 struct timeval endTime;
714 struct timeval currTime;
715
716 gettimeofday(&currTime, NULL);
717 timeradd(&lastTime, &tme, &endTime);
718 return (timercmp(&currTime, &endTime, >=));
719}
720
722void rpr::Controller::runThread() {
723 struct timeval wait;
724
725 log_->logThreadId();
726
727 wait = zeroTme_;
728
729 while (threadEn_) {
730 // Lock context
731 if (wait.tv_sec != 0 || wait.tv_usec != 0) {
732 // Wait on condition or timeout
733 std::unique_lock<std::mutex> lock(stMtx_);
734
735 // Adjustable wait
736 stCond_.wait_for(lock, std::chrono::microseconds(wait.tv_usec) + std::chrono::seconds(wait.tv_sec));
737 }
738
739 switch (state_) {
740 case StClosed:
741 case StWaitSyn:
742 wait = stateClosedWait();
743 break;
744
745 case StSendSynAck:
746 wait = stateSendSynAck();
747 break;
748
749 case StSendSeqAck:
750 wait = stateSendSeqAck();
751 break;
752
753 case StOpen:
754 wait = stateOpen();
755 break;
756
757 case StError:
758 wait = stateError();
759 break;
760 default:
761 break;
762 }
763 }
764
765 // Send reset on exit
766 stateError();
767}
768
770struct timeval& rpr::Controller::stateClosedWait() {
771 rpr::HeaderPtr head;
772
773 // got syn or reset
774 if (!stQueue_.empty()) {
775 head = stQueue_.pop();
776
777 // Reset
778 if (head->rst) {
779 state_ = StClosed;
780 log_->warning("Closing link. Server=%" PRIu8, server_);
781
782 // Syn ack
783 } else if (head->syn && (head->ack || server_)) {
784 curMaxBuffers_ = head->maxOutstandingSegments;
785 curMaxSegment_ = head->maxSegmentSize;
786 curCumAckTout_ = head->cumulativeAckTimeout;
787 curRetranTout_ = head->retransmissionTimeout;
788 curNullTout_ = head->nullTimeout;
789 curMaxRetran_ = head->maxRetransmissions;
790 curMaxCumAck_ = head->maxCumulativeAck;
791 lastAckRx_ = head->acknowledge;
792
793 // Convert times
794 convTime(retranToutD1_, curRetranTout_);
795 convTime(cumAckToutD1_, curCumAckTout_);
796 convTime(cumAckToutD2_, curCumAckTout_ / 2);
797 convTime(nullToutD3_, curNullTout_ / 3);
798
799 if (server_) {
800 state_ = StSendSynAck;
801 return (zeroTme_);
802 } else {
803 state_ = StSendSeqAck;
804 }
805 gettimeofday(&stTime_, NULL);
806
807 // reset counters
808 } else {
809 curMaxBuffers_ = locMaxBuffers_;
810 curMaxSegment_ = locMaxSegment_;
811 curCumAckTout_ = locCumAckTout_;
812 curRetranTout_ = locRetranTout_;
813 curNullTout_ = locNullTout_;
814 curMaxRetran_ = locMaxRetran_;
815 curMaxCumAck_ = locMaxCumAck_;
816 }
817
818 // Generate syn after try period passes
819 } else if ((!server_) && timePassed(stTime_, tryPeriodD1_)) {
820 // Allocate frame
821 head = rpr::Header::create(tran_->reqFrame(rpr::Header::SynSize, false));
822
823 // Set frame
824 head->syn = true;
825 head->version = Version;
826 head->chk = true;
827 head->maxOutstandingSegments = locMaxBuffers_;
828 head->maxSegmentSize = locMaxSegment_;
829 head->retransmissionTimeout = locRetranTout_;
830 head->cumulativeAckTimeout = locCumAckTout_;
831 head->nullTimeout = locNullTout_;
832 head->maxRetransmissions = locMaxRetran_;
833 head->maxCumulativeAck = locMaxCumAck_;
834 head->timeoutUnit = TimeoutUnit;
835 head->connectionId = locConnId_;
836
837 transportTx(head, true, false);
838
839 // Update state
840 gettimeofday(&stTime_, NULL);
841 state_ = StWaitSyn;
842 } else if (server_) {
843 state_ = StWaitSyn;
844 }
845
846 return (tryPeriodD4_);
847}
848
850struct timeval& rpr::Controller::stateSendSynAck() {
851 uint32_t x;
852
853 // Allocate frame
854 rpr::HeaderPtr head = rpr::Header::create(tran_->reqFrame(rpr::Header::SynSize, false));
855
856 // Set frame
857 head->syn = true;
858 head->ack = true;
859 head->version = Version;
860 head->chk = true;
861 head->maxOutstandingSegments = curMaxBuffers_;
862 head->maxSegmentSize = curMaxSegment_;
863 head->retransmissionTimeout = curRetranTout_;
864 head->cumulativeAckTimeout = curCumAckTout_;
865 head->nullTimeout = curNullTout_;
866 head->maxRetransmissions = curMaxRetran_;
867 head->maxCumulativeAck = curMaxCumAck_;
868 head->timeoutUnit = TimeoutUnit;
869 head->connectionId = locConnId_;
870
871 transportTx(head, true, true);
872
873 // Update state
874 log_->warning("State is open. Server=%" PRIu8, server_);
875 state_ = StOpen;
876 return (cumAckToutD2_);
877}
878
880struct timeval& rpr::Controller::stateSendSeqAck() {
881 uint32_t x;
882
883 // Allocate frame
884 rpr::HeaderPtr ack = rpr::Header::create(tran_->reqFrame(rpr::Header::HeaderSize, false));
885
886 // Setup frame
887 ack->ack = true;
888 ack->nul = false;
889 ackSeqRx_ = lastSeqRx_;
890
891 transportTx(ack, false, true);
892
893 // Update state
894 state_ = StOpen;
895 log_->warning("State is open. Server=%" PRIu8, server_);
896 return (cumAckToutD2_);
897}
898
900struct timeval& rpr::Controller::stateOpen() {
901 rpr::HeaderPtr head;
902 uint8_t idx;
903 bool doNull;
904 uint8_t ackPend;
905 struct timeval locTime;
906
907 // Pending frame may be reset
908 while (!stQueue_.empty()) {
909 head = stQueue_.pop();
910
911 // Reset or syn without ack is an error
912 if ((head->rst) || (head->syn && (!head->ack))) {
913 state_ = StError;
914 gettimeofday(&stTime_, NULL);
915 return (zeroTme_);
916 }
917 }
918
919 // Sample transmit time and compute pending ack count under lock
920 {
921 std::unique_lock<std::mutex> lock(txMtx_);
922 locTime = txTime_;
923 ackPend = ackSeqRx_ - lastAckTx_;
924 }
925
926 // NULL required
927 if (timePassed(locTime, nullToutD3_))
928 doNull = true;
929 else
930 doNull = false;
931
932 // Outbound frame required
933 if ((doNull || ((!getLocBusy()) && ackPend >= curMaxCumAck_) ||
934 ((ackPend > 0 || getLocBusy()) && timePassed(locTime, cumAckToutD1_)))) {
935 head = rpr::Header::create(tran_->reqFrame(rpr::Header::HeaderSize, false));
936 head->ack = true;
937 head->nul = doNull;
938 transportTx(head, doNull, false);
939 }
940
941 // Retransmission processing, don't process when busy
942 idx = lastAckRx_;
943 while ((!remBusy_) && (idx != locSequence_)) {
944 if (retransmit(idx++) < 0) {
945 state_ = StError;
946 gettimeofday(&stTime_, NULL);
947 return (zeroTme_);
948 }
949 }
950
951 return (cumAckToutD2_);
952}
953
955struct timeval& rpr::Controller::stateError() {
956 rpr::HeaderPtr rst;
957 uint32_t x;
958
959 log_->warning("Entering reset state. Server=%" PRIu8, server_);
960
961 rst = rpr::Header::create(tran_->reqFrame(rpr::Header::HeaderSize, false));
962 rst->rst = true;
963
964 transportTx(rst, true, true);
965
966 downCount_++;
967 log_->warning("Entering closed state. Server=%" PRIu8, server_);
968 state_ = StClosed;
969
970 // Reset queues
971 appQueue_.reset();
972 oooQueue_.clear();
973 stQueue_.reset();
974
975 gettimeofday(&stTime_, NULL);
976 return (tryPeriodD1_);
977}
978
980void rpr::Controller::setTimeout(uint32_t timeout) {
981 div_t divResult = div(timeout, 1000000);
982 timeout_.tv_sec = divResult.quot;
983 timeout_.tv_usec = divResult.rem;
984}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:60
static const uint32_t Debug
Debug severity level constant.
Definition Logging.h:88
static const uint32_t Warning
Warning severity level constant.
Definition Logging.h:84
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
Definition Buffer.h:270
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::rssi::Transport > TransportPtr
Definition Transport.h:91
std::shared_ptr< rogue::protocols::rssi::Application > ApplicationPtr
std::shared_ptr< rogue::protocols::rssi::Header > HeaderPtr
Definition Header.h:201
std::shared_ptr< rogue::protocols::rssi::Controller > ControllerPtr
Definition Controller.h:448
void defaultTimeout(struct timeval &tout)
Returns Rogue default timeout as a timeval.
Definition Helpers.h:49