1919 */
2020package io .pixelsdb .pixels .daemon .retina ;
2121
22+ import com .google .common .base .Function ;
2223import com .google .protobuf .ByteString ;
2324import io .grpc .stub .StreamObserver ;
2425import io .pixelsdb .pixels .common .exception .IndexException ;
3536import org .apache .logging .log4j .LogManager ;
3637import org .apache .logging .log4j .Logger ;
3738
38- import java .util .ArrayList ;
39- import java .util .Arrays ;
40- import java .util .LinkedList ;
41- import java .util .List ;
39+ import java .util .*;
4240import java .util .stream .Collectors ;
4341import java .util .stream .IntStream ;
4442
@@ -201,6 +199,34 @@ public void onCompleted()
201199 };
202200 }
203201
202+ /**
203+ * Transpose the index keys from a row set to a column set.
204+ * @param dataList
205+ * @param indexExtractor
206+ * @return
207+ * @param <T>
208+ */
209+ private <T > List <List <IndexProto .IndexKey >> transposeIndexKeys (List <T > dataList ,
210+ Function <T , List <IndexProto .IndexKey >> indexExtractor )
211+ {
212+ if (dataList == null || dataList .isEmpty ())
213+ {
214+ return Collections .emptyList ();
215+ }
216+
217+ int indexNum = indexExtractor .apply (dataList .get (0 )).size ();
218+ if (indexNum == 0 )
219+ {
220+ return Collections .emptyList ();
221+ }
222+
223+ return IntStream .range (0 , indexNum )
224+ .mapToObj (i -> dataList .stream ()
225+ .map (data -> indexExtractor .apply (data ).get (i ))
226+ .collect (Collectors .toList ()))
227+ .collect (Collectors .toList ());
228+ }
229+
204230 /**
205231 * Common method to process updates for both normal and streaming rpc.
206232 *
@@ -217,120 +243,192 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw
217243 for (RetinaProto .TableUpdateData tableUpdateData : tableUpdateDataList )
218244 {
219245 String tableName = tableUpdateData .getTableName ();
246+ long primaryIndexId = tableUpdateData .getPrimaryIndexId ();
220247 long timestamp = tableUpdateData .getTimestamp ();
221- boolean init = true ;
248+
249+ // =================================================================
250+ // 1. Process Delete Data
251+ // =================================================================
222252 List <RetinaProto .DeleteData > deleteDataList = tableUpdateData .getDeleteDataList ();
223- long primaryIndexId = tableUpdateData .getPrimaryIndexId ();
224253 if (!deleteDataList .isEmpty ())
225254 {
255+ // 1a. Validate the delete data
226256 int indexNum = deleteDataList .get (0 ).getIndexKeysList ().size ();
227- List <List <IndexProto .IndexKey >> indexKeysList = new ArrayList <>(indexNum );
228- for (RetinaProto .DeleteData deleteData : deleteDataList )
257+ if (indexNum == 0 )
229258 {
230- List <IndexProto .IndexKey > deleteDataIndexKeysList = deleteData .getIndexKeysList ();
231- if (deleteDataIndexKeysList .isEmpty ())
232- {
233- throw new RetinaException ("Delete index key list is empty" );
234- }
235- extractIndexKeys (primaryIndexId , indexKeysList , deleteDataIndexKeysList , init );
259+ throw new RetinaException ("Delete index key list is empty" );
236260 }
237261
262+ boolean allRecordsValid = deleteDataList .stream ().allMatch (deleteData ->
263+ deleteData .getIndexKeysCount () == indexNum &&
264+ deleteData .getIndexKeys (0 ).getIndexId () == primaryIndexId );
265+ if (!allRecordsValid )
266+ {
267+ throw new RetinaException ("Primary index id mismatch or inconsistent index key list size" );
268+ }
269+
270+ // 1b. Transpose the index keys from row set to column set
271+ List <List <IndexProto .IndexKey >> indexKeysList = transposeIndexKeys (
272+ deleteDataList , RetinaProto .DeleteData ::getIndexKeysList );
273+
274+ // 1c. Delete the primary index entries and get the row locations
238275 List <IndexProto .IndexKey > primaryIndexKeys = indexKeysList .get (0 );
239276 long tableId = primaryIndexKeys .get (0 ).getTableId ();
240277 List <IndexProto .RowLocation > rowLocations = indexService .deletePrimaryIndexEntries
241278 (tableId , primaryIndexId , primaryIndexKeys );
279+
280+ // 1d. Delete the records
242281 for (IndexProto .RowLocation rowLocation : rowLocations )
243282 {
244283 this .retinaResourceManager .deleteRecord (rowLocation , timestamp );
245284 }
246285
247- for (int i = 1 ; i < indexNum ; i ++)
286+ // 1e. Delete the secondary index entries
287+ for (int i = 1 ; i < indexNum ; ++i )
248288 {
249289 List <IndexProto .IndexKey > indexKeys = indexKeysList .get (i );
250- indexService .deleteSecondaryIndexEntries
251- ( indexKeys . get ( 0 ). getTableId (), indexKeys .get (0 ).getIndexId (), indexKeys );
290+ indexService .deleteSecondaryIndexEntries ( indexKeys . get ( 0 ). getTableId (),
291+ indexKeys .get (0 ).getIndexId (), indexKeys );
252292 }
253293 }
254294
295+ // =================================================================
296+ // 2. Process Insert Data
297+ // =================================================================
255298 List <RetinaProto .InsertData > insertDataList = tableUpdateData .getInsertDataList ();
256299 if (!insertDataList .isEmpty ())
257300 {
301+ // 2a. Validate the insert data
258302 int indexNum = insertDataList .get (0 ).getIndexKeysList ().size ();
259- List <List <IndexProto .IndexKey >> indexKeysList = new ArrayList <>(indexNum );
303+ if (indexNum == 0 )
304+ {
305+ throw new RetinaException ("Insert index key list is empty" );
306+ }
307+
308+ boolean allRecordValid = insertDataList .stream ().allMatch (insertData ->
309+ insertData .getIndexKeysCount () == indexNum &&
310+ insertData .getIndexKeys (0 ).getIndexId () == primaryIndexId );
311+ if (!allRecordValid )
312+ {
313+ throw new RetinaException ("Primary index id mismatch or inconsistent index key list size" );
314+ }
315+
316+ // 2b. Transpose the index keys from row set to column set
317+ List <List <IndexProto .IndexKey >> indexKeysList = transposeIndexKeys (
318+ insertDataList , RetinaProto .InsertData ::getIndexKeysList );
319+
320+ // 2c. Insert the records and get the primary index entries
260321 List <IndexProto .PrimaryIndexEntry > primaryIndexEntries = new ArrayList <>(insertDataList .size ());
261322 List <Long > rowIdList = new ArrayList <>(insertDataList .size ());
323+
262324 for (RetinaProto .InsertData insertData : insertDataList )
263325 {
264- List <IndexProto .IndexKey > insertDataIndexKeysList = insertData .getIndexKeysList ();
265- if (insertDataIndexKeysList .isEmpty ())
266- {
267- throw new RetinaException ("Insert index key list is empty" );
268- }
269- extractIndexKeys (primaryIndexId , indexKeysList , insertDataIndexKeysList , init );
326+ byte [][] colValuesByteArray = insertData .getColValuesList ().stream ()
327+ .map (ByteString ::toByteArray )
328+ .toArray (byte [][]::new );
270329
271- List <ByteString > colValuesList = insertData .getColValuesList ();
272- byte [][] colValuesByteArray = new byte [colValuesList .size ()][];
273- for (int i = 0 ; i < colValuesList .size (); ++i )
274- {
275- colValuesByteArray [i ] = colValuesList .get (i ).toByteArray ();
276- }
277- IndexProto .PrimaryIndexEntry .Builder primaryIndexEntryBuilder =
330+ IndexProto .PrimaryIndexEntry .Builder builder =
278331 this .retinaResourceManager .insertRecord (schemaName , tableName ,
279332 colValuesByteArray , timestamp );
280- primaryIndexEntryBuilder .setIndexKey (insertData .getIndexKeys (0 ));
281- rowIdList .add (primaryIndexEntryBuilder .getRowId ());
282- primaryIndexEntries .add (primaryIndexEntryBuilder .build ());
333+ builder .setIndexKey (insertData .getIndexKeys (0 ));
334+ IndexProto .PrimaryIndexEntry entry = builder .build ();
335+ primaryIndexEntries .add (entry );
336+ rowIdList .add (entry .getRowId ());
283337 }
338+
339+ // 2d. Put the primary index entries
284340 long tableId = primaryIndexEntries .get (0 ).getIndexKey ().getTableId ();
285341 indexService .putPrimaryIndexEntries (tableId , primaryIndexId , primaryIndexEntries );
286- for (int i = 1 ; i < indexNum ; i ++)
342+
343+ // 2e. Put the secondary index entries
344+ for (int i = 1 ; i < indexNum ; ++i )
287345 {
288346 List <IndexProto .IndexKey > indexKeys = indexKeysList .get (i );
289- long indexId = indexKeys .get (0 ).getIndexId ();
290347 List <IndexProto .SecondaryIndexEntry > secondaryIndexEntries =
291348 IntStream .range (0 , indexKeys .size ())
292349 .mapToObj (j -> IndexProto .SecondaryIndexEntry .newBuilder ()
293350 .setRowId (rowIdList .get (j ))
294351 .setIndexKey (indexKeys .get (j ))
295352 .build ())
296353 .collect (Collectors .toList ());
297- indexService .putSecondaryIndexEntries
298- ( indexKeys .get (0 ).getTableId (), indexId , secondaryIndexEntries );
354+ indexService .putSecondaryIndexEntries ( indexKeys . get ( 0 ). getTableId (),
355+ indexKeys .get (0 ).getIndexId () , secondaryIndexEntries );
299356 }
300357 }
301- }
302- }
303- }
304-
305- private void extractIndexKeys (long primaryIndexId , List <List <IndexProto .IndexKey >> indexKeysList ,
306- List <IndexProto .IndexKey > allIndexKeysList , boolean init ) throws RetinaException
307- {
308- IndexProto .IndexKey primaryIndexKey = allIndexKeysList .get (0 );
309- if (primaryIndexKey .getIndexId () != primaryIndexId )
310- {
311- throw new RetinaException ("Primary index id mismatch" );
312- }
313358
314- for (int i = 0 ; i < allIndexKeysList .size (); ++i )
315- {
316- IndexProto .IndexKey currIndexKey = allIndexKeysList .get (i );
317- if (init )
318- {
319- indexKeysList .add (new ArrayList <>());
320- } else
321- {
322- // check if indexId or tableId is mismatch
323- IndexProto .IndexKey baseIndexKey = indexKeysList .get (i ).get (0 );
324- if (baseIndexKey .getIndexId () != currIndexKey .getIndexId ())
325- {
326- throw new RetinaException ("Index id mismatch" );
327- }
328- if (baseIndexKey .getTableId () != currIndexKey .getTableId ())
359+ // =================================================================
360+ // 3. Process Update Data
361+ // =================================================================
362+ List <RetinaProto .UpdateData > updateDataList = tableUpdateData .getUpdateDataList ();
363+ if (!updateDataList .isEmpty ())
329364 {
330- throw new RetinaException ("Table id mismatch" );
365+ // 3a. Validate the update data
366+ int indexNum = updateDataList .get (0 ).getIndexKeysList ().size ();
367+ if (indexNum == 0 )
368+ {
369+ throw new RetinaException ("Update index key list is empty" );
370+ }
371+
372+ boolean allRecordsValid = updateDataList .stream ().allMatch (updateData ->
373+ updateData .getIndexKeysCount () == indexNum &&
374+ updateData .getIndexKeys (0 ).getIndexId () == primaryIndexId );
375+ if (!allRecordsValid )
376+ {
377+ throw new RetinaException ("Primary index id mismatch or inconsistent index key list size" );
378+ }
379+
380+ // 3b. Transpose the index keys from row set to column set
381+ List <List <IndexProto .IndexKey >> indexKeysList = transposeIndexKeys (
382+ updateDataList , RetinaProto .UpdateData ::getIndexKeysList );
383+
384+ // 3c. Insert the records and get the primary index entries
385+ List <IndexProto .PrimaryIndexEntry > primaryIndexEntries = new ArrayList <>(updateDataList .size ());
386+ List <Long > rowIdList = new ArrayList <>(updateDataList .size ());
387+
388+ for (RetinaProto .UpdateData updateData : updateDataList )
389+ {
390+ byte [][] colValuesByteArray = updateData .getColValuesList ().stream ()
391+ .map (ByteString ::toByteArray )
392+ .toArray (byte [][]::new );
393+
394+ IndexProto .PrimaryIndexEntry .Builder builder =
395+ this .retinaResourceManager .insertRecord (schemaName , tableName ,
396+ colValuesByteArray , timestamp );
397+
398+ builder .setIndexKey (updateData .getIndexKeys (0 ));
399+ IndexProto .PrimaryIndexEntry entry = builder .build ();
400+ primaryIndexEntries .add (entry );
401+ rowIdList .add (entry .getRowId ());
402+ }
403+
404+ // 3d. Update the primary index entries and get the previous row locations
405+ long tableId = primaryIndexEntries .get (0 ).getIndexKey ().getTableId ();
406+ List <IndexProto .RowLocation > previousRowLocations = indexService .updatePrimaryIndexEntries
407+ (tableId , primaryIndexId , primaryIndexEntries );
408+
409+ // 3e. Delete the previous records
410+ for (IndexProto .RowLocation location : previousRowLocations )
411+ {
412+ this .retinaResourceManager .deleteRecord (location , timestamp );
413+ }
414+
415+ // 3f. Update the secondary index entries
416+ for (int i = 1 ; i < indexNum ; ++i )
417+ {
418+ List <IndexProto .IndexKey > indexKeys = indexKeysList .get (i );
419+ List <IndexProto .SecondaryIndexEntry > secondaryIndexEntries =
420+ IntStream .range (0 , indexKeys .size ())
421+ .mapToObj (j -> IndexProto .SecondaryIndexEntry .newBuilder ()
422+ .setRowId (rowIdList .get (j ))
423+ .setIndexKey (indexKeys .get (j ))
424+ .build ())
425+ .collect (Collectors .toList ());
426+
427+ indexService .updateSecondaryIndexEntries (indexKeys .get (0 ).getTableId (),
428+ indexKeys .get (0 ).getIndexId (), secondaryIndexEntries );
429+ }
331430 }
332431 }
333- indexKeysList .get (i ).add (currIndexKey );
334432 }
335433 }
336434
0 commit comments