Skip to content

Commit 93da68c

Browse files
kminhtateoparvanov
andauthored
[Task Runner API] [Flower Interoperability] Enable option to run ClientApp as a separate process in SGX (securefederatedai#1580)
* add clientapp isolation Signed-off-by: kta-intel <kevin.ta@intel.com> * enforce the sequence for spawning and terminating clientapp relative to supernode Signed-off-by: kta-intel <kevin.ta@intel.com> * move is_port_open() check to inside conditional, add a small delay after port is confirmed to be open Signed-off-by: kta-intel <kevin.ta@intel.com> * refactor patch > SGX_enabled Signed-off-by: kta-intel <kevin.ta@intel.com> * refactor local_grpc_server > interop_server Signed-off-by: kta-intel <kevin.ta@intel.com> * format Signed-off-by: kta-intel <kevin.ta@intel.com> * change SGX_enabled to lowercase sgx_enabled for visual consistency Signed-off-by: kta-intel <kevin.ta@intel.com> --------- Signed-off-by: kta-intel <kevin.ta@intel.com> Co-authored-by: teoparvanov <teodor.parvanov@intel.com>
1 parent ada23fa commit 93da68c

File tree

3 files changed

+50
-20
lines changed

3 files changed

+50
-20
lines changed

openfl-workspace/flower-app-pytorch/plan/plan.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ connector :
2222
flwr_run_params :
2323
flwr_app_name : "app-pytorch"
2424
federation_name : "local-poc"
25-
patch: True
25+
sgx_enabled: False
2626

2727
collaborator :
2828
defaults : plan/defaults/collaborator.yaml
@@ -39,7 +39,7 @@ task_runner :
3939
template : src.runner.FlowerTaskRunner
4040
settings :
4141
flwr_app_name: app-pytorch
42-
patch: True
42+
sgx_enabled: False
4343

4444
network :
4545
defaults : plan/defaults/network.yaml

openfl-workspace/flower-app-pytorch/src/connector_flower.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def _build_flwr_run_command(self) -> list[str]:
156156

157157
os.environ["TMPDIR"] = os.environ["FLWR_HOME"]
158158

159-
if self.flwr_run_params.get("patch"):
159+
if self.flwr_run_params.get("sgx_enabled"):
160160
command = ["python", "src/patch/flwr_run_patch.py", "run", f"./src/{flwr_app_name}"]
161161
else:
162162
command = ["flwr", "run", f"./src/{flwr_app_name}"]

openfl-workspace/flower-app-pytorch/src/runner.py

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ def __init__(self, **kwargs):
3434
"""
3535
super().__init__(**kwargs)
3636

37-
self.patch = kwargs.get('patch')
37+
self.sgx_enabled = kwargs.get('sgx_enabled')
3838
if self.data_loader is None:
3939
flwr_app_name = kwargs.get('flwr_app_name')
40-
if self.patch:
40+
if self.sgx_enabled:
4141
install_flower_FAB(flwr_app_name)
4242
return
4343

@@ -52,12 +52,12 @@ def __init__(self, **kwargs):
5252

5353
self.shutdown_requested = False # Flag to signal shutdown
5454

55-
def start_client_adapter(self, local_grpc_server, **kwargs):
55+
def start_client_adapter(self, interop_server, **kwargs):
5656
"""
5757
Start the local gRPC server and the Flower SuperNode.
5858
5959
Args:
60-
local_grpc_server: The local gRPC server instance.
60+
interop_server: The local gRPC server instance.
6161
**kwargs: Additional parameters, including 'local_server_port'.
6262
"""
6363
local_server_port = kwargs.get('local_server_port')
@@ -66,10 +66,10 @@ def message_callback():
6666
self.shutdown_requested = True
6767

6868
# Set the callback for ending the experiment
69-
local_grpc_server.set_end_experiment_callback(message_callback)
70-
local_grpc_server.start_server(local_server_port)
69+
interop_server.set_end_experiment_callback(message_callback)
70+
interop_server.start_server(local_server_port)
7171

72-
local_server_port = local_grpc_server.get_port()
72+
local_server_port = interop_server.get_port()
7373

7474
command = [
7575
"flower-supernode",
@@ -80,20 +80,43 @@ def message_callback():
8080
"--node-config", f"data-path='{self.data_path}'"
8181
]
8282

83+
if self.sgx_enabled:
84+
command += ["--isolation", "process"]
85+
flwr_clientapp_command = [
86+
"flwr-clientapp",
87+
"--insecure",
88+
"--clientappio-api-address", f"127.0.0.1:{self.client_port}",
89+
]
90+
91+
self.logger.info("Starting Flower SuperNode process...")
8392
supernode_process = subprocess.Popen(command, shell=False)
84-
local_grpc_server.handle_signals(supernode_process)
93+
interop_server.handle_signals(supernode_process)
94+
95+
if self.sgx_enabled:
96+
# Check if port is open before starting the client app
97+
while not is_port_open('127.0.0.1', local_server_port):
98+
time.sleep(0.5)
99+
100+
time.sleep(1) # Add a small delay after confirming the port is open
101+
102+
self.logger.info("Starting Flower ClientApp process...")
103+
flwr_clientapp_process = subprocess.Popen(flwr_clientapp_command, shell=False)
104+
interop_server.handle_signals(flwr_clientapp_process)
85105

86106
self.logger.info("Press CTRL+C to stop the server and SuperNode process.")
87107

88-
try:
89-
while not local_grpc_server.termination_event.is_set():
90-
if self.shutdown_requested:
91-
local_grpc_server.terminate_supernode_process(supernode_process)
92-
local_grpc_server.stop_server()
93-
time.sleep(0.1)
94-
except KeyboardInterrupt:
95-
local_grpc_server.terminate_supernode_process(supernode_process)
96-
local_grpc_server.stop_server()
108+
while not interop_server.termination_event.is_set():
109+
if self.shutdown_requested:
110+
if self.sgx_enabled:
111+
self.logger.info("Terminating Flower ClientApp process...")
112+
interop_server.terminate_supernode_process(flwr_clientapp_process)
113+
flwr_clientapp_process.wait()
114+
115+
self.logger.info("Shutting down the server and SuperNode process...")
116+
interop_server.terminate_supernode_process(supernode_process)
117+
interop_server.stop_server()
118+
time.sleep(0.1)
119+
97120

98121
def set_tensor_dict(self, tensor_dict, with_opt_vars=False):
99122
"""
@@ -186,3 +209,10 @@ def get_dynamic_port():
186209
# Get the assigned port number
187210
port = s.getsockname()[1]
188211
return port
212+
213+
def is_port_open(host, port):
214+
"""Check if a port is open on the given host."""
215+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
216+
sock.settimeout(1)
217+
result = sock.connect_ex((host, port))
218+
return result == 0

0 commit comments

Comments
 (0)