55import com .scalar .db .api .Put ;
66import com .scalar .db .api .Result ;
77import com .scalar .db .api .Scan ;
8+ import com .scalar .db .api .TransactionCrudOperable ;
89import com .scalar .db .exception .transaction .TransactionException ;
910import com .scalar .db .exception .transaction .UnknownTransactionStatusException ;
1011import com .scalar .kelpie .config .Config ;
1112import com .scalar .kelpie .exception .ProcessFatalException ;
1213import com .scalar .kelpie .modules .TimeBasedProcessor ;
14+ import java .util .ArrayList ;
15+ import java .util .HashSet ;
1316import java .util .List ;
17+ import java .util .Optional ;
18+ import java .util .Set ;
1419import java .util .concurrent .ThreadLocalRandom ;
1520import java .util .concurrent .atomic .AtomicBoolean ;
21+ import java .util .concurrent .atomic .AtomicInteger ;
1622import javax .json .Json ;
1723import kelpie .scalardb .Common ;
1824
@@ -21,6 +27,7 @@ public class SensorProcessor extends TimeBasedProcessor {
2127 private final int numDevices ;
2228 private final AtomicBoolean isVerification ;
2329 private final int startTimestamp ;
30+ private final AtomicInteger numAttempts = new AtomicInteger ();
2431
2532 public SensorProcessor (Config config ) {
2633 super (config );
@@ -67,13 +74,41 @@ public void close() {
6774
6875 private void updateRevision (DistributedTransaction transaction , int timestamp , int deviceId )
6976 throws TransactionException {
70-
7177 Scan scan = SensorCommon .prepareScan (timestamp );
72- List <Result > results = transaction .scan (scan );
7378
74- boolean hasDuplicatedRevision = SensorCommon .hasDuplicatedRevision (results );
79+ boolean hasDuplicatedRevision ;
80+ List <Result > results ;
81+ boolean scannerUsed = numAttempts .getAndIncrement () % 2 == 0 ;
82+ if (!scannerUsed ) {
83+ // Use scan()
84+ results = transaction .scan (scan );
85+ hasDuplicatedRevision = SensorCommon .hasDuplicatedRevision (results );
86+ } else {
87+ // Use getScanner()
88+ hasDuplicatedRevision = false ;
89+ results = new ArrayList <>();
90+ try (TransactionCrudOperable .Scanner scanner = transaction .getScanner (scan )) {
91+ Set <Integer > tempSet = new HashSet <>();
92+ while (true ) {
93+ Optional <Result > result = scanner .one ();
94+ if (!result .isPresent ()) {
95+ break ;
96+ }
97+
98+ int revision = SensorCommon .getRevisionFromResult (result .get ());
99+ if (!tempSet .add (revision )) {
100+ hasDuplicatedRevision = true ;
101+ break ;
102+ }
103+
104+ results .add (result .get ());
105+ }
106+ }
107+ }
108+
75109 if (hasDuplicatedRevision ) {
76- throw new ProcessFatalException ("A revision is duplicated at " + timestamp );
110+ throw new ProcessFatalException (
111+ "A revision is duplicated. timestamp: " + timestamp + "; scannerUsed: " + scannerUsed );
77112 }
78113
79114 int revision = SensorCommon .getMaxRevision (results ) + 1 ;
0 commit comments