Commit cb79a5c

teoparvanov authored

Collaborator CLI command for pinging the aggregator (securefederatedai#1516)

* Adding a collaborator command for pinging the aggregator, without starting any tasks
  Signed-off-by: Teodor Parvanov <teodor.parvanov@intel.com>
* Addressing review comments
  Signed-off-by: Teodor Parvanov <teodor.parvanov@intel.com>
* Additional logging for the "fx collaborator ping" command
  Signed-off-by: Teodor Parvanov <teodor.parvanov@intel.com>
* Additional documentation on the usage of fx collaborator ping
  Signed-off-by: Teodor Parvanov <teodor.parvanov@intel.com>

---------

Signed-off-by: Teodor Parvanov <teodor.parvanov@intel.com>
1 parent f104f17 commit cb79a5c

7 files changed: +136 -8 lines changed


docs/about/features_index/taskrunner.rst

Lines changed: 13 additions & 4 deletions
@@ -448,18 +448,19 @@ STEP 3: Start the Federation
 
 1. Open a new terminal, change the directory to the workspace, and activate the virtual environment.
 
-2. Run the Collaborator.
+2. Test the connectivity with the Aggregator.
 
     .. code-block:: shell
 
-       $ fx collaborator start -n {COLLABORATOR_LABEL}
+       $ fx collaborator ping -n {COLLABORATOR_LABEL}
 
     where :code:`COLLABORATOR_LABEL` is the label for this Collaborator.
 
     .. note::
 
        Each workspace may have multiple FL plans and multiple collaborator lists associated with it.
-       Therefore, :code:`fx collaborator start` has the following optional parameters.
+       Therefore, the :code:`fx collaborator start` and :code:`fx collaborator ping` commands have
+       the following optional parameters:
 
        +-------------------------+---------------------------------------------------------+
        | Optional Parameters     | Description                                             |
@@ -469,7 +470,15 @@ STEP 3: Start the Federation
        | -d, --data_config PATH  | The data set/shard configuration file                   |
        +-------------------------+---------------------------------------------------------+
 
-3. Repeat the earlier steps for each collaborator node in the federation.
+3. Run the Collaborator.
+
+    .. code-block:: shell
+
+       $ fx collaborator start -n {COLLABORATOR_LABEL}
+
+    where :code:`COLLABORATOR_LABEL` is the label for this Collaborator.
+
+4. Repeat the earlier steps for each collaborator node in the federation.
 
 When all of the Collaborators connect, the Aggregator starts training. You will see log messages describing the progress of the federated training.
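
The documented order (ping first, then start) could be scripted roughly as follows. This is a minimal sketch, not part of the commit: `ping_then_start` and its `run` parameter are hypothetical names, and it assumes that a failed ping ends with a non-zero exit status, which this commit does not state (the client may also keep retrying instead of exiting).

```python
import subprocess
from typing import Callable, List, Sequence


def _run_subprocess(argv: Sequence[str]) -> int:
    """Default runner: execute the command and return its exit status."""
    return subprocess.run(list(argv)).returncode


def ping_then_start(
    label: str,
    run: Callable[[Sequence[str]], int] = _run_subprocess,
) -> List[List[str]]:
    """Run `fx collaborator ping`, then `fx collaborator start` only if the ping succeeded.

    Returns the list of commands that were actually executed.
    """
    executed: List[List[str]] = []

    ping_cmd = ["fx", "collaborator", "ping", "-n", label]
    executed.append(ping_cmd)
    if run(ping_cmd) != 0:
        # ASSUMPTION: a failed connectivity test yields a non-zero exit status.
        return executed

    start_cmd = ["fx", "collaborator", "start", "-n", label]
    executed.append(start_cmd)
    run(start_cmd)
    return executed
```

Passing a custom `run` callable makes it possible to dry-run the sequence without OpenFL installed.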

docs/developer_guide/running_the_federation_with_gandlf.rst

Lines changed: 13 additions & 4 deletions
@@ -438,18 +438,19 @@ STEP 3: Start the Federation
 
 1. Open a new terminal, change the directory to the workspace, and activate the virtual environment.
 
-2. Run the Collaborator.
+2. Test the connectivity with the Aggregator.
 
     .. code-block:: shell
 
-       $ fx collaborator start -n {COLLABORATOR_LABEL}
+       $ fx collaborator ping -n {COLLABORATOR_LABEL}
 
     where :code:`COLLABORATOR_LABEL` is the label for this Collaborator.
 
     .. note::
 
        Each workspace may have multiple FL plans and multiple collaborator lists associated with it.
-       Therefore, :code:`fx collaborator start` has the following optional parameters.
+       Therefore, the :code:`fx collaborator start` and :code:`fx collaborator ping` commands have
+       the following optional parameters:
 
        +-------------------------+---------------------------------------------------------+
        | Optional Parameters     | Description                                             |
@@ -459,7 +460,15 @@ STEP 3: Start the Federation
        | -d, --data_config PATH  | The data set/shard configuration file                   |
        +-------------------------+---------------------------------------------------------+
 
-3. Repeat the earlier steps for each collaborator node in the federation.
+3. Run the Collaborator.
+
+    .. code-block:: shell
+
+       $ fx collaborator start -n {COLLABORATOR_LABEL}
+
+    where :code:`COLLABORATOR_LABEL` is the label for this Collaborator.
+
+4. Repeat the earlier steps for each collaborator node in the federation.
 
 When all of the Collaborators connect, the Aggregator starts training. You will see log messages describing the progress of the federated training.
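
For orientation, the `ping` command in this commit reads the aggregator endpoint from the plan's `network.settings` block and labels the connection "TLS" or "TCP" depending on `use_tls`. The sketch below reproduces only that lookup on a plain dictionary; `describe_endpoint` is a hypothetical helper, and the address and port values are made-up examples.

```python
def describe_endpoint(config: dict) -> str:
    """Build an 'address:port via PROTOCOL' label from a plan-like config dict."""
    settings = config["network"]["settings"]
    # Same rule as in the commit: TLS when use_tls is truthy, otherwise TCP.
    protocol = "TLS" if settings["use_tls"] else "TCP"
    return f"{settings['agg_addr']}:{settings['agg_port']} via {protocol}"


# Hypothetical plan fragment; real plans are parsed from plan/plan.yaml.
example_config = {
    "network": {
        "settings": {"agg_addr": "agg.example.com", "agg_port": 50051, "use_tls": True}
    }
}
label = describe_endpoint(example_config)
```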

openfl/component/collaborator/collaborator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ def __init__(
153153
client=self.client,
154154
)
155155

156+
def ping(self):
157+
"""Ping the Aggregator."""
158+
self.client.ping()
159+
156160
def run(self):
157161
"""Run the collaborator."""
158162
# Experiment begin

openfl/interface/collaborator.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,60 @@ def start_(plan, collaborator_name, data_config):
8484
plan.get_collaborator(collaborator_name).run()
8585

8686

87+
@collaborator.command(name="ping")
88+
@option(
89+
"-p",
90+
"--plan",
91+
required=False,
92+
help="Path to an FL plan.",
93+
default="plan/plan.yaml",
94+
type=ClickPath(exists=True),
95+
show_default=True,
96+
)
97+
@option(
98+
"-d",
99+
"--data_config",
100+
required=False,
101+
help="The dataset shard configuration file.",
102+
default="plan/data.yaml",
103+
type=ClickPath(exists=True),
104+
show_default=True,
105+
)
106+
@option(
107+
"-n",
108+
"--collaborator_name",
109+
required=True,
110+
help="The certified common name of the collaborator.",
111+
)
112+
def ping_(plan, collaborator_name, data_config):
113+
"""Ping the aggregator without starting any tasks."""
114+
115+
if plan and is_directory_traversal(plan):
116+
echo("Federated learning plan path is out of the openfl workspace scope.")
117+
sys.exit(1)
118+
if data_config and is_directory_traversal(data_config):
119+
echo("The data set/shard configuration file path is out of the openfl workspace scope.")
120+
sys.exit(1)
121+
122+
fl_plan = Plan.parse(
123+
plan_config_path=Path(plan).absolute(),
124+
data_config_path=Path(data_config).absolute(),
125+
)
126+
127+
agg_addr = fl_plan.config["network"]["settings"]["agg_addr"]
128+
agg_port = fl_plan.config["network"]["settings"]["agg_port"]
129+
use_tls = fl_plan.config["network"]["settings"]["use_tls"]
130+
protocol = "TLS" if use_tls else "TCP"
131+
132+
logger.info(
133+
f"🧿 Testing connectivity with the Aggregator at {agg_addr}:{agg_port} via {protocol}..."
134+
)
135+
fl_plan.get_collaborator(collaborator_name).ping()
136+
137+
logger.info(f"The Aggregator is reachable at {agg_addr}:{agg_port}")
138+
logger.info(f"{protocol} connection established.")
139+
140+
87141
@collaborator.command(name="create")
88142
@option(
89143
"-n",

openfl/protocols/aggregator.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import "openfl/protocols/base.proto";
99

1010

1111
service Aggregator {
12+
rpc Ping(PingRequest) returns (PingResponse) {}
1213
rpc GetTasks(GetTasksRequest) returns (GetTasksResponse) {}
1314
rpc GetAggregatedTensor(GetAggregatedTensorRequest) returns (GetAggregatedTensorResponse) {}
1415
rpc SendLocalTaskResults(stream DataStream) returns (SendLocalTaskResultsResponse) {}
@@ -22,6 +23,14 @@ message MessageHeader {
2223
string single_col_cert_common_name = 4;
2324
}
2425

26+
message PingRequest {
27+
MessageHeader header = 1;
28+
}
29+
30+
message PingResponse {
31+
MessageHeader header = 1;
32+
}
33+
2534
message GetTasksRequest {
2635
MessageHeader header = 1;
2736
}

openfl/transport/grpc/aggregator_client.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,25 @@ def reconnect(self):
289289

290290
self.stub = aggregator_pb2_grpc.AggregatorStub(self.channel)
291291

292+
@_resend_data_on_reconnection
293+
@_atomic_connection
294+
def ping(self):
295+
"""Ping the aggregator to check connectivity."""
296+
logger.info("Aggregator ping...")
297+
header = create_header(
298+
sender=self.collaborator_name,
299+
receiver=self.aggregator_uuid,
300+
federation_uuid=self.federation_uuid,
301+
single_col_cert_common_name=self.single_col_cert_common_name,
302+
)
303+
request = aggregator_pb2.PingRequest(header=header)
304+
response = self.stub.Ping(request)
305+
if response:
306+
self.validate_response(response)
307+
logger.info("Aggregator pong!")
308+
else:
309+
logger.warning("Aggregator ping failed...")
310+
292311
@_resend_data_on_reconnection
293312
@_atomic_connection
294313
def get_tasks(self):
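
The client-side round trip above can be illustrated without gRPC. The sketch below is a simplified model under stated assumptions: `Header` and `FakeStub` are hypothetical stand-ins for the protobuf `MessageHeader` and the generated `AggregatorStub`, and the three checks in `ping` are an assumed approximation of what `validate_response` verifies, which this diff does not show.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Header:
    """Stand-in for the protobuf MessageHeader (subset of its fields)."""

    sender: str
    receiver: str
    federation_uuid: str


class FakeStub:
    """Hypothetical in-process replacement for AggregatorStub; no network involved."""

    def __init__(self, aggregator_uuid: str):
        self.aggregator_uuid = aggregator_uuid

    def Ping(self, request: Header) -> Header:  # noqa: N802 (mirrors the RPC name)
        # Reply with sender and receiver swapped, as the server handler does.
        return Header(
            sender=self.aggregator_uuid,
            receiver=request.sender,
            federation_uuid=request.federation_uuid,
        )


def ping(stub, collaborator_name: str, aggregator_uuid: str, federation_uuid: str) -> bool:
    """Send one ping and report whether the reply header is consistent."""
    request = Header(collaborator_name, aggregator_uuid, federation_uuid)
    response = stub.Ping(request)
    # ASSUMED validation rules: the reply must come from the expected aggregator,
    # be addressed to this collaborator, and belong to the same federation.
    return (
        response.sender == aggregator_uuid
        and response.receiver == collaborator_name
        and response.federation_uuid == federation_uuid
    )
```

A stub that answers with an unexpected aggregator identity makes `ping` return `False`, which is the situation the header validation is meant to catch.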

openfl/transport/grpc/aggregator_server.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,30 @@ def check_request(self, request):
138138
f"Expected: {self.aggregator.single_col_cert_common_name}, Actual: {request.header.single_col_cert_common_name}" # noqa: E501
139139
)
140140

141+
def Ping(self, request, context): # NOQA:N802
142+
"""Ping endpoint of the Aggregator server.
143+
144+
This method handles a ping request from a collaborator.
145+
146+
Args:
147+
request (aggregator_pb2.PingRequest): The ping request from the
148+
collaborator.
149+
context (grpc.ServicerContext): The context of the request.
150+
151+
Returns:
152+
aggregator_pb2.PingResponse: The response to the ping request.
153+
"""
154+
self.validate_collaborator(request, context)
155+
self.check_request(request)
156+
header = create_header(
157+
sender=self.aggregator.uuid,
158+
receiver=request.header.sender,
159+
federation_uuid=self.aggregator.federation_uuid,
160+
single_col_cert_common_name=self.aggregator.single_col_cert_common_name,
161+
)
162+
163+
return aggregator_pb2.PingResponse(header=header)
164+
141165
def GetTasks(self, request, context): # NOQA:N802
142166
"""Request a job from aggregator.
143167
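
The server-side handler follows a validate, check, reply pattern. The sketch below models that pattern in plain Python under stated assumptions: `PingServicer` and `Header` are hypothetical simplifications, and the two guard clauses only approximate `validate_collaborator` and `check_request`, whose full bodies are not part of this diff.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class Header:
    """Stand-in for the protobuf MessageHeader (subset of its fields)."""

    sender: str
    receiver: str
    federation_uuid: str


class PingServicer:
    """Hypothetical, simplified model of the aggregator's Ping endpoint."""

    def __init__(self, uuid: str, federation_uuid: str, authorized_cols: Iterable[str]):
        self.uuid = uuid
        self.federation_uuid = federation_uuid
        self.authorized_cols = set(authorized_cols)

    def ping(self, header: Header) -> Header:
        # ASSUMED analogue of validate_collaborator: reject unknown senders.
        if header.sender not in self.authorized_cols:
            raise PermissionError(f"Collaborator {header.sender!r} is not authorized")
        # ASSUMED analogue of check_request: the request must target this
        # aggregator and this federation.
        if header.receiver != self.uuid or header.federation_uuid != self.federation_uuid:
            raise ValueError("Request header does not match this aggregator")
        # Reply header: the aggregator becomes the sender, the caller the receiver.
        return Header(
            sender=self.uuid,
            receiver=header.sender,
            federation_uuid=self.federation_uuid,
        )
```

Because the reply carries no payload beyond the header, a successful call confirms reachability and identity checks without scheduling any task.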
