Skip to content

Commit c50c423

Browse files
committed
Implement MultiBuffer
1 parent 0ff7e54 commit c50c423

File tree

1 file changed

+318
-0
lines changed

1 file changed

+318
-0
lines changed
Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
package com.maxmind.db;
2+
3+
import java.io.IOException;
4+
import java.nio.ByteBuffer;
5+
import java.nio.channels.FileChannel;
6+
import java.nio.charset.CharacterCodingException;
7+
import java.nio.charset.CharsetDecoder;
8+
import java.nio.CharBuffer;
9+
import java.nio.charset.CoderResult;
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
13+
/**
14+
* A {@link Buffer} implementation backed by multiple {@link ByteBuffer}s,
15+
* allowing support for capacities larger than {@link Integer#MAX_VALUE}.
16+
*
17+
* <p>This implementation virtually concatenates several
18+
* {@link ByteBuffer}s (each up to {@link Integer#MAX_VALUE}) and maintains
19+
* a single logical position and limit across them.
20+
*
21+
* <p>Use this when working with databases/files that may exceed 2GB.
22+
*/
23+
class MultiBuffer implements Buffer {
24+
25+
/** Maximum size per underlying chunk. */
26+
static final int CHUNK_SIZE = Integer.MAX_VALUE;
27+
28+
final List<ByteBuffer> buffers = new ArrayList<>();
29+
private final long capacity;
30+
31+
private long position = 0;
32+
private long limit;
33+
34+
/**
35+
* Creates a new {@code MultiBuffer} with the given capacity, backed by
36+
* heap-allocated {@link ByteBuffer}s.
37+
*
38+
* @param capacity the total capacity in bytes
39+
*/
40+
public MultiBuffer(long capacity) {
41+
if (capacity <= 0) {
42+
throw new IllegalArgumentException("Capacity must be positive");
43+
}
44+
this.capacity = capacity;
45+
this.limit = capacity;
46+
47+
int fullChunks = (int) (capacity / CHUNK_SIZE);
48+
int remainder = (int) (capacity % CHUNK_SIZE);
49+
for (int i = 0; i < fullChunks; i++) {
50+
buffers.add(ByteBuffer.allocateDirect(CHUNK_SIZE));
51+
}
52+
if (remainder > 0) {
53+
buffers.add(ByteBuffer.allocateDirect(remainder));
54+
}
55+
}
56+
57+
private MultiBuffer(List<ByteBuffer> buffers) {
58+
this.buffers.addAll(buffers);
59+
long capacity = buffers.stream().mapToLong(ByteBuffer::capacity).sum();
60+
this.capacity = capacity;
61+
this.limit = capacity;
62+
}
63+
64+
/** {@inheritDoc} */
65+
@Override
66+
public long capacity() {
67+
return capacity;
68+
}
69+
70+
/** {@inheritDoc} */
71+
@Override
72+
public long position() {
73+
return position;
74+
}
75+
76+
/** {@inheritDoc} */
77+
@Override
78+
public Buffer position(long newPosition) {
79+
if (newPosition < 0 || newPosition > limit) {
80+
throw new IllegalArgumentException("Invalid position: " + newPosition);
81+
}
82+
this.position = newPosition;
83+
return this;
84+
}
85+
86+
/** {@inheritDoc} */
87+
@Override
88+
public long limit() {
89+
return limit;
90+
}
91+
92+
/** {@inheritDoc} */
93+
@Override
94+
public Buffer limit(long newLimit) {
95+
if (newLimit < 0 || newLimit > capacity) {
96+
throw new IllegalArgumentException("Invalid limit: " + newLimit);
97+
}
98+
this.limit = newLimit;
99+
if (position > limit) {
100+
position = limit;
101+
}
102+
return this;
103+
}
104+
105+
/** {@inheritDoc} */
106+
@Override
107+
public byte get() {
108+
byte value = get(position);
109+
position++;
110+
return value;
111+
}
112+
113+
/** {@inheritDoc} */
114+
@Override
115+
public Buffer get(byte[] dst) {
116+
long pos = position;
117+
int offset = 0;
118+
int length = dst.length;
119+
while (length > 0) {
120+
int bufIndex = (int) (pos / CHUNK_SIZE);
121+
int bufOffset = (int) (pos % CHUNK_SIZE);
122+
ByteBuffer buf = buffers.get(bufIndex).duplicate();
123+
buf.position(bufOffset);
124+
int toRead = Math.min(buf.remaining(), length);
125+
buf.get(dst, offset, toRead);
126+
pos += toRead;
127+
offset += toRead;
128+
length -= toRead;
129+
}
130+
position = pos;
131+
return this;
132+
}
133+
134+
/** {@inheritDoc} */
135+
@Override
136+
public byte get(long index) {
137+
if (index < 0 || index >= limit) {
138+
throw new IndexOutOfBoundsException("Index: " + index);
139+
}
140+
int bufIndex = (int) (index / CHUNK_SIZE);
141+
int offset = (int) (index % CHUNK_SIZE);
142+
return buffers.get(bufIndex).get(offset);
143+
}
144+
145+
/** {@inheritDoc} */
146+
@Override
147+
public double getDouble() {
148+
int bufIndex = (int) (position / CHUNK_SIZE);
149+
int off = (int) (position % CHUNK_SIZE);
150+
ByteBuffer buf = buffers.get(bufIndex).duplicate();
151+
buf.position(off);
152+
double value;
153+
if (off + 8 <= buf.remaining()) {
154+
value = buf.getDouble();
155+
position += 8;
156+
return value;
157+
} else {
158+
byte[] eight = new byte[8];
159+
get(eight);
160+
return ByteBuffer.wrap(eight).getDouble();
161+
}
162+
}
163+
164+
/** {@inheritDoc} */
165+
@Override
166+
public float getFloat() {
167+
int bufIndex = (int) (position / CHUNK_SIZE);
168+
int off = (int) (position % CHUNK_SIZE);
169+
ByteBuffer buf = buffers.get(bufIndex).duplicate();
170+
buf.position(off);
171+
float value;
172+
if (off + 4 <= buf.remaining()) {
173+
value = buf.getFloat();
174+
position += 4;
175+
return value;
176+
} else {
177+
byte[] four = new byte[4];
178+
get(four);
179+
return ByteBuffer.wrap(four).getFloat();
180+
}
181+
}
182+
183+
/** {@inheritDoc} */
184+
@Override
185+
public Buffer duplicate() {
186+
MultiBuffer copy = new MultiBuffer(capacity);
187+
for (int i = 0; i < buffers.size(); i++) {
188+
copy.buffers.set(i, buffers.get(i).duplicate());
189+
}
190+
copy.position = this.position;
191+
copy.limit = this.limit;
192+
return copy;
193+
}
194+
195+
/** {@inheritDoc} */
196+
@Override
197+
public long readFrom(FileChannel channel) throws IOException {
198+
long totalRead = 0;
199+
long pos = position;
200+
for (int i = (int) (pos / CHUNK_SIZE); i < buffers.size(); i++) {
201+
ByteBuffer buf = buffers.get(i);
202+
buf.position((int) (pos % CHUNK_SIZE));
203+
int read = channel.read(buf);
204+
if (read == -1) break;
205+
totalRead += read;
206+
pos += read;
207+
if (pos >= limit) break;
208+
}
209+
position = pos;
210+
return totalRead;
211+
}
212+
213+
/** {@inheritDoc} */
214+
@Override
215+
public String decode(CharsetDecoder decoder)
216+
throws CharacterCodingException {
217+
long remainingBytes = limit - position;
218+
219+
// Cannot allocate more than Integer.MAX_VALUE for CharBuffer
220+
if (remainingBytes > Integer.MAX_VALUE) {
221+
throw new IllegalStateException(
222+
"Decoding region too large to fit in a CharBuffer: " + remainingBytes
223+
);
224+
}
225+
226+
CharBuffer out = CharBuffer.allocate((int) remainingBytes);
227+
long pos = position;
228+
229+
while (remainingBytes > 0) {
230+
// Locate which underlying buffer we are in
231+
int bufIndex = (int) (pos / CHUNK_SIZE);
232+
int bufOffset = (int) (pos % CHUNK_SIZE);
233+
234+
ByteBuffer srcView = buffers.get(bufIndex).duplicate();
235+
srcView.position(bufOffset);
236+
237+
int toRead = (int) Math.min(srcView.remaining(), remainingBytes);
238+
srcView.limit(bufOffset + toRead);
239+
240+
CoderResult result = decoder.decode(srcView, out, false);
241+
if (result.isError()) {
242+
result.throwException();
243+
}
244+
245+
pos += toRead;
246+
remainingBytes -= toRead;
247+
}
248+
249+
// Update this MultiBuffer’s logical position
250+
this.position = pos;
251+
252+
out.flip();
253+
return out.toString();
254+
}
255+
256+
/**
257+
* Wraps the given byte arrays in a new {@code MultiBuffer}.
258+
*
259+
* <p>If any array exceeds {@link #CHUNK_SIZE}, it will be split across multiple
260+
* underlying {@link ByteBuffer}s. The data is copied into new buffers so the
261+
* returned {@code MultiBuffer} is fully independent.
262+
*
263+
* @param chunks the byte arrays to wrap
264+
* @return a new {@code MultiBuffer} backed by the arrays
265+
*/
266+
public static MultiBuffer wrap(List<byte[]> chunks) {
267+
List<ByteBuffer> buffers = new ArrayList<>();
268+
269+
for (byte[] chunk : chunks) {
270+
int offset = 0;
271+
int remaining = chunk.length;
272+
273+
while (remaining > 0) {
274+
int toPut = remaining;
275+
ByteBuffer buf = ByteBuffer.allocate(toPut);
276+
buf.put(chunk, offset, toPut);
277+
buf.flip();
278+
buffers.add(buf);
279+
280+
offset += toPut;
281+
remaining -= toPut;
282+
}
283+
}
284+
285+
return new MultiBuffer(buffers);
286+
}
287+
288+
/**
289+
* Creates a read-only {@code MultiBuffer} by memory-mapping the given
290+
* {@link FileChannel}.
291+
*
292+
* @param channel the file channel to map
293+
* @return a new {@code MultiBuffer} backed by memory-mapped segments
294+
* @throws IOException if an I/O error occurs
295+
*/
296+
public static MultiBuffer mapFromChannel(FileChannel channel) throws IOException {
297+
long size = channel.size();
298+
if (size <= 0) {
299+
throw new IllegalArgumentException("File channel has no data");
300+
}
301+
302+
MultiBuffer buf = new MultiBuffer(size);
303+
long remaining = size;
304+
305+
for (int i = 0; i < buf.buffers.size(); i++) {
306+
long chunkPos = (long) i * CHUNK_SIZE;
307+
long chunkSize = Math.min(CHUNK_SIZE, remaining);
308+
ByteBuffer mapped = channel.map(
309+
FileChannel.MapMode.READ_ONLY,
310+
chunkPos,
311+
chunkSize
312+
);
313+
buf.buffers.set(i, mapped);
314+
remaining -= chunkSize;
315+
}
316+
return buf;
317+
}
318+
}

0 commit comments

Comments
 (0)