diff --git a/scalardb-test/multi-storage-ycsb.properties b/scalardb-test/multi-storage-ycsb.properties new file mode 100644 index 0000000..01f8202 --- /dev/null +++ b/scalardb-test/multi-storage-ycsb.properties @@ -0,0 +1,17 @@ +scalar.db.storage=multi-storage +scalar.db.multi_storage.storages=primary,secondary + +scalar.db.multi_storage.storages.primary.storage=jdbc +scalar.db.multi_storage.storages.primary.contact_points=jdbc:mysql://localhost:3306/ +scalar.db.multi_storage.storages.primary.username=root +scalar.db.multi_storage.storages.primary.password=example +scalar.db.multi_storage.storages.primary.transaction_manager=jdbc + +scalar.db.multi_storage.storages.secondary.storage=jdbc +scalar.db.multi_storage.storages.secondary.contact_points=jdbc:mysql://localhost:13306/ +scalar.db.multi_storage.storages.secondary.username=root +scalar.db.multi_storage.storages.secondary.password=example +scalar.db.multi_storage.storages.secondary.transaction_manager=jdbc + +scalar.db.multi_storage.namespace_mapping=ycsb_primary:primary,ycsb_secondary:secondary,coordinator:primary +scalar.db.multi_storage.default_storage=primary diff --git a/scalardb-test/multi-storage-ycsb.toml b/scalardb-test/multi-storage-ycsb.toml new file mode 100644 index 0000000..12a54e3 --- /dev/null +++ b/scalardb-test/multi-storage-ycsb.toml @@ -0,0 +1,28 @@ +[modules] + [modules.preprocessor] + name = "kelpie.scalardb.ycsb.MultiStorageLoader" + path = "build/libs/scalardb-test-all.jar" + [modules.processor] + name = "kelpie.scalardb.ycsb.MultiStorageWorkloadFr" + path = "build/libs/scalardb-test-all.jar" + [modules.postprocessor] + name = "kelpie.scalardb.ycsb.YcsbReporter" + path = "build/libs/scalardb-test-all.jar" + +[common] + concurrency = 4 + run_for_sec = 5 + ramp_for_sec = 5 + +[stats] + realtime_report_enabled = true + +[test_config] + ops_per_tx = 1 + record_count = 10000 + hotspot_record_count = 100 + dispatch_rate = 25 + population_concurrency = 4 + +[storage_config] + config_file = 
"multi-storage-ycsb.properties" diff --git a/scalardb-test/schema/multi-storage-ycsb.json b/scalardb-test/schema/multi-storage-ycsb.json new file mode 100644 index 0000000..5c4c615 --- /dev/null +++ b/scalardb-test/schema/multi-storage-ycsb.json @@ -0,0 +1,22 @@ +{ + "ycsb_primary.usertable": { + "transaction": true, + "partition-key": [ + "ycsb_key" + ], + "columns": { + "ycsb_key": "INT", + "payload": "TEXT" + } + }, + "ycsb_secondary.usertable": { + "transaction": true, + "partition-key": [ + "ycsb_key" + ], + "columns": { + "ycsb_key": "INT", + "payload": "TEXT" + } + } +} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/Common.java b/scalardb-test/src/main/java/kelpie/scalardb/Common.java index 2222cef..e7b83c8 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/Common.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/Common.java @@ -35,11 +35,16 @@ public static DistributedStorage getStorage(Config config) { return factory.getStorage(); } - public static DistributedTransactionManager getTransactionManager( - Config config, String keyspace, String table) { + public static DistributedTransactionManager getTransactionManager(Config config) { DatabaseConfig dbConfig = getDatabaseConfig(config); TransactionFactory factory = new TransactionFactory(dbConfig); DistributedTransactionManager manager = factory.getTransactionManager(); + return manager; + } + + public static DistributedTransactionManager getTransactionManager( + Config config, String keyspace, String table) { + DistributedTransactionManager manager = getTransactionManager(config); manager.with(keyspace, table); return manager; } diff --git a/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageLoader.java b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageLoader.java new file mode 100644 index 0000000..0aff57c --- /dev/null +++ b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageLoader.java @@ -0,0 +1,139 @@ +package kelpie.scalardb.ycsb; + +import static 
kelpie.scalardb.ycsb.YcsbCommon.CONFIG_NAME; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_PRIMARY; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_SECONDARY; +import static kelpie.scalardb.ycsb.YcsbCommon.getPayloadSize; +import static kelpie.scalardb.ycsb.YcsbCommon.getRecordCount; +import static kelpie.scalardb.ycsb.YcsbCommon.prepareGet; +import static kelpie.scalardb.ycsb.YcsbCommon.preparePut; +import static kelpie.scalardb.ycsb.YcsbCommon.randomFastChars; + +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.api.Get; +import com.scalar.db.api.Put; +import com.scalar.db.exception.transaction.AbortException; +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.PreProcessor; +import io.github.resilience4j.retry.Retry; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.IntStream; +import kelpie.scalardb.Common; + +public class MultiStorageLoader extends PreProcessor { + private static final long DEFAULT_POPULATION_CONCURRENCY = 10L; + private static final long DEFAULT_BATCH_SIZE = 1; + private static final String POPULATION_CONCURRENCY = "population_concurrency"; + private static final String POPULATION_READ_BEFORE_WRITE = "population_read_before_write"; + private static final String BATCH_SIZE = "batch_size"; + private static final int MAX_RETRIES = 10; + private static final int WAIT_DURATION_MILLIS = 1000; + private final DistributedTransactionManager manager; + private final int concurrency; + private final int recordCount; + private final char[] payload; + private final int batchSize; + private final boolean readBeforeWrite; + + public MultiStorageLoader(Config config) { + super(config); + manager = 
Common.getTransactionManager(config); + concurrency = + (int) + config.getUserLong(CONFIG_NAME, POPULATION_CONCURRENCY, DEFAULT_POPULATION_CONCURRENCY); + batchSize = (int) config.getUserLong(CONFIG_NAME, BATCH_SIZE, DEFAULT_BATCH_SIZE); + recordCount = getRecordCount(config); + payload = new char[getPayloadSize(config)]; + readBeforeWrite = config.getUserBoolean(CONFIG_NAME, POPULATION_READ_BEFORE_WRITE, false); + } + + @Override + public void execute() { + ExecutorService es = Executors.newCachedThreadPool(); + List> futures = new ArrayList<>(); + IntStream.range(0, concurrency) + .forEach( + i -> { + CompletableFuture future = + CompletableFuture.runAsync(() -> new PopulationRunner(i).run(), es); + futures.add(future); + }); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + logInfo("all records have been inserted"); + } + + @Override + public void close() throws Exception { + manager.close(); + } + + private class PopulationRunner { + private final int id; + + public PopulationRunner(int threadId) { + this.id = threadId; + } + + public void run() { + int numPerThread = (recordCount + concurrency - 1) / concurrency; + int start = numPerThread * id; + int end = Math.min(numPerThread * (id + 1), recordCount); + IntStream.range(0, (numPerThread + batchSize - 1) / batchSize) + .forEach( + i -> { + int startId = start + batchSize * i; + int endId = Math.min(start + batchSize * (i + 1), end); + populateWithTx(startId, endId); + }); + } + + private void populateWithTx(int startId, int endId) { + Runnable populate = + () -> { + DistributedTransaction transaction = null; + try { + transaction = manager.start(); + for (int i = startId; i < endId; ++i) { + if (readBeforeWrite) { + Get get = prepareGet(i); + transaction.get(get); + } + randomFastChars(ThreadLocalRandom.current(), payload); + Put primaryPut = preparePut(NAMESPACE_PRIMARY, i, new String(payload)); + transaction.put(primaryPut); + Put secondaryPut = preparePut(NAMESPACE_SECONDARY, 
i, new String(payload)); + transaction.put(secondaryPut); + } + transaction.commit(); + } catch (Exception e) { + if (transaction != null) { + try { + transaction.abort(); + } catch (AbortException ex) { + logWarn("abort failed.", ex); + } + } + logWarn("population failed.", e); + throw new RuntimeException("population failed.", e); + } + }; + + Retry retry = + Common.getRetryWithFixedWaitDuration("populate", MAX_RETRIES, WAIT_DURATION_MILLIS); + Runnable decorated = Retry.decorateRunnable(retry, populate); + try { + decorated.run(); + } catch (Exception e) { + logError("population failed repeatedly!"); + throw e; + } + } + } +} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCF.java b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCF.java new file mode 100644 index 0000000..7d2b0d5 --- /dev/null +++ b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCF.java @@ -0,0 +1,95 @@ +package kelpie.scalardb.ycsb; + +import static kelpie.scalardb.ycsb.YcsbCommon.CONFIG_NAME; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_PRIMARY; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_SECONDARY; +import static kelpie.scalardb.ycsb.YcsbCommon.OPS_PER_TX; +import static kelpie.scalardb.ycsb.YcsbCommon.getPayloadSize; +import static kelpie.scalardb.ycsb.YcsbCommon.getRecordCount; +import static kelpie.scalardb.ycsb.YcsbCommon.prepareGet; +import static kelpie.scalardb.ycsb.YcsbCommon.preparePut; + +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import java.util.ArrayList; +import java.util.List; +import 
java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.scalardb.Common; + +/** + * Multi-storage workload CF: + * Read operations for primary and the same number of read-modify-write operations for secondary. +*/ +public class MultiStorageWorkloadCF extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; // 1 read for primary and 1 RMW for secondary + private final DistributedTransactionManager manager; + private final int recordCount; + private final int opsPerTx; + private final int payloadSize; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public MultiStorageWorkloadCF(Config config) { + super(config); + this.manager = Common.getTransactionManager(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + this.payloadSize = getPayloadSize(config); + } + + @Override + public void executeEach() throws TransactionException { + List primaryIds = new ArrayList<>(opsPerTx); + List secondaryIds = new ArrayList<>(opsPerTx); + List payloads = new ArrayList<>(opsPerTx); + char[] payload = new char[payloadSize]; + for (int i = 0; i < opsPerTx; ++i) { + primaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + } + for (int i = 0; i < opsPerTx; ++i) { + secondaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + + YcsbCommon.randomFastChars(ThreadLocalRandom.current(), payload); + payloads.add(new String(payload)); + } + + while (true) { + DistributedTransaction transaction = manager.start(); + try { + for (int i = 0; i < primaryIds.size(); i++) { + int userId = primaryIds.get(i); + transaction.get(prepareGet(NAMESPACE_PRIMARY, userId)); + } + for (int i = 0; i < secondaryIds.size(); i++) { + int userId = secondaryIds.get(i); + transaction.get(prepareGet(NAMESPACE_SECONDARY, userId)); + transaction.put(preparePut(NAMESPACE_SECONDARY, userId, 
payloads.get(i))); + } + transaction.commit(); + break; + } catch (CrudConflictException | CommitConflictException e) { + transaction.abort(); + transactionRetryCount.increment(); + } catch (Exception e) { + transaction.abort(); + throw e; + } + } + } + + @Override + public void close() throws Exception { + manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCe.java b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCe.java new file mode 100644 index 0000000..a528400 --- /dev/null +++ b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCe.java @@ -0,0 +1,85 @@ +package kelpie.scalardb.ycsb; + +import static kelpie.scalardb.ycsb.YcsbCommon.CONFIG_NAME; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_PRIMARY; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_SECONDARY; +import static kelpie.scalardb.ycsb.YcsbCommon.OPS_PER_TX; +import static kelpie.scalardb.ycsb.YcsbCommon.getRecordCount; +import static kelpie.scalardb.ycsb.YcsbCommon.prepareGet; + +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.scalardb.Common; + +/** + * Multi-storage workload Ce: + * Same number of read operations for both primary and secondary database. 
+*/ +public class MultiStorageWorkloadCe extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; // 1 read operation per database + private final DistributedTransactionManager manager; + private final int recordCount; + private final int opsPerTx; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public MultiStorageWorkloadCe(Config config) { + super(config); + this.manager = Common.getTransactionManager(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + } + + @Override + public void executeEach() throws TransactionException { + List primaryIds = new ArrayList<>(opsPerTx); + List secondaryIds = new ArrayList<>(opsPerTx); + for (int i = 0; i < opsPerTx; ++i) { + primaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + } + for (int i = 0; i < opsPerTx; ++i) { + secondaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + } + + while (true) { + DistributedTransaction transaction = manager.start(); + try { + for (int i = 0; i < primaryIds.size(); i++) { + int userId = primaryIds.get(i); + transaction.get(prepareGet(NAMESPACE_PRIMARY, userId)); + } + for (int i = 0; i < secondaryIds.size(); i++) { + int userId = secondaryIds.get(i); + transaction.get(prepareGet(NAMESPACE_SECONDARY, userId)); + } + transaction.commit(); + break; + } catch (CrudConflictException | CommitConflictException e) { + transaction.abort(); + transactionRetryCount.increment(); + } catch (Exception e) { + transaction.abort(); + throw e; + } + } + } + + @Override + public void close() throws Exception { + manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCh.java b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCh.java new file mode 100644 index 
0000000..074dab8 --- /dev/null +++ b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCh.java @@ -0,0 +1,89 @@ +package kelpie.scalardb.ycsb; + +import static kelpie.scalardb.ycsb.YcsbCommon.CONFIG_NAME; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_PRIMARY; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_SECONDARY; +import static kelpie.scalardb.ycsb.YcsbCommon.OPS_PER_TX; +import static kelpie.scalardb.ycsb.YcsbCommon.getRecordCount; +import static kelpie.scalardb.ycsb.YcsbCommon.getHotspotRecordCount; +import static kelpie.scalardb.ycsb.YcsbCommon.prepareGet; + +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.scalardb.Common; + +/** + * Multi-storage workload Ch: + * Same number of read operations for both primary and secondary database. + * For primary, there is a hotspot; only limited range of records are accessed. 
+*/ +public class MultiStorageWorkloadCh extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; // 1 read operation per database + private final DistributedTransactionManager manager; + private final int recordCount; + private final int hotspotRecordCount; + private final int opsPerTx; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public MultiStorageWorkloadCh(Config config) { + super(config); + this.manager = Common.getTransactionManager(config); + this.recordCount = getRecordCount(config); + this.hotspotRecordCount = getHotspotRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + } + + @Override + public void executeEach() throws TransactionException { + List primaryIds = new ArrayList<>(opsPerTx); + List secondaryIds = new ArrayList<>(opsPerTx); + for (int i = 0; i < opsPerTx; ++i) { + primaryIds.add(ThreadLocalRandom.current().nextInt(hotspotRecordCount)); + } + for (int i = 0; i < opsPerTx; ++i) { + secondaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + } + + while (true) { + DistributedTransaction transaction = manager.start(); + try { + for (int i = 0; i < primaryIds.size(); i++) { + int userId = primaryIds.get(i); + transaction.get(prepareGet(NAMESPACE_PRIMARY, userId)); + } + for (int i = 0; i < secondaryIds.size(); i++) { + int userId = secondaryIds.get(i); + transaction.get(prepareGet(NAMESPACE_SECONDARY, userId)); + } + transaction.commit(); + break; + } catch (CrudConflictException | CommitConflictException e) { + transaction.abort(); + transactionRetryCount.increment(); + } catch (Exception e) { + transaction.abort(); + throw e; + } + } + } + + @Override + public void close() throws Exception { + manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCr.java 
b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCr.java new file mode 100644 index 0000000..97e314a --- /dev/null +++ b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadCr.java @@ -0,0 +1,85 @@ +package kelpie.scalardb.ycsb; + +import static kelpie.scalardb.ycsb.YcsbCommon.CONFIG_NAME; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_PRIMARY; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_SECONDARY; +import static kelpie.scalardb.ycsb.YcsbCommon.OPS_PER_TX; +import static kelpie.scalardb.ycsb.YcsbCommon.getRecordCount; +import static kelpie.scalardb.ycsb.YcsbCommon.getDispatchRate; +import static kelpie.scalardb.ycsb.YcsbCommon.prepareGet; + +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.scalardb.Common; + +/** + * Multi-storage workload Cr: + * Read operation for either database probabilistically based on the specified rate. 
+*/ +public class MultiStorageWorkloadCr extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; + private final DistributedTransactionManager manager; + private final int recordCount; + private final int opsPerTx; + private final int dispatchRate; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public MultiStorageWorkloadCr(Config config) { + super(config); + this.manager = Common.getTransactionManager(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + this.dispatchRate = getDispatchRate(config); + } + + @Override + public void executeEach() throws TransactionException { + List userIds = new ArrayList<>(opsPerTx); + for (int i = 0; i < opsPerTx; ++i) { + userIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + } + + while (true) { + DistributedTransaction transaction = manager.start(); + try { + for (int i = 0; i < userIds.size(); i++) { + int userId = userIds.get(i); + int rate = ThreadLocalRandom.current().nextInt(100) + 1; + if (rate > dispatchRate) { + transaction.get(prepareGet(NAMESPACE_PRIMARY, userId)); + } else { + transaction.get(prepareGet(NAMESPACE_SECONDARY, userId)); + } + } + transaction.commit(); + break; + } catch (CrudConflictException | CommitConflictException e) { + transaction.abort(); + transactionRetryCount.increment(); + } catch (Exception e) { + transaction.abort(); + throw e; + } + } + } + + @Override + public void close() throws Exception { + manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadFe.java b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadFe.java new file mode 100644 index 0000000..b66e580 --- /dev/null +++ 
b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadFe.java @@ -0,0 +1,99 @@ +package kelpie.scalardb.ycsb; + +import static kelpie.scalardb.ycsb.YcsbCommon.CONFIG_NAME; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_PRIMARY; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_SECONDARY; +import static kelpie.scalardb.ycsb.YcsbCommon.OPS_PER_TX; +import static kelpie.scalardb.ycsb.YcsbCommon.getPayloadSize; +import static kelpie.scalardb.ycsb.YcsbCommon.getRecordCount; +import static kelpie.scalardb.ycsb.YcsbCommon.prepareGet; +import static kelpie.scalardb.ycsb.YcsbCommon.preparePut; + +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.scalardb.Common; + +/** + * Multi-storage workload Fe: + * Same number of read-modify-write operation for both primary and secondary database. 
+*/ +public class MultiStorageWorkloadFe extends TimeBasedProcessor { + // one read-modify-write operation (one read and one write for the same record is regarded as one + // operation) + private static final long DEFAULT_OPS_PER_TX = 1; + private final DistributedTransactionManager manager; + private final int recordCount; + private final int opsPerTx; + private final int payloadSize; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public MultiStorageWorkloadFe(Config config) { + super(config); + this.manager = Common.getTransactionManager(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + this.payloadSize = getPayloadSize(config); + } + + @Override + public void executeEach() throws TransactionException { + List primaryIds = new ArrayList<>(opsPerTx); + List secondaryIds = new ArrayList<>(opsPerTx); + List payloads = new ArrayList<>(opsPerTx); + char[] payload = new char[payloadSize]; + for (int i = 0; i < opsPerTx; ++i) { + primaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + + YcsbCommon.randomFastChars(ThreadLocalRandom.current(), payload); + payloads.add(new String(payload)); + } + for (int i = 0; i < opsPerTx; ++i) { + secondaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + // use same payload for secondary + } + + while (true) { + DistributedTransaction transaction = manager.start(); + try { + for (int i = 0; i < primaryIds.size(); i++) { + int userId = primaryIds.get(i); + transaction.get(prepareGet(NAMESPACE_PRIMARY, userId)); + transaction.put(preparePut(NAMESPACE_PRIMARY, userId, payloads.get(i))); + } + for (int i = 0; i < secondaryIds.size(); i++) { + int userId = secondaryIds.get(i); + transaction.get(prepareGet(NAMESPACE_SECONDARY, userId)); + transaction.put(preparePut(NAMESPACE_SECONDARY, userId, payloads.get(i))); + } + transaction.commit(); + break; + } catch (CrudConflictException | 
CommitConflictException e) { + transaction.abort(); + transactionRetryCount.increment(); + } catch (Exception e) { + transaction.abort(); + throw e; + } + } + } + + @Override + public void close() throws Exception { + manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadFh.java b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadFh.java new file mode 100644 index 0000000..ada9753 --- /dev/null +++ b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadFh.java @@ -0,0 +1,103 @@ +package kelpie.scalardb.ycsb; + +import static kelpie.scalardb.ycsb.YcsbCommon.CONFIG_NAME; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_PRIMARY; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_SECONDARY; +import static kelpie.scalardb.ycsb.YcsbCommon.OPS_PER_TX; +import static kelpie.scalardb.ycsb.YcsbCommon.getPayloadSize; +import static kelpie.scalardb.ycsb.YcsbCommon.getRecordCount; +import static kelpie.scalardb.ycsb.YcsbCommon.getHotspotRecordCount; +import static kelpie.scalardb.ycsb.YcsbCommon.prepareGet; +import static kelpie.scalardb.ycsb.YcsbCommon.preparePut; + +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.scalardb.Common; + +/** + * Multi-storage workload Fh: + * Same number of read-modify-write operations for both 
primary and secondary database. + * For primary, there is a hotspot; only limited range of records are accessed. +*/ +public class MultiStorageWorkloadFh extends TimeBasedProcessor { + // one read-modify-write operation (one read and one write for the same record is regarded as one + // operation) + private static final long DEFAULT_OPS_PER_TX = 1; + private final DistributedTransactionManager manager; + private final int recordCount; + private final int hotspotRecordCount; + private final int opsPerTx; + private final int payloadSize; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public MultiStorageWorkloadFh(Config config) { + super(config); + this.manager = Common.getTransactionManager(config); + this.recordCount = getRecordCount(config); + this.hotspotRecordCount = getHotspotRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + this.payloadSize = getPayloadSize(config); + } + + @Override + public void executeEach() throws TransactionException { + List primaryIds = new ArrayList<>(opsPerTx); + List secondaryIds = new ArrayList<>(opsPerTx); + List payloads = new ArrayList<>(opsPerTx); + char[] payload = new char[payloadSize]; + for (int i = 0; i < opsPerTx; ++i) { + primaryIds.add(ThreadLocalRandom.current().nextInt(hotspotRecordCount)); + + YcsbCommon.randomFastChars(ThreadLocalRandom.current(), payload); + payloads.add(new String(payload)); + } + for (int i = 0; i < opsPerTx; ++i) { + secondaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + // use same payload for secondary + } + + while (true) { + DistributedTransaction transaction = manager.start(); + try { + for (int i = 0; i < primaryIds.size(); i++) { + int userId = primaryIds.get(i); + transaction.get(prepareGet(NAMESPACE_PRIMARY, userId)); + transaction.put(preparePut(NAMESPACE_PRIMARY, userId, payloads.get(i))); + } + for (int i = 0; i < secondaryIds.size(); i++) { + int userId = secondaryIds.get(i); + 
transaction.get(prepareGet(NAMESPACE_SECONDARY, userId)); + transaction.put(preparePut(NAMESPACE_SECONDARY, userId, payloads.get(i))); + } + transaction.commit(); + break; + } catch (CrudConflictException | CommitConflictException e) { + transaction.abort(); + transactionRetryCount.increment(); + } catch (Exception e) { + transaction.abort(); + throw e; + } + } + } + + @Override + public void close() throws Exception { + manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadFr.java b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadFr.java new file mode 100644 index 0000000..b829ae3 --- /dev/null +++ b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/MultiStorageWorkloadFr.java @@ -0,0 +1,98 @@ +package kelpie.scalardb.ycsb; + +import static kelpie.scalardb.ycsb.YcsbCommon.CONFIG_NAME; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_PRIMARY; +import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_SECONDARY; +import static kelpie.scalardb.ycsb.YcsbCommon.OPS_PER_TX; +import static kelpie.scalardb.ycsb.YcsbCommon.getPayloadSize; +import static kelpie.scalardb.ycsb.YcsbCommon.getRecordCount; +import static kelpie.scalardb.ycsb.YcsbCommon.getDispatchRate; +import static kelpie.scalardb.ycsb.YcsbCommon.prepareGet; +import static kelpie.scalardb.ycsb.YcsbCommon.preparePut; + +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; 
+import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.scalardb.Common; + +/** + * Multi-storage workload Fr: + * Read-modify-write operation for either database probabilistically based on the specified rate. +*/ +public class MultiStorageWorkloadFr extends TimeBasedProcessor { + // one read-modify-write operation (one read and one write for the same record is regarded as one + // operation) + private static final long DEFAULT_OPS_PER_TX = 1; + private final DistributedTransactionManager manager; + private final int recordCount; + private final int opsPerTx; + private final int payloadSize; + private final int dispatchRate; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public MultiStorageWorkloadFr(Config config) { + super(config); + this.manager = Common.getTransactionManager(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + this.payloadSize = getPayloadSize(config); + this.dispatchRate = getDispatchRate(config); + } + + @Override + public void executeEach() throws TransactionException { + List userIds = new ArrayList<>(opsPerTx); + List payloads = new ArrayList<>(opsPerTx); + char[] payload = new char[payloadSize]; + for (int i = 0; i < opsPerTx; ++i) { + userIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + + YcsbCommon.randomFastChars(ThreadLocalRandom.current(), payload); + payloads.add(new String(payload)); + } + + while (true) { + DistributedTransaction transaction = manager.start(); + try { + for (int i = 0; i < userIds.size(); i++) { + int userId = userIds.get(i); + int rate = ThreadLocalRandom.current().nextInt(100) + 1; + if (rate > dispatchRate) { + transaction.get(prepareGet(NAMESPACE_PRIMARY, userId)); + transaction.put(preparePut(NAMESPACE_PRIMARY, userId, payloads.get(i))); + } else { + transaction.get(prepareGet(NAMESPACE_SECONDARY, userId)); + 
transaction.put(preparePut(NAMESPACE_SECONDARY, userId, payloads.get(i))); + } + } + transaction.commit(); + break; + } catch (CrudConflictException | CommitConflictException e) { + transaction.abort(); + transactionRetryCount.increment(); + } catch (Exception e) { + transaction.abort(); + throw e; + } + } + } + + @Override + public void close() throws Exception { + manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/ycsb/YcsbCommon.java b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/YcsbCommon.java index 9819b0d..3326245 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/ycsb/YcsbCommon.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/ycsb/YcsbCommon.java @@ -9,13 +9,19 @@ public class YcsbCommon { static final long DEFAULT_RECORD_COUNT = 1000; + static final long DEFAULT_HOTSPOT_RECORD_COUNT = 100; static final long DEFAULT_PAYLOAD_SIZE = 1000; + static final long DEFAULT_DISPATCH_RATE = 50; // rate for sending operations to secondary static final String NAMESPACE = "ycsb"; + static final String NAMESPACE_PRIMARY = "ycsb_primary"; // for multi-storage mode + static final String NAMESPACE_SECONDARY = "ycsb_secondary"; // for multi-storage mode static final String TABLE = "usertable"; static final String YCSB_KEY = "ycsb_key"; static final String PAYLOAD = "payload"; static final String CONFIG_NAME = "test_config"; static final String RECORD_COUNT = "record_count"; + static final String HOTSPOT_RECORD_COUNT = "hotspot_record_count"; + static final String DISPATCH_RATE = "dispatch_rate"; static final String PAYLOAD_SIZE = "payload_size"; static final String OPS_PER_TX = "ops_per_tx"; private static final int CHAR_START = 32; // [space] @@ -49,6 +55,14 @@ public static Get prepareGet(int key) { .forTable(TABLE); } + public static Get prepareGet(String namespace, int key) { + Key partitionKey = new 
Key(YCSB_KEY, key); + return new Get(partitionKey) + .withConsistency(Consistency.LINEARIZABLE) + .forNamespace(namespace) + .forTable(TABLE); + } + public static Put preparePut(int key, String payload) { Key partitionKey = new Key(YCSB_KEY, key); return new Put(partitionKey) @@ -58,6 +72,15 @@ public static Put preparePut(int key, String payload) { .forTable(TABLE); } + public static Put preparePut(String namespace, int key, String payload) { + Key partitionKey = new Key(YCSB_KEY, key); + return new Put(partitionKey) + .withConsistency(Consistency.LINEARIZABLE) + .withValue(PAYLOAD, payload) + .forNamespace(namespace) + .forTable(TABLE); + } + public static int getRecordCount(Config config) { return (int) config.getUserLong(CONFIG_NAME, RECORD_COUNT, DEFAULT_RECORD_COUNT); } @@ -66,6 +89,16 @@ public static int getPayloadSize(Config config) { return (int) config.getUserLong(CONFIG_NAME, PAYLOAD_SIZE, DEFAULT_PAYLOAD_SIZE); } + public static int getHotspotRecordCount(Config config) { + return (int) config.getUserLong( + CONFIG_NAME, HOTSPOT_RECORD_COUNT, DEFAULT_HOTSPOT_RECORD_COUNT); + } + + public static int getDispatchRate(Config config) { + return (int) config.getUserLong( + CONFIG_NAME, DISPATCH_RATE, DEFAULT_DISPATCH_RATE); + } + // This method is taken from benchbase. 
// https://github.com/cmu-db/benchbase/blob/bbe8c1db84ec81c6cdec6fbeca27b24b1b4e6612/src/main/java/com/oltpbenchmark/util/TextGenerator.java#L80 public static char[] randomFastChars(Random rng, char[] chars) { diff --git a/seata-test/build.gradle b/seata-test/build.gradle new file mode 100644 index 0000000..2f14c48 --- /dev/null +++ b/seata-test/build.gradle @@ -0,0 +1,23 @@ +plugins { + id 'java-library-distribution' + id "com.github.johnrengelman.shadow" version "5.2.0" +} + +repositories { + mavenCentral() +} + +dependencies { + implementation group: 'com.alibaba', name: 'druid', version: "1.1.12" + implementation group: 'com.scalar-labs', name: 'kelpie', version: '1.2.1' + implementation group: 'io.github.resilience4j', name: 'resilience4j-retry', version: '1.3.1' + implementation group: 'io.seata', name: 'seata-all', version: "1.4.2" + implementation group: 'mysql', name: 'mysql-connector-java', version: "5.1.44" +} + +shadowJar { + mergeServiceFiles() +} + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 diff --git a/seata-test/file.conf b/seata-test/file.conf new file mode 100644 index 0000000..4d99936 --- /dev/null +++ b/seata-test/file.conf @@ -0,0 +1,10 @@ +service { + #vgroup->rgroup + vgroupMapping.tx_group = "default" + #only support single node + default.grouplist = "127.0.0.1:8091" + #degrade current not support + enableDegrade = false + #disable + disable = false +} diff --git a/seata-test/gradle/wrapper/gradle-wrapper.jar b/seata-test/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..e708b1c Binary files /dev/null and b/seata-test/gradle/wrapper/gradle-wrapper.jar differ diff --git a/seata-test/gradle/wrapper/gradle-wrapper.properties b/seata-test/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..be52383 --- /dev/null +++ b/seata-test/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists 
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/seata-test/gradlew b/seata-test/gradlew new file mode 100755 index 0000000..4f906e0 --- /dev/null +++ b/seata-test/gradlew @@ -0,0 +1,185 @@ +#!/usr/bin/env sh + +# +# Copyright 2015 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). 
+cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? 
-ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin or MSYS, switch paths to Windows format before running java +if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=`expr $i + 1` + done + case $i in + 0) set -- ;; + 1) set -- "$args0" ;; + 2) set -- "$args0" "$args1" ;; + 3) set -- "$args0" "$args1" "$args2" ;; + 4) set -- "$args0" "$args1" "$args2" "$args3" ;; + 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + 9) set -- "$args0" "$args1" "$args2" "$args3" 
"$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=`save "$@"` + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +exec "$JAVACMD" "$@" diff --git a/seata-test/gradlew.bat b/seata-test/gradlew.bat new file mode 100644 index 0000000..ac1b06f --- /dev/null +++ b/seata-test/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. 
You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! 
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/seata-test/multi-storage-ycsb.toml b/seata-test/multi-storage-ycsb.toml new file mode 100644 index 0000000..1eb97c9 --- /dev/null +++ b/seata-test/multi-storage-ycsb.toml @@ -0,0 +1,38 @@ +[modules] + [modules.preprocessor] + name = "kelpie.seata.mycsb.Loader" + path = "build/libs/seata-test-all.jar" + [modules.processor] + name = "kelpie.seata.mycsb.WorkloadFe" + path = "build/libs/seata-test-all.jar" + [modules.postprocessor] + name = "kelpie.seata.mycsb.MycsbReporter" + path = "build/libs/seata-test-all.jar" + +[common] + concurrency = 4 + run_for_sec = 5 + ramp_for_sec = 5 + +[stats] + realtime_report_enabled = true + +[test_config] + record_count = 10000 + population_concurrency = 4 + +[primary_db_config] + url = "jdbc:mysql://localhost:3306/seata?useSSL=false&serverTimezone=UTC" + username = "root" + password = "root" + driver = "com.mysql.jdbc.Driver" + min_idle = 200 + max_active = 500 + +[secondary_db_config] + url = "jdbc:mysql://localhost:13306/seata?useSSL=false&serverTimezone=UTC" + username = "root" + password = "root" + driver = "com.mysql.jdbc.Driver" + min_idle = 200 + max_active = 500 diff --git a/seata-test/registry.conf b/seata-test/registry.conf new file mode 100644 index 0000000..1680744 --- /dev/null +++ b/seata-test/registry.conf @@ -0,0 +1,82 @@ +registry { + # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa + type = "file" + + nacos { + serverAddr = "localhost" + namespace = "" + cluster = "default" + } + eureka { + serviceUrl = "http://localhost:8761/eureka" + application = "default" + weight = "1" + } + redis { + serverAddr = "localhost:6379" + db = "0" + password = "" + cluster = "default" + timeout = "0" + } + zk { + cluster = "default" + serverAddr = "127.0.0.1:2181" + session.timeout = 6000 + connect.timeout = 2000 + username = "" + password = "" + } + consul { + cluster = "default" + serverAddr = 
"127.0.0.1:8500" + } + etcd3 { + cluster = "default" + serverAddr = "http://localhost:2379" + } + sofa { + serverAddr = "127.0.0.1:9603" + application = "default" + region = "DEFAULT_ZONE" + datacenter = "DefaultDataCenter" + cluster = "default" + group = "SEATA_GROUP" + addressWaitTime = "3000" + } + file { + name = "file.conf" + } +} + +config { + # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig + type = "file" + + nacos { + serverAddr = "localhost" + namespace = "" + group = "SEATA_GROUP" + } + consul { + serverAddr = "127.0.0.1:8500" + } + apollo { + app.id = "seata-server" + apollo.meta = "http://192.168.1.204:8801" + namespace = "application" + } + zk { + serverAddr = "127.0.0.1:2181" + session.timeout = 6000 + connect.timeout = 2000 + username = "" + password = "" + } + etcd3 { + serverAddr = "http://localhost:2379" + } + file { + name = "file:///path/to/file.conf" + } +} diff --git a/seata-test/src/main/java/kelpie/seata/Common.java b/seata-test/src/main/java/kelpie/seata/Common.java new file mode 100644 index 0000000..1fe3077 --- /dev/null +++ b/seata-test/src/main/java/kelpie/seata/Common.java @@ -0,0 +1,37 @@ +package kelpie.seata; + +import io.github.resilience4j.core.IntervalFunction; +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; +import java.time.Duration; + +public class Common { + public static final String APPLICATION_ID = "mycsb"; + public static final String TRANSACTION_SERVICE_GROUP = "tx_group"; + private static final int WAIT_MILLS = 1000; + private static final long SLEEP_BASE_MILLIS = 100L; + private static final int MAX_RETRIES = 10; + + public static Retry getRetryWithFixedWaitDuration(String name) { + return getRetryWithFixedWaitDuration(name, MAX_RETRIES, WAIT_MILLS); + } + + public static Retry getRetryWithFixedWaitDuration(String name, int maxRetries, int waitMillis) { + RetryConfig retryConfig = + RetryConfig.custom() + .maxAttempts(maxRetries) + 
.waitDuration(Duration.ofMillis(waitMillis)) + .build(); + + return Retry.of(name, retryConfig); + } + + public static Retry getRetryWithExponentialBackoff(String name) { + IntervalFunction intervalFunc = IntervalFunction.ofExponentialBackoff(SLEEP_BASE_MILLIS, 2.0); + + RetryConfig retryConfig = + RetryConfig.custom().maxAttempts(MAX_RETRIES).intervalFunction(intervalFunc).build(); + + return Retry.of(name, retryConfig); + } +} diff --git a/seata-test/src/main/java/kelpie/seata/DataSourceManager.java b/seata-test/src/main/java/kelpie/seata/DataSourceManager.java new file mode 100644 index 0000000..2487b36 --- /dev/null +++ b/seata-test/src/main/java/kelpie/seata/DataSourceManager.java @@ -0,0 +1,46 @@ +package kelpie.seata; + +import com.alibaba.druid.pool.DruidDataSource; +import java.sql.Connection; +import java.sql.SQLException; +import javax.sql.DataSource; +import com.scalar.kelpie.config.Config; +import io.seata.rm.datasource.xa.DataSourceProxyXA; + +public class DataSourceManager { + private static final String DEFAULT_STORAGE_CONFIG_NAME = "db_config"; + private final Config config; + private final DataSource dataSource; + + public DataSourceManager(Config config) { + this.config = config; + this.dataSource = getDataSourceXA(DEFAULT_STORAGE_CONFIG_NAME); + } + + public DataSourceManager(Config config, String table) { + this.config = config; + this.dataSource = getDataSourceXA(table); + } + + public Connection getConnectionXA() throws SQLException { + return dataSource.getConnection(); + } + + private DataSource getDataSourceXA(String table) { + String url = config.getUserString(table, "url", "jdbc:mysql://localhost/seata"); + String username = config.getUserString(table, "username", "root"); + String password = config.getUserString(table, "password", "example"); + String driver = config.getUserString(table, "driver", "com.mysql.jdbc.Driver"); + int minIdle = (int)config.getUserLong(table, "min_idle", (long)0); + int maxActive = 
(int)config.getUserLong(table, "max_active", (long)8); + + DataSource dataSource = new DruidDataSource(); + ((DruidDataSource)dataSource).setUrl(url); + ((DruidDataSource)dataSource).setUsername(username); + ((DruidDataSource)dataSource).setPassword(password); + ((DruidDataSource)dataSource).setDriverClassName(driver); + ((DruidDataSource)dataSource).setMinIdle(minIdle); + ((DruidDataSource)dataSource).setMaxActive(maxActive); + return new DataSourceProxyXA(dataSource); + } +} diff --git a/seata-test/src/main/java/kelpie/seata/mycsb/Loader.java b/seata-test/src/main/java/kelpie/seata/mycsb/Loader.java new file mode 100644 index 0000000..2f18e62 --- /dev/null +++ b/seata-test/src/main/java/kelpie/seata/mycsb/Loader.java @@ -0,0 +1,198 @@ +package kelpie.seata.mycsb; + +import static kelpie.seata.mycsb.MycsbCommon.CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.PRIMARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.SECONDARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.TABLE; +import static kelpie.seata.mycsb.MycsbCommon.YCSB_KEY; +import static kelpie.seata.mycsb.MycsbCommon.PAYLOAD; +import static kelpie.seata.mycsb.MycsbCommon.getPayloadSize; +import static kelpie.seata.mycsb.MycsbCommon.getRecordCount; +import static kelpie.seata.mycsb.MycsbCommon.randomFastChars; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.PreProcessor; +import io.github.resilience4j.retry.Retry; +import io.seata.rm.RMClient; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.IntStream; +import kelpie.seata.Common; +import kelpie.seata.DataSourceManager; + +public class Loader extends 
PreProcessor { + private static final long DEFAULT_POPULATION_CONCURRENCY = 10L; + private static final long DEFAULT_BATCH_SIZE = 1; + private static final long DEFAULT_INSERT_BATCH_SIZE = 100; + private static final String POPULATION_CONCURRENCY = "population_concurrency"; + private static final String BATCH_SIZE = "batch_size"; + private static final String INSERT_BATCH_SIZE = "population_insert_batch_size"; + private static final int MAX_RETRIES = 10; + private static final int WAIT_DURATION_MILLIS = 1000; + private static final String INSERT_SQL = "insert into " + TABLE + "(" + YCSB_KEY + ", " + PAYLOAD +") values (?, ?)"; + private final DataSourceManager primary; + private final DataSourceManager secondary; + private final int concurrency; + private final int recordCount; + private final int payloadSize; + private final char[] payload; + private final int batchSize; + private final int insertBatchSize; + + public Loader(Config config) throws SQLException { + super(config); + concurrency = + (int) + config.getUserLong(CONFIG_NAME, POPULATION_CONCURRENCY, DEFAULT_POPULATION_CONCURRENCY); + batchSize = (int) config.getUserLong(CONFIG_NAME, BATCH_SIZE, DEFAULT_BATCH_SIZE); + insertBatchSize = (int) config.getUserLong(CONFIG_NAME, INSERT_BATCH_SIZE, DEFAULT_INSERT_BATCH_SIZE); + recordCount = getRecordCount(config); + payloadSize = getPayloadSize(config); + payload = new char[payloadSize]; + + RMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + primary = new DataSourceManager(config, PRIMARY_DB_CONFIG_NAME); + secondary = new DataSourceManager(config, SECONDARY_DB_CONFIG_NAME); + createTable(primary, payloadSize); + createTable(secondary, payloadSize); + } + + @Override + public void execute() { + ExecutorService es = Executors.newCachedThreadPool(); + List> futures = new ArrayList<>(); + IntStream.range(0, concurrency) + .forEach( + i -> { + CompletableFuture future = + CompletableFuture.runAsync(() -> new PopulationRunner(i).run(), es); + 
futures.add(future); + }); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + logInfo("all records have been inserted"); + } + + @Override + public void close() throws Exception { + } + + private class PopulationRunner { + private final int id; + + public PopulationRunner(int threadId) { + this.id = threadId; + } + + public void run() { + int numPerThread = (recordCount + concurrency - 1) / concurrency; + int start = numPerThread * id; + int end = Math.min(numPerThread * (id + 1), recordCount); + IntStream.range(0, (numPerThread + batchSize - 1) / batchSize) + .forEach( + i -> { + int startId = start + batchSize * i; + int endId = Math.min(start + batchSize * (i + 1), end); + populateWithTx(startId, endId); + }); + } + + private void populateWithTx(int startId, int endId) { + Runnable populate = + () -> { + Connection primaryConnection = null; + Connection secondaryConnection = null; + PreparedStatement primaryStatement = null; + PreparedStatement secondaryStatement = null; + try { + primaryConnection = primary.getConnectionXA(); + primaryStatement = primaryConnection.prepareStatement(INSERT_SQL); + secondaryConnection = secondary.getConnectionXA(); + secondaryStatement = secondaryConnection.prepareStatement(INSERT_SQL); + for (int i = startId; i < endId; ++i) { + randomFastChars(ThreadLocalRandom.current(), payload); + prepareInsert(primaryStatement, i, new String(payload)); + prepareInsert(secondaryStatement, i, new String(payload)); + if (i % insertBatchSize == 0 || i == endId - 1) { + primaryStatement.executeBatch(); + secondaryStatement.executeBatch(); + } + } + } catch (SQLException e) { + logWarn("population failed.", e); + throw new RuntimeException("population failed.", e); + } finally { + try { + if (primaryConnection != null) { + primaryConnection.close(); + } + if (secondaryConnection != null) { + secondaryConnection.close(); + } + if (primaryStatement != null) { + primaryStatement.close(); + } + if (secondaryStatement != 
null) { + secondaryStatement.close(); + } + } catch (SQLException e) { + logWarn("population failed.", e); + throw new RuntimeException("population failed.", e); + } + } + }; + + Retry retry = + Common.getRetryWithFixedWaitDuration("populate", MAX_RETRIES, WAIT_DURATION_MILLIS); + Runnable decorated = Retry.decorateRunnable(retry, populate); + try { + decorated.run(); + } catch (Exception e) { + logError("population failed repeatedly!"); + throw e; + } + } + + private void prepareInsert(PreparedStatement statement, int id, String payload) throws SQLException { + try { + statement.setInt(1, id); + statement.setString(2, payload); + statement.addBatch(); + } catch (SQLException e) { + statement.close(); + throw e; + } + } + } + + private void createTable(DataSourceManager ds, int payloadSize) throws SQLException { + String dropSQL = "drop table if exists " + TABLE; + String createSQL = "create table " + TABLE + + " (ycsb_key int not null, payload varchar(" + payloadSize + ")," + "primary key (ycsb_key))"; + Connection connection = null; + Statement statement = null; + try { + connection = ds.getConnectionXA(); + statement = connection.createStatement(); + statement.executeUpdate(dropSQL); + statement.executeUpdate(createSQL); + } catch (SQLException e) { + throw e; + } finally { + if (statement != null) { + statement.close(); + } + if (connection != null) { + connection.close(); + } + } + } +} diff --git a/seata-test/src/main/java/kelpie/seata/mycsb/MycsbCommon.java b/seata-test/src/main/java/kelpie/seata/mycsb/MycsbCommon.java new file mode 100644 index 0000000..6310774 --- /dev/null +++ b/seata-test/src/main/java/kelpie/seata/mycsb/MycsbCommon.java @@ -0,0 +1,129 @@ +package kelpie.seata.mycsb; + +import com.scalar.kelpie.config.Config; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Random; + +public class MycsbCommon { + static final long DEFAULT_RECORD_COUNT = 1000; + 
static final long DEFAULT_HOTSPOT_RECORD_COUNT = 100; + static final long DEFAULT_PAYLOAD_SIZE = 1000; + static final long DEFAULT_DISPATCH_RATE = 50; // rate for sending operations to secondary + static final String NAMESPACE = "ycsb"; + static final String TABLE = "usertable"; + static final String YCSB_KEY = "ycsb_key"; + static final String PAYLOAD = "payload"; + static final String CONFIG_NAME = "test_config"; + static final String PRIMARY_DB_CONFIG_NAME = "primary_db_config"; + static final String SECONDARY_DB_CONFIG_NAME = "secondary_db_config"; + static final String RECORD_COUNT = "record_count"; + static final String HOTSPOT_RECORD_COUNT = "hotspot_record_count"; + static final String DISPATCH_RATE = "dispatch_rate"; + static final String PAYLOAD_SIZE = "payload_size"; + static final String OPS_PER_TX = "ops_per_tx"; + private static final int CHAR_START = 32; // [space] + private static final int CHAR_STOP = 126; // [~] + private static final char[] CHAR_SYMBOLS = new char[1 + CHAR_STOP - CHAR_START]; + + static { + for (int i = 0; i < CHAR_SYMBOLS.length; i++) { + CHAR_SYMBOLS[i] = (char) (CHAR_START + i); + } + } + + private static final int[] FAST_MASKS = { + 554189328, // 10000 + 277094664, // 01000 + 138547332, // 00100 + 69273666, // 00010 + 34636833, // 00001 + 346368330, // 01010 + 727373493, // 10101 + 588826161, // 10001 + 935194491, // 11011 + 658099827, // 10011 + }; + + public static int getRecordCount(Config config) { + return (int) config.getUserLong(CONFIG_NAME, RECORD_COUNT, DEFAULT_RECORD_COUNT); + } + + public static int getPayloadSize(Config config) { + return (int) config.getUserLong(CONFIG_NAME, PAYLOAD_SIZE, DEFAULT_PAYLOAD_SIZE); + } + + public static int getHotspotRecordCount(Config config) { + return (int) config.getUserLong( + CONFIG_NAME, HOTSPOT_RECORD_COUNT, DEFAULT_HOTSPOT_RECORD_COUNT); + } + + public static int getDispatchRate(Config config) { + return (int) config.getUserLong( + CONFIG_NAME, DISPATCH_RATE, 
DEFAULT_DISPATCH_RATE); + } + + // This method is taken from benchbase. + // https://github.com/cmu-db/benchbase/blob/bbe8c1db84ec81c6cdec6fbeca27b24b1b4e6612/src/main/java/com/oltpbenchmark/util/TextGenerator.java#L80 + public static char[] randomFastChars(Random rng, char[] chars) { + // Ok so now the goal of this is to reduce the number of times that we have to + // invoke a random number. We'll do this by grabbing a single random int + // and then taking different bitmasks + + int num_rounds = chars.length / FAST_MASKS.length; + int i = 0; + for (int ctr = 0; ctr < num_rounds; ctr++) { + int rand = rng.nextInt(10000); // CHAR_SYMBOLS.length); + for (int mask : FAST_MASKS) { + chars[i++] = CHAR_SYMBOLS[(rand | mask) % CHAR_SYMBOLS.length]; + } + } + // Use the old way for the remaining characters + // I am doing this because I am too lazy to think of something more clever + for (; i < chars.length; i++) { + chars[i] = CHAR_SYMBOLS[rng.nextInt(CHAR_SYMBOLS.length)]; + } + return (chars); + } + + public static String read(Connection connection, int userId) throws SQLException { + PreparedStatement statement = null; + String result = null; + String sql = "select * from " + TABLE + " where " + YCSB_KEY + " = ?"; + try { + statement = connection.prepareStatement(sql); + statement.setInt(1, userId); + ResultSet resultSet = statement.executeQuery(); + resultSet.next(); + result = resultSet.getString(PAYLOAD); + } catch (SQLException e) { + throw e; + } finally { + if (statement != null) { + statement.close(); + } + } + return result; + } + + public static String write(Connection connection, int userId, String payload) throws SQLException { + PreparedStatement statement = null; + String result = null; + String sql = "update " + TABLE + " set " + PAYLOAD + " = ? 
where " + YCSB_KEY + " = ?"; + try { + statement = connection.prepareStatement(sql); + statement.setString(1, payload); + statement.setInt(2, userId); + statement.executeUpdate(); + } catch (SQLException e) { + throw e; + } finally { + if (statement != null) { + statement.close(); + } + } + return result; + } +} diff --git a/seata-test/src/main/java/kelpie/seata/mycsb/MycsbReporter.java b/seata-test/src/main/java/kelpie/seata/mycsb/MycsbReporter.java new file mode 100644 index 0000000..aaac12b --- /dev/null +++ b/seata-test/src/main/java/kelpie/seata/mycsb/MycsbReporter.java @@ -0,0 +1,54 @@ +package kelpie.seata.mycsb; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.PostProcessor; +import com.scalar.kelpie.stats.Stats; + +public class MycsbReporter extends PostProcessor { + + public MycsbReporter(Config config) { + super(config); + } + + @Override + public void execute() { + Stats stats = getStats(); + if (stats == null) { + return; + } + logInfo( + "==== Statistics Summary ====\n" + + "Throughput: " + + stats.getThroughput(config.getRunForSec()) + + " ops\n" + + "Succeeded operations: " + + stats.getSuccessCount() + + "\n" + + "Failed operations: " + + stats.getFailureCount() + + "\n" + + "Mean latency: " + + stats.getMeanLatency() + + " ms\n" + + "SD of latency: " + + stats.getStandardDeviation() + + " ms\n" + + "Max latency: " + + stats.getMaxLatency() + + " ms\n" + + "Latency at 50 percentile: " + + stats.getLatencyAtPercentile(50.0) + + " ms\n" + + "Latency at 90 percentile: " + + stats.getLatencyAtPercentile(90.0) + + " ms\n" + + "Latency at 99 percentile: " + + stats.getLatencyAtPercentile(99.0) + + " ms\n" + + "Transaction retry count: " + + getPreviousState().getString("transaction-retry-count")); + } + + @Override + public void close() {} +} diff --git a/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCF.java b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCF.java new file mode 100644 index 0000000..d9af62d --- 
/dev/null +++ b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCF.java @@ -0,0 +1,112 @@ +package kelpie.seata.mycsb; + +import static kelpie.seata.mycsb.MycsbCommon.CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.PRIMARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.SECONDARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.OPS_PER_TX; +import static kelpie.seata.mycsb.MycsbCommon.getPayloadSize; +import static kelpie.seata.mycsb.MycsbCommon.getRecordCount; +import static kelpie.seata.mycsb.MycsbCommon.read; +import static kelpie.seata.mycsb.MycsbCommon.write; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import io.seata.core.exception.TransactionException; +import io.seata.rm.RMClient; +import io.seata.tm.TMClient; +import io.seata.tm.api.GlobalTransaction; +import io.seata.tm.api.GlobalTransactionContext; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.seata.Common; +import kelpie.seata.DataSourceManager; + +public class WorkloadCF extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; + private final DataSourceManager primary; + private final DataSourceManager secondary; + private final int recordCount; + private final int opsPerTx; + private final int payloadSize; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public WorkloadCF(Config config) { + super(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + this.payloadSize = getPayloadSize(config); + + TMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + RMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + primary = new 
DataSourceManager(config, PRIMARY_DB_CONFIG_NAME); + secondary = new DataSourceManager(config, SECONDARY_DB_CONFIG_NAME); + } + + @Override + public void executeEach() throws SQLException, TransactionException { + List<Integer> primaryIds = new ArrayList<>(opsPerTx); + List<Integer> secondaryIds = new ArrayList<>(opsPerTx); + List<String> payloads = new ArrayList<>(opsPerTx); + char[] payload = new char[payloadSize]; + for (int i = 0; i < opsPerTx; ++i) { + primaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + } + for (int i = 0; i < opsPerTx; ++i) { + secondaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + + MycsbCommon.randomFastChars(ThreadLocalRandom.current(), payload); + payloads.add(new String(payload)); + } + + Connection primaryConnection = null; + Connection secondaryConnection = null; + while (true) { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + primaryConnection = primary.getConnectionXA(); + secondaryConnection = secondary.getConnectionXA(); + try { + tx.begin(); + for (int i = 0; i < primaryIds.size(); i++) { + int userId = primaryIds.get(i); + read(primaryConnection, userId); + } + for (int i = 0; i < secondaryIds.size(); i++) { + int userId = secondaryIds.get(i); + read(secondaryConnection, userId); + write(secondaryConnection, userId, payloads.get(i)); + } + tx.commit(); + break; + } catch (SQLException | TransactionException e) { + e.printStackTrace(); + tx.rollback(); + transactionRetryCount.increment(); + } catch (Exception e) { + tx.rollback(); + throw e; + } finally { + if (primaryConnection != null) { + primaryConnection.close(); + } + if (secondaryConnection != null) { + secondaryConnection.close(); + } + } + } + } + + @Override + public void close() throws Exception { + // manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCe.java 
b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCe.java new file mode 100644 index 0000000..b3e6643 --- /dev/null +++ b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCe.java @@ -0,0 +1,98 @@ +package kelpie.seata.mycsb; + +import static kelpie.seata.mycsb.MycsbCommon.CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.PRIMARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.SECONDARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.OPS_PER_TX; +import static kelpie.seata.mycsb.MycsbCommon.getRecordCount; +import static kelpie.seata.mycsb.MycsbCommon.read; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import io.seata.core.exception.TransactionException; +import io.seata.rm.RMClient; +import io.seata.tm.TMClient; +import io.seata.tm.api.GlobalTransaction; +import io.seata.tm.api.GlobalTransactionContext; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.seata.Common; +import kelpie.seata.DataSourceManager; + +public class WorkloadCe extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; + private final DataSourceManager primary; + private final DataSourceManager secondary; + private final int recordCount; + private final int opsPerTx; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public WorkloadCe(Config config) { + super(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + + TMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + RMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + primary = new DataSourceManager(config, PRIMARY_DB_CONFIG_NAME); + secondary = new DataSourceManager(config, 
SECONDARY_DB_CONFIG_NAME); + } + + @Override + public void executeEach() throws SQLException, TransactionException { + List<Integer> userIds = new ArrayList<>(opsPerTx); + for (int i = 0; i < opsPerTx; ++i) { + userIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + } + + Connection primaryConnection = null; + Connection secondaryConnection = null; + while (true) { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + primaryConnection = primary.getConnectionXA(); + secondaryConnection = secondary.getConnectionXA(); + try { + tx.begin(); + for (int i = 0; i < userIds.size(); i++) { + int userId = userIds.get(i); + read(primaryConnection, userId); + } + for (int i = 0; i < userIds.size(); i++) { + int userId = userIds.get(i); + read(secondaryConnection, userId); + } + tx.commit(); + break; + } catch (SQLException | TransactionException e) { + e.printStackTrace(); + tx.rollback(); + transactionRetryCount.increment(); + } catch (Exception e) { + tx.rollback(); + throw e; + } finally { + if (primaryConnection != null) { + primaryConnection.close(); + } + if (secondaryConnection != null) { + secondaryConnection.close(); + } + } + } + } + + @Override + public void close() throws Exception { + // manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCh.java b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCh.java new file mode 100644 index 0000000..c44e36d --- /dev/null +++ b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCh.java @@ -0,0 +1,105 @@ +package kelpie.seata.mycsb; + +import static kelpie.seata.mycsb.MycsbCommon.CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.PRIMARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.SECONDARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.OPS_PER_TX; +import static 
kelpie.seata.mycsb.MycsbCommon.getRecordCount; +import static kelpie.seata.mycsb.MycsbCommon.getHotspotRecordCount; +import static kelpie.seata.mycsb.MycsbCommon.read; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import io.seata.core.exception.TransactionException; +import io.seata.rm.RMClient; +import io.seata.tm.TMClient; +import io.seata.tm.api.GlobalTransaction; +import io.seata.tm.api.GlobalTransactionContext; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.seata.Common; +import kelpie.seata.DataSourceManager; + +public class WorkloadCh extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; + private final DataSourceManager primary; + private final DataSourceManager secondary; + private final int recordCount; + private final int hotspotRecordCount; + private final int opsPerTx; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public WorkloadCh(Config config) { + super(config); + this.recordCount = getRecordCount(config); + this.hotspotRecordCount = getHotspotRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + + TMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + RMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + primary = new DataSourceManager(config, PRIMARY_DB_CONFIG_NAME); + secondary = new DataSourceManager(config, SECONDARY_DB_CONFIG_NAME); + } + + @Override + public void executeEach() throws SQLException, TransactionException { + List<Integer> primaryIds = new ArrayList<>(opsPerTx); + List<Integer> secondaryIds = new ArrayList<>(opsPerTx); + for (int i = 0; i < opsPerTx; ++i) { + primaryIds.add(ThreadLocalRandom.current().nextInt(hotspotRecordCount)); + } + for (int i = 
0; i < opsPerTx; ++i) { + secondaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + } + + Connection primaryConnection = null; + Connection secondaryConnection = null; + while (true) { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + primaryConnection = primary.getConnectionXA(); + secondaryConnection = secondary.getConnectionXA(); + try { + tx.begin(); + for (int i = 0; i < primaryIds.size(); i++) { + int userId = primaryIds.get(i); + read(primaryConnection, userId); + } + for (int i = 0; i < secondaryIds.size(); i++) { + int userId = secondaryIds.get(i); + read(secondaryConnection, userId); + } + tx.commit(); + break; + } catch (SQLException | TransactionException e) { + e.printStackTrace(); + tx.rollback(); + transactionRetryCount.increment(); + } catch (Exception e) { + tx.rollback(); + throw e; + } finally { + if (primaryConnection != null) { + primaryConnection.close(); + } + if (secondaryConnection != null) { + secondaryConnection.close(); + } + } + } + } + + @Override + public void close() throws Exception { + // manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCr.java b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCr.java new file mode 100644 index 0000000..0fd36af --- /dev/null +++ b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadCr.java @@ -0,0 +1,102 @@ +package kelpie.seata.mycsb; + +import static kelpie.seata.mycsb.MycsbCommon.CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.PRIMARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.SECONDARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.OPS_PER_TX; +import static kelpie.seata.mycsb.MycsbCommon.getDispatchRate; +import static kelpie.seata.mycsb.MycsbCommon.getRecordCount; +import static kelpie.seata.mycsb.MycsbCommon.read; + +import 
com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import io.seata.core.exception.TransactionException; +import io.seata.rm.RMClient; +import io.seata.tm.TMClient; +import io.seata.tm.api.GlobalTransaction; +import io.seata.tm.api.GlobalTransactionContext; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.seata.Common; +import kelpie.seata.DataSourceManager; + +public class WorkloadCr extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; + private final DataSourceManager primary; + private final DataSourceManager secondary; + private final int recordCount; + private final int opsPerTx; + private final int dispatchRate; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public WorkloadCr(Config config) { + super(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + this.dispatchRate = getDispatchRate(config); + + TMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + RMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + primary = new DataSourceManager(config, PRIMARY_DB_CONFIG_NAME); + secondary = new DataSourceManager(config, SECONDARY_DB_CONFIG_NAME); + } + + @Override + public void executeEach() throws SQLException, TransactionException { + List<Integer> userIds = new ArrayList<>(opsPerTx); + for (int i = 0; i < opsPerTx; ++i) { + userIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + } + + Connection primaryConnection = null; + Connection secondaryConnection = null; + while (true) { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + primaryConnection = primary.getConnectionXA(); + secondaryConnection = secondary.getConnectionXA(); + 
try { + tx.begin(); + for (int i = 0; i < userIds.size(); i++) { + int userId = userIds.get(i); + int rate = ThreadLocalRandom.current().nextInt(100) + 1; + if (rate > dispatchRate) { + read(primaryConnection, userId); + } else { + read(secondaryConnection, userId); + } + } + tx.commit(); + break; + } catch (SQLException | TransactionException e) { + e.printStackTrace(); + tx.rollback(); + transactionRetryCount.increment(); + } catch (Exception e) { + tx.rollback(); + throw e; + } finally { + if (primaryConnection != null) { + primaryConnection.close(); + } + if (secondaryConnection != null) { + secondaryConnection.close(); + } + } + } + } + + @Override + public void close() throws Exception { + // manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadFe.java b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadFe.java new file mode 100644 index 0000000..2090220 --- /dev/null +++ b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadFe.java @@ -0,0 +1,109 @@ +package kelpie.seata.mycsb; + +import static kelpie.seata.mycsb.MycsbCommon.CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.PRIMARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.SECONDARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.OPS_PER_TX; +import static kelpie.seata.mycsb.MycsbCommon.getPayloadSize; +import static kelpie.seata.mycsb.MycsbCommon.getRecordCount; +import static kelpie.seata.mycsb.MycsbCommon.read; +import static kelpie.seata.mycsb.MycsbCommon.write; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import io.seata.core.exception.TransactionException; +import io.seata.rm.RMClient; +import io.seata.tm.TMClient; +import io.seata.tm.api.GlobalTransaction; +import io.seata.tm.api.GlobalTransactionContext; +import java.sql.Connection; +import 
java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.seata.Common; +import kelpie.seata.DataSourceManager; + +public class WorkloadFe extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; + private final DataSourceManager primary; + private final DataSourceManager secondary; + private final int recordCount; + private final int opsPerTx; + private final int payloadSize; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public WorkloadFe(Config config) { + super(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + this.payloadSize = getPayloadSize(config); + + TMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + RMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + primary = new DataSourceManager(config, PRIMARY_DB_CONFIG_NAME); + secondary = new DataSourceManager(config, SECONDARY_DB_CONFIG_NAME); + } + + @Override + public void executeEach() throws SQLException, TransactionException { + List<Integer> userIds = new ArrayList<>(opsPerTx); + List<String> payloads = new ArrayList<>(opsPerTx); + char[] payload = new char[payloadSize]; + for (int i = 0; i < opsPerTx; ++i) { + userIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + + MycsbCommon.randomFastChars(ThreadLocalRandom.current(), payload); + payloads.add(new String(payload)); + } + + Connection primaryConnection = null; + Connection secondaryConnection = null; + while (true) { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + primaryConnection = primary.getConnectionXA(); + secondaryConnection = secondary.getConnectionXA(); + try { + tx.begin(); + for (int i = 0; i < userIds.size(); i++) { + int userId = userIds.get(i); + read(primaryConnection, userId); + 
write(primaryConnection, userId, payloads.get(i)); + } + for (int i = 0; i < userIds.size(); i++) { + int userId = userIds.get(i); + read(secondaryConnection, userId); + write(secondaryConnection, userId, payloads.get(i)); + } + tx.commit(); + break; + } catch (SQLException | TransactionException e) { + e.printStackTrace(); + tx.rollback(); + transactionRetryCount.increment(); + } catch (Exception e) { + tx.rollback(); + throw e; + } finally { + if (primaryConnection != null) { + primaryConnection.close(); + } + if (secondaryConnection != null) { + secondaryConnection.close(); + } + } + } + } + + @Override + public void close() throws Exception { + // manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadFh.java b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadFh.java new file mode 100644 index 0000000..8d21243 --- /dev/null +++ b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadFh.java @@ -0,0 +1,117 @@ +package kelpie.seata.mycsb; + +import static kelpie.seata.mycsb.MycsbCommon.CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.PRIMARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.SECONDARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.OPS_PER_TX; +import static kelpie.seata.mycsb.MycsbCommon.getPayloadSize; +import static kelpie.seata.mycsb.MycsbCommon.getRecordCount; +import static kelpie.seata.mycsb.MycsbCommon.getHotspotRecordCount; +import static kelpie.seata.mycsb.MycsbCommon.read; +import static kelpie.seata.mycsb.MycsbCommon.write; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import io.seata.core.exception.TransactionException; +import io.seata.rm.RMClient; +import io.seata.tm.TMClient; +import io.seata.tm.api.GlobalTransaction; +import io.seata.tm.api.GlobalTransactionContext; +import 
java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.seata.Common; +import kelpie.seata.DataSourceManager; + +public class WorkloadFh extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; + private final DataSourceManager primary; + private final DataSourceManager secondary; + private final int recordCount; + private final int hotspotRecordCount; + private final int opsPerTx; + private final int payloadSize; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public WorkloadFh(Config config) { + super(config); + this.recordCount = getRecordCount(config); + this.hotspotRecordCount = getHotspotRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + this.payloadSize = getPayloadSize(config); + + TMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + RMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + primary = new DataSourceManager(config, PRIMARY_DB_CONFIG_NAME); + secondary = new DataSourceManager(config, SECONDARY_DB_CONFIG_NAME); + } + + @Override + public void executeEach() throws SQLException, TransactionException { + List<Integer> primaryIds = new ArrayList<>(opsPerTx); + List<Integer> secondaryIds = new ArrayList<>(opsPerTx); + List<String> payloads = new ArrayList<>(opsPerTx); + char[] payload = new char[payloadSize]; + for (int i = 0; i < opsPerTx; ++i) { + primaryIds.add(ThreadLocalRandom.current().nextInt(hotspotRecordCount)); + + MycsbCommon.randomFastChars(ThreadLocalRandom.current(), payload); + payloads.add(new String(payload)); + } + for (int i = 0; i < opsPerTx; ++i) { + secondaryIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + // use same payload for secondary + } + + Connection primaryConnection = null; + Connection secondaryConnection 
= null; + while (true) { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + primaryConnection = primary.getConnectionXA(); + secondaryConnection = secondary.getConnectionXA(); + try { + tx.begin(); + for (int i = 0; i < primaryIds.size(); i++) { + int userId = primaryIds.get(i); + read(primaryConnection, userId); + write(primaryConnection, userId, payloads.get(i)); + } + for (int i = 0; i < secondaryIds.size(); i++) { + int userId = secondaryIds.get(i); + read(secondaryConnection, userId); + write(secondaryConnection, userId, payloads.get(i)); + } + tx.commit(); + break; + } catch (SQLException | TransactionException e) { + e.printStackTrace(); + tx.rollback(); + transactionRetryCount.increment(); + } catch (Exception e) { + tx.rollback(); + throw e; + } finally { + if (primaryConnection != null) { + primaryConnection.close(); + } + if (secondaryConnection != null) { + secondaryConnection.close(); + } + } + } + } + + @Override + public void close() throws Exception { + // manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadFr.java b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadFr.java new file mode 100644 index 0000000..dbfcd43 --- /dev/null +++ b/seata-test/src/main/java/kelpie/seata/mycsb/WorkloadFr.java @@ -0,0 +1,113 @@ +package kelpie.seata.mycsb; + +import static kelpie.seata.mycsb.MycsbCommon.CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.PRIMARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.SECONDARY_DB_CONFIG_NAME; +import static kelpie.seata.mycsb.MycsbCommon.OPS_PER_TX; +import static kelpie.seata.mycsb.MycsbCommon.getDispatchRate; +import static kelpie.seata.mycsb.MycsbCommon.getPayloadSize; +import static kelpie.seata.mycsb.MycsbCommon.getRecordCount; +import static kelpie.seata.mycsb.MycsbCommon.read; +import static 
kelpie.seata.mycsb.MycsbCommon.write; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import io.seata.core.exception.TransactionException; +import io.seata.rm.RMClient; +import io.seata.tm.TMClient; +import io.seata.tm.api.GlobalTransaction; +import io.seata.tm.api.GlobalTransactionContext; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.seata.Common; +import kelpie.seata.DataSourceManager; + +public class WorkloadFr extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; + private final DataSourceManager primary; + private final DataSourceManager secondary; + private final int recordCount; + private final int opsPerTx; + private final int payloadSize; + private final int dispatchRate; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public WorkloadFr(Config config) { + super(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + this.payloadSize = getPayloadSize(config); + this.dispatchRate = getDispatchRate(config); + + TMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + RMClient.init(Common.APPLICATION_ID, Common.TRANSACTION_SERVICE_GROUP); + primary = new DataSourceManager(config, PRIMARY_DB_CONFIG_NAME); + secondary = new DataSourceManager(config, SECONDARY_DB_CONFIG_NAME); + } + + @Override + public void executeEach() throws SQLException, TransactionException { + List<Integer> userIds = new ArrayList<>(opsPerTx); + List<String> payloads = new ArrayList<>(opsPerTx); + char[] payload = new char[payloadSize]; + for (int i = 0; i < opsPerTx; ++i) { + userIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + + MycsbCommon.randomFastChars(ThreadLocalRandom.current(), 
payload); + payloads.add(new String(payload)); + } + + Connection primaryConnection = null; + Connection secondaryConnection = null; + while (true) { + GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); + primaryConnection = primary.getConnectionXA(); + secondaryConnection = secondary.getConnectionXA(); + try { + tx.begin(); + for (int i = 0; i < userIds.size(); i++) { + int userId = userIds.get(i); + int rate = ThreadLocalRandom.current().nextInt(100) + 1; + if (rate > dispatchRate) { + read(primaryConnection, userId); + write(primaryConnection, userId, payloads.get(i)); + } else { + read(secondaryConnection, userId); + write(secondaryConnection, userId, payloads.get(i)); + } + } + tx.commit(); + break; + } catch (SQLException | TransactionException e) { + e.printStackTrace(); + tx.rollback(); + transactionRetryCount.increment(); + } catch (Exception e) { + tx.rollback(); + throw e; + } finally { + if (primaryConnection != null) { + primaryConnection.close(); + } + if (secondaryConnection != null) { + secondaryConnection.close(); + } + } + } + } + + @Override + public void close() throws Exception { + // manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +}