@@ -130,9 +130,18 @@ Resources:
130130 DATABASE_NAME : !Ref DatabaseName
131131 Code :
132132 ZipFile : |
133- import os, json, csv, io, logging, boto3, urllib3, time, re, tempfile
134- from datetime import datetime, timezone
133+ import io
134+ import json
135+ import logging
136+ import os
137+ import re
138+ import tempfile
139+ import time
135140 from contextlib import contextmanager
141+ from datetime import datetime, timezone
142+ from functools import partial
143+ import boto3
144+ import urllib3
136145 from botocore.auth import SigV4Auth
137146 from botocore.awsrequest import AWSRequest
138147 from botocore.credentials import Credentials
@@ -234,32 +243,18 @@ Resources:
234243 credentials
235244 )
236245
237- @contextmanager
238- def s3_csv_file(s3_client, bucket, s3_path, fieldnames):
239- temp_file = None
240- try:
241- # Create temporary file
242- temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8')
243- writer = csv.DictWriter(temp_file, fieldnames=fieldnames, quoting=csv.QUOTE_ALL)
244- writer.writeheader()
245- yield writer.writerow
246- if not temp_file.closed:
247- temp_file.close()
248- s3_client.upload_file(temp_file.name, bucket, s3_path)
249- logger.info(f"Uploaded CSV to s3://{bucket}/{s3_path}")
def s3_put_jsonl(bucket, key, rows):
    """Serialize *rows* as newline-delimited JSON and upload to S3.

    Each element of *rows* becomes one JSON line (datetimes rendered
    via json_converter, non-ASCII preserved). The resulting object is
    stored at s3://{bucket}/{key}.
    """
    payload = ''.join(
        f"{json.dumps(row, default=json_converter, ensure_ascii=False)}\n"
        for row in rows
    )
    boto3.client('s3').put_object(
        Bucket=bucket,
        Key=key,
        Body=payload.encode('utf-8'),
        ContentType='application/json',
    )
250253
251- except Exception as e:
252- logger.error(f"Error during CSV S3 upload s3://{bucket}/{s3_path}: {str(e)}")
253- raise
254-
255- finally:
256- if temp_file and not temp_file.closed:
257- temp_file.close()
258- if temp_file and os.path.exists(temp_file.name):
259- try:
260- os.unlink(temp_file.name)
261- except OSError as e:
262- logger.info(f"Warning: Could not delete temporary file {temp_file.name}: {e}")
def json_converter(obj):
    """json.dumps ``default=`` hook: render datetimes as ISO-8601 strings.

    Any other unserializable type raises TypeError, matching the
    contract json.dumps expects from a default handler.
    """
    if not isinstance(obj, datetime):
        raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
    return obj.isoformat()
263258
264259 # ---------- handler ----------
265260 def lambda_handler(event, context):
@@ -428,33 +423,16 @@ Resources:
428423 logger.info(f"No agreements found for account {account_id}")
429424 return
430425
431- # Write marketplace data as CSV
432- s3_client = boto3.client('s3')
433- marketplace_key = f"{MODULE_NAME}/marketplace/data/marketplace_{account_id}.csv"
434- marketplace_fieldnames = [
435- 'acceptance_time','acceptor_account_id','agreement_id','agreement_type','agreement_value',
436- 'collection_timestamp','currency_code','end_time','offer_id','party_type',
437- 'product_deployedOnAws','product_id','product_manufacturer_displayName',
438- 'product_productId','product_productName','product_shortDescription',
439- 'proposer_account_id','source_account_id','status','eula_url'
440- ]
441- with s3_csv_file(s3_client, BUCKET, marketplace_key, marketplace_fieldnames) as write_row:
442- for row in rows:
443- write_row(row)
444-
445- logger.info(f"Collected {len(rows)} marketplace agreements for account {account_id}")
426+ # Write marketplace data as JSONL file per account (matching agreements structure)
427+ key = f"{MODULE_NAME}/agreements/data/agreements_{account_id}.jsonl"
428+ s3_put_jsonl(BUCKET, key, rows)
429+ logger.info("Wrote %d rows to s3://%s/%s", len(rows), BUCKET, key)
446430
447- # Write terms data as CSV if available
431+ # Write terms data as JSONL file per account if available
448432 if terms_rows:
449- terms_key = f"{MODULE_NAME}/terms/data/agreement_terms_{account_id}.csv"
450- terms_fieldnames = [
451- 'agreement_id', 'term_type', 'documents_url', 'documents_type',
452- 'currencyCode', 'chargeAmount', 'chargeDate'
453- ]
454- with s3_csv_file(s3_client, BUCKET, terms_key, terms_fieldnames) as write_row:
455- for term_row in terms_rows:
456- write_row(term_row)
457- logger.info(f"Collected {len(terms_rows)} agreement terms for account {account_id}")
433+ terms_key = f"{MODULE_NAME}/terms/data/agreement_terms_{account_id}.jsonl"
434+ s3_put_jsonl(BUCKET, terms_key, terms_rows)
435+ logger.info("Wrote %d term rows to s3://%s/%s", len(terms_rows), BUCKET, terms_key)
458436 else:
459437 logger.info(f"No terms found for account {account_id}")
460438
0 commit comments