Skip to content

Commit cf92854

Browse files
committed
refactored inbound and outbound data processing in federation client and federation server services, updated version information, fixed bug which shared intraserver private geochats between federates
1 parent dd77e72 commit cf92854

File tree

8 files changed

+436
-57
lines changed

8 files changed

+436
-57
lines changed

FreeTAKServer/controllers/configuration/MainConfig.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class MainConfig:
1111
"""
1212

1313
# the version information of the server (recommended to leave as default)
14-
version = 'FreeTAKServer-1.9.7.6 Public'
14+
version = 'FreeTAKServer-1.9.8 Public'
1515
#
1616
yaml_path = str(os.environ.get('FTS_CONFIG_PATH', '/opt/FTSConfig.yaml'))
1717

FreeTAKServer/controllers/services/Orchestrator.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def clientdatapipe_status_check(self) -> bool:
104104
else: # otherwise return true
105105
return True
106106

107-
def remove_client_information(self, client_information):
107+
def remove_service_user(self, client_information):
108108
""" this method generates the presence object from the
109109
client_information parameter and sends it as a remove message
110110
to the client data pipe
@@ -155,7 +155,7 @@ def update_client_information(self, client_information):
155155
self.logger.error("exception has been thrown updating client data in queue "+str(e))
156156
raise e
157157

158-
def add_client_information(self, client_information):
158+
def add_service_user(self, client_information):
159159
""" this method generates the presence and connection objects from the
160160
client_information parameter and sends it to
161161
@@ -318,7 +318,7 @@ def clientConnected(self, rawConnectionInformation):
318318
# self.logger.info(loggingConstants.CLIENTCONNECTEDFINISHED + str(clientInformation.modelObject.detail.contact.callsign))
319319
print("adding client")
320320
self.clientInformationQueue[clientInformation.modelObject.uid] = [clientInformation.socket]
321-
self.add_client_information(client_information=clientInformation)
321+
self.add_service_user(client_information=clientInformation)
322322
self.get_client_information()
323323
print("client added")
324324
self.sendUserConnectionGeoChat(clientInformation)
@@ -466,7 +466,7 @@ def clientDisconnected(self, clientInformation: User):
466466
'there has been an error in a clients disconnection while adding information to the database ' + str(e))
467467
pass
468468
try:
469-
self.remove_client_information(client_information=clientInformation)
469+
self.remove_service_user(client_information=clientInformation)
470470
# working
471471
# time.sleep(1)
472472
print('stage 1 c')

FreeTAKServer/controllers/services/federation/FederationClientService.py

Lines changed: 141 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
#######################################################
1010
from FreeTAKServer.controllers.configuration.MainConfig import MainConfig
1111
from FreeTAKServer.controllers.configuration.types import Types
12-
from FreeTAKServer.controllers.services.federation.handlers import StopHandler, DisconnectHandler, ConnectHandler, SendDataHandler, SendConnectionDataHandler, SendDisconnectionDataHandler
12+
from FreeTAKServer.controllers.services.federation.handlers import StopHandler, DisconnectHandler, ConnectHandler, SendDataHandler, SendConnectionDataHandler, SendDisconnectionDataHandler, DestinationValidationHandler, DataValidationHandler, HandlerBase
13+
from FreeTAKServer.controllers.services.federation.external_data_handlers import *
1314
from FreeTAKServer.model.protobufModel.fig_pb2 import FederatedEvent
1415

1516

@@ -21,7 +22,7 @@
2122
from FreeTAKServer.model.federate import Federate
2223
import selectors
2324
import socket
24-
from typing import Tuple
25+
from typing import Tuple, Dict, List
2526
import ssl
2627
import codecs
2728
import threading
@@ -30,9 +31,13 @@
3031
from FreeTAKServer.controllers.serializers.protobuf_serializer import ProtobufSerializer
3132
from FreeTAKServer.controllers.serializers.xml_serializer import XmlSerializer
3233
from FreeTAKServer.controllers.XMLCoTController import XMLCoTController
34+
35+
3336
from FreeTAKServer.model.SpecificCoT.SendOther import SendOther
3437
from FreeTAKServer.model.FTSModel.Event import Event
38+
from FreeTAKServer.model.SpecificCoT.SpecificCoTAbstract import SpecificCoTAbstract
3539
from FreeTAKServer.model.ClientInformation import ClientInformation
40+
from FreeTAKServer.model.SQLAlchemy.User import User
3641
from FreeTAKServer.model.SpecificCoT.SendDisconnect import SendDisconnect
3742
from FreeTAKServer.controllers.DatabaseControllers.DatabaseController import DatabaseController
3843

@@ -50,18 +55,133 @@ class FederationClientServiceController(FederationServiceBase):
5055

5156
def __init__(self):
5257
self.logger = logger
53-
self._define_responsibility_chain()
58+
self._define_command_responsibility_chain()
59+
self._define_connection_responsibility_chain()
60+
self._define_service_responsibility_chain()
61+
self._define_external_data_responsibility_chain()
62+
self._define_data_responsibility_chain()
5463
self.pipe = None
5564
self.federates: {str: Federate} = {}
5665
self.sel = selectors.DefaultSelector()
66+
self.user_dict = {}
67+
68+
def get_service_users(self) -> List[FederatedEvent]:
69+
return self.user_dict.values()
70+
71+
def add_service_user(self, user: FederatedEvent) -> None:
72+
""" add a service user to this services user persistence mechanism
73+
74+
Returns: None
75+
76+
"""
77+
self.user_dict[user.contact.uid] = user
78+
79+
def remove_service_user(self, user: FederatedEvent):
80+
""" remove a service user from this services user persistence mechanism
81+
82+
Returns: None
83+
84+
"""
85+
del self.user_dict[user.contact.uid]
86+
87+
def define_responsibility_chain(self):
88+
pass
5789

5890
def _create_context(self):
5991
self.context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
6092
self.context.load_cert_chain(MainConfig.federationCert, MainConfig.federationKey,
6193
password=MainConfig.federationKeyPassword)
6294
self.context.set_ciphers('DEFAULT@SECLEVEL=1')
6395

64-
def _define_responsibility_chain(self):
96+
def _define_external_data_responsibility_chain(self):
97+
""" this method is responsible for defining the responsibility chain which handles external data
98+
eg. data sent to FTS by a federate
99+
100+
Returns:
101+
102+
"""
103+
fed_proto_standard_handler = FederationProtobufStandardHandler()
104+
105+
fed_proto_disconnect_handler = FederationProtobufDisconnectionHandler()
106+
fed_proto_disconnect_handler.setNextHandler(fed_proto_standard_handler)
107+
108+
fed_proto_connection_handler = FederationProtobufConnectionHandler()
109+
fed_proto_connection_handler.setNextHandler(fed_proto_disconnect_handler)
110+
111+
fed_proto_validation_handler = FederationProtobufValidationHandler()
112+
fed_proto_validation_handler.setNextHandler(fed_proto_connection_handler)
113+
114+
self.external_data_chain = fed_proto_validation_handler
115+
116+
def _call_responsibility_chain(self, command):
117+
""" this method is responsible for calling the responsibility chains for all command types:
118+
service level commands; start, stop etc
119+
Connection level commands; close connection, open connection etc
120+
data level commands; send data x, each handler is responsible for some facet of data validation before
121+
the connection receives it
122+
123+
Returns: output from successful handler
124+
125+
"""
126+
#if command.level == "SERVICE":
127+
if command == "STOP":
128+
self.service_chain.Handle(obj = self, command= command)
129+
130+
# elif command.level == "CONNECTION":
131+
elif isinstance(command, tuple) and (command[1] == "DELETE" or command[1] == "CREATE" or command[1] == "UPDATE"):
132+
self.connection_chain.Handle(obj=self, command=command)
133+
134+
#elif command.level == "DATA":
135+
if isinstance(command, SpecificCoTAbstract) or isinstance(command, ClientInformation):
136+
self.data_chain.Handle(obj = self, command= command)
137+
138+
def _define_service_responsibility_chain(self):
139+
""" this method is responsible for defining the responsibility chain which will handle service level commands;
140+
or commands which effect the entire service
141+
142+
Returns: the entry handler for this responsibility chain
143+
144+
"""
145+
stop_handler = StopHandler()
146+
self.service_chain = stop_handler
147+
148+
def _define_connection_responsibility_chain(self):
149+
""" this method is responsible for defining the responsibility chain which will handle connection level commands;
150+
or commands which effect the status of a connection at the socket level
151+
152+
Returns: the entry handler for this responsibility chain
153+
154+
"""
155+
connect_handler = ConnectHandler()
156+
disconnect_handler = DisconnectHandler()
157+
disconnect_handler.setNextHandler(connect_handler)
158+
self.connection_chain = disconnect_handler
159+
160+
def _define_data_responsibility_chain(self):
161+
""" this method is responsible for defining the responsibility chain which will handle data level commands;
162+
or commands which transfer data to a client
163+
164+
Returns: the entry handler for this responsibility chain
165+
166+
"""
167+
168+
send_data_handler = SendDataHandler()
169+
170+
destination_validation_handler = DestinationValidationHandler()
171+
destination_validation_handler.setNextHandler(send_data_handler)
172+
173+
send_disconnection_data_handler = SendDisconnectionDataHandler()
174+
send_disconnection_data_handler.setNextHandler(destination_validation_handler)
175+
176+
send_connection_data_handler = SendConnectionDataHandler()
177+
send_connection_data_handler.setNextHandler(send_disconnection_data_handler)
178+
179+
data_validation_handler = DataValidationHandler()
180+
data_validation_handler.setNextHandler(send_connection_data_handler)
181+
182+
self.data_chain = data_validation_handler
183+
184+
def _define_command_responsibility_chain(self) -> HandlerBase:
65185
self.m_StopHandler = StopHandler()
66186

67187
self.m_ConnectHandler = ConnectHandler()
@@ -87,8 +207,13 @@ def main(self):
87207
outbound_data_thread.start()
88208
inbound_data_thread.join()
89209

210+
def serialize_data(self, data_object: FederatedEvent):
211+
specific_obj, xmlstring = self._process_protobuff_to_object(data_object)
212+
specific_obj.xmlString = etree.tostring(xmlstring)
213+
return specific_obj
214+
90215
def outbound_data_handler(self):
91-
""" this is the main process respoonsible for receiving data from federates and sharing
216+
""" this is the main process responsible for receiving data from federates and sharing
92217
with FTS core
93218
94219
Returns:
@@ -108,13 +233,10 @@ def outbound_data_handler(self):
108233
# event = etree.Element('event')
109234
# SpecificCoTObj = XMLCoTController().categorize_type(protobuf_object.type)
110235
try:
111-
specific_obj, xmlstring = self._process_protobuff_to_object(protobuf_object)
112-
# specific_obj.xmlString = etree.tostring(xmlstring)
113-
print(etree.tostring(xmlstring))
114-
specific_obj.xmlString = etree.tostring(xmlstring)
115-
self.pipe.put(specific_obj)
236+
serialized_data = self.serialize_data(protobuf_object)
237+
self.send_command_to_core(serialized_data)
116238
except Exception as e:
117-
pass
239+
self.logger.warning("there has been an exception thrown in the outbound_data_handler "+str(e))
118240
"""if isinstance(SpecificCoTObj, SendOtherController):
119241
detail = protobuf_object.event.other
120242
protobuf_object.event.other = ''
@@ -130,6 +252,11 @@ def outbound_data_handler(self):
130252
else:
131253
time.sleep(MainConfig.MainLoopDelay / 1000)
132254

255+
def send_command_to_core(self, serialized_data):
256+
if self.pipe.sender_queue.full():
257+
print('queue full !!!')
258+
self.pipe.put(serialized_data)
259+
133260
def inbound_data_handler(self):
134261
"""this is the main process responsible for receiving data from FTS core
135262
@@ -141,11 +268,10 @@ def inbound_data_handler(self):
141268
command = self.pipe.get()
142269
if command:
143270
try:
144-
self.m_SendConnectionHandler.Handle(self, command)
271+
self._call_responsibility_chain(command)
145272
except Exception as e:
146273
pass
147-
else:
148-
pass
274+
149275
except Exception as e:
150276
self.logger.error(str(e))
151277

@@ -201,6 +327,7 @@ def receive_data_from_federate(self, timeout):
201327
print(raw_protobuf_message)
202328
protobuf_object = FederatedEvent()
203329
protobuf_object.ParseFromString(raw_protobuf_message)
330+
self.external_data_chain.Handle(self, protobuf_object)
204331
dataarray.append(protobuf_object)
205332
except Exception as e:
206333
conn.recv(10000)
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from FreeTAKServer.controllers.services.federation.handlers import HandlerBase
2+
3+
from FreeTAKServer.model.protobufModel.fig_pb2 import FederatedEvent
4+
5+
class FederationProtobufValidationHandler(HandlerBase):
6+
"""
7+
this handler is responsible for validation that a passed command has the proper type for subsequent handlers
8+
"""
9+
def Handle(self, obj, command):
10+
11+
if isinstance(command, FederatedEvent):
12+
self.callNextHandler(obj, command)
13+
14+
else:
15+
raise TypeError("this command chain only accepts Protobuf data of type FederatedEvent")
16+
17+
class FederationProtobufConnectionHandler(HandlerBase):
18+
"""
19+
this handler is responsible for parsing connection data coming from a federate and adding it to the calling services user dictionary
20+
"""
21+
def Handle(self, obj, command):
22+
if command.contact.operation == 1:
23+
obj.add_service_user(command)
24+
self.callNextHandler(obj, command)
25+
else:
26+
self.callNextHandler(obj,command)
27+
28+
class FederationProtobufDisconnectionHandler(HandlerBase):
29+
"""
30+
this handler is responsible for parsing clients disconnection data coming from a federate
31+
"""
32+
def Handle(self, obj, command):
33+
if command.contact.operation == 4:
34+
obj.remove_service_user(command)
35+
self.callNextHandler(obj, command)
36+
else:
37+
self.callNextHandler(obj, command)
38+
39+
class FederationProtobufStandardHandler(HandlerBase):
40+
"""
41+
this handler is responsible for serializing client data and sharing it with FTS core
42+
"""
43+
def Handle(self, obj, command):
44+
pass
45+
# serialized_data = obj.serialize_data(command)
46+
# obj.send_command_to_core(serialized_data)

0 commit comments

Comments
 (0)