Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,7 @@ By using template classes and common functions found in `_common`, the user can
- RL4SysTrainingServerAbstract(ABC)
- `BaseAction.py`
- RL4SysActionAbstract(ABC)

# Installation Notes
The `pickle` module is part of the Python standard library in every supported Python version and cannot be installed from PyPI, so the `pickle` entries in `setup.py` and `requirements.txt` are commented out.
Note that only pickle protocol 5 is new in Python 3.8; if you need protocol 5 on Python 3.7 or earlier, install the `pickle5` backport package instead.
100 changes: 75 additions & 25 deletions examples/job-scheduling/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

"""
Environment script: Batch Job Scheduling

Training server parameters:
kernel_size | MAX_QUEUE_SIZE = 128
kernel_dim | JOB_FEATURES = 8
Expand Down Expand Up @@ -155,6 +155,11 @@ def __init__(self, path):
self.max_procs = int(line.split(":")[1].strip())
continue

# if max_procs = 0, it means node/proc are the same.
if self.max_procs == 0:
self.max_procs = self.max_nodes


j = Job(line)
if j.run_time > self.max_exec_time:
self.max_exec_time = j.run_time
Expand All @@ -173,14 +178,18 @@ def __init__(self, path):
if j.run_time < 0:
j.run_time = 10
if j.run_time > 0:
self.all_jobs.append(j)
#job has to pass checks below before adding to the list

if j.request_number_of_processors > self.max_procs:
j.request_number_of_processors = self.max_procs
#makes sure no job has more than file set max procs

if j.request_number_of_processors > self.max:
self.max = j.request_number_of_processors
#self.max = largest processor request seen so far

self.all_jobs.append(j)

# if max_procs = 0, it means node/proc are the same.
if self.max_procs == 0:
self.max_procs = self.max_nodes

print("Max Allocated Processors:", str(self.max), "; max node:", self.max_nodes,
"; max procs:", self.max_procs,
Expand Down Expand Up @@ -318,12 +327,12 @@ def reset(self):
JOB_FEATURES = 8
DEBUG = False

JOB_SEQUENCE_SIZE = 256
# JOB_SEQUENCE_SIZE = 256
SKIP_TIME = 360 # skip 360 seconds


class BatchSchedSim(ApplicationAbstract):
def __init__(self, workload_file, seed, job_score_type=0, backfil=False, model=None, tensorboard=False):
def __init__(self, workload_file, seed, job_score_type=0, backfil=False, model=None, tensorboard=False, sequence_length=256, batch_job_slice=0):
super().__init__()
print("Initialize Batch Job Scheduler Simulator from dataset:", workload_file)

Expand All @@ -335,23 +344,47 @@ def __init__(self, workload_file, seed, job_score_type=0, backfil=False, model=N
self.visible_jobs = []
self.pairs = []

self.current_timestamp = 0
self.start = 0
self.next_arriving_job_idx = 0
self.sequence_length = sequence_length
self.batch_job_slice = batch_job_slice


if seed < 0:
print(f"Seed must be a non-negative integer or omitted, not {seed}")

seed_seq = np.random.SeedSequence(seed)
self.np_random = np.random.Generator(np.random.PCG64(seed_seq))

#have to set up the randomness seed before we pick a random start, as the draw consumes
#the generator state and we won't get reproducible results otherwise

if self.batch_job_slice == 0: #if the user does not specify a slice, use the entire workload
self.start = self.np_random.integers(
low=self.sequence_length,
high=(self.loads.size() - self.sequence_length - 1) + 1,
endpoint=True
) #ensures we can fit the sequence length within the workload
else:
assert batch_job_slice > self.sequence_length, "Slice must be larger than sequence length"
self.start = self.np_random.integers(
low=self.sequence_length,
high=(self.batch_job_slice - self.sequence_length - 1) + 1,
endpoint=True
) #ensures the slice can fit the sequence length

self.next_arriving_job_idx = self.start + 1
# just avoid hitting the end of the workloads.
self.last_job_in_batch = self.start + self.loads.size() - JOB_SEQUENCE_SIZE
self.num_job_in_batch = self.loads.size() - JOB_SEQUENCE_SIZE
self.last_job_in_batch = self.start + self.sequence_length
self.num_job_in_batch = self.sequence_length
self.current_timestamp = self.loads[self.start].submit_time



# 0: Average bounded slowdown, 1: Average waiting time
# 2: Average turnaround time, 3: Resource utilization
# 4: Average slowdown
self.job_score_type = job_score_type
self.backfil = backfil

if seed < 0:
print(f"Seed must be a non-negative integer or omitted, not {seed}")

seed_seq = np.random.SeedSequence(seed)
self.np_random = np.random.Generator(np.random.PCG64(seed_seq))

self.rlagent = RL4SysAgent(model=model)

Expand Down Expand Up @@ -428,11 +461,25 @@ def reset(self):
self.visible_jobs = []
self.pairs = []

self.current_timestamp = 0
self.start = 0
self.next_arriving_job_idx = 0
self.last_job_in_batch = self.start + self.loads.size() - JOB_SEQUENCE_SIZE
self.num_job_in_batch = self.loads.size() - JOB_SEQUENCE_SIZE
if self.batch_job_slice == 0: #if the user does not specify a slice, use the entire workload
self.start = self.np_random.integers(
low=self.sequence_length,
high=(self.loads.size() - self.sequence_length - 1) + 1,
endpoint=True
) #ensures we can fit the sequence length within the workload
else:
assert batch_job_slice > self.sequence_length, "Slice must be larger than sequence length"
self.start = self.np_random.integers(
low=self.sequence_length,
high=(self.batch_job_slice - self.sequence_length - 1) + 1,
endpoint=True
) #ensures the slice can fit the sequence length

self.next_arriving_job_idx = self.start + 1
# just avoid hitting the end of the workloads.
self.last_job_in_batch = self.start + self.sequence_length
self.num_job_in_batch = self.sequence_length
self.current_timestamp = self.loads[self.start].submit_time
print("[schedule.py - reset()]")

"""
Expand Down Expand Up @@ -481,7 +528,7 @@ def run_application(self):

self.rl_scheduled_jobs.append(job_for_scheduling)
print("[schedule.py - schedule_whole_trace()]: ",job_for_scheduling)
if rl_runs > JOB_SEQUENCE_SIZE:
if rl_runs > self.sequence_length:
rl_runs = 0
rl_working = False
rl_total = self.calculate_performance_return(self.rl_scheduled_jobs)
Expand Down Expand Up @@ -843,8 +890,11 @@ def job_score(self, job_for_scheduling):
help="number of iterations of entire workload to train model on")
parser.add_argument('--start-server', '-s', dest='algorithm', type=str, default="PPO",
help="run a local training server, using specified algorithm")
parser.add_argument('--sequence_length', '-l', type=int, default=256, help="sequence length to use from the workload")
parser.add_argument('--batch_job_slice', '-slice', type=int, default=0, help="slice of the workload to sample from during training")
args, extras = parser.parse_known_args()


# get workload file's absolute location if user-specified
app_dir = os.path.dirname(os.path.abspath(__file__))
if args.workload == 'DEFAULT':
Expand All @@ -854,9 +904,9 @@ def job_score(self, job_for_scheduling):

# start training server
if args.algorithm != "No Server":
# buffer size for this environment should be JOB_SEQUENCE_SIZE * 100
# buffer size for this environment should be sequence_length * 100
extras.append('--buf_size')
extras.append(str(JOB_SEQUENCE_SIZE * 100))
extras.append(str(args.sequence_length * 100))
rl_training_server = TrainingServer(args.algorithm, MAX_QUEUE_SIZE, JOB_FEATURES, extras, app_dir, args.tensorboard)
print("[schedule.py] Created Training Server")

Expand All @@ -865,7 +915,7 @@ def job_score(self, job_for_scheduling):

# create simulation environment
sim = BatchSchedSim(workload_file=workload_file, seed=args.seed, job_score_type=args.job_score_type,
backfil=args.backfil, model=model_arg)
backfil=args.backfil, model=model_arg, sequence_length=args.sequence_length, batch_job_slice=args.batch_job_slice)

# iterate multiple rounds to train the models, default 100
iters = args.number_of_iterations
Expand Down
6 changes: 4 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
numpy
torch
pyzmq
pickle
tensorboard
# pickle
tensorboard
seaborn
scipy
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
'numpy',
'torch',
'pyzmq',
'pickle',
# 'pickle',
# Add other dependencies required by your package
],
classifiers=[
Expand Down