1313# limitations under the License.
1414"""Managed relational database provisioning and teardown for AWS Aurora DSQL."""
1515
16+ import functools
1617import json
1718from typing import Any
1819
2728from perfkitbenchmarker .providers .aws import util
2829
2930
30- # TODO(shuninglin): Add cluster creation from a backup.
3131# TODO(shuninglin): Add reaper for this new resource.
3232
3333FLAGS = flags .FLAGS
3434
35+ AWS_AURORA_DSQL_RECOVERY_POINT_ARN = flags .DEFINE_string (
36+ 'aws_aurora_dsql_recovery_point_arn' ,
37+ None ,
38+ 'The ARN of the recovery point to restore AWS Aurora DSQL cluster from. If '
39+ 'not provided, a new cluster is created from scratch.' ,
40+ )
3541DEFAULT_AURORA_DSQL_POSTGRES_VERSION = '16.2'
3642
3743_MAP_ENGINE_TO_DEFAULT_VERSION = {
@@ -69,7 +75,15 @@ class AwsAuroraDsqlRelationalDb(aws_relational_db.BaseAwsRelationalDb):
6975 def __init__ (self , dsql_spec : AwsAuroraDsqlSpec ):
7076 super ().__init__ (dsql_spec )
7177 self .cluster_id = None
78+ self .cluster_arn = None
7279 self .assigned_name = f'pkb-{ FLAGS .run_uri } '
80+ self .use_backup = bool (AWS_AURORA_DSQL_RECOVERY_POINT_ARN .value )
81+ self .restore_job_id = None
82+
83+ @functools .cached_property
84+ def account_id (self ) -> str :
85+ """Returns the AWS account ID."""
86+ return util .GetAccount ()
7387
7488 # DSQL has different format for tags:
7589 # https://docs.aws.amazon.com/cli/v1/reference/rds/create-db-cluster.html
@@ -83,6 +97,64 @@ def _MakeDsqlTags(self) -> list[str]:
8397 return [formatted_tags_str ]
8498
8599 def _Create (self ) -> None :
100+ """Creates AWS Aurora DSQL cluster, from backup if recovery point ARN is provided."""
101+ if not self .use_backup :
102+ self ._CreateRawCluster ()
103+ return
104+ if self .restore_job_id :
105+ logging .info (
106+ 'Restore job %s already exists. Skipping creation.' ,
107+ self .restore_job_id ,
108+ )
109+ return
110+ cmd = util .AWS_PREFIX + [
111+ 'backup' ,
112+ 'start-restore-job' ,
113+ '--recovery-point-arn' ,
114+ AWS_AURORA_DSQL_RECOVERY_POINT_ARN .value ,
115+ '--iam-role-arn' ,
116+ (
117+ f'arn:aws:iam::{ self .account_id } :role/service-role/'
118+ 'AWSBackupDefaultServiceRole'
119+ ),
120+ '--metadata' ,
121+ '{"regionalConfig": "[{\\ "region\\ ": \\ "%s\\ ",'
122+ ' \\ "isDeletionProtectionEnabled\\ ": false}]"}'
123+ % self .region ,
124+ ]
125+ stdout , _ , _ = vm_util .IssueCommand (cmd )
126+ response = json .loads (stdout )
127+ self .restore_job_id = response ['RestoreJobId' ]
128+ if self .restore_job_id :
129+ # Mark created so we don't try to create it again on a retry.
130+ self .created = True
131+
132+ def _DescribeRestoreJob (self , job_id : str ) -> dict [str , Any ]:
133+ """Describes the restore job."""
134+ cmd = util .AWS_PREFIX + [
135+ 'backup' ,
136+ 'describe-restore-job' ,
137+ '--restore-job-id' ,
138+ job_id ,
139+ ]
140+ stdout , _ , _ = vm_util .IssueCommand (cmd )
141+ return json .loads (stdout )
142+
143+ def _AddTagsToCluster (self , cluster_arn : str ) -> None :
144+ """Adds tags to the DSQL cluster."""
145+ cmd = (
146+ util .AWS_PREFIX
147+ + [
148+ 'dsql' ,
149+ 'tag-resource' ,
150+ '--resource-arn=%s' % cluster_arn ,
151+ '--tags' ,
152+ ]
153+ + self ._MakeDsqlTags ()
154+ )
155+ vm_util .IssueCommand (cmd )
156+
157+ def _CreateRawCluster (self ) -> None :
86158 """Creates the AWS Aurora DSQL instance.
87159
88160 Raises:
@@ -108,6 +180,9 @@ def _Create(self) -> None:
108180 self .cluster_id = response ['identifier' ]
109181
110182 def _DescribeCluster (self ) -> dict [str , Any ] | None :
183+ if not self .cluster_id :
184+ logging .info ('Cluster id is not set.' )
185+ return None
111186 cmd = util .AWS_PREFIX + [
112187 'dsql' ,
113188 'get-cluster' ,
@@ -122,8 +197,29 @@ def _DescribeCluster(self) -> dict[str, Any] | None:
122197
123198 def _IsReady (self , timeout = aws_relational_db .IS_READY_TIMEOUT ) -> bool :
124199 """Returns true if the cluster is ready."""
125- json_output = self ._DescribeCluster ()
126- return bool (json_output and json_output ['status' ] == 'ACTIVE' )
200+ if self .use_backup :
201+ if not self .restore_job_id :
202+ return False
203+ job_description = self ._DescribeRestoreJob (self .restore_job_id )
204+ status = job_description ['Status' ]
205+ if status == 'COMPLETED' :
206+ self .cluster_id = job_description ['CreatedResourceArn' ].split ('/' )[- 1 ]
207+ self .cluster_arn = job_description ['CreatedResourceArn' ]
208+ return True
209+ if status in ['ABORTED' , 'FAILED' ]:
210+ raise errors .Resource .CreationError (
211+ f'Restore job { self .restore_job_id } failed with status { status } '
212+ )
213+ return False
214+ else :
215+ json_output = self ._DescribeCluster ()
216+ return bool (json_output and json_output ['status' ] == 'ACTIVE' )
217+
218+ def _PostCreate (self ) -> None :
219+ """Add tags if we are restoring from backup."""
220+ super ()._PostCreate ()
221+ if self .use_backup :
222+ self ._AddTagsToCluster (self .cluster_arn )
127223
128224 def _Exists (self ) -> bool :
129225 """Returns true if the underlying cluster exists."""
0 commit comments