diff --git a/.github/scripts/generate-snapshot.sh b/.github/scripts/generate-snapshot.sh new file mode 100755 index 000000000..cf2e37262 --- /dev/null +++ b/.github/scripts/generate-snapshot.sh @@ -0,0 +1,101 @@ +#!/bin/sh + +ROW_COUNT=1000 +WAL_AUTOCHECKPOINT=0 +MAIN_ONLY=0 +usage() { + cat <] [--row-count ] + +Options: + --main-only Generate a main database only snapshot (no WAL) + --wal-limit Maximum amount of WAL pages to keep (default: 0 - unbounded) + --row-count Number of rows to generate (default: 1000) + -h, --help Show this help message + +USAGE + exit 0 +} + +# Parse arguments. +while [ $# -gt 0 ]; do + case "$1" in + --main-only) + MAIN_ONLY=1 + shift 1 + ;; + --wal-limit) + if [ -z "$2" ]; then + echo "Error: --wal-limit requires a value" >&2 + usage + fi + WAL_AUTOCHECKPOINT="$2" + shift 2 + ;; + --row-count) + if [ -z "$2" ]; then + echo "Error: --row-count requires a value" >&2 + usage + fi + ROW_COUNT="$2" + shift 2 + ;; + -h|--help) + usage + ;; + *) + if [ -n "$OUTPUT_DIR" ]; then + echo "Error: unknown option '$1'" >&2 + usage + fi + OUTPUT_DIR="$1" + shift 1 + ;; + esac +done + +if [ -z "$OUTPUT_DIR" ]; then + echo "Error: Missing output directory" >&2 + usage +fi + +# Cleanup. +trap 'rm -f temp temp-wal temp-shm' EXIT + +SQLITE3_CHECKPOINT_ON_CLOSE="" +if [ "$MAIN_ONLY" -eq 0 ]; then + SQLITE3_CHECKPOINT_ON_CLOSE=".dbconfig no_ckpt_on_close on" +fi + +# First generate a sqlite3 database. +echo -n "Generating database with $ROW_COUNT rows..." +cat < /dev/null 2>&1 +PRAGMA journal_mode=WAL; +PRAGMA wal_autocheckpoint=$WAL_AUTOCHECKPOINT; +$SQLITE3_CHECKPOINT_ON_CLOSE + +CREATE TABLE test(id INTEGER PRIMARY KEY, value TEXT NOT NULL); +WITH RECURSIVE sequence AS ( + SELECT 1 AS id + + UNION ALL + + SELECT id + 1 + FROM sequence + WHERE id < $ROW_COUNT +) +INSERT OR REPLACE INTO test +SELECT id, hex(randomblob(16)) +FROM sequence; +EOF +echo "Done" + +# Now generate a dqlite snapshot from the sqlite3 database. +echo -n "Generating snapshot in '$OUTPUT_DIR'..." +cat < /dev/null 2>&1 +.snapshot +.add-server "1" +ATTACH DATABASE "temp" AS test; +.finish "$OUTPUT_DIR" +EOF +echo "Done" diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index d41546f0e..b85d5f92c 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -27,7 +27,7 @@ jobs: sudo apt update sudo apt install -y libsqlite3-dev liblz4-dev libuv1-dev xfslibs-dev \ linux-libc-dev btrfs-progs xfsprogs zfsutils-linux \ - lcov + lcov sqlite3 - name: Build dqlite env: @@ -38,6 +38,33 @@ jobs: --enable-build-raft make -j$(nproc) check-norun + - name: Generate test snapshots + if: ${{ matrix.os == 'ubuntu-24.04' }} + run: | + # Get dqlite-utils tool by downloading prebuilt binary. Eventually we will + # replace this with a proper package installation or with a snap. + curl -H "Authorization: token ${{ secrets.TEMP_DQLITE_UTILS_TOKEN }}" \ + -H "Accept: application/octet-stream" \ + -L https://api.github.com/repos/canonical/dqlite-utils/releases/assets/343675902 \ + -o release.tar.xz + sudo tar -xf release.tar.xz -C /usr/local/bin dqlite-utils + rm release.tar.xz + (mkdir test-snapshots && + cd test-snapshots && \ + # Simple cases + ../.github/scripts/generate-snapshot.sh --main-only --row-count 1000 small-no-wal && \ + ../.github/scripts/generate-snapshot.sh --main-only --row-count 100000 medium-no-wal && \ + ../.github/scripts/generate-snapshot.sh --main-only --row-count 5000000 large-no-wal && \ + # With WAL + ../.github/scripts/generate-snapshot.sh --wal-limit 1000 --row-count 1000 small-with-wal && \ + ../.github/scripts/generate-snapshot.sh --wal-limit 1000 --row-count 100000 medium-with-wal && \ + ../.github/scripts/generate-snapshot.sh --wal-limit 1000 --row-count 5000000 large-with-wal && \ + # With huge WAL + ../.github/scripts/generate-snapshot.sh --row-count 1000 small-huge-wal && \ + ../.github/scripts/generate-snapshot.sh --row-count 100000 medium-huge-wal && \ + ../.github/scripts/generate-snapshot.sh --row-count 5000000 large-huge-wal \ + ) + - name: Test env: CC: ${{ matrix.compiler }} @@ -46,7 +73,10 @@ jobs: run: | ./test/raft/lib/fs.sh setup export $(./test/raft/lib/fs.sh detect) - sudo UV_THREADPOOL_SIZE=$(($(nproc) * 2)) make check || (cat test-suite.log && false) + sudo \ + DQLITE_TEST_SNAPSHOT_DIRS=$(shopt -s nullglob; echo test-snapshots/*/ | tr ' ' :) \ + UV_THREADPOOL_SIZE=$(($(nproc) * 2)) \ + make check || (cat test-suite.log && false) ./test/raft/lib/fs.sh teardown - name: Coverage diff --git a/src/fsm.c b/src/fsm.c index c3a23837e..bb0186870 100644 --- a/src/fsm.c +++ b/src/fsm.c @@ -1,4 +1,5 @@ #include +#include #include #include "command.h" @@ -12,7 +13,6 @@ #include "tracing.h" #include "vfs.h" - struct fsmDatabaseSnapshot { sqlite3 *conn; struct raft_buffer header; @@ -25,8 +25,7 @@ struct fsmSnapshot { size_t database_count; }; -struct fsm -{ +struct fsm { struct logger *logger; struct registry *registry; struct fsmSnapshot snapshot; @@ -74,9 +73,9 @@ static int apply_frames(struct fsm *f, const struct command_frames *c) } struct vfsTransaction transaction = { - .n_pages = c->frames.n_pages, + .n_pages = c->frames.n_pages, .page_numbers = c->frames.page_numbers, - .pages = c->frames.pages, + .pages = c->frames.pages, }; rv = VfsApply(conn, &transaction); if (rv != 0) { @@ -202,21 +201,81 @@ static int decodeDatabase(const struct registry *r, const size_t page_size = r->config->vfs.page_size; dqlite_assert((header.main_size % page_size) == 0); - dqlite_assert(header.wal_size == 0); - - const size_t page_count = (size_t)header.main_size / page_size; - void **pages = raft_malloc(sizeof(void *) * page_count); + size_t main_page_count = (size_t)header.main_size / page_size; + void **pages = raft_malloc(sizeof(void *) * main_page_count); if (pages == NULL) { return RAFT_NOMEM; } - for (size_t i = 0; i < page_count; i++) { + for (size_t i = 0; i < main_page_count; i++) { pages[i] = (void *)(cursor->p + i * page_size); } cursor->p += header.main_size; - *snapshot = (struct vfsSnapshot) { - .page_count = page_count, + const size_t wal_header_size = 32; + if (header.wal_size > wal_header_size) { + tracef("pre 1.17 snapshot loading"); + const size_t wal_frame_header_size = 24; + const size_t wal_frame_size = page_size + wal_frame_header_size; + + dqlite_assert(header.wal_size > wal_header_size); + dqlite_assert(((header.wal_size - (size_t)wal_header_size) % + wal_frame_size) == 0); + + const unsigned n_frames = + (unsigned)((header.wal_size - (size_t)wal_header_size) / + wal_frame_size); + const uint8_t *wal = (uint8_t*)cursor->p; + // In dqlite, WAL is always well-formed (id doesn't contain + // partial transactions). As such, the last frame is always a + // commit frame and must contain the size of the database after + // the transaction. + const uint8_t *last_frame = + wal + wal_header_size + + n_frames * wal_frame_size - wal_frame_size; + const size_t wal_page_count = ByteGetBe32(last_frame + 4); + dqlite_assert(wal_page_count != 0); + + if (wal_page_count > main_page_count) { + void *wal_pages = raft_realloc( + pages, sizeof(void *) * wal_page_count); + if (wal_pages == NULL) { + raft_free(pages); + return RAFT_NOMEM; + } + pages = wal_pages; + } + + /* Read pages in the WAL order */ + for (const uint8_t *frame = wal + wal_header_size; + frame < wal + header.wal_size; frame += wal_frame_size) { + uint32_t page_number = + ByteGetBe32(frame); + if (page_number > wal_page_count) { + continue; + } + dqlite_assert(page_number > 0); + pages[page_number - 1] = + (void *)(frame + wal_frame_header_size); + } + + /* Verify that if the WAL resized the database, then no page is + * missing. */ + for (size_t page_number = main_page_count; + page_number <= wal_page_count; page_number++) { + if (pages[page_number - 1] == NULL) { + tracef("missing page %" PRIu64 " in wal", + (uint64_t)(page_number)); + raft_free(pages); + return RAFT_INVALID; + } + } + main_page_count = wal_page_count; + } + cursor->p += header.wal_size; + + *snapshot = (struct vfsSnapshot){ + .page_count = main_page_count, .page_size = page_size, .pages = pages, }; @@ -225,7 +284,8 @@ static int decodeDatabase(const struct registry *r, return RAFT_OK; } -static int integrityCheckCb(void *pArg, int n, char **values, char **names) { +static int integrityCheckCb(void *pArg, int n, char **values, char **names) +{ bool *check_passed = pArg; PRE(check_passed != NULL); @@ -260,9 +320,11 @@ static int restoreDatabase(struct registry *r, if (rv == SQLITE_OK) { bool check_passed = true; char *errmsg; - rv = sqlite3_exec(conn, "PRAGMA quick_check", integrityCheckCb, &check_passed, &errmsg); + rv = sqlite3_exec(conn, "PRAGMA quick_check", integrityCheckCb, + &check_passed, &errmsg); if (rv != SQLITE_OK) { - tracef("PRAGMA quick_check failed: %s (%d)", errmsg, rv); + tracef("PRAGMA quick_check failed: %s (%d)", errmsg, + rv); } else if (!check_passed) { rv = SQLITE_CORRUPT; } @@ -311,7 +373,8 @@ static int snapshotDatabase(struct db *db, struct fsmDatabaseSnapshot *snapshot) const struct snapshotDatabase header = { .filename = db->filename, - .main_size = snapshot->content.page_count * snapshot->content.page_size, + .main_size = + snapshot->content.page_count * snapshot->content.page_size, .wal_size = 0, }; @@ -325,7 +388,7 @@ static int snapshotDatabase(struct db *db, struct fsmDatabaseSnapshot *snapshot) char *cursor = header_buffer; snapshotDatabase__encode(&header, &cursor); - snapshot->header = (struct raft_buffer) { + snapshot->header = (struct raft_buffer){ .base = header_buffer, .len = header_size, }; @@ -380,7 +443,8 @@ static int fsm__snapshot(struct raft_fsm *fsm, i++; } - struct raft_buffer *buffers = raft_malloc(buffer_count * sizeof(struct raft_buffer)); + struct raft_buffer *buffers = + raft_malloc(buffer_count * sizeof(struct raft_buffer)); if (buffers == NULL) { rv = RAFT_NOMEM; goto err; @@ -397,9 +461,8 @@ static int fsm__snapshot(struct raft_fsm *fsm, buff_i++; dqlite_assert((buff_i + databases[i].content.page_count) <= - buffer_count); - for (unsigned j = 0; j < databases[i].content.page_count; - j++) { + buffer_count); + for (unsigned j = 0; j < databases[i].content.page_count; j++) { buffers[buff_i] = (struct raft_buffer){ .base = databases[i].content.pages[j], .len = databases[i].content.page_size, @@ -462,7 +525,7 @@ static int fsm__restore(struct raft_fsm *fsm, struct raft_buffer *buf) { tracef("fsm restore"); struct fsm *f = fsm->data; - struct cursor cursor = {buf->base, buf->len}; + struct cursor cursor = { buf->base, buf->len }; struct snapshotHeader header; unsigned i; int rv; @@ -508,7 +571,7 @@ int fsm__init(struct raft_fsm *fsm, if (f == NULL) { return DQLITE_NOMEM; } - *f = (struct fsm) { + *f = (struct fsm){ .logger = &config->logger, .registry = registry, }; diff --git a/test/integration/test_node.c b/test/integration/test_node.c index 99b2e3c35..44b944c2a 100644 --- a/test/integration/test_node.c +++ b/test/integration/test_node.c @@ -33,40 +33,36 @@ struct fixture dqlite_node *node; /* Node instance. */ }; -static void *setUp(const MunitParameter params[], void *user_data) +static void *setUp(const MunitParameter params[], void *user_data, const char *dir, const char *address) { + if (dir == NULL) { + return NULL; + } + struct fixture *f = munit_malloc(sizeof *f); int rv; test_heap_setup(params, user_data); test_sqlite_setup(params); - f->dir = test_dir_setup(); + f->dir = (char *)dir; rv = dqlite_node_create(1, "1", f->dir, &f->node); munit_assert_int(rv, ==, 0); - rv = dqlite_node_set_bind_address(f->node, "@123"); + rv = dqlite_node_set_bind_address(f->node, address); munit_assert_int(rv, ==, 0); return f; } -static void *setUpInet(const MunitParameter params[], void *user_data) +static void *setUpLocal(const MunitParameter params[], void *user_data) { - struct fixture *f = munit_malloc(sizeof *f); - int rv; - test_heap_setup(params, user_data); - test_sqlite_setup(params); - - f->dir = test_dir_setup(); - - rv = dqlite_node_create(1, "1", f->dir, &f->node); - munit_assert_int(rv, ==, 0); - - rv = dqlite_node_set_bind_address(f->node, "127.0.0.1:9001"); - munit_assert_int(rv, ==, 0); + return setUp(params, user_data, test_dir_setup(), "@123"); +} - return f; +static void *setUpInet(const MunitParameter params[], void *user_data) +{ + return setUp(params, user_data, test_dir_setup(), "127.0.0.1:9001"); } /* Tests if node starts/stops successfully and also performs some memory cleanup @@ -81,7 +77,7 @@ static void startStopNode(struct fixture *f) static void *setUpForRecovery(const MunitParameter params[], void *user_data) { int rv; - struct fixture *f = setUp(params, user_data); + struct fixture *f = setUpLocal(params, user_data); startStopNode(f); dqlite_node_destroy(f->node); @@ -94,18 +90,29 @@ static void *setUpForRecovery(const MunitParameter params[], void *user_data) return f; } -static void tearDown(void *data) +static void tearDownKeepDir(void *data) { struct fixture *f = data; + if (f == NULL) { + return; + } dqlite_node_destroy(f->node); - test_dir_tear_down(f->dir); test_sqlite_tear_down(); test_heap_tear_down(data); free(f); } +static void tearDown(void *data) +{ + struct fixture *f = data; + char *dir = f->dir; + tearDownKeepDir(data); + test_dir_tear_down(dir); +} + + SUITE(node); /****************************************************************************** @@ -113,8 +120,7 @@ SUITE(node); * dqlite_node_start * ******************************************************************************/ - -TEST(node, start, setUp, tearDown, 0, node_params) +TEST(node, start, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv; @@ -188,7 +194,6 @@ static int openDb(struct client_proto *c, struct dqlite_node *n, const char *db) return RAFT_OK; } - TEST(node, stopInflightReads, setUpInet, tearDown, 0, node_params) { struct fixture *f = data; @@ -235,6 +240,85 @@ TEST(node, stopInflightReads, setUpInet, tearDown, 0, node_params) return MUNIT_OK; } +#define SNAPSHOT_DIR_PARAM "snapshot_dir" + +static char *snapshot_compression[] = {"1", NULL}; +static char* snapshot_dirs[SUITE__CAP] = {}; +static int snapshot_dirs_n = 0; +static MunitParameterEnum test_snapshot_params[SUITE__CAP] = { + {SNAPSHOT_DIR_PARAM, snapshot_dirs}, + {SNAPSHOT_COMPRESSION_PARAM, snapshot_compression}, + {NULL, NULL}, +}; + +static char *snapshot_dir_env = NULL; +__attribute__((constructor)) static void snapshot_dirs_env_init(void) +{ + const char *env = getenv("DQLITE_TEST_SNAPSHOT_DIRS"); + if (env == NULL) { + return; + } + + char *dirs = snapshot_dir_env = strdup(env); + char *saveptr = NULL; + char *dir = strtok_r(dirs, ":", &saveptr); + while (dir != NULL && snapshot_dirs_n < SUITE__CAP - 1) { + snapshot_dirs[snapshot_dirs_n++] = dir; + dir = strtok_r(NULL, ":", &saveptr); + } +} + +// Make the sanitizer happy. +__attribute__((destructor)) static void snapshot_dirs_env_uninit(void) +{ + free(snapshot_dir_env); + snapshot_dir_env = NULL; +} + +static void *setUpExistingSnapshot(const MunitParameter params[], void *user_data) +{ + return setUp(params, user_data, munit_parameters_get(params, SNAPSHOT_DIR_PARAM), "127.0.0.1:9001"); +} + +TEST(node, existing_snapshot, setUpExistingSnapshot, tearDownKeepDir, 0, test_snapshot_params) +{ + struct fixture *f = data; + int rv; + + if (f == NULL) { + return MUNIT_SKIP; + } + + rv = dqlite_node_start(f->node); + munit_assert_int(rv, ==, 0); + + /* Open a client and do a query on them */ + struct client_proto client; + struct rows rows; + + rv = openDb(&client, f->node, "test"); + munit_assert_int(rv, ==, RAFT_OK); + rv = clientSendQuerySQL(&client, "SELECT COUNT(*) FROM test", NULL, 0, + NULL); + munit_assert_int(rv, ==, RAFT_OK); + bool done; + rv = clientRecvRows(&client, &rows, &done, NULL); + munit_assert_int(rv, ==, RAFT_OK); + munit_assert_true(done); + munit_assert_int(rows.column_count, ==, 1); + munit_assert_string_equal(rows.column_names[0], "COUNT(*)"); + munit_assert_not_null(rows.next); + munit_assert_int(rows.next->values[0].type, ==, SQLITE_INTEGER); + munit_assert_int(rows.next->values[0].integer, >, 0); + clientCloseRows(&rows); + clientClose(&client); + + rv = dqlite_node_stop(f->node); + munit_assert_int(rv, ==, 0); + + return MUNIT_OK; +} + static void notify_transaction(sqlite3_context *context, int argc, sqlite3_value **argv); static int register_notify(sqlite3 *connection, char **pzErrMsg, struct sqlite3_api_routines *pThunk) { @@ -329,7 +413,7 @@ TEST(node, stopInflightWrites, setUpInflight, tearDownInflight, 0, node_params) } -TEST(node, snapshotParams, setUp, tearDown, 0, node_params) +TEST(node, snapshotParams, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv; @@ -341,7 +425,7 @@ TEST(node, snapshotParams, setUp, tearDown, 0, node_params) return MUNIT_OK; } -TEST(node, snapshotParamsRunning, setUp, tearDown, 0, node_params) +TEST(node, snapshotParamsRunning, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv; @@ -358,7 +442,7 @@ TEST(node, snapshotParamsRunning, setUp, tearDown, 0, node_params) return MUNIT_OK; } -TEST(node, snapshotParamsTrailingTooSmall, setUp, tearDown, 0, node_params) +TEST(node, snapshotParamsTrailingTooSmall, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv; @@ -372,7 +456,7 @@ TEST(node, snapshotParamsTrailingTooSmall, setUp, tearDown, 0, node_params) TEST(node, snapshotParamsThresholdLargerThanTrailing, - setUp, + setUpLocal, tearDown, 0, node_params) @@ -387,7 +471,7 @@ TEST(node, return MUNIT_OK; } -TEST(node, networkLatency, setUp, tearDown, 0, node_params) +TEST(node, networkLatency, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv; @@ -399,7 +483,7 @@ TEST(node, networkLatency, setUp, tearDown, 0, node_params) return MUNIT_OK; } -TEST(node, networkLatencyRunning, setUp, tearDown, 0, node_params) +TEST(node, networkLatencyRunning, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv; @@ -416,7 +500,7 @@ TEST(node, networkLatencyRunning, setUp, tearDown, 0, node_params) return MUNIT_OK; } -TEST(node, networkLatencyTooLarge, setUp, tearDown, 0, node_params) +TEST(node, networkLatencyTooLarge, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv; @@ -428,7 +512,7 @@ TEST(node, networkLatencyTooLarge, setUp, tearDown, 0, node_params) return MUNIT_OK; } -TEST(node, networkLatencyMs, setUp, tearDown, 0, node_params) +TEST(node, networkLatencyMs, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv; @@ -442,7 +526,7 @@ TEST(node, networkLatencyMs, setUp, tearDown, 0, node_params) return MUNIT_OK; } -TEST(node, networkLatencyMsRunning, setUp, tearDown, 0, node_params) +TEST(node, networkLatencyMsRunning, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv; @@ -459,7 +543,7 @@ TEST(node, networkLatencyMsRunning, setUp, tearDown, 0, node_params) return MUNIT_OK; } -TEST(node, networkLatencyMsTooSmall, setUp, tearDown, 0, node_params) +TEST(node, networkLatencyMsTooSmall, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv; @@ -471,7 +555,7 @@ TEST(node, networkLatencyMsTooSmall, setUp, tearDown, 0, node_params) return MUNIT_OK; } -TEST(node, networkLatencyMsTooLarge, setUp, tearDown, 0, node_params) +TEST(node, networkLatencyMsTooLarge, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv; @@ -483,7 +567,7 @@ TEST(node, networkLatencyMsTooLarge, setUp, tearDown, 0, node_params) return MUNIT_OK; } -TEST(node, blockSize, setUp, tearDown, 0, NULL) +TEST(node, blockSize, setUpLocal, tearDown, 0, NULL) { struct fixture *f = data; int rv; @@ -503,7 +587,7 @@ TEST(node, blockSize, setUp, tearDown, 0, NULL) return MUNIT_OK; } -TEST(node, blockSizeRunning, setUp, tearDown, 0, NULL) +TEST(node, blockSizeRunning, setUpLocal, tearDown, 0, NULL) { struct fixture *f = data; int rv; @@ -522,7 +606,7 @@ TEST(node, blockSizeRunning, setUp, tearDown, 0, NULL) /* Our file locking prevents starting a second dqlite instance that * uses the same directory as a running instance. */ -TEST(node, locked, setUp, tearDown, 0, NULL) +TEST(node, locked, setUpLocal, tearDown, 0, NULL) { struct fixture *f = data; int rv; @@ -714,7 +798,7 @@ TEST(node, errMsgNodeNull, NULL, NULL, 0, NULL) return MUNIT_OK; } -TEST(node, errMsg, setUp, tearDown, 0, node_params) +TEST(node, errMsg, setUpLocal, tearDown, 0, node_params) { struct fixture *f = data; int rv;