Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
BROKER:
uri: amqps://admin:NhutSFwChtLzugoMnotz@dev.backbone.sandbox.imubit.io:5671
INSTRUCTION: list_services
SERVICE:
id: W-MTSEITLIN
VERBOSE: false
3 changes: 1 addition & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ package_dir =
# DON'T CHANGE THE FOLLOWING LINE! IT WILL BE UPDATED BY PYSCAFFOLD!
# Add here dependencies of your project (semicolon/line-separated), e.g.
install_requires =
importlib-metadata; python_version<"3.8"
confuse
dynaconf
pandas
aiomisc
aiodebug
Expand Down
54 changes: 15 additions & 39 deletions src/data_agent/broker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,7 @@

from data_agent import __version__
from data_agent.api import ServiceApi
from data_agent.config_manager import (
PersistentComponent,
component_config_view,
init_configuration,
)
from data_agent.config_template import (
CONFIG_SECTION_BROKER,
CONFIG_SECTION_CONNECTION_MANAGER,
CONFIG_SECTION_DAQ_SCHEDULER,
CONFIG_SECTION_LOG,
CONFIG_SECTION_SAFE_MANIPULATOR,
CONFIG_SECTION_SERVICE,
)
from data_agent.config_manager import ConfigManager
from data_agent.connection_manager import ConnectionManager
from data_agent.daq_scheduler import create_daq_scheduler
from data_agent.exchanger import DataExchanger
Expand All @@ -35,10 +23,10 @@ class BrokerAgent:
_scheduler = None

async def init(self, loop, is_service=False, enable_persistance=True):
self._config, _ = init_configuration(is_service, loop)
service_config = component_config_view(self._config, CONFIG_SECTION_SERVICE)
broker_config = component_config_view(self._config, CONFIG_SECTION_BROKER)
log_config = component_config_view(self._config, CONFIG_SECTION_LOG)
self._config = ConfigManager(loop=loop, enable_persistance=enable_persistance)

service_config = self._config.get("service")
broker_config = self._config.get("broker")

uri_pattern = re.compile(r"(\b(?:[a-z]{,5})://.*:)(.*)(@[^ \b]+)", re.MULTILINE)
broker_uri_wo_pass = re.sub(uri_pattern, r"\1**********\3", broker_config.uri)
Expand All @@ -50,8 +38,10 @@ async def init(self, loop, is_service=False, enable_persistance=True):
f" FQN: {service_config.domain}.{service_config.type}.{service_config.id}"
)
log.info(f" Broker URI: {broker_uri_wo_pass}")
log.info(f" Config directory: {self._config.config_dir()}")
log.info(f' Logs path: {log_config["handlers"]["file"]["filename"]}')
log.info(f" Config directory: {self._config.base_path}")
log.info(
f" Logs path: {self._config.get('log.handlers.file.filename')}"
)
log.info(
"***********************************************************************"
)
Expand All @@ -70,29 +60,15 @@ async def init(self, loop, is_service=False, enable_persistance=True):
if handler.get_name() == "amqp":
await self._broker_conn.init_logging_handler(handler)

self._connection_manager = ConnectionManager(
PersistentComponent(
self._config,
CONFIG_SECTION_CONNECTION_MANAGER,
enable_persistence=enable_persistance,
)
)
self._connection_manager = ConnectionManager(config=self._config)
self._safe_manipulator = SafeManipulator(
self._connection_manager,
PersistentComponent(
self._config,
CONFIG_SECTION_SAFE_MANIPULATOR,
enable_persistence=enable_persistance,
),
connection_manager=self._connection_manager,
config=self._config,
)
self._scheduler = create_daq_scheduler(
self._broker_conn,
self._connection_manager,
PersistentComponent(
self._config,
CONFIG_SECTION_DAQ_SCHEDULER,
enable_persistence=enable_persistance,
),
broker=self._broker_conn,
conn_manager=self._connection_manager,
config=self._config,
)
self._data_exchanger = DataExchanger(self._connection_manager)
api = ServiceApi(
Expand Down
30 changes: 13 additions & 17 deletions src/data_agent/broker/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,14 @@
from data_agent import __version__
from data_agent.broker.instr_exec import cli_exec
from data_agent.broker.instr_list_services import cli_list_services
from data_agent.config_manager import component_config_view, init_configuration
from data_agent.config_template import CONFIG_SECTION_BROKER, CONFIG_SECTION_SERVICE

# logging.basicConfig(level=os.environ.get("LOGLEVEL", "DEBUG"))
# logging.getLogger().addHandler(logging.StreamHandler())
from data_agent.config_manager import ConfigManager

log = logging.getLogger(__name__)


async def create_amq_broker_connector(config, keep_alive_listen):
config_broker = component_config_view(config, CONFIG_SECTION_BROKER)
config_service = component_config_view(config, CONFIG_SECTION_SERVICE)
config_broker = config.get("broker")
config_service = config.get("service")

amq = AmqBrokerConnector(
amqp_uri=config_broker.uri,
Expand All @@ -35,21 +31,22 @@ def run():
loop = asyncio.get_event_loop()

parser = argparse.ArgumentParser(description="Broker Connected Data Agent CLI")

config = ConfigManager(loop=loop, parser=parser)

# Initialize subparser after ConfigManager identified configurable settings
subparsers = parser.add_subparsers(dest="instruction", help="Execute API call")
subparsers.add_parser("exec")
subparsers.add_parser("list_services")

config, unrecognized_args = init_configuration(
loop=loop, is_service=False, parser=parser
)
known_args, _ = parser.parse_known_args()
print(known_args)

config_service = component_config_view(config, CONFIG_SECTION_SERVICE)
config_service = config.get("service")

log.info(
'***** Broker Connected Agent CLI: "{}", Instruction: "{}", Ver: {} ******'.format(
config_service["id"], known_args.instruction, __version__
)
f'***** Broker Connected Agent CLI: "{config_service.id}", '
f'Instruction: "{known_args.instruction}", Ver: {__version__} ******'
)

broker = loop.run_until_complete(
Expand All @@ -61,14 +58,13 @@ def run():
task = None

if known_args.instruction == "exec":
task = cli_exec(broker, config, unrecognized_args)
task = cli_exec(broker, config)
elif known_args.instruction == "list_services":
task = cli_list_services(broker, config, unrecognized_args)
task = cli_list_services(broker, config)
else:
raise Exception("Unknown instruction")

loop.run_until_complete(task)

loop.run_until_complete(broker.close())
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
Expand Down
11 changes: 4 additions & 7 deletions src/data_agent/broker/instr_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
from aio_pika.exceptions import DeliveryError
from async_timeout import timeout

from data_agent.config_manager import component_config_view
from data_agent.config_template import CONFIG_SECTION_SERVICE

global_kwargs = []


Expand Down Expand Up @@ -88,16 +85,16 @@ async def print_formatted_result(
print("\n\r------------------------------------------------")


async def cli_exec(broker, config, args):
config_service = component_config_view(config, CONFIG_SECTION_SERVICE)
async def cli_exec(broker, config):
config_service = config.get("service")

service_id = config_service.id
service_domain = config_service.domain
service_type = config_service.type

await print_formatted_result(
args[0],
args[1:] if len(args) > 1 else [],
config.unknown_args[0],
config.unknown_args[1:] if len(config.unknown_args) > 1 else [],
broker,
service_domain,
service_id,
Expand Down
2 changes: 1 addition & 1 deletion src/data_agent/broker/instr_list_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pprint import pprint


async def cli_list_services(broker, config, args):
async def cli_list_services(broker, config):
# Sleep to collect keep alives
print("Waiting 5 seconds for keep alives...")
await asyncio.sleep(5)
Expand Down
Loading
Loading