17
17
//! must accumulate to the same value as would an un-compacted trace.
18
18
19
19
use std:: collections:: { BTreeMap , HashMap } ;
20
+ use std:: convert:: TryInto ;
20
21
use std:: iter;
21
22
use std:: os:: unix:: ffi:: OsStringExt ;
22
23
use std:: path:: Path ;
23
24
use std:: thread;
24
- use std:: time:: Duration ;
25
+ use std:: time:: { Duration , Instant } ;
25
26
26
27
use failure:: { bail, ResultExt } ;
27
28
use futures:: executor:: block_on;
@@ -124,10 +125,14 @@ where
124
125
logical_compaction_window_ms : Option < Timestamp > ,
125
126
/// Instance count: number of times sources have been instantiated in views. This is used
126
127
/// to associate each new instance of a source with a unique instance id (iid)
127
- local_input_time : Timestamp ,
128
128
log : bool ,
129
129
executor : tokio:: runtime:: Handle ,
130
130
feedback_rx : Option < comm:: mpsc:: Receiver < WorkerFeedbackWithMeta > > ,
131
+ /// Remember the last assigned timestamp so that we can ensure monotonicity.
132
+ last_assigned : Timestamp ,
133
+ /// The startup time of the coordinator, from which local input timstamps are generated
134
+ /// relative to.
135
+ start_time : Instant ,
131
136
}
132
137
133
138
impl < C > Coordinator < C >
@@ -182,12 +187,13 @@ where
182
187
indexes : ArrangementFrontiers :: default ( ) ,
183
188
since_updates : Vec :: new ( ) ,
184
189
active_tails : HashMap :: new ( ) ,
185
- local_input_time : 1 ,
186
190
log : config. logging . is_some ( ) ,
187
191
executor : config. executor . clone ( ) ,
188
192
timestamp_config : config. timestamp ,
189
193
logical_compaction_window_ms,
190
194
feedback_rx : Some ( rx) ,
195
+ last_assigned : 1 ,
196
+ start_time : Instant :: now ( ) ,
191
197
} ;
192
198
193
199
let catalog_entries: Vec < _ > = coord
@@ -283,6 +289,23 @@ where
283
289
Ok ( coord)
284
290
}
285
291
292
+ pub fn now ( & mut self ) -> Timestamp {
293
+ let mut t = self
294
+ . start_time
295
+ . elapsed ( )
296
+ . as_millis ( )
297
+ . try_into ( )
298
+ . expect ( "system time did not fit in u64" ) ;
299
+ // This is a hack. In a perfect world we would represent time as having a "real" dimension
300
+ // and a "coordinator" dimension so that clients always observed linearizability from
301
+ // things the coordinator did without being related to the real dimension.
302
+ if t <= self . last_assigned {
303
+ t = self . last_assigned + 1
304
+ }
305
+ self . last_assigned = t;
306
+ t
307
+ }
308
+
286
309
pub fn serve ( & mut self , cmd_rx : futures:: channel:: mpsc:: UnboundedReceiver < Command > ) {
287
310
self . executor . clone ( ) . enter ( || self . serve_core ( cmd_rx) )
288
311
}
@@ -814,6 +837,7 @@ where
814
837
] ) {
815
838
Ok ( _) => {
816
839
self . views . insert ( source_id, ViewState :: new ( false , vec ! [ ] ) ) ;
840
+ let advance_to = self . now ( ) ;
817
841
broadcast (
818
842
& mut self . broadcast_tx ,
819
843
SequencedCommand :: CreateLocalInput {
@@ -824,7 +848,7 @@ where
824
848
keys : index. keys . clone ( ) ,
825
849
} ,
826
850
on_type : source. desc . typ ( ) . clone ( ) ,
827
- advance_to : self . local_input_time ,
851
+ advance_to,
828
852
} ,
829
853
) ;
830
854
self . insert_index ( index_id, & index, self . logical_compaction_window_ms ) ;
@@ -1403,23 +1427,24 @@ where
1403
1427
affected_rows : usize ,
1404
1428
kind : MutationKind ,
1405
1429
) -> Result < ExecuteResponse , failure:: Error > {
1430
+ let timestamp = self . last_assigned ;
1406
1431
let updates = updates
1407
1432
. into_iter ( )
1408
1433
. map ( |( row, diff) | Update {
1409
1434
row,
1410
1435
diff,
1411
- timestamp : self . local_input_time ,
1436
+ timestamp,
1412
1437
} )
1413
1438
. collect ( ) ;
1414
1439
1415
- self . local_input_time += 1 ;
1440
+ let advance_to = self . now ( ) ;
1416
1441
1417
1442
broadcast (
1418
1443
& mut self . broadcast_tx ,
1419
1444
SequencedCommand :: Insert {
1420
1445
id,
1421
1446
updates,
1422
- advance_to : self . local_input_time ,
1447
+ advance_to,
1423
1448
} ,
1424
1449
) ;
1425
1450
@@ -2032,9 +2057,9 @@ where
2032
2057
// In symbiosis mode, we enforce serializability by forcing all
2033
2058
// PEEKs to peek at the latest input time.
2034
2059
// TODO(benesch): should this be saturating subtraction, and what should happen
2035
- // when `self.local_input_time ` is zero?
2036
- assert ! ( self . local_input_time > 0 ) ;
2037
- return Ok ( self . local_input_time - 1 ) ;
2060
+ // when `self.last_assigned ` is zero?
2061
+ assert ! ( self . last_assigned > 0 ) ;
2062
+ return Ok ( self . last_assigned - 1 ) ;
2038
2063
}
2039
2064
2040
2065
// Each involved trace has a validity interval `[since, upper)`.
0 commit comments