// Register the StreamWriter class and its member functions with the Python
// binding layer (`bp` is presumably an alias for boost::python that is declared
// outside this excerpt -- confirm).
// NOTE(review): the closing brace of this function is not part of the visible excerpt.
69void ruf::StreamWriter::setup_python() {
// Export the class under the Python name "StreamWriter", held through
// ruf::StreamWriterPtr, marked boost::noncopyable, with a no-argument constructor.
71 bp::class_<ruf::StreamWriter, ruf::StreamWriterPtr, boost::noncopyable>(
"StreamWriter", bp::init<>())
// Each .def() below exposes one member function under the quoted Python name.
72 .def(
"open", &ruf::StreamWriter::open)
73 .def(
"close", &ruf::StreamWriter::close)
74 .def(
"isOpen", &ruf::StreamWriter::isOpen)
75 .def(
"setRaw", &ruf::StreamWriter::setRaw)
76 .def(
"getRaw", &ruf::StreamWriter::getRaw)
77 .def(
"setBufferSize", &ruf::StreamWriter::setBufferSize)
78 .def(
"setMaxSize", &ruf::StreamWriter::setMaxSize)
79 .def(
"setDropErrors", &ruf::StreamWriter::setDropErrors)
80 .def(
"getChannel", &ruf::StreamWriter::getChannel)
81 .def(
"getTotalSize", &ruf::StreamWriter::getTotalSize)
82 .def(
"getCurrentSize", &ruf::StreamWriter::getCurrentSize)
83 .def(
"getBandwidth", &ruf::StreamWriter::getBandwidth)
84 .def(
"getFrameCount", &ruf::StreamWriter::getFrameCount)
// Last entry of the chain; the trailing semicolon ends the registration statement.
85 .def(
"waitFrameCount", &ruf::StreamWriter::waitFrameCount);
// Open a data file for writing and reset the per-file bookkeeping that is visible here.
// file: path supplied by the caller (the lines that consume it are not in this excerpt).
// NOTE(review): several original lines are missing from this excerpt, including the
// declaration of `name`, the open() flags, the failure branch and the end of the
// channel loop. Only the visible steps are described below.
114void ruf::StreamWriter::open(std::string file) {
// Hold mtx_ for the rest of the function.
118 std::lock_guard<std::mutex> lock(mtx_);
// A descriptor that is already open is closed before a new one is created.
123 if (fd_ >= 0) ::close(fd_);
// With a non-zero size limit the file name gets a ".1" suffix.
130 if (sizeLimit_ > 0) name.append(
".1");
// Open the file and store the descriptor in fd_. The mode bits grant read/write
// to user, group and other. A negative descriptor is the failure case; its
// handling is not visible in this excerpt.
132 if ((fd_ = ::open(name.c_str(),
134 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)) < 0)
// Discard previously recorded bandwidth samples.
141 bandwidthHistory_.clear();
// Walk every entry of channelMap_ and reset its frame counter to zero.
145 for (std::map<uint32_t, ruf::StreamWriterChannelPtr>::iterator it = channelMap_.begin(); it != channelMap_.end();
147 it->second->setFrameCount(0);
// Return the current bandwidth estimate as bandwidthBytes_ divided by the age, in
// seconds, of the oldest entry in bandwidthHistory_.
// Returns 0.0 when no samples are recorded, and the raw byte total when the
// elapsed time is not positive.
// NOTE(review): at least one original line between taking `now` and the empty
// check is missing from this excerpt.
242double ruf::StreamWriter::getBandwidth() {
// Hold mtx_ while reading the bandwidth state.
244 std::lock_guard<std::mutex> lock(mtx_);
246 auto now = std::chrono::steady_clock::now();
// No samples recorded: report zero.
249 if (bandwidthHistory_.empty())
return (0.0);
// The first element of the front entry is the time point of the oldest sample.
251 auto windowStart = bandwidthHistory_.front().first;
252 double seconds = std::chrono::duration<double>(now - windowStart).count();
// Avoid dividing by zero (or a negative span): fall back to the byte total itself.
253 if (seconds <= 0.0)
return (
static_cast<double>(bandwidthBytes_));
254 return (
static_cast<double>(bandwidthBytes_) / seconds);
// Wait until frameCount_ reaches `count` or the deadline passes.
// count:   frame count to wait for.
// timeout: wait limit; it is split into seconds and microseconds by dividing by
//          1000000, i.e. it is treated as a number of microseconds.
// Returns true when frameCount_ >= count at the time the loop exits.
// NOTE(review): some original lines are missing from this excerpt (the numbering
// skips around the deadline setup and the in-loop time check), so any guards around
// those statements cannot be described here.
262bool ruf::StreamWriter::waitFrameCount(uint32_t count, uint64_t timeout) {
263 struct timeval endTime;
264 struct timeval sumTime;
265 struct timeval curTime;
// unique_lock (not lock_guard) because the condition variable wait below needs it.
268 std::unique_lock<std::mutex> lock(mtx_);
271 gettimeofday(&curTime, NULL);
// NOTE(review): the result is a div_t, whose quot/rem fields are int, so the
// uint64_t timeout is narrowed here; timeouts above INT_MAX microseconds would not
// be represented correctly -- confirm whether such values are expected.
273 div_t divResult = div(timeout, 1000000);
274 sumTime.tv_sec = divResult.quot;
275 sumTime.tv_usec = divResult.rem;
// endTime = current time + timeout.
277 timeradd(&curTime, &sumTime, &endTime);
280 while (frameCount_ < count) {
// Wake at least every 1000 microseconds so the deadline can be re-checked.
281 cond_.wait_for(lock, std::chrono::microseconds(1000));
// Stop waiting once the current time is past endTime.
284 gettimeofday(&curTime, NULL);
285 if (timercmp(&curTime, &endTime, >))
break;
289 return (frameCount_ >= count);
// Drop bandwidth samples that are more than one second older than `now`.
// now: reference time point; entries with a time point before (now - 1 s) are removed.
// No lock is taken in the visible lines of this function.
// NOTE(review): the closing braces are not part of the visible excerpt.
292void ruf::StreamWriter::pruneBandwidth(std::chrono::steady_clock::time_point now) {
293 auto cutoff = now - std::chrono::seconds(1);
// Entries are removed from the front, which is where the oldest sample is read from.
294 while (!bandwidthHistory_.empty() && bandwidthHistory_.front().first < cutoff) {
// Keep the running byte total in step with the entries that remain.
295 bandwidthBytes_ -= bandwidthHistory_.front().second;
296 bandwidthHistory_.pop_front();
// Write one frame to the output by passing each of its buffers to intWrite().
// channel: channel number; it is placed in bits 24 and up of `value` below.
// frame:   frame whose payload, flags, error field and buffers are read here.
// NOTE(review): the declarations of `size` and `value`, the branches that select
// between the two `size` assignments, and the statements that consume `size` and
// `value` are missing from this excerpt.
309void ruf::StreamWriter::writeFile(uint8_t channel, std::shared_ptr<rogue::interfaces::stream::Frame> frame) {
310 ris::Frame::BufferIterator it;
// Nothing is written for an empty frame, or for a frame with a non-zero error
// field when dropErrors_ is set.
314 if ((frame->getPayload() == 0) || (dropErrors_ && (frame->getError() != 0)))
return;
// Hold mtx_ from here on.
317 std::unique_lock<std::mutex> lock(mtx_);
// Two alternative sizes: the bare payload, or the payload plus 4 bytes.
322 size = frame->getPayload();
328 size = frame->getPayload() + 4;
// Pack frame metadata into one word: flags in the low bits, the error field
// shifted to bit 16, the channel shifted to bit 24.
337 value = frame->getFlags();
338 value |= (frame->getError() << 16);
// NOTE(review): `channel` is a uint8_t and is promoted to int before the shift, so
// a channel >= 128 shifts into the sign bit -- confirm this is acceptable.
339 value |= (channel << 24);
// Hand every buffer of the frame to intWrite(), using each buffer's payload length.
344 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) intWrite((*it)->begin(), (*it)->getPayload());
// Write `size` bytes from `data`, either straight to fd_ or into the staging buffer.
// data: start of the bytes to write.
// size: number of bytes to write.
// NOTE(review): the remainder of the error branch and any size/offset bookkeeping
// after the copy are missing from this excerpt.
353void ruf::StreamWriter::intWrite(
void* data, uint32_t size) {
// Flush first when the new bytes would not fit after what is already buffered
// (flush() is defined outside this excerpt).
357 if ((size + currBuffer_) > buffSize_) flush();
// Data larger than the whole buffer bypasses it and goes directly to fd_.
361 if (size > buffSize_) {
// A return value different from `size` is treated as a failed write.
362 if (write(fd_, data, size) !=
static_cast<int32_t
>(size)) {
365 log_->error(
"Write failed, closing file!");
371 }
else if (buffSize_ > 0 && size > 0) {
// Otherwise append the bytes to buffer_ at the current fill offset.
372 std::memcpy(buffer_ + currBuffer_, data, size);
// Pass the byte count to recordBandwidth() (defined outside this excerpt).
376 recordBandwidth(size);
380void ruf::StreamWriter::checkSize(uint32_t size) {
384 if (sizeLimit_ == 0)
return;
387 if (size > sizeLimit_)
388 throw(
rogue::GeneralError(
"StreamWriter::checkSize",
"Frame size is larger than file size limit"));
391 if ((size + currBuffer_ + currSize_) > sizeLimit_) {
398 name = baseName_ +
"." + std::to_string(fdIdx_);
401 if ((fd_ = ::open(name.c_str(),
402 O_RDWR | O_CREAT | O_APPEND,
403 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)) < 0)