Skip to content

Commit 390f789

Browse files
committed
Add data generator
1 parent fc5f303 commit 390f789

File tree

1 file changed

+148
-0
lines changed

1 file changed

+148
-0
lines changed

source-code/dash/generate_data.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
#!/usr/bin/env python
2+
3+
from argparse import ArgumentParser
4+
import csv
5+
from datetime import datetime, timedelta
6+
import random
7+
8+
class Job:
9+
10+
def __init__(self, job_id, nr_nodes, walltime):
11+
self._job_id = job_id
12+
self._nr_nodes = nr_nodes
13+
self._nodes = list()
14+
self._walltime = timedelta(seconds=walltime)
15+
self._remaining = None
16+
17+
@property
18+
def job_id(self):
19+
return f'{self._job_id:05d}'
20+
21+
@property
22+
def nr_nodes(self):
23+
return self._nr_nodes
24+
25+
@property
26+
def nodes(self):
27+
return self._nodes.copy()
28+
29+
@property
30+
def walltime(self):
31+
return self._walltime
32+
33+
@property
34+
def remaining(self):
35+
return self._remaining
36+
37+
def start(self, nodes):
38+
self._nodes = nodes.copy()
39+
self._remaining = self._walltime
40+
41+
def update(self, delta):
42+
self._remaining -= delta
43+
44+
@property
45+
def is_done(self):
46+
return self._remaining <= timedelta(seconds=0)
47+
48+
def __repr__(self):
49+
return f'{self.job_id} {self.nr_nodes} {self.walltime}'
50+
51+
52+
class ResoureManager:
53+
54+
def __init__(self, nodes):
55+
self._free_nodes = list(nodes)
56+
self._busy_nodes = set()
57+
self._queue = list()
58+
self._running = set()
59+
self._completed = set()
60+
self._next_id = 1
61+
62+
@property
63+
def nr_nodes(self):
64+
return len(self._free_nodes) + len(self._busy_nodes)
65+
66+
@property
67+
def nr_free_nodes(self):
68+
return len(self._free_nodes)
69+
70+
@property
71+
def running_jobs(self):
72+
return self._running
73+
74+
def qstat(self):
75+
for job in self._queue:
76+
print(job, 'Q')
77+
for job in self._running:
78+
print(job, job.remaining, 'R')
79+
80+
def submit(self):
81+
self._queue.append(Job(self._next_id,
82+
random.randrange(1, self.nr_nodes),
83+
random.randint(5, 50)))
84+
self._next_id += 1
85+
86+
def cycle(self, delta):
87+
# check running jobs to see whether they are done
88+
for job in self._running:
89+
job.update(delta)
90+
if job.is_done:
91+
nodes = job.nodes
92+
for node in nodes:
93+
self._busy_nodes.remove(node)
94+
self._free_nodes.extend(nodes)
95+
self._completed.add(job)
96+
for job in self._completed:
97+
if job in self._running:
98+
self._running.remove(job)
99+
# start all jobs that can be started
100+
for job in self._queue:
101+
if not self._free_nodes:
102+
break
103+
if job.nr_nodes <= self.nr_free_nodes:
104+
nodes = [self._free_nodes.pop(0) for _ in range(job.nr_nodes)]
105+
for node in nodes:
106+
self._busy_nodes.add(node)
107+
job.start(nodes)
108+
self._running.add(job)
109+
for job in self._running:
110+
if job in self._queue:
111+
self._queue.remove(job)
112+
113+
114+
if __name__ == '__main__':
115+
arg_parser = ArgumentParser(description='generate data')
116+
arg_parser.add_argument('--nr-nodes', type=int, default=1,
117+
help='number of nodes to generate data for')
118+
arg_parser.add_argument('--nr-timesteps', type=int, default=10,
119+
help='number of timesteps')
120+
arg_parser.add_argument('--nr-jobs', type=int, default=5,
121+
help='number of jobs to submit')
122+
arg_parser.add_argument('file', help='prefix name of the file to write to')
123+
options = arg_parser.parse_args()
124+
time = datetime.now()
125+
delta = timedelta(seconds=5)
126+
nodes = [f'node_{i + 1:03d}' for i in range(options.nr_nodes)]
127+
resource_manager = ResoureManager(nodes)
128+
for _ in range(options.nr_jobs):
129+
resource_manager.submit()
130+
with open(f'{options.file}_load.csv', 'w', newline='') as csv_load_file:
131+
csv_load_writer = csv.writer(csv_load_file)
132+
csv_load_writer.writerow(['time', 'node', 'cpu_load', 'mem_load'])
133+
with open(f'{options.file}_jobs.csv', 'w', newline='') as csv_jobs_file:
134+
csv_jobs_writer = csv.writer(csv_jobs_file)
135+
csv_jobs_writer.writerow(['time', 'job_id', 'node'])
136+
for _append in range(options.nr_timesteps):
137+
print('-'*20)
138+
print(f'---- {time} ----')
139+
resource_manager.qstat()
140+
for node in nodes:
141+
cpu_load = f'{100*random.random():.2f}'
142+
mem_load = f'{100*random.random():.2f}'
143+
csv_load_writer.writerow([node, time, cpu_load, mem_load])
144+
resource_manager.cycle(delta)
145+
for job in resource_manager.running_jobs:
146+
for node in job.nodes:
147+
csv_jobs_writer.writerow([time, job.job_id, node])
148+
time += delta

0 commit comments

Comments
 (0)