Skip to content

Commit 2115c28

Browse files
authored
Merge pull request #40 from Chia-Network/timelord-fixes
Timelord fixes
2 parents 85c5826 + a86441b commit 2115c28

File tree

2 files changed

+54
-60
lines changed

2 files changed

+54
-60
lines changed

lib/chiavdf/fast_vdf/vdf_server.cpp

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using boost::asio::ip::tcp;
44

55
const int max_length = 2048;
6+
const int kMaxProcessesAllowed = 3;
67
std::mutex socket_mutex;
78

89
void PrintInfo(std::string input) {
@@ -24,22 +25,6 @@ void CreateAndWriteProof(integer D, form x, int64_t num_iterations, WesolowskiCa
2425
boost::asio::write(sock, boost::asio::buffer(str_result.c_str(), str_result.size()));
2526
}
2627

27-
28-
void PollTimelord(tcp::socket& sock, bool& got_iters) {
29-
// Wait for 15s, if no iters come, poll each 5 seconds the timelord.
30-
int seconds = 0;
31-
while (!got_iters) {
32-
std::this_thread::sleep_for (std::chrono::seconds(1));
33-
seconds++;
34-
if (seconds >= 15 && (seconds - 15) % 5 == 0) {
35-
socket_mutex.lock();
36-
boost::asio::write(sock, boost::asio::buffer("POLL", 4));
37-
socket_mutex.unlock();
38-
}
39-
}
40-
}
41-
42-
4328
void session(tcp::socket sock) {
4429
try {
4530
char disc[350];
@@ -107,7 +92,6 @@ void session(tcp::socket sock) {
10792
bool stopped = false;
10893
bool got_iters = false;
10994
std::thread vdf_worker(repeated_square, f, D, L, std::ref(weso), std::ref(stopped));
110-
std::thread poll_thread(PollTimelord, std::ref(sock), std::ref(got_iters));
11195

11296
// Tell client that I'm ready to get the challenges.
11397
boost::asio::write(sock, boost::asio::buffer("OK", 2));
@@ -125,7 +109,6 @@ void session(tcp::socket sock) {
125109
if (iters == 0) {
126110
PrintInfo("Got stop signal!");
127111
stopped = true;
128-
poll_thread.join();
129112
for (int i = 0; i < threads.size(); i++)
130113
stop_vector[i] = true;
131114
for (int t = 0; t < threads.size(); t++) {
@@ -135,7 +118,7 @@ void session(tcp::socket sock) {
135118
free(forms);
136119
} else {
137120
int max_iter = 0;
138-
int max_iter_thread_id;
121+
int max_iter_thread_id = -1;
139122
int min_iter = std::numeric_limits<int> :: max();
140123
bool unique = true;
141124
for (auto active_iter: seen_iterations) {
@@ -155,13 +138,13 @@ void session(tcp::socket sock) {
155138
PrintInfo("Duplicate iteration " + to_string(iters) + "... Ignoring.");
156139
continue;
157140
}
158-
if (threads.size() < 3 || iters < min_iter) {
141+
if (threads.size() < kMaxProcessesAllowed || iters < min_iter) {
159142
seen_iterations.insert({iters, threads.size()});
160143
PrintInfo("Running proving for iter: " + to_string(iters));
161144
stop_vector[threads.size()] = false;
162145
threads.push_back(std::thread(CreateAndWriteProof, D, f, iters, std::ref(weso),
163146
std::ref(stop_vector[threads.size()]), std::ref(sock)));
164-
if (threads.size() > 3) {
147+
if (threads.size() > kMaxProcessesAllowed) {
165148
PrintInfo("Stopping proving for iter: " + to_string(max_iter));
166149
stop_vector[max_iter_thread_id] = true;
167150
seen_iterations.erase({max_iter, max_iter_thread_id});

src/timelord.py

Lines changed: 50 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ async def _update_avg_ips(self, challenge_hash, iterations_needed, ip):
149149
new_avg_ips = int((prev_avg_ips * trials + ips) / (trials + 1))
150150
self.avg_ips[ip] = (new_avg_ips, trials + 1)
151151
log.info(f"New estimate: {new_avg_ips}")
152-
# self.pending_iters[challenge_hash].remove(iterations_needed)
152+
self.pending_iters[challenge_hash].remove(iterations_needed)
153153
else:
154154
log.info(
155155
f"Finished PoT chall:{challenge_hash[:10].hex()}.. {iterations_needed}"
@@ -180,6 +180,31 @@ async def _update_proofs_count(self, challenge_weight):
180180
del self.active_discriminants_start_time[active_disc]
181181
self.done_discriminants.append(active_disc)
182182

183+
async def _send_iterations(self, challenge_hash, writer):
184+
alive_discriminant = True
185+
while (alive_discriminant):
186+
async with self.lock:
187+
if (challenge_hash in self.active_discriminants) and (
188+
challenge_hash in self.pending_iters
189+
):
190+
if challenge_hash not in self.submitted_iters:
191+
self.submitted_iters[challenge_hash] = []
192+
for iter in sorted(self.pending_iters[challenge_hash]):
193+
if iter in self.submitted_iters[challenge_hash]:
194+
continue
195+
self.submitted_iters[challenge_hash].append(iter)
196+
if len(str(iter)) < 10:
197+
iter_size = "0" + str(len(str(iter)))
198+
else:
199+
iter_size = str(len(str(iter)))
200+
writer.write((iter_size + str(iter)).encode())
201+
await writer.drain()
202+
log.info(f"New iteration submitted: {iter}")
203+
await asyncio.sleep(3)
204+
async with self.lock:
205+
if (challenge_hash in self.done_discriminants):
206+
alive_discriminant = False
207+
183208
async def _do_process_communication(
184209
self, challenge_hash, challenge_weight, ip, port
185210
):
@@ -219,29 +244,10 @@ async def _do_process_communication(
219244
self.active_discriminants[challenge_hash] = (writer, challenge_weight, ip)
220245
self.active_discriminants_start_time[challenge_hash] = time.time()
221246

247+
asyncio.create_task(self._send_iterations(challenge_hash, writer))
248+
222249
# Listen to the server until "STOP" is received.
223250
while True:
224-
async with self.lock:
225-
if (challenge_hash in self.active_discriminants) and (
226-
challenge_hash in self.pending_iters
227-
):
228-
if challenge_hash not in self.submitted_iters:
229-
self.submitted_iters[challenge_hash] = []
230-
log.info(
231-
f"Pending: {self.pending_iters[challenge_hash]} "
232-
f"Submitted: {self.submitted_iters[challenge_hash]} Hash: {challenge_hash}"
233-
)
234-
for iter in sorted(self.pending_iters[challenge_hash]):
235-
if iter in self.submitted_iters[challenge_hash]:
236-
continue
237-
self.submitted_iters[challenge_hash].append(iter)
238-
if len(str(iter)) < 10:
239-
iter_size = "0" + str(len(str(iter)))
240-
else:
241-
iter_size = str(len(str(iter)))
242-
writer.write((iter_size + str(iter)).encode())
243-
await writer.drain()
244-
245251
try:
246252
data = await reader.readexactly(4)
247253
except (asyncio.IncompleteReadError, ConnectionResetError) as e:
@@ -259,21 +265,6 @@ async def _do_process_communication(
259265
len_server = len(self.free_servers)
260266
log.info(f"Process ended... Server length {len_server}")
261267
break
262-
elif data.decode() == "POLL":
263-
async with self.lock:
264-
# If I have a newer discriminant... Free up the VDF server
265-
if (
266-
len(self.discriminant_queue) > 0
267-
and challenge_weight
268-
< max([h for _, h in self.discriminant_queue])
269-
and challenge_hash in self.active_discriminants
270-
):
271-
log.info("Got poll, stopping the challenge!")
272-
writer.write(b"010")
273-
await writer.drain()
274-
del self.active_discriminants[challenge_hash]
275-
del self.active_discriminants_start_time[challenge_hash]
276-
self.done_discriminants.append(challenge_hash)
277268
else:
278269
try:
279270
# This must be a proof, read the continuation.
@@ -341,9 +332,25 @@ async def _manage_discriminant_queue(self):
341332
)
342333
self.discriminant_queue.clear()
343334
else:
344-
disc = next(
335+
max_weight_disc = [
345336
d for d, h in self.discriminant_queue if h == max_weight
346-
)
337+
]
338+
with_iters = [
339+
d for d in max_weight_disc
340+
if d in self.pending_iters
341+
and len(self.pending_iters[d]) != 0
342+
]
343+
if (len(with_iters) == 0):
344+
disc = max_weight_disc[0]
345+
else:
346+
min_iter = min([
347+
min(self.pending_iters[d])
348+
for d in with_iters
349+
])
350+
disc = next(
351+
d for d in with_iters
352+
if min(self.pending_iters[d]) == min_iter
353+
)
347354
if len(self.free_servers) != 0:
348355
ip, port = self.free_servers[0]
349356
self.free_servers = self.free_servers[1:]
@@ -415,10 +422,14 @@ async def proof_of_space_info(
415422

416423
if proof_of_space_info.challenge_hash not in self.pending_iters:
417424
self.pending_iters[proof_of_space_info.challenge_hash] = []
425+
if proof_of_space_info.challenge_hash not in self.submitted_iters:
426+
self.submitted_iters[proof_of_space_info.challenge_hash] = []
418427

419428
if (
420429
proof_of_space_info.iterations_needed
421430
not in self.pending_iters[proof_of_space_info.challenge_hash]
431+
and proof_of_space_info.iterations_needed
432+
not in self.submitted_iters[proof_of_space_info.challenge_hash]
422433
):
423434
log.info(
424435
f"proof_of_space_info {proof_of_space_info.challenge_hash} adding "

0 commit comments

Comments
 (0)