Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions awsglue/transfer_to_gc/transfer_logs.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import boto3
import time
import logging
import pprint
import logging

# Configure the root logger at INFO so the [STATUS]/[ERROR] messages emitted
# by this script are shown.
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)

# AWS region shared by both boto3 clients.
region = 'us-east-1'
ds_client = boto3.client('datasync', region_name=region)
ec2_client = boto3.client('ec2', region_name=region)
# NOTE(review): `r` is not referenced in the visible part of the script --
# confirm whether it is still needed.
r = 1

# Names of the DataSync tasks this script runs. Tasks returned by
# list_tasks() are kept only when their Name appears in this list.
task_list = [
    'aws_pub_logs_to_google',
    'aws_cr_logs_to_google',
    'aws_two_logs_to_google',
]

# Filter on the EC2 `Name` tag. Its use is outside the visible lines;
# presumably it is passed to an EC2 describe call -- TODO confirm.
VM_FILTER = [{'Name': 'tag:Name', 'Values': ['DataSync_for_Logs']}]
# NOTE(review): `ec` repeats the tag value above and is not referenced in
# the visible part of the script.
ec = 'DataSync_for_Logs'

# Upper bound, in seconds, on waiting for the EC2 instance to leave the
# 'stopped' state (20 minutes).
WAIT_MAX_20 = 20 * 60
Expand All @@ -25,6 +27,7 @@
state=get_inst['Reservations'][0]['Instances'][0]['State']['Name']
is_stopped = bool(state=='stopped')
wait = 0
start_time = time.time()
while is_stopped and wait < WAIT_MAX_20:
ec2_client.start_instances(InstanceIds=[instanceId])
time.sleep(WAIT_VM_INT)
Expand All @@ -35,16 +38,20 @@

# Abort when the instance is still reported as stopped after the start loop
# has used up its WAIT_MAX_20 budget.
if is_stopped:
    # Unable to start the instance in 20 minutes--something might be wrong
    logger.error("[ERROR] Unable to start instance in {} minutes! Exiting.".format(str(WAIT_MAX_20/60)))
    exit(1)

# Wall-clock time elapsed since start_time was taken just before the start loop.
stop_time = time.time()

logger.info("[STATUS] EC2 instance started.")
logger.info("[STATUS] Time to start VM: {}s".format(str(stop_time-start_time)))

# Filter spec naming a single task. It is not passed to list_tasks() below.
# NOTE(review): appears unused in the visible lines -- confirm.
filters = [
    {
        'Name': 'Name',
        'Values': ['aws_pub_logs_to_google'],
        'Operator': 'Equals',
    },
]

# Fetch the DataSync tasks and keep only those whose Name is in task_list.
tasks = [
    entry
    for entry in ds_client.list_tasks()['Tasks']
    if entry.get('Name') in task_list
]

# Every selected task starts out marked 'incomplete'; the loop that follows
# updates the entry for each task it runs.
task_result = dict.fromkeys((entry.get('Name') for entry in tasks), 'incomplete')
for task in tasks:
start_time = time.time()
task_name = task.get('Name',None)
wait = 0
task_desc=ds_client.describe_task(TaskArn=task['TaskArn'])
is_unavail = bool(task_desc['Status'] == 'UNAVAILABLE')
Expand All @@ -55,6 +62,7 @@
is_unavail = bool(task_desc['Status'] == 'UNAVAILABLE')

if (task_desc['Status'] == 'AVAILABLE'):
logger.info("[STATUS] Task {} execution beginning.".format(task_name))
ds_client.start_task_execution(TaskArn=task['TaskArn'])
wait = 0
is_avail = False
Expand All @@ -63,18 +71,20 @@
cur_desc=ds_client.describe_task(TaskArn=task['TaskArn'])
is_avail = bool(cur_desc['Status']=='AVAILABLE')
wait = wait+WAIT_TASK_INT
stop_time = time.time()
# Record the outcome of the task execution that was just polled. Each outcome
# is logged once and stored once under task_name.
if not is_avail:
    # Task unavailable after 60 minutes - it might be stuck
    logger.error("[STATUS] Task {} didn't become available in the time alotted ({} minutes) - possibly incomplete.".format(task_name,str(WAIT_MAX_60/60)))
    task_result[task_name] = 'over time'
else:
    task_result[task_name] = 'complete'
    # stop_time - start_time is the elapsed time since the top of this
    # task's loop iteration, where start_time was taken.
    logger.info("[STATUS] Task {} completed in {}s.".format(task_name, str(stop_time-start_time)))
else:
# Task never became available!
logger.error("[STATUS] Task {} never became available after {} minutes--skipping.".format(task.get('Name',None),str(WAIT_MAX_10/60)))
logger.error("[STATUS] Task {} never became available after {} minutes--skipping.".format(task_name,str(WAIT_MAX_10/60)))
time.sleep(WAIT_BTW_TASK_INT)

logger.info("[STATUS] Final task dispositions: ")
# Use pformat, which returns the formatted text. pprint.pp() writes to stdout
# and returns None, so logging its result would emit "None".
logger.info(pprint.pformat(task_result,width=10))

# Stop the EC2 instance identified by instanceId.
ec2_client.stop_instances(InstanceIds=[instanceId])