import json
import os
import threading
import warnings

from numpy.polynomial import polynomial as poly

# Guards the estimator data files so they are never read and written at the
# same time by concurrent request/scheduler threads.
estimator_file_lock = threading.Lock()


class Estimator:
    """No-op base estimator.

    Subclasses assume their work argument is of a correlated type
    (e.g. RDEstimator expects RDWork-shaped objects).
    """

    def __init__(self):
        pass

    def update(self, work, time):
        """Record that `work` actually took `time` seconds."""
        pass

    def get_estimate(self, work):
        """Return the estimated duration of `work` in seconds."""
        return 0

    def finish(self):
        """Hook invoked once when the owning run completes."""
        pass


class RDEstimator(Estimator):
    """Estimates RD work durations from previously profiled timing data.

    Loads a per-codec/per-set JSON profile (if one exists) mapping each video
    filename to a quality->time-ratio curve, fit with a degree-5 polynomial.
    Live samples fed through update() refine a time-weighted scale factor that
    converts those ratios back into wall-clock seconds.
    """

    def __init__(self, run):
        super().__init__()
        self.run = run
        # Scale stored in the profile file; used until live samples arrive.
        self.ref_scale = 0.0
        # filename -> numpy Polynomial mapping quality -> time ratio, or None
        # when no profile file exists for this codec/set.
        self.data = None
        input_filename = 'estimate_data/rd/{}/{}.json'.format(run.codec, run.set)
        if os.path.isfile(input_filename):
            with estimator_file_lock:
                with open(input_filename, 'r', encoding='utf-8') as input_file:
                    input_json = json.load(input_file)
            self.ref_scale = input_json['scale']
            data = {}
            for filename, quality_map in input_json['videos'].items():
                qualities = []
                ratios = []
                for quality, ratio in quality_map.items():
                    # JSON object keys are strings; qualities are ints.
                    qualities.append(int(quality))
                    ratios.append(ratio)
                with warnings.catch_warnings():
                    # polyfit emits a RankWarning when the fit is
                    # underdetermined (fewer than 6 samples); ignore it.
                    warnings.simplefilter('ignore')
                    data[filename] = poly.Polynomial(
                        poly.polyfit(qualities, ratios, 5))
            self.data = data
        # Time-weighted running average of observed time/ratio samples.
        self.scale = 0.0
        self.sample_sum = 0
        self.sample_counter = 0

    def _ratio(self, work):
        """Profiled time ratio for `work`, falling back to 1.0.

        The fallback covers both a missing profile file and a filename absent
        from the profile (the original indexed self.data directly and raised
        KeyError for unprofiled files).
        """
        if self.data:
            curve = self.data.get(work.filename)
            if curve is not None:
                return curve(work.quality)
        return 1.0

    def update(self, work, time):
        ratio = self._ratio(work)
        # time/ratio is the scale that would have been exactly right for this
        # work item; the sample is then weighted by the time it took so that
        # long-running items dominate the average.
        self.sample_sum += (time / ratio) * time
        self.sample_counter += time
        # Guard against a zero-duration first sample (original divided
        # unconditionally and could raise ZeroDivisionError).
        if self.sample_counter > 0:
            self.scale = self.sample_sum / self.sample_counter

    def get_estimate(self, work):
        ratio = self._ratio(work)
        if self.sample_counter == 0:
            # No live samples yet: rely on the profile file's scale.
            return ratio * self.ref_scale
        return ratio * self.scale


# Extends RDEstimator because a collecting run still wants live estimates.
class RDDataCollector(RDEstimator):
    """RDEstimator that also records observed durations as new profile data.

    On finish() the collected times are normalized by the longest observed
    work item and saved to the per-codec/per-set JSON profile consumed by
    RDEstimator.__init__.
    """

    def __init__(self, run, video_filenames):
        super().__init__(run)
        # filename -> {quality: observed seconds}
        self.collected_data = {filename: {} for filename in video_filenames}
        self.longest_work = 0.0

    def update(self, work, time):
        super().update(work, time)
        self.collected_data[work.filename][work.quality] = time
        self.longest_work = max(self.longest_work, time)

    def finish(self):
        """Persist the collected timing profile; log (never raise) on failure."""
        run = self.run
        output_filename = 'estimate_data/rd/{}/{}.json'.format(run.codec, run.set)
        with estimator_file_lock:
            try:
                # Build the normalized payload inside the try so degenerate
                # data (e.g. longest_work == 0 with recorded samples) is
                # logged instead of crashing the server.
                videos_json = {}
                for filename, times in self.collected_data.items():
                    videos_json[filename] = {
                        str(quality): duration / self.longest_work
                        for quality, duration in times.items()
                    }
                data_json = {'scale': self.longest_work, 'videos': videos_json}
                os.makedirs(os.path.dirname(output_filename), exist_ok=True)
                # 'with' closes the file on every path; the original's
                # explicit close in a finally raised NameError when open()
                # itself failed.
                with open(output_filename, 'w', encoding='utf-8') as output_file:
                    json.dump(data_json, output_file)
            except Exception:
                # Imported lazily so this module stays importable without the
                # project-local utility package.
                from utility import rd_print
                rd_print(run.log, 'Failed to save estimator data on ' + run.runid)
b/rd_server.py @@ -3,18 +3,20 @@ import tornado.ioloop import tornado.web import os -import codecs import json import argparse import sshslot import threading import time import awsremote +from queue import PriorityQueue from work import * from utility import * +from estimator import * -video_sets_f = codecs.open('sets.json','r',encoding='utf-8') +video_sets_f = open('sets.json','r',encoding='utf-8') video_sets = json.load(video_sets_f) +video_sets_f.close() machines = [] slots = [] @@ -81,7 +83,7 @@ def get(self): pass if 'qualities' in info: if info['qualities'] != '': - run.quality = info['qualities'].split() + run.quality = list(map(int, info['qualities'].split())) if 'extra_options' in info: run.extra_options = info['extra_options'] if 'save_encode' in info: @@ -91,6 +93,10 @@ def get(self): run.write_status() run_list.append(run) video_filenames = video_sets[run.set]['sources'] + if 'collect_estimate_times' in info and info['collect_estimate_times']: + run.estimator = RDDataCollector(run, video_filenames) + else: + run.estimator = RDEstimator(run) run.work_items = create_rdwork(run, video_filenames) work_list.extend(run.work_items) if False: @@ -118,10 +124,12 @@ def get(self): class RunStatusHandler(tornado.web.RequestHandler): def get(self): self.set_header("Content-Type", "application/json") + current_time = time.perf_counter() runs = [] for run in run_list: run_json = {} run_json['run_id'] = run.runid + run_json['eta'] = max(0, run.eta - current_time) run_json['completed'] = 0 run_json['total'] = 0 run_json['info'] = run.info @@ -240,11 +248,20 @@ def machine_allocator(): awsremote.stop_machines(args.awsgroup) time.sleep(60) +class KeyedEntry: + def __init__(self, key, data): + self.key = key + self.data = data + def __lt__(self, other): + return self.key < other.key + def scheduler_tick(): global free_slots global work_list global run_list global work_done + active_slots = [] + update_simulation = False max_retries = 5 # look for completed work for slot 
in slots: @@ -252,6 +269,9 @@ def scheduler_tick(): if slot.work.failed == False: slot.work.done = True work_done.append(slot.work) + slot.work.run.completed += 1 + slot.work.update_estimator() + update_simulation = True rd_print(slot.work.log,slot.work.get_name(),'finished.') elif slot.work.retries < max_retries: slot.work.retries += 1 @@ -261,9 +281,49 @@ def scheduler_tick(): else: slot.work.done = True work_done.append(slot.work) + slot.work.run.completed += 1 rd_print(slot.work.log,slot.work.get_name(),'given up on.') slot.work = None free_slots.append(slot) + elif slot.work != None: + active_slots.append(slot) + # update the simulation to find etas for runs + if update_simulation: + sim_queue = PriorityQueue() + sim_completed = {} + for run in run_list: + sim_completed[id(run)] = run.completed + current_time = time.perf_counter() + # load in progress work into the queue + for slot in active_slots: + work = slot.work + remaining = max(0, work.estimate_time() - (current_time - work.start_time)) + sim_queue.put(KeyedEntry(remaining, work)) + sim_work_list = list(work_list) + # fill any free slots + for i in range(0, min(len(free_slots), len(sim_work_list))): + work = sim_work_list.pop(0) + sim_queue.put(KeyedEntry(work.estimate_time(), work)) + # go through the simulation's main loop until there is no work to add to empty slots + while len(sim_work_list) != 0: + finished = sim_queue.get() + sim_time = finished.key + work = finished.data + run = work.run + sim_completed[id(run)] += 1 + if sim_completed[id(run)] == len(run.work_items): + run.eta = current_time + sim_time + new_work = sim_work_list.pop(0) + sim_queue.put(KeyedEntry(sim_time + new_work.estimate_time(), new_work)) + # go through the simulation's main loop until all the slots are empty + while not sim_queue.empty(): + finished = sim_queue.get() + sim_time = finished.key + work = finished.data + run = work.run + sim_completed[id(run)] += 1 + if sim_completed[id(run)] == len(run.work_items): + run.eta = 
current_time + sim_time # fill empty slots with new work if len(work_list) != 0: if len(free_slots) != 0: diff --git a/rd_tool.py b/rd_tool.py index 0b3e121..4a1ca02 100755 --- a/rd_tool.py +++ b/rd_tool.py @@ -75,7 +75,7 @@ run = Run(args.codec) run.runid = str(args.runid) if args.qualities: - run.quality = args.qualities + run.quality = list(map(int, args.qualities)) run.set = args.set[0] run.bindir = args.bindir run.save_encode = args.save_encode diff --git a/sshslot.py b/sshslot.py index 5e6e31f..4410b5d 100644 --- a/sshslot.py +++ b/sshslot.py @@ -67,6 +67,7 @@ def gather(self): return self.p.communicate() def start_work(self, work): self.work = work + work.record_start_time() work_thread = threading.Thread(target=self.execute, args=(work,)) work_thread.daemon = True self.busy = True diff --git a/work.py b/work.py index a0264fb..bd0ff99 100644 --- a/work.py +++ b/work.py @@ -1,6 +1,8 @@ from utility import * import subprocess import sys +import time +from estimator import * # Finding files such as `this_(that)` requires `'` be placed on both # sides of the quote so the `()` are both captured. 
Files such as @@ -41,6 +43,9 @@ def __init__(self, codec): self.rundir = None self.status = 'running' self.work_items = [] + self.estimator = Estimator() + self.completed = 0 + self.eta = 0 def write_status(self): f = open(self.rundir+'/status.txt','w') f.write(self.status) @@ -53,6 +58,7 @@ def cancel(self): def finish(self): if self.log: self.log.close() + self.estimator.finish() class RDRun(Run): def reduce(self): @@ -84,9 +90,18 @@ def __init__(self): self.failed = False self.runid = '' self.slot = None + self.start_time = 0 + self.run = None def cancel(self): self.failed = True self.done = True + def record_start_time(self): + self.start_time = time.perf_counter() + def update_estimator(self): + elapsed = time.perf_counter() - self.start_time + self.run.estimator.update(self, elapsed) + def estimate_time(self): + return self.run.estimator.get_estimate(self) class RDWork(Work): def __init__(self): @@ -212,6 +227,7 @@ def create_rdwork(run, video_filenames): for filename in video_filenames: for q in sorted(run.quality, reverse = True): work = RDWork() + work.run = run work.log = run.log work.quality = q work.runid = run.runid @@ -274,6 +290,7 @@ def create_abwork(run, video_filenames): for filename in video_filenames: for bpp in bits_per_pixel: work = ABWork() + work.run = run work.log = run.log work.bpp = bpp work.codec = run.codec From 4334d74e4c148702c82cccb0b2dddb2f5cfd3a32 Mon Sep 17 00:00:00 2001 From: Kyle Siefring Date: Tue, 6 Jun 2017 21:30:11 -0400 Subject: [PATCH 2/2] Refine previous patch. --- estimator.py | 14 +++++++++----- rd_server.py | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/estimator.py b/estimator.py index b1c9a92..08a776f 100644 --- a/estimator.py +++ b/estimator.py @@ -3,6 +3,7 @@ import threading import warnings from numpy.polynomial import polynomial as poly +from utility import rd_print # A global lock used to preventing estimator data files from being read and written to a the same time. 
estimator_file_lock = threading.Lock() @@ -69,7 +70,6 @@ def __init__(self, run, video_filenames): super().__init__(run) collected_data = {} for filename in video_filenames: - #todo: use sorted array on the other side collected_data[filename] = {} self.collected_data = collected_data self.longest_work = 0.0 @@ -92,8 +92,12 @@ def finish(self): with estimator_file_lock: try: os.makedirs(os.path.dirname(output_filename), exist_ok=True) - output_file = open(output_filename,'w',encoding='utf-8') - json.dump(data_json, output_file) + try: + output_file = open(output_filename,'w',encoding='utf-8') + json.dump(data_json, output_file) + except Exception as e: + raise + finally: + output_file.close() except Exception as e: - pass - output_file.close() + rd_print(run.log,'Failed to save estimator data on '+run.runid) diff --git a/rd_server.py b/rd_server.py index 53d1179..b28e1a9 100755 --- a/rd_server.py +++ b/rd_server.py @@ -93,7 +93,7 @@ def get(self): run.write_status() run_list.append(run) video_filenames = video_sets[run.set]['sources'] - if 'collect_estimate_times' in info and info['collect_estimate_times']: + if 'profile_set' in info and info['profile_set']: run.estimator = RDDataCollector(run, video_filenames) else: run.estimator = RDEstimator(run)