Skip to content

Commit 91bd37c

Browse files
committed
[AIENG-289] collect directory index
1 parent 4f4f028 commit 91bd37c

File tree

6 files changed

+153
-38
lines changed

6 files changed

+153
-38
lines changed

src/main/java/com/launchableinc/ingest/commits/CommitGraphCollector.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.fasterxml.jackson.databind.node.ObjectNode;
99
import com.google.common.collect.ImmutableList;
1010
import com.google.common.io.CharStreams;
11-
import java.util.Collections;
1211
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
1312
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
1413
import org.apache.http.Header;
@@ -53,6 +52,7 @@
5352
import java.time.Duration;
5453
import java.util.ArrayList;
5554
import java.util.Collection;
55+
import java.util.Collections;
5656
import java.util.HashMap;
5757
import java.util.List;
5858
import java.util.Map;
@@ -63,8 +63,8 @@
6363
import java.util.function.Supplier;
6464
import java.util.zip.GZIPOutputStream;
6565

66-
import static com.google.common.collect.ImmutableList.toImmutableList;
67-
import static java.util.Arrays.stream;
66+
import static com.google.common.collect.ImmutableList.*;
67+
import static java.util.Arrays.*;
6868

6969
/**
7070
* Compares what commits the local repository and the remote repository have, then send delta over.
@@ -73,6 +73,10 @@ public class CommitGraphCollector {
7373
private static final Logger logger = LoggerFactory.getLogger(CommitGraphCollector.class);
7474
static final ObjectMapper objectMapper = new ObjectMapper();
7575
private static final int HTTP_TIMEOUT_MILLISECONDS = 15_000;
76+
/**
77+
* Repository header is sent using this reserved file name
78+
*/
79+
static final String HEADER_FILE = ".launchable";
7680

7781
private final String rootName;
7882

@@ -313,6 +317,7 @@ public void collectFiles(boolean collectFiles) {
313317

314318
/** Process commits per repository. */
315319
final class ByRepository implements AutoCloseable {
320+
/** Names that uniquely identifies this Git repository among other Git repositories collected for the workspace. */
316321
private final String name;
317322
private final Repository git;
318323

@@ -385,7 +390,7 @@ public void transfer(Collection<ObjectId> advertised, Consumer<JSCommit> commitR
385390
// record all the necessary BLOBs first, before attempting to record its commit.
386391
// this way, if the file collection fails, the server won't see this commit, so the future
387392
// "record commit" invocation will retry the file collection, thereby making the behavior idempotent.
388-
collectFiles(treeWalk, treeReceiver, fileReceiver);
393+
collectFiles(start, treeWalk, treeReceiver, fileReceiver);
389394
fileReceiver.flush();
390395

391396
// walk the commits, transform them, and send them to the commitReceiver
@@ -435,7 +440,7 @@ That is, find submodules that are available in the working tree (thus `!isBare()
435440
* Our goal here is to find all the files that the server hasn't seen yet. We'll send them to the tree receiver,
436441
* which further responds with the actual files we need to send to the server.
437442
*/
438-
private void collectFiles(TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer<VirtualFile> fileReceiver) throws IOException {
443+
private void collectFiles(RevCommit start, TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer<VirtualFile> fileReceiver) throws IOException {
439444
if (!collectFiles) {
440445
return;
441446
}
@@ -461,7 +466,7 @@ private void collectFiles(TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer
461466
if ((treeWalk.getFileMode(0).getBits() & FileMode.TYPE_MASK) == FileMode.TYPE_FILE) {
462467
GitFile f = new GitFile(name, treeWalk.getPathString(), head, objectReader);
463468
// to avoid excessive data transfer, skip files that are too big
464-
if (f.size() < 1024 * 1024 && f.isText()) {
469+
if (f.size() < 1024 * 1024 && f.isText() && !f.path.equals(HEADER_FILE)) {
465470
treeReceiver.accept(f);
466471
}
467472
}
@@ -471,11 +476,44 @@ private void collectFiles(TreeWalk treeWalk, TreeReceiver treeReceiver, Consumer
471476
// Note(Konboi): To balance the order, since words like "test" and "spec" tend to appear
472477
// toward the end in alphabetical sorting.
473478
List<VirtualFile> files = new ArrayList<>(treeReceiver.response());
474-
Collections.shuffle(files);
475-
for (VirtualFile f : files) {
476-
fileReceiver.accept(f);
479+
if (!files.isEmpty()) {
480+
fileReceiver.accept(buildHeader(start));
477481
filesSent++;
482+
483+
Collections.shuffle(files);
484+
for (VirtualFile f : files) {
485+
fileReceiver.accept(f);
486+
filesSent++;
487+
}
488+
}
489+
}
490+
491+
/**
492+
* Creates a per repository "header" file as a {@link VirtualFile}.
493+
* Currently, this is just the list of files in the repository.
494+
*/
495+
private VirtualFile buildHeader(RevCommit start) throws IOException {
496+
ByteArrayOutputStream os = new ByteArrayOutputStream();
497+
try (JsonGenerator w = new JsonFactory().createGenerator(os)) {
498+
w.setCodec(objectMapper);
499+
w.writeStartObject();
500+
w.writeArrayFieldStart("tree");
501+
502+
try (TreeWalk tw = new TreeWalk(git)) {
503+
tw.addTree(start.getTree());
504+
tw.setRecursive(true);
505+
506+
while (tw.next()) {
507+
w.writeStartObject();
508+
w.writeStringField("path", tw.getPathString());
509+
w.writeEndObject();
510+
}
511+
}
512+
513+
w.writeEndArray();
514+
w.writeEndObject();
478515
}
516+
return VirtualFile.from(name, HEADER_FILE, ObjectId.zeroId(), os.toByteArray());
479517
}
480518

481519

src/main/java/com/launchableinc/ingest/commits/FlushableConsumer.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,22 @@
77
* Consumers that spool items it accepts and process them in bulk.
88
*/
99
public interface FlushableConsumer<T> extends Consumer<T> {
10-
/**
11-
* Process all items that have been accepted so far.
12-
*/
13-
void flush() throws IOException;
10+
/**
11+
* Process all items that have been accepted so far.
12+
*/
13+
void flush() throws IOException;
14+
15+
static <T> FlushableConsumer<T> of(Consumer<T> c) {
16+
return new FlushableConsumer<T>() {
17+
@Override
18+
public void flush() throws IOException {
19+
// noop
20+
}
21+
22+
@Override
23+
public void accept(T t) {
24+
c.accept(t);
25+
}
26+
};
27+
}
1428
}

src/main/java/com/launchableinc/ingest/commits/VirtualFile.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,34 @@ public interface VirtualFile {
2323

2424
long size() throws IOException;
2525
void writeTo(OutputStream os) throws IOException;
26+
27+
static VirtualFile from(String repo, String path, ObjectId blob, byte[] payload) {
28+
return new VirtualFile() {
29+
30+
@Override
31+
public String repo() {
32+
return repo;
33+
}
34+
35+
@Override
36+
public String path() {
37+
return path;
38+
}
39+
40+
@Override
41+
public ObjectId blob() {
42+
return blob;
43+
}
44+
45+
@Override
46+
public long size() {
47+
return payload.length;
48+
}
49+
50+
@Override
51+
public void writeTo(OutputStream os) throws IOException {
52+
os.write(payload);
53+
}
54+
};
55+
}
2656
}

src/test/java/com/launchableinc/ingest/commits/CommitGraphCollectorTest.java

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package com.launchableinc.ingest.commits;
22

3+
import com.fasterxml.jackson.databind.JsonNode;
34
import com.fasterxml.jackson.databind.ObjectMapper;
45
import com.google.common.collect.ImmutableList;
56
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
67
import org.apache.commons.io.IOUtils;
78
import org.apache.commons.io.output.NullOutputStream;
89
import org.apache.http.entity.ContentProducer;
10+
import org.eclipse.jgit.api.CommitCommand;
911
import org.eclipse.jgit.api.Git;
1012
import org.eclipse.jgit.lib.ObjectId;
1113
import org.eclipse.jgit.lib.PersonIdent;
@@ -26,7 +28,8 @@
2628
import java.io.InputStream;
2729
import java.nio.charset.StandardCharsets;
2830
import java.nio.file.Files;
29-
import java.util.Collection;
31+
import java.util.ArrayList;
32+
import java.util.Collections;
3033
import java.util.List;
3134

3235
import static com.google.common.truth.Truth.*;
@@ -87,7 +90,7 @@ public void bareRepo() throws Exception {
8790
try (Repository r = Git.open(barerepoDir).getRepository()) {
8891
CommitGraphCollector cgc = collectCommit(r, ImmutableList.of());
8992
assertThat(cgc.getCommitsSent()).isEqualTo(1);
90-
assertThat(cgc.getFilesSent()).isEqualTo(1);
93+
assertThat(cgc.getFilesSent()).isEqualTo(2); // header + .gitmodules
9194
}
9295
}
9396

@@ -118,7 +121,7 @@ public void chunking() throws Exception {
118121
2);
119122
}
120123
assertThat(councCommitChunks[0]).isEqualTo(2);
121-
assertThat(countFilesChunks[0]).isEqualTo(1); // a and sub/x, 2 files, 1 chunk
124+
assertThat(countFilesChunks[0]).isEqualTo(3); // header, a, .gitmodules, and header, sub/x, 5 files, 3 chunks
122125
}
123126

124127
private void assertValidTar(ContentProducer content) throws IOException {
@@ -129,8 +132,8 @@ private void assertValidTar(ContentProducer content) throws IOException {
129132
}
130133
}
131134

132-
private void assertValidJson(ContentProducer content) throws IOException {
133-
new ObjectMapper().readTree(read(content));
135+
private JsonNode assertValidJson(ContentProducer content) throws IOException {
136+
return new ObjectMapper().readTree(read(content));
134137
}
135138

136139
private InputStream read(ContentProducer content) throws IOException {
@@ -163,6 +166,37 @@ private CommitGraphCollector collectCommit(Repository r, List<ObjectId> advertis
163166
return cgc;
164167
}
165168

169+
@Test
170+
public void header() throws Exception {
171+
setupRepos();
172+
try (Git mainrepo = Git.open(mainrepoDir)) {
173+
addCommitInSubRepo(mainrepo);
174+
175+
List<VirtualFile> files = new ArrayList<>();
176+
177+
CommitGraphCollector cgc = new CommitGraphCollector("test", mainrepo.getRepository());
178+
cgc.collectFiles(true);
179+
cgc.new ByRepository(mainrepo.getRepository(), "main")
180+
.transfer(Collections.emptyList(), c -> {},
181+
new PassThroughTreeReceiverImpl(),
182+
FlushableConsumer.of(files::add));
183+
184+
// header for the main repo, 'gitmodules', header for the sub repo, 'a', and 'x' in the sub repo
185+
assertThat(files).hasSize(5);
186+
VirtualFile header = files.get(2);
187+
assertThat(header.path()).isEqualTo(CommitGraphCollector.HEADER_FILE);
188+
JsonNode tree = assertValidJson(header::writeTo).get("tree");
189+
assertThat(tree.isArray()).isTrue();
190+
191+
List<String> paths = new ArrayList<>();
192+
for (JsonNode i : tree) {
193+
paths.add(i.get("path").asText());
194+
}
195+
196+
assertThat(paths).containsExactly("a", "x");
197+
}
198+
}
199+
166200
/**
167201
* Initialize a repository with a submodule.
168202
*
@@ -172,12 +206,13 @@ private PersonIdent setupRepos() throws Exception {
172206
PersonIdent ident;
173207
try (Git subrepo = Git.init().setDirectory(subrepoDir).call()) {
174208
Files.writeString(subrepoDir.toPath().resolve("a"), "");
175-
RevCommit c = subrepo.commit().setAll(true).setMessage("sub").call();
209+
subrepo.add().addFilepattern("a").call();
210+
RevCommit c = commit(subrepo).setMessage("sub").call();
176211
ident = c.getCommitterIdent();
177212
}
178213
try (Git mainrepo = Git.init().setDirectory(mainrepoDir).call()) {
179214
mainrepo.submoduleAdd().setPath("sub").setURI(subrepoDir.toURI().toString()).call();
180-
mainrepo.commit().setAll(true).setMessage("created a submodule").call();
215+
commit(mainrepo).setMessage("created a submodule").call();
181216
}
182217
return ident;
183218
}
@@ -186,7 +221,13 @@ private void addCommitInSubRepo(Git mainrepo) throws Exception {
186221
try (Git submodrepo =
187222
Git.wrap(SubmoduleWalk.getSubmoduleRepository(mainrepo.getRepository(), "sub"))) {
188223
Files.writeString(mainrepoDir.toPath().resolve("sub").resolve("x"), "");
189-
submodrepo.commit().setAll(true).setMessage("added x").call();
224+
submodrepo.add().addFilepattern("x").call();
225+
commit(submodrepo).setMessage("added x").call();
190226
}
191227
}
228+
229+
private CommitCommand commit(Git r) {
230+
return r.commit().setAll(true).setSign(false);
231+
}
232+
192233
}

src/test/java/com/launchableinc/ingest/commits/MainTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import java.net.InetSocketAddress;
88
import java.net.URL;
99
import java.nio.file.Files;
10+
11+
import org.eclipse.jgit.api.CommitCommand;
1012
import org.eclipse.jgit.api.Git;
1113
import org.eclipse.jgit.lib.ObjectId;
1214
import org.eclipse.jgit.revwalk.RevCommit;
@@ -32,12 +34,12 @@ public void specifySubmodule() throws Exception {
3234
RevCommit subCommit;
3335
try (Git subrepo = Git.init().setDirectory(subrepoDir).call()) {
3436
Files.writeString(subrepoDir.toPath().resolve("a"), "");
35-
subCommit = subrepo.commit().setAll(true).setMessage("sub").call();
37+
subCommit = commit(subrepo).setMessage("sub").call();
3638
}
3739
RevCommit mainCommit;
3840
try (Git mainrepo = Git.init().setDirectory(mainrepoDir).call()) {
3941
mainrepo.submoduleAdd().setPath("sub").setURI(subrepoDir.toURI().toString()).call();
40-
mainCommit = mainrepo.commit().setAll(true).setMessage("created a submodule").call();
42+
mainCommit = commit(mainrepo).setMessage("created a submodule").call();
4143
}
4244

4345
mockServerClient
@@ -73,4 +75,8 @@ public void specifySubmodule() throws Exception {
7375
main.launchableToken = "v1:testorg/testws:dummy-token";
7476
main.run();
7577
}
78+
79+
private CommitCommand commit(Git r) {
80+
return r.commit().setAll(true).setSign(false);
81+
}
7682
}

src/test/java/com/launchableinc/ingest/commits/ProgressReportingConsumerTest.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class ProgressReportingConsumerTest {
1313
@Test
1414
public void basic() throws IOException {
1515
List<String> done = new ArrayList<>();
16-
try (ProgressReportingConsumer<String> x = new ProgressReportingConsumer<>(flushableConsumer(s -> {done.add(s);sleep();}), String::valueOf, Duration.ofMillis(100))) {
16+
try (ProgressReportingConsumer<String> x = new ProgressReportingConsumer<>(FlushableConsumer.of(s -> {done.add(s);sleep();}), String::valueOf, Duration.ofMillis(100))) {
1717
for (int i = 0; i < 100; i++) {
1818
x.accept("item " + i);
1919
}
@@ -28,18 +28,4 @@ private static void sleep() {
2828
throw new UnsupportedOperationException();
2929
}
3030
}
31-
32-
private <T> FlushableConsumer<T> flushableConsumer(Consumer<T> c) {
33-
return new FlushableConsumer<T>() {
34-
@Override
35-
public void flush() throws IOException {
36-
// noop
37-
}
38-
39-
@Override
40-
public void accept(T t) {
41-
c.accept(t);
42-
}
43-
};
44-
}
4531
}

0 commit comments

Comments
 (0)