-
Notifications
You must be signed in to change notification settings - Fork 74
Expand file tree
/
Copy pathsync.py
More file actions
109 lines (96 loc) · 4.19 KB
/
sync.py
File metadata and controls
109 lines (96 loc) · 4.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import argparse
import os
import tempfile
import uuid
from skyplane.utils.definitions import KB
from skyplane.obj_store.object_store_interface import ObjectStoreInterface
from skyplane.cli.cli import sync
from skyplane.config_paths import cloud_config
from skyplane.utils import logger
def setup_buckets(src_region, dest_region, n_files=1, file_size_kb=1):
    """Create a source and a destination bucket and seed the source with random objects.

    Args:
        src_region: Source region written as ``"<provider>:<zone>"``.
        dest_region: Destination region written as ``"<provider>:<zone>"``.
        n_files: Number of objects uploaded under the source prefix.
        file_size_kb: Size of the random payload, in multiples of ``KB``.

    Returns:
        A tuple ``(src_bucket_name, dest_bucket_name, src_prefix, dest_prefix)``.
        Objects are uploaded with keys ``f"{src_prefix}/{i}"`` for ``i`` in
        ``range(n_files)``; nothing is written under ``dest_prefix`` here.

    Raises:
        ValueError: If a region string does not split into exactly two parts on ``":"``.
    """
    src_provider, src_zone = src_region.split(":")
    dest_provider, dest_zone = dest_region.split(":")

    # Azure names have the form "<first>/<second>" (a dash-free UUID after the slash);
    # the other providers get a single name with a short random suffix.
    if src_provider == "azure":
        src_bucket_name = f"integration{src_zone}/{str(uuid.uuid4()).replace('-', '')}"
    else:
        src_bucket_name = f"integration{src_zone}-{str(uuid.uuid4())[:8]}"
    if dest_provider == "azure":
        dest_bucket_name = f"integration{dest_zone}/{str(uuid.uuid4()).replace('-', '')}"
    else:
        dest_bucket_name = f"skyplane-integration-{dest_zone}-{str(uuid.uuid4())[:8]}"

    logger.debug(f"creating buckets {src_bucket_name} and {dest_bucket_name}")
    src_interface = ObjectStoreInterface.create(src_region, src_bucket_name)
    dest_interface = ObjectStoreInterface.create(dest_region, dest_bucket_name)
    src_interface.create_bucket(src_zone)
    dest_interface.create_bucket(dest_zone)

    src_prefix = f"src_{uuid.uuid4()}"
    dest_prefix = f"dest_{uuid.uuid4()}"

    # Write the payload to a regular file inside a temporary directory and close it
    # before uploading. Re-opening a still-open NamedTemporaryFile by name is not
    # portable (it fails on Windows), whereas a closed file can be read anywhere.
    with tempfile.TemporaryDirectory() as tmp_dir:
        fpath = os.path.join(tmp_dir, "payload")
        with open(fpath, "wb") as f:
            f.write(os.urandom(int(file_size_kb * KB)))
        # Every object is uploaded from the same local file, so all share one payload.
        for i in range(n_files):
            src_interface.upload_object(fpath, f"{src_prefix}/{i}", mime_type="text/plain")

    return src_bucket_name, dest_bucket_name, src_prefix, dest_prefix
def run(src_region, dest_region, n_files=1, file_size_kb=1, multipart=False, autoshowdown_minutes=15):
    """Run one sync integration test between two regions and clean up afterwards.

    Sets the ``autoshutdown_minutes`` cloud-config flag, creates and seeds the test
    buckets via ``setup_buckets``, invokes ``sync`` from the source path to the
    destination path, then deletes the test objects and both buckets.

    Args:
        src_region: Source region written as ``"<provider>:<zone>"``.
        dest_region: Destination region written as ``"<provider>:<zone>"``.
        n_files: Number of objects to create and sync.
        file_size_kb: Size of each object, forwarded to ``setup_buckets``.
        multipart: Forwarded to ``sync`` as its ``multipart`` argument.
        autoshowdown_minutes: Value stored in the ``autoshutdown_minutes`` config flag.

    Returns:
        Whatever ``sync`` returns (used by the script entry point as the exit code).

    Raises:
        Exception: If a region names a provider other than ``aws``, ``azure`` or ``gcp``.
    """
    logger.info(
        f"Running skyplane sync integration test with config "
        + f"src_region={src_region}, "
        + f"dest_region={dest_region}, "
        + f"n_files={n_files}, "
        + f"file_size_kb={file_size_kb}, "
        + f"multipart={multipart}, "  # separator was missing, gluing the last two fields together
        + f"autoshowdown_minutes={autoshowdown_minutes}"
    )
    cloud_config.set_flag("autoshutdown_minutes", autoshowdown_minutes)
    src_bucket_name, dest_bucket_name, src_prefix, dest_prefix = setup_buckets(
        src_region, dest_region, n_files=n_files, file_size_kb=file_size_kb
    )

    def map_path(region, bucket, prefix):
        """Build the provider-specific URL for ``prefix`` inside ``bucket``."""
        provider, _ = region.split(":")
        if provider == "aws":
            return f"s3://{bucket}/{prefix}"
        elif provider == "azure":
            # Azure bucket names are created as "<storage_account>/<container>".
            storage_account, container = bucket.split("/")
            return f"https://{storage_account}.blob.core.windows.net/{container}/{prefix}"
        elif provider == "gcp":
            return f"gs://{bucket}/{prefix}"
        else:
            raise Exception(f"Unknown provider {provider}")

    try:
        return_code = sync(
            map_path(src_region, src_bucket_name, src_prefix),
            map_path(dest_region, dest_bucket_name, dest_prefix),
            debug=False,
            multipart=multipart,
            confirm=True,
            max_instances=1,
            max_connections=1,
            solver="direct",
            solver_required_throughput_gbits=1,
        )
    finally:
        # Clean up even when the sync raises, so a failed run does not leak
        # the test buckets and their objects.
        src_interface = ObjectStoreInterface.create(src_region, src_bucket_name)
        dest_interface = ObjectStoreInterface.create(dest_region, dest_bucket_name)
        src_interface.delete_objects([f"{src_prefix}/{i}" for i in range(n_files)])
        dest_interface.delete_objects([f"{dest_prefix}/{i}" for i in range(n_files)])
        src_interface.delete_bucket()
        dest_interface.delete_bucket()

    return return_code
if __name__ == "__main__":
    # Script entry point: parse the CLI arguments, run the integration test once,
    # and use the value returned by run() as the process exit status.
    parser = argparse.ArgumentParser()
    parser.add_argument("src", help="source region")
    parser.add_argument("dest", help="destination region")
    parser.add_argument("--n-files", type=int, default=1, help="number of files to sync")
    parser.add_argument("--file-size-kb", type=int, default=1, help="size of each file in KB")
    parser.add_argument("--multipart", action="store_true", help="pass multipart=True to sync")
    parser.add_argument("--autoshutdown", type=int, default=15, help="value for the autoshutdown_minutes flag")
    args = parser.parse_args()

    return_code = run(
        args.src,
        args.dest,
        n_files=args.n_files,
        file_size_kb=args.file_size_kb,
        multipart=args.multipart,
        autoshowdown_minutes=args.autoshutdown,
    )
    # The bare exit() helper is injected by the site module for interactive use and
    # is missing under `python -S`; raising SystemExit is the portable equivalent.
    raise SystemExit(return_code)