1919
2020package org .apache .sysds .runtime .instructions .ooc ;
2121
22+ import org .apache .sysds .runtime .controlprogram .caching .OOCEvictionManager ;
2223import org .apache .sysds .runtime .controlprogram .parfor .LocalTaskQueue ;
2324import org .apache .sysds .runtime .instructions .spark .data .IndexedMatrixValue ;
2425
26+ import java .io .IOException ;
2527import java .util .ArrayList ;
28+ import java .util .UUID ;
2629
2730/**
2831 * A wrapper around LocalTaskQueue that consumes the source stream once and
2932 * can be reset so that other operators can consume the same stream again.
33+ * <p>
34+ * Uses OOCEvictionManager for out-of-core caching.
3035 *
3136 */
3237public class ResettableStream extends LocalTaskQueue <IndexedMatrixValue > {
3338
3439 // original live stream
3540 private final LocalTaskQueue <IndexedMatrixValue > _source ;
3641
37- // in-memory cache to store stream for re-play
38- private final ArrayList <IndexedMatrixValue > _cache ;
42+ // stream identifier
43+ private final String _streamId ;
44+
45+ // list of block keys (only the keys)
46+ private final ArrayList <String > _blockKeys ;
47+
3948
4049 // state flags
4150 private boolean _cacheInProgress = true ; // caching in progress, in the first pass.
4251 private int _replayPosition = 0 ; // slider position in the stream
4352
53+ private OOCEvictionManager _manager ;
54+
/**
 * Creates a resettable stream over the given source, using a randomly
 * generated UUID string as the stream identifier.
 *
 * @param source original live stream to wrap
 */
public ResettableStream(LocalTaskQueue<IndexedMatrixValue> source) {
	this(source, UUID.randomUUID().toString());
}
/**
 * Creates a resettable stream over the given source with an explicit
 * stream identifier. The identifier is used as the prefix of the keys
 * under which blocks of this stream are handed to the OOCEvictionManager.
 *
 * @param source original live stream to wrap
 * @param streamId identifier of this stream
 */
public ResettableStream(LocalTaskQueue<IndexedMatrixValue> source, String streamId) {
	_source = source;
	_streamId = streamId;
	// only the block keys are tracked here; the blocks themselves are
	// passed to the eviction manager
	_blockKeys = new ArrayList<>();
	_manager = OOCEvictionManager.getInstance();
}
4865
4966 /**
@@ -60,18 +77,33 @@ public synchronized IndexedMatrixValue dequeueTask()
6077 // First pass: Read value from the source and cache it, and return.
6178 IndexedMatrixValue task = _source .dequeueTask ();
6279 if (task != NO_MORE_TASKS ) {
63- _cache .add (new IndexedMatrixValue (task ));
80+ String key = _streamId + "_" + _blockKeys .size ();
81+ // _cache.add(new IndexedMatrixValue(task));
82+ _blockKeys .add (key );
83+
84+ try {
85+ _manager .put (key , task ); // Serialize
86+ } catch (IOException e ) {
87+ throw new RuntimeException (e );
88+ }
89+ return task ;
6490 } else {
6591 _cacheInProgress = false ; // caching is complete
6692 _source .closeInput (); // close source stream
93+
94+ // Notify all consumers that are waiting for the cache of this stream to fill
95+ notifyAll ();
96+ return (IndexedMatrixValue ) NO_MORE_TASKS ;
6797 }
68- notifyAll (); // Notify all the waiting consumers waiting for cache to fill with this stream
69- return task ;
7098 } else {
71- // Replay pass: read directly from in-memory cache
72- if (_replayPosition < _cache .size ()) {
73- // Return a copy to ensure consumer won't modify the cache
74- return new IndexedMatrixValue (_cache .get (_replayPosition ++));
99+ // Replay pass: read the block back through the eviction manager
100+ if (_replayPosition < _blockKeys .size ()) {
101+ String key = _blockKeys .get (_replayPosition ++);
102+ try {
103+ return _manager .get (key ); // Deserialize
104+ } catch (IOException ex ) {
105+ throw new RuntimeException (ex );
106+ }
75107 } else {
76108 return (IndexedMatrixValue ) NO_MORE_TASKS ;
77109 }
0 commit comments