rogue
Loading...
Searching...
No Matches
StreamZip.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <bzlib.h>
22#include <inttypes.h>
23#include <stdarg.h>
24#include <unistd.h>
25
26#include <memory>
27
28#include "rogue/GeneralError.h"
29#include "rogue/GilRelease.h"
35
37namespace ru = rogue::utilities;
38
39#ifndef NO_PYTHON
40 #include <boost/python.hpp>
41namespace bp = boost::python;
42#endif
43
45ru::StreamZipPtr ru::StreamZip::create() {
46 ru::StreamZipPtr p = std::make_shared<ru::StreamZip>();
47 return (p);
48}
49
51ru::StreamZip::StreamZip() {}
52
54ru::StreamZip::~StreamZip() {}
55
57void ru::StreamZip::acceptFrame(ris::FramePtr frame) {
58 ris::Frame::BufferIterator rBuff;
59 ris::Frame::BufferIterator wBuff;
60 bool done;
61 int32_t ret;
62
64 ris::FrameLockPtr lock = frame->lock();
65
66 // First request a new frame of the same size
67 ris::FramePtr newFrame = this->reqFrame(frame->getPayload(), true);
68
69 // Setup compression
70 bz_stream strm;
71 strm.bzalloc = NULL;
72 strm.bzfree = NULL;
73 strm.opaque = NULL;
74
75 if ((ret = BZ2_bzCompressInit(&strm, 1, 0, 30)) != BZ_OK)
76 throw(
77 rogue::GeneralError::create("StreamZip::acceptFrame", "Error initializing compressor. ret=%" PRIi32, ret));
78
79 // Setup decompression pointers
80 rBuff = frame->beginBuffer();
81 strm.next_in = reinterpret_cast<char*>((*rBuff)->begin());
82 strm.avail_in = (*rBuff)->getPayload();
83
84 wBuff = newFrame->beginBuffer();
85 strm.next_out = reinterpret_cast<char*>((*wBuff)->begin());
86 strm.avail_out = (*wBuff)->getAvailable();
87
88 // Use the iterators to move data
89 done = false;
90 do {
91 if ((ret = BZ2_bzCompress(&strm, (done) ? BZ_FINISH : BZ_RUN)) == BZ_SEQUENCE_ERROR)
92 throw(rogue::GeneralError::create("StreamZip::acceptFrame", "Compression runtime error %" PRIi32, ret));
93
94 // Update read buffer if necessary
95 if (strm.avail_in == 0 && (!done)) {
96 if (++rBuff != frame->endBuffer()) {
97 strm.next_in = reinterpret_cast<char*>((*rBuff)->begin());
98 strm.avail_in = (*rBuff)->getPayload();
99 } else {
100 done = true;
101 }
102 }
103
104 // Update write buffer if necessary
105 if (strm.avail_out == 0) {
106 // We ran out of room, double the frame size, should not happen
107 if ((wBuff + 1) == newFrame->endBuffer()) {
108 ris::FramePtr tmpFrame = this->reqFrame(frame->getPayload(), true);
109 wBuff = newFrame->appendFrame(tmpFrame);
110 } else {
111 ++wBuff;
112 }
113 strm.next_out = reinterpret_cast<char*>((*wBuff)->begin());
114 strm.avail_out = (*wBuff)->getAvailable();
115 }
116 } while (ret != BZ_STREAM_END);
117
118 // Update output frame
119 newFrame->setPayload(strm.total_out_lo32);
120 newFrame->setError(frame->getError());
121 newFrame->setChannel(frame->getChannel());
122 newFrame->setFlags(frame->getFlags());
123 BZ2_bzCompressEnd(&strm);
124
125 this->sendFrame(newFrame);
126}
127
129ris::FramePtr ru::StreamZip::acceptReq(uint32_t size, bool zeroCopyEn) {
130 return (this->reqFrame(size, zeroCopyEn));
131}
132
133void ru::StreamZip::setup_python() {
134#ifndef NO_PYTHON
135
136 bp::class_<ru::StreamZip, ru::StreamZipPtr, bp::bases<ris::Master, ris::Slave>, boost::noncopyable>("StreamZip",
137 bp::init<>());
138
139 bp::implicitly_convertible<ru::StreamZipPtr, ris::SlavePtr>();
140 bp::implicitly_convertible<ru::StreamZipPtr, ris::MasterPtr>();
141#endif
142}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::utilities::StreamZip > StreamZipPtr
Definition StreamZip.h:94