12
12
# language governing permissions and limitations under the License.
13
13
from __future__ import absolute_import
14
14
15
- import collections
16
- import logging
17
15
from operator import itemgetter
18
16
import os
19
17
from os import path
33
31
PREFIX = "ec2_fs_key_"
34
32
KEY_NAME = PREFIX + str (uuid .uuid4 ().hex .upper ()[0 :8 ])
35
33
ROLE_NAME = "SageMakerRole"
36
- EC2_INSTANCE_TYPE = "t2.micro"
37
34
MIN_COUNT = 1
38
35
MAX_COUNT = 1
39
36
37
+ EFS_MOUNT_DIRECTORY = "efs"
38
+ FSX_MOUNT_DIRECTORY = "/mnt/fsx"
39
+
40
40
RESOURCE_PATH = os .path .join (os .path .dirname (__file__ ), ".." , "data" )
41
41
MNIST_RESOURCE_PATH = os .path .join (RESOURCE_PATH , "tensorflow_mnist" )
42
42
MNIST_LOCAL_DATA = os .path .join (MNIST_RESOURCE_PATH , "data" )
49
49
KEY_PATH = os .path .join (tempfile .gettempdir (), FILE_NAME )
50
50
STORAGE_CAPACITY_IN_BYTES = 3600
51
51
52
- FsResources = collections .namedtuple (
53
- "FsResources" ,
54
- [
55
- "key_name" ,
56
- "key_path" ,
57
- "role_name" ,
58
- "subnet_id" ,
59
- "security_group_ids" ,
60
- "file_system_efs_id" ,
61
- "file_system_fsx_id" ,
62
- "ec2_instance_id" ,
63
- "mount_efs_target_id" ,
64
- ],
65
- )
66
-
67
-
68
- def set_up_efs_fsx (sagemaker_session ):
69
- _check_or_create_key_pair (sagemaker_session )
70
- _check_or_create_iam_profile_and_attach_role (sagemaker_session )
71
- subnet_ids , security_group_ids = check_or_create_vpc_resources_efs_fsx (
72
- sagemaker_session , VPC_NAME
73
- )
74
-
75
- ami_id = _ami_id_for_region (sagemaker_session )
76
- ec2_instance = _create_ec2_instance (
77
- sagemaker_session ,
78
- ami_id ,
79
- EC2_INSTANCE_TYPE ,
80
- KEY_NAME ,
81
- MIN_COUNT ,
82
- MAX_COUNT ,
83
- security_group_ids ,
84
- subnet_ids [0 ],
85
- )
52
+ fs_resources = {"key_name" : KEY_NAME , "key_path" : KEY_PATH , "role_name" : ROLE_NAME }
86
53
87
- file_system_efs_id = _check_or_create_efs (sagemaker_session )
88
- mount_efs_target_id = _create_efs_mount (sagemaker_session , file_system_efs_id )
89
-
90
- file_system_fsx_id = _check_or_create_fsx (sagemaker_session )
91
-
92
- fs_resources = FsResources (
93
- KEY_NAME ,
94
- KEY_PATH ,
95
- ROLE_NAME ,
96
- subnet_ids [0 ],
97
- security_group_ids ,
98
- file_system_efs_id ,
99
- file_system_fsx_id ,
100
- ec2_instance .id ,
101
- mount_efs_target_id ,
102
- )
103
54
104
- region = sagemaker_session . boto_region_name
55
def set_up_efs_fsx(sagemaker_session, ec2_instance_type):
    """Provision the key pair, IAM profile, VPC, EC2 instance, EFS and FSx resources.

    Ids are written into the module-level ``fs_resources`` dict as they
    become known, so that a failure part-way through can hand whatever has
    been recorded so far to ``tear_down``.

    Args:
        sagemaker_session: Session object forwarded to every helper; its
            ``boto_region_name`` is read here.
        ec2_instance_type: Instance type forwarded to ``_create_ec2_instance``.

    Returns:
        The module-level ``fs_resources`` dict.

    Raises:
        Exception: Any error raised during setup is re-raised after
            ``tear_down`` has been called.
    """
    try:
        _check_or_create_key_pair(sagemaker_session)
        _check_or_create_iam_profile_and_attach_role(sagemaker_session)

        subnets, security_groups = check_or_create_vpc_resources_efs_fsx(
            sagemaker_session, VPC_NAME
        )
        fs_resources.update(subnet_id=subnets[0], security_group_ids=security_groups)

        instance = _create_ec2_instance(
            sagemaker_session,
            _ami_id_for_region(sagemaker_session),
            ec2_instance_type,
            KEY_NAME,
            MIN_COUNT,
            MAX_COUNT,
            security_groups,
            subnets[0],
        )

        # The mount target id is not needed in this function, but the
        # two-element unpacking is kept so the call contract stays the same.
        efs_id, _mount_target_id = _create_efs(sagemaker_session)
        fsx_id = _create_fsx(sagemaker_session)

        connection = _connect_ec2_instance(instance)
        aws_region = sagemaker_session.boto_region_name
        _upload_data_and_mount_fs(connection, efs_id, fsx_id, aws_region)
    except Exception:
        # Clean up whatever was recorded before the failure, then propagate.
        tear_down(sagemaker_session, fs_resources)
        raise
    else:
        return fs_resources
113
90
114
- return fs_resources
115
-
116
91
117
92
def _ami_id_for_region (sagemaker_session ):
118
93
ec2_client = sagemaker_session .boto_session .client ("ec2" )
@@ -146,43 +121,26 @@ def _upload_data_and_mount_fs(connected_instance, file_system_efs_id, file_syste
146
121
connected_instance .put (local_file , "temp_tf/" )
147
122
connected_instance .put (ONE_P_LOCAL_DATA , "temp_one_p/" )
148
123
connected_instance .run (
149
- "sudo sh fs_mount_setup.sh {} {} {}" .format (file_system_efs_id , file_system_fsx_id , region ),
124
+ "sudo sh fs_mount_setup.sh {} {} {} {} {}" .format (
125
+ file_system_efs_id , file_system_fsx_id , region , EFS_MOUNT_DIRECTORY , FSX_MOUNT_DIRECTORY
126
+ ),
150
127
in_stream = False ,
151
128
)
152
129
153
130
154
- def _check_or_create_efs (sagemaker_session ):
131
def _create_efs(sagemaker_session):
    """Create an EFS file system and a mount target for it.

    The new file system id is stored in the module-level ``fs_resources``
    dict immediately after creation, before the status polling starts.

    Args:
        sagemaker_session: Session whose ``boto_session`` supplies the EFS
            client; also forwarded to ``_create_efs_mount``.

    Returns:
        tuple: ``(efs_id, mount_target_id)``.
    """
    client = sagemaker_session.boto_session.client("efs")
    efs_id = client.create_file_system(CreationToken=EFS_CREATION_TOKEN)["FileSystemId"]
    fs_resources["file_system_efs_id"] = efs_id

    # Poll until the file system reports "available". What happens once the
    # attempts are exhausted is decided by ``retries`` (defined elsewhere).
    for _ in retries(50, "Checking EFS creating status"):
        file_systems = client.describe_file_systems(CreationToken=EFS_CREATION_TOKEN)[
            "FileSystems"
        ]
        if file_systems[0]["LifeCycleState"] == "available":
            break

    return efs_id, _create_efs_mount(sagemaker_session, efs_id)
186
144
187
145
188
146
def _create_efs_mount (sagemaker_session , file_system_id ):
@@ -194,6 +152,7 @@ def _create_efs_mount(sagemaker_session, file_system_id):
194
152
FileSystemId = file_system_id , SubnetId = subnet_ids [0 ], SecurityGroups = security_group_ids
195
153
)
196
154
mount_target_id = mount_response ["MountTargetId" ]
155
+ fs_resources ["mount_efs_target_id" ] = mount_target_id
197
156
198
157
for _ in retries (50 , "Checking EFS mounting target status" ):
199
158
desc = efs_client .describe_mount_targets (MountTargetId = mount_target_id )
@@ -204,7 +163,7 @@ def _create_efs_mount(sagemaker_session, file_system_id):
204
163
return mount_target_id
205
164
206
165
207
- def _check_or_create_fsx (sagemaker_session ):
166
+ def _create_fsx (sagemaker_session ):
208
167
fsx_client = sagemaker_session .boto_session .client ("fsx" )
209
168
subnet_ids , security_group_ids = check_or_create_vpc_resources_efs_fsx (
210
169
sagemaker_session , VPC_NAME
@@ -216,6 +175,7 @@ def _check_or_create_fsx(sagemaker_session):
216
175
SecurityGroupIds = security_group_ids ,
217
176
)
218
177
fsx_id = create_response ["FileSystem" ]["FileSystemId" ]
178
+ fs_resources ["file_system_fsx_id" ] = fsx_id
219
179
220
180
for _ in retries (50 , "Checking FSX creating status" ):
221
181
desc = fsx_client .describe_file_systems (FileSystemIds = [fsx_id ])
@@ -257,8 +217,8 @@ def _create_ec2_instance(
257
217
258
218
ec2_instances [0 ].wait_until_running ()
259
219
ec2_instances [0 ].reload ()
220
+ fs_resources ["ec2_instance_id" ] = ec2_instances [0 ].id
260
221
ec2_client = sagemaker_session .boto_session .client ("ec2" )
261
-
262
222
for _ in retries (30 , "Checking EC2 creation status" ):
263
223
statuses = ec2_client .describe_instance_status (InstanceIds = [ec2_instances [0 ].id ])
264
224
status = statuses ["InstanceStatuses" ][0 ]
@@ -326,28 +286,30 @@ def _instance_profile_exists(sagemaker_session):
326
286
327
287
328
288
def tear_down(sagemaker_session, fs_resources):
    """Delete the file-system test resources recorded in ``fs_resources``.

    Each resource is only touched when its id key is present in the mapping,
    so a partially filled mapping is accepted. The key pair is always
    deleted.

    Args:
        sagemaker_session: Session whose ``boto_session`` supplies the FSx,
            EFS and EC2 clients/resources.
        fs_resources: Mapping that may contain the keys
            ``"file_system_fsx_id"``, ``"mount_efs_target_id"``,
            ``"file_system_efs_id"`` and ``"ec2_instance_id"``.
    """
    boto_session = sagemaker_session.boto_session

    if "file_system_fsx_id" in fs_resources:
        fsx_client = boto_session.client("fsx")
        # The lookup key must match the key tested above exactly (no stray
        # whitespace), otherwise this raises KeyError.
        fsx_client.delete_file_system(FileSystemId=fs_resources["file_system_fsx_id"])

    efs_client = boto_session.client("efs")
    if "mount_efs_target_id" in fs_resources:
        efs_client.delete_mount_target(MountTargetId=fs_resources["mount_efs_target_id"])

    if "file_system_efs_id" in fs_resources:
        efs_id = fs_resources["file_system_efs_id"]

        # Wait until no mount target is listed any more, or the first listed
        # one reports "deleted", before deleting the file system itself.
        for _ in retries(30, "Checking mount target deleting status"):
            mount_targets = efs_client.describe_mount_targets(FileSystemId=efs_id)[
                "MountTargets"
            ]
            if not mount_targets or mount_targets[0]["LifeCycleState"] == "deleted":
                break

        efs_client.delete_file_system(FileSystemId=efs_id)

    if "ec2_instance_id" in fs_resources:
        ec2_resource = boto_session.resource("ec2")
        _terminate_instance(ec2_resource, [fs_resources["ec2_instance_id"]])

    _delete_key_pair(sagemaker_session)
0 commit comments