Skip to content

Commit d08b853

Browse files
committed
quic: Move DatequeueFeeder to queue.*
1 parent 4eeab84 commit d08b853

File tree

5 files changed

+237
-229
lines changed

5 files changed

+237
-229
lines changed

lib/internal/quic/quic.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,7 @@ class QuicStream {
840840
* Sets the outbound data source for the stream. This can only be called
841841
* once and must be called before any data will be sent. The body can be
842842
* an ArrayBuffer, a TypedArray or DataView, or a Blob, a ReadableStream.
843-
* If the stream is destroyed or already has an outbound data source,
843+
* If the stream is destroyed or already has an outbound data source,
844844
* an error will be thrown.
845845
* @param {ArrayBuffer|SharedArrayBuffer|ArrayBufferView|Blob|ReadableStream} outbound
846846
*/

src/dataqueue/queue.cc

Lines changed: 183 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,15 @@
2222

2323
namespace node {
2424

25+
using v8::ArrayBuffer;
2526
using v8::ArrayBufferView;
2627
using v8::BackingStore;
2728
using v8::Local;
2829
using v8::Object;
30+
using v8::ObjectTemplate;
31+
using v8::Promise;
32+
using v8::Uint8Array;
33+
using v8::TypedArray;
2934
using v8::Value;
3035

3136
namespace {
@@ -1104,9 +1109,7 @@ class FdEntry final : public EntryImpl {
11041109
friend class ReaderImpl;
11051110
};
11061111

1107-
} // namespace
11081112
// ============================================================================
1109-
11101113
class FeederEntry final : public EntryImpl {
11111114
public:
11121115
FeederEntry(DataQueueFeeder* feeder) : feeder_(feeder) {}
@@ -1176,7 +1179,7 @@ class FeederEntry final : public EntryImpl {
11761179
FeederEntry* entry_;
11771180
};
11781181
};
1179-
1182+
} // namespace
11801183
// ============================================================================
11811184

11821185
std::shared_ptr<DataQueue> DataQueue::CreateIdempotent(
@@ -1265,4 +1268,181 @@ void DataQueue::RegisterExternalReferences(
12651268
// Nothing to do here currently.
12661269
}
12671270

1271+
1272+
DataQueueFeeder::DataQueueFeeder(Environment* env, Local<Object> object)
1273+
: AsyncWrap(env, object) {
1274+
MakeWeak();
1275+
}
1276+
1277+
1278+
void DataQueueFeeder::tryWakePulls() {
1279+
if (!readFinish_.IsEmpty()) {
1280+
v8::Local<v8::Promise::Resolver> resolver =
1281+
readFinish_.Get(env()->isolate());
1282+
// I do not think, that this can error...
1283+
[[maybe_unused]] v8::Maybe<bool> ignoredResult =
1284+
resolver->Resolve(env()->context(), v8::True(env()->isolate()));
1285+
readFinish_.Reset();
1286+
}
1287+
}
1288+
1289+
void DataQueueFeeder::DrainAndClose() {
1290+
if (done) return;
1291+
done = true;
1292+
// do not do this several time, and note,
1293+
// it may be called several times.
1294+
while (!pendingPulls_.empty()) {
1295+
auto& pending = pendingPulls_.front();
1296+
auto pop = OnScopeLeave([this] { pendingPulls_.pop_front(); });
1297+
pending.next(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {});
1298+
}
1299+
if (!readFinish_.IsEmpty()) {
1300+
Local<v8::Promise::Resolver> resolver = readFinish_.Get(env()->isolate());
1301+
[[maybe_unused]] v8::Maybe<bool> ignoredResult =
1302+
resolver->Resolve(env()->context(), v8::False(env()->isolate()));
1303+
readFinish_.Reset();
1304+
}
1305+
}
1306+
1307+
JS_METHOD_IMPL(DataQueueFeeder::New) {
1308+
DCHECK(args.IsConstructCall());
1309+
auto env = Environment::GetCurrent(args);
1310+
new DataQueueFeeder(env, args.This());
1311+
}
1312+
1313+
JS_METHOD_IMPL(DataQueueFeeder::Ready) {
1314+
Environment* env = Environment::GetCurrent(args);
1315+
DataQueueFeeder* feeder;
1316+
ASSIGN_OR_RETURN_UNWRAP(&feeder, args.This());
1317+
if (feeder->pendingPulls_.size() > 0) {
1318+
feeder->readFinish_.Reset();
1319+
return;
1320+
} else {
1321+
Local<Promise::Resolver> readFinish =
1322+
Promise::Resolver::New(env->context()).ToLocalChecked();
1323+
feeder->readFinish_.Reset(env->isolate(), readFinish);
1324+
args.GetReturnValue().Set(readFinish->GetPromise());
1325+
return;
1326+
}
1327+
}
1328+
1329+
JS_METHOD_IMPL(DataQueueFeeder::Submit) {
1330+
Environment* env = Environment::GetCurrent(args);
1331+
DataQueueFeeder* feeder;
1332+
ASSIGN_OR_RETURN_UNWRAP(&feeder, args.This());
1333+
1334+
bool done = false;
1335+
if (args[1]->IsBoolean() && args[1].As<v8::Boolean>()->Value()) {
1336+
done = true;
1337+
}
1338+
if (!args[0].IsEmpty() && !args[0]->IsUndefined() && !args[0]->IsNull()) {
1339+
CHECK_GT(feeder->pendingPulls_.size(), 0);
1340+
auto chunk = args[0];
1341+
1342+
if (chunk->IsArrayBuffer()) {
1343+
auto buffer = chunk.As<ArrayBuffer>();
1344+
chunk = Uint8Array::New(buffer, 0, buffer->ByteLength());
1345+
}
1346+
if (!chunk->IsTypedArray()) {
1347+
THROW_ERR_INVALID_ARG_TYPE(
1348+
env, "Invalid data must be Arraybuffer or TypedArray");
1349+
return;
1350+
}
1351+
Local<TypedArray> typedArray = chunk.As<TypedArray>();
1352+
// now we create a copy
1353+
// detaching, would not be a good idea for example, such
1354+
// a limitation is not given with W3C Webtransport
1355+
// if we do not do it here, a transform stream would
1356+
// be needed to do the copy in the Webtransport case.
1357+
// there may be also troubles, if multiple Uint8Array
1358+
// are derived in a parser from a single ArrayBuffer
1359+
size_t nread = typedArray->ByteLength();
1360+
JS_TRY_ALLOCATE_BACKING(env, backingUniq, nread);
1361+
std::shared_ptr<BackingStore> backing = std::move(backingUniq);
1362+
1363+
auto originalStore = typedArray->Buffer()->GetBackingStore();
1364+
const void* originalData =
1365+
static_cast<char*>(originalStore->Data()) + typedArray->ByteOffset();
1366+
memcpy(backing->Data(), originalData, nread);
1367+
auto& pending = feeder->pendingPulls_.front();
1368+
auto pop = OnScopeLeave([feeder] {
1369+
if (feeder->pendingPulls_.size() > 0) feeder->pendingPulls_.pop_front();
1370+
});
1371+
DataQueue::Vec vec;
1372+
vec.base = static_cast<uint8_t*>(backing->Data());
1373+
vec.len = static_cast<uint64_t>(nread);
1374+
pending.next(bob::STATUS_CONTINUE, &vec, 1, [backing](uint64_t) {});
1375+
}
1376+
if (done) {
1377+
feeder->DrainAndClose();
1378+
feeder->readFinish_.Reset();
1379+
args.GetReturnValue().Set(v8::False(env->isolate()));
1380+
return;
1381+
} else {
1382+
if (feeder->pendingPulls_.size() > 0) {
1383+
feeder->readFinish_.Reset();
1384+
args.GetReturnValue().Set(v8::True(env->isolate()));
1385+
return;
1386+
} else {
1387+
Local<Promise::Resolver> readFinish =
1388+
Promise::Resolver::New(env->context()).ToLocalChecked();
1389+
feeder->readFinish_.Reset(env->isolate(), readFinish);
1390+
args.GetReturnValue().Set(readFinish->GetPromise());
1391+
return;
1392+
}
1393+
}
1394+
}
1395+
1396+
JS_METHOD_IMPL(DataQueueFeeder::Error) {
1397+
DataQueueFeeder* feeder;
1398+
ASSIGN_OR_RETURN_UNWRAP(&feeder, args.This());
1399+
// FIXME, how should I pass on the error
1400+
// ResetStream must be send also
1401+
feeder->DrainAndClose();
1402+
}
1403+
1404+
JS_METHOD_IMPL(DataQueueFeeder::AddFakePull) {
1405+
DataQueueFeeder* feeder;
1406+
ASSIGN_OR_RETURN_UNWRAP(&feeder, args.This());
1407+
// this adds a fake pull for testing code, not to be used anywhere else
1408+
Next dummyNext = [](int, const DataQueue::Vec*, size_t, bob::Done) {
1409+
// intentionally empty
1410+
};
1411+
feeder->addPendingPull(PendingPull(std::move(dummyNext)));
1412+
feeder->tryWakePulls();
1413+
}
1414+
1415+
using quic::BindingData;
1416+
JS_CONSTRUCTOR_IMPL(DataQueueFeeder, dataqueuefeeder_constructor_template, {
1417+
auto isolate = env->isolate();
1418+
JS_NEW_CONSTRUCTOR();
1419+
JS_INHERIT(AsyncWrap);
1420+
JS_CLASS(dataqueuefeeder);
1421+
SetProtoMethod(isolate, tmpl, "error", Error);
1422+
SetProtoMethod(isolate, tmpl, "submit", Submit);
1423+
SetProtoMethod(isolate, tmpl, "ready", Ready);
1424+
SetProtoMethod(isolate, tmpl, "addFakePull", AddFakePull);
1425+
})
1426+
1427+
void DataQueueFeeder::InitPerIsolate(IsolateData* data,
1428+
Local<ObjectTemplate> target) {
1429+
// TODO(@jasnell): Implement the per-isolate state
1430+
}
1431+
1432+
void DataQueueFeeder::InitPerContext(Realm* realm, Local<Object> target) {
1433+
SetConstructorFunction(realm->context(),
1434+
target,
1435+
"DataQueueFeeder",
1436+
GetConstructorTemplate(realm->env()));
1437+
}
1438+
1439+
void DataQueueFeeder::RegisterExternalReferences(
1440+
ExternalReferenceRegistry* registry) {
1441+
registry->Register(New);
1442+
registry->Register(Submit);
1443+
registry->Register(Error);
1444+
registry->Register(Ready);
1445+
registry->Register(AddFakePull);
1446+
}
1447+
12681448
} // namespace node

src/dataqueue/queue.h

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include <optional>
1616
#include <vector>
1717

18+
#include "../quic/defs.h"
19+
1820
namespace node {
1921
using v8::Local;
2022
using v8::Value;
@@ -315,6 +317,55 @@ class DataQueue : public MemoryRetainer {
315317
static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
316318
};
317319

320+
321+
class DataQueueFeeder final : public AsyncWrap {
322+
public:
323+
using Next = bob::Next<DataQueue::Vec>;
324+
325+
DataQueueFeeder(Environment* env, v8::Local<v8::Object> object);
326+
327+
JS_CONSTRUCTOR(DataQueueFeeder);
328+
JS_BINDING_INIT_BOILERPLATE();
329+
330+
static BaseObjectPtr<DataQueueFeeder> Create();
331+
332+
void setDataQueue(std::shared_ptr<DataQueue> queue) { dataQueue_ = queue; }
333+
334+
335+
void clearPendingNext() { pendingPulls_.clear(); }
336+
337+
struct PendingPull {
338+
Next next;
339+
explicit PendingPull(Next next) : next(std::move(next)) {}
340+
};
341+
342+
void addPendingPull(PendingPull toAdd) {
343+
pendingPulls_.emplace_back(std::move(toAdd));
344+
}
345+
346+
bool Done() { return done; }
347+
348+
void DrainAndClose();
349+
void tryWakePulls();
350+
351+
SET_NO_MEMORY_INFO()
352+
SET_MEMORY_INFO_NAME(DataQueueFeeder)
353+
SET_SELF_SIZE(DataQueueFeeder)
354+
355+
JS_METHOD(New);
356+
JS_METHOD(Submit);
357+
JS_METHOD(Error);
358+
JS_METHOD(Ready);
359+
JS_METHOD(AddFakePull);
360+
361+
private:
362+
std::shared_ptr<DataQueue> dataQueue_;
363+
v8::Global<v8::Promise::Resolver> readFinish_;
364+
365+
std::deque<PendingPull> pendingPulls_;
366+
bool done = false;
367+
};
368+
318369
} // namespace node
319370

320371
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

0 commit comments

Comments
 (0)