Commit 3860ae5

[LCHIB-674] joining two blocks of concurrency
Now that I'm looking at the resulting code, I don't see any reason why these were two separate blocks. I double-checked my early design notes and concluded this must have been a result of evolution.
1 parent 26b2c28 commit 3860ae5
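The shape of the change can be sketched with plain JDK primitives. This is a hypothetical stand-in, not the real Launchable API: `forEachSubModule` here just fans tasks out over an executor and joins them, and the counters stand in for the real file/commit collection work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class MergedPass {
    // Hypothetical stand-in for ByRepository.forEachSubModule: submit one task
    // per repository to the executor, then wait for all of them to finish.
    static void forEachSubModule(ExecutorService es, List<String> repos,
                                 Consumer<String> body) throws Exception {
        List<Future<?>> tasks = new ArrayList<>();
        for (String repo : repos) {
            tasks.add(es.submit(() -> body.accept(repo)));
        }
        for (Future<?> f : tasks) {
            f.get(); // rethrows any task failure
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<String> repos = List.of("root", "sub1", "sub2"); // illustrative names
        boolean collectFiles = true;
        AtomicInteger filePasses = new AtomicInteger();
        AtomicInteger commitPasses = new AtomicInteger();

        // After this commit: one parallel pass per repository that collects
        // files first (preserving the idempotency guarantee), then commits,
        // instead of two separate forEachSubModule invocations.
        forEachSubModule(es, repos, repo -> {
            if (collectFiles) {
                filePasses.incrementAndGet();  // stands in for br.collectFiles(...)
            }
            commitPasses.incrementAndGet();    // stands in for commit collection
        });
        es.shutdown();
        System.out.println(filePasses + " " + commitPasses);
    }
}
```

Merging the two passes means each repository is opened and walked once per `transfer` call, and the `close` handling the removed notes worry about stays in one place.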

File tree

1 file changed (+10, -75 lines)

src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java

Lines changed: 10 additions & 75 deletions
@@ -8,7 +8,6 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.CharStreams;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.http.Header;
@@ -61,6 +60,7 @@
 import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -279,90 +279,25 @@ public void transfer(
       Collection<ObjectId> advertised, IOConsumer<ContentProducer> commitSender, TreeReceiver treeReceiver, IOConsumer<ContentProducer> fileSender, int chunkSize)
       throws IOException {
     ByRepository r = new ByRepository(root, rootName);
-    /*
-    Parallelization
-
-    TreeReceiver is blocking, so the big block in transfer should run in parallel
-    - That gets CommitChunkStreamer and FileChunkStreamer invoked concurrently.
-    - caller wants to wait for the response to FileChunkStreamer then collect commits,
-      so this should be designed nicely
-    - commit collection is very fast, so maybe we just need to do this after the whole join
-
-    - otherwise, we create a file queue and let threads wait for completion of their portions, which is complicated
-
-    we also need one CLI invocation to handle all repositories.
-
-    - header needs to be sent out per chunk. that suggests one stream per repo.
-
-    Maybe better to just focus on how to do decent progress reporting
-    [task #1: 10/35] [task #2: 5/20] ...
-
-    Or maybe we can maintain the current "whoever came at the right time to report" scheme
-    by maintaining the running sum + current workload. AtomicInteger can keep track.
-
-    Summary:
-    fork/join, so that tasks can be forked
-    the work is happening one thread, one repo. file transfer is buffered and flipped for counting
-    file sender to be invoked concurrently
-    two sets of AtomcInteger counters to maintain files to transfer + files transferred
-    thread local chunk streamers
-
-    CommitChunkStreamer - we'll create one per thread.
-      or maybe not. The point of this is to batch things up, so it benefits from seeing
-      data from multiple threads. flushing is a problem, because it's synchronous. remove flushing.
-      buffer together, then at a batch, fork
-      -> that breaks the parent commit first guarantee from this perspective, best to keep one repository, one thread,
-         just do those in parallel. This bites us back if we have tons of repo each small. To solve this,
-         every thread maintains its own pool, flushing if it gets too big, but consolidate the remaining work to
-         the global queue. But that seems too much.
-
-      -> let's just one have synchronized instance with one queue, where the act of writing
-         blocks all produceers. simplest way to guarantee ordering. IOW no concurrent write, which
-         is sufficient for now, since this is not the real bottleneck.
-    FileChunkStreamer - one per thread, in order to deal with header. create header per chunk
-    ProgressReportingConsumer - one per thread, but update common AtomicIntegers for counting,
-      then steal worker to report.
-
-    submodule walk is fast enough, we can afford to do it twice
-
-    for each 'r' in parallel {
-      collect files
-    }
-    for each 'r' in parallel {
-      collect commits
-    }
-
-    mixing serial and parallel complicates the close op, so let's just do it in parallel, and have
-    CommitChunkStreamer serialize the whole thing back down to one.
-
-    ----
-    take 2
-
-    for each 'r' in parallel:
-      recursively find submodules, use ExecutorService to submit tasks for each of them, closing repos at the end
-      but this makes `close` complicated
-    */
 
-    // ExecutorService es = Executors.newFixedThreadPool(4);
+    ExecutorService es = Executors.newFixedThreadPool(4);
     // for debugging
-    ExecutorService es = MoreExecutors.newDirectExecutorService();
+    // ExecutorService es = MoreExecutors.newDirectExecutorService();
 
     ProgressReporter<VirtualFile> pr = new ProgressReporter<>(VirtualFile::path, Duration.ofSeconds(3));
     try {
-      if (collectFiles) {
-        // record all the necessary BLOBs first, before attempting to record its commit.
-        // this way, if the file collection fails, the server won't see this commit, so the future
-        // "record commit" invocation will retry the file collection, thereby making the behavior idempotent.
-        r.forEachSubModule(es, br -> {
+      r.forEachSubModule(es, br -> {
+        if (collectFiles) {
+          // record all the necessary BLOBs first, before attempting to record its commit.
+          // this way, if the file collection fails, the server won't see this commit, so the future
+          // "record commit" invocation will retry the file collection, thereby making the behavior idempotent.
          // TODO: file transfer can be parallelized more aggressively, where we send chunks in parallel
          try (FileChunkStreamer fs = new FileChunkStreamer(fileSender, chunkSize);
              ProgressReporter<VirtualFile>.Consumer fsr = pr.newConsumer(fs)) {
            br.collectFiles(advertised, treeReceiver, fsr);
          }
-        });
-      }
-      r.forEachSubModule(es, br -> {
+        }
+
        // we need to send commits in the topological order, so any parallelization within a repository
        // is probably not worth the effort.
        // TODO: If we process a repository and that doesn't create enough commits
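The commented-out `// for debugging` line in the diff refers to Guava's `MoreExecutors.newDirectExecutorService()`, which runs every submitted task inline on the submitting thread. A minimal JDK-only approximation (assuming no Guava on the classpath) shows why that is useful while debugging parallel code: execution becomes single-threaded and deterministic, so breakpoints and stack traces stay linear.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class DirectExecutorDemo {
    // Sketch of a direct ExecutorService, approximating Guava's
    // MoreExecutors.newDirectExecutorService(): execute() runs the task
    // synchronously on the caller's thread instead of handing it to a pool.
    static ExecutorService directExecutorService() {
        return new AbstractExecutorService() {
            private volatile boolean shutdown;
            @Override public void execute(Runnable command) { command.run(); }
            @Override public void shutdown() { shutdown = true; }
            @Override public List<Runnable> shutdownNow() { shutdown = true; return List.of(); }
            @Override public boolean isShutdown() { return shutdown; }
            @Override public boolean isTerminated() { return shutdown; }
            @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }
        };
    }

    public static void main(String[] args) {
        ExecutorService es = directExecutorService();
        StringBuilder order = new StringBuilder();
        es.execute(() -> order.append("a"));
        es.execute(() -> order.append("b")); // runs inline, strictly after "a"
        es.shutdown();
        System.out.println(order); // deterministic because nothing is concurrent
    }
}
```

Because the rest of the method is written against the `ExecutorService` interface, swapping one line toggles between the parallel production path and the serial debugging path without touching the traversal logic.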

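The removed design notes also settle on progress reporting via "two sets of AtomicInteger counters" with a "whoever came at the right time to report" scheme. A hypothetical sketch of that idea (the method names, thread counts, and reporting interval are all made up for illustration):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ProgressCounters {
    // Shared counters: files discovered so far vs. files actually sent.
    static final AtomicInteger toTransfer = new AtomicInteger();
    static final AtomicInteger transferred = new AtomicInteger();

    static void onFileDiscovered() { toTransfer.incrementAndGet(); }

    static void onFileSent() {
        int done = transferred.incrementAndGet();
        // "whoever came at the right time" reports: no dedicated reporter
        // thread, just the worker that happens to cross the interval.
        if (done % 100 == 0) {
            System.out.println(done + "/" + toTransfer.get());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 50; n++) { onFileDiscovered(); onFileSent(); }
            });
            workers[i].start();
        }
        for (Thread t : workers) t.join();
        System.out.println("total " + transferred.get() + "/" + toTransfer.get());
    }
}
```

The appeal of the scheme is that it needs no extra thread and no locking on the hot path; the cost is that reports fire at slightly irregular moments, which the notes accept as good enough.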