@@ -66,7 +66,7 @@ private void mutate(
6666 String namespaceName , String tableName , String partitionKey , List <Mutation > mutations )
6767 throws ExecutionException {
6868 Map <PartitionIdentifier , String > readVersionMap = new HashMap <>();
69- Map < String , ObjectStorageRecord > partition =
69+ ObjectStoragePartition partition =
7070 getPartition (namespaceName , tableName , partitionKey , readVersionMap );
7171 for (Mutation mutation : mutations ) {
7272 if (mutation instanceof Put ) {
@@ -79,78 +79,80 @@ private void mutate(
7979 applyPartitionWrite (namespaceName , tableName , partitionKey , partition , readVersionMap );
8080 }
8181
82- private void putInternal (Map <String , ObjectStorageRecord > partition , Put put )
83- throws ExecutionException {
82+ private void putInternal (ObjectStoragePartition partition , Put put ) throws ExecutionException {
8483 TableMetadata tableMetadata = metadataManager .getTableMetadata (put );
8584 ObjectStorageMutation mutation = new ObjectStorageMutation (put , tableMetadata );
8685 if (!put .getCondition ().isPresent ()) {
87- ObjectStorageRecord existingRecord = partition .get (mutation .getRecordId ());
88- if (existingRecord == null ) {
89- partition .put (mutation .getRecordId (), mutation .makeRecord ());
86+ if (!partition .getRecord (mutation .getRecordId ()).isPresent ()) {
87+ partition .putRecord (mutation .getRecordId (), mutation .makeRecord ());
9088 } else {
91- partition .put (mutation .getRecordId (), mutation .makeRecord (existingRecord ));
89+ partition .putRecord (
90+ mutation .getRecordId (),
91+ mutation .makeRecord (partition .getRecord (mutation .getRecordId ()).get ()));
9292 }
9393 } else if (put .getCondition ().get () instanceof PutIfNotExists ) {
94- if (partition .containsKey (mutation .getRecordId ())) {
94+ if (partition .getRecord (mutation .getRecordId ()). isPresent ( )) {
9595 throw new NoMutationException (
9696 CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (put ));
9797 }
98- partition .put (mutation .getRecordId (), mutation .makeRecord ());
98+ partition .putRecord (mutation .getRecordId (), mutation .makeRecord ());
9999 } else if (put .getCondition ().get () instanceof PutIfExists ) {
100- ObjectStorageRecord existingRecord = partition .get (mutation .getRecordId ());
101- if (existingRecord == null ) {
100+ if (!partition .getRecord (mutation .getRecordId ()).isPresent ()) {
102101 throw new NoMutationException (
103102 CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (put ));
104103 }
105- partition .put (mutation .getRecordId (), mutation .makeRecord (existingRecord ));
104+ partition .putRecord (
105+ mutation .getRecordId (),
106+ mutation .makeRecord (partition .getRecord (mutation .getRecordId ()).get ()));
106107 } else {
107108 assert put .getCondition ().get () instanceof PutIf ;
108- ObjectStorageRecord existingRecord = partition .get (mutation .getRecordId ());
109- if (existingRecord == null ) {
109+ if (!partition .getRecord (mutation .getRecordId ()).isPresent ()) {
110110 throw new NoMutationException (
111111 CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (put ));
112112 }
113113 try {
114114 validateConditions (
115- partition .get (mutation .getRecordId ()),
115+ partition .getRecord (mutation .getRecordId ()). get ( ),
116116 put .getCondition ().get ().getExpressions (),
117117 metadataManager .getTableMetadata (mutation .getOperation ()));
118118 } catch (ExecutionException e ) {
119119 throw new NoMutationException (
120120 CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (put ), e );
121121 }
122- partition .put (mutation .getRecordId (), mutation .makeRecord (existingRecord ));
122+ partition .putRecord (
123+ mutation .getRecordId (),
124+ mutation .makeRecord (partition .getRecord (mutation .getRecordId ()).get ()));
123125 }
124126 }
125127
126- private void deleteInternal (Map < String , ObjectStorageRecord > partition , Delete delete )
128+ private void deleteInternal (ObjectStoragePartition partition , Delete delete )
127129 throws ExecutionException {
128130 TableMetadata tableMetadata = metadataManager .getTableMetadata (delete );
129131 ObjectStorageMutation mutation = new ObjectStorageMutation (delete , tableMetadata );
130132 if (!delete .getCondition ().isPresent ()) {
131- partition .remove (mutation .getRecordId ());
133+ partition .removeRecord (mutation .getRecordId ());
132134 } else if (delete .getCondition ().get () instanceof DeleteIfExists ) {
133- if (!partition .containsKey (mutation .getRecordId ())) {
135+ if (!partition .getRecord (mutation .getRecordId ()). isPresent ( )) {
134136 throw new NoMutationException (
135137 CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (delete ));
136138 }
137- partition .remove (mutation .getRecordId ());
139+ partition .removeRecord (mutation .getRecordId ());
138140 } else {
139141 assert delete .getCondition ().get () instanceof DeleteIf ;
140- if (!partition .containsKey (mutation .getRecordId ())) {
142+ if (!partition .getRecord (mutation .getRecordId ()). isPresent ( )) {
141143 throw new NoMutationException (
142144 CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (delete ));
143145 }
144146 try {
145147 validateConditions (
146- partition .get (mutation .getRecordId ()),
148+ partition .getRecord (mutation .getRecordId ()). get ( ),
147149 delete .getCondition ().get ().getExpressions (),
148150 metadataManager .getTableMetadata (mutation .getOperation ()));
149151 } catch (ExecutionException e ) {
150152 throw new NoMutationException (
151153 CoreError .NO_MUTATION_APPLIED .buildMessage (), Collections .singletonList (delete ), e );
152154 }
153- partition .remove (mutation .getRecordId ());
155+ partition .removeRecord (mutation .getRecordId ());
154156 }
155157 }
156158
@@ -168,13 +170,11 @@ private void applyPartitionWrite(
168170 String namespaceName ,
169171 String tableName ,
170172 String partitionKey ,
171- Map < String , ObjectStorageRecord > partition ,
173+ ObjectStoragePartition partition ,
172174 Map <PartitionIdentifier , String > readVersionMap )
173175 throws ExecutionException {
174- if (readVersionMap .containsKey (
175- PartitionIdentifier .of (namespaceName , tableName , partitionKey ))) {
176- String readVersion =
177- readVersionMap .get (PartitionIdentifier .of (namespaceName , tableName , partitionKey ));
176+ if (readVersionMap .containsKey (partition .getPartitionIdentifier ())) {
177+ String readVersion = readVersionMap .get (partition .getPartitionIdentifier ());
178178 if (!partition .isEmpty ()) {
179179 updatePartition (namespaceName , tableName , partitionKey , partition , readVersion );
180180 } else {
@@ -197,7 +197,7 @@ private void applyPartitionWrite(
197197 * @return the partition
198198 * @throws ExecutionException if a failure occurs during the operation
199199 */
200- private Map < String , ObjectStorageRecord > getPartition (
200+ private ObjectStoragePartition getPartition (
201201 String namespaceName ,
202202 String tableName ,
203203 String partitionKey ,
@@ -207,13 +207,17 @@ private Map<String, ObjectStorageRecord> getPartition(
207207 try {
208208 Optional <ObjectStorageWrapperResponse > response = wrapper .get (objectKey );
209209 if (!response .isPresent ()) {
210- return new HashMap <>();
210+ return ObjectStoragePartition .newBuilder ()
211+ .namespaceName (namespaceName )
212+ .tableName (tableName )
213+ .partitionKey (partitionKey )
214+ .build ();
211215 }
212- readVersionMap . put (
213- PartitionIdentifier . of ( namespaceName , tableName , partitionKey ),
214- response .get ().getVersion () );
215- return Serializer . deserialize (
216- response . get (). getPayload (), new TypeReference < Map < String , ObjectStorageRecord >>() {}) ;
216+ ObjectStoragePartition partition =
217+ Serializer . deserialize (
218+ response .get ().getPayload (), new TypeReference < ObjectStoragePartition >() {} );
219+ readVersionMap . put ( partition . getPartitionIdentifier (), response . get (). getVersion ());
220+ return partition ;
217221 } catch (ObjectStorageWrapperException e ) {
218222 throw new ExecutionException (
219223 CoreError .OBJECT_STORAGE_ERROR_OCCURRED_IN_MUTATION .buildMessage (e .getMessage ()), e );
@@ -231,10 +235,7 @@ private Map<String, ObjectStorageRecord> getPartition(
231235 * @throws ExecutionException if a failure occurs during the operation
232236 */
233237 private void insertPartition (
234- String namespaceName ,
235- String tableName ,
236- String partitionKey ,
237- Map <String , ObjectStorageRecord > partition )
238+ String namespaceName , String tableName , String partitionKey , ObjectStoragePartition partition )
238239 throws ExecutionException {
239240 try {
240241 wrapper .insert (
@@ -264,7 +265,7 @@ private void updatePartition(
264265 String namespaceName ,
265266 String tableName ,
266267 String partitionKey ,
267- Map < String , ObjectStorageRecord > partition ,
268+ ObjectStoragePartition partition ,
268269 String readVersion )
269270 throws ExecutionException {
270271 try {
0 commit comments