rogue
Loading...
Searching...
No Matches
StreamWriter.cpp
Go to the documentation of this file.
1
31#include "rogue/Directives.h"
32
34
35#include <chrono>
36#include <fcntl.h>
37#include <inttypes.h>
38#include <stdint.h>
39#include <sys/time.h>
40#include <unistd.h>
41
42#include <cstring>
43#include <map>
44#include <memory>
45#include <string>
46#include <thread>
47
48#include "rogue/GeneralError.h"
49#include "rogue/GilRelease.h"
53
56
57#ifndef NO_PYTHON
58 #include <boost/python.hpp>
59namespace bp = boost::python;
60#endif
61
63ruf::StreamWriterPtr ruf::StreamWriter::create() {
64 ruf::StreamWriterPtr s = std::make_shared<ruf::StreamWriter>();
65 return (s);
66}
67
69void ruf::StreamWriter::setup_python() {
70#ifndef NO_PYTHON
71 bp::class_<ruf::StreamWriter, ruf::StreamWriterPtr, boost::noncopyable>("StreamWriter", bp::init<>())
72 .def("open", &ruf::StreamWriter::open)
73 .def("close", &ruf::StreamWriter::close)
74 .def("isOpen", &ruf::StreamWriter::isOpen)
75 .def("setRaw", &ruf::StreamWriter::setRaw)
76 .def("getRaw", &ruf::StreamWriter::getRaw)
77 .def("setBufferSize", &ruf::StreamWriter::setBufferSize)
78 .def("setMaxSize", &ruf::StreamWriter::setMaxSize)
79 .def("setDropErrors", &ruf::StreamWriter::setDropErrors)
80 .def("getChannel", &ruf::StreamWriter::getChannel)
81 .def("getTotalSize", &ruf::StreamWriter::getTotalSize)
82 .def("getCurrentSize", &ruf::StreamWriter::getCurrentSize)
83 .def("getBandwidth", &ruf::StreamWriter::getBandwidth)
84 .def("getFrameCount", &ruf::StreamWriter::getFrameCount)
85 .def("waitFrameCount", &ruf::StreamWriter::waitFrameCount);
86#endif
87}
88
90ruf::StreamWriter::StreamWriter() {
91 baseName_ = "";
92 fd_ = -1;
93 sizeLimit_ = 0;
94 buffSize_ = 0;
95 currSize_ = 0;
96 totSize_ = 0;
97 buffer_ = NULL;
98 frameCount_ = 0;
99 bandwidthBytes_ = 0;
100 currBuffer_ = 0;
101 dropErrors_ = false;
102 isOpen_ = false;
103 raw_ = false;
104
105 log_ = rogue::Logging::create("fileio.StreamWriter");
106}
107
109ruf::StreamWriter::~StreamWriter() {
110 this->close();
111}
112
114void ruf::StreamWriter::open(std::string file) {
115 std::string name;
116
117 rogue::GilRelease noGil;
118 std::lock_guard<std::mutex> lock(mtx_);
119 isOpen_ = false;
120 flush();
121
122 // Close if open
123 if (fd_ >= 0) ::close(fd_);
124 fd_ = -1;
125
126 baseName_ = file;
127 name = file;
128 fdIdx_ = 1;
129
130 if (sizeLimit_ > 0) name.append(".1");
131
132 if ((fd_ = ::open(name.c_str(),
133 O_RDWR | O_CREAT,
134 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)) < 0)
135 throw(rogue::GeneralError::create("StreamWriter::open", "Failed to open data file: %s", name.c_str()));
136
137 totSize_ = 0;
138 currSize_ = 0;
139 frameCount_ = 0;
140 bandwidthBytes_ = 0;
141 bandwidthHistory_.clear();
142 currBuffer_ = 0;
143
144 // Iterate over all channels and reset their frame counts
145 for (std::map<uint32_t, ruf::StreamWriterChannelPtr>::iterator it = channelMap_.begin(); it != channelMap_.end();
146 ++it) {
147 it->second->setFrameCount(0);
148 }
149 isOpen_ = true;
150}
151
153void ruf::StreamWriter::close() {
154 rogue::GilRelease noGil;
155 std::lock_guard<std::mutex> lock(mtx_);
156 isOpen_ = false;
157 flush();
158 bandwidthBytes_ = 0;
159 bandwidthHistory_.clear();
160 if (fd_ >= 0) ::close(fd_);
161 fd_ = -1;
162}
163
165bool ruf::StreamWriter::isOpen() {
166 return (isOpen_);
167}
168
170void ruf::StreamWriter::setRaw(bool raw) {
171 raw_ = raw;
172}
173
175bool ruf::StreamWriter::getRaw() {
176 return raw_;
177}
178
180void ruf::StreamWriter::setBufferSize(uint32_t size) {
181 rogue::GilRelease noGil;
182 std::lock_guard<std::mutex> lock(mtx_);
183
184 // No change
185 if (size != buffSize_) {
186 // Flush data out of current buffer
187 flush();
188
189 // Free old buffer
190 if (buffer_ != NULL) free(buffer_);
191 buffSize_ = 0;
192
193 // Buffer is enabled
194 if (size != 0) {
195 // Create new buffer
196 if ((buffer_ = reinterpret_cast<uint8_t*>(malloc(size))) == NULL)
197 throw(rogue::GeneralError::create("StreamWriter::setBufferSize",
198 "Failed to allocate buffer with size = %" PRIu32,
199 size));
200 buffSize_ = size;
201 }
202 }
203}
204
206void ruf::StreamWriter::setMaxSize(uint64_t size) {
207 rogue::GilRelease noGil;
208 std::lock_guard<std::mutex> lock(mtx_);
209 sizeLimit_ = size;
210}
211
213void ruf::StreamWriter::setDropErrors(bool drop) {
214 dropErrors_ = drop;
215}
216
218ruf::StreamWriterChannelPtr ruf::StreamWriter::getChannel(uint8_t channel) {
219 rogue::GilRelease noGil;
220 std::lock_guard<std::mutex> lock(mtx_);
221 if (channelMap_.count(channel) == 0) {
222 channelMap_[channel] = ruf::StreamWriterChannel::create(shared_from_this(), channel);
223 }
224 return (channelMap_[channel]);
225}
226
228uint64_t ruf::StreamWriter::getTotalSize() {
229 rogue::GilRelease noGil;
230 std::lock_guard<std::mutex> lock(mtx_);
231 return (totSize_ + currBuffer_);
232}
233
235uint64_t ruf::StreamWriter::getCurrentSize() {
236 rogue::GilRelease noGil;
237 std::lock_guard<std::mutex> lock(mtx_);
238 return (currSize_ + currBuffer_);
239}
240
242double ruf::StreamWriter::getBandwidth() {
243 rogue::GilRelease noGil;
244 std::lock_guard<std::mutex> lock(mtx_);
245
246 auto now = std::chrono::steady_clock::now();
247 pruneBandwidth(now);
248
249 if (bandwidthHistory_.empty()) return (0.0);
250
251 auto windowStart = bandwidthHistory_.front().first;
252 double seconds = std::chrono::duration<double>(now - windowStart).count();
253 if (seconds <= 0.0) return (static_cast<double>(bandwidthBytes_));
254 return (static_cast<double>(bandwidthBytes_) / seconds);
255}
256
258uint32_t ruf::StreamWriter::getFrameCount() {
259 return (frameCount_);
260}
261
262bool ruf::StreamWriter::waitFrameCount(uint32_t count, uint64_t timeout) {
263 struct timeval endTime;
264 struct timeval sumTime;
265 struct timeval curTime;
266
267 rogue::GilRelease noGil;
268 std::unique_lock<std::mutex> lock(mtx_);
269
270 if (timeout != 0) {
271 gettimeofday(&curTime, NULL);
272
273 div_t divResult = div(timeout, 1000000);
274 sumTime.tv_sec = divResult.quot;
275 sumTime.tv_usec = divResult.rem;
276
277 timeradd(&curTime, &sumTime, &endTime);
278 }
279
280 while (frameCount_ < count) {
281 cond_.wait_for(lock, std::chrono::microseconds(1000));
282
283 if (timeout != 0) {
284 gettimeofday(&curTime, NULL);
285 if (timercmp(&curTime, &endTime, >)) break;
286 }
287 }
288
289 return (frameCount_ >= count);
290}
291
292void ruf::StreamWriter::pruneBandwidth(std::chrono::steady_clock::time_point now) {
293 auto cutoff = now - std::chrono::seconds(1);
294 while (!bandwidthHistory_.empty() && bandwidthHistory_.front().first < cutoff) {
295 bandwidthBytes_ -= bandwidthHistory_.front().second;
296 bandwidthHistory_.pop_front();
297 }
298}
299
300void ruf::StreamWriter::recordBandwidth(uint32_t size) {
301 if (size == 0) return;
302 auto now = std::chrono::steady_clock::now();
303 pruneBandwidth(now);
304 bandwidthHistory_.emplace_back(now, size);
305 bandwidthBytes_ += size;
306}
307
309void ruf::StreamWriter::writeFile(uint8_t channel, std::shared_ptr<rogue::interfaces::stream::Frame> frame) {
310 ris::Frame::BufferIterator it;
311 uint32_t value;
312 uint32_t size;
313
314 if ((frame->getPayload() == 0) || (dropErrors_ && (frame->getError() != 0))) return;
315
316 rogue::GilRelease noGil;
317 std::unique_lock<std::mutex> lock(mtx_);
318
319 if (fd_ >= 0) {
320 // Raw mode
321 if ( raw_ ) {
322 size = frame->getPayload();
323 checkSize(size);
324
325 // Written size has extra 4 bytes in non raw mode
326 // Check file size, including size header
327 } else {
328 size = frame->getPayload() + 4;
329 checkSize(size + 4);
330 }
331
332 if (!raw_) {
333 // First write size
334 intWrite(&size, 4);
335
336 // Create EVIO header
337 value = frame->getFlags();
338 value |= (frame->getError() << 16);
339 value |= (channel << 24);
340 intWrite(&value, 4);
341 }
342
343 // Write buffers
344 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) intWrite((*it)->begin(), (*it)->getPayload());
345
346 // Update counters
347 frameCount_++;
348 cond_.notify_all();
349 }
350}
351
353void ruf::StreamWriter::intWrite(void* data, uint32_t size) {
354 if (fd_ < 0) return;
355
356 // New size is larger than buffer size, flush
357 if ((size + currBuffer_) > buffSize_) flush();
358
359 // Attempted write is larger than buffer, raw write
360 // This is called if buffer is disabled
361 if (size > buffSize_) {
362 if (write(fd_, data, size) != static_cast<int32_t>(size)) {
363 ::close(fd_);
364 fd_ = -1;
365 log_->error("Write failed, closing file!");
366 return;
367 }
368 currSize_ += size;
369 totSize_ += size;
370 // Append to buffer if non zero
371 } else if (buffSize_ > 0 && size > 0) {
372 std::memcpy(buffer_ + currBuffer_, data, size);
373 currBuffer_ += size;
374 }
375
376 recordBandwidth(size);
377}
378
380void ruf::StreamWriter::checkSize(uint32_t size) {
381 std::string name;
382
383 // No Limit
384 if (sizeLimit_ == 0) return;
385
386 // Bad configuration
387 if (size > sizeLimit_)
388 throw(rogue::GeneralError("StreamWriter::checkSize", "Frame size is larger than file size limit"));
389
390 // File size (including buffer) is larger than max size
391 if ((size + currBuffer_ + currSize_) > sizeLimit_) {
392 flush();
393
394 // Close and update index
395 ::close(fd_);
396 fdIdx_++;
397
398 name = baseName_ + "." + std::to_string(fdIdx_);
399
400 // Open new file
401 if ((fd_ = ::open(name.c_str(),
402 O_RDWR | O_CREAT | O_APPEND,
403 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)) < 0)
404 throw(rogue::GeneralError::create("StreamWriter::checkSize", "Failed to open file %s", name.c_str()));
405
406 currSize_ = 0;
407 }
408}
409
411void ruf::StreamWriter::flush() {
412 if (currBuffer_ > 0) {
413 if (write(fd_, buffer_, currBuffer_) != static_cast<int32_t>(currBuffer_)) {
414 ::close(fd_);
415 fd_ = -1;
416 log_->error("Write failed, closing file!");
417 currBuffer_ = 0;
418 return;
419 }
420 currSize_ += currBuffer_;
421 totSize_ += currBuffer_;
422 currBuffer_ = 0;
423 }
424}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:60
std::shared_ptr< rogue::utilities::fileio::StreamWriter > StreamWriterPtr
std::shared_ptr< rogue::utilities::fileio::StreamWriterChannel > StreamWriterChannelPtr