Skip to content

Commit 7a0750b

Browse files
committed
Plumb allowUnconfirmed through SQL queries to the onWrite callback
It’s possible to mix writes that are confirmed with writes that are unconfirmed. We need to be able to distinguish which writes have what unconfirmed status in the `onWrite` call. The simplest way I could figure out how to do this was to plumb whether a certain query is confirmed or not all the way from where the queries are created down into the SqliteDatabase layer, which can call back into the ActorSqlite layer via the `onWrite` call. Notably, this allows us to mix APIs that do support unconfirmed writes (so far, just the async K/V API) with APIs that support confirmed writes only (the `storage.sql` and `storage.kv` APIs) since we default allowConfirmed to be false.
1 parent 6564053 commit 7a0750b

File tree

7 files changed

+60
-23
lines changed

7 files changed

+60
-23
lines changed

src/workerd/io/actor-sqlite.c++

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ void ActorSqlite::onCriticalError(
220220
}
221221
}
222222

223-
void ActorSqlite::onWrite() {
223+
void ActorSqlite::onWrite(bool allowUnconfirmed) {
224224
requireNotBroken();
225225
if (currentTxn.is<NoTxn>()) {
226226
auto txn = kj::heap<ImplicitTxn>(*this);
@@ -471,7 +471,7 @@ kj::OneOf<ActorCacheOps::GetResultList, kj::Promise<ActorCacheOps::GetResultList
471471

472472
kj::Maybe<kj::Promise<void>> ActorSqlite::put(Key key, Value value, WriteOptions options) {
473473
requireNotBroken();
474-
kv.put(key, value);
474+
kv.put(key, value, {.allowUnconfirmed = options.allowUnconfirmed});
475475
return kj::none;
476476
}
477477

@@ -491,7 +491,7 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::put(kj::Array<KeyValuePair> pairs, Wri
491491
{.regulator = SqliteDatabase::TRUSTED}, kj::str("SAVEPOINT _cf_put_multiple_savepoint"));
492492
for (const auto& pair: pairs) {
493493
try {
494-
kv.put(pair.key, pair.value);
494+
kv.put(pair.key, pair.value, {.allowUnconfirmed = options.allowUnconfirmed});
495495
} catch (kj::Exception& e) {
496496
// We need to rollback to the putMultiple SAVEPOINT. Do it, and then release the SAVEPOINT.
497497
db->run({.regulator = SqliteDatabase::TRUSTED},
@@ -507,7 +507,7 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::put(kj::Array<KeyValuePair> pairs, Wri
507507
}
508508
} else {
509509
for (auto& pair: pairs) {
510-
kv.put(pair.key, pair.value);
510+
kv.put(pair.key, pair.value, {.allowUnconfirmed = options.allowUnconfirmed});
511511
}
512512
}
513513
return kj::none;
@@ -516,15 +516,15 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::put(kj::Array<KeyValuePair> pairs, Wri
516516
kj::OneOf<bool, kj::Promise<bool>> ActorSqlite::delete_(Key key, WriteOptions options) {
517517
requireNotBroken();
518518

519-
return kv.delete_(key);
519+
return kv.delete_(key, {.allowUnconfirmed = options.allowUnconfirmed});
520520
}
521521

522522
kj::OneOf<uint, kj::Promise<uint>> ActorSqlite::delete_(kj::Array<Key> keys, WriteOptions options) {
523523
requireNotBroken();
524524

525525
uint count = 0;
526526
for (auto& key: keys) {
527-
count += kv.delete_(key);
527+
count += kv.delete_(key, {.allowUnconfirmed = options.allowUnconfirmed});
528528
}
529529
return count;
530530
}

src/workerd/io/actor-sqlite.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
227227
AlarmLaterErrorHandler alarmLaterErrorHandler;
228228
kj::TaskSet alarmLaterTasks;
229229

230-
void onWrite();
230+
void onWrite(bool allowUnconfirmed);
231231

232232
void onCriticalError(kj::StringPtr errorMessage, kj::Maybe<kj::Exception> maybeException);
233233

src/workerd/util/sqlite-kv.c++

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,11 @@ void SqliteKv::cancelCurrentCursor() {
5353
}
5454
}
5555

56-
SqliteKv::Initialized& SqliteKv::ensureInitialized() {
56+
SqliteKv::Initialized& SqliteKv::ensureInitialized(bool allowUnconfirmed) {
5757
if (!tableCreated) {
58-
db.run(R"(
58+
db.run(SqliteDatabase::QueryOptions{.regulator = SqliteDatabase::TRUSTED,
59+
.allowUnconfirmed = allowUnconfirmed},
60+
R"(
5961
CREATE TABLE IF NOT EXISTS _cf_KV (
6062
key TEXT PRIMARY KEY,
6163
value BLOB
@@ -137,19 +139,29 @@ kj::Maybe<SqliteKv::ListCursor::KeyValuePair> SqliteKv::ListCursor::next() {
137139
}
138140

139141
void SqliteKv::put(KeyPtr key, ValuePtr value) {
140-
ensureInitialized().stmtPut.run(key, value);
142+
return put(key, value, {});
143+
}
144+
145+
void SqliteKv::put(KeyPtr key, ValuePtr value, WriteOptions options) {
146+
ensureInitialized(options.allowUnconfirmed)
147+
.stmtPut.run({.allowUnconfirmed = options.allowUnconfirmed}, key, value);
141148
}
142149

143150
bool SqliteKv::delete_(KeyPtr key) {
144-
auto query = ensureInitialized().stmtDelete.run(key);
151+
return delete_(key, {});
152+
}
153+
154+
bool SqliteKv::delete_(KeyPtr key, WriteOptions options) {
155+
auto query = ensureInitialized(options.allowUnconfirmed)
156+
.stmtDelete.run({.allowUnconfirmed = options.allowUnconfirmed}, key);
145157
return query.changeCount() > 0;
146158
}
147159

148160
uint SqliteKv::deleteAll() {
149161
// TODO(perf): Consider introducing a compatibility flag that causes deleteAll() to always return
150162
// 1. Apps almost certainly don't care about the return value but historically we returned the
151163
// count of keys deleted, so now we're stuck counting the table size for no good reason.
152-
uint count = tableCreated ? ensureInitialized().stmtCountKeys.run().getInt(0) : 0;
164+
uint count = tableCreated ? ensureInitialized(false).stmtCountKeys.run().getInt(0) : 0;
153165
db.reset();
154166
return count;
155167
}

src/workerd/util/sqlite-kv.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,17 @@ class SqliteKv: private SqliteDatabase::ResetListener {
5454
class ListCursor;
5555
kj::Own<ListCursor> list(KeyPtr begin, kj::Maybe<KeyPtr> end, kj::Maybe<uint> limit, Order order);
5656

57+
struct WriteOptions {
58+
bool allowUnconfirmed = false;
59+
};
60+
5761
// Store a value into the table.
5862
void put(KeyPtr key, ValuePtr value);
63+
void put(KeyPtr key, ValuePtr value, WriteOptions options);
5964

6065
// Delete the key and return whether it was matched.
6166
bool delete_(KeyPtr key);
67+
bool delete_(KeyPtr key, WriteOptions options);
6268

6369
uint deleteAll();
6470

@@ -148,7 +154,7 @@ class SqliteKv: private SqliteDatabase::ResetListener {
148154

149155
void cancelCurrentCursor();
150156

151-
Initialized& ensureInitialized();
157+
Initialized& ensureInitialized(bool allowUnconfirmed);
152158
// Make sure the KV table is created and prepared statements are ready. Not called until the
153159
// first write.
154160

src/workerd/util/sqlite-test.c++

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ KJ_TEST("SQLite onWrite callback") {
449449
SqliteDatabase db(vfs, kj::Path({"foo"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY);
450450

451451
bool sawWrite = false;
452-
db.onWrite([&]() { sawWrite = true; });
452+
db.onWrite([&](bool allowUnconfirmed) { sawWrite = true; });
453453

454454
setupSql(db);
455455
KJ_EXPECT(sawWrite);

src/workerd/util/sqlite.c++

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -557,9 +557,9 @@ bool SqliteDatabase::observedCriticalError() {
557557
return criticalErrorOccurred;
558558
}
559559

560-
void SqliteDatabase::notifyWrite() {
560+
void SqliteDatabase::notifyWrite(bool allowUnconfirmed) {
561561
KJ_IF_SOME(cb, onWriteCallback) {
562-
cb();
562+
cb(allowUnconfirmed);
563563
}
564564
}
565565

@@ -766,7 +766,7 @@ SqliteDatabase::StatementAndEffect SqliteDatabase::prepareSql(const Regulator& r
766766
KJ_DEFER(currentRegulator = regulator);
767767
currentParseContext = kj::none;
768768
KJ_DEFER(currentParseContext = parseContext);
769-
cb();
769+
cb(false); // prepareSql doesn't have access to allowUnconfirmed, use safe default
770770
}
771771
}
772772

@@ -1423,7 +1423,7 @@ void SqliteDatabase::Query::checkRequirements(size_t size) {
14231423

14241424
KJ_IF_SOME(cb, db.onWriteCallback) {
14251425
if (!sqlite3_stmt_readonly(statement)) {
1426-
cb();
1426+
cb(allowUnconfirmed);
14271427
}
14281428
}
14291429
}

src/workerd/util/sqlite.h

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ class SqliteDatabase {
7676

7777
struct QueryOptions {
7878
const Regulator& regulator;
79+
bool allowUnconfirmed = false;
7980
};
8081

8182
struct IngestResult {
@@ -204,7 +205,7 @@ class SqliteDatabase {
204205
//
205206
// Note that the write callback is NOT called before (or at any point during) a reset(). Use the
206207
// `ResetListener` mechanism or `afterReset()` instead for that case.
207-
void onWrite(kj::Function<void()> callback) {
208+
void onWrite(kj::Function<void(bool allowUnconfirmed)> callback) {
208209
onWriteCallback = kj::mv(callback);
209210
}
210211

@@ -229,7 +230,7 @@ class SqliteDatabase {
229230
// implement explicit transactions. For synchronous transactions, the explicit transaction needs
230231
// to be nested inside the automatic transaction, so we need to force an auto-transaction to
231232
// start before the SAVEPOINT.
232-
void notifyWrite();
233+
void notifyWrite(bool allowUnconfirmed = false);
233234

234235
// Get the currently-executing SQL query for debug purposes. The query is normalized to hide
235236
// any literal values that might contain sensitive information. This is intended to be safe for
@@ -332,7 +333,7 @@ class SqliteDatabase {
332333
kj::Maybe<sqlite3_stmt&> currentStatement;
333334

334335
bool criticalErrorOccurred = false;
335-
kj::Maybe<kj::Function<void()>> onWriteCallback;
336+
kj::Maybe<kj::Function<void(bool allowUnconfirmed)>> onWriteCallback;
336337
kj::Maybe<kj::Function<void(kj::StringPtr errorMessage, kj::Maybe<kj::Exception> maybeException)>>
337338
onCriticalErrorCallback;
338339
kj::Maybe<kj::Function<void(SqliteDatabase&)>> afterResetCallback;
@@ -446,6 +447,13 @@ class SqliteDatabase::Statement final: private ResetListener {
446447
template <typename... Params>
447448
Query run(Params&&... bindings);
448449

450+
struct StatementOptions {
451+
bool allowUnconfirmed = false;
452+
};
453+
454+
template <typename... Params>
455+
Query run(StatementOptions options, Params&&... bindings);
456+
449457
private:
450458
const Regulator& regulator;
451459
kj::OneOf<kj::String, StatementAndEffect> stmt;
@@ -632,6 +640,9 @@ class SqliteDatabase::Query final: private ResetListener {
632640
uint64_t rowsRead = 0;
633641
uint64_t rowsWritten = 0;
634642

643+
// Whether this query allows unconfirmed writes.
644+
bool allowUnconfirmed = false;
645+
635646
friend class SqliteDatabase;
636647

637648
Query(SqliteDatabase& db,
@@ -647,7 +658,8 @@ class SqliteDatabase::Query final: private ResetListener {
647658
: ResetListener(db),
648659
regulator(options.regulator),
649660
maybeStatement(statement.prepareForExecution()),
650-
queryEvent(this->db.sqliteObserver) {
661+
queryEvent(this->db.sqliteObserver),
662+
allowUnconfirmed(options.allowUnconfirmed) {
651663
// If we throw from the constructor, the destructor won't run. Need to call destroy()
652664
// explicitly.
653665
KJ_ON_SCOPE_FAILURE(destroy());
@@ -659,7 +671,8 @@ class SqliteDatabase::Query final: private ResetListener {
659671
regulator(options.regulator),
660672
ownStatement(db.prepareSql(regulator, sqlCode, 0, MULTI)),
661673
maybeStatement(ownStatement),
662-
queryEvent(this->db.sqliteObserver) {
674+
queryEvent(this->db.sqliteObserver),
675+
allowUnconfirmed(options.allowUnconfirmed) {
663676
// If we throw from the constructor, the destructor won't run. Need to call destroy()
664677
// explicitly.
665678
KJ_ON_SCOPE_FAILURE(destroy());
@@ -966,6 +979,12 @@ SqliteDatabase::Query SqliteDatabase::Statement::run(Params&&... params) {
966979
return Query(db, QueryOptions{.regulator = regulator}, *this, kj::fwd<Params>(params)...);
967980
}
968981

982+
template <typename... Params>
983+
SqliteDatabase::Query SqliteDatabase::Statement::run(StatementOptions options, Params&&... params) {
984+
return Query(db, {.regulator = regulator, .allowUnconfirmed = options.allowUnconfirmed}, *this,
985+
kj::fwd<Params>(params)...);
986+
}
987+
969988
template <size_t size, typename... Params>
970989
SqliteDatabase::Query SqliteDatabase::run(const char (&sqlCode)[size], Params&&... params) {
971990
return Query(*this, QueryOptions{.regulator = TRUSTED}, sqlCode, kj::fwd<Params>(params)...);

0 commit comments

Comments
 (0)