rogue
Loading...
Searching...
No Matches
StreamReader.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <fcntl.h>
22#include <inttypes.h>
23#include <stdint.h>
24#include <unistd.h>
25
26#include <memory>
27#include <string>
28#include <thread>
29
30#include "rogue/GeneralError.h"
31#include "rogue/GilRelease.h"
32#include "rogue/Logging.h"
36
39
40#ifndef NO_PYTHON
41 #include <boost/python.hpp>
42namespace bp = boost::python;
43#endif
44
46ruf::StreamReaderPtr ruf::StreamReader::create() {
47 ruf::StreamReaderPtr s = std::make_shared<ruf::StreamReader>();
48 return (s);
49}
50
52void ruf::StreamReader::setup_python() {
53#ifndef NO_PYTHON
54 bp::class_<ruf::StreamReader, ruf::StreamReaderPtr, bp::bases<ris::Master>, boost::noncopyable>("StreamReader",
55 bp::init<>())
56 .def("open", &ruf::StreamReader::open)
57 .def("close", &ruf::StreamReader::close)
58 .def("isOpen", &ruf::StreamReader::isOpen)
59 .def("closeWait", &ruf::StreamReader::closeWait)
60 .def("isActive", &ruf::StreamReader::isActive);
61#endif
62}
63
65ruf::StreamReader::StreamReader() {
66 // Members default-init in header; dtor is safe before open().
67}
68
70ruf::StreamReader::~StreamReader() {
71 close();
72}
73
75void ruf::StreamReader::open(std::string file) {
77 std::unique_lock<std::mutex> lock(mtx_);
78 intClose();
79
80 // Determine if we read a group of files
81 if (file.substr(file.find_last_of('.')) == ".1") {
82 fdIdx_ = 1;
83 baseName_ = file.substr(0, file.find_last_of('.'));
84 } else {
85 fdIdx_ = 0;
86 baseName_ = file;
87 }
88
89 if ((fd_ = ::open(file.c_str(), O_RDONLY)) < 0)
90 throw(rogue::GeneralError::create("StreamReader::open", "Failed to open data file: %s", file.c_str()));
91
92 active_ = true;
93 threadEn_ = true;
94 try {
95 readThread_ = new std::thread(&StreamReader::runThread, this);
96 } catch (...) {
97 active_ = false;
98 threadEn_ = false;
99 ::close(fd_);
100 fd_ = -1;
101 throw;
102 }
103
104 // Set a thread name
105#ifndef __MACH__
106 pthread_setname_np(readThread_->native_handle(), "StreamReader");
107#endif
108}
109
111bool ruf::StreamReader::nextFile() {
112 std::unique_lock<std::mutex> lock(mtx_);
113 std::string name;
114
115 if (fd_ >= 0) {
116 ::close(fd_);
117 fd_ = -1;
118 } else {
119 return (false);
120 }
121 if (fdIdx_ == 0) return (false);
122
123 fdIdx_++;
124 name = baseName_ + "." + std::to_string(fdIdx_);
125
126 if ((fd_ = ::open(name.c_str(), O_RDONLY)) < 0) return (false);
127 return (true);
128}
129
131void ruf::StreamReader::close() {
132 rogue::GilRelease noGil;
133 std::unique_lock<std::mutex> lock(mtx_);
134 intClose();
135}
136
138bool ruf::StreamReader::isOpen() {
139 return (fd_ >= 0);
140}
141
143void ruf::StreamReader::intClose() {
144 if (readThread_ != NULL) {
145 threadEn_ = false;
146 readThread_->join();
147 delete readThread_;
148 readThread_ = NULL;
149 }
150 if (fd_ >= 0) ::close(fd_);
151}
152
154void ruf::StreamReader::closeWait() {
155 rogue::GilRelease noGil;
156 std::unique_lock<std::mutex> lock(mtx_);
157 while (active_) cond_.wait_for(lock, std::chrono::microseconds(1000));
158 intClose();
159}
160
162bool ruf::StreamReader::isActive() {
163 rogue::GilRelease noGil;
164 return (active_);
165}
166
168void ruf::StreamReader::runThread() {
169 int32_t ret;
170 uint32_t size;
171 uint32_t meta;
172 uint16_t flags;
173 uint8_t error;
174 uint8_t chan;
175 uint32_t bSize;
176 bool err;
177 ris::FramePtr frame;
178 ris::Frame::BufferIterator it;
179 char* bData;
180 Logging log("streamReader");
181
182 ret = 0;
183 err = false;
184 do {
185 // Read size of each frame
186 while ((fd_ >= 0) && (read(fd_, &size, 4) == 4)) {
187 if (size == 0) {
188 log.warning("Bad size read %" PRIu32, size);
189 err = true;
190 break;
191 }
192
193 // Read flags
194 if (read(fd_, &meta, 4) != 4) {
195 log.warning("Failed to read flags");
196 err = true;
197 break;
198 }
199
200 // Skip next step if frame is empty
201 if (size <= 4) continue;
202 size -= 4;
203
204 // Extract meta data
205 flags = meta & 0xFFFF;
206 error = (meta >> 16) & 0xFF;
207 chan = (meta >> 24) & 0xFF;
208
209 // Request frame
210 frame = reqFrame(size, true);
211 frame->setFlags(flags);
212 frame->setError(error);
213 frame->setChannel(chan);
214 it = frame->beginBuffer();
215
216 while ((err == false) && (size > 0)) {
217 bSize = size;
218
219 // Adjust to buffer size, if necessary
220 if (bSize > (*it)->getSize()) bSize = (*it)->getSize();
221
222 if ((ret = read(fd_, (*it)->begin(), bSize)) != bSize) {
223 log.warning("Short read. Ret = %" PRId32 " Req = %" PRIu32 " after %" PRIu32 " bytes",
224 ret,
225 bSize,
226 frame->getPayload());
227 ::close(fd_);
228 fd_ = -1;
229 frame->setError(0x1);
230 err = true;
231 } else {
232 (*it)->setPayload(bSize);
233 ++it; // Next buffer
234 }
235 size -= bSize;
236 }
237 sendFrame(frame);
238 }
239 } while (threadEn_ && (err == false) && nextFile());
240
241 std::unique_lock<std::mutex> lock(mtx_);
242 if (fd_ >= 0) ::close(fd_);
243 fd_ = -1;
244 active_ = false;
245 cond_.notify_all();
246}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
Structured Rogue logging helper.
Definition Logging.h:59
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::utilities::fileio::StreamReader > StreamReaderPtr