Skip to content

Commit 5bcfdde

Browse files
committed
[grpc] Number of processes through arguments.
1 parent 7b22496 commit 5bcfdde

File tree

2 files changed

+48
-20
lines changed

2 files changed

+48
-20
lines changed

run_example_service.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ def main():
3030
action="store_true",
3131
dest="run_ssl",
3232
help="start the daemon with SSL")
33+
parser.add_argument("--mp",
34+
help="number of concurrent processes",
35+
metavar="NUMBER_OF_PROCESSES",
36+
default=1,
37+
type=int,
38+
required=False)
3339
args = parser.parse_args()
3440
root_path = pathlib.Path(__file__).absolute().parent
3541

@@ -41,7 +47,8 @@ def main():
4147
service_modules,
4248
args.run_daemon,
4349
args.daemon_config,
44-
args.run_ssl)
50+
args.run_ssl,
51+
args.mp)
4552

4653
# Continuous checking all subprocess
4754
try:
@@ -56,7 +63,8 @@ def main():
5663
raise
5764

5865

59-
def start_all_services(cwd, service_modules, run_daemon, daemon_config, run_ssl):
66+
def start_all_services(cwd, service_modules,
67+
run_daemon, daemon_config, run_ssl, mp):
6068
"""
6169
Loop through all service_modules and start them.
6270
For each one, an instance of Daemon "snetd" is created.
@@ -65,14 +73,14 @@ def start_all_services(cwd, service_modules, run_daemon, daemon_config, run_ssl)
6573
all_p = []
6674
for _, service_module in enumerate(service_modules):
6775
service_name = service_module.split(".")[-1]
68-
log.info("Launching {} on port {}".format(str(registry[service_name]),
69-
service_module))
76+
log.info("Launching {} on port {}".format(service_module,
77+
str(registry[service_name])))
7078
all_p += start_service(cwd, service_module,
71-
run_daemon, daemon_config, run_ssl)
79+
run_daemon, daemon_config, run_ssl, mp)
7280
return all_p
7381

7482

75-
def start_service(cwd, service_module, run_daemon, daemon_config, run_ssl):
83+
def start_service(cwd, service_module, run_daemon, daemon_config, run_ssl, mp):
7684
"""
7785
Starts SNET Daemon ("snetd") and the python module of the service
7886
at the passed gRPC port.
@@ -98,11 +106,11 @@ def add_ssl_configs(conf):
98106
all_p.append(start_snetd(str(cwd), config_file))
99107
service_name = service_module.split(".")[-1]
100108
grpc_port = registry[service_name]["grpc"]
101-
p = subprocess.Popen([
102-
sys.executable,
103-
"-m", service_module,
104-
"--grpc-port", str(grpc_port)],
105-
cwd=str(cwd))
109+
p = subprocess.Popen([sys.executable,
110+
"-m", service_module,
111+
"--grpc-port", str(grpc_port),
112+
"--mp", str(mp)],
113+
cwd=str(cwd))
106114
all_p.append(p)
107115
return all_p
108116

service/example_service.py

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import sys
99

1010
import argparse
11-
import os.path
11+
import os
1212
import grpc
1313

1414
from service import registry
@@ -56,11 +56,12 @@
5656
# derived from the protobuf codes.
5757
class CalculatorServicer(grpc_bt_grpc.CalculatorServicer):
5858
def __init__(self):
59+
self.pid = os.getpid()
5960
self.a = 0
6061
self.b = 0
6162
self.result = 0
6263
# Just for debugging purpose.
63-
_LOGGER.debug("CalculatorServicer created")
64+
_LOGGER.debug("[{}] CalculatorServicer created".format(self.pid))
6465

6566
# The method that will be exposed to the snet-cli call command.
6667
# request: incoming data
@@ -74,7 +75,10 @@ def add(self, request, context):
7475
self.result = Result()
7576

7677
self.result.value = self.a + self.b
77-
_LOGGER.debug("add({},{})={}".format(self.a, self.b, self.result.value))
78+
_LOGGER.debug("[{}] add({},{})={}".format(self.pid,
79+
self.a,
80+
self.b,
81+
self.result.value))
7882
return self.result
7983

8084
def sub(self, request, context):
@@ -83,7 +87,10 @@ def sub(self, request, context):
8387

8488
self.result = Result()
8589
self.result.value = self.a - self.b
86-
_LOGGER.debug("sub({},{})={}".format(self.a, self.b, self.result.value))
90+
_LOGGER.debug("[{}] sub({},{})={}".format(self.pid,
91+
self.a,
92+
self.b,
93+
self.result.value))
8794
return self.result
8895

8996
def mul(self, request, context):
@@ -92,7 +99,10 @@ def mul(self, request, context):
9299

93100
self.result = Result()
94101
self.result.value = self.a * self.b
95-
_LOGGER.debug("mul({},{})={}".format(self.a, self.b, self.result.value))
102+
_LOGGER.debug("[{}] mul({},{})={}".format(self.pid,
103+
self.a,
104+
self.b,
105+
self.result.value))
96106
return self.result
97107

98108
def div(self, request, context):
@@ -101,7 +111,10 @@ def div(self, request, context):
101111

102112
self.result = Result()
103113
self.result.value = self.a / self.b
104-
_LOGGER.debug("div({},{})={}".format(self.a, self.b, self.result.value))
114+
_LOGGER.debug("[{}] div({},{})={}".format(self.pid,
115+
self.a,
116+
self.b,
117+
self.result.value))
105118
return self.result
106119

107120

@@ -154,12 +167,19 @@ def main():
154167
default=registry[service_name]['grpc'],
155168
type=int,
156169
required=False)
157-
args = parser.parse_args(sys.argv[1:])
170+
parser.add_argument("--mp",
171+
help="number of concurrent processes",
172+
metavar="NUMBER_OF_PROCESSES",
173+
default=1,
174+
type=int,
175+
required=False)
176+
args = parser.parse_args()
177+
178+
num_processes = _PROCESS_COUNT if args.mp > _PROCESS_COUNT else args.mp
158179
with reserve_port(args.grpc_port) as port:
159-
_LOGGER.debug("Binding to port '%s'", port)
160180
sys.stdout.flush()
161181
workers = []
162-
for _ in range(_PROCESS_COUNT):
182+
for _ in range(num_processes):
163183
# NOTE: It is imperative that the worker subprocesses be forked before
164184
# any gRPC servers start up. See
165185
# https://github.com/grpc/grpc/issues/16001 for more details.

0 commit comments

Comments
 (0)