diff --git a/README.md b/README.md index fdbe9c8d..cfe57c4e 100644 --- a/README.md +++ b/README.md @@ -121,3 +121,7 @@ By using template classes and common functions found in `_common`, the user can - RL4SysTrainingServerAbstract(ABC) - `BaseAction.py` - RL4SysActionAbstract(ABC) + +# Installation Notes +The `pickle` module is part of the Python 3 standard library, so it does not normally need to be installed separately +If your environment is missing it, you can uncomment the lines for pickle in setup.py or requirements.txt \ No newline at end of file diff --git a/examples/job-scheduling/scheduler.py b/examples/job-scheduling/scheduler.py index f17495a4..f00226c9 100644 --- a/examples/job-scheduling/scheduler.py +++ b/examples/job-scheduling/scheduler.py @@ -17,7 +17,7 @@ """ Environment script: Batch Job Scheduling - + Training server parameters: kernel_size | MAX_QUEUE_SIZE = 128 kernel_dim | JOB_FEATURES = 8 @@ -155,6 +155,11 @@ def __init__(self, path): self.max_procs = int(line.split(":")[1].strip()) continue + # if max_procs = 0, it means node/proc are the same. + if self.max_procs == 0: + self.max_procs = self.max_nodes + + j = Job(line) if j.run_time > self.max_exec_time: self.max_exec_time = j.run_time @@ -173,14 +178,18 @@ def __init__(self, path): if j.run_time < 0: j.run_time = 10 if j.run_time > 0: - self.all_jobs.append(j) + #job has to pass checks below before adding to the list + + if j.request_number_of_processors > self.max_procs: + j.request_number_of_processors = self.max_procs + #makes sure no job requests more than the file's max procs if j.request_number_of_processors > self.max: self.max = j.request_number_of_processors + #self.max = largest processor request seen so far + + self.all_jobs.append(j) - # if max_procs = 0, it means node/proc are the same. 
- if self.max_procs == 0: - self.max_procs = self.max_nodes print("Max Allocated Processors:", str(self.max), "; max node:", self.max_nodes, "; max procs:", self.max_procs, @@ -318,12 +327,12 @@ def reset(self): JOB_FEATURES = 8 DEBUG = False -JOB_SEQUENCE_SIZE = 256 +# JOB_SEQUENCE_SIZE = 256 SKIP_TIME = 360 # skip 60 seconds class BatchSchedSim(ApplicationAbstract): - def __init__(self, workload_file, seed, job_score_type=0, backfil=False, model=None, tensorboard=False): + def __init__(self, workload_file, seed, job_score_type=0, backfil=False, model=None, tensorboard=False, sequence_length=256, batch_job_slice=0): super().__init__() print("Initialize Batch Job Scheduler Simulator from dataset:", workload_file) @@ -335,23 +344,47 @@ def __init__(self, workload_file, seed, job_score_type=0, backfil=False, model=N self.visible_jobs = [] self.pairs = [] - self.current_timestamp = 0 - self.start = 0 - self.next_arriving_job_idx = 0 + self.sequence_length = sequence_length + self.batch_job_slice = batch_job_slice + + + if seed < 0: + print(f"Seed must be a non-negative integer or omitted, not {seed}") + + seed_seq = np.random.SeedSequence(seed) + self.np_random = np.random.Generator(np.random.PCG64(seed_seq)) + + #have to set up the randomness seed before we do the random start, as it will affect the randomness seed and we won't get consistent results + + if self.batch_job_slice == 0: #if the user does not specify a slice, use the entire workload + self.start = self.np_random.integers( + low=self.sequence_length, + high=(self.loads.size() - self.sequence_length - 1) + 1, + endpoint=True + ) #ensures that we can fit the sequence length + else: + assert batch_job_slice > self.sequence_length, "Slice must be larger than sequence length" + self.start = self.np_random.integers( + low=self.sequence_length, + high=(self.batch_job_slice - self.sequence_length - 1) + 1, + endpoint=True + ) #ensures the slice can fit the sequence length + + self.next_arriving_job_idx = self.start + 1 # 
just avoid hitting the end of the workloads. - self.last_job_in_batch = self.start + self.loads.size() - JOB_SEQUENCE_SIZE - self.num_job_in_batch = self.loads.size() - JOB_SEQUENCE_SIZE + self.last_job_in_batch = self.start + self.sequence_length + self.num_job_in_batch = self.sequence_length + self.current_timestamp = self.loads[self.start].submit_time + + + # 0: Average bounded slowdown, 1: Average waiting time # 2: Average turnaround time, 3: Resource utilization # 4: Average slowdown self.job_score_type = job_score_type self.backfil = backfil - if seed < 0: - print(f"Seed must be a non-negative integer or omitted, not {seed}") - seed_seq = np.random.SeedSequence(seed) - self.np_random = np.random.Generator(np.random.PCG64(seed_seq)) self.rlagent = RL4SysAgent(model=model) @@ -428,11 +461,25 @@ def reset(self): self.visible_jobs = [] self.pairs = [] - self.current_timestamp = 0 - self.start = 0 - self.next_arriving_job_idx = 0 - self.last_job_in_batch = self.start + self.loads.size() - JOB_SEQUENCE_SIZE - self.num_job_in_batch = self.loads.size() - JOB_SEQUENCE_SIZE + if self.batch_job_slice == 0: #if the user does not specify a slice, use the entire workload + self.start = self.np_random.integers( + low=self.sequence_length, + high=(self.loads.size() - self.sequence_length - 1) + 1, + endpoint=True + ) #ensures that we can fit the sequence length + else: + assert self.batch_job_slice > self.sequence_length, "Slice must be larger than sequence length" + self.start = self.np_random.integers( + low=self.sequence_length, + high=(self.batch_job_slice - self.sequence_length - 1) + 1, + endpoint=True + ) #ensures the slice can fit the sequence length + + self.next_arriving_job_idx = self.start + 1 + # just avoid hitting the end of the workloads. 
+ self.last_job_in_batch = self.start + self.sequence_length + self.num_job_in_batch = self.sequence_length + self.current_timestamp = self.loads[self.start].submit_time print("[schedule.py - reset()]") """ @@ -481,7 +528,7 @@ def run_application(self): self.rl_scheduled_jobs.append(job_for_scheduling) print("[schedule.py - schedule_whole_trace()]: ",job_for_scheduling) - if rl_runs > JOB_SEQUENCE_SIZE: + if rl_runs > self.sequence_length: rl_runs = 0 rl_working = False rl_total = self.calculate_performance_return(self.rl_scheduled_jobs) @@ -843,8 +890,11 @@ def job_score(self, job_for_scheduling): help="number of iterations of entire workload to train model on") parser.add_argument('--start-server', '-s', dest='algorithm', type=str, default="PPO", help="run a local training server, using specified algorithm") + parser.add_argument('--sequence_length', '-l', type=int, default=256, help="sequence length to use from the workload") + parser.add_argument('--batch_job_slice', '-slice', type=int, default=0, help="slice of the workload to sample from during training") args, extras = parser.parse_known_args() + # get workload file's absolute location if user-specified app_dir = os.path.dirname(os.path.abspath(__file__)) if args.workload == 'DEFAULT': @@ -854,9 +904,9 @@ def job_score(self, job_for_scheduling): # start training server if args.algorithm != "No Server": - # buffer size for this environment should be JOB_SEQUENCE_SIZE * 100 + # buffer size for this environment should be sequence_length * 100 extras.append('--buf_size') - extras.append(str(JOB_SEQUENCE_SIZE * 100)) + extras.append(str(args.sequence_length * 100)) rl_training_server = TrainingServer(args.algorithm, MAX_QUEUE_SIZE, JOB_FEATURES, extras, app_dir, args.tensorboard) print("[schedule.py] Created Training Server") @@ -865,7 +915,7 @@ def job_score(self, job_for_scheduling): # create simulation environment sim = BatchSchedSim(workload_file=workload_file, seed=args.seed, 
job_score_type=args.job_score_type, - backfil=args.backfil, model=model_arg) + backfil=args.backfil, model=model_arg, sequence_length=args.sequence_length, batch_job_slice=args.batch_job_slice) # iterate multiple rounds to train the models, default 100 iters = args.number_of_iterations diff --git a/requirements.txt b/requirements.txt index 1aa719c3..27cd07c9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ numpy torch pyzmq -pickle -tensorboard \ No newline at end of file +# pickle +tensorboard +seaborn +scipy \ No newline at end of file diff --git a/setup.py b/setup.py index bc6f341a..0b8b59fa 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ 'numpy', 'torch', 'pyzmq', - 'pickle', + # 'pickle', # Add other dependencies required by your package ], classifiers=[