41 #include <boost/python.hpp>
42namespace bp = boost::python;
// Registers StreamReader with Boost.Python.
//
// The class is exposed under the Python name StreamReader, lists ris::Master
// as its base, passes ruf::StreamReaderPtr as a class_ template argument
// (presumably the holder type - confirm) and is marked boost::noncopyable.
// Five methods are bound, each under the same name as the C++ member:
// open, close, isOpen, closeWait and isActive.
//
// NOTE(review): the class_ constructor call below shows only its first
// argument in this excerpt; the remaining argument(s) and closing parenthesis
// are not visible - confirm against the full file.
52void ruf::StreamReader::setup_python() {
54 bp::class_<ruf::StreamReader, ruf::StreamReaderPtr, bp::bases<ris::Master>, boost::noncopyable>(
"StreamReader",
56 .def(
"open", &ruf::StreamReader::open)
57 .def(
"close", &ruf::StreamReader::close)
58 .def(
"isOpen", &ruf::StreamReader::isOpen)
59 .def(
"closeWait", &ruf::StreamReader::closeWait)
60 .def(
"isActive", &ruf::StreamReader::isActive);
// Default constructor; takes no arguments.
// NOTE(review): the body is not visible in this excerpt. Other methods test
// readThread_ against NULL and fd_ against 0, so it presumably initializes
// those members to NULL and a negative value - confirm.
65ruf::StreamReader::StreamReader() {
// Destructor.
// NOTE(review): the body is not visible in this excerpt; presumably it stops
// the reader thread and closes the file descriptor - confirm.
70ruf::StreamReader::~StreamReader() {
// Opens a data file for reading and starts the background reader thread.
//
// Parameters:
//   file - path of the file to open. If the text from its last dot onward is
//          exactly .1, baseName_ is set to the path with that suffix removed;
//          nextFile() later builds follow-on names from baseName_.
//
// Visible steps: take mtx_, inspect the file name, open the file read-only
// with ::open, create a std::thread running runThread on this object, and
// name that thread StreamReader via pthread_setname_np.
//
// NOTE(review): find_last_of returns npos when the name has no dot, and
// substr(npos) throws std::out_of_range in that case. Unless the full file
// guards against it, a dot-less path fails here before ::open is reached.
// NOTE(review): the statement executed when ::open fails is not visible in
// this excerpt - presumably an error is raised; confirm.
75void ruf::StreamReader::open(std::string file) {
 // Held for the whole call; nextFile, close and closeWait take the same mutex.
77 std::unique_lock<std::mutex> lock(mtx_);
 // Detect the first file of a numbered sequence by its .1 suffix.
81 if (file.substr(file.find_last_of(
'.')) ==
".1") {
 // Keep everything before the last dot as the common base name.
83 baseName_ = file.substr(0, file.find_last_of(
'.'));
 // A negative descriptor means the open failed.
89 if ((fd_ = ::open(file.c_str(), O_RDONLY)) < 0)
 // Raw owning pointer to the reader thread; intClose() checks it against NULL.
95 readThread_ =
new std::thread(&StreamReader::runThread,
this);
 // Label the thread so it can be identified in debuggers and process tools.
106 pthread_setname_np(readThread_->native_handle(),
"StreamReader");
// Switches reading to the next file of a numbered sequence.
//
// Returns false when fdIdx_ is 0, or when the file named
// baseName_ + dot + fdIdx_ cannot be opened read-only.
// Called from the loop condition at the end of runThread().
//
// NOTE(review): the lines between the visible statements are missing from
// this excerpt. Presumably the previous descriptor is closed, fdIdx_ is
// advanced before the name is built, and true is returned on success -
// confirm against the full file.
111bool ruf::StreamReader::nextFile() {
112 std::unique_lock<std::mutex> lock(mtx_);
 // Index 0: nothing further to open (presumably single-file mode - confirm).
121 if (fdIdx_ == 0)
return (
false);
 // Build the next file name from the shared base name and the current index.
124 name = baseName_ +
"." + std::to_string(fdIdx_);
 // A failed open ends the sequence.
126 if ((fd_ = ::open(name.c_str(), O_RDONLY)) < 0)
return (
false);
// Public close entry point, also bound to Python as close.
// Takes mtx_ before doing its work.
// NOTE(review): the rest of the body is not visible in this excerpt;
// presumably it delegates to intClose() - confirm.
131void ruf::StreamReader::close() {
133 std::unique_lock<std::mutex> lock(mtx_);
// Returns a bool; bound to Python as isOpen.
// NOTE(review): the body is not visible in this excerpt; presumably it
// reports whether fd_ is a valid (non-negative) descriptor - confirm.
138bool ruf::StreamReader::isOpen() {
// Internal close helper.
// Acts on the reader thread when readThread_ is not NULL, then closes fd_
// if it is non-negative.
// NOTE(review): no lock is taken in the visible lines; presumably callers
// already hold mtx_ - confirm.
// NOTE(review): the body of the readThread_ branch is not visible here;
// presumably it stops, joins and deletes the thread - confirm.
143void ruf::StreamReader::intClose() {
144 if (readThread_ != NULL) {
 // Close the descriptor only if one is open.
150 if (fd_ >= 0) ::close(fd_);
// Blocks the caller until active_ becomes false; bound to Python as closeWait.
// Holds mtx_ and repeatedly waits on cond_ with a 1000 microsecond timeout,
// re-checking active_ after every wake-up. The timeout means the loop still
// makes progress even if cond_ is never notified.
// NOTE(review): anything after the wait loop is not visible in this excerpt.
154void ruf::StreamReader::closeWait() {
156 std::unique_lock<std::mutex> lock(mtx_);
157 while (active_) cond_.wait_for(lock, std::chrono::microseconds(1000));
// Returns a bool; bound to Python as isActive.
// NOTE(review): the body is not visible in this excerpt; presumably it
// reports the active_ flag that closeWait() polls - confirm.
162bool ruf::StreamReader::isActive() {
// Reader thread body; open() starts it on a std::thread.
//
// Record layout as consumed by the visible reads:
//   4 bytes  size  - read straight into the variable size
//   4 bytes  meta  - bits 0-15 flags, bits 16-23 error, bits 24-31 channel
//   payload        - read into the buffers of a frame obtained from reqFrame
//
// For each record a frame is requested, its flags, error and channel are
// set from the meta word, and the payload is read buffer by buffer. A short
// read is logged and marks the frame with error 0x1.
//
// The record loop sits inside an outer loop that repeats while threadEn_ is
// set, no error occurred and nextFile() opens another file. When that loop
// ends, mtx_ is taken and fd_ is closed if still open.
//
// NOTE(review): many lines are missing from this excerpt - local
// declarations, the opening of the outer do loop, the size checks around the
// warnings, the computation of bSize, and the hand-off of the finished
// frame. Comments below describe only what is visible.
168void ruf::StreamReader::runThread() {
178 ris::Frame::BufferIterator it;
 // One iteration per record: stop on a closed descriptor or when a full
 // 4-byte size word cannot be read.
186 while ((fd_ >= 0) && (read(fd_, &size, 4) == 4)) {
 // Reached when the size word is rejected (the test itself is not visible).
188 log.warning(
"Bad size read %" PRIu32, size);
 // The 4-byte meta word follows the size word.
194 if (read(fd_, &meta, 4) != 4) {
195 log.warning(
"Failed to read flags");
 // Nothing beyond the meta word to read (presumably size counts the
 // 4 meta bytes - confirm); move on to the next record.
201 if (size <= 4)
continue;
 // Unpack the meta word into its three fields.
205 flags = meta & 0xFFFF;
206 error = (meta >> 16) & 0xFF;
207 chan = (meta >> 24) & 0xFF;
 // Request a frame for size bytes and stamp it with the decoded fields.
210 frame = reqFrame(size,
true);
211 frame->setFlags(flags);
212 frame->setError(error);
213 frame->setChannel(chan);
214 it = frame->beginBuffer();
 // Fill the frame buffers until the payload is consumed or a read fails.
216 while ((err ==
false) && (size > 0)) {
 // Never read more than the current buffer can hold.
220 if (bSize > (*it)->getSize()) bSize = (*it)->getSize();
 // Anything other than a full-length read is treated as a failure.
222 if ((ret = read(fd_, (*it)->begin(), bSize)) != bSize) {
223 log.warning(
"Short read. Ret = %" PRId32
" Req = %" PRIu32
" after %" PRIu32
" bytes",
226 frame->getPayload());
 // Flag the partially filled frame as errored.
229 frame->setError(0x1);
 // Record how many bytes of this buffer now hold payload.
232 (*it)->setPayload(bSize);
 // Tail of the outer loop: continue with the next file of the sequence
 // while the thread is enabled and no error occurred.
239 }
while (threadEn_ && (err ==
false) && nextFile());
 // Final cleanup under the lock: close whatever descriptor is still open.
241 std::unique_lock<std::mutex> lock(mtx_);
242 if (fd_ >= 0) ::close(fd_);
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Structured Rogue logging helper.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
std::shared_ptr< rogue::utilities::fileio::StreamReader > StreamReaderPtr