Skip to content

Commit 935aa80

Browse files
committed
Fix to make ThreadStatistics a unique pointer in vector
1 parent c96fb9e commit 935aa80

File tree

4 files changed

+42
-36
lines changed

4 files changed

+42
-36
lines changed

include/particlezoo/parallel/HistoryBalancedParallelReader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ namespace ParticleZoo {
228228
};
229229

230230
std::vector<std::shared_ptr<PhaseSpaceFileReader>> readers_;
231-
std::vector<ThreadStatistics> threadStats_;
231+
std::vector<std::unique_ptr<ThreadStatistics>> threadStats_;
232232
std::vector<std::uint64_t> startingHistorys_;
233233

234234
bool hasNativeRepresentedHistoryCount_;

include/particlezoo/parallel/ParticleBalancedParallelReader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ namespace ParticleZoo {
231231

232232
std::vector<std::shared_ptr<PhaseSpaceFileReader>> readers_;
233233
std::vector<std::uint64_t> startingParticleIndex_;
234-
std::vector<ThreadStatistics> threadStats_;
234+
std::vector<std::unique_ptr<ThreadStatistics>> threadStats_;
235235

236236
bool hasNativeRepresentedHistoryCount_;
237237
bool hasNativeIncrementalHistoryCounters_;

src/parallel/HistoryBalancedParallelReader.cc

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,21 @@ namespace ParticleZoo {
6666

6767
// Calculate starting history for each thread
6868
startingHistorys_.reserve(numThreads);
69-
threadStats_.resize(numThreads);
69+
threadStats_.reserve(numThreads);
70+
for (size_t i = 0; i < numThreads; ++i) {
71+
threadStats_.emplace_back(std::make_unique<ThreadStatistics>());
72+
}
7073
std::uint64_t historiesPerThread = numberOfRepresentedHistories_ / numThreads;
7174
std::uint64_t remainderHistories = numberOfRepresentedHistories_ % numThreads;
7275
std::uint64_t currentStartingHistory = 0;
7376
for (size_t i = 0; i < numThreads; ++i) {
7477
startingHistorys_.push_back(currentStartingHistory);
75-
threadStats_[i].historiesRead = currentStartingHistory;
78+
threadStats_[i]->historiesRead = currentStartingHistory;
7679
// Calculate the correct initial error for this thread based on how many
7780
// represented histories have been "processed" before this thread's starting point
7881
if (hasGapsBetweenHistories_) {
7982
std::uint64_t initialError = numberOfRepresentedHistories_ / 2;
80-
threadStats_[i].emptyHistoryError = (initialError + currentStartingHistory * perHistoryErrorContribution_) % numberOfRepresentedHistories_;
83+
threadStats_[i]->emptyHistoryError = (initialError + currentStartingHistory * perHistoryErrorContribution_) % numberOfRepresentedHistories_;
8184
}
8285
currentStartingHistory += historiesPerThread;
8386
if (i < remainderHistories) {
@@ -122,8 +125,8 @@ namespace ParticleZoo {
122125
}
123126

124127
// check the cache first
125-
if (threadStats_[threadIndex].hasMoreParticlesCache != NEEDS_CHECKING) {
126-
return threadStats_[threadIndex].hasMoreParticlesCache == HAS_MORE_PARTICLES;
128+
if (threadStats_[threadIndex]->hasMoreParticlesCache != NEEDS_CHECKING) {
129+
return threadStats_[threadIndex]->hasMoreParticlesCache == HAS_MORE_PARTICLES;
127130
}
128131

129132
// Check if the reader has more particles
@@ -137,7 +140,7 @@ namespace ParticleZoo {
137140
: numberOfRepresentedHistories_;
138141

139142
// Check if we have completed all histories for this thread
140-
if (threadStats_[threadIndex].historiesRead >= targetHistories) {
143+
if (threadStats_[threadIndex]->historiesRead >= targetHistories) {
141144
// Check if the next particle would start a new history
142145
const Particle nextParticle = readers_[threadIndex]->peekNextParticle();
143146
// If the next particle starts a new history, we have no more particles for this thread
@@ -146,7 +149,7 @@ namespace ParticleZoo {
146149
}
147150

148151
// Update the cache
149-
threadStats_[threadIndex].hasMoreParticlesCache = hasMoreParticles ? HAS_MORE_PARTICLES : NO_MORE_PARTICLES;
152+
threadStats_[threadIndex]->hasMoreParticlesCache = hasMoreParticles ? HAS_MORE_PARTICLES : NO_MORE_PARTICLES;
150153

151154
return hasMoreParticles;
152155
}
@@ -161,33 +164,33 @@ namespace ParticleZoo {
161164
}
162165

163166
// reset the cache since we are consuming a particle
164-
threadStats_[threadIndex].hasMoreParticlesCache = NEEDS_CHECKING;
167+
threadStats_[threadIndex]->hasMoreParticlesCache = NEEDS_CHECKING;
165168

166169
// Get the next particle from the appropriate reader
167170
Particle particle = readers_[threadIndex]->getNextParticle();
168171

169172
// Update history count if this particle starts a new history
170173
int32_t incrementalHistoryNumber = 0;
171174
{
172-
std::lock_guard<std::mutex> lock(threadStats_[threadIndex].mutex);
173-
threadStats_[threadIndex].particlesRead++;
175+
std::lock_guard<std::mutex> lock(threadStats_[threadIndex]->mutex);
176+
threadStats_[threadIndex]->particlesRead++;
174177

175178
if (particle.isNewHistory()) {
176179
if (hasGapsBetweenHistories_) {
177-
threadStats_[threadIndex].emptyHistoryError += perHistoryErrorContribution_;
178-
if (threadStats_[threadIndex].emptyHistoryError >= numberOfRepresentedHistories_) {
180+
threadStats_[threadIndex]->emptyHistoryError += perHistoryErrorContribution_;
181+
if (threadStats_[threadIndex]->emptyHistoryError >= numberOfRepresentedHistories_) {
179182
incrementalHistoryNumber = 2 + emptyHistoriesBetweenEachHistory_;
180-
threadStats_[threadIndex].emptyHistoryError -= numberOfRepresentedHistories_;
183+
threadStats_[threadIndex]->emptyHistoryError -= numberOfRepresentedHistories_;
181184
} else {
182185
incrementalHistoryNumber = 1 + emptyHistoriesBetweenEachHistory_;
183186
}
184187
} else {
185188
incrementalHistoryNumber = 1;
186189
}
187-
threadStats_[threadIndex].historiesRead++;
190+
threadStats_[threadIndex]->historiesRead++;
188191
}
189192

190-
threadStats_[threadIndex].totalHistoriesRead += incrementalHistoryNumber;
193+
threadStats_[threadIndex]->totalHistoriesRead += incrementalHistoryNumber;
191194
}
192195

193196
particle.setIntProperty(IntPropertyType::INCREMENTAL_HISTORY_NUMBER, incrementalHistoryNumber);
@@ -213,27 +216,27 @@ namespace ParticleZoo {
213216
if (threadIndex >= readers_.size()) {
214217
throw std::out_of_range("Thread index out of range in getHistoriesRead()");
215218
}
216-
return threadStats_[threadIndex].totalHistoriesRead;
219+
return threadStats_[threadIndex]->totalHistoriesRead;
217220
}
218221

219222
std::uint64_t HistoryBalancedParallelReader::getParticlesRead(size_t threadIndex) const {
220223
if (threadIndex >= readers_.size()) {
221224
throw std::out_of_range("Thread index out of range in getParticlesRead()");
222225
}
223-
return threadStats_[threadIndex].particlesRead;
226+
return threadStats_[threadIndex]->particlesRead;
224227
}
225228

226229
std::uint64_t HistoryBalancedParallelReader::getTotalParticlesRead() const {
227230
std::uint64_t total = 0;
228231
for (size_t t = 0; t < readers_.size(); t++)
229-
total += threadStats_[t].particlesRead;
232+
total += threadStats_[t]->particlesRead;
230233
return total;
231234
}
232235

233236
std::uint64_t HistoryBalancedParallelReader::getTotalHistoriesRead() const {
234237
std::uint64_t total = 0;
235238
for (size_t t = 0; t < readers_.size(); t++)
236-
total += threadStats_[t].totalHistoriesRead;
239+
total += threadStats_[t]->totalHistoriesRead;
237240
return total;
238241
}
239242

src/parallel/ParticleBalancedParallelReader.cc

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ namespace ParticleZoo {
5555

5656
// Position each reader at its starting particle
5757
startingParticleIndex_.reserve(numThreads);
58-
threadStats_.resize(numThreads);
58+
threadStats_.reserve(numThreads);
59+
for (size_t i = 0; i < numThreads; ++i) {
60+
threadStats_.emplace_back(std::make_unique<ThreadStatistics>());
61+
}
5962
std::uint64_t currentParticleIndex = 0;
6063
for (size_t i = 0; i < numThreads; ++i) {
6164
std::uint64_t particlesToRead = particlesPerThread + (i < remainderParticles ? 1 : 0);
@@ -88,7 +91,7 @@ namespace ParticleZoo {
8891
? startingParticleIndex_[threadIndex + 1]
8992
: numberOfParticlesInPhsp_;
9093
// Check if we have completed all particles for this thread
91-
if (threadStats_[threadIndex].particlesRead >= (targetParticleIndex - startingParticleIndex_[threadIndex])) {
94+
if (threadStats_[threadIndex]->particlesRead >= (targetParticleIndex - startingParticleIndex_[threadIndex])) {
9295
hasMore = false;
9396
}
9497
}
@@ -109,12 +112,12 @@ namespace ParticleZoo {
109112
Particle particle = readers_[threadIndex]->getNextParticle();
110113

111114
{
112-
std::lock_guard<std::mutex> lock(threadStats_[threadIndex].mutex);
113-
threadStats_[threadIndex].particlesRead++;
115+
std::lock_guard<std::mutex> lock(threadStats_[threadIndex]->mutex);
116+
threadStats_[threadIndex]->particlesRead++;
114117

115118
if (particle.isNewHistory()) {
116-
threadStats_[threadIndex].representedHistoriesRead++;
117-
threadStats_[threadIndex].incrementalHistorySum += particle.getIncrementalHistories();
119+
threadStats_[threadIndex]->representedHistoriesRead++;
120+
threadStats_[threadIndex]->incrementalHistorySum += particle.getIncrementalHistories();
118121
}
119122
}
120123

@@ -161,7 +164,7 @@ namespace ParticleZoo {
161164
if (threadIndex >= readers_.size()) {
162165
throw std::out_of_range("Thread index out of range in getHistoriesRead()");
163166
}
164-
return threadStats_[threadIndex].particlesRead;
167+
return threadStats_[threadIndex]->particlesRead;
165168
}
166169

167170
std::uint64_t ParticleBalancedParallelReader::getHistoriesRead(size_t threadIndex) const {
@@ -172,21 +175,21 @@ namespace ParticleZoo {
172175
switch (historyCountMode_) {
173176
case HistoryCountMode::RATIO:
174177
{
175-
const std::uint64_t representedHistories = threadStats_[threadIndex].representedHistoriesRead;
178+
const std::uint64_t representedHistories = threadStats_[threadIndex]->representedHistoriesRead;
176179
return (representedHistories * numberOfOriginalHistories_)
177180
/ numberOfRepresentedHistories_;
178181
}
179182
case HistoryCountMode::INCREMENTAL:
180-
return threadStats_[threadIndex].incrementalHistorySum;
183+
return threadStats_[threadIndex]->incrementalHistorySum;
181184
}
182185

183-
return threadStats_[threadIndex].representedHistoriesRead; // unreachable fallback
186+
return threadStats_[threadIndex]->representedHistoriesRead; // unreachable fallback
184187
}
185188

186189
std::uint64_t ParticleBalancedParallelReader::getTotalParticlesRead() const {
187190
std::uint64_t total = 0;
188191
for (size_t t = 0; t < numThreads_; t++) {
189-
total += threadStats_[t].particlesRead;
192+
total += threadStats_[t]->particlesRead;
190193
}
191194
return total;
192195
}
@@ -196,17 +199,17 @@ namespace ParticleZoo {
196199
case HistoryCountMode::RATIO: {
197200
std::uint64_t totalRep = 0;
198201
for (size_t t = 0; t < numThreads_; t++) {
199-
totalRep += threadStats_[t].representedHistoriesRead;
202+
totalRep += threadStats_[t]->representedHistoriesRead;
200203
}
201204
return (totalRep * numberOfOriginalHistories_) / numberOfRepresentedHistories_;
202205
}
203206
case HistoryCountMode::INCREMENTAL: {
204207
std::uint64_t total = 0;
205208
std::uint64_t totalParticles = 0;
206209
for (size_t t = 0; t < numThreads_; t++) {
207-
std::lock_guard<std::mutex> lock(threadStats_[t].mutex);
208-
total += threadStats_[t].incrementalHistorySum;
209-
totalParticles += threadStats_[t].particlesRead;
210+
std::lock_guard<std::mutex> lock(threadStats_[t]->mutex);
211+
total += threadStats_[t]->incrementalHistorySum;
212+
totalParticles += threadStats_[t]->particlesRead;
210213
}
211214
if (totalParticles >= numberOfParticlesInPhsp_)
212215
return numberOfOriginalHistories_;

0 commit comments

Comments
 (0)