Skip to content

Commit dbdbb76

Browse files
committed
[OnDiskGraphDB] Add capability for "chaining" OnDiskGraphDB instances
This is useful for one instance ('primary') to be able to fault-in nodes from another instance ('upstream').
1 parent 871a80f commit dbdbb76

File tree

3 files changed

+456
-32
lines changed

3 files changed

+456
-32
lines changed

llvm/include/llvm/CAS/OnDiskGraphDB.h

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ class InternalRefArrayRef {
173173
struct OnDiskContent;
174174

175175
/// Reference to a node. The node's data may not be stored in the database.
176+
/// An \p ObjectID instance can only be used with the \p OnDiskGraphDB instance
177+
/// it came from. \p ObjectIDs from different \p OnDiskGraphDB instances are not
178+
/// comparable.
176179
class ObjectID {
177180
public:
178181
uint64_t getOpaqueData() const { return Opaque; }
@@ -273,10 +276,12 @@ class OnDiskGraphDB {
273276
/// Get an existing reference to the object \p Digest.
274277
///
275278
/// Returns \p nullopt if the object is not stored in this CAS.
276-
std::optional<ObjectID> getExistingReference(ArrayRef<uint8_t> Digest) const;
279+
std::optional<ObjectID> getExistingReference(ArrayRef<uint8_t> Digest);
277280

278281
/// \returns true if the object associated with \p Ref is stored in the CAS.
279-
bool containsObject(ObjectID Ref) const;
282+
bool containsObject(ObjectID Ref) const {
283+
return containsObject(Ref, /*CheckUpstream=*/true);
284+
}
280285

281286
/// \returns the data part of the provided object handle.
282287
ArrayRef<char> getObjectData(ObjectHandle Node) const;
@@ -288,8 +293,31 @@ class OnDiskGraphDB {
288293

289294
void print(raw_ostream &OS) const;
290295

296+
/// How to fault-in nodes if an upstream database is used.
297+
enum class FaultInPolicy {
298+
/// Copy only the requested node.
299+
SingleNode,
300+
/// Copy the entire graph of a node.
301+
FullTree,
302+
};
303+
304+
/// Open the on-disk store from a directory.
305+
///
306+
/// \param Path directory for the on-disk store. The directory will be created
307+
/// if it doesn't exist.
308+
/// \param HashName Identifier name for the hashing algorithm that is going to
309+
/// be used.
310+
/// \param HashByteSize Size for the object digest hash bytes.
311+
/// \param UpstreamDB Optional on-disk store to be used for faulting-in nodes
312+
/// if they don't exist in the primary store. The upstream store is only used
313+
/// for reading nodes; new nodes are only written to the primary store.
314+
/// \param Policy If \p UpstreamDB is provided, controls how nodes are copied
315+
/// to primary store. This is recorded at creation time and subsequent opens
316+
/// need to pass the same policy, otherwise \p open will fail.
291317
static Expected<std::unique_ptr<OnDiskGraphDB>>
292-
open(StringRef Path, StringRef HashName, unsigned HashByteSize);
318+
open(StringRef Path, StringRef HashName, unsigned HashByteSize,
319+
std::unique_ptr<OnDiskGraphDB> UpstreamDB = nullptr,
320+
FaultInPolicy Policy = FaultInPolicy::FullTree);
293321

294322
~OnDiskGraphDB();
295323

@@ -298,6 +326,14 @@ class OnDiskGraphDB {
298326
class TempFile;
299327
class MappedTempFile;
300328

329+
bool containsObject(ObjectID Ref, bool CheckUpstream) const;
330+
331+
/// When \p load is called for a node that doesn't exist, this function tries
332+
/// to load it from the upstream store and copy it to the primary one.
333+
Expected<std::optional<ObjectHandle>> faultInFromUpstream(ObjectID PrimaryID);
334+
Error importFullTree(ObjectID PrimaryID, ObjectHandle UpstreamNode);
335+
Error importSingleNode(ObjectID PrimaryID, ObjectHandle UpstreamNode);
336+
301337
IndexProxy indexHash(ArrayRef<uint8_t> Hash);
302338

303339
Error createStandaloneLeaf(IndexProxy &I, ArrayRef<char> Data);
@@ -306,13 +342,15 @@ class OnDiskGraphDB {
306342

307343
OnDiskContent getContentFromHandle(ObjectHandle H) const;
308344

309-
InternalRef getInternalRef(ObjectID Ref) const {
345+
static InternalRef getInternalRef(ObjectID Ref) {
310346
return InternalRef::getFromRawData(Ref.getOpaqueData());
311347
}
312-
ObjectID getExternalReference(InternalRef Ref) const {
348+
static ObjectID getExternalReference(InternalRef Ref) {
313349
return ObjectID::fromOpaqueData(Ref.getRawData());
314350
}
315351

352+
static ObjectID getExternalReference(const IndexProxy &I);
353+
316354
void getStandalonePath(StringRef FileSuffix, const IndexProxy &I,
317355
SmallVectorImpl<char> &Path) const;
318356

@@ -321,15 +359,17 @@ class OnDiskGraphDB {
321359

322360
IndexProxy getIndexProxyFromRef(InternalRef Ref) const;
323361

324-
InternalRef makeInternalRef(FileOffset IndexOffset) const;
362+
static InternalRef makeInternalRef(FileOffset IndexOffset);
325363

326364
IndexProxy
327365
getIndexProxyFromPointer(OnDiskHashMappedTrie::const_pointer P) const;
328366

329367
InternalRefArrayRef getInternalRefs(ObjectHandle Node) const;
330368

331369
OnDiskGraphDB(StringRef RootPath, OnDiskHashMappedTrie Index,
332-
OnDiskDataAllocator DataPool);
370+
OnDiskDataAllocator DataPool,
371+
std::unique_ptr<OnDiskGraphDB> UpstreamDB,
372+
FaultInPolicy Policy);
333373

334374
/// Mapping from hash to object reference.
335375
///
@@ -344,6 +384,10 @@ class OnDiskGraphDB {
344384
void *StandaloneData; // a StandaloneDataMap.
345385

346386
std::string RootPath;
387+
388+
/// Optional on-disk store to be used for faulting-in nodes.
389+
std::unique_ptr<OnDiskGraphDB> UpstreamDB;
390+
FaultInPolicy FIPolicy;
347391
};
348392

349393
} // namespace llvm::cas::ondisk

llvm/lib/CAS/OnDiskGraphDB.cpp

Lines changed: 164 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -937,18 +937,35 @@ OnDiskGraphDB::IndexProxy OnDiskGraphDB::getIndexProxyFromPointer(
937937

938938
ObjectID OnDiskGraphDB::getReference(ArrayRef<uint8_t> Hash) {
939939
IndexProxy I = indexHash(Hash);
940+
return getExternalReference(I);
941+
}
942+
943+
ObjectID OnDiskGraphDB::getExternalReference(const IndexProxy &I) {
940944
return getExternalReference(makeInternalRef(I.Offset));
941945
}
942946

943947
std::optional<ObjectID>
944-
OnDiskGraphDB::getExistingReference(ArrayRef<uint8_t> Digest) const {
948+
OnDiskGraphDB::getExistingReference(ArrayRef<uint8_t> Digest) {
949+
auto tryUpstream =
950+
[&](std::optional<IndexProxy> I) -> std::optional<ObjectID> {
951+
if (!UpstreamDB)
952+
return std::nullopt;
953+
std::optional<ObjectID> UpstreamID =
954+
UpstreamDB->getExistingReference(Digest);
955+
if (!UpstreamID)
956+
return std::nullopt;
957+
if (!I)
958+
I.emplace(indexHash(Digest));
959+
return getExternalReference(*I);
960+
};
961+
945962
OnDiskHashMappedTrie::const_pointer P = Index.find(Digest);
946963
if (!P)
947-
return std::nullopt;
964+
return tryUpstream(std::nullopt);
948965
IndexProxy I = getIndexProxyFromPointer(P);
949966
TrieRecord::Data Obj = I.Ref.load();
950967
if (Obj.SK == TrieRecord::StorageKind::Unknown)
951-
return std::nullopt;
968+
return tryUpstream(I);
952969
return getExternalReference(makeInternalRef(I.Offset));
953970
}
954971

@@ -991,8 +1008,11 @@ OnDiskGraphDB::load(ObjectID ExternalRef) {
9911008
IndexProxy I = getIndexProxyFromRef(Ref);
9921009
TrieRecord::Data Object = I.Ref.load();
9931010

994-
if (Object.SK == TrieRecord::StorageKind::Unknown)
995-
return std::nullopt;
1011+
if (Object.SK == TrieRecord::StorageKind::Unknown) {
1012+
if (!UpstreamDB)
1013+
return std::nullopt;
1014+
return faultInFromUpstream(ExternalRef);
1015+
}
9961016

9971017
auto toObjectHandle = [](InternalHandle H) -> ObjectHandle {
9981018
return ObjectHandle::fromOpaqueData(H.getRawData());
@@ -1035,14 +1055,21 @@ OnDiskGraphDB::load(ObjectID ExternalRef) {
10351055
->insert(I.Hash, Object.SK, std::move(*OwnedBuffer))));
10361056
}
10371057

1038-
bool OnDiskGraphDB::containsObject(ObjectID ExternalRef) const {
1058+
bool OnDiskGraphDB::containsObject(ObjectID ExternalRef,
1059+
bool CheckUpstream) const {
10391060
InternalRef Ref = getInternalRef(ExternalRef);
10401061
IndexProxy I = getIndexProxyFromRef(Ref);
10411062
TrieRecord::Data Object = I.Ref.load();
1042-
return Object.SK != TrieRecord::StorageKind::Unknown;
1063+
if (Object.SK != TrieRecord::StorageKind::Unknown)
1064+
return true;
1065+
if (!CheckUpstream || !UpstreamDB)
1066+
return false;
1067+
std::optional<ObjectID> UpstreamID =
1068+
UpstreamDB->getExistingReference(getDigest(I));
1069+
return UpstreamID.has_value();
10431070
}
10441071

1045-
InternalRef OnDiskGraphDB::makeInternalRef(FileOffset IndexOffset) const {
1072+
InternalRef OnDiskGraphDB::makeInternalRef(FileOffset IndexOffset) {
10461073
return InternalRef::getFromOffset(IndexOffset);
10471074
}
10481075

@@ -1260,9 +1287,9 @@ Error OnDiskGraphDB::store(ObjectID ID, ArrayRef<ObjectID> Refs,
12601287
return Error::success();
12611288
}
12621289

1263-
Expected<std::unique_ptr<OnDiskGraphDB>>
1264-
OnDiskGraphDB::open(StringRef AbsPath, StringRef HashName,
1265-
unsigned HashByteSize) {
1290+
Expected<std::unique_ptr<OnDiskGraphDB>> OnDiskGraphDB::open(
1291+
StringRef AbsPath, StringRef HashName, unsigned HashByteSize,
1292+
std::unique_ptr<OnDiskGraphDB> UpstreamDB, FaultInPolicy Policy) {
12661293
if (std::error_code EC = sys::fs::create_directories(AbsPath))
12671294
return createFileError(AbsPath, EC);
12681295

@@ -1280,21 +1307,27 @@ OnDiskGraphDB::open(StringRef AbsPath, StringRef HashName,
12801307
return std::move(E);
12811308

12821309
std::optional<OnDiskDataAllocator> DataPool;
1310+
StringRef PolicyName =
1311+
Policy == FaultInPolicy::SingleNode ? "single" : "full";
12831312
if (Error E = OnDiskDataAllocator::create(
12841313
AbsPath + Slash + FilePrefix + DataPoolFile,
1285-
DataPoolTableName + "[" + HashName + "]",
1314+
DataPoolTableName + "[" + HashName + "]" + PolicyName,
12861315
/*MaxFileSize=*/16 * GB, /*MinFileSize=*/MB)
12871316
.moveInto(DataPool))
12881317
return std::move(E);
12891318

12901319
return std::unique_ptr<OnDiskGraphDB>(
1291-
new OnDiskGraphDB(AbsPath, std::move(*Index), std::move(*DataPool)));
1320+
new OnDiskGraphDB(AbsPath, std::move(*Index), std::move(*DataPool),
1321+
std::move(UpstreamDB), Policy));
12921322
}
12931323

12941324
OnDiskGraphDB::OnDiskGraphDB(StringRef RootPath, OnDiskHashMappedTrie Index,
1295-
OnDiskDataAllocator DataPool)
1325+
OnDiskDataAllocator DataPool,
1326+
std::unique_ptr<OnDiskGraphDB> UpstreamDB,
1327+
FaultInPolicy Policy)
12961328
: Index(std::move(Index)), DataPool(std::move(DataPool)),
1297-
RootPath(RootPath.str()) {
1329+
RootPath(RootPath.str()), UpstreamDB(std::move(UpstreamDB)),
1330+
FIPolicy(Policy) {
12981331
/// Lifetime for "big" objects not in DataPool.
12991332
///
13001333
/// NOTE: Could use ThreadSafeHashMappedTrie here. For now, doing something
@@ -1310,3 +1343,119 @@ OnDiskGraphDB::OnDiskGraphDB(StringRef RootPath, OnDiskHashMappedTrie Index,
13101343
OnDiskGraphDB::~OnDiskGraphDB() {
13111344
delete static_cast<StandaloneDataMapTy *>(StandaloneData);
13121345
}
1346+
1347+
Error OnDiskGraphDB::importFullTree(ObjectID PrimaryID,
1348+
ObjectHandle UpstreamNode) {
1349+
// Copies the full CAS tree from upstream. Uses depth-first copying to protect
1350+
// against the process dying during importing and leaving the database with an
1351+
// incomplete tree. Note that if the upstream has missing nodes then the tree
1352+
// will be copied with missing nodes as well; it won't be considered an error.
1353+
1354+
struct UpstreamCursor {
1355+
ObjectHandle Node;
1356+
size_t RefsCount;
1357+
object_refs_iterator RefI;
1358+
object_refs_iterator RefE;
1359+
};
1360+
/// Keeps track of the state of visitation for current node and all of its
1361+
/// parents.
1362+
SmallVector<UpstreamCursor, 16> CursorStack;
1363+
/// Keeps track of the currently visited nodes as they are imported into
1364+
/// primary database, from current node and its parents. When a node is
1365+
/// entered for visitation it appends its own ID, then appends referenced IDs
1366+
/// as they get imported. When a node is fully imported it removes the
1367+
/// referenced IDs from the bottom of the stack which leaves its own ID at the
1368+
/// bottom, adding to the list of referenced IDs for the parent node.
1369+
SmallVector<ObjectID, 128> PrimaryNodesStack;
1370+
1371+
auto enqueueNode = [&](ObjectID PrimaryID, std::optional<ObjectHandle> Node) {
1372+
PrimaryNodesStack.push_back(PrimaryID);
1373+
if (!Node)
1374+
return;
1375+
auto Refs = UpstreamDB->getObjectRefs(*Node);
1376+
CursorStack.push_back({*Node,
1377+
(size_t)std::distance(Refs.begin(), Refs.end()),
1378+
Refs.begin(), Refs.end()});
1379+
};
1380+
1381+
enqueueNode(PrimaryID, UpstreamNode);
1382+
1383+
while (!CursorStack.empty()) {
1384+
UpstreamCursor &Cur = CursorStack.back();
1385+
if (Cur.RefI == Cur.RefE) {
1386+
// Copy the node data into the primary store.
1387+
// FIXME: Use hard-link or cloning if the file-system supports it and data
1388+
// is stored into a separate file.
1389+
1390+
// The bottom of \p PrimaryNodesStack contains the primary ID for the
1391+
// current node plus the list of imported referenced IDs.
1392+
assert(PrimaryNodesStack.size() >= Cur.RefsCount + 1);
1393+
ObjectID PrimaryID = *(PrimaryNodesStack.end() - Cur.RefsCount - 1);
1394+
auto PrimaryRefs = ArrayRef(PrimaryNodesStack)
1395+
.slice(PrimaryNodesStack.size() - Cur.RefsCount);
1396+
auto Data = UpstreamDB->getObjectData(Cur.Node);
1397+
if (Error E = store(PrimaryID, PrimaryRefs, Data))
1398+
return E;
1399+
// Remove the current node and its IDs from the stack.
1400+
PrimaryNodesStack.truncate(PrimaryNodesStack.size() - Cur.RefsCount);
1401+
CursorStack.pop_back();
1402+
continue;
1403+
}
1404+
1405+
ObjectID UpstreamID = *(Cur.RefI++);
1406+
ObjectID PrimaryID = getReference(UpstreamDB->getDigest(UpstreamID));
1407+
if (containsObject(PrimaryID, /*CheckUpstream=*/false)) {
1408+
// This \p ObjectID already exists in the primary. Either it was imported
1409+
// via \p importFullTree or the client created it, in which case the
1410+
// client takes responsibility for how it was formed.
1411+
enqueueNode(PrimaryID, std::nullopt);
1412+
continue;
1413+
}
1414+
Expected<std::optional<ObjectHandle>> UpstreamNode =
1415+
UpstreamDB->load(UpstreamID);
1416+
if (!UpstreamNode)
1417+
return UpstreamNode.takeError();
1418+
enqueueNode(PrimaryID, *UpstreamNode);
1419+
}
1420+
1421+
assert(PrimaryNodesStack.size() == 1);
1422+
assert(PrimaryNodesStack.front() == PrimaryID);
1423+
return Error::success();
1424+
}
1425+
1426+
Error OnDiskGraphDB::importSingleNode(ObjectID PrimaryID,
1427+
ObjectHandle UpstreamNode) {
1428+
// Copies only a single node; it doesn't copy the referenced nodes.
1429+
1430+
// Copy the node data into the primary store.
1431+
// FIXME: Use hard-link or cloning if the file-system supports it and data is
1432+
// stored into a separate file.
1433+
1434+
auto Data = UpstreamDB->getObjectData(UpstreamNode);
1435+
auto UpstreamRefs = UpstreamDB->getObjectRefs(UpstreamNode);
1436+
SmallVector<ObjectID, 64> Refs;
1437+
Refs.reserve(std::distance(UpstreamRefs.begin(), UpstreamRefs.end()));
1438+
for (ObjectID UpstreamRef : UpstreamRefs)
1439+
Refs.push_back(getReference(UpstreamDB->getDigest(UpstreamRef)));
1440+
1441+
return store(PrimaryID, Refs, Data);
1442+
}
1443+
1444+
Expected<std::optional<ObjectHandle>>
1445+
OnDiskGraphDB::faultInFromUpstream(ObjectID PrimaryID) {
1446+
assert(UpstreamDB);
1447+
1448+
ObjectID UpstreamID = UpstreamDB->getReference(getDigest(PrimaryID));
1449+
Expected<std::optional<ObjectHandle>> UpstreamNode =
1450+
UpstreamDB->load(UpstreamID);
1451+
if (!UpstreamNode)
1452+
return UpstreamNode.takeError();
1453+
if (!*UpstreamNode)
1454+
return std::nullopt;
1455+
1456+
if (Error E = FIPolicy == FaultInPolicy::SingleNode
1457+
? importSingleNode(PrimaryID, **UpstreamNode)
1458+
: importFullTree(PrimaryID, **UpstreamNode))
1459+
return std::move(E);
1460+
return load(PrimaryID);
1461+
}

0 commit comments

Comments
 (0)