41 #include <boost/python.hpp>
42namespace bp = boost::python;
52void ruf::StreamReader::setup_python() {
54 bp::class_<ruf::StreamReader, ruf::StreamReaderPtr, bp::bases<ris::Master>, boost::noncopyable>(
"StreamReader",
56 .def(
"open", &ruf::StreamReader::open)
57 .def(
"close", &ruf::StreamReader::close)
58 .def(
"isOpen", &ruf::StreamReader::isOpen)
59 .def(
"closeWait", &ruf::StreamReader::closeWait)
60 .def(
"isActive", &ruf::StreamReader::isActive);
65ruf::StreamReader::StreamReader() {
72ruf::StreamReader::~StreamReader() {
77void ruf::StreamReader::open(std::string file) {
79 std::unique_lock<std::mutex> lock(mtx_);
83 if (file.substr(file.find_last_of(
'.')) ==
".1") {
85 baseName_ = file.substr(0, file.find_last_of(
'.'));
91 if ((fd_ = ::open(file.c_str(), O_RDONLY)) < 0)
96 readThread_ =
new std::thread(&StreamReader::runThread,
this);
100 pthread_setname_np(readThread_->native_handle(),
"StreamReader");
105bool ruf::StreamReader::nextFile() {
106 std::unique_lock<std::mutex> lock(mtx_);
115 if (fdIdx_ == 0)
return (
false);
118 name = baseName_ +
"." + std::to_string(fdIdx_);
120 if ((fd_ = ::open(name.c_str(), O_RDONLY)) < 0)
return (
false);
125void ruf::StreamReader::close() {
127 std::unique_lock<std::mutex> lock(mtx_);
132bool ruf::StreamReader::isOpen() {
137void ruf::StreamReader::intClose() {
138 if (readThread_ != NULL) {
144 if (fd_ >= 0) ::close(fd_);
148void ruf::StreamReader::closeWait() {
150 std::unique_lock<std::mutex> lock(mtx_);
151 while (active_) cond_.wait_for(lock, std::chrono::microseconds(1000));
156bool ruf::StreamReader::isActive() {
162void ruf::StreamReader::runThread() {
172 ris::Frame::BufferIterator it;
180 while ((fd_ >= 0) && (read(fd_, &size, 4) == 4)) {
182 log.warning(
"Bad size read %" PRIu32, size);
188 if (read(fd_, &meta, 4) != 4) {
189 log.warning(
"Failed to read flags");
195 if (size <= 4)
continue;
199 flags = meta & 0xFFFF;
200 error = (meta >> 16) & 0xFF;
201 chan = (meta >> 24) & 0xFF;
204 frame = reqFrame(size,
true);
205 frame->setFlags(flags);
206 frame->setError(error);
207 frame->setChannel(chan);
208 it = frame->beginBuffer();
210 while ((err ==
false) && (size > 0)) {
214 if (bSize > (*it)->getSize()) bSize = (*it)->getSize();
216 if ((ret = read(fd_, (*it)->begin(), bSize)) != bSize) {
217 log.warning(
"Short read. Ret = %" PRId32
" Req = %" PRIu32
" after %" PRIu32
" bytes",
220 frame->getPayload());
223 frame->setError(0x1);
226 (*it)->setPayload(bSize);
233 }
while (threadEn_ && (err ==
false) && nextFile());
235 std::unique_lock<std::mutex> lock(mtx_);
236 if (fd_ >= 0) ::close(fd_);
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Structured Rogue logging helper.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
std::shared_ptr< rogue::utilities::fileio::StreamReader > StreamReaderPtr