@@ -30,7 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
30
30
use crate :: difference:: Semigroup ;
31
31
use crate :: lattice:: Lattice ;
32
32
use crate :: trace:: { self , Trace , TraceReader , BatchReader , Batcher , Builder , Cursor } ;
33
- use crate :: trace:: implementations:: { KeyBatcher , KeyBuilder , KeySpine , ValBatcher , ValBuilder , ValSpine } ;
33
+ use crate :: trace:: implementations:: { KeyBatcher , KeyBuilder , KeySpine , ValBatcher , ValBuilder , ValSpine , VecChunker } ;
34
34
35
35
use trace:: wrappers:: enter:: { TraceEnter , BatchEnter , } ;
36
36
use trace:: wrappers:: enter_at:: TraceEnter as TraceEnterAt ;
76
76
use :: timely:: dataflow:: scopes:: Child ;
77
77
use :: timely:: progress:: timestamp:: Refines ;
78
78
use timely:: Container ;
79
- use timely:: container:: PushInto ;
79
+ use timely:: container:: { ContainerBuilder , PushInto } ;
80
80
81
81
impl < G , Tr > Arranged < G , Tr >
82
82
where
@@ -348,20 +348,22 @@ where
348
348
G : Scope < Timestamp : Lattice > ,
349
349
{
350
350
/// Arranges updates into a shared trace.
351
- fn arrange < Ba , Bu , Tr > ( & self ) -> Arranged < G , TraceAgent < Tr > >
351
+ fn arrange < Chu , Ba , Bu , Tr > ( & self ) -> Arranged < G , TraceAgent < Tr > >
352
352
where
353
- Ba : Batcher < Input =C , Time =G :: Timestamp > + ' static ,
354
- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
353
+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
354
+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
355
+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
355
356
Tr : Trace < Time =G :: Timestamp > + ' static ,
356
357
{
357
- self . arrange_named :: < Ba , Bu , Tr > ( "Arrange" )
358
+ self . arrange_named :: < Chu , Ba , Bu , Tr > ( "Arrange" )
358
359
}
359
360
360
361
/// Arranges updates into a shared trace, with a supplied name.
361
- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
362
+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
362
363
where
363
- Ba : Batcher < Input =C , Time =G :: Timestamp > + ' static ,
364
- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
364
+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
365
+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
366
+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
365
367
Tr : Trace < Time =G :: Timestamp > + ' static ,
366
368
;
367
369
}
@@ -373,14 +375,15 @@ where
373
375
V : ExchangeData ,
374
376
R : ExchangeData + Semigroup ,
375
377
{
376
- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
378
+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
377
379
where
378
- Ba : Batcher < Input =Vec < ( ( K , V ) , G :: Timestamp , R ) > , Time =G :: Timestamp > + ' static ,
379
- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
380
+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut Vec < ( ( K , V ) , G :: Timestamp , R ) > > ,
381
+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
382
+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
380
383
Tr : Trace < Time =G :: Timestamp > + ' static ,
381
384
{
382
385
let exchange = Exchange :: new ( move |update : & ( ( K , V ) , G :: Timestamp , R ) | ( update. 0 ) . 0 . hashed ( ) . into ( ) ) ;
383
- arrange_core :: < _ , _ , Ba , Bu , _ > ( & self . inner , exchange, name)
386
+ arrange_core :: < _ , _ , _ , Chu , Ba , Bu , _ > ( & self . inner , exchange, name)
384
387
}
385
388
}
386
389
@@ -389,12 +392,14 @@ where
389
392
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
390
393
/// It uses the supplied parallelization contract to distribute the data, which does not need to
391
394
/// be consistently by key (though this is the most common).
392
- pub fn arrange_core < G , P , Ba , Bu , Tr > ( stream : & StreamCore < G , Ba :: Input > , pact : P , name : & str ) -> Arranged < G , TraceAgent < Tr > >
395
+ pub fn arrange_core < G , P , C , Chu , Ba , Bu , Tr > ( stream : & StreamCore < G , C > , pact : P , name : & str ) -> Arranged < G , TraceAgent < Tr > >
393
396
where
394
397
G : Scope < Timestamp : Lattice > ,
395
- P : ParallelizationContract < G :: Timestamp , Ba :: Input > ,
396
- Ba : Batcher < Time =G :: Timestamp , Input : Container + Clone + ' static > + ' static ,
397
- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
398
+ P : ParallelizationContract < G :: Timestamp , C > ,
399
+ C : Container + Clone + ' static ,
400
+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
401
+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
402
+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
398
403
Tr : Trace < Time =G :: Timestamp > +' static ,
399
404
{
400
405
// The `Arrange` operator is tasked with reacting to an advancing input
@@ -443,6 +448,8 @@ where
443
448
// Initialize to the minimal input frontier.
444
449
let mut prev_frontier = Antichain :: from_elem ( <G :: Timestamp as Timestamp >:: minimum ( ) ) ;
445
450
451
+ let mut chunker = Chu :: default ( ) ;
452
+
446
453
move |input, output| {
447
454
448
455
// As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
@@ -451,7 +458,11 @@ where
451
458
452
459
input. for_each ( |cap, data| {
453
460
capabilities. insert ( cap. retain ( ) ) ;
454
- batcher. push_container ( data) ;
461
+ chunker. push_into ( data) ;
462
+ while let Some ( chunk) = chunker. extract ( ) {
463
+ let chunk = std:: mem:: take ( chunk) ;
464
+ batcher. push_into ( chunk) ;
465
+ }
455
466
} ) ;
456
467
457
468
// The frontier may have advanced by multiple elements, which is an issue because
@@ -481,6 +492,11 @@ where
481
492
// If there is at least one capability not in advance of the input frontier ...
482
493
if capabilities. elements ( ) . iter ( ) . any ( |c| !input. frontier ( ) . less_equal ( c. time ( ) ) ) {
483
494
495
+ while let Some ( chunk) = chunker. finish ( ) {
496
+ let chunk = std:: mem:: take ( chunk) ;
497
+ batcher. push_into ( chunk) ;
498
+ }
499
+
484
500
let mut upper = Antichain :: new ( ) ; // re-used allocation for sealing batches.
485
501
486
502
// For each capability not in advance of the input frontier ...
@@ -547,14 +563,15 @@ impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K,
547
563
where
548
564
G : Scope < Timestamp : Lattice +Ord > ,
549
565
{
550
- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
566
+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
551
567
where
552
- Ba : Batcher < Input =Vec < ( ( K , ( ) ) , G :: Timestamp , R ) > , Time =G :: Timestamp > + ' static ,
553
- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
568
+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut Vec < ( ( K , ( ) ) , G :: Timestamp , R ) > > ,
569
+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
570
+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
554
571
Tr : Trace < Time =G :: Timestamp > + ' static ,
555
572
{
556
573
let exchange = Exchange :: new ( move |update : & ( ( K , ( ) ) , G :: Timestamp , R ) | ( update. 0 ) . 0 . hashed ( ) . into ( ) ) ;
557
- arrange_core :: < _ , _ , Ba , Bu , _ > ( & self . map ( |k| ( k, ( ) ) ) . inner , exchange, name)
574
+ arrange_core :: < _ , _ , _ , Chu , Ba , Bu , _ > ( & self . map ( |k| ( k, ( ) ) ) . inner , exchange, name)
558
575
}
559
576
}
560
577
@@ -587,7 +604,7 @@ where
587
604
}
588
605
589
606
fn arrange_by_key_named ( & self , name : & str ) -> Arranged < G , TraceAgent < ValSpine < K , V , G :: Timestamp , R > > > {
590
- self . arrange_named :: < ValBatcher < _ , _ , _ , _ > , ValBuilder < _ , _ , _ , _ > , _ > ( name)
607
+ self . arrange_named :: < VecChunker < _ > , ValBatcher < _ , _ , _ , _ > , ValBuilder < _ , _ , _ , _ > , _ > ( name)
591
608
}
592
609
}
593
610
@@ -622,6 +639,6 @@ where
622
639
623
640
fn arrange_by_self_named ( & self , name : & str ) -> Arranged < G , TraceAgent < KeySpine < K , G :: Timestamp , R > > > {
624
641
self . map ( |k| ( k, ( ) ) )
625
- . arrange_named :: < KeyBatcher < _ , _ , _ > , KeyBuilder < _ , _ , _ > , _ > ( name)
642
+ . arrange_named :: < VecChunker < _ > , KeyBatcher < _ , _ , _ > , KeyBuilder < _ , _ , _ > , _ > ( name)
626
643
}
627
644
}
0 commit comments