|
8 | 8 | from __future__ import annotations |
9 | 9 |
|
10 | 10 | import json |
| 11 | +import re |
11 | 12 | from os import environ |
12 | 13 |
|
13 | 14 | import requests |
@@ -50,6 +51,38 @@ def msk_bootstrap(msk_arn: str, broker_type: str) -> str: |
50 | 51 | return msk_arn |
51 | 52 |
|
52 | 53 |
|
| 54 | +def msk_cluster_zookeeper(msk_arn, with_tls: bool): |
| 55 | + session = Session() |
| 56 | + client = session.client("kafka") |
| 57 | + config_r = client.describe_cluster(ClusterArn=msk_arn) |
| 58 | + config_info = config_r["ClusterInfo"] |
| 59 | + if with_tls and keyisset("ZookeeperConnectStringTls", config_info): |
| 60 | + return config_info["ZookeeperConnectStringTls"] |
| 61 | + else: |
| 62 | + if keyisset("ZookeeperConnectString", config_info): |
| 63 | + return config_info["ZookeeperConnectString"] |
| 64 | + return "" |
| 65 | + |
| 66 | + |
| 67 | +def msk_endpoints(msk_arn: str, broker_type: str, endpoint_type: str): |
| 68 | + session = Session() |
| 69 | + client = session.client("kafka") |
| 70 | + brokers_r = client.get_bootstrap_brokers(ClusterArn=msk_arn) |
| 71 | + if not keyisset(broker_type, brokers_r): |
| 72 | + return |
| 73 | + _endpoints = brokers_r[broker_type].split(",") |
| 74 | + endpoint_port: dict = {"JMX": 11001, "NODE": 11002} |
| 75 | + _msk_ports_re = re.compile(r":(?P<port>\d+)") |
| 76 | + if endpoint_type not in endpoint_port: |
| 77 | + endpoint_type = "JMX" |
| 78 | + endpoints: list[str] = [] |
| 79 | + for _endpoint in _endpoints: |
| 80 | + endpoints.append( |
| 81 | + _msk_ports_re.sub(f":{endpoint_port[endpoint_type]}", _endpoint) |
| 82 | + ) |
| 83 | + return endpoints |
| 84 | + |
| 85 | + |
53 | 86 | def from_ssm_json(parameter_name: str) -> dict: |
54 | 87 | """ |
55 | 88 | Function to retrieve an SSM parameter value |
|
0 commit comments