Skip to content

Commit 1f90579

Browse files
committed
Add readMessageAsync(…), tryReadMessageAsync(…), AsyncBufferedReader
1 parent 5102df7 commit 1f90579

File tree

5 files changed

+158
-2
lines changed

5 files changed

+158
-2
lines changed

capnproto/lib/capnproto.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export 'src/any_pointer.dart';
2+
export 'src/async_buffered_reader.dart';
23
export 'src/constants.dart';
34
export 'src/data_list.dart';
45
export 'src/error.dart';
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import 'dart:async';
2+
import 'dart:collection';
3+
import 'dart:math' as math;
4+
import 'dart:typed_data';
5+
6+
import 'package:meta/meta.dart';
7+
import 'package:oxidized/oxidized.dart';
8+
9+
import 'error.dart';
10+
11+
class AsyncBufferedReader {
12+
AsyncBufferedReader(Stream<List<int>> stream) {
13+
_subscription = stream.listen(
14+
(event) {
15+
_pendingData.add(event);
16+
_notifyNewEvent();
17+
},
18+
onDone: _notifyNewEvent,
19+
);
20+
}
21+
22+
late final StreamSubscription<List<int>> _subscription;
23+
final _pendingData = Queue<List<int>>();
24+
var _offsetInFirstBuffer = 0;
25+
26+
var _nextEvent = Completer<void>();
27+
void _notifyNewEvent() {
28+
final nextEvent = _nextEvent;
29+
_nextEvent = Completer();
30+
nextEvent.complete();
31+
}
32+
33+
@useResult
34+
Future<int> read(Uint8List buffer) async {
35+
if (buffer.isEmpty) return 0;
36+
37+
var didResumeSubscription = false;
38+
var offset = 0;
39+
while (offset < buffer.length) {
40+
if (_pendingData.isEmpty) {
41+
if (!didResumeSubscription) {
42+
didResumeSubscription = true;
43+
_subscription.resume();
44+
}
45+
46+
await _nextEvent.future;
47+
if (_pendingData.isEmpty) break;
48+
}
49+
50+
final newData = _pendingData.first;
51+
final length = math.min(
52+
buffer.length - offset,
53+
newData.length - _offsetInFirstBuffer,
54+
);
55+
buffer.setRange(
56+
offset,
57+
offset + length,
58+
newData.skip(_offsetInFirstBuffer),
59+
);
60+
offset += length;
61+
if (_offsetInFirstBuffer + length == newData.length) {
62+
_pendingData.removeFirst();
63+
_offsetInFirstBuffer = 0;
64+
}
65+
}
66+
_subscription.pause();
67+
return offset;
68+
}
69+
70+
@useResult
71+
Future<Result<void, PrematureEndOfInputCapnpError>> readExact(
72+
Uint8List buffer,
73+
) async {
74+
final length = await read(buffer);
75+
if (length < buffer.length) {
76+
return const Err(PrematureEndOfInputCapnpError());
77+
}
78+
return const Ok(null);
79+
}
80+
81+
Future<void> cancel() => _subscription.cancel();
82+
}

capnproto/lib/src/error.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ sealed class CapnpError {
2020
const CapnpError();
2121
}
2222

23-
class PrematureEndOfFileCapnpError extends CapnpError {
24-
const PrematureEndOfFileCapnpError();
23+
class PrematureEndOfInputCapnpError extends CapnpError {
24+
const PrematureEndOfInputCapnpError();
2525
}
2626

2727
class InvalidNumberOfSegmentsCapnpError extends CapnpError {

capnproto/lib/src/serialize.dart

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import 'dart:typed_data';
33
import 'package:meta/meta.dart';
44
import 'package:oxidized/oxidized.dart';
55

6+
import 'async_buffered_reader.dart';
67
import 'constants.dart';
78
import 'error.dart';
89
import 'message.dart';
@@ -76,6 +77,74 @@ CapnpResult<MessageReader?> tryReadMessage(
7677
);
7778
}
7879

80+
/// Reads a serialized message (including a segment table) from [reader].
81+
///
82+
/// [reader] is allowed to extend beyond the end of the message.
83+
///
84+
/// The segment table format for streams is defined in the Cap'n Proto
85+
/// [encoding spec](https://capnproto.org/encoding.html).
86+
Future<CapnpResult<MessageReader>> readMessageAsync(
87+
AsyncBufferedReader reader, {
88+
ReaderOptions options = const ReaderOptions(),
89+
}) async {
90+
switch (await tryReadMessageAsync(reader, options: options)) {
91+
case Ok(value: null):
92+
return const Err(PrematureEndOfInputCapnpError());
93+
case Ok(:final value?):
94+
return Ok(value);
95+
case Err(:final error):
96+
return Err(error);
97+
}
98+
}
99+
100+
/// Like [readMessageAsync], but returns `Ok(null)` instead of an error if the
101+
/// [reader] is empty.
102+
Future<CapnpResult<MessageReader?>> tryReadMessageAsync(
103+
AsyncBufferedReader reader, {
104+
ReaderOptions options = const ReaderOptions(),
105+
}) async {
106+
final segmentCountBuffer = Uint8List(4);
107+
final readBytes = await reader.read(segmentCountBuffer);
108+
if (readBytes == 0) return const Ok(null);
109+
if (readBytes < 4) return const Err(PrematureEndOfInputCapnpError());
110+
111+
final int segmentCount;
112+
switch (segmentCountBuffer.asByteData.getSegmentCount()) {
113+
case Ok(:final value):
114+
segmentCount = value;
115+
case Err(:final error):
116+
return Err(error);
117+
}
118+
119+
final segmentLengthsBuffer =
120+
Uint8List(4 * (segmentCount.isEven ? segmentCount + 1 : segmentCount));
121+
if (await reader.readExact(segmentCountBuffer) case Err(:final error)) {
122+
return Err(error);
123+
}
124+
125+
final SegmentLengthsBuilder builder;
126+
switch (segmentLengthsBuffer.asByteData
127+
.getSegmentLengths(segmentCount, options)) {
128+
case Ok(:final value):
129+
builder = value.builder;
130+
case Err(:final error):
131+
return Err(error);
132+
}
133+
134+
final dataBuffer =
135+
Uint8List(builder.totalWords * CapnpConstants.bytesPerWord);
136+
if (await reader.readExact(dataBuffer) case Err(:final error)) {
137+
return Err(error);
138+
}
139+
140+
return Ok(
141+
MessageReader(
142+
builder.intoSegments(dataBuffer.asByteData),
143+
options: options,
144+
),
145+
);
146+
}
147+
79148
extension on ByteData {
80149
CapnpResult<int> getSegmentCount() {
81150
final segmentCount = getUint32(0, Endian.little) + 1;

capnproto/lib/src/utils.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,7 @@ extension ByteDataCapnp on ByteData {
7272
);
7373
}
7474
}
75+
76+
extension Uint8ListCapnp on Uint8List {
77+
ByteData get asByteData => buffer.asByteData(offsetInBytes, lengthInBytes);
78+
}

0 commit comments

Comments
 (0)