Skip to content

Commit 2742a73

Browse files
authored
Use Scanner API in Sensor workload (#96)
1 parent 27a3aeb commit 2742a73

File tree

4 files changed

+53
-6
lines changed

4 files changed

+53
-6
lines changed

scalardb-test/schema/tx_sensor.cql

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ DROP KEYSPACE IF EXISTS sensor;
22
CREATE KEYSPACE IF NOT EXISTS sensor WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
33
DROP KEYSPACE IF EXISTS coordinator;
44
CREATE KEYSPACE IF NOT EXISTS coordinator WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
5+
CREATE KEYSPACE IF NOT EXISTS scalardb WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
56

67
CREATE TABLE IF NOT EXISTS sensor.tx_sensor (
78
timestamp int,
@@ -27,3 +28,12 @@ CREATE TABLE IF NOT EXISTS coordinator.state (
2728
tx_created_at bigint,
2829
PRIMARY KEY (tx_id)
2930
);
31+
32+
CREATE TABLE IF NOT EXISTS scalardb.namespaces (
33+
name text,
34+
PRIMARY KEY (name)
35+
);
36+
37+
INSERT INTO scalardb.namespaces (name) VALUES ('sensor');
38+
INSERT INTO scalardb.namespaces (name) VALUES ('coordinator');
39+
INSERT INTO scalardb.namespaces (name) VALUES ('scalardb');

scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorChecker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public void execute() {
4040
}
4141

4242
if (isDuplicated) {
43-
logError("dupilication happened !");
43+
logError("Duplication happened !");
4444
throw new PostProcessException("Inconsistency happened!");
4545
}
4646
}

scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorCommon.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public static int getMaxRevision(List<Result> results) {
5555
return maxRevision.orElse(0);
5656
}
5757

58-
private static int getRevisionFromResult(Result result) {
58+
public static int getRevisionFromResult(Result result) {
5959
return result.getInt(REVISION);
6060
}
6161
}

scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorProcessor.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,20 @@
55
import com.scalar.db.api.Put;
66
import com.scalar.db.api.Result;
77
import com.scalar.db.api.Scan;
8+
import com.scalar.db.api.TransactionCrudOperable;
89
import com.scalar.db.exception.transaction.TransactionException;
910
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
1011
import com.scalar.kelpie.config.Config;
1112
import com.scalar.kelpie.exception.ProcessFatalException;
1213
import com.scalar.kelpie.modules.TimeBasedProcessor;
14+
import java.util.ArrayList;
15+
import java.util.HashSet;
1316
import java.util.List;
17+
import java.util.Optional;
18+
import java.util.Set;
1419
import java.util.concurrent.ThreadLocalRandom;
1520
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.atomic.AtomicInteger;
1622
import javax.json.Json;
1723
import kelpie.scalardb.Common;
1824

@@ -21,6 +27,7 @@ public class SensorProcessor extends TimeBasedProcessor {
2127
private final int numDevices;
2228
private final AtomicBoolean isVerification;
2329
private final int startTimestamp;
30+
private final AtomicInteger numAttempts = new AtomicInteger();
2431

2532
public SensorProcessor(Config config) {
2633
super(config);
@@ -67,13 +74,43 @@ public void close() {
6774

6875
private void updateRevision(DistributedTransaction transaction, int timestamp, int deviceId)
6976
throws TransactionException {
70-
7177
Scan scan = SensorCommon.prepareScan(timestamp);
72-
List<Result> results = transaction.scan(scan);
7378

74-
boolean hasDuplicatedRevision = SensorCommon.hasDuplicatedRevision(results);
79+
boolean hasDuplicatedRevision;
80+
List<Result> results;
81+
82+
// Alternate between scan() and getScanner() based on the attempt count.
83+
boolean scannerUsed = numAttempts.getAndIncrement() % 2 == 0;
84+
if (!scannerUsed) {
85+
// Use scan()
86+
results = transaction.scan(scan);
87+
hasDuplicatedRevision = SensorCommon.hasDuplicatedRevision(results);
88+
} else {
89+
// Use getScanner()
90+
hasDuplicatedRevision = false;
91+
results = new ArrayList<>();
92+
try (TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan)) {
93+
Set<Integer> tempSet = new HashSet<>();
94+
while (true) {
95+
Optional<Result> result = scanner.one();
96+
if (!result.isPresent()) {
97+
break;
98+
}
99+
100+
int revision = SensorCommon.getRevisionFromResult(result.get());
101+
if (!tempSet.add(revision)) {
102+
hasDuplicatedRevision = true;
103+
break;
104+
}
105+
106+
results.add(result.get());
107+
}
108+
}
109+
}
110+
75111
if (hasDuplicatedRevision) {
76-
throw new ProcessFatalException("A revision is duplicated at " + timestamp);
112+
throw new ProcessFatalException(
113+
"A revision is duplicated. timestamp: " + timestamp + "; scannerUsed: " + scannerUsed);
77114
}
78115

79116
int revision = SensorCommon.getMaxRevision(results) + 1;

0 commit comments

Comments
 (0)