Skip to content

Commit 7b22496

Browse files
committed
[grpc] Using gRPC multiprocessing.
1 parent 933a630 commit 7b22496

File tree

6 files changed

+111
-64
lines changed

6 files changed

+111
-64
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,6 @@ venv.bak/
102102

103103
# mypy
104104
.mypy_cache/
105+
106+
*_pb2.py
107+
*_pb2_grpc.py

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
grpcio>=1.14.2
1+
grpcio>=1.14.2 --no-binary grpcio
22
grpcio-tools>=1.14.1

run_example_service.py

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,37 @@
1111

1212
from service import registry
1313

14-
logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - %(name)s - %(message)s")
14+
logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - "
15+
"%(name)s - %(message)s")
1516
log = logging.getLogger("run_example_service")
1617

1718

1819
def main():
1920
parser = argparse.ArgumentParser(description="Run services")
20-
parser.add_argument("--no-daemon", action="store_false", dest="run_daemon", help="do not start the daemon")
21+
parser.add_argument("--no-daemon",
22+
action="store_false",
23+
dest="run_daemon",
24+
help="do not start the daemon")
2125
parser.add_argument("--daemon-config",
2226
dest="daemon_config",
2327
help="Path of daemon configuration file, without config it won't be started",
24-
required=False
25-
)
26-
parser.add_argument("--ssl", action="store_true", dest="run_ssl", help="start the daemon with SSL")
28+
required=False)
29+
parser.add_argument("--ssl",
30+
action="store_true",
31+
dest="run_ssl",
32+
help="start the daemon with SSL")
2733
args = parser.parse_args()
2834
root_path = pathlib.Path(__file__).absolute().parent
2935

3036
# All services modules go here
3137
service_modules = ["service.example_service"]
3238

3339
# Call for all the services listed in service_modules
34-
all_p = start_all_services(root_path, service_modules, args.run_daemon, args.daemon_config, args.run_ssl)
40+
all_p = start_all_services(root_path,
41+
service_modules,
42+
args.run_daemon,
43+
args.daemon_config,
44+
args.run_ssl)
3545

3646
# Continuous checking all subprocess
3747
try:
@@ -53,10 +63,12 @@ def start_all_services(cwd, service_modules, run_daemon, daemon_config, run_ssl)
5363
snetd will start with configs from "snetd.config.json"
5464
"""
5565
all_p = []
56-
for i, service_module in enumerate(service_modules):
66+
for _, service_module in enumerate(service_modules):
5767
service_name = service_module.split(".")[-1]
58-
log.info("Launching {} on port {}".format(str(registry[service_name]), service_module))
59-
all_p += start_service(cwd, service_module, run_daemon, daemon_config, run_ssl)
68+
log.info("Launching {} on port {}".format(str(registry[service_name]),
69+
service_module))
70+
all_p += start_service(cwd, service_module,
71+
run_daemon, daemon_config, run_ssl)
6072
return all_p
6173

6274

@@ -80,13 +92,17 @@ def add_ssl_configs(conf):
8092
if daemon_config:
8193
all_p.append(start_snetd(str(cwd), daemon_config))
8294
else:
83-
for idx, config_file in enumerate(glob.glob("./snetd_configs/*.json")):
95+
for _, config_file in enumerate(glob.glob("./snetd_configs/*.json")):
8496
if run_ssl:
8597
add_ssl_configs(config_file)
8698
all_p.append(start_snetd(str(cwd), config_file))
8799
service_name = service_module.split(".")[-1]
88100
grpc_port = registry[service_name]["grpc"]
89-
p = subprocess.Popen([sys.executable, "-m", service_module, "--grpc-port", str(grpc_port)], cwd=str(cwd))
101+
p = subprocess.Popen([
102+
sys.executable,
103+
"-m", service_module,
104+
"--grpc-port", str(grpc_port)],
105+
cwd=str(cwd))
90106
all_p.append(p)
91107
return all_p
92108

service/common.py

Lines changed: 0 additions & 29 deletions
This file was deleted.

service/example_service.py

Lines changed: 76 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
1-
import sys
1+
from concurrent import futures
2+
import contextlib
3+
import datetime
24
import logging
5+
import multiprocessing
6+
import time
7+
import socket
8+
import sys
39

10+
import argparse
11+
import os.path
412
import grpc
5-
import concurrent.futures as futures
613

7-
import service.common
14+
from service import registry
815

916
# Importing the generated codes from buildproto.sh
1017
import service.service_spec.example_service_pb2_grpc as grpc_bt_grpc
1118
from service.service_spec.example_service_pb2 import Result
1219

13-
logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s] - %(name)s - %(message)s")
14-
log = logging.getLogger("example_service")
20+
logging.basicConfig(level=10, format="%(asctime)s - [%(levelname)8s]"
21+
" - %(name)s - %(message)s")
22+
_LOGGER = logging.getLogger("example_service")
1523

24+
_ONE_DAY = datetime.timedelta(days=1)
25+
_PROCESS_COUNT = multiprocessing.cpu_count()
26+
_THREAD_CONCURRENCY = _PROCESS_COUNT
1627

1728
"""
1829
Simple arithmetic service to test the Snet Daemon (gRPC), dApp and/or Snet-CLI.
@@ -41,7 +52,6 @@
4152
value: 924.0
4253
"""
4354

44-
4555
# Create a class to be added to the gRPC server
4656
# derived from the protobuf codes.
4757
class CalculatorServicer(grpc_bt_grpc.CalculatorServicer):
@@ -50,7 +60,7 @@ def __init__(self):
5060
self.b = 0
5161
self.result = 0
5262
# Just for debugging purpose.
53-
log.debug("CalculatorServicer created")
63+
_LOGGER.debug("CalculatorServicer created")
5464

5565
# The method that will be exposed to the snet-cli call command.
5666
# request: incoming data
@@ -64,7 +74,7 @@ def add(self, request, context):
6474
self.result = Result()
6575

6676
self.result.value = self.a + self.b
67-
log.debug("add({},{})={}".format(self.a, self.b, self.result.value))
77+
_LOGGER.debug("add({},{})={}".format(self.a, self.b, self.result.value))
6878
return self.result
6979

7080
def sub(self, request, context):
@@ -73,7 +83,7 @@ def sub(self, request, context):
7383

7484
self.result = Result()
7585
self.result.value = self.a - self.b
76-
log.debug("sub({},{})={}".format(self.a, self.b, self.result.value))
86+
_LOGGER.debug("sub({},{})={}".format(self.a, self.b, self.result.value))
7787
return self.result
7888

7989
def mul(self, request, context):
@@ -82,7 +92,7 @@ def mul(self, request, context):
8292

8393
self.result = Result()
8494
self.result.value = self.a * self.b
85-
log.debug("mul({},{})={}".format(self.a, self.b, self.result.value))
95+
_LOGGER.debug("mul({},{})={}".format(self.a, self.b, self.result.value))
8696
return self.result
8797

8898
def div(self, request, context):
@@ -91,10 +101,18 @@ def div(self, request, context):
91101

92102
self.result = Result()
93103
self.result.value = self.a / self.b
94-
log.debug("div({},{})={}".format(self.a, self.b, self.result.value))
104+
_LOGGER.debug("div({},{})={}".format(self.a, self.b, self.result.value))
95105
return self.result
96106

97107

108+
def wait_forever(server):
109+
try:
110+
while True:
111+
time.sleep(_ONE_DAY.total_seconds())
112+
except KeyboardInterrupt:
113+
server.stop(None)
114+
115+
98116
# The gRPC serve function.
99117
#
100118
# Params:
@@ -103,17 +121,54 @@ def div(self, request, context):
103121
#
104122
# Add all your classes to the server here.
105123
# (from generated .py files by protobuf compiler)
106-
def serve(max_workers=10, port=7777):
107-
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
124+
def run_server(grpc_port=7777):
125+
options = (('grpc.so_reuseport', 1),)
126+
server = grpc.server(
127+
futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY,),
128+
options=options)
108129
grpc_bt_grpc.add_CalculatorServicer_to_server(CalculatorServicer(), server)
109-
server.add_insecure_port("[::]:{}".format(port))
110-
return server
130+
server.add_insecure_port("[::]:{}".format(grpc_port))
131+
server.start()
132+
wait_forever(server)
133+
134+
135+
@contextlib.contextmanager
136+
def reserve_port(grpc_port=7777):
137+
"""Find and reserve a port for all subprocesses to use."""
138+
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
139+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
140+
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
141+
raise RuntimeError("Failed to set SO_REUSEPORT.")
142+
sock.bind(('', grpc_port))
143+
try:
144+
yield sock.getsockname()[1]
145+
finally:
146+
sock.close()
147+
148+
def main():
149+
""" Runs the gRPC server to communicate with the SNET Daemon. """
150+
parser = argparse.ArgumentParser(prog=__file__)
151+
service_name = os.path.splitext(os.path.basename(__file__))[0]
152+
parser.add_argument("--grpc-port",
153+
help="port to bind gRPC service to",
154+
default=registry[service_name]['grpc'],
155+
type=int,
156+
required=False)
157+
args = parser.parse_args(sys.argv[1:])
158+
with reserve_port(args.grpc_port) as port:
159+
_LOGGER.debug("Binding to port '%s'", port)
160+
sys.stdout.flush()
161+
workers = []
162+
for _ in range(_PROCESS_COUNT):
163+
# NOTE: It is imperative that the worker subprocesses be forked before
164+
# any gRPC servers start up. See
165+
# https://github.com/grpc/grpc/issues/16001 for more details.
166+
worker = multiprocessing.Process(target=run_server, args=(port,))
167+
worker.start()
168+
workers.append(worker)
169+
for worker in workers:
170+
worker.join()
111171

112172

113173
if __name__ == "__main__":
114-
"""
115-
Runs the gRPC server to communicate with the Snet Daemon.
116-
"""
117-
parser = service.common.common_parser(__file__)
118-
args = parser.parse_args(sys.argv[1:])
119-
service.common.main_loop(serve, args)
174+
main()

test_example_service.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
test_flag = True
1717

1818
# Example Service - Arithmetic
19-
endpoint = input("Endpoint (localhost:{}): ".format(registry["example_service"]["grpc"])) if not test_flag else ""
19+
endpoint = input("Endpoint (localhost:{}): ".format(
20+
registry["example_service"]["grpc"])) if not test_flag else ""
2021
if endpoint == "":
21-
endpoint = "localhost:{}".format(registry["example_service"]["grpc"])
22+
endpoint = "localhost:{}".format(
23+
registry["example_service"]["grpc"])
2224

2325
grpc_method = input("Method (add|sub|mul|div): ") if not test_flag else "mul"
2426
a = float(input("Number 1: ") if not test_flag else "12")

0 commit comments

Comments
 (0)