Skip to content

Commit 06eb62a

Browse files
authored
Merge pull request #2062 from apache/abderrahim/cascache-cleanup
Use CASDProcessManager directly to access REAPI stubs
2 parents 045c25c + 020e58f commit 06eb62a

File tree

9 files changed

+35
-133
lines changed

9 files changed

+35
-133
lines changed

src/buildstream/_assetcache.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ def push_directory(
264264
# to establish a connection to this remote at initialization time.
265265
#
266266
class RemotePair:
267-
def __init__(self, casd: CASDProcessManager, cas: CASCache, spec: RemoteSpec):
267+
def __init__(self, casd: CASDProcessManager, spec: RemoteSpec):
268268
self.index: Optional[AssetRemote] = None
269269
self.storage: Optional[CASRemote] = None
270270
self.error: Optional[str] = None
@@ -275,7 +275,7 @@ def __init__(self, casd: CASDProcessManager, cas: CASCache, spec: RemoteSpec):
275275
index.check()
276276
self.index = index
277277
if spec.remote_type in [RemoteType.STORAGE, RemoteType.ALL]:
278-
storage = CASRemote(spec, cas)
278+
storage = CASRemote(spec, casd)
279279
storage.check()
280280
self.storage = storage
281281
except RemoteError as e:
@@ -322,7 +322,7 @@ def setup_remotes(self, specs: Iterable[RemoteSpec], project_specs: Dict[str, Li
322322
if spec in self._remotes:
323323
continue
324324

325-
remote = RemotePair(casd, self.cas, spec)
325+
remote = RemotePair(casd, spec)
326326
if remote.error:
327327
self.context.messenger.warn("Failed to initialize remote {}: {}".format(spec.url, remote.error))
328328

src/buildstream/_cas/cascache.py

Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -71,25 +71,9 @@ def __init__(self, path, *, casd, remote_cache=False):
7171
else:
7272
assert not self._remote_cache
7373

74-
self._default_remote = CASRemote(None, self)
74+
self._default_remote = CASRemote(None, casd)
7575
self._default_remote.init()
7676

77-
# get_cas():
78-
#
79-
# Return ContentAddressableStorage stub for buildbox-casd channel.
80-
#
81-
def get_cas(self):
82-
assert self._casd, "CASCache was created without buildbox-casd"
83-
return self._casd.get_cas()
84-
85-
# get_local_cas():
86-
#
87-
# Return LocalCAS stub for buildbox-casd channel.
88-
#
89-
def get_local_cas(self):
90-
assert self._casd, "CASCache was created without buildbox-casd"
91-
return self._casd.get_local_cas()
92-
9377
# preflight():
9478
#
9579
# Preflight check.
@@ -133,7 +117,7 @@ def contains_files(self, digests):
133117
# Returns: True if the directory is available in the local cache
134118
#
135119
def contains_directory(self, digest):
136-
local_cas = self.get_local_cas()
120+
local_cas = self._casd.get_local_cas()
137121

138122
# Without a remote cache, `FetchTree` simply checks the local cache.
139123
request = local_cas_pb2.FetchTreeRequest()
@@ -241,7 +225,7 @@ def checkout(self, dest, tree, *, can_link=False, _fetch=True):
241225
#
242226
def ensure_tree(self, tree):
243227
if self._remote_cache:
244-
local_cas = self.get_local_cas()
228+
local_cas = self._casd.get_local_cas()
245229

246230
request = local_cas_pb2.FetchTreeRequest()
247231
request.root_digest.CopyFrom(tree)
@@ -260,7 +244,7 @@ def ensure_tree(self, tree):
260244
# dir_digest (Digest): Digest object for the directory to fetch.
261245
#
262246
def fetch_directory(self, remote, dir_digest):
263-
local_cas = self.get_local_cas()
247+
local_cas = self._casd.get_local_cas()
264248

265249
request = local_cas_pb2.FetchTreeRequest()
266250
request.instance_name = remote.local_cas_instance_name
@@ -399,7 +383,7 @@ def add_objects(self, *, paths=None, buffers=None, instance_name=None):
399383
for path in paths:
400384
request.path.append(path)
401385

402-
local_cas = self.get_local_cas()
386+
local_cas = self._casd.get_local_cas()
403387

404388
response = local_cas.CaptureFiles(request)
405389

@@ -432,7 +416,7 @@ def add_objects(self, *, paths=None, buffers=None, instance_name=None):
432416
# (Digest): The digest of the imported directory
433417
#
434418
def import_directory(self, path: str, properties: Optional[List[str]] = None) -> SourceRef:
435-
local_cas = self.get_local_cas()
419+
local_cas = self._casd.get_local_cas()
436420

437421
request = local_cas_pb2.CaptureTreeRequest()
438422
request.path.append(path)
@@ -478,7 +462,7 @@ def import_directory(self, path: str, properties: Optional[List[str]] = None) ->
478462
#
479463
@contextlib.contextmanager
480464
def stage_directory(self, directory_digest):
481-
local_cas = self.get_local_cas()
465+
local_cas = self._casd.get_local_cas()
482466

483467
request = local_cas_pb2.StageTreeRequest()
484468
request.root_digest.CopyFrom(directory_digest)
@@ -535,7 +519,7 @@ def missing_blobs_for_directory(self, digest, *, remote=None):
535519
# Returns: List of missing Digest objects
536520
#
537521
def missing_blobs(self, blobs, *, remote=None):
538-
cas = self.get_cas()
522+
cas = self._casd.get_cas()
539523

540524
if remote:
541525
instance_name = remote.local_cas_instance_name
@@ -576,7 +560,7 @@ def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=Non
576560

577561
if self._remote_cache and _fetch_tree:
578562
# Ensure we have the directory protos in the local cache
579-
local_cas = self.get_local_cas()
563+
local_cas = self._casd.get_local_cas()
580564

581565
request = local_cas_pb2.FetchTreeRequest()
582566
request.root_digest.CopyFrom(directory_digest)
@@ -719,17 +703,6 @@ def _send_directory(self, remote, digest):
719703
def get_cache_usage(self):
720704
return self._cache_usage_monitor.get_cache_usage()
721705

722-
# get_casd()
723-
#
724-
# Get the underlying buildbox-casd process
725-
#
726-
# Returns:
727-
# (subprocess.Process): The casd process that is used for the current cascache
728-
#
729-
def get_casd(self):
730-
assert self._casd is not None, "Only call this with a running buildbox-casd process"
731-
return self._casd
732-
733706

734707
# _CASCacheUsage
735708
#

src/buildstream/_cas/casremote.py

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ def __init__(self, blob, msg):
3737
# Represents a single remote CAS cache.
3838
#
3939
class CASRemote(BaseRemote):
40-
def __init__(self, spec, cascache, **kwargs):
40+
def __init__(self, spec, casd, **kwargs):
4141
super().__init__(spec, **kwargs)
4242

43-
self.cascache = cascache
43+
self.casd = casd
4444
self.local_cas_instance_name = None
4545

4646
# check_remote
@@ -55,30 +55,12 @@ def _configure_protocols(self):
5555
self.local_cas_instance_name = ""
5656
return
5757

58-
local_cas = self.cascache.get_local_cas()
58+
local_cas = self.casd.get_local_cas()
5959
request = local_cas_pb2.GetInstanceNameForRemotesRequest()
6060
self.spec.to_localcas_remote(request.content_addressable_storage)
6161
response = local_cas.GetInstanceNameForRemotes(request)
6262
self.local_cas_instance_name = response.instance_name
6363

64-
# push_message():
65-
#
66-
# Push the given protobuf message to a remote.
67-
#
68-
# Args:
69-
# message (Message): A protobuf message to push.
70-
#
71-
# Raises:
72-
# (CASRemoteError): if there was an error
73-
#
74-
def push_message(self, message):
75-
76-
message_buffer = message.SerializeToString()
77-
78-
self.init()
79-
80-
return self.cascache.add_object(buffer=message_buffer, instance_name=self.local_cas_instance_name)
81-
8264

8365
# Represents a batch of blobs queued for fetching.
8466
#
@@ -107,7 +89,7 @@ def send(self, *, missing_blobs=None):
10789
if not self._requests:
10890
return
10991

110-
local_cas = self._remote.cascache.get_local_cas()
92+
local_cas = self._remote.casd.get_local_cas()
11193

11294
for request in self._requests:
11395
batch_response = local_cas.FetchMissingBlobs(request)
@@ -161,7 +143,7 @@ def send(self):
161143
if not self._requests:
162144
return
163145

164-
local_cas = self._remote.cascache.get_local_cas()
146+
local_cas = self._remote.casd.get_local_cas()
165147

166148
for request in self._requests:
167149
batch_response = local_cas.UploadMissingBlobs(request)

src/buildstream/_stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1810,7 +1810,7 @@ def _run(self, *, announce_session: bool = False):
18101810
self._session_start_callback()
18111811

18121812
self._running = True
1813-
status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd())
1813+
status = self._scheduler.run(self.queues, self._context.get_casd())
18141814
self._running = False
18151815

18161816
if status == SchedStatus.ERROR:

src/buildstream/sandbox/_reremote.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,16 @@
2222

2323

2424
class RERemote(CASRemote):
25-
def __init__(self, cas_spec, remote_execution_specs, cascache):
26-
super().__init__(cas_spec, cascache)
25+
def __init__(self, cas_spec, remote_execution_specs, casd):
26+
super().__init__(cas_spec, casd)
2727

2828
self.remote_execution_specs = remote_execution_specs
2929
self.exec_service = None
3030
self.operations_service = None
3131
self.ac_service = None
3232

3333
def _configure_protocols(self):
34-
local_cas = self.cascache.get_local_cas()
34+
local_cas = self.casd.get_local_cas()
3535
request = local_cas_pb2.GetInstanceNameForRemotesRequest()
3636
if self.remote_execution_specs.storage_spec:
3737
self.remote_execution_specs.storage_spec.to_localcas_remote(request.content_addressable_storage)
@@ -50,10 +50,9 @@ def _configure_protocols(self):
5050
response = local_cas.GetInstanceNameForRemotes(request)
5151
self.local_cas_instance_name = response.instance_name
5252

53-
casd = self.cascache.get_casd()
54-
self.exec_service = casd.get_exec_service()
55-
self.operations_service = casd.get_operations_service()
56-
self.ac_service = casd.get_ac_service()
53+
self.exec_service = self.casd.get_exec_service()
54+
self.operations_service = self.casd.get_operations_service()
55+
self.ac_service = self.casd.get_ac_service()
5756

5857
def _check(self):
5958
super()._check()

src/buildstream/sandbox/_sandboxbuildboxrun.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ def __init__(self, *args, **kwargs):
3838
super().__init__(*args, **kwargs)
3939

4040
context = self._get_context()
41-
cascache = context.get_cascache()
41+
casd = context.get_casd()
4242

4343
re_specs = context.remote_execution_specs
4444
if re_specs and re_specs.action_spec:
45-
self.re_remote = RERemote(context.remote_cache_spec, re_specs, cascache)
45+
self.re_remote = RERemote(context.remote_cache_spec, re_specs, casd)
4646
try:
4747
self.re_remote.init()
4848
self.re_remote.check()
@@ -110,8 +110,7 @@ def _execute_action(self, action, flags):
110110
stdout, stderr = self._get_output()
111111

112112
context = self._get_context()
113-
cascache = context.get_cascache()
114-
casd = cascache.get_casd()
113+
casd = context.get_casd()
115114
config = self._get_config()
116115

117116
if config.remote_apis_socket_path and context.remote_cache_spec and not self.re_remote:

src/buildstream/sandbox/_sandboxremote.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def __init__(self, *args, **kwargs):
3737
super().__init__(*args, **kwargs)
3838

3939
context = self._get_context()
40-
cascache = context.get_cascache()
40+
casd = context.get_casd()
4141

4242
specs = context.remote_execution_specs
4343
if specs is None or specs.exec_spec is None:
@@ -48,7 +48,7 @@ def __init__(self, *args, **kwargs):
4848
self.action_spec = specs.action_spec
4949
self.operation_name = None
5050

51-
self.re_remote = RERemote(context.remote_cache_spec, specs, cascache)
51+
self.re_remote = RERemote(context.remote_cache_spec, specs, casd)
5252
try:
5353
self.re_remote.init()
5454
except grpc.RpcError as e:

tests/artifactcache/pull.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,12 @@ def test_pull_tree(cli, tmpdir, datafiles):
195195
# Push the Tree as a regular message
196196
_, remotes = artifactcache.get_remotes(project.name, True)
197197
assert len(remotes) == 1
198-
tree_digest = remotes[0].push_message(tree)
198+
199+
remotes[0].init()
200+
tree_digest = cas.add_object(
201+
buffer=tree.SerializeToString(), instance_name=remotes[0].local_cas_instance_name
202+
)
203+
199204
tree_hash, tree_size = tree_digest.hash, tree_digest.size_bytes
200205
assert tree_hash and tree_size
201206

tests/artifactcache/push.py

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
from buildstream import _yaml
2323
from buildstream._project import Project
24-
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
2524
from buildstream._testing import cli # pylint: disable=unused-import
2625

2726
from tests.testutils import create_artifact_share, create_split_share, dummy_context
@@ -131,58 +130,3 @@ def test_push_split(cli, tmpdir, datafiles):
131130
cli.get_artifact_name(project_dir, "test", "target.bst", cache_key=element_key)
132131
)
133132
assert storage.get_cas_files(proto) is not None
134-
135-
136-
@pytest.mark.datafiles(DATA_DIR)
137-
def test_push_message(tmpdir, datafiles):
138-
project_dir = str(datafiles)
139-
140-
# Set up an artifact cache.
141-
artifactshare = os.path.join(str(tmpdir), "artifactshare")
142-
with create_artifact_share(artifactshare) as share:
143-
# Configure artifact share
144-
rootcache_dir = os.path.join(str(tmpdir), "cache")
145-
user_config_file = str(tmpdir.join("buildstream.conf"))
146-
user_config = {
147-
"scheduler": {"pushers": 1},
148-
"artifacts": {
149-
"servers": [
150-
{
151-
"url": share.repo,
152-
"push": True,
153-
}
154-
]
155-
},
156-
"cachedir": rootcache_dir,
157-
}
158-
159-
# Write down the user configuration file
160-
_yaml.roundtrip_dump(user_config, file=user_config_file)
161-
162-
with dummy_context(config=user_config_file) as context:
163-
# Load the project manually
164-
project = Project(project_dir, context)
165-
project.ensure_fully_loaded()
166-
167-
# Create a local artifact cache handle
168-
artifactcache = context.artifactcache
169-
170-
# Initialize remotes
171-
context.initialize_remotes(True, True, None, None)
172-
assert artifactcache.has_push_remotes()
173-
174-
command = remote_execution_pb2.Command(
175-
arguments=["/usr/bin/gcc", "--help"],
176-
working_directory="/buildstream-build",
177-
output_directories=["/buildstream-install"],
178-
)
179-
180-
# Push the message object
181-
_, remotes = artifactcache.get_remotes(project.name, True)
182-
assert len(remotes) == 1
183-
command_digest = remotes[0].push_message(command)
184-
message_hash, message_size = command_digest.hash, command_digest.size_bytes
185-
186-
assert message_hash and message_size
187-
message_digest = remote_execution_pb2.Digest(hash=message_hash, size_bytes=message_size)
188-
assert share.has_object(message_digest)

0 commit comments

Comments
 (0)