Skip to content

Commit 2073940

Browse files
committed
[CAS] Introduce changes in ObjectStore/ActionCache to better accomodate distributed caching
There are two main changes: * Introduce `Globally` parameter to `ActionCache::get/put`. This is a hint to the underlying implementation that the key association and the associated object is profitable to be globally distributed. * `ObjectStore::loadIfExists` can be called asynchronously and can return `std::nullopt`. With support from the underlying implementation, these changes allow downloading the outputs of the compilation concurrently and lazily (e.g. only downloading the 'main' module file output). With `std::nullopt` the implementation can indicate that one of the requested outputs was missing from the CAS. `CompileJobCacheResult::forEachLoadedOutput` is introduced to enable concurrent downloading of outputs and `ObjectStoreCachingOutputs` is enhanced to take advantage of it. `libCASPluginTest` is enhanced to simulate "uploading"/"downloading" objects for testing purposes. Make sure to handle multiple errors by joining them. (cherry picked from commit d4a0c07)
1 parent 84ec9db commit 2073940

22 files changed

+621
-124
lines changed

clang/include/clang/Frontend/CASDependencyCollector.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ class CASDependencyCollector : public DependencyFileGenerator {
3636
/// \param Opts Output options. Only options that affect the output format of
3737
/// a dependency file are signficant.
3838
/// \param CAS The CAS to read the result from.
39-
/// \param DepsRef The dependencies.
39+
/// \param Deps The dependencies.
4040
/// \param OS The output stream to write the dependency file to.
4141
static llvm::Error replay(const DependencyOutputOptions &Opts,
42-
cas::ObjectStore &CAS, cas::ObjectRef DepsRef,
42+
cas::ObjectStore &CAS, cas::ObjectProxy Deps,
4343
llvm::raw_ostream &OS);
4444

4545
private:

clang/include/clang/Frontend/CompileJobCacheResult.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,21 @@ class CompileJobCacheResult : public ObjectProxy {
4646
llvm::Error
4747
forEachOutput(llvm::function_ref<llvm::Error(Output)> Callback) const;
4848

49+
/// Loads all outputs concurrently and passes the resulting \c ObjectProxy
50+
/// objects to \p Callback. If there was an error during loading then the
51+
/// callback will not be invoked.
52+
llvm::Error forEachLoadedOutput(
53+
llvm::function_ref<llvm::Error(Output, std::optional<ObjectProxy>)>
54+
Callback);
55+
4956
size_t getNumOutputs() const;
5057

5158
/// Retrieves a specific output specified by \p Kind, if it exists.
5259
Optional<Output> getOutput(OutputKind Kind) const;
5360

61+
/// \returns a string for the given \p Kind.
62+
static StringRef getOutputKindName(OutputKind Kind);
63+
5464
/// Print this result to \p OS.
5565
llvm::Error print(llvm::raw_ostream &OS);
5666

clang/lib/CAS/CASOptions.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ void CASOptions::initCache(DiagnosticsEngine &Diags) const {
8686
StringRef CASPath = Cache.Config.CASPath;
8787

8888
if (!PluginPath.empty()) {
89-
std::pair<std::unique_ptr<ObjectStore>, std::unique_ptr<ActionCache>> DBs;
89+
std::pair<std::shared_ptr<ObjectStore>, std::shared_ptr<ActionCache>> DBs;
9090
if (llvm::Error E =
9191
createPluginCASDatabases(PluginPath, CASPath, PluginOptions)
9292
.moveInto(DBs)) {

clang/lib/Frontend/CASDependencyCollector.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,8 @@ CASDependencyCollector::CASDependencyCollector(
3131
CAS(CAS), Callback(std::move(Callback)) {}
3232

3333
llvm::Error CASDependencyCollector::replay(const DependencyOutputOptions &Opts,
34-
ObjectStore &CAS, ObjectRef DepsRef,
34+
ObjectStore &CAS, ObjectProxy Refs,
3535
llvm::raw_ostream &OS) {
36-
auto Refs = CAS.getProxy(DepsRef);
37-
if (!Refs)
38-
return Refs.takeError();
39-
4036
CASDependencyCollector DC(Opts, CAS, nullptr);
4137

4238
// Add the filenames from DependencyOutputOptions::ExtraDeps. These are kept
@@ -47,7 +43,7 @@ llvm::Error CASDependencyCollector::replay(const DependencyOutputOptions &Opts,
4743
DC.addDependency(Dep.first);
4844
}
4945

50-
auto Err = Refs->forEachReference([&](ObjectRef Ref) -> llvm::Error {
46+
auto Err = Refs.forEachReference([&](ObjectRef Ref) -> llvm::Error {
5147
auto PathHandle = CAS.getProxy(Ref);
5248
if (!PathHandle)
5349
return PathHandle.takeError();

clang/lib/Frontend/CompileJobCacheResult.cpp

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,43 @@ Error CompileJobCacheResult::forEachOutput(
3434
return Error::success();
3535
}
3636

37+
Error CompileJobCacheResult::forEachLoadedOutput(
38+
llvm::function_ref<Error(Output, std::optional<ObjectProxy>)> Callback) {
39+
// Kick-off materialization for all outputs concurrently.
40+
SmallVector<std::future<Expected<std::optional<ObjectProxy>>>, 4>
41+
FutureOutputs;
42+
size_t Count = getNumOutputs();
43+
for (size_t I = 0; I < Count; ++I) {
44+
ObjectRef Ref = getOutputObject(I);
45+
FutureOutputs.push_back(getCAS().getProxyAsync(Ref));
46+
}
47+
48+
// Make sure all the outputs have materialized.
49+
std::optional<Error> OccurredError;
50+
SmallVector<std::optional<ObjectProxy>, 4> Outputs;
51+
for (auto &FutureOutput : FutureOutputs) {
52+
auto Obj = FutureOutput.get();
53+
if (!Obj) {
54+
if (!OccurredError)
55+
OccurredError = Obj.takeError();
56+
else
57+
OccurredError =
58+
llvm::joinErrors(std::move(*OccurredError), Obj.takeError());
59+
continue;
60+
}
61+
Outputs.push_back(*Obj);
62+
}
63+
if (OccurredError)
64+
return std::move(*OccurredError);
65+
66+
// Pass the loaded outputs.
67+
for (size_t I = 0; I < Count; ++I) {
68+
if (auto Err = Callback({getOutputObject(I), getOutputKind(I)}, Outputs[I]))
69+
return Err;
70+
}
71+
return Error::success();
72+
}
73+
3774
Optional<CompileJobCacheResult::Output>
3875
CompileJobCacheResult::getOutput(OutputKind Kind) const {
3976
size_t Count = getNumOutputs();
@@ -45,25 +82,21 @@ CompileJobCacheResult::getOutput(OutputKind Kind) const {
4582
return None;
4683
}
4784

48-
static void printOutputKind(llvm::raw_ostream &OS,
49-
CompileJobCacheResult::OutputKind Kind) {
85+
StringRef CompileJobCacheResult::getOutputKindName(OutputKind Kind) {
5086
switch (Kind) {
51-
case CompileJobCacheResult::OutputKind::MainOutput:
52-
OS << "main ";
53-
break;
54-
case CompileJobCacheResult::OutputKind::Dependencies:
55-
OS << "deps ";
56-
break;
57-
case CompileJobCacheResult::OutputKind::SerializedDiagnostics:
58-
OS << "diags ";
59-
break;
87+
case OutputKind::MainOutput:
88+
return "main";
89+
case OutputKind::SerializedDiagnostics:
90+
return "deps";
91+
case OutputKind::Dependencies:
92+
return "diags";
6093
}
6194
}
6295

6396
Error CompileJobCacheResult::print(llvm::raw_ostream &OS) {
6497
return forEachOutput([&](Output O) -> Error {
65-
printOutputKind(OS, O.Kind);
66-
OS << ' ' << getCAS().getID(O.Object) << '\n';
98+
OS << getOutputKindName(O.Kind) << " " << getCAS().getID(O.Object)
99+
<< '\n';
67100
return Error::success();
68101
});
69102
}

clang/lib/Frontend/CompilerInstance.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2401,6 +2401,10 @@ static bool addCachedModuleFileToInMemoryCache(
24012401
Result->getOutput(cas::CompileJobCacheResult::OutputKind::MainOutput);
24022402
if (!Output)
24032403
llvm::report_fatal_error("missing main output");
2404+
// FIXME: We wait to materialize each module file before proceeding, which
2405+
// introduces latency for a network CAS. Instead we should collect all the
2406+
// module keys and materialize them concurrently using \c getProxyAsync, for
2407+
// better network utilization.
24042408
auto OutputProxy = CAS.getProxy(Output->Object);
24052409
if (!OutputProxy) {
24062410
Diags.Report(diag::err_cas_cannot_get_module_cache_key)

clang/test/CAS/plugin-cas.c

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,47 @@
22

33
// RUN: rm -rf %t && mkdir -p %t
44

5-
// RUN: %clang -cc1depscan -o %t/t.rsp -fdepscan=inline -cc1-args \
5+
// RUN: %clang -cc1depscan -o %t/t1.rsp -fdepscan=inline -cc1-args \
66
// RUN: -cc1 %s -fcas-path %t/cas \
77
// RUN: -fcas-plugin-path %llvmshlibdir/libCASPluginTest%pluginext \
8-
// RUN: -fcas-plugin-option first-prefix=myfirst- -fcas-plugin-option second-prefix=mysecond-
9-
// RUN: %clang @%t/t.rsp -emit-obj -o %t/t1.o -Rcompile-job-cache 2>&1 | FileCheck %s --check-prefix=CACHE-MISS
10-
// RUN: %clang @%t/t.rsp -emit-obj -o %t/t2.o -Rcompile-job-cache 2>&1 | FileCheck %s --check-prefix=CACHE-HIT
8+
// RUN: -fcas-plugin-option first-prefix=myfirst- -fcas-plugin-option second-prefix=mysecond- \
9+
// RUN: -fcas-plugin-option upstream-path=%t/cas-upstream
10+
// RUN: %clang @%t/t1.rsp -emit-obj -o %t/t1.o -Rcompile-job-cache 2>&1 | FileCheck %s --check-prefix=CACHE-MISS
11+
12+
// Clear the CAS and check the outputs can still be "downloaded" from upstream.
13+
// RUN: rm -rf %t/cas
14+
// RUN: %clang -cc1depscan -o %t/t2.rsp -fdepscan=inline -cc1-args \
15+
// RUN: -cc1 %s -fcas-path %t/cas \
16+
// RUN: -fcas-plugin-path %llvmshlibdir/libCASPluginTest%pluginext \
17+
// RUN: -fcas-plugin-option first-prefix=myfirst- -fcas-plugin-option second-prefix=mysecond- \
18+
// RUN: -fcas-plugin-option upstream-path=%t/cas-upstream
19+
// RUN: %clang @%t/t2.rsp -emit-obj -o %t/t2.o -Rcompile-job-cache 2>&1 | FileCheck %s --check-prefix=CACHE-HIT
1120
// RUN: diff %t/t1.o %t/t2.o
1221

22+
// Check that it's a cache miss if outputs are not found in the upstream CAS.
23+
// RUN: rm -rf %t/cas
24+
// RUN: %clang -cc1depscan -o %t/t3.rsp -fdepscan=inline -cc1-args \
25+
// RUN: -cc1 %s -fcas-path %t/cas \
26+
// RUN: -fcas-plugin-path %llvmshlibdir/libCASPluginTest%pluginext \
27+
// RUN: -fcas-plugin-option first-prefix=myfirst- -fcas-plugin-option second-prefix=mysecond- \
28+
// RUN: -fcas-plugin-option upstream-path=%t/cas-upstream \
29+
// RUN: -fcas-plugin-option simulate-missing-objects
30+
// RUN: %clang @%t/t3.rsp -emit-obj -o %t/t.o -Rcompile-job-cache 2>&1 | FileCheck %s --check-prefix=CACHE-NOTFOUND
31+
1332
// CACHE-MISS: remark: compile job cache miss for 'myfirst-mysecond-
1433
// CACHE-MISS: warning: some warning
1534

16-
// CACHE-HIT: remark: compile job cache hit for 'myfirst-mysecond-
17-
// CACHE-HIT: warning: some warning
35+
// Check that outputs are downloaded concurrently.
36+
// CACHE-HIT: load_object_async downstream begin:
37+
// CACHE-HIT-NEXT: load_object_async downstream begin:
38+
// CACHE-HIT-NEXT: load_object_async downstream end:
39+
// CACHE-HIT-NEXT: load_object_async downstream end:
40+
// CACHE-HIT-NEXT: remark: compile job cache hit for 'myfirst-mysecond-
41+
// CACHE-HIT-NEXT: warning: some warning
42+
43+
// CACHE-NOTFOUND: remark: compile job cache backend did not find output 'main' for key
44+
// CACHE-NOTFOUND: remark: compile job cache miss
45+
// CACHE-NOTFOUND: warning: some warning
1846

1947
// RUN: not %clang -cc1depscan -o %t/t.rsp -fdepscan=inline -cc1-args \
2048
// RUN: -cc1 %s -fcas-path %t/cas \

clang/tools/driver/cc1_main.cpp

Lines changed: 57 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,9 @@ class ObjectStoreCachingOutputs : public CachingOutputs {
273273
/// Replay a cache hit.
274274
///
275275
/// Return status if should exit immediately, otherwise None.
276-
Optional<int> replayCachedResult(llvm::cas::ObjectRef ResultID,
277-
bool JustComputedResult);
276+
std::optional<int> replayCachedResult(const llvm::cas::CASID &ResultCacheKey,
277+
llvm::cas::ObjectRef ResultID,
278+
bool JustComputedResult);
278279

279280
std::shared_ptr<llvm::cas::ObjectStore> CAS;
280281
std::shared_ptr<llvm::cas::ActionCache> Cache;
@@ -561,7 +562,8 @@ Expected<bool> ObjectStoreCachingOutputs::tryReplayCachedResult(
561562
Diags.Report(diag::remark_compile_job_cache_timing_backend_key_query)
562563
<< llvm::format("%.6fs", Seconds);
563564
});
564-
if (Error E = Cache->get(ResultCacheKey).moveInto(Result))
565+
if (Error E =
566+
Cache->get(ResultCacheKey, /*Globally=*/true).moveInto(Result))
565567
return std::move(E);
566568
}
567569

@@ -583,11 +585,11 @@ Expected<bool> ObjectStoreCachingOutputs::tryReplayCachedResult(
583585
return false;
584586
}
585587

586-
Diags.Report(diag::remark_compile_job_cache_hit)
587-
<< ResultCacheKey.toString() << CAS->getID(*ResultRef).toString();
588-
Optional<int> Status =
589-
replayCachedResult(*ResultRef, /*JustComputedResult=*/false);
590-
assert(Status && "Expected a status for a cache hit");
588+
// \c replayCachedResult emits remarks for a cache hit or miss.
589+
std::optional<int> Status = replayCachedResult(ResultCacheKey, *ResultRef,
590+
/*JustComputedResult=*/false);
591+
if (!Status)
592+
return false; // cache miss.
591593
assert(*Status == 0 && "Expected success status for a cache hit");
592594
return true;
593595
}
@@ -770,22 +772,23 @@ Error ObjectStoreCachingOutputs::finishComputedResult(
770772
Diags.Report(diag::remark_compile_job_cache_timing_backend_key_update)
771773
<< llvm::format("%.6fs", Seconds);
772774
});
773-
if (llvm::Error E = Cache->put(ResultCacheKey, CAS->getID(*Result)))
775+
if (llvm::Error E =
776+
Cache->put(ResultCacheKey, CAS->getID(*Result), /*Globally=*/true))
774777
return E;
775778
}
776779

777780
// Replay / decanonicalize as necessary.
778-
Optional<int> Status = replayCachedResult(*Result,
779-
/*JustComputedResult=*/true);
781+
std::optional<int> Status = replayCachedResult(ResultCacheKey, *Result,
782+
/*JustComputedResult=*/true);
780783
(void)Status;
781784
assert(Status == None);
782785
return Error::success();
783786
}
784787

785788
/// Replay a result after a cache hit.
786-
Optional<int>
787-
ObjectStoreCachingOutputs::replayCachedResult(llvm::cas::ObjectRef ResultID,
788-
bool JustComputedResult) {
789+
std::optional<int> ObjectStoreCachingOutputs::replayCachedResult(
790+
const llvm::cas::CASID &ResultCacheKey, llvm::cas::ObjectRef ResultID,
791+
bool JustComputedResult) {
789792
if (JustComputedResult)
790793
return None;
791794

@@ -795,19 +798,30 @@ ObjectStoreCachingOutputs::replayCachedResult(llvm::cas::ObjectRef ResultID,
795798
if (Error E = Schema.load(ResultID).moveInto(Result))
796799
llvm::report_fatal_error(std::move(E));
797800

798-
auto Err = Result->forEachOutput([&](clang::cas::CompileJobCacheResult::Output
799-
O) -> Error {
801+
DiagnosticsEngine &Diags = Clang.getDiagnostics();
802+
bool HasMissingOutput = false;
803+
std::optional<llvm::cas::ObjectProxy> SerialDiags;
804+
805+
auto processOutput = [&](clang::cas::CompileJobCacheResult::Output O,
806+
std::optional<llvm::cas::ObjectProxy> Obj) -> Error {
807+
if (!Obj.has_value()) {
808+
Diags.Report(diag::remark_compile_job_cache_backend_output_not_found)
809+
<< clang::cas::CompileJobCacheResult::getOutputKindName(O.Kind)
810+
<< ResultCacheKey.toString() << CAS->getID(O.Object).toString();
811+
HasMissingOutput = true;
812+
return Error::success();
813+
}
814+
if (HasMissingOutput)
815+
return Error::success();
816+
800817
if (O.Kind == OutputKind::SerializedDiagnostics) {
801-
Optional<llvm::cas::ObjectProxy> DiagsObj;
802-
if (Error E = CAS->getProxy(O.Object).moveInto(DiagsObj))
803-
return E;
804-
return replayCachedDiagnostics(DiagsObj->getData());
818+
SerialDiags = Obj;
819+
return Error::success();
805820
}
806821

807822
std::string Path = std::string(getPathForOutputKind(O.Kind));
808823
if (Path.empty())
809-
// The output may be always generated but not needed with this invocation,
810-
// like the serialized diagnostics file.
824+
// The output may be always generated but not needed with this invocation.
811825
return Error::success(); // continue
812826

813827
// Always create parent directory of outputs, since it is hard to precisely
@@ -823,14 +837,11 @@ ObjectStoreCachingOutputs::replayCachedResult(llvm::cas::ObjectRef ResultID,
823837
if (O.Kind == OutputKind::Dependencies) {
824838
llvm::raw_svector_ostream OS(ContentsStorage);
825839
if (auto E = CASDependencyCollector::replay(
826-
Clang.getDependencyOutputOpts(), *CAS, O.Object, OS))
840+
Clang.getDependencyOutputOpts(), *CAS, *Obj, OS))
827841
return E;
828842
Contents = ContentsStorage;
829843
} else {
830-
Optional<llvm::cas::ObjectProxy> Bytes;
831-
if (Error E = CAS->getProxy(O.Object).moveInto(Bytes))
832-
return E;
833-
Contents = Bytes->getData();
844+
Contents = Obj->getData();
834845
}
835846

836847
std::unique_ptr<llvm::FileOutputBuffer> Output;
@@ -839,12 +850,29 @@ ObjectStoreCachingOutputs::replayCachedResult(llvm::cas::ObjectRef ResultID,
839850
return E;
840851
llvm::copy(*Contents, Output->getBufferStart());
841852
return Output->commit();
842-
});
853+
};
843854

844855
// FIXME: Stop calling report_fatal_error().
845-
if (Err)
856+
if (auto Err = Result->forEachLoadedOutput(processOutput))
846857
llvm::report_fatal_error(std::move(Err));
847858

859+
if (HasMissingOutput) {
860+
Diags.Report(diag::remark_compile_job_cache_miss)
861+
<< ResultCacheKey.toString();
862+
return std::nullopt;
863+
}
864+
865+
if (!JustComputedResult) {
866+
Diags.Report(diag::remark_compile_job_cache_hit)
867+
<< ResultCacheKey.toString() << CAS->getID(ResultID).toString();
868+
869+
if (SerialDiags) {
870+
// FIXME: Stop calling report_fatal_error().
871+
if (Error E = replayCachedDiagnostics(SerialDiags->getData()))
872+
llvm::report_fatal_error(std::move(E));
873+
}
874+
}
875+
848876
if (JustComputedResult)
849877
return None;
850878
return 0;

0 commit comments

Comments
 (0)