Skip to content

Commit 2091726

Browse files
authored
Implement remote state management changes (#380)
* Implement remote state management Signed-off-by: Piotr Pawłowski <[email protected]> --------- Signed-off-by: Piotr Pawłowski <[email protected]>
1 parent 5cf4890 commit 2091726

File tree

14 files changed

+391
-16
lines changed

14 files changed

+391
-16
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ dependencies = [
4040
"google-api-core",
4141
"packaging",
4242
"google-cloud-filestore",
43+
"google-cloud-storage"
4344
]
4445

4546
[project.urls]
@@ -65,7 +66,7 @@ dev = [
6566
]
6667

6768
[tool.setuptools]
68-
packages = ["xpk", "xpk.parser", "xpk.core", "xpk.commands", "xpk.api", "xpk.templates", "xpk.utils", "xpk.core.blueprint", "xpk.core.workload_decorators"]
69+
packages = ["xpk", "xpk.parser", "xpk.core", "xpk.commands", "xpk.api", "xpk.templates", "xpk.utils", "xpk.core.blueprint", "xpk.core.remote_state", "xpk.core.workload_decorators"]
6970
package-dir = {"" = "src"}
7071
package-data = {"xpk.api" = ["storage_crd.yaml"], "xpk.templates" = ["storage.yaml"]}
7172

src/xpk/commands/cluster_gcluster.py

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616

1717
import os
1818

19+
from ..core.remote_state.remote_state_client import RemoteStateClient
20+
from ..core.remote_state.fuse_remote_state import FuseStateClient
1921
from ..core.blueprint.blueprint_generator import (
2022
BlueprintGenerator,
2123
BlueprintGeneratorOutput,
2224
a3mega_device_type,
2325
a3ultra_device_type,
2426
supported_device_types,
2527
)
28+
from ..core.commands import run_command_for_value
2629
from ..core.capacity import get_capacity_type
2730
from ..core.docker_manager import DockerManager
2831
from ..core.gcloud_context import zone_to_region
@@ -49,13 +52,22 @@ def cluster_create(args) -> None:
4952
"""
5053
check_gcloud_authenticated()
5154
prepare_directories()
52-
gcm = prepare_gcluster_manager()
5355
region = zone_to_region(args.zone)
5456

5557
# unique_name uses shortened hash string, so still name collision is possible
5658
unique_name = get_unique_name(args.project, region, args.cluster)
5759
# prefix is to prevent name collisions for blueprints and also deployments by storing them in prefix directory. Ex.: blueprints/{prefix}/cluster_name_hash
5860
prefix = get_prefix_path(args.project, region)
61+
remote_state_client = None
62+
if args.cluster_state_gcs_bucket is not None:
63+
remote_state_client = FuseStateClient(
64+
bucket=args.cluster_state_gcs_bucket,
65+
state_directory=os.path.join(blueprints_path, prefix, unique_name),
66+
prefix=prefix,
67+
cluster=args.cluster,
68+
deployment_name=unique_name,
69+
)
70+
gcm = prepare_gcluster_manager(remote_state_client)
5971

6072
bp = generate_blueprint(blueprint_name=unique_name, args=args, prefix=prefix)
6173

@@ -70,6 +82,8 @@ def cluster_create(args) -> None:
7082
deployment_name=unique_name,
7183
prefix=prefix,
7284
)
85+
if args.cluster_state_gcs_bucket is not None:
86+
gcm.upload_state()
7387

7488
set_cluster_command_code = set_cluster_command(args)
7589
if set_cluster_command_code != 0:
@@ -89,15 +103,42 @@ def cluster_delete(args) -> None:
89103
"""
90104
check_gcloud_authenticated()
91105
prepare_directories()
92-
gcm = prepare_gcluster_manager()
93106
region = zone_to_region(args.zone)
107+
unique_name = get_unique_name(args.project, region, args.cluster)
108+
# prefix is to prevent name collisions for blueprints and also deployments by storing them in prefix directory. Ex.: blueprints/{prefix}/cluster_name_hash
109+
prefix = get_prefix_path(args.project, region)
110+
remote_state_client = None
111+
if args.cluster_state_gcs_bucket is not None:
112+
remote_state_client = FuseStateClient(
113+
bucket=args.cluster_state_gcs_bucket,
114+
state_directory=os.path.join(blueprints_path, prefix, unique_name),
115+
prefix=prefix,
116+
cluster=args.cluster,
117+
deployment_name=unique_name,
118+
)
119+
gcm = prepare_gcluster_manager(remote_state_client)
94120

95121
# unique_name uses shortened hash string, so still name collision is possible
96122
unique_name = get_unique_name(args.project, region, args.cluster)
97123
# prefix is to prevent name collisions for blueprints and also deployments by storing them in prefix directory. Ex.: blueprints/{prefix}/cluster_name_hash
98-
prefix_path = get_prefix_path(args.project, region)
124+
prefix = get_prefix_path(args.project, region)
125+
if args.cluster_state_gcs_bucket is not None:
126+
gcm.download_state()
127+
128+
bp = BlueprintGeneratorOutput(
129+
blueprint_file=os.path.join(blueprints_path, prefix, unique_name)
130+
+ '.yaml',
131+
blueprint_dependencies=os.path.join(
132+
blueprints_path, prefix, unique_name
133+
),
134+
)
99135

100-
gcm.destroy_deployment(deployment_name=unique_name, prefix=prefix_path)
136+
gcm.stage_files(
137+
blueprint_file=bp.blueprint_file,
138+
blueprint_dependencies=bp.blueprint_dependencies,
139+
prefix=prefix,
140+
)
141+
gcm.destroy_deployment(deployment_name=unique_name, prefix=prefix)
101142

102143
xpk_exit(0)
103144

@@ -140,18 +181,35 @@ def check_gcloud_authenticated():
140181
xpk_exit(1)
141182

142183

143-
def prepare_gcluster_manager() -> GclusterManager:
184+
def prepare_gcluster_manager(
185+
remote_state_client: RemoteStateClient | None,
186+
) -> GclusterManager:
144187
dm = DockerManager(
145188
working_dir=gcluster_working_dir, gcloud_cfg_path=gcloud_cfg_path
146189
)
147190
dm.initialize()
148-
return GclusterManager(gcluster_command_runner=dm)
191+
return GclusterManager(
192+
gcluster_command_runner=dm, remote_state_client=remote_state_client
193+
)
149194

150195

151196
def prepare_blueprint_generator() -> BlueprintGenerator:
152197
return BlueprintGenerator(storage_path=blueprints_path)
153198

154199

200+
def validate_state_gcs_bucket(args):
  """Exit xpk unless the remote state bucket can be described with gcloud.

  Runs `gcloud storage buckets describe` for the bucket named by
  `args.cluster_state_gcs_bucket`. When the command reports a non-zero code,
  `xpk_exit` is called with that same code.

  Args:
    args: user provided arguments; `cluster_state_gcs_bucket` is read here and
      the whole object is forwarded to the command runner as `global_args`.
  """
  describe_command = (
      f'gcloud storage buckets describe gs://{args.cluster_state_gcs_bucket}'
  )
  task_description = 'Validate remote state bucket existence.'
  # Only the return code matters; the command output is discarded.
  return_code, _ = run_command_for_value(
      describe_command, task_description, global_args=args
  )
  if return_code == 0:
    return
  xpk_exit(return_code)
211+
212+
155213
def generate_blueprint(
156214
blueprint_name, args, prefix=None
157215
) -> BlueprintGeneratorOutput:
@@ -162,6 +220,9 @@ def generate_blueprint(
162220

163221
bpg = prepare_blueprint_generator()
164222

223+
if args.cluster_state_gcs_bucket is not None:
224+
validate_state_gcs_bucket(args)
225+
165226
if args.device_type in supported_device_types:
166227
if args.device_type == a3mega_device_type:
167228
num_nodes = args.num_nodes if not args.num_nodes is None else 2
@@ -178,6 +239,7 @@ def generate_blueprint(
178239
capacity_type=capacity_type,
179240
system_node_pool_machine_type=args.default_pool_cpu_machine_type,
180241
system_node_pool_min_node_count=args.default_pool_cpu_num_nodes,
242+
gcs_bucket=args.cluster_state_gcs_bucket,
181243
)
182244
if args.device_type == a3ultra_device_type:
183245
num_nodes = args.num_nodes if not args.num_nodes is None else 2
@@ -195,5 +257,6 @@ def generate_blueprint(
195257
capacity_type=capacity_type,
196258
system_node_pool_machine_type=args.default_pool_cpu_machine_type,
197259
system_node_pool_min_node_count=args.default_pool_cpu_num_nodes,
260+
gcs_bucket=args.cluster_state_gcs_bucket,
198261
)
199262
return None

src/xpk/core/blueprint/blueprint_generator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ def _getblock_terraform_backend(
641641
}
642642

643643
def _get_terraforrm_backend_full_prefix(self, prefix: str = "") -> str:
644-
return f"xpk_terraform_state/{prefix}"
644+
return f"xpk_terraform_state/{prefix}/tfstate/"
645645

646646
def _save_blueprint_to_file(
647647
self, blueprint_name: str, xpk_blueprint: Blueprint, prefix: str = ""

src/xpk/core/gcluster_manager.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
"""
1616

1717
from .docker_manager import CommandRunner
18-
from ..utils.console import xpk_print
19-
18+
from ..utils.console import xpk_exit, xpk_print
19+
from .remote_state.remote_state_client import RemoteStateClient
2020

2121
xpk_gcloud_cfg_path = '~/gcloud/cfg'
2222
xpk_deployment_dir = '/deployment'
@@ -44,8 +44,10 @@ class GclusterManager:
4444
def __init__(
4545
self,
4646
gcluster_command_runner: CommandRunner,
47+
remote_state_client: RemoteStateClient | None,
4748
) -> None:
4849
self.gcluster_command_runner = gcluster_command_runner
50+
self.remote_state_client = remote_state_client
4951

5052
def _run_create_deployment_cmd(
5153
self, blueprint_container_path: str, prefix: str = ''
@@ -156,3 +158,19 @@ def stage_files(
156158
xpk_print('Staging blueprint completed!')
157159
xpk_print(f"File path in gcluster's working directory: {staged_blueprint}")
158160
return staged_blueprint
161+
162+
def upload_state(self) -> None:
  """Upload the deployment state through the configured remote state client.

  Prints a message and calls `xpk_exit(1)` when no remote state client was
  supplied to this manager.
  """
  xpk_print('Uploading state.')
  client = self.remote_state_client
  if client is None:
    xpk_print('No remote state defined')
    xpk_exit(1)
  client.upload_state()
168+
169+
def download_state(self) -> None:
  """Download the deployment state if it exists in remote storage.

  Prints a message and calls `xpk_exit(1)` when no remote state client was
  supplied to this manager. A missing remote state is reported but is not
  treated as an error, so the caller can continue with whatever exists
  locally.
  """
  if self.remote_state_client is None:
    xpk_print('No remote state defined')
    xpk_exit(1)

  # Report "not found" only when the check actually fails; previously the
  # message was also printed right after a successful download.
  if not self.remote_state_client.check_remote_state_exists():
    xpk_print('Remote state not found.')
    return
  self.remote_state_client.download_state()
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
"""
2+
Copyright 2025 Google LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
"""
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
"""
2+
Copyright 2025 Google LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
"""
16+
17+
from .remote_state_client import RemoteStateClient
18+
from ...utils.gcs_utils import upload_directory_to_gcs, check_file_exists, download_bucket_to_dir, upload_file_to_gcs
19+
from ...utils.console import xpk_print
20+
from google.cloud.storage import Client
21+
import os
22+
23+
24+
class FuseStateClient(RemoteStateClient):
  """Remote state client that keeps xpk state in a GCS bucket.

  Layout inside the bucket:
    xpk_terraform_state/{prefix}/blueprints/{deployment_name}/      (directory)
    xpk_terraform_state/{prefix}/blueprints/{deployment_name}.yaml  (blueprint)
  """

  def __init__(
      self,
      bucket: str,
      state_directory: str,
      cluster: str,
      deployment_name: str,
      prefix: str,
  ) -> None:
    """Store the state coordinates and create the storage client.

    Args:
      bucket: bucket name passed to the GCS helper functions.
      state_directory: local directory that is uploaded to / downloaded from
        the bucket. The blueprint file is looked up next to it as
        `{deployment_name}.yaml`.
      cluster: cluster name; stored on the instance only.
      deployment_name: name used for the bucket sub-directory and for the
        blueprint file name.
      prefix: path component placed directly under `xpk_terraform_state/`.
    """
    self.bucket = bucket
    self.state_dir = state_directory
    self.storage_client = Client()
    self.cluster = cluster
    self.prefix = prefix
    self.deployment_name = deployment_name

  def _get_bucket_path(self) -> str:
    """Return the bucket path of this deployment's state directory."""
    return (
        f'xpk_terraform_state/{self.prefix}/blueprints/{self.deployment_name}/'
    )

  def _get_bucket_path_blueprint(self) -> str:
    """Return the bucket path under which blueprint files are stored."""
    return f'xpk_terraform_state/{self.prefix}/blueprints/'

  def _get_deployment_filename(self) -> str:
    """Return the blueprint file name for this deployment."""
    return f'{self.deployment_name}.yaml'

  def _get_blueprint_bucket_path(self) -> str:
    """Return the bucket path of this deployment's blueprint file."""
    return self._get_bucket_path_blueprint() + self._get_deployment_filename()

  def _get_blueprint_path(self) -> str:
    """Return the local blueprint path, a sibling of the state directory."""
    # os.path.dirname keeps this consistent with the os.path.join calls that
    # build state_dir, instead of assuming '/' as the separator.
    blueprint_dir = os.path.dirname(self.state_dir)
    return os.path.join(blueprint_dir, self._get_deployment_filename())

  def upload_state(self) -> None:
    """Upload the state directory and then the blueprint file to the bucket."""
    xpk_print(
        f'Uploading dependecies from directory {self.state_dir} to bucket:'
        f' {self.bucket}. Path within bucket is: {self._get_bucket_path()}'
    )
    upload_directory_to_gcs(
        storage_client=self.storage_client,
        bucket_name=self.bucket,
        bucket_path=self._get_bucket_path(),
        source_directory=self.state_dir,
    )
    blueprint_bucket_path = self._get_blueprint_bucket_path()
    xpk_print(
        f'Uploading blueprint file: {self._get_blueprint_path()} to bucket'
        f' {self.bucket}. Path within bucket is: {blueprint_bucket_path}'
    )
    upload_file_to_gcs(
        storage_client=self.storage_client,
        bucket_name=self.bucket,
        bucket_path=blueprint_bucket_path,
        file=self._get_blueprint_path(),
    )

  def download_state(self) -> None:
    """Download this deployment's bucket directory into the state directory."""
    xpk_print(
        f'Downloading from bucket: {self.bucket}, from path:'
        f' {self._get_bucket_path()} to directory: {self.state_dir}'
    )
    download_bucket_to_dir(
        self.storage_client,
        self.bucket,
        self._get_bucket_path(),
        destination_directory=self.state_dir,
    )

  def check_remote_state_exists(self) -> bool:
    """Return whether the deployment's blueprint file exists in the bucket."""
    return check_file_exists(
        self.storage_client,
        self.bucket,
        self._get_blueprint_bucket_path(),
    )
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""
2+
Copyright 2025 Google LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
"""
16+
17+
from abc import ABC, abstractmethod
18+
19+
20+
class RemoteStateClient(ABC):
  """Interface for clients that manage remote cluster state.

  Every method is abstract, so a concrete subclass has to implement all of
  them before it can be instantiated.
  """

  @abstractmethod
  def upload_state(self) -> None:
    """Upload state to remote storage."""

  @abstractmethod
  def download_state(self) -> None:
    """Download state from remote storage."""

  @abstractmethod
  def check_remote_state_exists(self) -> bool:
    """Return whether remote state exists; this base body returns False."""
    return False

src/xpk/core/tests/data/a3_ultra.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ terraform_backend_defaults:
2222
type: gcs
2323
configuration:
2424
bucket: test-bucket
25-
prefix: xpk_terraform_state/testdir
25+
prefix: xpk_terraform_state/testdir/tfstate/
2626

2727
deployment_groups:
2828
- !DeploymentGroup

src/xpk/core/tests/integration/test_gcluster.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def test_create_deployment():
8282
assert os.path.exists(blueprint_test_path)
8383

8484
gcluster_manager = GclusterManager(
85-
gcluster_command_runner=docker_manager,
85+
gcluster_command_runner=docker_manager, remote_state_client=None
8686
)
8787

8888
staged_bp_path = gcluster_manager.stage_files(

0 commit comments

Comments
 (0)