forked from libapps/libapps-mirror
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnassh_buffer_scatgat.js
More file actions
160 lines (143 loc) · 4.44 KB
/
nassh_buffer_scatgat.js
File metadata and controls
160 lines (143 loc) · 4.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
/**
* @fileoverview Scatter/gather buffer implementation.
*/
import {BufferInterface} from './nassh_buffer_interface.js';
/**
 * A buffer using the scatter/gather pattern.
 *
 * Every chunk handed to write() is queued as its own Uint8Array, and read()
 * walks the queued chunks as if they were one giant linear buffer.  Queued
 * data is therefore never concatenated into a larger temporary buffer.
 */
export class ScatGatBuffer extends BufferInterface {
  /**
   * @param {boolean=} autoack Forwarded to the base class constructor.  When
   *     this.autoack_ is set, read() acks data as soon as it has been read.
   * @override
   */
  constructor(autoack = false) {
    super(autoack);

    /**
     * The list of queued buffers (unread/unacked).
     *
     * We use an object rather than an array as it simplifies implementation:
     * we can freely delete elements when finished without having to reshift or
     * reallocate the storage on the fly.
     *
     * @type {!Object<number, !Uint8Array>}
     */
    this.queue_ = {};

    /**
     * The next free position for queueing a buffer written to us.
     *
     * @type {number}
     */
    this.writePos_ = 0;

    /**
     * The current buffer we're reading.
     *
     * @type {number}
     */
    this.readPos_ = 0;

    /**
     * Read offset into the current buffer.
     *
     * @type {number}
     */
    this.readOffset_ = 0;

    /**
     * The buffer that has yet to be acked fully.
     *
     * @type {number}
     */
    this.ackPos_ = 0;

    /**
     * Ack offset into the current buffer.
     *
     * @type {number}
     */
    this.ackOffset_ = 0;
  }

  /**
   * Queue a chunk of bytes at the end of the buffer.
   *
   * ArrayBuffers are wrapped in place (no copy).  Typed arrays and other
   * views are queued as a private copy of the bytes they cover, so the caller
   * may reuse the view afterwards.
   *
   * @param {!ArrayBuffer|!TypedArray} buffer The bytes to queue.
   * @override
   */
  write(buffer) {
    // `new Uint8Array(typedArray)` converts element by element, which throws
    // away the upper bytes of anything wider than 8 bits.  Reinterpret views
    // through their backing store instead so every byte they cover is queued.
    // The slice() preserves the long-standing behaviour of copying typed array
    // input rather than aliasing the caller's memory.
    const u8 = ArrayBuffer.isView(buffer) ?
        new Uint8Array(
            buffer.buffer, buffer.byteOffset, buffer.byteLength).slice() :
        new Uint8Array(buffer);

    // Since writePos_ is a number, this is limited to 2^53 which is ~9 peta
    // buffers (not bytes).  By the time this hits the limit, we'd have to
    // transfer ~1EB in a single direction which is unrealistic atm.  If we
    // find a situation where this matters, we can switch writePos to a bigint.
    this.queue_[this.writePos_++] = u8;
    this.unreadCount_ += u8.length;
  }

  /**
   * Copy up to |length| unread bytes out of the queued chunks.
   *
   * @param {number} length The maximum number of bytes to return.
   * @return {!Uint8Array} The bytes read; shorter than |length| (possibly
   *     empty) when fewer unread bytes are queued.
   * @override
   */
  read(length) {
    let written = 0;

    // We allocate the max requested size initially, but we'll shrink it down
    // just before returning in case we weren't able to fill the request.
    const ret = new Uint8Array(length);

    // Walk each unread buffer and copy over data.
    while (written < length && this.readPos_ in this.queue_) {
      // Create a view into the return to make it easier to memcpy below.
      const output = ret.subarray(written);

      // Pull out the current unread buffer (and the offset into it).  The
      // view is capped to the space left in the output so set() cannot
      // overflow it.
      const curr = this.queue_[this.readPos_];
      const input = curr.subarray(
          this.readOffset_, this.readOffset_ + output.length);

      // Copy out this chunk of data.
      output.set(input);
      written += input.length;

      // Figure out if we've fully consumed this buffer yet.
      if (input.length + this.readOffset_ === curr.length) {
        ++this.readPos_;
        this.readOffset_ = 0;
      } else {
        this.readOffset_ += input.length;
      }
    }

    // If auto-acking, delete any fully read buffers now before syncing the
    // ack state to the current read state.
    if (this.autoack_) {
      while (this.ackPos_ < this.readPos_) {
        delete this.queue_[this.ackPos_++];
      }
      this.ackOffset_ = this.readOffset_;
    }

    this.unreadCount_ -= written;

    // Shrink the returned view to match how much data actually exists.
    return ret.subarray(0, written);
  }

  /**
   * Mark |length| bytes as acked, freeing every queued chunk that becomes
   * fully acked.  Stops early if the queue runs out of chunks.
   *
   * NOTE(review): nothing here stops |length| from running past the data that
   * read() has already returned; callers presumably only ack bytes they have
   * read -- confirm against the callers.
   *
   * @param {number} length How many bytes to ack.
   * @override
   */
  ack(length) {
    let acked = 0;

    while (acked < length) {
      // Look up the current unacked buffer (if any).
      const curr = this.queue_[this.ackPos_];
      if (!curr) {
        break;
      }

      // Figure out if we can ack the entire buffer, or just part of it.
      const togo = length - acked;
      if (curr.length - this.ackOffset_ <= togo) {
        // We're acking this entire buffer, so free it before moving on.
        acked += curr.length - this.ackOffset_;
        delete this.queue_[this.ackPos_++];
        this.ackOffset_ = 0;
      } else {
        // We aren't acking the entire buffer, so update the offset into it.
        acked += togo;
        this.ackOffset_ += togo;
      }
    }
  }
}