Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions scalardb-test/schema/tx_sensor.cql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ DROP KEYSPACE IF EXISTS sensor;
CREATE KEYSPACE IF NOT EXISTS sensor WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
DROP KEYSPACE IF EXISTS coordinator;
CREATE KEYSPACE IF NOT EXISTS coordinator WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE KEYSPACE IF NOT EXISTS scalardb WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };

CREATE TABLE IF NOT EXISTS sensor.tx_sensor (
timestamp int,
Expand All @@ -27,3 +28,10 @@ CREATE TABLE IF NOT EXISTS coordinator.state (
tx_created_at bigint,
PRIMARY KEY (tx_id)
);

CREATE TABLE IF NOT EXISTS scalardb.namespaces (
name text,
PRIMARY KEY (name)
);

INSERT INTO scalardb.namespaces (name) VALUES ('sensor');
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void execute() {
}

if (isDuplicated) {
logError("dupilication happened !");
logError("Duplication happened !");
throw new PostProcessException("Inconsistency happened!");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static int getMaxRevision(List<Result> results) {
return maxRevision.orElse(0);
}

private static int getRevisionFromResult(Result result) {
public static int getRevisionFromResult(Result result) {
return result.getInt(REVISION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,20 @@
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.TransactionCrudOperable;
import com.scalar.db.exception.transaction.TransactionException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.kelpie.config.Config;
import com.scalar.kelpie.exception.ProcessFatalException;
import com.scalar.kelpie.modules.TimeBasedProcessor;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.json.Json;
import kelpie.scalardb.Common;

Expand All @@ -21,6 +27,7 @@ public class SensorProcessor extends TimeBasedProcessor {
private final int numDevices;
private final AtomicBoolean isVerification;
private final int startTimestamp;
private final AtomicInteger numAttempts = new AtomicInteger();

public SensorProcessor(Config config) {
super(config);
Expand Down Expand Up @@ -67,13 +74,41 @@ public void close() {

private void updateRevision(DistributedTransaction transaction, int timestamp, int deviceId)
throws TransactionException {

Scan scan = SensorCommon.prepareScan(timestamp);
List<Result> results = transaction.scan(scan);

boolean hasDuplicatedRevision = SensorCommon.hasDuplicatedRevision(results);
boolean hasDuplicatedRevision;
List<Result> results;
boolean scannerUsed = numAttempts.getAndIncrement() % 2 == 0;
if (!scannerUsed) {
// Use scan()
results = transaction.scan(scan);
hasDuplicatedRevision = SensorCommon.hasDuplicatedRevision(results);
} else {
// Use getScanner()
hasDuplicatedRevision = false;
results = new ArrayList<>();
try (TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan)) {
Set<Integer> tempSet = new HashSet<>();
while (true) {
Optional<Result> result = scanner.one();
if (!result.isPresent()) {
break;
}

int revision = SensorCommon.getRevisionFromResult(result.get());
if (!tempSet.add(revision)) {
hasDuplicatedRevision = true;
break;
}

results.add(result.get());
}
}
}

if (hasDuplicatedRevision) {
throw new ProcessFatalException("A revision is duplicated at " + timestamp);
throw new ProcessFatalException(
"A revision is duplicated. timestamp: " + timestamp + "; scannerUsed: " + scannerUsed);
}

int revision = SensorCommon.getMaxRevision(results) + 1;
Expand Down