rogue
Loading...
Searching...
No Matches
CombinerV2.cpp
Go to the documentation of this file.
1
42
43#include <inttypes.h>
44#include <stdint.h>
45
46#include <vector>
47#include <string.h>
48
49#include <memory>
50
51#include "rogue/GilRelease.h"
57
60
61#ifndef NO_PYTHON
62 #include <boost/python.hpp>
63namespace bp = boost::python;
64#endif
65
67rpb::CombinerV2Ptr rpb::CombinerV2::create() {
68 rpb::CombinerV2Ptr p = std::make_shared<rpb::CombinerV2>();
69 return (p);
70}
71
73void rpb::CombinerV2::setup_python() {
74#ifndef NO_PYTHON
75 bp::class_<rpb::CombinerV2, rpb::CombinerV2Ptr, bp::bases<ris::Master, ris::Slave>, boost::noncopyable>(
76 "CombinerV2",
77 bp::init<>())
78 .def("sendBatch", &rpb::CombinerV2::sendBatch)
79 .def("getCount", &rpb::CombinerV2::getCount);
80#endif
81}
82
84rpb::CombinerV2::CombinerV2() : ris::Master(), ris::Slave() {
85 log_ = rogue::Logging::create("batcher.CombinerV2");
86 seq_ = 0;
87}
88
90rpb::CombinerV2::~CombinerV2() {}
91
93void rpb::CombinerV2::acceptFrame(ris::FramePtr frame) {
95 ris::FrameLockPtr lock = frame->lock();
96
97 std::lock_guard<std::mutex> guard(mtx_);
98 queue_.push_back(frame);
99}
100
102uint32_t rpb::CombinerV2::getCount() {
103 std::lock_guard<std::mutex> guard(mtx_);
104 return queue_.size();
105}
106
108void rpb::CombinerV2::sendBatch() {
109 const uint32_t headerSize = 2;
110 const uint32_t tailSize = 7;
111
112 std::vector<ris::FramePtr> frames;
113
114 {
115 std::lock_guard<std::mutex> guard(mtx_);
116 if (queue_.empty()) return;
117 frames.swap(queue_);
118 }
119
120 // Compute total super-frame size
121 uint32_t totalSize = headerSize;
122 for (auto& f : frames) {
123 totalSize += f->getPayload() + tailSize;
124 }
125
126 rogue::GilRelease noGil;
127
128 // Allocate the super-frame
129 ris::FramePtr sFrame = reqFrame(totalSize, true);
130 sFrame->setPayload(totalSize);
131
132 ris::FrameIterator it = sFrame->begin();
133
134 // Write super header (2 bytes)
135 uint8_t byte0 = 0x02; // version=2, reserved bits=0
136 ris::toFrame(it, 1, &byte0);
137 ris::toFrame(it, 1, &seq_);
138
139 // Write each record: data then tail
140 for (auto& f : frames) {
141 ris::FrameLockPtr fLock = f->lock();
142 uint32_t payloadSize = f->getPayload();
143
144 // Copy payload data
145 ris::FrameIterator srcIt = f->begin();
146 ris::copyFrame(srcIt, payloadSize, it);
147
148 // Write tail (7 bytes)
149 // Word 0: size (4 bytes)
150 ris::toFrame(it, 4, &payloadSize);
151
152 // Word 1: dest, fUser, lUser (3 bytes)
153 uint8_t dest = f->getChannel();
154 uint8_t fUser = f->getFirstUser();
155 uint8_t lUser = f->getLastUser();
156 ris::toFrame(it, 1, &dest);
157 ris::toFrame(it, 1, &fUser);
158 ris::toFrame(it, 1, &lUser);
159 }
160
161 seq_++;
162
163 sendFrame(sFrame);
164}
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
Random-access byte iterator across a Frame payload.
static void copyFrame(rogue::interfaces::stream::FrameIterator &srcIter, uint32_t size, rogue::interfaces::stream::FrameIterator &dstIter)
Copies bytes between frame iterators.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
static void toFrame(rogue::interfaces::stream::FrameIterator &iter, uint32_t size, void *src)
Copies bytes from a source pointer into a frame iterator.
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::batcher::CombinerV2 > CombinerV2Ptr
Definition CombinerV2.h:103