Skip to content

Commit 13cda89

Browse files
committed
Rework the concurrency implementation. Submit a buffer for read/write as soon as it is empty/full instead of reading/writing all buffers at once
1 parent 0997f7b commit 13cda89

File tree

7 files changed

+523
-502
lines changed

7 files changed

+523
-502
lines changed

src/app/BlockCompressor.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -384,16 +384,18 @@ int BlockCompressor::compress(uint64& outputSize)
384384
blockSize = int(max(min((bl + 63) & ~63, int64(MAX_BLOCK_SIZE)), int64(MIN_BLOCK_SIZE)));
385385
}
386386

387-
Context taskCtx(_ctx);
388-
taskCtx.putLong("fileSize", files[i]._size);
389-
taskCtx.putString("inputName", iName);
390-
taskCtx.putString("outputName", oName);
391-
taskCtx.putInt("blockSize", blockSize);
392387
#ifdef CONCURRENCY_ENABLED
388+
ThreadPool pool(_jobs + 1); // +1 to avoid deadlock due to thread exhaustion
389+
Context taskCtx(_ctx, &pool);
393390
taskCtx.putInt("jobs", jobsPerTask[i]);
394391
#else
392+
Context taskCtx(_ctx);
395393
taskCtx.putInt("jobs", 1);
396394
#endif
395+
taskCtx.putLong("fileSize", files[i]._size);
396+
taskCtx.putString("inputName", iName);
397+
taskCtx.putString("outputName", oName);
398+
taskCtx.putInt("blockSize", blockSize);
397399
FileCompressTask<FileCompressResult>* task = new FileCompressTask<FileCompressResult>(taskCtx, _listeners);
398400
tasks.push_back(task);
399401
}

src/app/BlockDecompressor.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -299,15 +299,17 @@ int BlockDecompressor::decompress(uint64& inputSize)
299299
oName = oName + ".bak";
300300
}
301301

302-
Context taskCtx(_ctx);
303-
taskCtx.putLong("fileSize", files[i]._size);
304-
taskCtx.putString("inputName", iName);
305-
taskCtx.putString("outputName", oName);
306302
#ifdef CONCURRENCY_ENABLED
303+
ThreadPool pool(_jobs + 1); // +1 to avoid deadlock due to thread exhaustion
304+
Context taskCtx(_ctx, &pool);
307305
taskCtx.putInt("jobs", jobsPerTask[i]);
308306
#else
307+
Context taskCtx(_ctx);
309308
taskCtx.putInt("jobs", 1);
310309
#endif
310+
taskCtx.putLong("fileSize", files[i]._size);
311+
taskCtx.putString("inputName", iName);
312+
taskCtx.putString("outputName", oName);
311313
FileDecompressTask<FileDecompressResult>* task = new FileDecompressTask<FileDecompressResult>(taskCtx, _listeners);
312314
tasks.push_back(task);
313315
}

src/app/Kanzi.cpp

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,32 +1068,31 @@ int main(int argc, const char* argv[])
10681068
}
10691069
#endif
10701070

1071-
Context args;
1071+
Context ctx;
10721072
Printer log(cout);
1073-
int status = processCommandLine(argc, argv, args, log);
1073+
int status = processCommandLine(argc, argv, ctx, log);
10741074

10751075
// Command line processing error ?
10761076
if (status != 0)
10771077
return status;
10781078

10791079
// Help mode only ?
1080-
if (args.has("mode") == false)
1080+
if (ctx.has("mode") == false)
10811081
return 0;
10821082

1083-
string mode = args.getString("mode");
1084-
int jobs = args.getInt("jobs", -1);
1083+
string mode = ctx.getString("mode");
1084+
int jobs = ctx.getInt("jobs", -1);
10851085

10861086
try {
10871087
#ifndef CONCURRENCY_ENABLED
10881088
if (jobs > 1) {
1089-
const int verbosity = args.getInt("verbosity");
1089+
const int verbosity = ctx.getInt("verbosity");
10901090
stringstream ss;
10911091
ss << "Warning: the number of jobs is limited to 1 in this version";
10921092
log.println(ss.str(), verbosity > 0);
10931093
}
10941094

10951095
jobs = 1;
1096-
Context ctx(args);
10971096
#else
10981097
if (jobs == 0) {
10991098
int cores = max(int(thread::hardware_concurrency()), 1); // User provided 0 => use all the cores
@@ -1104,21 +1103,14 @@ int main(int argc, const char* argv[])
11041103
jobs = min(cores, MAX_CONCURRENCY);
11051104
}
11061105
else if (jobs > MAX_CONCURRENCY) {
1107-
const int verbosity = args.getInt("verbosity");
1106+
const int verbosity = ctx.getInt("verbosity");
11081107
stringstream ss;
11091108
ss << "Warning: the number of jobs is too high, defaulting to " << MAX_CONCURRENCY;
11101109
log.println(ss.str(), verbosity > 0);
11111110
jobs = MAX_CONCURRENCY;
11121111
}
1113-
1114-
#if defined(WIN32) || defined(_WIN32) || defined(_WIN64)
1115-
// Windows already has a built-in threadpool. Using it is better for performance.
1116-
Context ctx(args);
1117-
#else
1118-
ThreadPool pool(jobs);
1119-
Context ctx(args, &pool);
1120-
#endif
11211112
#endif
1113+
11221114
ctx.putInt("jobs", jobs);
11231115

11241116
if (mode == "c") {

0 commit comments

Comments
 (0)