Skip to content

Commit 2660838

Browse files
committed
progress smoothening + communicate indices instead of strings
- still no balancing, but smooth mode diminishes imbalance w.r.t. numbers of detections (but not w.r.t. durations) + iteration over indices - terrible progress estimations when smoothening is disabled
1 parent 8e3a7db commit 2660838

File tree

5 files changed

+88
-57
lines changed

5 files changed

+88
-57
lines changed

__dependency_graph.dot

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ digraph {
146146
DlProofEnumerator -> concurrent_unordered_set [color=blue]
147147
DlProofEnumerator -> concurrent_vector [color=blue]
148148
DlProofEnumerator -> parallel_for [color=blue]
149+
DlProofEnumerator -> parallel_sort [color=blue]
149150
DlProofEnumerator -> cstring [color=blue]
150151
DlProofEnumerator -> iostream [color=blue]
151152
DlProofEnumerator -> mpi [color=blue]

helper/FctHelper.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ enum mpi_tag : int {
101101
mpi_tag_unspecified = 0, // not used by any helper function
102102
mpi_tag_bool,
103103
mpi_tag_string,
104+
mpi_tag_uint64,
104105
mpi_tag_uint64_pair
105106
};
106107

@@ -152,6 +153,18 @@ bool FctHelper::mpi_tryRecvBool(int rank, int source, bool& result, bool debug)
152153
return false;
153154
}
154155

156+
void FctHelper::mpi_sendUint64(int rank, const uint64_t num, int dest, bool debug) {
157+
mpi_send(rank, 1, MPI_LONG_LONG_INT, &num, dest, mpi_tag_uint64, debug, [](const uint64_t* x) { return to_string(*x); });
158+
}
159+
160+
uint64_t FctHelper::mpi_recvUint64(int rank, int source, bool debug) {
161+
return mpi_recvValue<uint64_t>(rank, MPI_LONG_LONG_INT, source, mpi_tag_uint64, debug, [](const uint64_t x) { return to_string(x); });
162+
}
163+
164+
bool FctHelper::mpi_tryRecvUint64(int rank, int source, uint64_t& result, bool debug) {
165+
return mpi_tryRecvValue(rank, MPI_LONG_LONG_INT, source, mpi_tag_uint64, result, debug, [](const uint64_t x) { return to_string(x); });
166+
}
167+
155168
void FctHelper::mpi_sendUint64Pair(int rank, const array<uint64_t, 2>& arr, int dest, bool debug) {
156169
mpi_send(rank, 2, MPI_LONG_LONG_INT, arr.data(), dest, mpi_tag_uint64_pair, debug, [](const uint64_t* x) {
157170
return "(" + to_string(x[0]) + ", " + to_string(x[1]) + ")";

helper/FctHelper.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ struct FctHelper {
4848
static bool mpi_recvBool(int rank, int source, bool debug = false);
4949
static bool mpi_tryRecvBool(int rank, int source, bool& result, bool debug = false);
5050

51+
static void mpi_sendUint64(int rank, const std::uint64_t num, int dest, bool debug = false);
52+
static std::uint64_t mpi_recvUint64(int rank, int source, bool debug = false);
53+
static bool mpi_tryRecvUint64(int rank, int source, std::uint64_t& result, bool debug = false);
54+
5155
static void mpi_sendUint64Pair(int rank, const std::array<std::uint64_t, 2>& arr, int dest, bool debug = false);
5256
static std::array<std::uint64_t, 2> mpi_recvUint64Pair(int rank, int source, bool debug = false);
5357
static bool mpi_tryRecvUint64Pair(int rank, int source, std::array<std::uint64_t, 2>& result, bool debug = false);

nortmann/DlProofEnumerator.cpp

Lines changed: 69 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <tbb/concurrent_unordered_set.h>
1414
#include <tbb/concurrent_vector.h>
1515
#include <tbb/parallel_for.h>
16+
#include <tbb/parallel_sort.h>
1617

1718
#include <cstring>
1819
#include <iostream>
@@ -781,19 +782,18 @@ void DlProofEnumerator::mpi_filterDProofRepresentativeFile(uint32_t wordLengthLi
781782
MPI_Abort(MPI_COMM_WORLD, 1);
782783
return;
783784
}
784-
string myData = to_string(allRepresentativesCount);
785785
if (isMainProc) {
786786
for (int source = 1; source < mpi_size; source++) {
787-
string data = FctHelper::mpi_recvString(0, source);
788-
if (myData != data) {
789-
cerr << "Uniform loading failed: " + data + " representatives loaded on rank 0, but " + myData + " representatives loaded on rank " + to_string(source) + ". Aborting." << endl;
787+
uint64_t data = FctHelper::mpi_recvUint64(0, source);
788+
if (allRepresentativesCount != data) {
789+
cerr << "Uniform loading failed: " + to_string(data) + " representatives loaded on rank 0, but " + to_string(allRepresentativesCount) + " representatives loaded on rank " + to_string(source) + ". Aborting." << endl;
790790
MPI_Abort(MPI_COMM_WORLD, 1);
791791
return;
792792
}
793793
}
794794
cout << myTime() + ": Representative collections were initialized successfully on all ranks." << endl;
795795
} else
796-
FctHelper::mpi_sendString(mpi_rank, myData, 0);
796+
FctHelper::mpi_sendUint64(mpi_rank, allRepresentativesCount, 0);
797797

798798
// 2. Initialize and prepare progress data.
799799
bool showProgress = isMainProc && allRepresentatives.size() > 15;
@@ -818,7 +818,7 @@ void DlProofEnumerator::mpi_filterDProofRepresentativeFile(uint32_t wordLengthLi
818818
const vector<string>& recentConclusionSequence = allConclusions[wordLengthLimit];
819819
if (isMainProc)
820820
startTime = chrono::steady_clock::now();
821-
tbb::concurrent_unordered_set<string> redundant = _mpi_findRedundantConclusionsForProofsOfMaxLength(mpi_rank, mpi_size, wordLengthLimit, representativeProofs, recentConclusionSequence, isMainProc ? &removalProgress : nullptr, smoothProgress);
821+
tbb::concurrent_unordered_set<uint64_t> redundant = _mpi_findRedundantConclusionsForProofsOfMaxLength(mpi_rank, mpi_size, wordLengthLimit, representativeProofs, recentConclusionSequence, isMainProc ? &removalProgress : nullptr, smoothProgress);
822822
if (isMainProc)
823823
cout << FctHelper::durationStringMs(chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - startTime)) + " taken to detect " + to_string(redundant.size()) + " conclusions for which there are more general variants proven in lower or equal amounts of steps." << endl;
824824

@@ -835,9 +835,8 @@ void DlProofEnumerator::mpi_filterDProofRepresentativeFile(uint32_t wordLengthLi
835835
// 6.1 Order information.
836836
startTime = chrono::steady_clock::now();
837837
tbb::parallel_for(size_t(0), recentRepresentativeSequence.size(), [&recentRepresentativeSequence, &recentConclusionSequence, &redundant, &newContent](size_t i) { // NOTE: Counts from i = start := 0 until i < end := recentRepresentativeSequence.size().
838-
const string& conclusion = recentConclusionSequence[i];
839-
if (!redundant.count(conclusion))
840-
newContent.emplace(recentRepresentativeSequence[i], conclusion);
838+
if (!redundant.count(i))
839+
newContent.emplace(recentRepresentativeSequence[i], recentConclusionSequence[i]);
841840
});
842841
cout << FctHelper::durationStringMs(chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - startTime)) + " taken to filter and order new representative proofs." << endl;
843842

@@ -1253,17 +1252,32 @@ void DlProofEnumerator::_removeRedundantConclusionsForProofsOfMaxLength(const ui
12531252
//#cout << FctHelper::round(static_cast<long double>(chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - startTime).count()) / 1000.0, 2) << " ms taken for erasure of " << toErase.size() << " elements." << endl;
12541253
}
12551254

1256-
tbb::concurrent_unordered_set<string> DlProofEnumerator::_mpi_findRedundantConclusionsForProofsOfMaxLength(int mpi_rank, int mpi_size, const uint32_t maxLength, tbb::concurrent_unordered_map<string, string>& representativeProofs, const vector<string>& recentConclusionSequence, helper::ProgressData* const progressData, bool smoothProgress) {
1255+
tbb::concurrent_unordered_set<uint64_t> DlProofEnumerator::_mpi_findRedundantConclusionsForProofsOfMaxLength(int mpi_rank, int mpi_size, const uint32_t maxLength, tbb::concurrent_unordered_map<string, string>& representativeProofs, const vector<string>& recentConclusionSequence, helper::ProgressData* const progressData, bool smoothProgress) {
12571256
bool isMainProc = mpi_rank == 0;
12581257
size_t n = recentConclusionSequence.size();
12591258

1259+
// Reorders indices according to affine ciphered values (https://en.wikipedia.org/wiki/Affine_cipher),
1260+
// using a factor with good spectral results (https://en.wikipedia.org/wiki/Spectral_test).
1261+
auto distributeIndices = [](uint64_t size) -> vector<uint64_t> {
1262+
// NOTE: 0x9e3779b97f4a7c15 = 0b1001111000110111011110011011100101111111010010100111110000010101 = 11400714819323198485 = 5*139*199*82431689521877
1263+
// is coprime with 2^64 (as required for 64-bit affine cipher), and has well-distributed bits ; 2^64 / golden ratio = 1.14007148193231984859...
1264+
auto ac = [](uint64_t x) -> uint64_t { return 0x9e3779b97f4a7c15uLL * x + 1; };
1265+
auto cmp_ac = [&ac](uint64_t a, uint64_t b) -> bool { return ac(a) < ac(b); };
1266+
vector<uint64_t> v(size);
1267+
tbb::parallel_for(uint64_t(0), size, [&v](uint64_t i) { v[i] = i; });
1268+
tbb::parallel_sort(v.begin(), v.end(), cmp_ac);
1269+
return v;
1270+
};
1271+
//#chrono::time_point<chrono::steady_clock> startTime = chrono::steady_clock::now();
1272+
const vector<uint64_t> indexDistribution = smoothProgress ? distributeIndices(n) : vector<uint64_t> { };
1273+
//#if (smoothProgress) cout << "[Rank " + to_string(mpi_rank) + "] " << FctHelper::round(static_cast<long double>(chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - startTime).count()) / 1000.0, 2) << " ms taken to prepare index distribution of size " << n << "." << endl;
1274+
// e.g. (..., 27, 29, 31): (..., 18.83, 55.38, 166.21) ms taken to prepare index distribution of size (6649, 19416, 56321, 165223, 490604, 1459555, 4375266, 13194193).
1275+
12601276
size_t first = mpi_rank * n / mpi_size;
12611277
size_t end = mpi_rank + 1 == mpi_size ? n : (mpi_rank + 1) * n / mpi_size; // first index that is not contained
1262-
// NOTE: Using unordered structure to reduce fluctuations (since then the candidates are in a pseudo-random order).
1263-
tbb::concurrent_unordered_set<string> myRange = tbb::concurrent_unordered_set<string>(recentConclusionSequence.begin() + first, recentConclusionSequence.begin() + end); // empty in case first == end
12641278
//#if (isMainProc) this_thread::sleep_for(2s);
1265-
//#cout << "[Rank " + to_string(mpi_rank) + ", n = " + to_string(n) + "] Interval: [" + to_string(first) + ", " + (end ? to_string(end - 1) : "-1") + "] (size " + to_string(end - first) + "), |myRange| = " + to_string(myRange.size()) << endl;
1266-
//#chrono::time_point<chrono::steady_clock> startTime = chrono::steady_clock::now();
1279+
//#cout << "[Rank " + to_string(mpi_rank) + ", n = " + to_string(n) + "] Interval: [" + to_string(first) + ", " + (end ? to_string(end - 1) : "-1") + "] (size " + to_string(end - first) + ")" << endl;
1280+
//#startTime = chrono::steady_clock::now();
12671281
tbb::concurrent_map<size_t, tbb::concurrent_vector<const string*>> formulasByStandardLength;
12681282
tbb::parallel_for(representativeProofs.range(), [&formulasByStandardLength](tbb::concurrent_unordered_map<string, string>::range_type& range) {
12691283
for (tbb::concurrent_unordered_map<string, string>::const_iterator it = range.begin(); it != range.end(); ++it) {
@@ -1286,8 +1300,8 @@ tbb::concurrent_unordered_set<string> DlProofEnumerator::_mpi_findRedundantConcl
12861300
}
12871301
});
12881302
};
1289-
tbb::concurrent_queue<string> toErase;
1290-
tbb::concurrent_unordered_set<string> toErase_mainProc;
1303+
tbb::concurrent_queue<uint64_t> toErase;
1304+
tbb::concurrent_unordered_set<uint64_t> toErase_mainProc;
12911305
mutex mtx;
12921306
unique_lock<mutex> condLock(mtx);
12931307
condition_variable cond;
@@ -1296,40 +1310,39 @@ tbb::concurrent_unordered_set<string> DlProofEnumerator::_mpi_findRedundantConcl
12961310
atomic<uint64_t> localCounter = 0;
12971311
if (progressData)
12981312
progressData->setStartTime();
1299-
auto worker = [&iterateFormulasOfStandardLengthUpTo](int mpi_rank, size_t n, bool isMainProc, atomic<uint64_t>& localCounter, condition_variable& cond, atomic<bool>& communicate, atomic<bool>& workerDone, tbb::concurrent_unordered_set<string>& myRange, tbb::concurrent_queue<string>& toErase, tbb::concurrent_unordered_set<string>& toErase_mainProc, helper::ProgressData* const progressData) {
1300-
tbb::parallel_for(myRange.range(), [&isMainProc, &localCounter, &cond, &progressData, &iterateFormulasOfStandardLengthUpTo, &toErase, &toErase_mainProc](tbb::concurrent_unordered_set<string>::range_type& range) {
1301-
for (tbb::concurrent_unordered_set<string>::const_iterator it = range.begin(); it != range.end(); ++it) {
1302-
const string& formula = *it;
1303-
atomic<bool> redundant = false;
1304-
size_t formulaLen = DlCore::standardLen_polishNotation_noRename_numVars(formula);
1305-
iterateFormulasOfStandardLengthUpTo(formulaLen, redundant, [&formula, &redundant](const string& potentialSchema) {
1306-
if (formula != potentialSchema && DlCore::isSchemaOf_polishNotation_noRename_numVars(potentialSchema, formula)) // formula redundant
1307-
redundant = true;
1308-
});
1309-
if (redundant) {
1310-
localCounter++;
1311-
if (isMainProc)
1312-
toErase_mainProc.insert(*it);
1313-
else {
1314-
toErase.push(*it);
1315-
cond.notify_one();
1316-
}
1313+
auto worker = [&recentConclusionSequence, &indexDistribution, &iterateFormulasOfStandardLengthUpTo](int mpi_rank, size_t first, size_t end, size_t n, bool smoothProgress, bool isMainProc, atomic<uint64_t>& localCounter, condition_variable& cond, atomic<bool>& communicate, atomic<bool>& workerDone, tbb::concurrent_queue<uint64_t>& toErase, tbb::concurrent_unordered_set<uint64_t>& toErase_mainProc, helper::ProgressData* const progressData) {
1314+
tbb::parallel_for(first, end, [&recentConclusionSequence, &indexDistribution, &smoothProgress, &isMainProc, &localCounter, &cond, &progressData, &iterateFormulasOfStandardLengthUpTo, &toErase, &toErase_mainProc](size_t i) {
1315+
uint64_t index = smoothProgress ? indexDistribution[i] : i;
1316+
const string& formula = recentConclusionSequence[index];
1317+
atomic<bool> redundant = false;
1318+
size_t formulaLen = DlCore::standardLen_polishNotation_noRename_numVars(formula);
1319+
iterateFormulasOfStandardLengthUpTo(formulaLen, redundant, [&formula, &redundant](const string& potentialSchema) {
1320+
if (formula != potentialSchema && DlCore::isSchemaOf_polishNotation_noRename_numVars(potentialSchema, formula)) // formula redundant
1321+
redundant = true;
1322+
});
1323+
if (redundant) {
1324+
localCounter++;
1325+
if (isMainProc)
1326+
toErase_mainProc.insert(index);
1327+
else {
1328+
toErase.push(index);
1329+
cond.notify_one();
1330+
}
13171331

1318-
// Show progress if requested
1319-
if (progressData) {
1320-
if (progressData->nextStep()) {
1321-
uint64_t percentage;
1322-
string progress;
1323-
string etc;
1324-
if (progressData->nextState(percentage, progress, etc)) {
1325-
time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
1326-
cout << strtok(ctime(&time), "\n") << ": Removed " << (progressData->maximumEstimated ? "" : "") << (percentage < 10 ? " " : "") << percentage << "% of redundant conclusions. [" << progress << "] (" << etc << ")" << endl;
1327-
}
1332+
// Show progress if requested
1333+
if (progressData) {
1334+
if (progressData->nextStep()) {
1335+
uint64_t percentage;
1336+
string progress;
1337+
string etc;
1338+
if (progressData->nextState(percentage, progress, etc)) {
1339+
time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
1340+
cout << strtok(ctime(&time), "\n") << ": Removed " << (progressData->maximumEstimated ? "" : "") << (percentage < 10 ? " " : "") << percentage << "% of redundant conclusions. [" << progress << "] (" << etc << ")" << endl;
13281341
}
13291342
}
1330-
} else if (!toErase.empty())
1331-
cond.notify_one();
1332-
}
1343+
}
1344+
} else if (!toErase.empty())
1345+
cond.notify_one();
13331346
});
13341347
workerDone = true;
13351348
if (!isMainProc)
@@ -1339,17 +1352,17 @@ tbb::concurrent_unordered_set<string> DlProofEnumerator::_mpi_findRedundantConcl
13391352
}
13401353
cout << "[Rank " + to_string(mpi_rank) + ", n = " + to_string(n) + "] Worker complete." << endl;
13411354
};
1342-
thread workerThread(worker, mpi_rank, n, isMainProc, ref(localCounter), ref(cond), ref(communicate), ref(workerDone), ref(myRange), ref(toErase), ref(toErase_mainProc), progressData);
1355+
thread workerThread(worker, mpi_rank, first, end, n, smoothProgress, isMainProc, ref(localCounter), ref(cond), ref(communicate), ref(workerDone), ref(toErase), ref(toErase_mainProc), progressData);
13431356
if (isMainProc) {
13441357
int numComplete = 0;
1345-
string f;
1358+
uint64_t index;
13461359
while (communicate) {
13471360
for (int source = 1; source < mpi_size; source++)
1348-
while (FctHelper::mpi_tryRecvString(mpi_rank, source, f)) {
1349-
if (f == ";")
1361+
while (FctHelper::mpi_tryRecvUint64(mpi_rank, source, index)) {
1362+
if (index == UINT64_MAX) // notification that the process is done
13501363
numComplete++;
13511364
else {
1352-
toErase_mainProc.insert(f);
1365+
toErase_mainProc.insert(index);
13531366

13541367
// Show progress if requested
13551368
if (progressData) {
@@ -1375,11 +1388,11 @@ tbb::concurrent_unordered_set<string> DlProofEnumerator::_mpi_findRedundantConcl
13751388
} else
13761389
while (communicate) {
13771390
cond.wait(condLock);
1378-
string f;
1379-
while (toErase.try_pop(f)) // send and clear 'toErase'
1380-
FctHelper::mpi_sendString(mpi_rank, f, 0);
1391+
uint64_t index;
1392+
while (toErase.try_pop(index)) // send and clear 'toErase'
1393+
FctHelper::mpi_sendUint64(mpi_rank, index, 0);
13811394
if (workerDone && toErase.empty()) {
1382-
FctHelper::mpi_sendString(mpi_rank, ";", 0); // notify main process that this process is done
1395+
FctHelper::mpi_sendUint64(mpi_rank, UINT64_MAX, 0); // notify main process that this process is done
13831396
communicate = false;
13841397
}
13851398
}

nortmann/DlProofEnumerator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ struct DlProofEnumerator {
6767
private:
6868
static void _findProvenFormulas(tbb_concurrent_unordered_map<std::string, std::string>& representativeProofs, std::uint32_t wordLengthLimit, DlProofEnumeratorMode mode, helper::ProgressData* const progressData, std::uint64_t* optOut_counter, std::uint64_t* optOut_conclusionCounter, std::uint64_t* optOut_redundantCounter, std::uint64_t* optOut_invalidCounter, const std::vector<std::uint32_t>* genIn_stack = nullptr, const std::uint32_t* genIn_n = nullptr, const std::vector<std::vector<std::string>>* genIn_allRepresentativesLookup = nullptr);
6969
static void _removeRedundantConclusionsForProofsOfMaxLength(const std::uint32_t maxLength, tbb_concurrent_unordered_map<std::string, std::string>& representativeProofs, helper::ProgressData* const progressData, std::uint64_t& conclusionCounter, std::uint64_t& redundantCounter);
70-
static tbb_concurrent_unordered_set<std::string> _mpi_findRedundantConclusionsForProofsOfMaxLength(int mpi_rank, int mpi_size, const std::uint32_t maxLength, tbb_concurrent_unordered_map<std::string, std::string>& representativeProofs, const std::vector<std::string>& recentConclusionSequence, helper::ProgressData* const progressData, bool smoothProgress);
70+
static tbb_concurrent_unordered_set<std::uint64_t> _mpi_findRedundantConclusionsForProofsOfMaxLength(int mpi_rank, int mpi_size, const std::uint32_t maxLength, tbb_concurrent_unordered_map<std::string, std::string>& representativeProofs, const std::vector<std::string>& recentConclusionSequence, helper::ProgressData* const progressData, bool smoothProgress);
7171

7272
public:
7373
// Iterates condensed detachment strings for PL-proofs in D-notation, using knowledge of all representative proofs of length n or lower, which must be passed via 'allRepresentatives'.

0 commit comments

Comments
 (0)