-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
146 lines (131 loc) · 7.55 KB
/
server.py
File metadata and controls
146 lines (131 loc) · 7.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# server.py
import socket
import random
import threading
import queue
from packet import TCPPacket
# Address and UDP port the dispatcher socket is bound to.
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 12345
# Seconds a handler waits for the next packet from its client before giving up.
HANDLER_INACTIVITY_TIMEOUT = 30
# Active connections: client address -> queue of packets for that client's handler.
connections = {}
# Guards every access to `connections` (dispatcher and handler threads).
connections_lock = threading.Lock()
# A set holding the addresses whose connections were recently closed.
# It is needed to decide whether an RST should be sent or not.
recently_closed = set()
# Guards every access to `recently_closed`.
recently_closed_lock = threading.Lock()
class ClientHandler(threading.Thread):
    """Worker thread that serves one client of the TCP-over-UDP simulation.

    The dispatcher creates one handler per client address and feeds it
    packets through a queue. The handler performs the three-way handshake,
    reassembles payload bytes in sequence order, acknowledges them
    cumulatively, and answers the client's FIN with an ACK and its own FIN.
    """

    def __init__(self, sock, client_address, initial_packet_queue):
        """Store the reply socket, the peer address, and the packet queue.

        Args:
            sock: Socket used to send replies with ``sendto``.
            client_address: ``(host, port)`` tuple identifying the client.
            initial_packet_queue: Queue the handler reads its packets from;
                the first item is expected to be the client's SYN.
        """
        super().__init__()
        self.sock = sock
        self.client_address = client_address
        self.packet_queue = initial_packet_queue
        self.name = f"Handler-{client_address[0]}:{client_address[1]}"
        # Segments that arrived ahead of the expected sequence number,
        # keyed by their sequence number.
        self.out_of_order_buffer = {}
        print(f"[{self.name}] Thread created to handle new client.")

    def run(self):
        """Serve the connection until FIN, inactivity timeout, or an error.

        On exit the client is removed from ``connections``, recorded in
        ``recently_closed``, and the reassembled message is printed.
        """
        # Payload is collected as raw bytes and decoded once at the end:
        # a multi-byte UTF-8 character may be split across two segments,
        # so decoding each segment on its own can fail.
        received = bytearray()
        try:
            server_seq, expected_client_seq = self._do_handshake()
            while True:
                packet = self.packet_queue.get(timeout=HANDLER_INACTIVITY_TIMEOUT)
                if packet.flags['FIN']:
                    self._close_connection(packet, server_seq)
                    break
                if packet.payload_len > 0:
                    expected_client_seq = self._accept_data(
                        packet, expected_client_seq, received
                    )
                    # Always send a cumulative ACK for the last contiguous
                    # byte received, whatever happened to this segment.
                    self._send_packet(
                        seq_num=server_seq, ack_num=expected_client_seq, ack=1
                    )
        except queue.Empty:
            print(f"[{self.name}] Connection timed out due to inactivity.")
        except Exception as e:
            print(f"[{self.name}] An error occurred: {e}")
        finally:
            with connections_lock:
                connections.pop(self.client_address, None)
            # Remember the address so the dispatcher can answer late
            # packets for this connection with RST.
            with recently_closed_lock:
                recently_closed.add(self.client_address)
            # errors='replace' keeps this cleanup path from raising on
            # payloads that are not valid UTF-8.
            full_message = received.decode('utf-8', errors='replace')
            print(f"[{self.name}] Thread finished. Final message: '{full_message}'")

    def _send_packet(self, **fields):
        """Build a packet from this server to the client and send it."""
        packet = TCPPacket(
            src_port=SERVER_PORT, dest_port=self.client_address[1], **fields
        )
        self.sock.sendto(packet.to_bytes(), self.client_address)

    def _do_handshake(self):
        """Answer the queued SYN with SYN-ACK and wait for the client's ACK.

        Returns:
            ``(server_seq, expected_client_seq)`` taken from the client's
            ACK packet (its ``ack_num`` and ``seq_num`` respectively).

        Raises:
            queue.Empty: If the SYN or the ACK does not arrive within 5 s.
        """
        syn_packet = self.packet_queue.get(timeout=5)
        self._send_packet(
            seq_num=random.randint(0, 10000),
            ack_num=syn_packet.seq_num + 1,
            syn=1,
            ack=1,
        )
        ack_packet = self.packet_queue.get(timeout=5)
        print(f"[{self.name}] Connection Established.")
        return ack_packet.ack_num, ack_packet.seq_num

    def _close_connection(self, fin_packet, server_seq):
        """Reply to the client's FIN with ACK then FIN and await the last ACK.

        Raises:
            queue.Empty: If the final packet does not arrive within 10 s.
        """
        print(f"\n[{self.name}] Received FIN packet. Starting termination.")
        server_ack = fin_packet.seq_num + 1
        self._send_packet(seq_num=server_seq, ack_num=server_ack, ack=1)
        self._send_packet(seq_num=server_seq, ack_num=server_ack, fin=1)
        final_ack_packet = self.packet_queue.get(timeout=10)
        if final_ack_packet.flags['ACK']:
            print(f"[{self.name}] Connection terminated successfully.")

    def _accept_data(self, packet, expected_seq, received):
        """Place one data segment and return the next expected sequence number.

        An in-order segment is appended to ``received`` together with any
        buffered segments that become contiguous. A segment from the future
        is buffered; anything older than ``expected_seq`` is discarded.

        Args:
            packet: Segment exposing ``seq_num``, ``payload`` (bytes) and
                ``payload_len``.
            expected_seq: Sequence number of the next in-order byte.
            received: ``bytearray`` extended in place with in-order payload.
        """
        if packet.seq_num == expected_seq:
            print(f"[{self.name}] Received IN-ORDER packet seq={packet.seq_num}. Processing.")
            received.extend(packet.payload)
            expected_seq += packet.payload_len
            # Drain buffered segments that are now contiguous.
            while expected_seq in self.out_of_order_buffer:
                buffered_packet = self.out_of_order_buffer.pop(expected_seq)
                print(f"[{self.name}] Pulled packet seq={buffered_packet.seq_num} from buffer. Processing.")
                received.extend(buffered_packet.payload)
                expected_seq += buffered_packet.payload_len
        elif packet.seq_num > expected_seq:
            if packet.seq_num not in self.out_of_order_buffer:
                print(f"[{self.name}] Received OUT-OF-ORDER packet seq={packet.seq_num}. Buffering.")
                self.out_of_order_buffer[packet.seq_num] = packet
            else:
                print(f"[{self.name}] Received duplicate of a buffered packet seq={packet.seq_num}. Discarding.")
        else:
            # Duplicate of data that was already processed.
            print(f"[{self.name}] Received old duplicate packet seq={packet.seq_num}. Discarding.")
        return expected_seq
def main_dispatcher():
    """Receive datagrams on the server socket and route them by client address.

    Routing rules, applied in order for every datagram:

    * address has an active connection -> packet goes on that handler's queue;
    * packet is a SYN -> a new ``ClientHandler`` is started for the address
      (a SYN from a recently closed address clears that entry, so the same
      client address can reconnect);
    * address was recently closed -> an RST is sent back;
    * anything else is discarded.

    A datagram that cannot be parsed is logged and dropped. Any other
    exception ends the loop and shuts the dispatcher down.
    """
    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server_socket:
        server_socket.bind((SERVER_HOST, SERVER_PORT))
        print(f"Main Dispatcher is listening on {SERVER_HOST}:{SERVER_PORT}...")
        while True:
            try:
                data, addr = server_socket.recvfrom(4096)
                try:
                    packet = TCPPacket.from_bytes(data)
                except Exception as e:
                    # One bad datagram must not take the whole server down.
                    print(f"[Dispatcher] Discarding malformed packet from {addr}: {e}")
                    continue

                with connections_lock:
                    # Packet belongs to an active connection.
                    active_queue = connections.get(addr)
                    if active_queue is not None:
                        active_queue.put(packet)
                        continue

                    is_syn = packet.flags['SYN']
                    with recently_closed_lock:
                        if is_syn:
                            # A fresh SYN starts a new connection even if the
                            # previous one from this address was closed.
                            recently_closed.discard(addr)
                        elif addr in recently_closed:
                            # Late packet for a closed connection: reply RST.
                            print(f"[Dispatcher] Received packet for a recently closed connection from {addr}. Sending RST.")
                            # seq and ack of the RST are derived from the
                            # packet that triggered it.
                            rst_packet = TCPPacket(
                                src_port=SERVER_PORT, dest_port=addr[1],
                                seq_num=packet.ack_num,
                                ack_num=packet.seq_num + packet.payload_len,
                                rst=1,
                            )
                            server_socket.sendto(rst_packet.to_bytes(), addr)
                            continue

                    if is_syn:
                        # New connection request.
                        print(f"\n[Dispatcher] New connection request from {addr}.")
                        new_queue = queue.Queue()
                        new_queue.put(packet)
                        connections[addr] = new_queue
                        handler = ClientHandler(server_socket, addr, new_queue)
                        handler.start()
                    else:
                        # Unknown address that never had a connection.
                        print(f"[Dispatcher] Received packet from completely unknown address {addr}. Discarding.")
            except Exception as e:
                print(f"[Dispatcher] An error occurred: {e}")
                break
    print("[Dispatcher] Shutting down.")
# Start the dispatcher only when this file is executed as a script.
if __name__ == "__main__":
    main_dispatcher()