@@ -65,7 +65,7 @@ start_link(#{name := Name} = Config) ->
6565 gen_server :start_link ({local , Name }, ? MODULE , [Config ], []).
6666
6767-spec accept_mem_tables (atom () | pid (),
68- #{ra_uid () => [{ets :tid (), ra : range ()}]},
68+ #{ra_uid () => [{ets :tid (), ra_seq : state ()}]},
6969 string ()) -> ok .
7070accept_mem_tables (_SegmentWriter , Tables , undefined )
7171 when map_size (Tables ) == 0 ->
@@ -261,7 +261,7 @@ get_overview(#state{data_dir = Dir,
261261 #{data_dir => Dir ,
262262 segment_conf => Conf }.
263263
264- flush_mem_table_ranges ({ServerUId , TidRanges0 },
264+ flush_mem_table_ranges ({ServerUId , TidSeqs0 },
265265 # state {system = System } = State ) ->
266266 SmallestIdx = smallest_live_idx (ServerUId ),
267267 % % TidRanges arrive here sorted new -> old.
@@ -270,31 +270,32 @@ flush_mem_table_ranges({ServerUId, TidRanges0},
270270 % % list of tid ranges to flush to disk
271271 % % now TidRanges are sorted old -> new, i.e the correct order of
272272 % % processing
273- TidRanges = lists :foldl (
274- fun ({T , Range0 }, []) ->
275- case ra_range :truncate (SmallestIdx - 1 , Range0 ) of
276- undefined ->
277- [];
278- Range ->
279- [{T , Range }]
280- end ;
281- ({T , Range0 }, [{_T , {Start , _ }} | _ ] = Acc ) ->
282- Range1 = ra_range :truncate (SmallestIdx - 1 , Range0 ),
283- case ra_range :limit (Start , Range1 ) of
284- undefined ->
285- Acc ;
286- Range ->
287- [{T , Range } | Acc ]
288- end
289- end , [], TidRanges0 ),
273+ TidSeqs = lists :foldl (
274+ fun ({T , Seq0 }, []) ->
275+ case ra_seq :floor (SmallestIdx , Seq0 ) of
276+ undefined ->
277+ [];
278+ Seq ->
279+ [{T , Seq }]
280+ end ;
281+ ({T , Seq0 }, [{_T , PrevSeq } | _ ] = Acc ) ->
282+ Start = ra_seq :first (PrevSeq ),
283+ Seq1 = ra_seq :floor (SmallestIdx , Seq0 ),
284+ case ra_seq :limit (Start , Seq1 ) of
285+ undefined ->
286+ Acc ;
287+ Seq ->
288+ [{T , Seq } | Acc ]
289+ end
290+ end , [], TidSeqs0 ),
290291
291292 SegRefs0 = lists :append (
292293 lists :reverse (
293294 % % segrefs are returned in appended order so new -> old
294295 % % so we need to reverse them so that the final appended list
295296 % % of segrefs is in the old -> new order
296- [flush_mem_table_range (ServerUId , TidRange , State )
297- || TidRange <- TidRanges ])),
297+ [flush_mem_table_range (ServerUId , TidSeq , State )
298+ || TidSeq <- TidSeqs ])),
298299
299300 % % compact cases where a segment was appended in a subsequent call to
300301 % % flush_mem_table_range
@@ -308,14 +309,14 @@ flush_mem_table_ranges({ServerUId, TidRanges0},
308309 [Seg | Acc ]
309310 end , [], SegRefs0 )),
310311
311- ok = send_segments (System , ServerUId , TidRanges0 , SegRefs ),
312+ ok = send_segments (System , ServerUId , TidSeqs0 , SegRefs ),
312313 ok .
313314
314- flush_mem_table_range (ServerUId , {Tid , { StartIdx0 , EndIdx } },
315+ flush_mem_table_range (ServerUId , {Tid , Seq },
315316 # state {data_dir = DataDir ,
316317 segment_conf = SegConf } = State ) ->
317318 Dir = filename :join (DataDir , binary_to_list (ServerUId )),
318- StartIdx = start_index (ServerUId , StartIdx0 ),
319+ % StartIdx = start_index(ServerUId, StartIdx0),
319320 case open_file (Dir , SegConf ) of
320321 enoent ->
321322 ? DEBUG (" segment_writer: skipping segment as directory ~ts does "
@@ -324,8 +325,7 @@ flush_mem_table_range(ServerUId, {Tid, {StartIdx0, EndIdx}},
324325 % % clean up the tables for this process
325326 [];
326327 Segment0 ->
327- case append_to_segment (ServerUId , Tid , StartIdx , EndIdx ,
328- Segment0 , State ) of
328+ case append_to_segment (ServerUId , Tid , Seq , Segment0 , State ) of
329329 undefined ->
330330 ? WARN (" segment_writer: skipping segments for ~w as
331331 directory ~ts disappeared whilst writing" ,
@@ -366,31 +366,37 @@ send_segments(System, ServerUId, TidRanges, SegRefs) ->
366366 [ServerUId , " No Pid" ]),
367367 % % delete from the memtable on the non-running server's behalf
368368 [begin
369+ % % TODO: HACK: this is a hack to get a full range out of a
370+ % % sequent, ideally the mt should take the ra_seq and
371+ % % delete from that
372+ Range = {ra_seq :first (Seq ), ra_seq :last (Seq )},
369373 _ = catch ra_mt :delete ({range , Tid , Range })
370- end || {Tid , Range } <- TidRanges ],
374+ end || {Tid , Seq } <- TidRanges ],
371375 ok ;
372376 Pid ->
373377 Pid ! {ra_log_event , {segments , TidRanges , SegRefs }},
374378 ok
375379 end .
376380
377- append_to_segment (UId , Tid , StartIdx0 , EndIdx , Seg , State ) ->
378- StartIdx = start_index (UId , StartIdx0 ),
379- % EndIdx + 1 because FP
380- append_to_segment (UId , Tid , StartIdx , EndIdx + 1 , Seg , [], State ).
381+ append_to_segment (UId , Tid , Seq0 , Seg , State ) ->
382+ FirstIdx = ra_seq :first (Seq0 ),
383+ StartIdx = start_index (UId , FirstIdx ),
384+ % % TODO combine flor and iterator into one operation
385+ Seq = ra_seq :floor (StartIdx , Seq0 ),
386+ SeqIter = ra_seq :iterator (Seq ),
387+ append_to_segment (UId , Tid , ra_seq :next (SeqIter ), Seg , [], State ).
381388
382- append_to_segment (_ , _ , StartIdx , EndIdx , Seg , Closed , _State )
383- when StartIdx >= EndIdx ->
389+ append_to_segment (_ , _ , end_of_seq , Seg , Closed , _State ) ->
384390 {Seg , Closed };
385- append_to_segment (UId , Tid , Idx , EndIdx , Seg0 , Closed , State ) ->
391+ append_to_segment (UId , Tid , { Idx , SeqIter } = Cur , Seg0 , Closed , State ) ->
386392 try ets :lookup (Tid , Idx ) of
387393 [] ->
388394 StartIdx = start_index (UId , Idx ),
389395 case Idx < StartIdx of
390396 true ->
391397 % % a snapshot must have been completed after we last checked
392398 % % the start idx, continue flush from new start index.
393- append_to_segment (UId , Tid , StartIdx , EndIdx , Seg0 ,
399+ append_to_segment (UId , Tid , next_ra_seq ( StartIdx , SeqIter ) , Seg0 ,
394400 Closed , State );
395401 false ->
396402 % % oh dear, an expected index was not found in the mem table.
@@ -419,7 +425,7 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
419425 % % the segment index but is probably good enough to get comparative
420426 % % data rates for different Ra components
421427 ok = counters :add (State # state .counter , ? C_BYTES_WRITTEN , DataSize ),
422- append_to_segment (UId , Tid , Idx + 1 , EndIdx , Seg , Closed , State );
428+ append_to_segment (UId , Tid , ra_seq : next ( SeqIter ) , Seg , Closed , State );
423429 {error , full } ->
424430 % close and open a new segment
425431 case open_successor_segment (Seg0 , State # state .segment_conf ) of
@@ -432,8 +438,14 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
432438 % % re-evaluate snapshot state for the server in case
433439 % % a snapshot has completed during segment flush
434440 StartIdx = start_index (UId , Idx ),
435- append_to_segment (UId , Tid , StartIdx , EndIdx , Seg ,
436- [Seg0 | Closed ], State )
441+ Next = case StartIdx == Idx of
442+ true ->
443+ Cur ;
444+ false ->
445+ next_ra_seq (StartIdx , SeqIter )
446+ end ,
447+ append_to_segment (UId , Tid , Next ,
448+ Seg , [Seg0 | Closed ], State )
437449 end ;
438450 {error , Posix } ->
439451 FileName = ra_log_segment :filename (Seg0 ),
@@ -549,3 +561,13 @@ maybe_upgrade_segment_file_names(System, DataDir) ->
549561 ok
550562 end .
551563
564+ next_ra_seq (Idx , Iter0 ) ->
565+ case ra_seq :next (Iter0 ) of
566+ end_of_seq ->
567+ end_of_seq ;
568+ {I , _ } = Next
569+ when I >= Idx ->
570+ Next ;
571+ {_ , Iter } ->
572+ next_ra_seq (Idx , Iter )
573+ end .
0 commit comments