@@ -79,7 +79,11 @@ public Optional<Result> get(Get originalGet) throws CrudException {
7979 Get get = (Get ) prepareStorageSelection (originalGet );
8080 Snapshot .Key key = new Snapshot .Key (get );
8181 readUnread (key , get );
82- return createGetResult (key , get , originalProjections );
82+
83+ TableMetadata metadata = getTableMetadata (get );
84+ return snapshot
85+ .getResult (key , get )
86+ .map (r -> new FilteredResult (r , originalProjections , metadata , isIncludeMetadataEnabled ));
8387 }
8488
8589 @ VisibleForTesting
@@ -113,35 +117,22 @@ void read(Snapshot.Key key, Get get) throws CrudException {
113117 snapshot .getId ());
114118 }
115119
116- private Optional <Result > createGetResult (Snapshot .Key key , Get get , List <String > projections )
117- throws CrudException {
118- TableMetadata metadata = getTableMetadata (key .getNamespace (), key .getTable ());
119- return snapshot
120- .getResult (key , get )
121- .map (r -> new FilteredResult (r , projections , metadata , isIncludeMetadataEnabled ));
122- }
123-
124- public List <Result > scan (Scan scan ) throws CrudException {
125- List <Result > results = scanInternal (scan );
126-
127- // We verify if this scan does not overlap previous writes using the actual scan result. Because
128- // we support arbitrary conditions in the where clause of a scan (not only ScanAll, but also
129- // Scan and ScanWithIndex), we cannot determine whether the scan results will include a record
130- // whose key is the same as the key specified in the previous writes, without knowing the
131- // obtained keys in the actual scan. With this check, users can avoid seeing unexpected scan
132- // results that have not included previous writes yet.
133- snapshot .verify (scan );
134-
135- return results ;
136- }
137-
138- private List <Result > scanInternal (Scan originalScan ) throws CrudException {
120+ public List <Result > scan (Scan originalScan ) throws CrudException {
139121 List <String > originalProjections = new ArrayList <>(originalScan .getProjections ());
140122 Scan scan = (Scan ) prepareStorageSelection (originalScan );
123+ Map <Snapshot .Key , TransactionResult > results = scanInternal (scan );
124+ snapshot .verifyNoOverlap (scan , results );
141125
126+ TableMetadata metadata = getTableMetadata (scan );
127+ return results .values ().stream ()
128+ .map (r -> new FilteredResult (r , originalProjections , metadata , isIncludeMetadataEnabled ))
129+ .collect (Collectors .toList ());
130+ }
131+
132+ private Map <Snapshot .Key , TransactionResult > scanInternal (Scan scan ) throws CrudException {
142133 Optional <Map <Snapshot .Key , TransactionResult >> resultsInSnapshot = snapshot .getResults (scan );
143134 if (resultsInSnapshot .isPresent ()) {
144- return createScanResults ( scan , originalProjections , resultsInSnapshot .get () );
135+ return resultsInSnapshot .get ();
145136 }
146137
147138 Map <Snapshot .Key , TransactionResult > results = new LinkedHashMap <>();
@@ -151,23 +142,21 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
151142 scanner = scanFromStorage (scan );
152143 for (Result r : scanner ) {
153144 TransactionResult result = new TransactionResult (r );
154- if (!result .isCommitted ()) {
155- throw new UncommittedRecordException (
156- scan ,
157- result ,
158- CoreError .CONSENSUS_COMMIT_READ_UNCOMMITTED_RECORD .buildMessage (),
159- snapshot .getId ());
160- }
161-
162145 Snapshot .Key key = new Snapshot .Key (scan , r );
163-
164- // We always update the read set to create before image by using the latest record (result)
165- // because another conflicting transaction might have updated the record after this
166- // transaction read it first.
167- snapshot .putIntoReadSet (key , Optional .of (result ));
168-
169- snapshot .getResult (key ).ifPresent (value -> results .put (key , value ));
146+ processScanResult (key , scan , result );
147+ results .put (key , result );
148+ }
149+ } catch (RuntimeException e ) {
150+ Exception exception ;
151+ if (e .getCause () instanceof ExecutionException ) {
152+ exception = (ExecutionException ) e .getCause ();
153+ } else {
154+ exception = e ;
170155 }
156+ throw new CrudException (
157+ CoreError .CONSENSUS_COMMIT_SCANNING_RECORDS_FROM_STORAGE_FAILED .buildMessage (),
158+ exception ,
159+ snapshot .getId ());
171160 } finally {
172161 if (scanner != null ) {
173162 try {
@@ -177,19 +166,26 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
177166 }
178167 }
179168 }
169+
180170 snapshot .putIntoScanSet (scan , results );
181171
182- return createScanResults ( scan , originalProjections , results ) ;
172+ return results ;
183173 }
184174
185- private List <Result > createScanResults (
186- Scan scan , List <String > projections , Map <Snapshot .Key , TransactionResult > results )
175+ private void processScanResult (Snapshot .Key key , Scan scan , TransactionResult result )
187176 throws CrudException {
188- assert scan .forNamespace ().isPresent () && scan .forTable ().isPresent ();
189- TableMetadata metadata = getTableMetadata (scan .forNamespace ().get (), scan .forTable ().get ());
190- return results .values ().stream ()
191- .map (r -> new FilteredResult (r , projections , metadata , isIncludeMetadataEnabled ))
192- .collect (Collectors .toList ());
177+ if (!result .isCommitted ()) {
178+ throw new UncommittedRecordException (
179+ scan ,
180+ result ,
181+ CoreError .CONSENSUS_COMMIT_READ_UNCOMMITTED_RECORD .buildMessage (),
182+ snapshot .getId ());
183+ }
184+
185+ // We always update the read set to create before image by using the latest record (result)
186+ // because another conflicting transaction might have updated the record after this
187+ // transaction read it first.
188+ snapshot .putIntoReadSet (key , Optional .of (result ));
193189 }
194190
195191 public void put (Put put ) throws CrudException {
@@ -322,14 +318,14 @@ private TransactionTableMetadata getTransactionTableMetadata(Operation operation
322318 }
323319 }
324320
325- private TableMetadata getTableMetadata (String namespace , String table ) throws CrudException {
321+ private TableMetadata getTableMetadata (Operation operation ) throws CrudException {
326322 try {
327323 TransactionTableMetadata metadata =
328- tableMetadataManager .getTransactionTableMetadata (namespace , table );
324+ tableMetadataManager .getTransactionTableMetadata (operation );
329325 if (metadata == null ) {
326+ assert operation .forFullTableName ().isPresent ();
330327 throw new IllegalArgumentException (
331- CoreError .TABLE_NOT_FOUND .buildMessage (
332- ScalarDbUtils .getFullTableName (namespace , table )));
328+ CoreError .TABLE_NOT_FOUND .buildMessage (operation .forFullTableName ().get ()));
333329 }
334330 return metadata .getTableMetadata ();
335331 } catch (ExecutionException e ) {
0 commit comments