11package com .scalar .db .storage .objectstorage ;
22
3- import com .fasterxml .jackson .core .type .TypeReference ;
43import com .scalar .db .api .Delete ;
54import com .scalar .db .api .Mutation ;
65import com .scalar .db .api .Put ;
1413import java .util .HashMap ;
1514import java .util .List ;
1615import java .util .Map ;
17- import java .util .Optional ;
1816
1917public class MutateStatementHandler extends StatementHandler {
2018 public MutateStatementHandler (
@@ -26,138 +24,67 @@ public void handle(Mutation mutation) throws ExecutionException {
2624 TableMetadata tableMetadata = metadataManager .getTableMetadata (mutation );
2725 ObjectStorageMutation objectStorageMutation =
2826 new ObjectStorageMutation (mutation , tableMetadata );
29- mutate (
30- getNamespace (mutation ),
31- getTable (mutation ),
32- objectStorageMutation .getConcatenatedPartitionKey (),
33- Collections .singletonList (mutation ));
27+ String objectKey =
28+ ObjectStoragePartition .getObjectKey (
29+ getNamespace (mutation ),
30+ getTable (mutation ),
31+ objectStorageMutation .getConcatenatedPartitionKey ());
32+ mutate (objectKey , Collections .singletonList (mutation ));
3433 }
3534
3635 public void handle (List <? extends Mutation > mutations ) throws ExecutionException {
37- Map <PartitionIdentifier , List <Mutation >> mutationPerPartition = new HashMap <>();
36+ Map <String , List <Mutation >> mutationPerPartition = new HashMap <>();
3837 for (Mutation mutation : mutations ) {
3938 TableMetadata tableMetadata = metadataManager .getTableMetadata (mutation );
4039 ObjectStorageMutation objectStorageMutation =
4140 new ObjectStorageMutation (mutation , tableMetadata );
4241 String partitionKey = objectStorageMutation .getConcatenatedPartitionKey ();
43- PartitionIdentifier partitionIdentifier =
44- PartitionIdentifier .of (getNamespace (mutation ), getTable (mutation ), partitionKey );
45- mutationPerPartition
46- .computeIfAbsent (partitionIdentifier , k -> new ArrayList <>())
47- .add (mutation );
42+ String objectKey =
43+ ObjectStoragePartition .getObjectKey (
44+ getNamespace (mutation ), getTable (mutation ), partitionKey );
45+ mutationPerPartition .computeIfAbsent (objectKey , k -> new ArrayList <>()).add (mutation );
4846 }
49- for (Map .Entry <PartitionIdentifier , List <Mutation >> entry : mutationPerPartition .entrySet ()) {
50- PartitionIdentifier partitionIdentifier = entry .getKey ();
51- mutate (
52- partitionIdentifier .getNamespaceName (),
53- partitionIdentifier .getTableName (),
54- partitionIdentifier .getPartitionName (),
55- entry .getValue ());
47+ for (Map .Entry <String , List <Mutation >> entry : mutationPerPartition .entrySet ()) {
48+ mutate (entry .getKey (), entry .getValue ());
5649 }
5750 }
5851
59- private void mutate (
60- String namespaceName , String tableName , String partitionKey , List <Mutation > mutations )
61- throws ExecutionException {
62- Map <PartitionIdentifier , String > readVersionMap = new HashMap <>();
63- ObjectStoragePartition partition =
64- getPartition (namespaceName , tableName , partitionKey , readVersionMap );
52+ private void mutate (String objectKey , List <Mutation > mutations ) throws ExecutionException {
53+ ObjectStoragePartitionSnapshot snapshot = getPartition (objectKey );
6554 for (Mutation mutation : mutations ) {
55+ TableMetadata tableMetadata = metadataManager .getTableMetadata (mutation );
6656 if (mutation instanceof Put ) {
67- partition .applyPut ((Put ) mutation , metadataManager . getTableMetadata ( mutation ) );
57+ snapshot .applyPut ((Put ) mutation , tableMetadata );
6858 } else {
6959 assert mutation instanceof Delete ;
70- partition .applyDelete ((Delete ) mutation , metadataManager . getTableMetadata ( mutation ) );
60+ snapshot .applyDelete ((Delete ) mutation , tableMetadata );
7161 }
7262 }
73- applyPartitionWrite ( namespaceName , tableName , partitionKey , partition , readVersionMap );
63+ writePartition ( snapshot );
7464 }
7565
7666 /**
77- * Applies the partition write .
67+ * Writes a partition to the object storage .
7868 *
79- * @param namespaceName the namespace name
80- * @param tableName the table name
81- * @param partitionKey the partition key
82- * @param partition the partition to be written
83- * @param readVersionMap the map of read versions
69+ * @param snapshot the partition snapshot
8470 * @throws ExecutionException if a failure occurs during the operation
8571 */
86- private void applyPartitionWrite (
87- String namespaceName ,
88- String tableName ,
89- String partitionKey ,
90- ObjectStoragePartition partition ,
91- Map <PartitionIdentifier , String > readVersionMap )
92- throws ExecutionException {
93- if (readVersionMap .containsKey (partition .getPartitionIdentifier ())) {
94- String readVersion = readVersionMap .get (partition .getPartitionIdentifier ());
95- if (!partition .isEmpty ()) {
96- updatePartition (namespaceName , tableName , partitionKey , partition , readVersion );
97- } else {
98- deletePartition (namespaceName , tableName , partitionKey , readVersion );
99- }
100- } else {
101- if (!partition .isEmpty ()) {
102- insertPartition (namespaceName , tableName , partitionKey , partition );
103- }
104- }
105- }
106-
107- /**
108- * Gets a partition from the object storage.
109- *
110- * @param namespaceName the namespace name
111- * @param tableName the table name
112- * @param partitionKey the partition key
113- * @param readVersionMap the map to store the read version
114- * @return the partition
115- * @throws ExecutionException if a failure occurs during the operation
116- */
117- private ObjectStoragePartition getPartition (
118- String namespaceName ,
119- String tableName ,
120- String partitionKey ,
121- Map <PartitionIdentifier , String > readVersionMap )
122- throws ExecutionException {
123- String objectKey = ObjectStorageUtils .getObjectKey (namespaceName , tableName , partitionKey );
72+ private void writePartition (ObjectStoragePartitionSnapshot snapshot ) throws ExecutionException {
12473 try {
125- Optional <ObjectStorageWrapperResponse > response = wrapper .get (objectKey );
126- if (!response .isPresent ()) {
127- return ObjectStoragePartition .newBuilder ()
128- .namespaceName (namespaceName )
129- .tableName (tableName )
130- .partitionKey (partitionKey )
131- .build ();
74+ if (snapshot .getReadVersion ().isPresent ()) {
75+ if (!snapshot .getPartition ().isEmpty ()) {
76+ wrapper .update (
77+ snapshot .getObjectKey (),
78+ snapshot .getPartition ().serialize (),
79+ snapshot .getReadVersion ().get ());
80+ } else {
81+ wrapper .delete (snapshot .getObjectKey (), snapshot .getReadVersion ().get ());
82+ }
83+ } else {
84+ if (!snapshot .getPartition ().isEmpty ()) {
85+ wrapper .insert (snapshot .getObjectKey (), snapshot .getPartition ().serialize ());
86+ }
13287 }
133- ObjectStoragePartition partition =
134- Serializer .deserialize (
135- response .get ().getPayload (), new TypeReference <ObjectStoragePartition >() {});
136- readVersionMap .put (partition .getPartitionIdentifier (), response .get ().getVersion ());
137- return partition ;
138- } catch (ObjectStorageWrapperException e ) {
139- throw new ExecutionException (
140- CoreError .OBJECT_STORAGE_ERROR_OCCURRED_IN_MUTATION .buildMessage (e .getMessage ()), e );
141- }
142- }
143-
144- /**
145- * Inserts a partition into the object storage. This method is called after confirming that the
146- * partition does not exist.
147- *
148- * @param namespaceName the namespace name
149- * @param tableName the table name
150- * @param partitionKey the partition key
151- * @param partition the partition to be inserted
152- * @throws ExecutionException if a failure occurs during the operation
153- */
154- private void insertPartition (
155- String namespaceName , String tableName , String partitionKey , ObjectStoragePartition partition )
156- throws ExecutionException {
157- try {
158- wrapper .insert (
159- ObjectStorageUtils .getObjectKey (namespaceName , tableName , partitionKey ),
160- Serializer .serialize (partition ));
16188 } catch (PreconditionFailedException e ) {
16289 throw new RetriableExecutionException (
16390 CoreError .OBJECT_STORAGE_CONFLICT_OCCURRED_IN_MUTATION .buildMessage (e .getMessage ()), e );
@@ -168,56 +95,24 @@ private void insertPartition(
16895 }
16996
17097 /**
171- * Updates a partition in the object storage. This method is called after confirming that the
172- * partition exists.
98+ * Gets a partition and its version as a snapshot from the object storage.
17399 *
174- * @param namespaceName the namespace name
175- * @param tableName the table name
176- * @param partitionKey the partition key
177- * @param partition the partition to be updated
178- * @param readVersion the read version
179- * @throws ExecutionException if a failure occurs during the operation
180- */
181- private void updatePartition (
182- String namespaceName ,
183- String tableName ,
184- String partitionKey ,
185- ObjectStoragePartition partition ,
186- String readVersion )
187- throws ExecutionException {
188- try {
189- wrapper .update (
190- ObjectStorageUtils .getObjectKey (namespaceName , tableName , partitionKey ),
191- Serializer .serialize (partition ),
192- readVersion );
193- } catch (PreconditionFailedException e ) {
194- throw new RetriableExecutionException (
195- CoreError .OBJECT_STORAGE_CONFLICT_OCCURRED_IN_MUTATION .buildMessage (e .getMessage ()), e );
196- } catch (ObjectStorageWrapperException e ) {
197- throw new ExecutionException (
198- CoreError .OBJECT_STORAGE_ERROR_OCCURRED_IN_MUTATION .buildMessage (e .getMessage ()), e );
199- }
200- }
201-
202- /**
203- * Deletes a partition from the object storage. This method is called after confirming that the
204- * partition exists.
205- *
206- * @param namespaceName the namespace name
207- * @param tableName the table name
208- * @param partitionKey the partition key
209- * @param readVersion the read version
100+ * @param objectKey the object key
101+ * @return the partition
210102 * @throws ExecutionException if a failure occurs during the operation
211103 */
212- private void deletePartition (
213- String namespaceName , String tableName , String partitionKey , String readVersion )
214- throws ExecutionException {
104+ private ObjectStoragePartitionSnapshot getPartition (String objectKey ) throws ExecutionException {
215105 try {
216- wrapper .delete (
217- ObjectStorageUtils .getObjectKey (namespaceName , tableName , partitionKey ), readVersion );
218- } catch (PreconditionFailedException e ) {
219- throw new RetriableExecutionException (
220- CoreError .OBJECT_STORAGE_CONFLICT_OCCURRED_IN_MUTATION .buildMessage (e .getMessage ()), e );
106+ return wrapper
107+ .get (objectKey )
108+ .map (
109+ response ->
110+ new ObjectStoragePartitionSnapshot (
111+ objectKey , response .getPayload (), response .getVersion ()))
112+ .orElseGet (
113+ () ->
114+ new ObjectStoragePartitionSnapshot (
115+ objectKey , new ObjectStoragePartition (null ), null ));
221116 } catch (ObjectStorageWrapperException e ) {
222117 throw new ExecutionException (
223118 CoreError .OBJECT_STORAGE_ERROR_OCCURRED_IN_MUTATION .buildMessage (e .getMessage ()), e );
0 commit comments