1
+ import boto3
2
+ import json
3
+ from dataclasses import dataclass , field
4
+
5
+ from . import Bootstrappable
6
+ from .iam import Role , UserPolicies
7
+ from .s3 import Bucket
8
+ from .. import resources
9
+
10
+ @dataclass
11
+ class DeliveryStream (Bootstrappable ):
12
+ # Inputs
13
+ name_prefix : str
14
+ s3_bucket_prefix : str = "cloudwatch-metric-stream"
15
+
16
+ # Subresources
17
+ s3_bucket : Bucket = field (default = None )
18
+ firehose_role : Role = field (default = None )
19
+
20
+ # Outputs
21
+ name : str = field (init = False )
22
+ arn : str = field (init = False )
23
+
24
+ def __post_init__ (self ):
25
+ self .name = resources .random_suffix_name (self .name_prefix , 63 )
26
+
27
+ self .s3_bucket = Bucket (
28
+ name_prefix = self .s3_bucket_prefix
29
+ )
30
+
31
+ # Create IAM role with trust policy for Firehose
32
+ firehose_policy_doc = {
33
+ "Version" : "2012-10-17" ,
34
+ "Statement" : [
35
+ {
36
+ "Effect" : "Allow" ,
37
+ "Action" : [
38
+ "s3:AbortMultipartUpload" ,
39
+ "s3:GetBucketLocation" ,
40
+ "s3:GetObject" ,
41
+ "s3:ListBucket" ,
42
+ "s3:ListBucketMultipartUploads" ,
43
+ "s3:PutObject"
44
+ ],
45
+ "Resource" : [
46
+ f"arn:aws:s3:::{ self .s3_bucket .name } " ,
47
+ f"arn:aws:s3:::{ self .s3_bucket .name } /*"
48
+ ]
49
+ }
50
+ ]
51
+ }
52
+
53
+ self .firehose_role = Role (
54
+ name_prefix = "firehose-delivery-role" ,
55
+ principal_service = "firehose.amazonaws.com" ,
56
+ description = "Role for Kinesis Data Firehose delivery stream" ,
57
+ user_policies = UserPolicies (
58
+ name_prefix = "firehose-s3-policy" ,
59
+ policy_documents = [json .dumps (firehose_policy_doc )]
60
+ )
61
+ )
62
+
63
+ @property
64
+ def firehose_client (self ):
65
+ return boto3 .client ("firehose" , region_name = self .region )
66
+
67
+ def bootstrap (self ):
68
+ """Creates a Kinesis Data Firehose delivery stream with S3 destination.
69
+ """
70
+ super ().bootstrap ()
71
+
72
+ # Create the delivery stream
73
+ response = self .firehose_client .create_delivery_stream (
74
+ DeliveryStreamName = self .name ,
75
+ S3DestinationConfiguration = {
76
+ 'RoleARN' : self .firehose_role .arn ,
77
+ 'BucketARN' : f"arn:aws:s3:::{ self .s3_bucket .name } "
78
+ }
79
+ )
80
+
81
+ self .arn = response ['DeliveryStreamARN' ]
82
+
83
+ def cleanup (self ):
84
+ """Deletes the Kinesis Data Firehose delivery stream.
85
+ """
86
+ try :
87
+ self .firehose_client .delete_delivery_stream (
88
+ DeliveryStreamName = self .name ,
89
+ AllowForceDelete = True
90
+ )
91
+ except Exception :
92
+ pass
93
+
94
+ super ().cleanup ()
0 commit comments