Skip to content

Commit 9606aa9

Browse files
committed
Add send iters loop inside the timelord, remove POLL
1 parent 8351468 commit 9606aa9

File tree

2 files changed

+31
-57
lines changed

2 files changed

+31
-57
lines changed

lib/chiavdf/fast_vdf/vdf_server.cpp

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using boost::asio::ip::tcp;
44

55
const int max_length = 2048;
6+
const int kMaxProcessesAllowed = 3;
67
std::mutex socket_mutex;
78

89
void PrintInfo(std::string input) {
@@ -24,22 +25,6 @@ void CreateAndWriteProof(integer D, form x, int64_t num_iterations, WesolowskiCa
2425
boost::asio::write(sock, boost::asio::buffer(str_result.c_str(), str_result.size()));
2526
}
2627

27-
28-
void PollTimelord(tcp::socket& sock, bool& got_iters) {
29-
// Wait for 15s, if no iters come, poll each 5 seconds the timelord.
30-
int seconds = 0;
31-
while (!got_iters) {
32-
std::this_thread::sleep_for (std::chrono::seconds(1));
33-
seconds++;
34-
if (seconds >= 15 && (seconds - 15) % 5 == 0) {
35-
socket_mutex.lock();
36-
boost::asio::write(sock, boost::asio::buffer("POLL", 4));
37-
socket_mutex.unlock();
38-
}
39-
}
40-
}
41-
42-
4328
void session(tcp::socket sock) {
4429
try {
4530
char disc[350];
@@ -107,7 +92,6 @@ void session(tcp::socket sock) {
10792
bool stopped = false;
10893
bool got_iters = false;
10994
std::thread vdf_worker(repeated_square, f, D, L, std::ref(weso), std::ref(stopped));
110-
std::thread poll_thread(PollTimelord, std::ref(sock), std::ref(got_iters));
11195

11296
// Tell client that I'm ready to get the challenges.
11397
boost::asio::write(sock, boost::asio::buffer("OK", 2));
@@ -125,7 +109,6 @@ void session(tcp::socket sock) {
125109
if (iters == 0) {
126110
PrintInfo("Got stop signal!");
127111
stopped = true;
128-
poll_thread.join();
129112
for (int i = 0; i < threads.size(); i++)
130113
stop_vector[i] = true;
131114
for (int t = 0; t < threads.size(); t++) {
@@ -135,7 +118,7 @@ void session(tcp::socket sock) {
135118
free(forms);
136119
} else {
137120
int max_iter = 0;
138-
int max_iter_thread_id;
121+
int max_iter_thread_id = -1;
139122
int min_iter = std::numeric_limits<int> :: max();
140123
bool unique = true;
141124
for (auto active_iter: seen_iterations) {
@@ -155,13 +138,13 @@ void session(tcp::socket sock) {
155138
PrintInfo("Duplicate iteration " + to_string(iters) + "... Ignoring.");
156139
continue;
157140
}
158-
if (threads.size() < 3 || iters < min_iter) {
141+
if (threads.size() < kMaxProcessesAllowed || iters < min_iter) {
159142
seen_iterations.insert({iters, threads.size()});
160143
PrintInfo("Running proving for iter: " + to_string(iters));
161144
stop_vector[threads.size()] = false;
162145
threads.push_back(std::thread(CreateAndWriteProof, D, f, iters, std::ref(weso),
163146
std::ref(stop_vector[threads.size()]), std::ref(sock)));
164-
if (threads.size() > 3) {
147+
if (threads.size() > kMaxProcessesAllowed) {
165148
PrintInfo("Stopping proving for iter: " + to_string(max_iter));
166149
stop_vector[max_iter_thread_id] = true;
167150
seen_iterations.erase({max_iter, max_iter_thread_id});

src/timelord.py

Lines changed: 27 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,31 @@ async def _update_proofs_count(self, challenge_weight):
180180
del self.active_discriminants_start_time[active_disc]
181181
self.done_discriminants.append(active_disc)
182182

183+
async def send_iterations(self, challenge_hash, writer):
184+
alive_discriminant = True
185+
while (alive_discriminant):
186+
async with self.lock:
187+
if (challenge_hash in self.active_discriminants) and (
188+
challenge_hash in self.pending_iters
189+
):
190+
if challenge_hash not in self.submitted_iters:
191+
self.submitted_iters[challenge_hash] = []
192+
for iter in sorted(self.pending_iters[challenge_hash]):
193+
if iter in self.submitted_iters[challenge_hash]:
194+
continue
195+
self.submitted_iters[challenge_hash].append(iter)
196+
if len(str(iter)) < 10:
197+
iter_size = "0" + str(len(str(iter)))
198+
else:
199+
iter_size = str(len(str(iter)))
200+
writer.write((iter_size + str(iter)).encode())
201+
await writer.drain()
202+
log.info(f"New iteration submitted: {iter}")
203+
await asyncio.sleep(3)
204+
async with self.lock:
205+
if (challenge_hash in self.done_discriminants):
206+
alive_discriminant = False
207+
183208
async def _do_process_communication(
184209
self, challenge_hash, challenge_weight, ip, port
185210
):
@@ -219,29 +244,10 @@ async def _do_process_communication(
219244
self.active_discriminants[challenge_hash] = (writer, challenge_weight, ip)
220245
self.active_discriminants_start_time[challenge_hash] = time.time()
221246

247+
asyncio.create_task(self.send_iterations(challenge_hash, writer))
248+
222249
# Listen to the server until "STOP" is received.
223250
while True:
224-
async with self.lock:
225-
if (challenge_hash in self.active_discriminants) and (
226-
challenge_hash in self.pending_iters
227-
):
228-
if challenge_hash not in self.submitted_iters:
229-
self.submitted_iters[challenge_hash] = []
230-
log.info(
231-
f"Pending: {self.pending_iters[challenge_hash]} "
232-
f"Submitted: {self.submitted_iters[challenge_hash]} Hash: {challenge_hash}"
233-
)
234-
for iter in sorted(self.pending_iters[challenge_hash]):
235-
if iter in self.submitted_iters[challenge_hash]:
236-
continue
237-
self.submitted_iters[challenge_hash].append(iter)
238-
if len(str(iter)) < 10:
239-
iter_size = "0" + str(len(str(iter)))
240-
else:
241-
iter_size = str(len(str(iter)))
242-
writer.write((iter_size + str(iter)).encode())
243-
await writer.drain()
244-
245251
try:
246252
data = await reader.readexactly(4)
247253
except (asyncio.IncompleteReadError, ConnectionResetError) as e:
@@ -259,21 +265,6 @@ async def _do_process_communication(
259265
len_server = len(self.free_servers)
260266
log.info(f"Process ended... Server length {len_server}")
261267
break
262-
elif data.decode() == "POLL":
263-
async with self.lock:
264-
# If I have a newer discriminant... Free up the VDF server
265-
if (
266-
len(self.discriminant_queue) > 0
267-
and challenge_weight
268-
< max([h for _, h in self.discriminant_queue])
269-
and challenge_hash in self.active_discriminants
270-
):
271-
log.info("Got poll, stopping the challenge!")
272-
writer.write(b"010")
273-
await writer.drain()
274-
del self.active_discriminants[challenge_hash]
275-
del self.active_discriminants_start_time[challenge_hash]
276-
self.done_discriminants.append(challenge_hash)
277268
else:
278269
try:
279270
# This must be a proof, read the continuation.

0 commit comments

Comments
 (0)