@@ -49,40 +49,32 @@ using namespace SCIRun::Core::Geometry;
4949using namespace SCIRun ::Dataflow::Networks;
5050using namespace SCIRun ::Core::Thread;
5151
52- namespace Impl2Duplicated
52+ namespace SCIRun ::Modules::Basic
5353{
54- using DataChunk = DenseMatrixHandle ;
54+ using DataChunk = BundleHandle ;
5555 // TODO: need thread-safe container to share
5656 using DataStream = std::queue<DataChunk>;
5757
5858
5959 class StreamAppender
6060 {
6161 public:
62- StreamAppender (AsyncStreamingTest* module , DenseMatrixHandle input) : module_(module ), input_(input) {}
63-
64- bool hasData () const
65- {
66- return sliceIndex_ < input_->nrows ();
67- }
68-
69- int numDataAppended () const { return sliceIndex_; }
62+ explicit StreamAppender (SimulationStreamingReaderBase* module ) : module_(module ) {}
7063
7164 DataStream& stream () { return stream_; }
7265
7366 void pushDataToStream ()
7467 {
7568 logInfo (" __SR__ ........starting streaming reader" );
7669
77- while (hasData ())
70+ while (module_-> hasData ())
7871 {
79- auto value = makeShared<DenseMatrix>(input_-> row (sliceIndex_) );
72+ auto value = module_-> nextData ( );
8073
81- logInfo (" __SR__ >>> pushing new data object: [{}] " , sliceIndex_ );
74+ logInfo (" __SR__ >>> pushing new data object: bundleOfSize {} " , value-> size () );
8275 {
83- Guard g (dataMutex .get ());
76+ Guard g (dataMutex_ .get ());
8477 stream_.push (value);
85- sliceIndex_++;
8678 }
8779 logInfo (" __SR__ : waiting for {} ms" , appendWaitTime_);
8880 std::this_thread::sleep_for (std::chrono::milliseconds (appendWaitTime_));
@@ -96,7 +88,7 @@ namespace Impl2Duplicated
9688
9789 void waitAndOutputEach ()
9890 {
99- if (hasData ())
91+ if (module_-> hasData ())
10092 {
10193 // wait for result.
10294 while (stream ().empty ())
@@ -108,60 +100,70 @@ namespace Impl2Duplicated
108100 // once data is available, output to ports
109101 auto data = stream ().front ();
110102 {
111- Guard g (dataMutex .get ());
103+ Guard g (dataMutex_ .get ());
112104 stream ().pop ();
113105 }
114106
115- logInfo (" __MAIN__ Received data: [{}] outputting matrix ." , (* data)( 0 , 0 ));
116- module_->sendOutput (module_->OutputSlice , bundleOutputs ({ " Slice " }, { data }) );
107+ logInfo (" __MAIN__ Received data: outputting bundle of size {} ." , data-> size ( ));
108+ module_->sendOutput (module_->OutputData , data);
117109
118110 logInfo (" __MAIN__ Enqueue execute again" );
119111 module_->enqueueExecuteAgain (false );
120112 }
113+ else
114+ module_->shutdownStream ();
121115 }
122116
123117 private:
124- AsyncStreamingTest* module_;
125- DenseMatrixHandle input_;
118+ SimulationStreamingReaderBase* module_;
126119 DataStream stream_;
127120 const int appendWaitTime_ = 2000 ;
128- int sliceIndex_{ 0 };
129121
130122 std::future<void > f_;
131- Mutex dataMutex { " test " };
123+ Mutex dataMutex_ { " streamingData " };
132124 };
133125}
134126
135- namespace algo
127+ namespace openPMDStub
136128{
129+ class IndexedIteration
130+ {
131+ public:
132+ int iterationIndex;
133+ void seriesFlush () const {}
134+ void close () const {}
135+ };
137136
138- class IndexedIteration
139- {
140- public:
141- int iterationIndex;
142- void seriesFlush () const {}
143- void close () const {}
144- };
137+ enum class Access
138+ {
139+ READ_ONLY
140+ };
145141
146- enum class Access
147- {
148- READ_ONLY
149- };
142+ using IndexedIterationContainer = std::vector<IndexedIteration>;
143+ using IndexedIterationIterator = IndexedIterationContainer::iterator;
150144
151- class Series
152- {
153- public:
154- Series (const std::string&, Access) {}
155- std::vector<IndexedIteration> readIterations () const { return {}; }
156- };
145+ class Series
146+ {
147+ public:
148+ Series () {}
149+ Series (const std::string&, Access) {}
150+ IndexedIterationContainer readIterations () const { return {}; }
151+ };
152+ }
157153
158154#define openPMDIsAvailable 0
159155
160- class PIConGPUReader_Stub
156+ namespace SCIRun ::Modules::Basic
157+ {
158+
159+ class SimulationStreamingReaderBaseImpl
161160{
162- SimulationStreamingReaderBase* module_;
161+ // SimulationStreamingReaderBase* module_;
163162public:
164- explicit PIConGPUReader_Stub (SimulationStreamingReaderBase* module ) : module_(module ) {}
163+ // explicit SimulationStreamingReaderBaseImpl(SimulationStreamingReaderBase* module) : module_(module) {}
164+
165+ openPMDStub::Series series;
166+ std::optional<openPMDStub::IndexedIterationIterator> iterationIterator;
165167
166168 FieldHandle particleData (/* int buffer_size, float component_x[], float component_y[], float component_z[]*/ )
167169 {
@@ -222,15 +224,15 @@ class PIConGPUReader_Stub
222224 return ofh;
223225 }
224226
225- Series getSeries (const std::string& SST_dir)
227+ openPMDStub:: Series getSeries (const std::string& SST_dir)
226228 {
227229 // Wait for simulation output data to be generated and posted via SST
228230 while (!std::filesystem::exists (SST_dir))
229231 std::this_thread::sleep_for (std::chrono::seconds (1 ));
230- return Series (SST_dir, Access::READ_ONLY);
232+ return openPMDStub:: Series (SST_dir, openPMDStub:: Access::READ_ONLY);
231233 }
232234
233- void setupStuff (const IndexedIteration& iteration)
235+ void setupStuff (const openPMDStub:: IndexedIteration& iteration)
234236 {
235237 std::cout << " \n From PIConGPUReader: Current iteration is: " << iteration.iterationIndex << std::endl;
236238
@@ -283,7 +285,7 @@ class PIConGPUReader_Stub
283285#endif
284286 }
285287
286- void scalarFieldSetup1 (const IndexedIteration& /* iteration*/ )
288+ void scalarFieldSetup1 (const openPMDStub:: IndexedIteration& /* iteration*/ )
287289 {
288290#if openPMDIsAvailable
289291 std::string scalar_field_component = " e_all_chargeDensity" ;
@@ -320,15 +322,15 @@ class PIConGPUReader_Stub
320322#endif
321323 }
322324
323- FieldHandle makeParticleOutput (const IndexedIteration& iteration)
325+ FieldHandle makeParticleOutput (const openPMDStub:: IndexedIteration& iteration)
324326 {
325327 setupStuff (iteration);
326328 iteration.seriesFlush ();
327329 dataForParticleField ();
328330 return particleData (/* buffer_size, component_x, component_y, component_z*/ );
329331 }
330332
331- FieldHandle makeScalarOutput (const IndexedIteration& iteration)
333+ FieldHandle makeScalarOutput (const openPMDStub:: IndexedIteration& iteration)
332334 {
333335 scalarFieldSetup1 (iteration);
334336 iteration.seriesFlush ();
@@ -339,7 +341,7 @@ class PIConGPUReader_Stub
339341 return scalarField (buffer_size_sFD, extent_sFD/* , scalarFieldData_buffer*/ );
340342 }
341343
342- FieldHandle makeVectorOutput (const IndexedIteration& iteration)
344+ FieldHandle makeVectorOutput (const openPMDStub:: IndexedIteration& iteration)
343345 {
344346 vectorFieldSetup1 ();
345347 iteration.seriesFlush ();
@@ -349,11 +351,12 @@ class PIConGPUReader_Stub
349351 return vectorField (extent_vFD /* , vFD_component_x, vFD_component_y, vFD_component_z*/ );
350352 }
351353
354+ #if 0
352355 void executeImpl()
353356 {
354357 if (module_->needToExecute())
355358 {
356- auto series = getSeries ( " /home/kj/scratch/runs/SST/simOutput/openPMD/simData.sst " );
359+
357360
358361 for (const auto& iteration : series.readIterations())
359362 {
@@ -362,21 +365,21 @@ class PIConGPUReader_Stub
362365 // sendOutput(module_->VectorField, );
363366
364367 module_->sendOutput(module_->OutputData,
365- bundleOutputs ({" Particles" , " ScalarField" , " VectorField" },
366- {makeParticleOutput (iteration), makeScalarOutput (iteration), makeVectorOutput (iteration)}
367- ));
368+ );
368369
369370 iteration.close();
370371 }
371372 }
372373 }
374+ #endif
373375};
374376}
375377
376378MODULE_INFO_DEF (SimulationStreamingReaderBase, Basic, SCIRun)
377379
378380SimulationStreamingReaderBase::SimulationStreamingReaderBase()
379- : Module(staticInfo_, false )
381+ : Module(staticInfo_, false ),
382+ impl_(new SimulationStreamingReaderBaseImpl)
380383{
381384 INITIALIZE_PORT (OutputData);
382385}
@@ -390,20 +393,47 @@ void SimulationStreamingReaderBase::setStateDefaults()
390393
391394void SimulationStreamingReaderBase::execute ()
392395{
393- // SCIRun::Core::Logging::GeneralLog::Instance().setVerbose(true);
394- //
395- // logCritical("AsyncStreamingTest Execute called");
396- //
397- // auto input = getRequiredInput(InputMatrix);
398- //
399- // if (needToExecute())
400- // {
401- // logInfo("__MAIN__ Resetting impl/async thread");
402- // impl_ = std::make_unique<StreamAppender>(this, castMatrix::toDense(input));
403- // impl_->beginPushDataAsync();
404- // }
405- //
406- // impl_->waitAndOutputEach();
396+ SCIRun::Core::Logging::GeneralLog::Instance ().setVerbose (true );
397+
398+ logCritical (" SimulationStreamingReaderBase Execute called" );
399+
400+ setupStream ();
401+
402+ if (needToExecute ())
403+ {
404+ logInfo (" __MAIN__ Resetting impl/async thread" );
405+ streamer_ = std::make_unique<StreamAppender>(this );
406+ streamer_->beginPushDataAsync ();
407+ }
408+
409+ streamer_->waitAndOutputEach ();
410+ }
411+
412+ void SimulationStreamingReaderBase::setupStream ()
413+ {
414+ impl_->series = impl_->getSeries (" /home/kj/scratch/runs/SST/simOutput/openPMD/simData.sst" );
415+ }
416+
417+ void SimulationStreamingReaderBase::shutdownStream ()
418+ {
419+ impl_->series = {};
420+ }
421+
422+ bool SimulationStreamingReaderBase::hasData () const
423+ {
424+ if (!impl_->iterationIterator )
425+ impl_->iterationIterator = impl_->series .readIterations ().begin ();
426+
427+ return *(impl_->iterationIterator ) != impl_->series .readIterations ().end ();
428+ }
429+
430+ BundleHandle SimulationStreamingReaderBase::nextData () const
431+ {
432+ ++(*(impl_->iterationIterator ));
433+ const auto & ii = *(*(impl_->iterationIterator ));
434+
435+ return bundleOutputs ({" Particles" , " ScalarField" , " VectorField" },
436+ {impl_->makeParticleOutput (ii), impl_->makeScalarOutput (ii), impl_->makeVectorOutput (ii)});
407437}
408438
409439Core::Datatypes::BundleHandle SCIRun::Modules::Basic::bundleOutputs (std::initializer_list<std::string> names, std::initializer_list<DatatypeHandle> dataList)
0 commit comments