Skip to content

Commit c3a5c46

Browse files
committed
perf: add a NonLockingBufferInputStream for DirectLinkingResource
to improve performance of binary model loading, add a NonLockingBufferInputStream for DirectLinkingResource, based on BufferInputStream without supporting concurrency.
1 parent 263b9fe commit c3a5c46

File tree

2 files changed

+298
-4
lines changed

2 files changed

+298
-4
lines changed
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Avaloq Group AG and others.
3+
* All rights reserved. This program and the accompanying materials
4+
* are made available under the terms of the Eclipse Public License v1.0
5+
* which accompanies this distribution, and is available at
6+
* http://www.eclipse.org/legal/epl-v10.html
7+
*
8+
* Contributors:
9+
* Avaloq Group AG - initial API and implementation
10+
*******************************************************************************/
11+
package com.avaloq.tools.ddk.xtext.resource.persistence;
12+
13+
import java.io.FilterInputStream;
14+
import java.io.IOException;
15+
import java.io.InputStream;
16+
import java.io.OutputStream;
17+
import java.util.Arrays;
18+
import java.util.Objects;
19+
20+
21+
/**
22+
* Like {@code BufferedInputStream} without supporting concurrency.
23+
*/
24+
@SuppressWarnings("nls")
25+
public class NonLockingBufferInputStream extends FilterInputStream {
26+
27+
private static final int DEFAULT_BUFFER_SIZE = 8192;
28+
private static final byte[] EMPTY = new byte[0];
29+
private final int initialSize;
30+
31+
private byte[] buf;
32+
private int count;
33+
private int pos;
34+
private int markpos = -1;
35+
private int marklimit;
36+
37+
private InputStream getInIfOpen() throws IOException {
38+
InputStream input = in;
39+
if (input == null) {
40+
throw new IOException("Stream closed");
41+
}
42+
return input;
43+
}
44+
45+
private byte[] getBufIfOpen(final boolean allocateIfEmpty) throws IOException {
46+
if (allocateIfEmpty && buf == EMPTY) {
47+
buf = new byte[initialSize];
48+
}
49+
return buf;
50+
}
51+
52+
private byte[] getBufIfOpen() throws IOException {
53+
return getBufIfOpen(true);
54+
}
55+
56+
private void ensureOpen() throws IOException {
57+
if (buf == null) {
58+
throw new IOException("Stream closed");
59+
}
60+
}
61+
62+
/**
63+
* Creates a {@code NonLockingBufferInputStream}.
64+
*
65+
* @param in
66+
* the underlying input stream.
67+
*/
68+
public NonLockingBufferInputStream(final InputStream in) {
69+
this(in, DEFAULT_BUFFER_SIZE);
70+
}
71+
72+
/**
73+
* Creates a {@code NonLockingBufferInputStream}
74+
* with the specified buffer size.
75+
*
76+
* @param in
77+
* the underlying input stream.
78+
* @param size
79+
* the buffer size.
80+
* @throws IllegalArgumentException
81+
* if {@code size <= 0}.
82+
*/
83+
public NonLockingBufferInputStream(final InputStream in, final int size) {
84+
super(in);
85+
if (size <= 0) {
86+
throw new IllegalArgumentException("Buffer size <= 0");
87+
}
88+
initialSize = size;
89+
buf = new byte[size];
90+
}
91+
92+
private static final int SOFT_MAX_ARRAY_LENGTH = Integer.MAX_VALUE - 8;
93+
94+
private static int newLength(final int oldLength, final int minGrowth, final int prefGrowth) {
95+
int prefLength = oldLength + Math.max(minGrowth, prefGrowth); // might overflow
96+
if (0 < prefLength && prefLength <= SOFT_MAX_ARRAY_LENGTH) {
97+
return prefLength;
98+
} else {
99+
return hugeLength(oldLength, minGrowth);
100+
}
101+
}
102+
103+
private static int hugeLength(final int oldLength, final int minGrowth) {
104+
int minLength = oldLength + minGrowth;
105+
if (minLength < 0) { // overflow
106+
throw new OutOfMemoryError("Required array length " + oldLength + " + " + minGrowth + " is too large");
107+
} else if (minLength <= SOFT_MAX_ARRAY_LENGTH) {
108+
return SOFT_MAX_ARRAY_LENGTH;
109+
} else {
110+
return minLength;
111+
}
112+
}
113+
114+
private void fill() throws IOException {
115+
byte[] buffer = getBufIfOpen();
116+
if (markpos == -1) {
117+
pos = 0; /* no mark: throw away the buffer */
118+
} else if (pos >= buffer.length) { /* no room left in buffer */
119+
if (markpos > 0) { /* can throw away early part of the buffer */
120+
int sz = pos - markpos;
121+
System.arraycopy(buffer, markpos, buffer, 0, sz);
122+
pos = sz;
123+
markpos = 0;
124+
} else if (buffer.length >= marklimit) {
125+
markpos = -1; /* buffer got too big, invalidate mark */
126+
pos = 0; /* drop buffer contents */
127+
} else { /* grow buffer */
128+
int nsz = newLength(pos, 1, pos);
129+
if (nsz > marklimit) {
130+
nsz = marklimit;
131+
}
132+
byte[] nbuf = new byte[nsz];
133+
System.arraycopy(buffer, 0, nbuf, 0, pos);
134+
buffer = nbuf;
135+
}
136+
}
137+
count = pos;
138+
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
139+
if (n > 0) {
140+
count = n + pos;
141+
}
142+
}
143+
144+
@Override
145+
public int read() throws IOException {
146+
if (pos >= count) {
147+
fill();
148+
if (pos >= count) {
149+
return -1;
150+
}
151+
}
152+
return getBufIfOpen()[pos++] & 0xff;
153+
}
154+
155+
private int read1(final byte[] b, final int off, final int len) throws IOException {
156+
int avail = count - pos;
157+
if (avail <= 0) {
158+
/*
159+
* If the requested length is at least as large as the buffer, and
160+
* if there is no mark/reset activity, do not bother to copy the
161+
* bytes into the local buffer. In this way buffered streams will
162+
* cascade harmlessly.
163+
*/
164+
int size = Math.max(getBufIfOpen(false).length, initialSize);
165+
if (len >= size && markpos == -1) {
166+
return getInIfOpen().read(b, off, len);
167+
}
168+
fill();
169+
avail = count - pos;
170+
if (avail <= 0) {
171+
return -1;
172+
}
173+
}
174+
int cnt = (avail < len) ? avail : len;
175+
System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
176+
pos += cnt;
177+
return cnt;
178+
}
179+
180+
@Override
181+
public int read(final byte[] b, final int off, final int len) throws IOException {
182+
ensureOpen();
183+
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
184+
throw new IndexOutOfBoundsException();
185+
} else if (len == 0) {
186+
return 0;
187+
}
188+
189+
int n = 0;
190+
for (;;) {
191+
int nread = read1(b, off + n, len - n);
192+
if (nread <= 0) {
193+
return (n == 0) ? nread : n;
194+
}
195+
n += nread;
196+
if (n >= len) {
197+
return n;
198+
}
199+
// if not closed but no bytes available, return
200+
InputStream input = in;
201+
if (input != null && input.available() <= 0) {
202+
return n;
203+
}
204+
}
205+
}
206+
207+
@Override
208+
public long skip(final long n) throws IOException {
209+
ensureOpen();
210+
if (n <= 0) {
211+
return 0;
212+
}
213+
long avail = count - pos;
214+
215+
if (avail <= 0) {
216+
// If no mark position set then don't keep in buffer
217+
if (markpos == -1) {
218+
return getInIfOpen().skip(n);
219+
}
220+
221+
// Fill in buffer to save bytes for reset
222+
fill();
223+
avail = count - pos;
224+
if (avail <= 0) {
225+
return 0;
226+
}
227+
}
228+
229+
long skipped = (avail < n) ? avail : n;
230+
pos += (int) skipped;
231+
return skipped;
232+
}
233+
234+
@Override
235+
public int available() throws IOException {
236+
int n = count - pos;
237+
int avail = getInIfOpen().available();
238+
return n > (Integer.MAX_VALUE - avail) ? Integer.MAX_VALUE : n + avail;
239+
}
240+
241+
@Override
242+
public void mark(final int readlimit) {
243+
marklimit = readlimit;
244+
markpos = pos;
245+
}
246+
247+
@Override
248+
public void reset() throws IOException {
249+
ensureOpen();
250+
if (markpos < 0) {
251+
throw new IOException("Resetting to invalid mark");
252+
}
253+
pos = markpos;
254+
}
255+
256+
@Override
257+
public boolean markSupported() {
258+
return true;
259+
}
260+
261+
@Override
262+
public void close() throws IOException {
263+
while (buf != null) {
264+
buf = null;
265+
InputStream input = in;
266+
in = null;
267+
if (input != null) {
268+
input.close();
269+
}
270+
return;
271+
}
272+
}
273+
274+
@Override
275+
public long transferTo(final OutputStream out) throws IOException {
276+
Objects.requireNonNull(out, "out");
277+
if (markpos == -1) {
278+
int avail = count - pos;
279+
if (avail > 0) {
280+
// Prevent poisoning and leaking of buf
281+
byte[] buffer = Arrays.copyOfRange(getBufIfOpen(), pos, count);
282+
out.write(buffer);
283+
pos = count;
284+
}
285+
try {
286+
return Math.addExact(avail, getInIfOpen().transferTo(out));
287+
} catch (ArithmeticException ignore) {
288+
return Long.MAX_VALUE;
289+
}
290+
} else {
291+
return super.transferTo(out);
292+
}
293+
}
294+
295+
}

com.avaloq.tools.ddk.xtext/src/com/avaloq/tools/ddk/xtext/resource/persistence/DirectLinkingResourceStorageLoadable.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
package com.avaloq.tools.ddk.xtext.resource.persistence;
1313

14-
import java.io.BufferedInputStream;
1514
import java.io.DataInputStream;
1615
import java.io.IOException;
1716
import java.io.InputStream;
@@ -150,7 +149,7 @@ protected void loadEntries(final StorageAwareResource resource, final ZipInputSt
150149
// fall through
151150
case LOAD:
152151
positioner.position(Constituent.CONTENT);
153-
readContents(resource, new BufferedInputStream(zipIn));
152+
readContents(resource, new NonLockingBufferInputStream(zipIn));
154153
break;
155154
}
156155

@@ -163,7 +162,7 @@ protected void loadEntries(final StorageAwareResource resource, final ZipInputSt
163162
break;
164163
default:
165164
positioner.position(Constituent.ASSOCIATIONS);
166-
readAssociationsAdapter(resource, new BufferedInputStream(zipIn));
165+
readAssociationsAdapter(resource, new NonLockingBufferInputStream(zipIn));
167166
break;
168167
}
169168

@@ -194,7 +193,7 @@ protected void loadEntries(final StorageAwareResource resource, final ZipInputSt
194193
break;
195194
case LOAD:
196195
positioner.position(Constituent.NODE_MODEL);
197-
readNodeModel(resource, new BufferedInputStream(zipIn), content);
196+
readNodeModel(resource, new NonLockingBufferInputStream(zipIn), content);
198197
break;
199198
}
200199
}

0 commit comments

Comments
 (0)