Skip to content

Commit 39ed2d6

Browse files
authored
Implement flows text operations (#77)
1 parent c7fb12d commit 39ed2d6

File tree

5 files changed

+573
-1
lines changed

5 files changed

+573
-1
lines changed
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package com.softwaremill.jox.flows;
2+
3+
import java.nio.ByteBuffer;
4+
import java.nio.charset.StandardCharsets;
5+
import java.util.Arrays;
6+
import java.util.Map;
7+
import java.util.Optional;
8+
import java.util.concurrent.atomic.AtomicReference;
9+
10+
11+
/**
12+
* The general algorithm and some helper functions (with their comments) are copied from ox:
13+
* see <a href="https://github.com/softwaremill/ox/blob/master/core/src/main/scala/ox/flow/FlowTextOps.scala">ox.flow.FlowTextOps</a>
14+
* Which was copied from fs2: see fs2.text.decodeC <a href="https://github.com/typelevel/fs2/blob/9b1b27cf7a8d7027df852d890555b341da70ef9e/core/shared/src/main/scala/fs2/text.scala"">link</a>
15+
* <p>
16+
* Extracted to separate class for better readability
17+
*/
18+
class ChunksUtf8Decoder {
19+
private static final int BOM_SIZE = 3; // const for UTF-8
20+
private static final byte[] BOM_UTF8 = new byte[]{-17, -69, -65};
21+
22+
public static <T> Flow<String> decodeStringUtf8(FlowStage<T> flowStage) {
23+
return Flows.usingEmit(emit -> {
24+
final AtomicReference<State> state = new AtomicReference<>(State.ProcessBOM);
25+
final AtomicReference<byte[]> buffer = new AtomicReference<>(null);
26+
27+
flowStage.run(t -> {
28+
if (!(t instanceof byte[] bytes)) {
29+
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
30+
}
31+
byte[] newBuffer;
32+
State newState;
33+
if (state.get() == State.ProcessBOM) {
34+
Map.Entry<byte[], State> processResult = processByteOrderMark(bytes, buffer.get());
35+
newBuffer = processResult.getKey();
36+
newState = processResult.getValue();
37+
} else {
38+
newBuffer = doPull(bytes, buffer.get(), emit);
39+
newState = State.Pull;
40+
}
41+
buffer.set(newBuffer);
42+
state.set(newState);
43+
});
44+
// A common case, worth checking in advance
45+
46+
if (buffer.get() != null && buffer.get().length > 0) {
47+
emit.apply(new String(buffer.get(), StandardCharsets.UTF_8));
48+
}
49+
});
50+
}
51+
52+
private static Map.Entry<byte[], State> processByteOrderMark(byte[] bytes, byte[] buffer) {
53+
// A common case, worth checking in advance
54+
if (buffer == null && bytes.length >= BOM_SIZE && startsWith(bytes, BOM_UTF8)) {
55+
return Map.entry(bytes, State.Pull);
56+
} else {
57+
byte[] newBuffer0 = buffer == null ? new byte[0] : buffer;
58+
byte[] newBuffer = Arrays.copyOf(newBuffer0, newBuffer0.length + bytes.length);
59+
newBuffer = ByteBuffer.wrap(newBuffer).put(newBuffer0.length, bytes).array();
60+
if (newBuffer.length >= BOM_SIZE) {
61+
byte[] rem = startsWith(newBuffer, BOM_UTF8) ? drop(newBuffer, BOM_SIZE) : newBuffer;
62+
return Map.entry(rem, State.Pull);
63+
} else if (startsWith(newBuffer, take(BOM_UTF8, newBuffer.length))) {
64+
return Map.entry(newBuffer, State.ProcessBOM); // we've accumulated less than the full BOM, let's pull some more
65+
} else {
66+
return Map.entry(newBuffer, State.Pull); // We've accumulated less than BOM size but we already know that these bytes aren't BOM
67+
}
68+
}
69+
}
70+
71+
private static byte[] doPull(byte[] bytes, byte[] buffer, FlowEmit<String> output) throws Exception {
72+
var result = processSingleChunk(buffer, bytes);
73+
Optional<String> str = result.getKey();
74+
if (str.isPresent()) {
75+
output.apply(str.get());
76+
}
77+
return result.getValue();
78+
}
79+
80+
private static Map.Entry<Optional<String>, byte[]> processSingleChunk(byte[] buffer, byte[] nextBytes) {
81+
byte[] allBytes;
82+
if (buffer == null || buffer.length == 0) {
83+
allBytes = nextBytes;
84+
} else {
85+
allBytes = Arrays.copyOf(buffer, buffer.length + nextBytes.length);
86+
allBytes = ByteBuffer.wrap(allBytes).put(buffer.length, nextBytes).array();
87+
}
88+
89+
int splitAt = allBytes.length - lastIncompleteBytes(allBytes);
90+
if (splitAt == allBytes.length) {
91+
// in the common case of ASCII chars
92+
// we are in this branch so the next buffer will
93+
// be empty
94+
return Map.entry(Optional.of(new String(allBytes, StandardCharsets.UTF_8)), new byte[0]);
95+
} else if (splitAt == 0) {
96+
return Map.entry(Optional.empty(), allBytes);
97+
} else {
98+
return Map.entry(
99+
// character
100+
Optional.of(new String(Arrays.copyOfRange(allBytes, 0, splitAt), StandardCharsets.UTF_8)),
101+
// remaining bytes
102+
Arrays.copyOfRange(allBytes, splitAt, allBytes.length)
103+
);
104+
}
105+
}
106+
107+
/**
108+
* Takes n elements from the beginning of the array and returns copy of the result
109+
*/
110+
private static byte[] take(byte[] a, int n) {
111+
return Arrays.copyOfRange(a, 0, n);
112+
}
113+
114+
/**
115+
* Drops n elements from the beginning of the array and returns copy of the result
116+
*/
117+
private static byte[] drop(byte[] a, int n) {
118+
return Arrays.copyOfRange(a, n, a.length);
119+
}
120+
121+
/**
122+
* Checks if array a starts with array b
123+
*/
124+
private static boolean startsWith(byte[] a, byte[] b) {
125+
return ByteBuffer.wrap(a, 0, b.length).equals(ByteBuffer.wrap(b));
126+
}
127+
128+
/*
129+
* Copied from scala lib fs2 (fs2.text.decodeC.lastIncompleteBytes)
130+
* Returns the length of an incomplete multi-byte sequence at the end of
131+
* `bs`. If `bs` ends with an ASCII byte or a complete multi-byte sequence,
132+
* 0 is returned.
133+
*/
134+
private static int lastIncompleteBytes(byte[] bs) {
135+
int minIdx = Math.max(0, bs.length - 3);
136+
int idx = bs.length - 1;
137+
int counter = 0;
138+
int res = 0;
139+
while (minIdx <= idx) {
140+
int c = continuationBytes(bs[idx]);
141+
if (c >= 0) {
142+
if (c != counter) {
143+
res = counter + 1;
144+
}
145+
return res;
146+
}
147+
idx--;
148+
counter++;
149+
}
150+
return res;
151+
}
152+
153+
/*
154+
* Copied from scala lib fs2 (fs2.text.decodeC.continuationBytes)
155+
* Returns the number of continuation bytes if `b` is an ASCII byte or a
156+
* leading byte of a multi-byte sequence, and -1 otherwise.
157+
*/
158+
private static int continuationBytes(byte b) {
159+
if ((b & 0x80) == 0x00) return 0; // ASCII byte
160+
else if ((b & 0xe0) == 0xc0) return 1; // leading byte of a 2 byte seq
161+
else if ((b & 0xf0) == 0xe0) return 2; // leading byte of a 3 byte seq
162+
else if ((b & 0xf8) == 0xf0) return 3; // leading byte of a 4 byte seq
163+
else return -1; // continuation byte or garbage
164+
}
165+
166+
// we start in the ProcessBOM state, and then transit to the Pull state
167+
private enum State {
168+
ProcessBOM, Pull
169+
}
170+
}

flows/src/main/java/com/softwaremill/jox/flows/Flow.java

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import static com.softwaremill.jox.structured.Scopes.unsupervised;
99
import static java.lang.Thread.sleep;
1010

11+
import java.nio.ByteBuffer;
12+
import java.nio.charset.Charset;
13+
import java.nio.charset.StandardCharsets;
1114
import java.time.Duration;
1215
import java.util.ArrayList;
1316
import java.util.Arrays;
@@ -55,7 +58,7 @@
5558
* Running a flow is possible using one of the `run*` methods, such as {@link Flow#runToList}, {@link Flow#runToChannel} or {@link Flow#runFold}.
5659
*/
5760
public class Flow<T> {
58-
protected final FlowStage<T> last;
61+
final FlowStage<T> last;
5962

6063
public Flow(FlowStage<T> last) {
6164
this.last = last;
@@ -677,6 +680,109 @@ public Flow<Void> drain() {
677680
return Flows.usingEmit(_ -> last.run(_ -> {}));
678681
}
679682

683+
/** Decodes a stream of chunks of bytes into UTF-8 Strings. This function is able to handle UTF-8 characters encoded on multiple bytes
684+
* that are split across chunks.
685+
*
686+
* @return
687+
* a flow of Strings decoded from incoming bytes.
688+
*/
689+
public Flow<String> decodeStringUtf8() {
690+
return ChunksUtf8Decoder.decodeStringUtf8(last);
691+
}
692+
693+
/**
694+
* Encodes a flow of `String` into a flow of bytes using UTF-8.
695+
*/
696+
public Flow<byte[]> encodeUtf8() {
697+
return map(s -> {
698+
if (s instanceof String string) {
699+
return string.getBytes(StandardCharsets.UTF_8);
700+
}
701+
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing String");
702+
});
703+
}
704+
705+
/**
706+
* Transforms a flow of byte arrays such that each emitted `String` is a text line from the input decoded using UTF-8 charset.
707+
*
708+
* @return
709+
* a flow emitting lines read from the input byte chunks, assuming they represent text.
710+
*/
711+
public Flow<String> linesUtf8() {
712+
return lines(StandardCharsets.UTF_8);
713+
}
714+
715+
/**
716+
* Transforms a flow of byte arrays such that each emitted `String` is a text line from the input.
717+
*
718+
* @param charset the charset to use for decoding the bytes into text.
719+
* @return a flow emitting lines read from the input byte arrays, assuming they represent text.
720+
*/
721+
public Flow<String> lines(Charset charset) {
722+
// buffer == Optional.empty() is a special state for handling empty chunks in onComplete, in order to tell them apart from empty lines
723+
return mapStatefulConcat(Optional::<byte[]>empty,
724+
(buffer, nextChunk) -> {
725+
if (!byte[].class.isInstance(nextChunk)) {
726+
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
727+
}
728+
// get next incoming chunk
729+
byte[] chunk = (byte[]) nextChunk;
730+
if (chunk.length == 0) {
731+
return Map.entry(Optional.empty(), Collections.emptyList());
732+
}
733+
734+
// check if chunk contains newline character, if not proceed to the next chunk
735+
int newLineIndex = getNewLineIndex(chunk);
736+
if (newLineIndex == -1) {
737+
if (buffer.isEmpty()) {
738+
return Map.entry(Optional.of(chunk), Collections.emptyList());
739+
}
740+
var b = buffer.get();
741+
byte[] newBuffer = Arrays.copyOf(b, b.length + chunk.length);
742+
newBuffer = ByteBuffer.wrap(newBuffer).put(b.length, chunk).array();
743+
return Map.entry(Optional.of(newBuffer), Collections.emptyList());
744+
}
745+
746+
// buffer for lines, if chunk contains more than one newline character
747+
List<byte[]> lines = new ArrayList<>();
748+
749+
// variable used to clear buffer after using it
750+
byte[] bufferFromPreviousChunk = buffer.orElse(new byte[0]);
751+
while (chunk.length > 0 && newLineIndex != -1) {
752+
byte[] line = new byte[newLineIndex];
753+
byte[] newChunk = new byte[chunk.length - newLineIndex - 1];
754+
ByteBuffer.wrap(chunk)
755+
.get(line, 0, newLineIndex)
756+
.get(newLineIndex + 1, newChunk, 0, chunk.length - newLineIndex - 1);
757+
758+
if (bufferFromPreviousChunk.length > 0) {
759+
// concat accumulated buffer and line
760+
byte[] buf = Arrays.copyOf(bufferFromPreviousChunk, bufferFromPreviousChunk.length + line.length);
761+
lines.add(ByteBuffer.wrap(buf).put(bufferFromPreviousChunk.length, line).array());
762+
// cleanup buffer
763+
bufferFromPreviousChunk = new byte[0];
764+
} else {
765+
lines.add(line);
766+
}
767+
chunk = newChunk;
768+
newLineIndex = getNewLineIndex(chunk);
769+
}
770+
return Map.entry(Optional.of(chunk), lines);
771+
},
772+
buf -> buf
773+
)
774+
.map(chunk -> new String(chunk, charset));
775+
}
776+
777+
private int getNewLineIndex(byte[] chunk) {
778+
for (int i = 0; i < chunk.length; i++) {
779+
if (chunk[i] == '\n') {
780+
return i;
781+
}
782+
}
783+
return -1;
784+
}
785+
680786
/**
681787
* Always runs `f` after the flow completes, whether it's because all elements are emitted, or when there's an error.
682788
*/
@@ -929,6 +1035,28 @@ public <U> Flow<U> interleave(Flow<U> other, int segmentSize, boolean eagerCompl
9291035
return Flows.interleaveAll(Arrays.asList((Flow<U>) this, other), segmentSize, eagerComplete, bufferCapacity);
9301036
}
9311037

1038+
/**
1039+
* Emits a given number of elements (determined by `segmentSize`) from this flow to the returned flow, then emits the same number of
1040+
* elements from the `other` flow and repeats. The order of elements in both flows is preserved.
1041+
* <p>
1042+
* If one of the flows is done before the other, the behavior depends on the `eagerComplete` flag. When set to `true`, the returned flow is
1043+
* completed immediately, otherwise the remaining elements from the other flow are emitted by the returned flow.
1044+
* <p>
1045+
* Both flows are run concurrently and asynchronously. The size of used buffer is determined by the {@link Channel#BUFFER_SIZE} that is in scope, or default {@link Channel#DEFAULT_BUFFER_SIZE} is used.
1046+
*
1047+
* @param other
1048+
* The flow whose elements will be interleaved with the elements of this flow.
1049+
* @param segmentSize
1050+
* The number of elements sent from each flow before switching to the other one.
1051+
* @param eagerComplete
1052+
* If `true`, the returned flow is completed as soon as either of the flow completes. If `false`, the remaining elements of the
1053+
* non-completed flow are sent downstream.
1054+
*/
1055+
public <U> Flow<U> interleave(Flow<U> other, int segmentSize, boolean eagerComplete) {
1056+
//noinspection unchecked
1057+
return Flows.interleaveAll(Arrays.asList((Flow<U>) this, other), segmentSize, eagerComplete);
1058+
}
1059+
9321060
/**
9331061
* Applies the given mapping function `f`, to each element emitted by this source, transforming it into an `Iterable` of results,
9341062
* then the returned flow emits the results one by one. Can be used to unfold incoming sequences of elements into single elements.

flows/src/main/java/com/softwaremill/jox/flows/Flows.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,28 @@ public static <T> Flow<T> failed(Exception t) {
226226
});
227227
}
228228

229+
/**
230+
* Sends a given number of elements (determined by `segmentSize`) from each flow in `flows` to the returned flow and repeats. The order
231+
* of elements in all flows is preserved.
232+
* <p>
233+
* If any of the flows is done before the others, the behavior depends on the `eagerComplete` flag. When set to `true`, the returned flow
234+
* is completed immediately, otherwise the interleaving continues with the remaining non-completed flows. Once all but one flows are
235+
* complete, the elements of the remaining non-complete flow are emitted by the returned flow.
236+
* <p>
237+
* The provided flows are run concurrently and asynchronously. The size of used buffer is determined by the {@link Channel#BUFFER_SIZE} that is in scope, or default {@link Channel#DEFAULT_BUFFER_SIZE} is used.
238+
*
239+
* @param flows
240+
* The flows whose elements will be interleaved.
241+
* @param segmentSize
242+
* The number of elements sent from each flow before switching to the next one.
243+
* @param eagerComplete
244+
* If `true`, the returned flow is completed as soon as any of the flows completes. If `false`, the interleaving continues with the
245+
* remaining non-completed flows.
246+
*/
247+
public static <T> Flow<T> interleaveAll(List<Flow<T>> flows, int segmentSize, boolean eagerComplete) {
248+
return interleaveAll(flows, segmentSize, eagerComplete, Channel.BUFFER_SIZE.orElse(Channel.DEFAULT_BUFFER_SIZE));
249+
}
250+
229251
/**
230252
* Sends a given number of elements (determined by `segmentSize`) from each flow in `flows` to the returned flow and repeats. The order
231253
* of elements in all flows is preserved.

0 commit comments

Comments
 (0)