Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions controllers/scripts/csi_general/csi_pb2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cd ./proto/${PB2_DIR}
curl -O https://raw.githubusercontent.com/container-storage-interface/spec/${CSI_VERSION}/csi.proto
curl -O https://raw.githubusercontent.com/IBM/csi-volume-group/${VG_VERSION}/volumegroup/volumegroup.proto
curl -O https://raw.githubusercontent.com/csi-addons/spec/v0.2.0/replication/replication.proto
curl -O https://raw.githubusercontent.com/csi-addons/spec/main/identity/identity.proto
sed -i 's|github.com/container-storage-interface/spec/lib/go/csi/csi.proto|csi_general/csi.proto|g' replication.proto
cd -

Expand Down
57 changes: 0 additions & 57 deletions controllers/servers/csi/controller_server_manager.py

This file was deleted.

Empty file.
57 changes: 54 additions & 3 deletions controllers/servers/csi/main.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,70 @@
import os
from argparse import ArgumentParser
from threading import Thread
from concurrent import futures
import grpc
from concurrent import futures

from csi_general import csi_pb2_grpc, volumegroup_pb2_grpc, identity_pb2_grpc, replication_pb2_grpc

from controllers.common.csi_logger import set_log_level
from controllers.servers.csi.controller_server_manager import ControllerServerManager
from controllers.common.settings import CSI_CONTROLLER_SERVER_WORKERS
from controllers.servers.csi.server_manager import ServerManager
from controllers.servers.csi.csi_controller_server import CSIControllerServicer
from controllers.servers.csi.volume_group_server import VolumeGroupControllerServicer
from controllers.servers.csi.csi_addons_server.replication_controller_servicer import ReplicationControllerServicer


def main():
parser = ArgumentParser()
parser.add_argument("-e", "--csi-endpoint", dest="endpoint", help="grpc endpoint")
parser.add_argument("-a", "--csi-addons-endpoint", dest="addonsendpoint", help="CSI-Addons grpc endpoint")
parser.add_argument("-l", "--loglevel", dest="loglevel", help="log level")
arguments = parser.parse_args()

set_log_level(arguments.loglevel)
controller_server = _create_grpc_server()
csi_addons_server = _create_grpc_server()

csi_controller_server_manager = ServerManager(arguments.endpoint, "Controller",
_add_csi_controller_servicers(controller_server))
csi_addons_server_manager = ServerManager(arguments.addonsendpoint, "CSI Addons",
_add_csi_addons_servicers(csi_addons_server))
_start_servers(csi_controller_server_manager, csi_addons_server_manager)


def _create_grpc_server():
max_workers = _get_max_workers_count()
return grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))


def _get_max_workers_count():
cpu_count = (os.cpu_count() or 1) + 4
return CSI_CONTROLLER_SERVER_WORKERS if cpu_count < CSI_CONTROLLER_SERVER_WORKERS else None


def _add_csi_controller_servicers(controller_server):
csi_servicer = CSIControllerServicer()
volume_group_servicer = VolumeGroupControllerServicer()
csi_pb2_grpc.add_ControllerServicer_to_server(csi_servicer, controller_server)
csi_pb2_grpc.add_IdentityServicer_to_server(csi_servicer, controller_server)
volumegroup_pb2_grpc.add_ControllerServicer_to_server(volume_group_servicer, controller_server)
return controller_server


def _add_csi_addons_servicers(csi_addons_server):
replication_servicer = ReplicationControllerServicer()
replication_pb2_grpc.add_ControllerServicer_to_server(replication_servicer, csi_addons_server)
return csi_addons_server


server_manager = ControllerServerManager(arguments.endpoint)
server_manager.start_server()
def _start_servers(csi_controller_server_manager, csi_addons_server_manager):
servers = (
csi_controller_server_manager.start_server,
csi_addons_server_manager.start_server)
for server_function in servers:
thread = Thread(target=server_function,)
thread.start()


if __name__ == '__main__':
Expand Down
34 changes: 34 additions & 0 deletions controllers/servers/csi/server_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import time

from controllers.common.config import config
from controllers.common.csi_logger import get_stdout_logger

logger = get_stdout_logger()


class ServerManager:
def __init__(self, array_endpoint, server_type, grpc_server):
self.endpoint = array_endpoint
self.server_type = server_type
self.grpc_server = grpc_server

def start_server(self):
# bind the server to the port defined above
# grpc_server.add_insecure_port('[::]:{}'.format(self.server_port))
# grpc_server.add_insecure_port('unix://{}'.format(self.server_port))
self.grpc_server.add_insecure_port(self.endpoint)

logger.info("{} version: {}".format(self.server_type, config.identity.version))

# start the server
logger.debug("Listening for connections on endpoint address: {}".format(self.endpoint))

self.grpc_server.start()
logger.debug('{} Server running ...'.format(self.server_type))

try:
while True:
time.sleep(60 * 60 * 60)
except KeyboardInterrupt:
self.grpc_server.stop(0)
logger.debug('Controller Server Stopped ...')
4 changes: 2 additions & 2 deletions controllers/tests/controller_server/addons_server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from controllers.servers.settings import PARAMETERS_SYSTEM_ID, PARAMETERS_COPY_TYPE, PARAMETERS_REPLICATION_POLICY
from controllers.array_action.settings import REPLICATION_TYPE_MIRROR, REPLICATION_TYPE_EAR, REPLICATION_COPY_TYPE_SYNC
from controllers.array_action.array_action_types import ReplicationRequest
from controllers.servers.csi.addons_server import ReplicationControllerServicer
from controllers.servers.csi.csi_addons_server.replication_controller_servicer import ReplicationControllerServicer
from controllers.tests import utils
from controllers.tests.common.test_settings import VOLUME_NAME, VOLUME_UID, OBJECT_INTERNAL_ID, \
OTHER_OBJECT_INTERNAL_ID, REPLICATION_NAME, SYSTEM_ID, COPY_TYPE, SECRET_USERNAME_VALUE, SECRET_PASSWORD_VALUE, \
Expand All @@ -16,7 +16,7 @@
from controllers.tests.controller_server.csi_controller_server_test import (CommonControllerTest)
from controllers.tests.utils import ProtoBufMock

ADDON_SERVER_PATH = "controllers.servers.csi.addons_server"
ADDON_SERVER_PATH = "controllers.servers.csi.csi_addons_server.replication_controller_servicer"


class BaseReplicationSetUp(unittest.TestCase):
Expand Down