@@ -56,27 +56,23 @@ namespace SCIRun::Modules::Basic
5656 using DataStream = std::queue<DataChunk>;
5757
5858
59- class StreamAppender
59+ class StreamAppenderImpl
6060 {
6161 public:
62- explicit StreamAppender (SimulationStreamingReaderBase* module ) : module_(module ) {}
62+ explicit StreamAppenderImpl (SimulationStreamingReaderBase* module ) : module_(module ) {}
6363
6464 DataStream& stream () { return stream_; }
6565
6666 void pushDataToStream ()
6767 {
68- logInfo (" __SR__ ........starting streaming reader" );
69-
7068 while (module_->hasData ())
7169 {
7270 auto value = module_->nextData ();
7371
74- logInfo (" __SR__ >>> pushing new data object: bundleOfSize {}" , value->size ());
7572 {
7673 Guard g (dataMutex_.get ());
7774 stream_.push (value);
7875 }
79- logInfo (" __SR__ : waiting for {} ms" , appendWaitTime_);
8076 std::this_thread::sleep_for (std::chrono::milliseconds (appendWaitTime_));
8177 }
8278 }
@@ -88,28 +84,21 @@ namespace SCIRun::Modules::Basic
8884
8985 void waitAndOutputEach ()
9086 {
91- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
9287 if (module_->hasData ())
9388 {
94- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
9589 // wait for result.
9690 while (stream ().empty ())
9791 {
98- logInfo (" __MAIN__ Waiting for data" );
9992 std::this_thread::sleep_for (std::chrono::milliseconds (1000 ));
10093 }
101- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
10294 // once data is available, output to ports
10395 auto data = stream ().front ();
10496 {
10597 Guard g (dataMutex_.get ());
10698 stream ().pop ();
10799 }
108100
109- logInfo (" __MAIN__ Received data: outputting bundle of size {}." , data->size ());
110101 module_->sendOutput (module_->OutputData , data);
111-
112- logInfo (" __MAIN__ Enqueue execute again" );
113102 module_->enqueueExecuteAgain (false );
114103 }
115104 else
@@ -131,7 +120,7 @@ namespace openPMDStub
131120 class IndexedIteration
132121 {
133122 public:
134- int iterationIndex;
123+ int iterationIndex{ 0 } ;
135124 IndexedIteration (int i) : iterationIndex(i) {}
136125 void seriesFlush () const {}
137126 void close () const {}
@@ -143,14 +132,14 @@ namespace openPMDStub
143132 };
144133
145134 using IndexedIterationContainer = std::vector<IndexedIteration>;
146- using IndexedIterationIterator = IndexedIterationContainer::iterator ;
135+ using IndexedIterationIterator = IndexedIterationContainer::const_iterator ;
147136
148137 class Series
149138 {
150139 public:
151140 Series () {}
152141 Series (const std::string&, Access) {}
153- IndexedIterationContainer readIterations () const { return dummySeriesData_; }
142+ const IndexedIterationContainer& readIterations () const { return dummySeriesData_; }
154143 private:
155144 IndexedIterationContainer dummySeriesData_ {1 ,2 ,3 ,4 ,5 ,6 };
156145 };
@@ -163,21 +152,19 @@ namespace SCIRun::Modules::Basic
163152
164153class SimulationStreamingReaderBaseImpl
165154{
166- // SimulationStreamingReaderBase* module_;
167155public:
168- // explicit SimulationStreamingReaderBaseImpl(SimulationStreamingReaderBase* module) : module_(module) {}
169-
170156 openPMDStub::Series series;
171- std::optional<openPMDStub::IndexedIterationIterator> iterationIterator;
157+ mutable openPMDStub::IndexedIterationIterator iterationIterator, iterationIteratorEnd;
158+ bool setup_{ false };
172159
173160 FieldHandle particleData (/* int buffer_size, float component_x[], float component_y[], float component_z[]*/ )
174161 {
175162 FieldInformation pcfi (" PointCloudMesh" ,0 ," int" );
176163 FieldHandle ofh = CreateField (pcfi);
177- /*
164+ # if openPMDIsAvailable
178165 VMesh* omesh = ofh->vmesh ();
179166 for (VMesh::Node::index_type p=0 ; p < buffer_size; p++) omesh->add_point (Point (component_z[p],component_y[p],component_x[p]));
180- */
167+ # endif
181168 return ofh;
182169 }
183170
@@ -187,14 +174,14 @@ class SimulationStreamingReaderBaseImpl
187174 std::vector<float > values (numvals, dataStub);
188175 MeshHandle mesh = CreateMesh (lfi,extent_sFD[0 ], extent_sFD[1 ], extent_sFD[2 ], Point (0.0 ,0.0 ,0.0 ), Point (extent_sFD[0 ],extent_sFD[1 ],extent_sFD[2 ]));
189176 FieldHandle ofh = CreateField (lfi,mesh);
190- /*
177+ # if openPMDIsAvailable
191178 for (int i=0 ; i<extent_sFD[0 ]; i++) for (int j=0 ; j<extent_sFD[1 ]; j++) for (int k=0 ; k<extent_sFD[2 ]; k++)
192179 {
193180 int flat_index = i*extent_sFD[1 ]*extent_sFD[2 ]+j*extent_sFD[2 ]+k;
194181 int c_m_index = k*extent_sFD[0 ]*extent_sFD[1 ]+j*extent_sFD[0 ]+i;
195182 values[c_m_index] = scalarFieldData_buffer.get ()[flat_index];
196183 }
197- */
184+ # endif
198185 VField* ofield = ofh->vfield ();
199186 ofield->set_values (values);
200187
@@ -241,7 +228,7 @@ class SimulationStreamingReaderBaseImpl
241228
242229 void setupStuff (const openPMDStub::IndexedIteration& iteration)
243230 {
244- std::cout << " \n From PIConGPUReader: Current iteration is: " << iteration.iterationIndex << std::endl;
231+ // std::cout << "\nFrom PIConGPUReader: Current iteration is: " << iteration.iterationIndex << std::endl;
245232
246233#if openPMDIsAvailable
247234 std::string particle_type = " e" ; // set particle related input variables
@@ -357,28 +344,6 @@ class SimulationStreamingReaderBaseImpl
357344 std::vector<long unsigned int > extent_vFD {2 ,3 ,4 };
358345 return vectorField (extent_vFD /* , vFD_component_x, vFD_component_y, vFD_component_z*/ );
359346 }
360-
361- #if 0
362- void executeImpl()
363- {
364- if (module_->needToExecute())
365- {
366-
367-
368- for (const auto& iteration : series.readIterations())
369- {
370- // sendOutput(module_->Particles, );
371- // sendOutput(module_->ScalarField, );
372- // sendOutput(module_->VectorField, );
373-
374- module_->sendOutput(module_->OutputData,
375- );
376-
377- iteration.close();
378- }
379- }
380- }
381- #endif
382347};
383348}
384349
@@ -400,60 +365,43 @@ void SimulationStreamingReaderBase::setStateDefaults()
400365
401366void SimulationStreamingReaderBase::execute ()
402367{
403- SCIRun::Core::Logging::GeneralLog::Instance ().setVerbose (true );
404-
405- logCritical (" SimulationStreamingReaderBase Execute called" );
406-
407- setupStream ();
408-
409- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
410368 if (needToExecute ())
411369 {
412- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
413- logInfo (" __MAIN__ Resetting impl/async thread" );
414- streamer_ = std::make_unique<StreamAppender>(this );
415- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
370+ setupStream ();
371+ streamer_ = std::make_unique<StreamAppenderImpl>(this );
416372 streamer_->beginPushDataAsync ();
417- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
418373 }
419374
420- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
421- if (!streamer_)
422- {
423- logCritical (" ????SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
424- }
425375 streamer_->waitAndOutputEach ();
426- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
427376}
428377
429378void SimulationStreamingReaderBase::setupStream ()
430379{
431- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
432- impl_->series = impl_->getSeries (" /home/kj/scratch/runs/SST/simOutput/openPMD/simData.sst" );
380+ if (!impl_->setup_ )
381+ {
382+ impl_->series = impl_->getSeries (" /home/kj/scratch/runs/SST/simOutput/openPMD/simData.sst" );
383+ impl_->iterationIterator = impl_->series .readIterations ().cbegin ();
384+ impl_->iterationIteratorEnd = impl_->series .readIterations ().cend ();
385+ impl_->setup_ = true ;
386+ }
433387}
434388
435389void SimulationStreamingReaderBase::shutdownStream ()
436390{
437- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
438391 impl_->series = {};
392+ impl_->iterationIterator = {};
393+ impl_->iterationIteratorEnd = {};
394+ impl_->setup_ = false ;
439395}
440396
441397bool SimulationStreamingReaderBase::hasData () const
442398{
443- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
444- if (!impl_->iterationIterator )
445- impl_->iterationIterator = impl_->series .readIterations ().begin ();
446- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
447- return *(impl_->iterationIterator ) != impl_->series .readIterations ().end ();
399+ return impl_->iterationIterator != impl_->iterationIteratorEnd ;
448400}
449401
450402BundleHandle SimulationStreamingReaderBase::nextData () const
451403{
452- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
453- ++(*(impl_->iterationIterator ));
454- const auto & ii = *(*(impl_->iterationIterator ));
455-
456- logCritical (" SimulationStreamingReaderBase {} {}" , __FUNCTION__, __LINE__);
404+ const auto & ii = *(impl_->iterationIterator ++);
457405
458406 return bundleOutputs ({" Particles" , " ScalarField" , " VectorField" },
459407 {impl_->makeParticleOutput (ii), impl_->makeScalarOutput (ii), impl_->makeVectorOutput (ii)});
0 commit comments