Skip to content

Commit c7d1b63

Browse files
authored
Merge pull request Azure#13104 from m-ruiz21/master
Removed unnecessary/problematic to_csv from CloudWatch -> S3 Lambda connectors
2 parents 85ad67c + 58dfce7 commit c7d1b63

File tree

3 files changed

+18
-9
lines changed

3 files changed

+18
-9
lines changed

DataConnectors/AWS-S3/CloudWatchLambdaFunction.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
import boto3
33
import json
4-
import csv
4+
import gzip
55
import time
66
import pandas as pd
77
from datetime import datetime
@@ -66,9 +66,13 @@ def lambda_handler(event, context):
6666

6767
sanitized_stream_name = log_stream_name.replace('/', '_')
6868

69+
file_path = f'/tmp/{OUTPUT_FILE_NAME}_{sanitized_stream_name}.gz'
70+
6971
# Export data to temporary file in the right format, which will be deleted as soon as the session ends
70-
fileToS3.to_csv( f'/tmp/{OUTPUT_FILE_NAME}_{sanitized_stream_name}.gz', index=False, header=False, compression='gzip', sep = ' ', escapechar=' ', doublequote=False, quoting=csv.QUOTE_NONE)
71-
72+
text_content = '\n'.join(fileToS3['message'].astype(str).values)
73+
with gzip.open(file_path, 'wt', encoding='utf-8') as f:
74+
f.write(text_content)
75+
7276
# Upload data to desired folder in bucket
7377
s3.Bucket(BUCKET_NAME).upload_file(f'/tmp/{OUTPUT_FILE_NAME}_{sanitized_stream_name}.gz', f'{BUCKET_PREFIX}{OUTPUT_FILE_NAME}_{sanitized_stream_name}.gz')
7478

DataConnectors/AWS-S3/CloudWatchLambdaFunction_V2.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

2+
import gzip
23
import boto3
34
import json
4-
import csv
55
import time
66
import pandas as pd
77
from datetime import datetime
@@ -99,12 +99,14 @@ def lambda_handler(event, context):
9999
# If the column exists, drop it
100100
df.drop(columns=["ingestionTime"], inplace=True)
101101
fileToS3 = df
102-
try:
103-
# Export data to temporary file in the right format, which will be deleted as soon as the session ends
104-
fileToS3.to_csv( f'/tmp/{output_File_Name}.gz', index=False, header=False, compression='gzip', sep = ' ', escapechar=' ', doublequote=False, quoting=csv.QUOTE_NONE)
102+
try:
103+
file_path = f'/tmp/{output_File_Name}.gz'
104+
text_content = '\n'.join(fileToS3['message'].astype(str).values)
105+
with gzip.open(file_path, 'wt', encoding='utf-8') as f:
106+
f.write(text_content)
105107

106108
# Upload data to desired folder in bucket
107-
s3.Bucket(BUCKET_NAME).upload_file(f'/tmp/{output_File_Name}.gz', f'{BUCKET_PREFIX}{output_File_Name}.gz')
109+
s3.Bucket(BUCKET_NAME).upload_file(file_path, f'{BUCKET_PREFIX}{output_File_Name}.gz')
108110
except Exception as e:
109111
print("Error exporting to S3 %s %s: %s" % (key["logGroupName"], key["logStreamName"], getattr(e, 'message', repr(e))))
110112
else:

DataConnectors/AWS-S3/CloudWatchPushBasedLambdaFunction.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@ def lambda_handler(event, context):
5959
sanitized_stream_name = log_data['logStream'].replace('/', '_')
6060
first_timestamp = df['timestamp'].iloc[0]
6161
file_path = f'/tmp/{sanitized_stream_name}_{first_timestamp}.gz'
62-
fileToS3.to_csv(file_path, index=False, header=False, compression='gzip', sep=' ', escapechar=' ', doublequote=False, quoting=csv.QUOTE_NONE)
62+
63+
text_content = '\n'.join(fileToS3['message'].astype(str).values)
64+
with gzip.open(file_path, 'wt', encoding='utf-8') as f:
65+
f.write(text_content)
6366

6467
# Update S3 path with or without OUTPUT_FILE_NAME
6568
s3_key = f'{BUCKET_PREFIX}{current_date}/'

0 commit comments

Comments
 (0)