@@ -30,7 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable, IntoOwned};
30
30
use crate :: difference:: Semigroup ;
31
31
use crate :: lattice:: Lattice ;
32
32
use crate :: trace:: { self , Trace , TraceReader , BatchReader , Batcher , Builder , Cursor } ;
33
- use crate :: trace:: implementations:: { KeyBatcher , KeyBuilder , KeySpine , ValBatcher , ValBuilder , ValSpine } ;
33
+ use crate :: trace:: implementations:: { KeyBatcher , KeyBuilder , KeySpine , ValBatcher , ValBuilder , ValSpine , VecChunker } ;
34
34
35
35
use trace:: wrappers:: enter:: { TraceEnter , BatchEnter , } ;
36
36
use trace:: wrappers:: enter_at:: TraceEnter as TraceEnterAt ;
76
76
use :: timely:: dataflow:: scopes:: Child ;
77
77
use :: timely:: progress:: timestamp:: Refines ;
78
78
use timely:: Container ;
79
- use timely:: container:: PushInto ;
79
+ use timely:: container:: { ContainerBuilder , PushInto } ;
80
80
81
81
impl < G , Tr > Arranged < G , Tr >
82
82
where
@@ -350,20 +350,22 @@ where
350
350
G : Scope < Timestamp : Lattice > ,
351
351
{
352
352
/// Arranges updates into a shared trace.
353
- fn arrange < Ba , Bu , Tr > ( & self ) -> Arranged < G , TraceAgent < Tr > >
353
+ fn arrange < Chu , Ba , Bu , Tr > ( & self ) -> Arranged < G , TraceAgent < Tr > >
354
354
where
355
- Ba : Batcher < Input =C , Time =G :: Timestamp > + ' static ,
356
- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
355
+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
356
+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
357
+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
357
358
Tr : Trace < Time =G :: Timestamp > + ' static ,
358
359
{
359
- self . arrange_named :: < Ba , Bu , Tr > ( "Arrange" )
360
+ self . arrange_named :: < Chu , Ba , Bu , Tr > ( "Arrange" )
360
361
}
361
362
362
363
/// Arranges updates into a shared trace, with a supplied name.
363
- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
364
+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
364
365
where
365
- Ba : Batcher < Input =C , Time =G :: Timestamp > + ' static ,
366
- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
366
+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
367
+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
368
+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
367
369
Tr : Trace < Time =G :: Timestamp > + ' static ,
368
370
;
369
371
}
@@ -375,14 +377,15 @@ where
375
377
V : ExchangeData ,
376
378
R : ExchangeData + Semigroup ,
377
379
{
378
- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
380
+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
379
381
where
380
- Ba : Batcher < Input =Vec < ( ( K , V ) , G :: Timestamp , R ) > , Time =G :: Timestamp > + ' static ,
381
- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
382
+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut Vec < ( ( K , V ) , G :: Timestamp , R ) > > ,
383
+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
384
+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
382
385
Tr : Trace < Time =G :: Timestamp > + ' static ,
383
386
{
384
387
let exchange = Exchange :: new ( move |update : & ( ( K , V ) , G :: Timestamp , R ) | ( update. 0 ) . 0 . hashed ( ) . into ( ) ) ;
385
- arrange_core :: < _ , _ , Ba , Bu , _ > ( & self . inner , exchange, name)
388
+ arrange_core :: < _ , _ , _ , Chu , Ba , Bu , _ > ( & self . inner , exchange, name)
386
389
}
387
390
}
388
391
@@ -391,12 +394,14 @@ where
391
394
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
392
395
/// It uses the supplied parallelization contract to distribute the data, which does not need to
393
396
/// be consistently by key (though this is the most common).
394
- pub fn arrange_core < G , P , Ba , Bu , Tr > ( stream : & StreamCore < G , Ba :: Input > , pact : P , name : & str ) -> Arranged < G , TraceAgent < Tr > >
397
+ pub fn arrange_core < G , P , C , Chu , Ba , Bu , Tr > ( stream : & StreamCore < G , C > , pact : P , name : & str ) -> Arranged < G , TraceAgent < Tr > >
395
398
where
396
399
G : Scope < Timestamp : Lattice > ,
397
- P : ParallelizationContract < G :: Timestamp , Ba :: Input > ,
398
- Ba : Batcher < Time =G :: Timestamp , Input : Container + Clone + ' static > + ' static ,
399
- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
400
+ P : ParallelizationContract < G :: Timestamp , C > ,
401
+ C : Container + Clone + ' static ,
402
+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
403
+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
404
+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
400
405
Tr : Trace < Time =G :: Timestamp > +' static ,
401
406
{
402
407
// The `Arrange` operator is tasked with reacting to an advancing input
@@ -445,6 +450,8 @@ where
445
450
// Initialize to the minimal input frontier.
446
451
let mut prev_frontier = Antichain :: from_elem ( <G :: Timestamp as Timestamp >:: minimum ( ) ) ;
447
452
453
+ let mut chunker = Chu :: default ( ) ;
454
+
448
455
move |input, output| {
449
456
450
457
// As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
@@ -453,7 +460,11 @@ where
453
460
454
461
input. for_each ( |cap, data| {
455
462
capabilities. insert ( cap. retain ( ) ) ;
456
- batcher. push_container ( data) ;
463
+ chunker. push_into ( data) ;
464
+ while let Some ( chunk) = chunker. extract ( ) {
465
+ let chunk = std:: mem:: take ( chunk) ;
466
+ batcher. push_into ( chunk) ;
467
+ }
457
468
} ) ;
458
469
459
470
// The frontier may have advanced by multiple elements, which is an issue because
@@ -483,6 +494,11 @@ where
483
494
// If there is at least one capability not in advance of the input frontier ...
484
495
if capabilities. elements ( ) . iter ( ) . any ( |c| !input. frontier ( ) . less_equal ( c. time ( ) ) ) {
485
496
497
+ while let Some ( chunk) = chunker. finish ( ) {
498
+ let chunk = std:: mem:: take ( chunk) ;
499
+ batcher. push_into ( chunk) ;
500
+ }
501
+
486
502
let mut upper = Antichain :: new ( ) ; // re-used allocation for sealing batches.
487
503
488
504
// For each capability not in advance of the input frontier ...
@@ -549,14 +565,15 @@ impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K,
549
565
where
550
566
G : Scope < Timestamp : Lattice +Ord > ,
551
567
{
552
- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
568
+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
553
569
where
554
- Ba : Batcher < Input =Vec < ( ( K , ( ) ) , G :: Timestamp , R ) > , Time =G :: Timestamp > + ' static ,
555
- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
570
+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut Vec < ( ( K , ( ) ) , G :: Timestamp , R ) > > ,
571
+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
572
+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
556
573
Tr : Trace < Time =G :: Timestamp > + ' static ,
557
574
{
558
575
let exchange = Exchange :: new ( move |update : & ( ( K , ( ) ) , G :: Timestamp , R ) | ( update. 0 ) . 0 . hashed ( ) . into ( ) ) ;
559
- arrange_core :: < _ , _ , Ba , Bu , _ > ( & self . map ( |k| ( k, ( ) ) ) . inner , exchange, name)
576
+ arrange_core :: < _ , _ , _ , Chu , Ba , Bu , _ > ( & self . map ( |k| ( k, ( ) ) ) . inner , exchange, name)
560
577
}
561
578
}
562
579
@@ -589,7 +606,7 @@ where
589
606
}
590
607
591
608
fn arrange_by_key_named ( & self , name : & str ) -> Arranged < G , TraceAgent < ValSpine < K , V , G :: Timestamp , R > > > {
592
- self . arrange_named :: < ValBatcher < _ , _ , _ , _ > , ValBuilder < _ , _ , _ , _ > , _ > ( name)
609
+ self . arrange_named :: < VecChunker < _ > , ValBatcher < _ , _ , _ , _ > , ValBuilder < _ , _ , _ , _ > , _ > ( name)
593
610
}
594
611
}
595
612
@@ -624,6 +641,6 @@ where
624
641
625
642
fn arrange_by_self_named ( & self , name : & str ) -> Arranged < G , TraceAgent < KeySpine < K , G :: Timestamp , R > > > {
626
643
self . map ( |k| ( k, ( ) ) )
627
- . arrange_named :: < KeyBatcher < _ , _ , _ > , KeyBuilder < _ , _ , _ > , _ > ( name)
644
+ . arrange_named :: < VecChunker < _ > , KeyBatcher < _ , _ , _ > , KeyBuilder < _ , _ , _ > , _ > ( name)
628
645
}
629
646
}
0 commit comments