-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathserver.py
More file actions
100 lines (80 loc) · 2.98 KB
/
server.py
File metadata and controls
100 lines (80 loc) · 2.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
'''
Common file for servers 1-9
'''
import config
import socket
import threading
from sys import argv, stdout
from os import _exit
from consensus_module import RaftConsensus
from common_utils import send_msg, MessageType
import json
from time import sleep
def handle_server_msg(conn, data):
    """Process one JSON-encoded message received from the network server.

    A balance query is answered straight away through the module-level
    ``network_sock``; any other message is handed to the Raft module after
    sleeping for ``config.NETWORK_DELAY``.

    Args:
        conn: Socket the message arrived on (not used by this function).
        data: JSON text of a single message.
    """
    payload = json.loads(data)
    is_balance_query = payload["msg_type"] == MessageType.PRINT_BALANCE

    if not is_balance_query:
        # Everything except a balance query gets the configured delay
        # before it reaches the consensus module.
        sleep(config.NETWORK_DELAY)
        raft.handle_message(payload)
        return

    # Balance queries skip the delay and read the state machine directly.
    account_id = payload["command"]
    balance = raft.state_machine_read(int(account_id))
    reply = {
        "msg_type": MessageType.BALANCE_RESPONSE,
        "balance": balance,
        "client_id": payload["client_id"],
        "account_id": account_id,
        "server_id": pid,
    }
    # The reply is sent over the network-server socket, not over `conn`.
    send_msg(network_sock, reply, pid)
def recv_msg(conn, addr):
    """Receive newline-delimited messages from ``conn`` and dispatch them.

    Bytes are accumulated until a full line is available, so a message (or a
    multi-byte UTF-8 character) split across two ``recv`` calls is
    reassembled before it is decoded. Each complete message is handled by
    ``handle_server_msg`` on its own thread so that a slow handler does not
    block further reads.

    The loop ends when ``recv`` fails or when the peer closes the connection;
    in the latter case ``conn`` is closed here.

    Args:
        conn: Connected socket to read from.
        addr: Address of the peer (not used by this function).
    """
    pending = b""
    while True:
        try:
            chunk = conn.recv(1024)
        except OSError:
            # The socket failed or was closed elsewhere; stop receiving.
            break
        if not chunk:
            # An empty read means the peer closed the connection.
            conn.close()
            break
        pending += chunk
        while b"\n" in pending:
            raw_msg, pending = pending.split(b"\n", 1)
            try:
                # Spawn new thread for every msg to ensure IO is non-blocking
                threading.Thread(target=handle_server_msg,
                                 args=(conn, raw_msg.decode())).start()
            except Exception:
                print(f"[ERROR] Exception in handling message at server {pid}")
                # Stop draining for now; anything still buffered is
                # processed after the next successful read.
                break
def get_user_input():
    """Serve operator commands typed on stdin.

    Commands:
        exit        Tell the network server this node is going down, then
                    terminate the process with status 0.
        print <id>  Print the state-machine value for account ``id``.

    Blank lines and unknown commands are ignored, and a malformed ``print``
    command reports a usage hint, so that bad input does not kill the input
    thread. The function returns when stdin reaches end-of-file.
    """
    while True:
        # wait for user input
        try:
            user_input = input()
        except EOFError:
            # stdin was closed; there is nothing more to read.
            return
        tokens = user_input.split()
        if not tokens:
            continue
        cmd = tokens[0]
        if cmd == "exit":
            # send msg to network server about exit so that it can be marked as down
            msg = {"msg_type": "server_exit", "node_id": pid}
            send_msg(network_sock, msg, pid)
            stdout.flush()
            # exit program with status 0
            _exit(0)
        elif cmd == "print":
            # used to print the balance of the id
            try:
                account_id = int(tokens[1])
            except (IndexError, ValueError):
                print("[ERROR] Usage: print <account_id>")
                continue
            print(raft.state_machine_read(account_id))
if __name__ == "__main__":
    # The server id is the single required command-line argument.
    if len(argv) < 2:
        raise SystemExit(f"usage: {argv[0]} <server_id>")
    pid = int(argv[1])
    SERVER_IP = socket.gethostname()
    SERVER_PORT = config.SERVER_PORTS[pid]

    # NOTE(review): the input thread and the receiver thread below are both
    # started before `raft` is assigned at the end of this block, so a command
    # or message handled before then would hit an undefined name. The order is
    # kept as-is because RaftConsensus's constructor is not visible here;
    # confirm its behavior before reordering.

    # listen for user input in separate thread - mostly just exit commands
    threading.Thread(target=get_user_input).start()

    # NOTE(review): this socket is bound but never listened on in this block;
    # presumably it only reserves the server's port - confirm.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((SERVER_IP, SERVER_PORT))

    # Connect to network server
    network_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    network_sock.connect((SERVER_IP, config.NETWORK_SERVER_PORT))
    threading.Thread(target=recv_msg, args=(
        network_sock, (SERVER_IP, config.NETWORK_SERVER_PORT))).start()

    # Announce this node to the network server
    msg = {"msg_type": "init", "node_id": pid}
    send_msg(network_sock, msg, pid)

    # Create Raft consensus object for current server
    raft = RaftConsensus(pid, network_sock)