rogue
Loading...
Searching...
No Matches
StreamUnZip.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <bzlib.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <unistd.h>
25
26#include <memory>
27
28#include "rogue/GeneralError.h"
29#include "rogue/GilRelease.h"
35
37namespace ru = rogue::utilities;
38
39#ifndef NO_PYTHON
40 #include <boost/python.hpp>
41namespace bp = boost::python;
42#endif
43
45ru::StreamUnZipPtr ru::StreamUnZip::create() {
46 ru::StreamUnZipPtr p = std::make_shared<ru::StreamUnZip>();
47 return (p);
48}
49
51ru::StreamUnZip::StreamUnZip() {}
52
54ru::StreamUnZip::~StreamUnZip() {}
55
57void ru::StreamUnZip::acceptFrame(ris::FramePtr frame) {
58 ris::Frame::BufferIterator rBuff;
59 ris::Frame::BufferIterator wBuff;
60 int32_t ret;
61
63 ris::FrameLockPtr lock = frame->lock();
64
65 // First request a new frame of the same size
66 ris::FramePtr newFrame = this->reqFrame(frame->getPayload(), true);
67
68 // Setup compression
69 bz_stream strm;
70 strm.bzalloc = NULL;
71 strm.bzfree = NULL;
72 strm.opaque = NULL;
73
74 if ((ret = BZ2_bzDecompressInit(&strm, 0, 0)) != BZ_OK)
75 throw(rogue::GeneralError::create("StreamUnZip::acceptFrame",
76 "Error initializing decompressor. ret=%" PRIi32,
77 ret));
78
79 // Setup decompression pointers
80 rBuff = frame->beginBuffer();
81 strm.next_in = reinterpret_cast<char*>((*rBuff)->begin());
82 strm.avail_in = (*rBuff)->getPayload();
83
84 wBuff = newFrame->beginBuffer();
85 strm.next_out = reinterpret_cast<char*>((*wBuff)->begin());
86 strm.avail_out = (*wBuff)->getAvailable();
87
88 // Use the iterators to move data
89 do {
90 ret = BZ2_bzDecompress(&strm);
91
92 if ((ret != BZ_STREAM_END) && (ret != BZ_OK))
93 throw(rogue::GeneralError::create("StreamUnZip::acceptFrame", "Decompression runtime error %" PRIi32, ret));
94
95 if (ret == BZ_STREAM_END) break;
96
97 // Update read buffer if necessary
98 if (strm.avail_in == 0) {
99 ++rBuff;
100 strm.next_in = reinterpret_cast<char*>((*rBuff)->begin());
101 strm.avail_in = (*rBuff)->getPayload();
102 }
103
104 // Update write buffer if necessary
105 if (strm.avail_out == 0) {
106 // We ran out of room, double the frame size
107 if ((wBuff + 1) == newFrame->endBuffer()) {
108 ris::FramePtr tmpFrame = this->reqFrame(frame->getPayload(), true);
109 wBuff = newFrame->appendFrame(tmpFrame);
110 } else {
111 ++wBuff;
112 }
113 strm.next_out = reinterpret_cast<char*>((*wBuff)->begin());
114 strm.avail_out = (*wBuff)->getAvailable();
115 }
116 } while (1);
117
118 newFrame->setPayload(strm.total_out_lo32);
119 newFrame->setError(frame->getError());
120 newFrame->setChannel(frame->getChannel());
121 newFrame->setFlags(frame->getFlags());
122
123 BZ2_bzDecompressEnd(&strm);
124
125 this->sendFrame(newFrame);
126}
127
129ris::FramePtr ru::StreamUnZip::acceptReq(uint32_t size, bool zeroCopyEn) {
130 return (this->reqFrame(size, zeroCopyEn));
131}
132
133void ru::StreamUnZip::setup_python() {
134#ifndef NO_PYTHON
135
136 bp::class_<ru::StreamUnZip, ru::StreamUnZipPtr, bp::bases<ris::Master, ris::Slave>, boost::noncopyable>(
137 "StreamUnZip",
138 bp::init<>());
139
140 bp::implicitly_convertible<ru::StreamUnZipPtr, ris::SlavePtr>();
141 bp::implicitly_convertible<ru::StreamUnZipPtr, ris::MasterPtr>();
142#endif
143}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::utilities::StreamUnZip > StreamUnZipPtr
Definition StreamUnZip.h:96