rogue
Loading...
Searching...
No Matches
StreamReader.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <fcntl.h>
22#include <inttypes.h>
23#include <stdint.h>
24#include <unistd.h>
25
26#include <memory>
27#include <string>
28#include <thread>
29
30#include "rogue/GeneralError.h"
31#include "rogue/GilRelease.h"
32#include "rogue/Logging.h"
36
39
40#ifndef NO_PYTHON
41 #include <boost/python.hpp>
42namespace bp = boost::python;
43#endif
44
46ruf::StreamReaderPtr ruf::StreamReader::create() {
47 ruf::StreamReaderPtr s = std::make_shared<ruf::StreamReader>();
48 return (s);
49}
50
52void ruf::StreamReader::setup_python() {
53#ifndef NO_PYTHON
54 bp::class_<ruf::StreamReader, ruf::StreamReaderPtr, bp::bases<ris::Master>, boost::noncopyable>("StreamReader",
55 bp::init<>())
56 .def("open", &ruf::StreamReader::open)
57 .def("close", &ruf::StreamReader::close)
58 .def("isOpen", &ruf::StreamReader::isOpen)
59 .def("closeWait", &ruf::StreamReader::closeWait)
60 .def("isActive", &ruf::StreamReader::isActive);
61#endif
62}
63
65ruf::StreamReader::StreamReader() {
66 baseName_ = "";
67 readThread_ = NULL;
68 active_ = false;
69}
70
72ruf::StreamReader::~StreamReader() {
73 close();
74}
75
77void ruf::StreamReader::open(std::string file) {
79 std::unique_lock<std::mutex> lock(mtx_);
80 intClose();
81
82 // Determine if we read a group of files
83 if (file.substr(file.find_last_of('.')) == ".1") {
84 fdIdx_ = 1;
85 baseName_ = file.substr(0, file.find_last_of('.'));
86 } else {
87 fdIdx_ = 0;
88 baseName_ = file;
89 }
90
91 if ((fd_ = ::open(file.c_str(), O_RDONLY)) < 0)
92 throw(rogue::GeneralError::create("StreamReader::open", "Failed to open data file: %s", file.c_str()));
93
94 active_ = true;
95 threadEn_ = true;
96 readThread_ = new std::thread(&StreamReader::runThread, this);
97
98 // Set a thread name
99#ifndef __MACH__
100 pthread_setname_np(readThread_->native_handle(), "StreamReader");
101#endif
102}
103
105bool ruf::StreamReader::nextFile() {
106 std::unique_lock<std::mutex> lock(mtx_);
107 std::string name;
108
109 if (fd_ >= 0) {
110 ::close(fd_);
111 fd_ = -1;
112 } else {
113 return (false);
114 }
115 if (fdIdx_ == 0) return (false);
116
117 fdIdx_++;
118 name = baseName_ + "." + std::to_string(fdIdx_);
119
120 if ((fd_ = ::open(name.c_str(), O_RDONLY)) < 0) return (false);
121 return (true);
122}
123
125void ruf::StreamReader::close() {
126 rogue::GilRelease noGil;
127 std::unique_lock<std::mutex> lock(mtx_);
128 intClose();
129}
130
132bool ruf::StreamReader::isOpen() {
133 return (fd_ >= 0);
134}
135
137void ruf::StreamReader::intClose() {
138 if (readThread_ != NULL) {
139 threadEn_ = false;
140 readThread_->join();
141 delete readThread_;
142 readThread_ = NULL;
143 }
144 if (fd_ >= 0) ::close(fd_);
145}
146
148void ruf::StreamReader::closeWait() {
149 rogue::GilRelease noGil;
150 std::unique_lock<std::mutex> lock(mtx_);
151 while (active_) cond_.wait_for(lock, std::chrono::microseconds(1000));
152 intClose();
153}
154
156bool ruf::StreamReader::isActive() {
157 rogue::GilRelease noGil;
158 return (active_);
159}
160
162void ruf::StreamReader::runThread() {
163 int32_t ret;
164 uint32_t size;
165 uint32_t meta;
166 uint16_t flags;
167 uint8_t error;
168 uint8_t chan;
169 uint32_t bSize;
170 bool err;
171 ris::FramePtr frame;
172 ris::Frame::BufferIterator it;
173 char* bData;
174 Logging log("streamReader");
175
176 ret = 0;
177 err = false;
178 do {
179 // Read size of each frame
180 while ((fd_ >= 0) && (read(fd_, &size, 4) == 4)) {
181 if (size == 0) {
182 log.warning("Bad size read %" PRIu32, size);
183 err = true;
184 break;
185 }
186
187 // Read flags
188 if (read(fd_, &meta, 4) != 4) {
189 log.warning("Failed to read flags");
190 err = true;
191 break;
192 }
193
194 // Skip next step if frame is empty
195 if (size <= 4) continue;
196 size -= 4;
197
198 // Extract meta data
199 flags = meta & 0xFFFF;
200 error = (meta >> 16) & 0xFF;
201 chan = (meta >> 24) & 0xFF;
202
203 // Request frame
204 frame = reqFrame(size, true);
205 frame->setFlags(flags);
206 frame->setError(error);
207 frame->setChannel(chan);
208 it = frame->beginBuffer();
209
210 while ((err == false) && (size > 0)) {
211 bSize = size;
212
213 // Adjust to buffer size, if necessary
214 if (bSize > (*it)->getSize()) bSize = (*it)->getSize();
215
216 if ((ret = read(fd_, (*it)->begin(), bSize)) != bSize) {
217 log.warning("Short read. Ret = %" PRId32 " Req = %" PRIu32 " after %" PRIu32 " bytes",
218 ret,
219 bSize,
220 frame->getPayload());
221 ::close(fd_);
222 fd_ = -1;
223 frame->setError(0x1);
224 err = true;
225 } else {
226 (*it)->setPayload(bSize);
227 ++it; // Next buffer
228 }
229 size -= bSize;
230 }
231 sendFrame(frame);
232 }
233 } while (threadEn_ && (err == false) && nextFile());
234
235 std::unique_lock<std::mutex> lock(mtx_);
236 if (fd_ >= 0) ::close(fd_);
237 fd_ = -1;
238 active_ = false;
239 cond_.notify_all();
240}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
Structured Rogue logging helper.
Definition Logging.h:58
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::utilities::fileio::StreamReader > StreamReaderPtr