-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRead-Kinesis-DataStream-Aurora-Activity-Stream
More file actions
77 lines (61 loc) · 3.24 KB
/
Read-Kinesis-DataStream-Aurora-Activity-Stream
File metadata and controls
77 lines (61 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import base64
import json
import zlib
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import boto3
# AWS region used for the KMS and Kinesis clients created in main().
REGION_NAME = 'us-west-2' # e.g. us-east-1
# Resource identifier; it is appended to the stream-name prefix below and passed
# to KMS as the 'aws:rds:dbc-id' encryption-context value in main().
RESOURCE_ID = 'cluster-H523FV3GPXJRI5IKFKTP2YPXDA' # e.g. cluster-ABCD123456
# Name of the Kinesis stream that main() reads from.
STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID # e.g. aws-rds-das-cluster-ABCD123456
# Shared AWS Encryption SDK client; decrypt_payload() calls its decrypt() method.
enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT)
class MyRawMasterKeyProvider(RawMasterKeyProvider):
    """Raw master key provider that always serves one symmetric wrapping key.

    The key material given to the constructor is wrapped in a ``WrappingKey``
    (AES-256-GCM, 12-byte IV, 16-byte tag, no padding) and returned for every
    key id that is requested.
    """

    # NOTE(review): presumably this must match the provider id recorded in the
    # encrypted activity-stream messages -- confirm against the service docs.
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        """Create the instance without forwarding constructor arguments.

        NOTE(review): the lookup deliberately starts *after*
        ``RawMasterKeyProvider`` in the MRO and passes only ``cls``; presumably
        this keeps the base ``__new__`` from interpreting ``plain_key`` as
        configuration -- confirm against aws_encryption_sdk before changing.
        """
        return super(RawMasterKeyProvider, cls).__new__(cls)

    def __init__(self, plain_key):
        """Store ``plain_key`` as a symmetric AES-256-GCM wrapping key.

        Args:
            plain_key: Key material handed to ``WrappingKey`` as ``wrapping_key``.
        """
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = WrappingKey(
            wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
            wrapping_key=plain_key,
            wrapping_key_type=EncryptionKeyType.SYMMETRIC,
        )

    def _get_raw_key(self, key_id):
        """Return the single stored wrapping key; ``key_id`` is ignored."""
        return self.wrapping_key
def decrypt_payload(payload, data_key):
    """Decrypt ``payload`` with the Encryption SDK using ``data_key``.

    Args:
        payload: Encrypted message passed to ``enc_client.decrypt`` as ``source``.
        data_key: Key material used to build the raw master key provider.

    Returns:
        The decrypted plaintext. The message header that ``decrypt`` also
        returns is discarded.
    """
    provider = MyRawMasterKeyProvider(data_key)
    # Register the key id before the provider is handed to the materials manager.
    provider.add_master_key("DataKey")

    materials_manager = aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(
        master_key_provider=provider
    )
    plaintext, _header = enc_client.decrypt(
        source=payload,
        materials_manager=materials_manager,
    )
    return plaintext
def decrypt_decompress(payload, key):
    """Decrypt ``payload`` with ``key`` and return the decompressed bytes.

    Args:
        payload: Encrypted message forwarded to ``decrypt_payload``.
        key: Key material forwarded to ``decrypt_payload``.

    Returns:
        The result of ``zlib.decompress`` applied to the decrypted payload.
    """
    compressed = decrypt_payload(payload, key)
    # Per the zlib docs, adding 16 to the window size selects the gzip
    # container format (gzip header and trailer) instead of a raw zlib stream.
    gzip_wbits = zlib.MAX_WBITS + 16
    return zlib.decompress(compressed, gzip_wbits)
def main():
    """Poll every shard of the activity stream and print decrypted events.

    A ``LATEST`` shard iterator is obtained for each shard listed by
    ``describe_stream``. Each pass then calls ``get_records`` once per shard.
    For every record, the JSON in ``Data`` is parsed, its
    ``databaseActivityEvents`` and ``key`` fields are base64-decoded, the key
    is decrypted through KMS (encryption context ``aws:rds:dbc-id`` set to
    ``RESOURCE_ID``), and the decrypted, decompressed payload is printed.

    The loop ends only when no shard returns a follow-up iterator; errors from
    boto3, JSON parsing or decryption propagate to the caller.

    NOTE: only the shards on the first ``describe_stream`` page are read; a
    stream with more shards than one page holds would need pagination.
    """
    import time  # local import so this block does not depend on file-level edits

    # Kinesis limits GetRecords to five calls per second per shard. Each pass
    # polls every shard once, so pausing 0.2 s between passes stays within that
    # limit instead of spinning and triggering throughput-exceeded errors.
    poll_interval_seconds = 0.2

    session = boto3.session.Session()
    kms = session.client('kms', region_name=REGION_NAME)
    kinesis = session.client('kinesis', region_name=REGION_NAME)

    stream = kinesis.describe_stream(StreamName=STREAM_NAME)
    shard_iters = [
        kinesis.get_shard_iterator(
            StreamName=STREAM_NAME,
            ShardId=shard['ShardId'],
            ShardIteratorType='LATEST',
        )['ShardIterator']
        for shard in stream['StreamDescription']['Shards']
    ]

    while shard_iters:
        next_shard_iters = []
        for shard_iter in shard_iters:
            response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000)
            for record in response['Records']:
                record_data = json.loads(record['Data'])
                payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
                data_key_decoded = base64.b64decode(record_data['key'])
                data_key_decrypt_result = kms.decrypt(
                    CiphertextBlob=data_key_decoded,
                    EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID},
                )
                print(decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext']))

            # A closed shard reports a null NextShardIterator; skip both a
            # missing key and a None value so it is never passed to get_records.
            next_iter = response.get('NextShardIterator')
            if next_iter:
                next_shard_iters.append(next_iter)

        shard_iters = next_shard_iters
        if shard_iters:
            time.sleep(poll_interval_seconds)
# Run the stream reader only when this file is executed as a script.
if __name__ == '__main__':
    main()