forked from hpcc-systems/HPCC-Platform
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrtlcommon.hpp
More file actions
148 lines (123 loc) · 4.67 KB
/
rtlcommon.hpp
File metadata and controls
148 lines (123 loc) · 4.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#ifndef ECLCOMMON_HPP
#define ECLCOMMON_HPP
#include "jiface.hpp"
#include "jfile.hpp"
#include "jexcept.hpp"
#include "eclrtl.hpp"
#include "eclhelper.hpp"
//The CContiguousRowBuffer is a buffer used for reading ahead into a file, and ensuring there is a contiguous
//block of data available to the reader. Fixed size files could use this directly.
//NOTE: This class does not allocate a buffer - it uses the buffer in the input stream.
class ECLRTL_API CContiguousRowBuffer
{
public:
CContiguousRowBuffer() = default;
CContiguousRowBuffer(IBufferedSerialInputStream * _in);
void setStream(IBufferedSerialInputStream *_in);
const byte * peekBytes(size32_t maxSize);
const byte * peekFirstByte();
void skipBytes(size32_t size)
{
cur += size;
available -= size;
// call eos() to ensure stream->eos() is true if this class has got to the end of the stream
eos();
}
inline bool eos()
{
if (likely(available))
return false;
return checkInputEos();
}
inline offset_t tell() const { return in->tell() + (cur - buffer); }
inline const byte * queryRow() const { return cur; }
inline size_t maxAvailable() const { return available; }
inline void clearStream() { setStream(nullptr); }
inline void reset(offset_t offset, offset_t flen = (offset_t)-1)
{
in->reset(offset, flen);
clearBuffer();
}
protected:
void peekBytesDirect(size32_t size); // skip any consumed data and directly peek bytes from the input
private:
bool checkInputEos();
void clearBuffer()
{
buffer = nullptr;
cur = nullptr;
available = 0;
}
protected:
const byte * cur = nullptr;
private:
IBufferedSerialInputStream* in = nullptr;
const byte * buffer = nullptr;
size32_t available = 0;
};
//The CThorContiguousRowBuffer is the source for a readAhead call to ensure the entire row
//is in a contiguous block of memory.  The read() and skip() functions must be implemented.
//
//readOffset tracks the offset within the current row.  Every function in this class that attaches,
//detaches or repositions the stream also sets readOffset back to 0.
//NOTE: setStream(), clearStream(), reset() and queryRow() are non-virtual and hide the base class versions,
//so the readOffset handling only applies when they are called on a CThorContiguousRowBuffer.
class ECLRTL_API CThorContiguousRowBuffer : public CContiguousRowBuffer, implements IRowPrefetcherSource
{
public:
    CThorContiguousRowBuffer() = default;
    CThorContiguousRowBuffer(IBufferedSerialInputStream * _in);

    //Attach a new stream, and start again at offset 0 within the row.
    inline void setStream(IBufferedSerialInputStream *_in)
    {
        CContiguousRowBuffer::setStream(_in);
        readOffset = 0;
    }

    //Detach the stream.  The inherited clearStream() calls CContiguousRowBuffer::setStream() directly, which
    //would leave readOffset (and therefore queryRowSize()) holding a stale value - so hide it here and go
    //through this class's setStream() instead.
    inline void clearStream() { setStream(nullptr); }

    virtual const byte * peek(size32_t maxSize) override;
    virtual offset_t beginNested() override;
    virtual bool finishedNested(offset_t & len) override;
    virtual size32_t read(size32_t len, void * ptr) override;
    virtual size32_t readSize() override;
    virtual size32_t readPackedInt(void * ptr) override;
    virtual size32_t readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len) override;
    virtual size32_t readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize) override;
    virtual size32_t readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize) override;

    //The following functions should only really be called when used by the readAhead() function
    virtual void skip(size32_t size) override;
    virtual void skipPackedInt() override;
    virtual void skipUtf8(size32_t len) override;
    virtual void skipVStr() override;
    virtual void skipVUni() override;
    virtual const byte * querySelf() override; // Dubious - used from ifblocks
    virtual void noteStartChild() override;
    virtual void noteFinishChild() override;

    //Pointer to the current position in the underlying buffer.
    inline const byte * queryRow() const { return cur; }

    //The current offset within the row.
    inline size32_t queryRowSize() const { return readOffset; }

    //Consume readOffset bytes from the underlying buffer, and start again at offset 0.
    inline void finishedRow()
    {
        skipBytes(readOffset);
        readOffset = 0;
    }

    //Reset the underlying buffer/stream, and start again at offset 0 within the row.
    inline void reset(offset_t offset, offset_t flen = (offset_t)-1)
    {
        CContiguousRowBuffer::reset(offset, flen);
        readOffset = 0;
    }

    //Single byte convenience overloads - both forward to doRead() with a length of 1.
    inline void read(byte & next) { doRead(1, &next); }
    inline void read(bool & next) { doRead(1, &next); }
    size32_t doRead(size32_t len, void * ptr);

protected:
    size32_t sizePackedInt();
    size32_t sizeUtf8(size32_t len);
    size32_t sizeVStr();
    size32_t sizeVUni();
    void reportReadFail();

private:
    //Ensure at least required bytes are available, peeking more data from the input if necessary.
    //Throws a string exception if the data is still not available after peeking.
    inline void ensureAccessible(size32_t required)
    {
        if (required <= maxAvailable())
            return;

        peekBytesDirect(required);
        if (unlikely(required > maxAvailable()))
            throw makeStringExceptionV(0, "Required %u bytes, but only %u available (row offset %u file offset %llu)", required, (unsigned)maxAvailable(), readOffset, tell() + readOffset);
    }

protected:
    size32_t readOffset = 0;            // Offset within the current row
    UnsignedArray childStartOffsets;    // NOTE(review): presumably maintained by noteStartChild()/noteFinishChild() - not visible in this header
};
#endif