From a0c38fac586f03a7ecb89749aabdd1640453bf97 Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Fri, 27 Jun 2025 10:09:10 +0900 Subject: [PATCH] Use include_metadata configuration in Consensus Commit --- scalardb/src/scalardb/db/cluster.clj | 5 ++++- scalardb/src/scalardb/db_extend.clj | 3 ++- scalardb/src/scalardb/transfer.clj | 10 +++------- scalardb/test/scalardb/transfer_2pc_test.clj | 19 ++----------------- scalardb/test/scalardb/transfer_test.clj | 19 ++----------------- 5 files changed, 13 insertions(+), 43 deletions(-) diff --git a/scalardb/src/scalardb/db/cluster.clj b/scalardb/src/scalardb/db/cluster.clj index 9691c07..e04edb5 100644 --- a/scalardb/src/scalardb/db/cluster.clj +++ b/scalardb/src/scalardb/db/cluster.clj @@ -39,7 +39,10 @@ "scalar.db.storage=jdbc" "scalar.db.contact_points=jdbc:postgresql://postgresql-scalardb-cluster.default.svc.cluster.local:5432/postgres" "scalar.db.username=postgres" - "scalar.db.password=postgres"]) + "scalar.db.password=postgres" + "" + ;; Set to true to include transaction metadata in the records + "scalar.db.consensus_commit.include_metadata.enabled=true"]) :imagePullSecrets [{:name "scalardb-ghcr-secret"}]}}) diff --git a/scalardb/src/scalardb/db_extend.clj b/scalardb/src/scalardb/db_extend.clj index 8648d85..5d4213a 100644 --- a/scalardb/src/scalardb/db_extend.clj +++ b/scalardb/src/scalardb/db_extend.clj @@ -29,7 +29,8 @@ (.setProperty "scalar.db.consensus_commit.coordinator.group_commit.slot_capacity" "4") (.setProperty "scalar.db.consensus_commit.coordinator.group_commit.old_group_abort_timeout_millis" "15000") (.setProperty "scalar.db.consensus_commit.coordinator.group_commit.delayed_slot_move_timeout_millis" "400") - (.setProperty "scalar.db.consensus_commit.coordinator.group_commit.metrics_monitor_log_enabled" "true"))) + (.setProperty "scalar.db.consensus_commit.coordinator.group_commit.metrics_monitor_log_enabled" "true") + (.setProperty "scalar.db.consensus_commit.include_metadata.enabled" "true"))) (defprotocol DbExtension (get-db-type [this]) diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index 418c7f8..fb9b3a6 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -132,13 +132,11 @@ [test n] (try (let [tx (scalar/start-transaction test) - tx-results (map #(.get tx (prepare-get %)) (range n)) - ;; Need Storage API to read the transaction metadata - results (mapv #(.get @(:storage test) (prepare-get %)) (range n))] + results (map #(.get tx (prepare-get %)) (range n))] ;; Put the same balance to check conflicts with in-flight transactions (mapv #(->> (calc-new-balance %2 0) (prepare-put %1) (.put tx)) (range n) - tx-results) + results) (.commit tx) results) (catch Exception e @@ -148,11 +146,9 @@ (defn read-all-with-retry [test n] (scalar/check-transaction-connection! test) - (scalar/check-storage-connection! test) (scalar/with-retry (fn [test] - (scalar/prepare-transaction-service! test) - (scalar/prepare-storage-service! test)) + (scalar/prepare-transaction-service! test)) test (read-all test n))) diff --git a/scalardb/test/scalardb/transfer_2pc_test.clj b/scalardb/test/scalardb/transfer_2pc_test.clj index 6ad953b..f4a1a54 100644 --- a/scalardb/test/scalardb/transfer_2pc_test.clj +++ b/scalardb/test/scalardb/transfer_2pc_test.clj @@ -8,7 +8,6 @@ [scalardb.transfer-2pc :as transfer-2pc] [spy.core :as spy]) (:import (com.scalar.db.api DistributedTransaction - DistributedStorage TwoPhaseCommitTransaction Get Put @@ -63,12 +62,6 @@ (^void put [_ ^Put p] (mock-put p)) (^void commit [_] (swap! commit-count inc)))) -(def mock-storage - (reify - DistributedStorage - (^Optional get [_ ^Get g] (mock-get g)) - (^void put [_ ^Put p] (mock-put p)))) - (def mock-transaction-throws-exception (reify DistributedTransaction @@ -206,16 +199,13 @@ (deftest transfer-client-get-all-test (binding [test-records (atom {0 1000 1 100 2 10 3 1 4 0})] (with-redefs [scalar/check-transaction-connection! (spy/spy) - scalar/check-storage-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) nil nil) - result (client/invoke! client {:db mock-db - :storage (ref mock-storage)} + result (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) - (is (spy/called-once? scalar/check-storage-connection!)) (is (= :ok (:type result))) (is (= [1000 100 10 1 0] (get-in result [:value :balance]))) (is (= [1000 100 10 1 0] (get-in result [:value :version]))))))) @@ -223,21 +213,16 @@ (deftest transfer-client-get-all-fail-test (with-redefs [scalar/exponential-backoff (spy/spy) scalar/check-transaction-connection! (spy/spy) - scalar/check-storage-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) - scalar/prepare-storage-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) nil nil)] (is (thrown? clojure.lang.ExceptionInfo - (client/invoke! client {:db mock-db - :storage (ref mock-storage)} + (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil)))) (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! - (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1))) - (is (spy/called-n-times? scalar/prepare-storage-service! (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1)))))) (deftest transfer-client-check-tx-test diff --git a/scalardb/test/scalardb/transfer_test.clj b/scalardb/test/scalardb/transfer_test.clj index ffff6fb..690e3bc 100644 --- a/scalardb/test/scalardb/transfer_test.clj +++ b/scalardb/test/scalardb/transfer_test.clj @@ -7,7 +7,6 @@ [scalardb.transfer :as transfer] [spy.core :as spy]) (:import (com.scalar.db.api DistributedTransaction - DistributedStorage Get Put Result) @@ -57,12 +56,6 @@ (^void put [_ ^Put p] (mock-put p)) (^void commit [_] (swap! commit-count inc)))) -(def mock-storage - (reify - DistributedStorage - (^Optional get [_ ^Get g] (mock-get g)) - (^void put [_ ^Put p] (mock-put p)))) - (def mock-transaction-throws-exception (reify DistributedTransaction @@ -178,16 +171,13 @@ (deftest transfer-client-get-all-test (binding [test-records (atom {0 1000 1 100 2 10 3 1 4 0})] (with-redefs [scalar/check-transaction-connection! (spy/spy) - scalar/check-storage-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) - result (client/invoke! client {:db mock-db - :storage (ref mock-storage)} + result (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) - (is (spy/called-once? scalar/check-storage-connection!)) (is (= :ok (:type result))) (is (= [1000 100 10 1 0] (get-in result [:value :balance]))) (is (= [1000 100 10 1 0] (get-in result [:value :version]))))))) @@ -195,21 +185,16 @@ (deftest transfer-client-get-all-fail-test (with-redefs [scalar/exponential-backoff (spy/spy) scalar/check-transaction-connection! (spy/spy) - scalar/check-storage-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) - scalar/prepare-storage-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil)] (is (thrown? clojure.lang.ExceptionInfo - (client/invoke! client {:db mock-db - :storage (ref mock-storage)} + (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil)))) (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! - (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1))) - (is (spy/called-n-times? scalar/prepare-storage-service! (+ (quot scalar/RETRIES scalar/RETRIES_FOR_RECONNECTION) 1)))))) (deftest transfer-client-check-tx-test