23
23
import java .util .ArrayList ;
24
24
import java .util .Arrays ;
25
25
import java .util .Collections ;
26
+ import java .util .Iterator ;
26
27
import java .util .List ;
27
28
import java .util .NavigableSet ;
28
29
import java .util .function .Function ;
32
33
import org .apache .cassandra .db .BufferClusteringBound ;
33
34
import org .apache .cassandra .db .Clustering ;
34
35
import org .apache .cassandra .db .ClusteringBound ;
36
+ import org .apache .cassandra .db .ClusteringPrefix ;
35
37
import org .apache .cassandra .db .DataRange ;
36
38
import org .apache .cassandra .db .DecoratedKey ;
39
+ import org .apache .cassandra .db .DeletionInfo ;
37
40
import org .apache .cassandra .db .PartitionPosition ;
38
41
import org .apache .cassandra .db .PartitionRangeReadCommand ;
42
+ import org .apache .cassandra .db .RangeTombstone ;
39
43
import org .apache .cassandra .db .ReadCommand ;
40
44
import org .apache .cassandra .db .ReadResponse ;
41
45
import org .apache .cassandra .db .SinglePartitionReadCommand ;
42
46
import org .apache .cassandra .db .Slice ;
43
47
import org .apache .cassandra .db .Slices ;
48
+ import org .apache .cassandra .db .TruncateRequest ;
44
49
import org .apache .cassandra .db .filter .ClusteringIndexFilter ;
45
50
import org .apache .cassandra .db .filter .ClusteringIndexSliceFilter ;
46
51
import org .apache .cassandra .db .filter .ColumnFilter ;
50
55
import org .apache .cassandra .db .marshal .ByteBufferAccessor ;
51
56
import org .apache .cassandra .db .marshal .CompositeType ;
52
57
import org .apache .cassandra .db .marshal .Int32Type ;
58
+ import org .apache .cassandra .db .partitions .PartitionUpdate ;
53
59
import org .apache .cassandra .db .partitions .UnfilteredPartitionIterator ;
60
+ import org .apache .cassandra .db .rows .BTreeRow ;
54
61
import org .apache .cassandra .db .rows .Cell ;
62
+ import org .apache .cassandra .db .rows .ColumnData ;
55
63
import org .apache .cassandra .db .rows .Row ;
56
64
import org .apache .cassandra .db .rows .Unfiltered ;
57
65
import org .apache .cassandra .db .rows .UnfilteredRowIterator ;
68
76
import org .apache .cassandra .schema .TableMetadata ;
69
77
import org .apache .cassandra .tcm .ClusterMetadata ;
70
78
import org .apache .cassandra .tcm .membership .NodeId ;
79
+ import org .apache .cassandra .utils .btree .BTree ;
80
+ import org .apache .cassandra .utils .concurrent .AsyncPromise ;
81
+ import org .apache .cassandra .utils .concurrent .Promise ;
71
82
import org .apache .cassandra .utils .concurrent .SyncPromise ;
72
83
73
84
import static org .apache .cassandra .db .ClusteringBound .BOTTOM ;
    /**
     * Sends this request's read command to {@code endpoint}, completing {@code rr}
     * with the reply payload (or the failure) when the response arrives.
     */
    private void send(RequestAndResponse rr, InetAddressAndPort endpoint)
    {
        // RequestAndResponse itself acts as the Promise<ReadResponse> that the
        // generic send(...) overload completes.
        send(Verb.READ_REQ, rr.readCommand, rr, endpoint);
    }
+ private <Reply > Promise <Reply > send (Verb verb , Object payload , InetAddressAndPort endpoint )
246
+ {
247
+ Promise <Reply > promise = new AsyncPromise <>();
248
+ send (verb , payload , promise , endpoint );
249
+ return promise ;
250
+ }
251
+
252
+ private <Reply > void send (Verb verb , Object payload , Promise <Reply > promise , InetAddressAndPort endpoint )
253
+ {
254
+ // we have to send inline some of the MessagingService logic to circumvent the requirement to use AbstractWriteResponseHandler
255
+ Message <?> message = Message .out (verb , payload );
256
+ RequestCallback <?> callback = new RequestCallback <Reply >()
232
257
{
233
- @ Override public void onResponse (Message <ReadResponse > msg ) { rr .trySuccess (msg .payload ); }
258
+ @ Override public void onResponse (Message <Reply > msg ) { promise .trySuccess (msg .payload ); }
234
259
@ Override public boolean invokeOnFailure () { return true ; }
235
260
@ Override public void onFailure (InetAddressAndPort from , RequestFailure failure )
236
261
{
237
- if (failure .failure == null ) rr .tryFailure (new RuntimeException (failure .reason .toString ()));
238
- else rr .tryFailure (failure .failure );
262
+ if (failure .failure == null ) promise .tryFailure (new RuntimeException (failure .reason .toString ()));
263
+ else promise .tryFailure (failure .failure );
239
264
}
240
- });
265
+ };
266
+
267
+ MessagingService .instance ().sendWithCallback (message , endpoint , callback );
241
268
}
242
269
243
270
private void collect (PartitionsCollector collector , RequestAndResponse rr , Function <DecoratedKey , ByteBuffer []> pksToCks )
@@ -280,7 +307,6 @@ private void collect(PartitionsCollector collector, RequestAndResponse rr, Funct
280
307
}
281
308
}
282
309
}
283
-
284
310
}
285
311
286
312
private static boolean selectsOneRow (TableMetadata metadata , DataRange dataRange , DecoratedKey key )
@@ -299,9 +325,9 @@ private static boolean selectsOneRow(TableMetadata metadata, DataRange dataRange
299
325
return slice .start ().equals (slice .end ());
300
326
}
301
327
302
- private static Function <DecoratedKey , ByteBuffer []> partitionKeyToClusterings (TableMetadata metadata , TableMetadata local )
328
+ private static Function <DecoratedKey , ByteBuffer []> partitionKeyToClusterings (TableMetadata distributed , TableMetadata local )
303
329
{
304
- ByteBuffer [] cks = new ByteBuffer [metadata .clusteringColumns ().size ()];
330
+ ByteBuffer [] cks = new ByteBuffer [distributed .clusteringColumns ().size ()];
305
331
if (local .partitionKeyColumns ().size () == 1 )
306
332
{
307
333
return pk -> {
@@ -448,4 +474,136 @@ private static ClusteringIndexSliceFilter filter(TableMetadata metadata, Cluster
448
474
{
449
475
return new ClusteringIndexSliceFilter (Slices .with (metadata .comparator , Slice .make (start , end )), reversed );
450
476
}
477
+
478
+ @ Override
479
+ public void apply (PartitionUpdate update )
480
+ {
481
+ int nodeId = Int32Type .instance .compose (update .partitionKey ().getKey ());
482
+ InetAddressAndPort endpoint = ClusterMetadata .current ().directory .endpoint (new NodeId (nodeId ));
483
+ if (endpoint == null )
484
+ throw new InvalidRequestException ("Unknown node " + nodeId );
485
+
486
+ DeletionInfo deletionInfo = update .deletionInfo ();
487
+ if (!deletionInfo .getPartitionDeletion ().isLive ())
488
+ {
489
+ truncate (endpoint ).syncThrowUncheckedOnInterrupt ();
490
+ return ;
491
+ }
492
+
493
+ int pkCount = local .partitionKeyColumns ().size ();
494
+ ByteBuffer [] pkBuffer , ckBuffer ;
495
+ {
496
+ int ckCount = local .clusteringColumns ().size ();
497
+ pkBuffer = pkCount == 1 ? null : new ByteBuffer [pkCount ];
498
+ ckBuffer = new ByteBuffer [ckCount ];
499
+ }
500
+
501
+ PartitionUpdate .Builder builder = null ;
502
+ ArrayDeque <Promise <Void >> results = new ArrayDeque <>();
503
+
504
+ if (deletionInfo .hasRanges ())
505
+ {
506
+ Iterator <RangeTombstone > iterator = deletionInfo .rangeIterator (false );
507
+ while (iterator .hasNext ())
508
+ {
509
+ RangeTombstone rt = iterator .next ();
510
+ ClusteringBound start = rt .deletedSlice ().start ();
511
+ ClusteringBound end = rt .deletedSlice ().end ();
512
+ if (start .size () < pkCount || end .size () < pkCount )
513
+ throw new InvalidRequestException ("Range deletions must specify a complete partition key in the underlying table " + metadata );
514
+
515
+ for (int i = 0 ; i < pkCount ; ++i )
516
+ {
517
+ if (0 != start .accessor ().compare (start .get (i ), end .get (i ), end .accessor ()))
518
+ throw new InvalidRequestException ("Range deletions must specify a single partition key in the underlying table " + metadata );
519
+ }
520
+
521
+ DecoratedKey key = remoteClusteringToLocalPartitionKey (local , start , pkCount , pkBuffer );
522
+ builder = maybeRolloverAndWait (key , builder , results , endpoint );
523
+ if (start .size () == pkCount && end .size () == pkCount )
524
+ {
525
+ builder .addPartitionDeletion (rt .deletionTime ());
526
+ }
527
+ else
528
+ {
529
+ start = ClusteringBound .create (start .kind (), Clustering .make (remoteClusteringToLocalClustering (start .clustering (), pkCount , ckBuffer )));
530
+ end = ClusteringBound .create (end .kind (), Clustering .make (remoteClusteringToLocalClustering (end .clustering (), pkCount , ckBuffer )));
531
+ builder .add (new RangeTombstone (Slice .make (start , end ), rt .deletionTime ()));
532
+ }
533
+ }
534
+ }
535
+
536
+ if (!update .staticRow ().isEmpty ())
537
+ throw new InvalidRequestException ("Static rows are not supported for remote table " + metadata );
538
+
539
+ try (BTree .FastBuilder <ColumnData > columns = BTree .fastBuilder ())
540
+ {
541
+ for (Row row : update )
542
+ {
543
+ Clustering <?> clustering = row .clustering ();
544
+ DecoratedKey key = remoteClusteringToLocalPartitionKey (local , clustering , pkCount , pkBuffer );
545
+ builder = maybeRolloverAndWait (key , builder , results , endpoint );
546
+ Clustering newClustering = Clustering .make (remoteClusteringToLocalClustering (clustering , pkCount , ckBuffer ));
547
+ columns .reset ();
548
+ for (ColumnData cd : row )
549
+ columns .add (rebind (local , cd ));
550
+ builder .add (BTreeRow .create (newClustering , row .primaryKeyLivenessInfo (), row .deletion (), columns .build ()));
551
+ }
552
+ }
553
+
554
+ if (builder != null )
555
+ results .add (send (Verb .VIRTUAL_MUTATION_REQ , new VirtualMutation (builder .build ()), endpoint ));
556
+
557
+ while (!results .isEmpty ())
558
+ results .pollFirst ().syncThrowUncheckedOnInterrupt ();
559
+ }
560
+
561
+ private PartitionUpdate .Builder maybeRolloverAndWait (DecoratedKey key , PartitionUpdate .Builder builder , ArrayDeque <Promise <Void >> waiting , InetAddressAndPort endpoint )
562
+ {
563
+ if (builder == null || !builder .partitionKey ().equals (key ))
564
+ {
565
+ if (builder != null )
566
+ waiting .add (send (Verb .VIRTUAL_MUTATION_REQ , new VirtualMutation (builder .build ()), endpoint ));
567
+ builder = new PartitionUpdate .Builder (local , key , local .regularAndStaticColumns (), 8 );
568
+ while (waiting .size () >= MAX_CONCURRENCY )
569
+ waiting .pollFirst ().syncThrowUncheckedOnInterrupt ();
570
+ }
571
+ return builder ;
572
+ }
573
+
574
+ private Promise <Void > truncate (InetAddressAndPort endpoint )
575
+ {
576
+ return send (Verb .TRUNCATE_REQ , new TruncateRequest (local .keyspace , local .name ), endpoint );
577
+ }
578
+
579
+ private static ColumnData rebind (TableMetadata local , ColumnData cd )
580
+ {
581
+ ColumnMetadata column = local .getColumn (cd .column ().name );
582
+
583
+ Invariants .require (column != null , cd .column () + " not found in " + local );
584
+ Invariants .require (!column .isComplex (), "Complex column " + column + " not supported; should have been removed from metadata" );
585
+
586
+ return ((Cell <?>) cd ).withUpdatedColumn (column );
587
+ }
588
+
589
+ private static DecoratedKey remoteClusteringToLocalPartitionKey (TableMetadata local , ClusteringPrefix clustering , int pkCount , ByteBuffer [] pkBuffer )
590
+ {
591
+ ByteBuffer bytes ;
592
+ if (pkCount == 1 ) bytes = clustering .bufferAt (0 );
593
+ else
594
+ {
595
+ for (int i = 0 ; i < pkBuffer .length ; ++i )
596
+ pkBuffer [i ] = clustering .bufferAt (i );
597
+ bytes = CompositeType .build (ByteBufferAccessor .instance , pkBuffer );
598
+ }
599
+ return local .partitioner .decorateKey (bytes );
600
+ }
601
+
602
+ private static ByteBuffer [] remoteClusteringToLocalClustering (ClusteringPrefix clustering , int pkCount , ByteBuffer [] ckBuffer )
603
+ {
604
+ for (int i = pkCount ; i < clustering .size (); ++i )
605
+ ckBuffer [i - pkCount ] = clustering .bufferAt (i );
606
+
607
+ return Arrays .copyOf (ckBuffer , clustering .size () - pkCount );
608
+ }
451
609
}
0 commit comments