rogue
Loading...
Searching...
No Matches
CombinerV1.cpp
Go to the documentation of this file.
1
44
45#include <inttypes.h>
46#include <stdint.h>
47
48#include <vector>
49#include <string.h>
50
51#include <memory>
52
53#include "rogue/GilRelease.h"
54#include "rogue/GeneralError.h"
60
63
64#ifndef NO_PYTHON
65 #include <boost/python.hpp>
66namespace bp = boost::python;
67#endif
68
70rpb::CombinerV1Ptr rpb::CombinerV1::create(uint8_t width) {
71 rpb::CombinerV1Ptr p = std::make_shared<rpb::CombinerV1>(width);
72 return (p);
73}
74
76void rpb::CombinerV1::setup_python() {
77#ifndef NO_PYTHON
78 bp::class_<rpb::CombinerV1, rpb::CombinerV1Ptr, bp::bases<ris::Master, ris::Slave>, boost::noncopyable>(
79 "CombinerV1",
80 bp::init<uint8_t>())
81 .def("sendBatch", &rpb::CombinerV1::sendBatch)
82 .def("getCount", &rpb::CombinerV1::getCount);
83#endif
84}
85
87rpb::CombinerV1::CombinerV1(uint8_t width) : ris::Master(), ris::Slave() {
88 log_ = rogue::Logging::create("batcher.CombinerV1");
89
90 if (width > 5) throw rogue::GeneralError::create("batcher::CombinerV1::CombinerV1", "Invalid width %" PRIu8, width);
91
92 width_ = width;
93 seq_ = 0;
94
95 // headerSize = 2 * 2^width
96 headerSize_ = 2;
97 for (uint8_t i = 0; i < width_; i++) headerSize_ *= 2;
98
99 // tailSize = max(headerSize, 8)
100 tailSize_ = (headerSize_ < 8) ? 8 : headerSize_;
101}
102
104rpb::CombinerV1::~CombinerV1() {}
105
107void rpb::CombinerV1::acceptFrame(ris::FramePtr frame) {
108 rogue::GilRelease noGil;
109 ris::FrameLockPtr lock = frame->lock();
110
111 std::lock_guard<std::mutex> guard(mtx_);
112 queue_.push_back(frame);
113}
114
116uint32_t rpb::CombinerV1::getCount() {
117 std::lock_guard<std::mutex> guard(mtx_);
118 return queue_.size();
119}
120
122void rpb::CombinerV1::sendBatch() {
123 std::vector<ris::FramePtr> frames;
124
125 {
126 std::lock_guard<std::mutex> guard(mtx_);
127 if (queue_.empty()) return;
128 frames.swap(queue_);
129 }
130
131 // Compute total super-frame size
132 uint32_t totalSize = headerSize_;
133 for (auto& f : frames) {
134 uint32_t payloadSize = f->getPayload();
135
136 // Round payload up to width boundary
137 uint32_t padded = payloadSize;
138 if ((payloadSize % headerSize_) != 0) padded = ((payloadSize / headerSize_) + 1) * headerSize_;
139
140 totalSize += padded + tailSize_;
141 }
142
143 rogue::GilRelease noGil;
144
145 // Allocate the super-frame
146 ris::FramePtr sFrame = reqFrame(totalSize, true);
147 sFrame->setPayload(totalSize);
148
149 ris::FrameIterator it = sFrame->begin();
150
151 // Write super header
152 uint8_t byte0 = (width_ << 4) | 0x1; // version=1, width encoding
153 ris::toFrame(it, 1, &byte0);
154 ris::toFrame(it, 1, &seq_);
155
156 // Pad rest of header with zeros
157 if (headerSize_ > 2) {
158 uint32_t padLen = headerSize_ - 2;
159 uint8_t zero = 0;
160 for (uint32_t i = 0; i < padLen; i++) ris::toFrame(it, 1, &zero);
161 }
162
163 // Write each record: data then tail
164 for (auto& f : frames) {
165 ris::FrameLockPtr fLock = f->lock();
166 uint32_t payloadSize = f->getPayload();
167
168 // Copy payload data
169 ris::FrameIterator srcIt = f->begin();
170 ris::copyFrame(srcIt, payloadSize, it);
171
172 // Pad payload to width boundary
173 uint32_t padded = payloadSize;
174 if ((payloadSize % headerSize_) != 0) padded = ((payloadSize / headerSize_) + 1) * headerSize_;
175 uint32_t padLen = padded - payloadSize;
176 uint8_t zero = 0;
177 for (uint32_t i = 0; i < padLen; i++) ris::toFrame(it, 1, &zero);
178
179 // Write tail
180 // Word 0: size (4 bytes)
181 ris::toFrame(it, 4, &payloadSize);
182
183 // Word 1: dest, fUser, lUser, width
184 uint8_t dest = f->getChannel();
185 uint8_t fUser = f->getFirstUser();
186 uint8_t lUser = f->getLastUser();
187 uint8_t widthByte = width_; // FW writes WIDTH_C at tData(59 downto 56)
188 ris::toFrame(it, 1, &dest);
189 ris::toFrame(it, 1, &fUser);
190 ris::toFrame(it, 1, &lUser);
191 ris::toFrame(it, 1, &widthByte);
192
193 // Pad rest of tail with zeros (tailSize_ - 8 bytes already written)
194 if (tailSize_ > 8) {
195 uint32_t tailPad = tailSize_ - 8;
196 for (uint32_t i = 0; i < tailPad; i++) ris::toFrame(it, 1, &zero);
197 }
198 }
199
200 seq_++;
201
202 sendFrame(sFrame);
203}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
Random-access byte iterator across a Frame payload.
static void copyFrame(rogue::interfaces::stream::FrameIterator &srcIter, uint32_t size, rogue::interfaces::stream::FrameIterator &dstIter)
Copies bytes between frame iterators.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
static void toFrame(rogue::interfaces::stream::FrameIterator &iter, uint32_t size, void *src)
Copies bytes from a source pointer into a frame iterator.
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::batcher::CombinerV1 > CombinerV1Ptr
Definition CombinerV1.h:122