Commit 6321951

[Task Runner API] [Flower Interoperability] Integrate changes for flower-app-pytorch workspace's custom Collaborator into core component (#1598)

* integrate flower collaborator into core collaborator
* formatting
* change task name interop -> prepare_for_interop, change references to local_grpc_server to interop_server
* add prepare_interop_server as callback to run on_experiment_begin
* fix typo
* change to on_round_begin()
* return empty dict
* set self.prepare_interop_server() to run on experiment begin
* remove self-explanatory comment, added another comment for clarity

Signed-off-by: kta-intel <[email protected]>

1 parent 77297d8 · commit 6321951

File tree: 7 files changed, +91 −94 lines


openfl-workspace/flower-app-pytorch/README.md (15 additions, 5 deletions)

````diff
@@ -91,16 +91,26 @@ task_runner:
 
 3. `FlowerDataLoader` with similar high-level functionality to other dataloaders.
 
-4. `Task` - we introduce a `tasks_connector.yaml` that will allow the collaborator to connect to Flower framework via the local gRPC server. It also handles the task runner's `start_client_adapter` method, which actually starts the Flower component and local gRPC server. By setting `local_server_port` to 0, the port is dynamically allocated. This is mainly for local experiments to avoid overlapping the ports.
+4. `Task` - we introduce a `tasks_connector.yaml` that will allow the collaborator to connect to Flower framework via the interop server. It also handles the task runner's `start_client_adapter` method, which actually starts the Flower component and interop server. By setting `local_server_port` to 0, the port is dynamically allocated. This is mainly for local experiments to avoid overlapping the ports.
 
 ```yaml
 tasks:
-  settings:
-    connect_to: Flower
-  start_client_adapter:
+  prepare_for_interop:
     function: start_client_adapter
     kwargs:
-      local_server_port: 0
+      interop_server_port: 0
+  settings:
+    interop_server: src.grpc.connector.flower.interop_server
 ```
+
+5. `Collaborator` has an additional setting `interop_mode` which will invoke a callback to prepare the interop server that'll eventually be started by the Task Runner
+
+```yaml
+collaborator :
+  defaults : plan/defaults/collaborator.yaml
+  template : openfl.component.Collaborator
+  settings:
+    interop_mode : True
+```
 
 > **Note**: `aggregator.settings.rounds_to_train` is set to 1. __Do not edit this__. The actual number of rounds for the experiment is controlled by Flower logic inside of `./app-pytorch/pyproject.toml`. The entirety of the Flower experiment will run in a single OpenFL round. Increasing this will cause OpenFL to attempt to run the experiment again. The aggregator round is there to stop the OpenFL components at the completion of the experiment.
````
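The dynamic allocation described above (setting the port kwarg to 0) relies on the OS picking a free ephemeral port at bind time. A minimal sketch of the mechanism using a plain socket rather than the actual interop gRPC server (the helper name is illustrative, not from the workspace):

```python
import socket

def pick_free_port():
    """Bind to port 0 so the OS assigns a free ephemeral port, then report it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        # getsockname() reveals the port the OS actually chose
        return s.getsockname()[1]

port = pick_free_port()
print(port)  # an OS-assigned ephemeral port; value varies per run
```

This is why running several collaborators on one machine does not collide: each server binds to 0 and reports its real port back, exactly as `interop_server.get_port()` does in the runner.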

openfl-workspace/flower-app-pytorch/plan/plan.yaml (5 additions, 3 deletions)

```diff
@@ -25,7 +25,9 @@ connector :
 
 collaborator :
   defaults : plan/defaults/collaborator.yaml
-  template : src.collaborator.CollaboratorFlower
+  template : openfl.component.Collaborator
+  settings :
+    interop_mode : True
 
 data_loader :
   defaults : plan/defaults/data_loader.yaml
@@ -51,12 +53,12 @@ assigner :
   - name : Connector_Flower
     percentage : 1.0
     tasks :
-      - start_client_adapter
+      - prepare_for_interop
 
 tasks :
   defaults : plan/defaults/tasks_connector.yaml
   settings :
-    connect_to : Flower
+    interop_server : src.grpc.connector.flower.interop_server
 
 compression_pipeline :
   defaults : plan/defaults/compression_pipeline.yaml
```

openfl-workspace/flower-app-pytorch/src/collaborator.py (deleted, 68 lines)

openfl-workspace/flower-app-pytorch/src/grpc/connector/flower/interop_server.py (4 additions, 6 deletions)

```diff
@@ -21,16 +21,14 @@ class FlowerInteropServer(grpcadapter_pb2_grpc.GrpcAdapterServicer):
     request handling issues.
     """
 
-    def __init__(self, openfl_client, collaborator_name):
+    def __init__(self, send_message_to_client):
         """
         Initialize.
 
         Args:
-            openfl_client: An instance of the OpenFL Client.
-            collaborator_name: The name of the collaborator.
+            send_message_to_client (Callable): A callable function to send messages to the OpenFL client.
         """
-        self.openfl_client = openfl_client
-        self.collaborator_name = collaborator_name
+        self.send_message_to_client = send_message_to_client
         self.end_experiment_callback = None
         self.request_queue = queue.Queue()
         self.processing_thread = threading.Thread(target=self.process_queue)
@@ -86,7 +84,7 @@ def process_queue(self):
         openfl_request = flower_to_openfl_message(request)
 
         # Send request to the OpenFL server
-        openfl_response = self.openfl_client.send_message_to_server(openfl_request, self.collaborator_name)
+        openfl_response = self.send_message_to_client(openfl_request)
 
         # Check to end experiment
         if hasattr(openfl_response, 'metadata'):
```
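The change above replaces the server's direct dependency on the OpenFL client (plus the collaborator name) with a single injected callable that closes over both. A minimal, self-contained sketch of that dependency-injection pattern; the class and method names other than `send_message_to_client` are illustrative stand-ins, not the OpenFL codebase:

```python
import queue

class InteropServerSketch:
    """Toy stand-in for FlowerInteropServer: forwards queued requests
    through an injected callable instead of holding a client reference."""

    def __init__(self, send_message_to_client):
        self.send_message_to_client = send_message_to_client
        self.request_queue = queue.Queue()

    def submit(self, request):
        self.request_queue.put(request)

    def process_one(self):
        request = self.request_queue.get()
        # The server no longer needs to know about the client or the
        # collaborator name; the injected callable closes over both.
        return self.send_message_to_client(request)

# Usage: the collaborator supplies the callable, closing over its client.
server = InteropServerSketch(lambda msg: f"ack:{msg}")
server.submit("hello")
print(server.process_one())  # ack:hello
```

The design choice mirrors the diff: by taking a callable, the interop server can live in the workspace while the core Collaborator decides how messages reach the OpenFL server.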

openfl-workspace/flower-app-pytorch/src/runner.py (31 additions, 10 deletions)

```diff
@@ -51,30 +51,40 @@ def __init__(self, **kwargs):
 
         self.shutdown_requested = False  # Flag to signal shutdown
 
-    def start_client_adapter(self, interop_server, **kwargs):
+    def start_client_adapter(self,
+                             col_name=None,
+                             round_num=None,
+                             input_tensor_dict=None,
+                             **kwargs):
         """
-        Start the local gRPC server and the Flower SuperNode.
+        Start the FlowerInteropServer and the Flower SuperNode.
 
         Args:
-            interop_server: The local gRPC server instance.
-            **kwargs: Additional parameters, including 'local_server_port'.
+            col_name (str, optional): The collaborator name. Defaults to None.
+            round_num (int, optional): The current round number. Defaults to None.
+            input_tensor_dict (dict, optional): The input tensor dictionary. Defaults to None.
+            **kwargs: Additional parameters for configuration.
+                includes:
+                    interop_server (object): The FlowerInteropServer instance.
+                    interop_server_port (int): The port for the interop server.
         """
-        local_server_port = kwargs.get('local_server_port')
 
         def message_callback():
             self.shutdown_requested = True
 
-        # Set the callback for ending the experiment
+        interop_server = kwargs.get('interop_server')
+        interop_server_port = kwargs.get('interop_server_port')
         interop_server.set_end_experiment_callback(message_callback)
-        interop_server.start_server(local_server_port)
+        interop_server.start_server(interop_server_port)
 
-        local_server_port = interop_server.get_port()
+        # interop server sets port dynamically
+        interop_server_port = interop_server.get_port()
 
         command = [
             "flower-supernode",
             "--insecure",
             "--grpc-adapter",
-            "--superlink", f"127.0.0.1:{local_server_port}",
+            "--superlink", f"127.0.0.1:{interop_server_port}",
             "--clientappio-api-address", f"127.0.0.1:{self.client_port}",
             "--node-config", f"data-path='{self.data_path}'"
         ]
@@ -93,7 +103,7 @@ def message_callback():
 
         if self.sgx_enabled:
             # Check if port is open before starting the client app
-            while not is_port_open('127.0.0.1', local_server_port):
+            while not is_port_open('127.0.0.1', interop_server_port):
                 time.sleep(0.5)
 
             time.sleep(1)  # Add a small delay after confirming the port is open
@@ -116,6 +126,14 @@ def message_callback():
         interop_server.stop_server()
         time.sleep(0.1)
 
+        # Collaborator expects these dictionaries, but they are not used in this context
+        # as Flower will handle the tensors internally.
+        global_output_tensor_dict = {}
+        local_output_tensor_dict = {}
+
+        return global_output_tensor_dict, local_output_tensor_dict
+
 
     def set_tensor_dict(self, tensor_dict, with_opt_vars=False):
         """
@@ -160,6 +178,9 @@ def initialize_tensorkeys_for_functions(self, with_opt_vars=False):
         """Initialize tensor keys for functions. Currently not implemented."""
         pass
 
+    def get_required_tensorkeys_for_function(self, func_name, **kwargs):
+        """Get tensor keys for functions. Return empty dict."""
+        return {}
 
 def install_flower_FAB(flwr_app_name):
     """
```
(file name missing from capture) (2 additions, 2 deletions)

```diff
@@ -1,4 +1,4 @@
-start_client_adapter:
+prepare_for_interop:
   function : start_client_adapter
   kwargs :
-    local_server_port : 0 # local grpc server, 0 to dynamically allocate
+    interop_server_port : 0 # interop server port, 0 to dynamically allocate
```

openfl/component/collaborator/collaborator.py (34 additions, 0 deletions)

```diff
@@ -4,6 +4,7 @@
 
 """Collaborator module."""
 
+import importlib
 import logging
 from enum import Enum
 from time import sleep
@@ -75,6 +76,7 @@ def __init__(
         write_logs=False,
         callbacks: Optional[List] = [],
         secure_aggregation=False,
+        interop_mode=False,
     ):
         """Initialize the Collaborator object.
 
@@ -143,6 +145,15 @@ def __init__(
         else:
             callbacks = [secure_aggregation_callback]
 
+        # Interoperability mode
+        self._interop_mode_enabled = interop_mode
+        if self._interop_mode_enabled:
+            callbacks.append(
+                callbacks_module.LambdaCallback(
+                    on_experiment_begin=lambda logs=None: self.prepare_interop_server()
+                )
+            )
+
         # Callbacks
         self.callbacks = callbacks_module.CallbackList(
             callbacks,
@@ -584,3 +595,26 @@ def _apply_masks(
                 continue
             masked_metric = np.add(self._private_mask, tensor_dict[tensor_key])
             tensor_dict[tensor_key] = np.add(masked_metric, self._shared_mask)
+
+    def prepare_interop_server(self):
+        """
+        Prepare the interoperability server.
+
+        This function initializes the interoperability server and sets up
+        the callback for receiving messages from the interop server.
+        It also sets the interop server in the task configuration to be used
+        by the Task Runner.
+        """
+
+        # Initialize the interop server
+        framework = self.task_config["settings"]["interop_server"]
+        module = importlib.import_module(framework)
+
+        def receive_message_from_interop(message):
+            """Receive message from interop server."""
+            # Process the request and return a response
+            response = self.client.send_message_to_server(message, self.collaborator_name)
+            return response
+
+        interop_server = module.FlowerInteropServer(receive_message_from_interop)
+        self.task_config["prepare_for_interop"]["kwargs"]["interop_server"] = interop_server
```
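The `interop_mode` flag wires `prepare_interop_server` into the existing callback mechanism instead of requiring a Collaborator subclass: at experiment begin, the module named in the plan's `settings.interop_server` is imported dynamically and an interop server instance is stashed into the task kwargs for the Task Runner. A minimal sketch of that wiring with a toy `LambdaCallback` (the real one lives in OpenFL's callbacks module; the module path here is the stdlib `queue` purely as an importable stand-in):

```python
import importlib

class LambdaCallback:
    """Toy stand-in mirroring the on_experiment_begin hook used above."""
    def __init__(self, on_experiment_begin=None):
        self.on_experiment_begin = on_experiment_begin or (lambda logs=None: None)

class CollaboratorSketch:
    def __init__(self, interop_mode=False):
        self.task_config = {
            "settings": {"interop_server": "queue"},  # stand-in module path
            "prepare_for_interop": {"kwargs": {}},
        }
        self.callbacks = []
        if interop_mode:
            self.callbacks.append(LambdaCallback(
                on_experiment_begin=lambda logs=None: self.prepare_interop_server()
            ))

    def prepare_interop_server(self):
        # Dynamically import the configured interop server module, as in the diff.
        module = importlib.import_module(self.task_config["settings"]["interop_server"])
        # The real code instantiates module.FlowerInteropServer(callback);
        # here we just record which module was resolved.
        self.task_config["prepare_for_interop"]["kwargs"]["interop_server"] = module.__name__

collab = CollaboratorSketch(interop_mode=True)
for cb in collab.callbacks:
    cb.on_experiment_begin()
print(collab.task_config["prepare_for_interop"]["kwargs"]["interop_server"])  # queue
```

Because the server module path comes from the plan rather than an import in core code, other frameworks can plug in by shipping their own interop server module in the workspace.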
