Skip to content

Commit da1cb57

Browse files
authored
Add getScanner() method to transaction interfaces (#2698)
1 parent f713495 commit da1cb57

30 files changed

+502
-11
lines changed

core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.scalar.db.storage.jdbc.JdbcEnv;
77
import java.util.Properties;
88
import org.junit.jupiter.api.Disabled;
9+
import org.junit.jupiter.api.Test;
910

1011
public class JdbcTransactionIntegrationTest extends DistributedTransactionIntegrationTestBase {
1112

@@ -24,17 +25,31 @@ protected Properties getProperties(String testName) {
2425

2526
@Disabled("JDBC transactions don't support getState()")
2627
@Override
28+
@Test
2729
public void getState_forSuccessfulTransaction_ShouldReturnCommittedState() {}
2830

2931
@Disabled("JDBC transactions don't support getState()")
3032
@Override
33+
@Test
3134
public void getState_forFailedTransaction_ShouldReturnAbortedState() {}
3235

3336
@Disabled("JDBC transactions don't support abort()")
3437
@Override
38+
@Test
3539
public void abort_forOngoingTransaction_ShouldAbortCorrectly() {}
3640

3741
@Disabled("JDBC transactions don't support rollback()")
3842
@Override
43+
@Test
3944
public void rollback_forOngoingTransaction_ShouldRollbackCorrectly() {}
45+
46+
@Disabled("Implement later")
47+
@Override
48+
@Test
49+
public void getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {}
50+
51+
@Disabled("Implement later")
52+
@Override
53+
@Test
54+
public void manager_getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {}
4055
}

core/src/main/java/com/scalar/db/api/CrudOperable.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
* An interface for transactional CRUD operations. Note that the LINEARIZABLE consistency level is
1313
* always used in transactional CRUD operations, so {@link Consistency} specified for CRUD
1414
* operations is ignored.
15+
*
16+
* @param <E> the type of {@link TransactionException} that the implementation throws if the
17+
* operation fails
1518
*/
1619
public interface CrudOperable<E extends TransactionException> {
1720

@@ -26,16 +29,37 @@ public interface CrudOperable<E extends TransactionException> {
2629
Optional<Result> get(Get get) throws E;
2730

2831
/**
29-
* Retrieves results from the storage through a transaction with the specified {@link Scan}
30-
* command with a partition key and returns a list of {@link Result}. Results can be filtered by
31-
* specifying a range of clustering keys.
32+
* Retrieves results from the storage through a transaction with the specified {@link Scan} or
33+
* {@link ScanAll} or {@link ScanWithIndex} command with a partition key and returns a list of
34+
* {@link Result}. Results can be filtered by specifying a range of clustering keys.
35+
*
36+
* <ul>
37+
* <li>{@link Scan} : by specifying a partition key, it will return results within the
38+
* partition. Results can be filtered by specifying a range of clustering keys.
39+
* <li>{@link ScanAll} : for a given table, it will return all its records even if they span
40+
* several partitions.
41+
* <li>{@link ScanWithIndex} : by specifying an index key, it will return results within the
42+
* index.
43+
* </ul>
3244
*
3345
* @param scan a {@code Scan} command
3446
* @return a list of {@link Result}
3547
* @throws E if the transaction CRUD operation fails
3648
*/
3749
List<Result> scan(Scan scan) throws E;
3850

51+
/**
52+
* Retrieves results from the storage through a transaction with the specified {@link Scan} or
53+
* {@link ScanAll} or {@link ScanWithIndex} command with a partition key and returns a {@link
54+
* Scanner} to iterate over the results. Results can be filtered by specifying a range of
55+
* clustering keys.
56+
*
57+
* @param scan a {@code Scan} command
58+
* @return a {@code Scanner} to iterate over the results
59+
* @throws E if the transaction CRUD operation fails
60+
*/
61+
Scanner<E> getScanner(Scan scan) throws E;
62+
3963
/**
4064
* Inserts an entry into or updates an entry in the underlying storage through a transaction with
4165
* the specified {@link Put} command. If a condition is specified in the {@link Put} command, and
@@ -131,4 +155,32 @@ public interface CrudOperable<E extends TransactionException> {
131155
* @throws E if the transaction CRUD operation fails
132156
*/
133157
void mutate(List<? extends Mutation> mutations) throws E;
158+
159+
/** A scanner abstraction for iterating results. */
160+
interface Scanner<E extends TransactionException> extends AutoCloseable, Iterable<Result> {
161+
/**
162+
* Returns the next result.
163+
*
164+
* @return an {@code Optional} containing the next result if available, or empty if no more
165+
* results
166+
* @throws E if the operation fails
167+
*/
168+
Optional<Result> one() throws E;
169+
170+
/**
171+
* Returns all remaining results.
172+
*
173+
* @return a {@code List} containing all remaining results
174+
* @throws E if the operation fails
175+
*/
176+
List<Result> all() throws E;
177+
178+
/**
179+
* Closes the scanner.
180+
*
181+
* @throws E if closing the scanner fails
182+
*/
183+
@Override
184+
void close() throws E;
185+
}
134186
}

core/src/main/java/com/scalar/db/api/Scanner.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,18 @@
1313
public interface Scanner extends Closeable, Iterable<Result> {
1414

1515
/**
16-
* Returns the first result in the results.
16+
* Returns the next result.
1717
*
18-
* @return the first result in the results
18+
* @return an {@code Optional} containing the next result if available, or empty if no more
19+
* results
1920
* @throws ExecutionException if the operation fails
2021
*/
2122
Optional<Result> one() throws ExecutionException;
2223

2324
/**
24-
* Returns all the results.
25+
* Returns all remaining results.
2526
*
26-
* @return the list of {@code Result}s
27+
* @return a {@code List} containing all remaining results
2728
* @throws ExecutionException if the operation fails
2829
*/
2930
List<Result> all() throws ExecutionException;

core/src/main/java/com/scalar/db/api/TransactionCrudOperable.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,18 @@ public interface TransactionCrudOperable extends CrudOperable<CrudException> {
3333
@Override
3434
List<Result> scan(Scan scan) throws CrudConflictException, CrudException;
3535

36+
/**
37+
* {@inheritDoc}
38+
*
39+
* @throws CrudConflictException if the transaction CRUD operation fails due to transient faults
40+
* (e.g., a conflict error). You can retry the transaction from the beginning
41+
* @throws CrudException if the transaction CRUD operation fails due to transient or nontransient
42+
* faults. You can try retrying the transaction from the beginning, but the transaction may
43+
* still fail if the cause is nontranient
44+
*/
45+
@Override
46+
Scanner getScanner(Scan scan) throws CrudConflictException, CrudException;
47+
3648
/**
3749
* {@inheritDoc}
3850
*
@@ -154,4 +166,38 @@ void delete(List<Delete> deletes)
154166
@Override
155167
void mutate(List<? extends Mutation> mutations)
156168
throws CrudConflictException, CrudException, UnsatisfiedConditionException;
169+
170+
interface Scanner extends CrudOperable.Scanner<CrudException> {
171+
/**
172+
* {@inheritDoc}
173+
*
174+
* @throws CrudConflictException if the transaction CRUD operation fails due to transient faults
175+
* (e.g., a conflict error). You can retry the transaction from the beginning
176+
* @throws CrudException if the transaction CRUD operation fails due to transient or
177+
* nontransient faults. You can try retrying the transaction from the beginning, but the
178+
* transaction may still fail if the cause is nontranient
179+
*/
180+
@Override
181+
Optional<Result> one() throws CrudConflictException, CrudException;
182+
183+
/**
184+
* {@inheritDoc}
185+
*
186+
* @throws CrudConflictException if the transaction CRUD operation fails due to transient faults
187+
* (e.g., a conflict error). You can retry the transaction from the beginning
188+
* @throws CrudException if the transaction CRUD operation fails due to transient or
189+
* nontransient faults. You can try retrying the transaction from the beginning, but the
190+
* transaction may still fail if the cause is nontranient
191+
*/
192+
@Override
193+
List<Result> all() throws CrudConflictException, CrudException;
194+
195+
/**
196+
* {@inheritDoc}
197+
*
198+
* @throws CrudException if closing the scanner fails
199+
*/
200+
@Override
201+
void close() throws CrudException;
202+
}
157203
}

core/src/main/java/com/scalar/db/api/TransactionManagerCrudOperable.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,18 @@ Optional<Result> get(Get get)
3939
List<Result> scan(Scan scan)
4040
throws CrudConflictException, CrudException, UnknownTransactionStatusException;
4141

42+
/**
43+
* {@inheritDoc}
44+
*
45+
* @throws CrudConflictException if the transaction CRUD operation fails due to transient faults
46+
* (e.g., a conflict error). You can retry the transaction from the beginning
47+
* @throws CrudException if the transaction CRUD operation fails due to transient or nontransient
48+
* faults. You can try retrying the transaction from the beginning, but the transaction may
49+
* still fail if the cause is nontranient
50+
*/
51+
@Override
52+
Scanner getScanner(Scan scan) throws CrudConflictException, CrudException;
53+
4254
/**
4355
* {@inheritDoc}
4456
*
@@ -177,4 +189,39 @@ void delete(List<Delete> deletes)
177189
void mutate(List<? extends Mutation> mutations)
178190
throws CrudConflictException, CrudException, UnsatisfiedConditionException,
179191
UnknownTransactionStatusException;
192+
193+
interface Scanner extends CrudOperable.Scanner<TransactionException> {
194+
/**
195+
* {@inheritDoc}
196+
*
197+
* @throws CrudConflictException if the transaction CRUD operation fails due to transient faults
198+
* (e.g., a conflict error). You can retry the transaction from the beginning
199+
* @throws CrudException if the transaction CRUD operation fails due to transient or
200+
* nontransient faults. You can try retrying the transaction from the beginning, but the
201+
* transaction may still fail if the cause is nontranient
202+
*/
203+
@Override
204+
Optional<Result> one() throws CrudConflictException, CrudException;
205+
206+
/**
207+
* {@inheritDoc}
208+
*
209+
* @throws CrudConflictException if the transaction CRUD operation fails due to transient faults
210+
* (e.g., a conflict error). You can retry the transaction from the beginning
211+
* @throws CrudException if the transaction CRUD operation fails due to transient or
212+
* nontransient faults. You can try retrying the transaction from the beginning, but the
213+
* transaction may still fail if the cause is nontranient
214+
*/
215+
@Override
216+
List<Result> all() throws CrudConflictException, CrudException;
217+
218+
/**
219+
* {@inheritDoc}
220+
*
221+
* @throws CrudException if closing the scanner fails
222+
* @throws UnknownTransactionStatusException if the status of the commit is unknown
223+
*/
224+
@Override
225+
void close() throws CrudException, UnknownTransactionStatusException;
226+
}
180227
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.scalar.db.common;
2+
3+
import com.google.errorprone.annotations.concurrent.LazyInit;
4+
import com.scalar.db.api.CrudOperable;
5+
import com.scalar.db.api.Result;
6+
import com.scalar.db.exception.transaction.TransactionException;
7+
import java.util.Iterator;
8+
import java.util.NoSuchElementException;
9+
import java.util.Objects;
10+
import javax.annotation.Nonnull;
11+
import javax.annotation.concurrent.NotThreadSafe;
12+
13+
public abstract class AbstractCrudOperableScanner<E extends TransactionException>
14+
implements CrudOperable.Scanner<E> {
15+
16+
@LazyInit private ScannerIterator scannerIterator;
17+
18+
@Override
19+
@Nonnull
20+
public Iterator<Result> iterator() {
21+
if (scannerIterator == null) {
22+
scannerIterator = new ScannerIterator(this);
23+
}
24+
return scannerIterator;
25+
}
26+
27+
@NotThreadSafe
28+
public class ScannerIterator implements Iterator<Result> {
29+
30+
private final CrudOperable.Scanner<E> scanner;
31+
private Result next;
32+
33+
public ScannerIterator(CrudOperable.Scanner<E> scanner) {
34+
this.scanner = Objects.requireNonNull(scanner);
35+
}
36+
37+
@Override
38+
public boolean hasNext() {
39+
if (next != null) {
40+
return true;
41+
}
42+
43+
try {
44+
return (next = scanner.one().orElse(null)) != null;
45+
} catch (TransactionException e) {
46+
throw new RuntimeException(e.getMessage(), e);
47+
}
48+
}
49+
50+
@Override
51+
public Result next() {
52+
if (!hasNext()) {
53+
throw new NoSuchElementException();
54+
}
55+
56+
Result ret = next;
57+
next = null;
58+
return ret;
59+
}
60+
}
61+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.scalar.db.common;
2+
3+
import com.scalar.db.api.TransactionCrudOperable;
4+
import com.scalar.db.exception.transaction.CrudException;
5+
6+
public abstract class AbstractTransactionCrudOperableScanner
7+
extends AbstractCrudOperableScanner<CrudException> implements TransactionCrudOperable.Scanner {}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.scalar.db.common;
2+
3+
import com.scalar.db.api.TransactionManagerCrudOperable;
4+
import com.scalar.db.exception.transaction.TransactionException;
5+
6+
public abstract class AbstractTransactionManagerCrudOperableScanner
7+
extends AbstractCrudOperableScanner<TransactionException>
8+
implements TransactionManagerCrudOperable.Scanner {}

core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@ public synchronized List<Result> scan(Scan scan) throws CrudException {
121121
return super.scan(scan);
122122
}
123123

124+
@Override
125+
public synchronized Scanner getScanner(Scan scan) throws CrudException {
126+
return super.getScanner(scan);
127+
}
128+
124129
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
125130
@Deprecated
126131
@Override

core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ public synchronized List<Result> scan(Scan scan) throws CrudException {
127127
return super.scan(scan);
128128
}
129129

130+
@Override
131+
public synchronized Scanner getScanner(Scan scan) throws CrudException {
132+
return super.getScanner(scan);
133+
}
134+
130135
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
131136
@Deprecated
132137
@Override

0 commit comments

Comments
 (0)