-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathclient.py
More file actions
260 lines (215 loc) · 10.4 KB
/
client.py
File metadata and controls
260 lines (215 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
'''
Gives commands to a server node that is part of a cluster.
Client knows the mapping of account shards to server nodes.
'''
from sys import argv, stdout
from os import _exit
import os
import socket
import config
import threading
from common_utils import send_msg, MessageType, Colors
from constants import TransactionStatus
import json
from rich.table import Table
from rich.console import Console
import time
# Shared client state. The dictionaries below are keyed by transaction id
# (built as "<client_id>_<counter>" in get_user_input).

# transaction id -> current TransactionStatus of that transaction
transactions: dict = {}
# transaction id -> threading.Condition its watchdog thread is waiting on
watchdogs: dict = {}
# transaction id -> perf_counter() start time while the transaction is
# pending, replaced by the elapsed seconds once a response arrives
latency: dict = {}
# rich console used to render tables
console: Console = Console()
def handle_server_msg(conn, data):
    '''
    Process one JSON message received from the network server.

    Three kinds of message are handled:
    - Prepare votes for 2PC (any message carrying a "prepare_status" key).
      The first vote for a transaction is remembered; when the second one
      arrives a "client_commit" message is sent with commit=True only if
      both votes were yes, otherwise commit=False.
    - MessageType.BALANCE_RESPONSE: the reported balance is printed as a table.
    - MessageType.CLIENT_RESPONSE: the final status and latency of a pending
      transaction are recorded and the transaction's watchdog is woken.

    Malformed messages are reported and dropped instead of raising inside
    the handler thread.

    conn is not used by this function; replies are sent on network_sock.
    data is the raw JSON text of a single message.
    '''
    try:
        data = json.loads(data)
    except json.JSONDecodeError:
        print(f"{Colors.ERROR}[ERROR] Dropping malformed message from server{Colors.ENDCOLOR}")
        return
    if not isinstance(data, dict):
        print(f"{Colors.ERROR}[ERROR] Dropping malformed message from server{Colors.ENDCOLOR}")
        return

    # If prepare status received from server for 2PC, send commit or abort
    # based on how the clusters have responded
    if "prepare_status" in data:
        tid = data["trans_id"]
        with lock:
            # First response for this transaction: just remember the vote
            if tid not in prev_status_id:
                prev_status_id[tid] = data["prepare_status"]
                return
            # Second response: both votes are in, so the stored one is consumed
            first_vote = prev_status_id.pop(tid)
            # Same test as before: stored vote by truthiness, new vote by
            # equality with True; bool() so "commit" is always a real boolean
            both_yes = bool(first_vote and data["prepare_status"] == True)
            if both_yes:
                print(f"{Colors.YELLOW}PREPARE STATUS: YES from both the clusters{Colors.ENDCOLOR}")
            else:
                print(
                    f"{Colors.ERROR}PREPARE STATUS: NO from one of the clusters so transaction failed for : {data['command']}{Colors.ENDCOLOR}")
            msg = {"msg_type": "client_commit",
                   "command": data["command"], "client_id": cid, "trans_id": tid, "commit": both_yes}
            send_msg(network_sock, msg)
        return

    # .get() so a message without a msg_type is ignored rather than raising
    msg_type = data.get("msg_type")

    if msg_type == MessageType.BALANCE_RESPONSE:
        # Print as table
        table = Table(title="", show_header=True, header_style="bold cyan")
        table.add_column("Server ID", style="bold", justify="left")
        table.add_column("Account ID", style="bold blue", justify="center")
        table.add_column("Balance", style="bold red", justify="right")
        table.add_row(str(data['server_id']), str(data['account_id']), str(data['balance']))
        console.print(table)
        return

    if msg_type == MessageType.CLIENT_RESPONSE:
        transaction_id = data['trans_id']
        with lock:
            # Only the first response for a transaction updates its status and
            # latency; later duplicates (e.g. answers to retries) are ignored here
            if transaction_id in transactions and transactions[transaction_id] == TransactionStatus.PENDING:
                if data['status']:
                    transactions[transaction_id] = TransactionStatus.SUCCESS
                    print(f"{Colors.GREEN}Command {data['command']} successfully executed for transaction id {transaction_id}{Colors.ENDCOLOR}")
                else:
                    transactions[transaction_id] = TransactionStatus.FAILURE
                    print(f"{Colors.ERROR}Command {data['command']} failed for transaction id {transaction_id}{Colors.ENDCOLOR}")
                # latency[] held the start time; replace it with the elapsed time
                latency[transaction_id] = time.perf_counter() - latency[transaction_id]
                print(f"Latency of transaction {transaction_id}: {Colors.YELLOW}{latency[transaction_id]} seconds {Colors.ENDCOLOR}")
            condition = watchdogs.get(transaction_id)
        # Wake the watchdog on every response, duplicates included, so a
        # watchdog that is retrying an already-answered transaction stops.
        # Done after releasing `lock` so the two locks are never held together.
        if condition is not None:
            with condition:
                condition.notify_all()
def recv_msg(conn, addr):
    '''
    Receive newline-delimited messages from conn until it is closed.

    Bytes are accumulated in a buffer and split on newlines; every complete
    message is decoded and handed to handle_server_msg on its own thread.
    The loop ends when recv() fails or the peer closes the connection (in
    which case conn is closed here).

    addr is not used by this function.
    '''
    # Keep the buffer as bytes: decoding each recv() chunk separately would
    # fail whenever a multi-byte UTF-8 character straddles two chunks.
    pending = b""
    while True:
        try:
            chunk = conn.recv(1024)
        except OSError:
            break
        if not chunk:
            conn.close()
            break
        pending += chunk
        while b"\n" in pending:
            raw, pending = pending.split(b"\n", 1)
            try:
                # Spawn new thread for every msg to ensure IO is non-blocking
                threading.Thread(target=handle_server_msg,
                                 args=(conn, raw.decode())).start()
            except (UnicodeDecodeError, RuntimeError):
                # Undecodable frame, or the thread could not be started
                print(f"{Colors.ERROR}[ERROR] Exception in handling message at server{Colors.ENDCOLOR}")
                break
def transaction_watchdog(transactionid, msg, timeout=15*config.NETWORK_DELAY):
    '''
    Resend msg every `timeout` seconds until the transaction is answered.

    A condition variable for the transaction is published in `watchdogs` so
    the response handler can wake this thread. The thread then waits until
    the transaction's entry in `transactions` is no longer PENDING; each time
    the wait times out the request is sent again on network_sock.

    transactionid: id of the transaction being watched.
    msg: the request message to resend on timeout.
    timeout: seconds to wait for a response before each retry.
    '''
    condition = threading.Condition()
    # Store condition for transaction
    with lock:
        watchdogs[transactionid] = condition

    def answered():
        # Read without `lock`: this runs while `condition` is held, and the
        # response handler may take `lock` before the condition, so taking
        # `lock` here could deadlock. Unknown ids count as still pending.
        status = transactions.get(transactionid, TransactionStatus.PENDING)
        return status != TransactionStatus.PENDING

    try:
        while True:
            with condition:
                # wait_for checks the predicate before sleeping, so a response
                # that arrived before this thread started waiting is not missed
                done = condition.wait_for(answered, timeout)
            if done:
                break
            print(f"Transaction {transactionid} timed out. Retrying...")
            # Retry in this same thread so the caller's timeout is kept
            send_msg(network_sock, msg)
        # The response handler already reports failures; only announce success
        if transactions.get(transactionid) == TransactionStatus.SUCCESS:
            print(f"{Colors.GREEN}Transaction {transactionid} completed successfully.{Colors.ENDCOLOR}")
    finally:
        # Remove the condition from the dictionary, but only if the entry is
        # still the one this thread registered
        with lock:
            if watchdogs.get(transactionid) is condition:
                del watchdogs[transactionid]
def _print_performance():
    '''Print a table with the latency and status of every non-pending transaction.'''
    table = Table(title="Transaction Latency", show_header=True, header_style="bold cyan")
    table.add_column("Transaction ID", style="bold", justify="left")
    table.add_column("Latency (seconds)", style="bold red", justify="right")
    table.add_column("Transaction Status", style="bold blue", justify="center")
    for tid in latency:
        if transactions.get(tid) and transactions[tid] != TransactionStatus.PENDING:
            table.add_row(tid, str(latency[tid]), transactions[tid])
    console.print(table)


def _print_committed_txns():
    '''Print, per server log file found under config.FILEPATH, its committed entries.'''
    for i in range(1, 10):
        filename = f'{config.FILEPATH}/server_{i}.txt'
        if not os.path.exists(filename):
            continue
        try:
            with open(filename, 'r') as file:
                # First line holds the commit index, the rest is a JSON list
                commit_index = int(file.readline())
                data = json.load(file)
        except (OSError, ValueError) as err:
            # json.JSONDecodeError is a ValueError; skip this file, keep going
            print(f"{Colors.ERROR}Could not read {filename}: {err}{Colors.ENDCOLOR}")
            continue
        table = Table(title=f"Committed txns in server {i}", show_header=True, header_style="bold white")
        # Add columns
        table.add_column("Term", style="bold", justify="center")
        table.add_column("Index", style="bold blue", justify="center")
        table.add_column("Command", style="bold red", justify="left")
        # Add rows for entries 1..commit_index whose status is 1
        for entry in data[1:commit_index+1]:
            if entry["status"] == 1:
                table.add_row(str(entry["term"]), str(entry["index"]), entry["command"])
        # Print the table
        console.print(table)


def get_user_input():
    '''
    Read commands from stdin forever and act on them.

    Supported commands:
    - exit: flush stdout and terminate the process with status 0.
    - print_balance <account_id>: send a PRINT_BALANCE request.
    - performance: print latency and status of finished transactions.
    - print_committed_txns: print committed entries from the server log files.
    - anything else is treated as a comma-separated transfer whose first two
      fields are the sender and receiver account ids (1..3000). It is given a
      transaction id "<client_id>_<counter>", marked pending, watched by a
      transaction_watchdog thread and sent as "client_request_init".

    Blank or malformed input is reported (or ignored) rather than raising,
    so the input thread keeps running.
    '''
    global trans_id
    while True:
        # wait for user input
        user_input = input()
        parts = user_input.split()
        # empty or whitespace-only line: ignore
        if not parts:
            continue
        cmd = parts[0]
        if cmd == "exit":
            stdout.flush()
            # exit program with status 0
            _exit(0)
        # If command is print_balance, print balance of account
        # from all servers in the cluster
        elif cmd == "print_balance":
            if len(parts) != 2:
                print("Invalid command format. Usage: print_balance <account_id>")
                continue
            msg = {"msg_type": MessageType.PRINT_BALANCE,
                   "command": parts[1], "client_id": cid}
        elif cmd == "performance":
            _print_performance()
            continue
        elif cmd == "print_committed_txns":
            _print_committed_txns()
            continue
        else:
            fields = user_input.split(',')
            try:
                sender = int(fields[0])
                receiver = int(fields[1])
            except (IndexError, ValueError):
                # Typo or unknown command: report it instead of crashing
                print(f"{Colors.ERROR}Invalid command. Sender and receiver must be integers separated by a comma{Colors.ENDCOLOR}")
                continue
            # If sender or receiver < 1 or > 3000, invalid command
            if sender < 1 or sender > 3000 or receiver < 1 or receiver > 3000:
                print(f"{Colors.ERROR}Invalid command. Sender and receiver should be between 1 and 3000{Colors.ENDCOLOR}")
                continue
            with lock:
                # Transaction ID = clientId_transId
                transactionid = str(cid) + "_" + str(trans_id)
                trans_id += 1
                # set transaction status to pending
                transactions[transactionid] = TransactionStatus.PENDING
                # start timer for latency setting to current time
                latency[transactionid] = time.perf_counter()
            msg = {"msg_type": "client_request_init", "command": user_input,
                   "client_id": cid, "trans_id": transactionid}
            # start timer watchdog
            threading.Thread(target=transaction_watchdog, args=(transactionid, msg)).start()
        send_msg(network_sock, msg)
if __name__ == "__main__":
    # Script entry point: set up shared state, connect to the network server,
    # start the receiver thread, register this client, then accept commands.
    if len(argv) < 2:
        raise SystemExit("Usage: python client.py <client_id>")
    cid = int(argv[1])
    # Counter used to build transaction ids
    trans_id = 0
    # First 2PC prepare vote seen per transaction id
    prev_status_id = {}
    # Guards the shared dictionaries and the transaction counter
    lock = threading.Lock()
    CLIENT_IP = socket.gethostname()
    CLIENT_PORT = config.CLIENT_PORTS[cid]
    # NOTE(review): this socket is bound but never listened on in this file;
    # presumably it only reserves the client's port - confirm before removing.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((CLIENT_IP, CLIENT_PORT))
    # Connect to network server
    network_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    network_sock.connect((CLIENT_IP, config.NETWORK_SERVER_PORT))
    threading.Thread(target=recv_msg, args=(
        network_sock, (CLIENT_IP, config.NETWORK_SERVER_PORT))).start()
    # Send the "init_client" message carrying this client's id
    msg = {"msg_type": "init_client", "node_id": cid}
    send_msg(network_sock, msg)
    # Commands for exit, inter-shard and intra-shard.
    # Started last: get_user_input sends on network_sock, which must exist
    # before the first command can be typed.
    threading.Thread(target=get_user_input).start()