diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 021b70d66..12fae71e5 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -8,7 +8,9 @@ on: jobs: deploy-to-dockerhub: runs-on: ubuntu-latest - if: github.event.workflow_run.conclusion == 'success' + # if: github.event.workflow_run.conclusion == 'success' + # DISABLE FOR REWRITE + if: false steps: - uses: actions/checkout@v4 - name: Set up QEMU diff --git a/.github/workflows/publish-dist.yml b/.github/workflows/publish-dist.yml index d228464e0..35c41ae39 100644 --- a/.github/workflows/publish-dist.yml +++ b/.github/workflows/publish-dist.yml @@ -6,7 +6,6 @@ jobs: build: name: Build distribution 📦 runs-on: ubuntu-latest - steps: - uses: actions/checkout@v4 - name: Set up Python @@ -30,7 +29,9 @@ jobs: publish-to-pypi: name: >- Publish Python 🐍 distribution 📦 to PyPI - if: startsWith(github.ref, 'refs/tags/') # only publish to PyPI on tag pushes + # if: startsWith(github.ref, 'refs/tags/') # only publish to PyPI on tag pushes + # DISABLE FOR REWRITE + if: false needs: - build runs-on: ubuntu-latest diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0524bed9c..2ce7c4cbf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,48 +7,25 @@ on: - main jobs: - # ruff: - # runs-on: ubuntu-latest - # steps: - # - uses: actions/checkout@v4 - # - uses: chartboost/ruff-action@v1 - # with: - # args: 'check .' - ruff-format: + ruff: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: chartboost/ruff-action@v1 - with: - args: 'format --check .' - build-image: - # needs: [ruff, ruff-format] - needs: [ruff-format] + - uses: hynek/setup-cached-uv@v1 + - run: uv venv + - run: uv pip install ruff==0.5.4 + - run: source .venv/bin/activate; ruff check . + ruff-format: runs-on: ubuntu-latest steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Build and export - uses: docker/build-push-action@v5 - with: - file: resources/images/rpc/Dockerfile_prod - context: . - tags: warnet/dev - cache-from: type=gha - cache-to: type=gha,mode=max - outputs: type=docker,dest=/tmp/warnet.tar - - - name: Upload artifact - uses: actions/upload-artifact@v4 - with: - name: warnet - path: /tmp/warnet.tar + - uses: actions/checkout@v4 + - uses: hynek/setup-cached-uv@v1 + - run: uv venv + - run: uv pip install ruff==0.5.4 + - run: source .venv/bin/activate; ruff format . 
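A rough local equivalent of the rewritten lint jobs above, which swap the chartboost/ruff-action steps for a pinned ruff installed through uv (this sketch assumes `uv` is already on the PATH; it is not part of the diff):

```sh
# Approximate local reproduction of the new `ruff` and `ruff-format` CI jobs
uv venv
uv pip install ruff==0.5.4
source .venv/bin/activate
ruff check .     # lint step from the `ruff` job
ruff format .    # formatting step from the `ruff-format` job; note this rewrites files,
                 # whereas the removed action ran `ruff format --check .` and only reported
```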
test: - needs: [build-image] + # DISABLE FOR REWRITE + if: false runs-on: ubuntu-latest strategy: matrix: @@ -102,10 +79,3 @@ jobs: run: | source .venv/bin/activate ./test/${{matrix.test}} - # build-test: - # needs: [build-image] - # runs-on: ubuntu-latest - # steps: - # - uses: actions/checkout@v4 - # - uses: ./.github/actions/compose - # - run: ./test/build_branch_test.py compose diff --git a/docs/warcli.md b/docs/warcli.md index 77dddae85..8257bdfc7 100644 --- a/docs/warcli.md +++ b/docs/warcli.md @@ -30,87 +30,6 @@ options: Check Warnet requirements are installed -## Bitcoin - -### `warcli bitcoin debug-log` -Fetch the Bitcoin Core debug log from \ in [network] - -options: -| name | type | required | default | -|---------|--------|------------|-----------| -| node | Int | yes | | -| network | String | | "warnet" | - -### `warcli bitcoin grep-logs` -Grep combined logs via fluentd using regex \ - -options: -| name | type | required | default | -|---------------------|--------|------------|-----------| -| pattern | String | yes | | -| show_k8s_timestamps | Bool | | False | -| no_sort | Bool | | False | -| network | String | | "warnet" | - -### `warcli bitcoin messages` -Fetch messages sent between \ and \ in [network] - -options: -| name | type | required | default | -|---------|--------|------------|-----------| -| node_a | Int | yes | | -| node_b | Int | yes | | -| network | String | | "warnet" | - -### `warcli bitcoin rpc` -Call bitcoin-cli \ [params] on \ in [network] - -options: -| name | type | required | default | -|---------|--------|------------|-----------| -| node | Int | yes | | -| method | String | yes | | -| params | String | | | -| network | String | | "warnet" | - -## Cluster - -### `warcli cluster connect-logging` -Connect kubectl to cluster logging - - -### `warcli cluster deploy` -Deploy Warnet using the current kubectl-configured cluster - -options: -| name | type | required | default | -|--------|--------|------------|-----------| -| dev | Bool | | False | - -### `warcli cluster deploy-logging` -Deploy logging configurations to the cluster using helm - - -### `warcli cluster port-start` -Port forward (runs as a detached process) - - -### `warcli cluster port-stop` -Stop the port forwarding process - - -### `warcli cluster setup-minikube` -Configure a local minikube cluster - -options: -| name | type | required | default | -|--------|--------|------------|-----------| -| clean | Bool | | False | - -### `warcli cluster teardown` -Stop the warnet server and tear down the cluster - - ## Graph ### `warcli graph create` @@ -165,36 +84,15 @@ options: | arches | String | | | | action | String | | "load" | -## Ln - -### `warcli ln pubkey` -Get lightning node pub key on \ in [network] - -options: -| name | type | required | default | -|---------|--------|------------|-----------| -| node | Int | yes | | -| network | String | | "warnet" | - -### `warcli ln rpc` -Call lightning cli rpc \ on \ in [network] - -options: -| name | type | required | default | -|---------|--------|------------|-----------| -| node | Int | yes | | -| command | String | yes | | -| network | String | | "warnet" | - ## Network -### `warcli network connected` -Indicate whether the all of the edges in the gaph file are connected in [network] +### `warcli network connect` +Connect nodes based on the edges defined in the graph file. 
options: -| name | type | required | default | -|---------|--------|------------|-----------| -| network | String | | "warnet" | +| name | type | required | default | +|------------|--------|------------|----------------------------------| +| graph_file | Path | | resources/graphs/default.graphml | ### `warcli network down` Bring down a running warnet named [network] @@ -204,26 +102,14 @@ options: |---------|--------|------------|-----------| | network | String | | "warnet" | -### `warcli network export` -Export all [network] data for a "simln" service running in a container - on the network. Optionally add JSON string [activity] to simln config. - Optionally provide a list of tank indexes to [exclude]. - Returns True on success. - -options: -| name | type | required | default | -|----------|--------|------------|-----------| -| network | String | | "warnet" | -| activity | String | | | -| exclude | String | | "[]" | - -### `warcli network info` -Get info about a warnet named [network] +### `warcli network generate-yaml` +Generate a Kubernetes YAML file from a graph file for deploying warnet nodes. options: -| name | type | required | default | -|---------|--------|------------|-----------| -| network | String | | "warnet" | +| name | type | required | default | +|------------|--------|------------|----------------------------------| +| graph_file | Path | | resources/graphs/default.graphml | +| output | String | | "warnet-deployment.yaml" | ### `warcli network logs` Get Kubernetes logs from the RPC server @@ -240,24 +126,8 @@ options: | name | type | required | default | |------------|--------|------------|----------------------------------| | graph_file | Path | | resources/graphs/default.graphml | -| force | Bool | | False | | network | String | | "warnet" | - -### `warcli network status` -Get status of a warnet named [network] - -options: -| name | type | required | default | -|---------|--------|------------|-----------| -| network | String | | "warnet" | - -### `warcli network up` -Bring up a previously-stopped warnet named [network] - -options: -| name | type | required | default | -|---------|--------|------------|-----------| -| network | String | | "warnet" | +| logging | Bool | | False | ## Scenarios diff --git a/pyproject.toml b/pyproject.toml index ef349b802..5485098a8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,6 @@ classifiers = [ dynamic = ["dependencies"] [project.scripts] -warnet = "warnet.server:run_server" warcli = "warnet.cli.main:cli" [project.urls] diff --git a/resources/graphs/default.graphml b/resources/graphs/default.graphml index ce84579df..ec591b000 100644 --- a/resources/graphs/default.graphml +++ b/resources/graphs/default.graphml @@ -18,60 +18,60 @@ 27.0 - -uacomment=w0 + uacomment=w0 true true 27.0 - -uacomment=w1 + uacomment=w1 true true bitcoindevproject/bitcoin:26.0 - -uacomment=w2 -debug=mempool + uacomment=w2,debug=mempool true true 27.0 - -uacomment=w3 + uacomment=w3 true 27.0 - -uacomment=w4 + uacomment=w4 true 27.0 - -uacomment=w5 + uacomment=w5 true 27.0 - -uacomment=w6 + uacomment=w6 27.0 - -uacomment=w7 + uacomment=w7 27.0 - -uacomment=w8 + uacomment=w8 27.0 - -uacomment=w9 + uacomment=w9 27.0 - -uacomment=w10 + uacomment=w10 27.0 - -uacomment=w11 + uacomment=w11 diff --git a/src/.DS_Store b/src/.DS_Store new file mode 100644 index 000000000..de7b43e8b Binary files /dev/null and b/src/.DS_Store differ diff --git a/src/warnet/backend/__init__.py b/src/warnet/backend/__init__.py deleted file mode 100644 index e69de29bb..000000000 
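Taken together, the docs/warcli.md changes above describe a CLI that renders a graphml file straight into Kubernetes manifests instead of talking to the removed `warnet` RPC server entry point. A hypothetical end-to-end session with the new `warcli network` commands (names and defaults taken from the updated docs; a working kubectl context is assumed) might look like:

```sh
# Hypothetical session using the new graph -> YAML -> cluster flow
warcli network generate-yaml resources/graphs/default.graphml --output warnet-deployment.yaml
kubectl apply -f warnet-deployment.yaml        # or `warcli network start`, which also applies the base namespace/RBAC manifests
warcli network connect resources/graphs/default.graphml   # issues addnode calls for every edge in the graph
warcli network down                            # deletes the warnet and warnet-logging namespaces
```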
diff --git a/src/warnet/backend/kubernetes_backend.py b/src/warnet/backend/kubernetes_backend.py deleted file mode 100644 index f8821888d..000000000 --- a/src/warnet/backend/kubernetes_backend.py +++ /dev/null @@ -1,885 +0,0 @@ -import base64 -import logging -import re -import subprocess -import time -from pathlib import Path -from typing import cast - -import yaml -from kubernetes import client, config -from kubernetes.client.exceptions import ApiValueError -from kubernetes.client.models.v1_pod import V1Pod -from kubernetes.client.models.v1_service import V1Service -from kubernetes.client.rest import ApiException -from kubernetes.dynamic import DynamicClient -from kubernetes.dynamic.exceptions import NotFoundError, ResourceNotFoundError -from kubernetes.stream import stream -from warnet.cli.image import build_image -from warnet.services import SERVICES, ServiceType -from warnet.status import RunningStatus -from warnet.tank import Tank -from warnet.utils import parse_raw_messages - -DOCKER_REGISTRY_CORE = "bitcoindevproject/bitcoin" -LOCAL_REGISTRY = "warnet/bitcoin-core" - -POD_PREFIX = "tank" -BITCOIN_CONTAINER_NAME = "bitcoin" -LN_CONTAINER_NAME = "ln" -LN_CB_CONTAINER_NAME = "ln-cb" -MAIN_NAMESPACE = "warnet" -PROMETHEUS_METRICS_PORT = 9332 -LND_MOUNT_PATH = "/root/.lnd" - - -logger = logging.getLogger("k8s") - - -class KubernetesBackend: - def __init__(self, config_dir: Path, network_name: str, logs_pod="fluentd") -> None: - # assumes the warnet rpc server is always - # running inside a k8s cluster as a statefulset - config.load_incluster_config() - self.client = client.CoreV1Api() - self.dynamic_client = DynamicClient(client.ApiClient()) - self.namespace = "warnet" - self.logs_pod = logs_pod - self.network_name = network_name - self.log = logger - - def build(self) -> bool: - # TODO: just return true for now, this is so we can be running either docker or k8s as a backend - # on the same branch - return True - - def up(self, warnet) -> bool: - self.deploy_pods(warnet) - return True - - def down(self, warnet) -> bool: - """ - Bring an existing network down. - e.g. 
`k delete -f warnet-tanks.yaml` - """ - - for tank in warnet.tanks: - self.client.delete_namespaced_pod( - self.get_pod_name(tank.index, ServiceType.BITCOIN), self.namespace - ) - self.client.delete_namespaced_service( - self.get_service_name(tank.index, ServiceType.BITCOIN), self.namespace - ) - if tank.lnnode: - self.client.delete_namespaced_pod( - self.get_pod_name(tank.index, ServiceType.LIGHTNING), self.namespace - ) - self.client.delete_namespaced_service( - self.get_service_name(tank.index, ServiceType.LIGHTNING), self.namespace - ) - - self.remove_prometheus_service_monitors(warnet.tanks) - - for service_name in warnet.services: - try: - self.client.delete_namespaced_pod( - self.get_service_pod_name(SERVICES[service_name]["container_name_suffix"]), - self.namespace, - ) - self.client.delete_namespaced_service( - self.get_service_service_name(SERVICES[service_name]["container_name_suffix"]), - self.namespace, - ) - except Exception as e: - self.log.error(f"Could not delete service: {service_name}:\n{e}") - - return True - - def get_file(self, tank_index: int, service: ServiceType, file_path: str): - """ - Read a file from inside a container - """ - pod_name = self.get_pod_name(tank_index, service) - exec_command = ["sh", "-c", f'cat "{file_path}" | base64'] - - resp = stream( - self.client.connect_get_namespaced_pod_exec, - pod_name, - self.namespace, - command=exec_command, - stderr=True, - stdin=True, - stdout=True, - tty=False, - _preload_content=False, - container=BITCOIN_CONTAINER_NAME - if service == ServiceType.BITCOIN - else LN_CONTAINER_NAME, - ) - - base64_encoded_data = "" - while resp.is_open(): - resp.update(timeout=1) - if resp.peek_stdout(): - base64_encoded_data += resp.read_stdout() - if resp.peek_stderr(): - stderr_output = resp.read_stderr() - logger.error(f"STDERR: {stderr_output}") - raise Exception(f"Problem copying file from pod: {stderr_output}") - - decoded_bytes = base64.b64decode(base64_encoded_data) - return decoded_bytes - - def get_service_pod_name(self, suffix: str) -> str: - return f"{self.network_name}-{suffix}" - - def get_service_service_name(self, suffix: str) -> str: - return f"{self.network_name}-{suffix}-service" - - def get_pod_name(self, tank_index: int, type: ServiceType) -> str: - if type == ServiceType.LIGHTNING or type == ServiceType.CIRCUITBREAKER: - return f"{self.network_name}-{POD_PREFIX}-ln-{tank_index:06d}" - return f"{self.network_name}-{POD_PREFIX}-{tank_index:06d}" - - def get_service_name(self, tank_index: int, type: ServiceType) -> str: - return f"{self.get_pod_name(tank_index, type)}-service" - - def get_pod(self, pod_name: str) -> V1Pod | None: - try: - return cast( - V1Pod, self.client.read_namespaced_pod(name=pod_name, namespace=self.namespace) - ) - except ApiException as e: - if e.status == 404: - return None - - def get_service(self, service_name: str) -> V1Service | None: - try: - return cast( - V1Service, - self.client.read_namespaced_service(name=service_name, namespace=self.namespace), - ) - except ApiException as e: - if e.status == 404: - return None - - # We could enhance this by checking the pod status as well - # The following pod phases are available: Pending, Running, Succeeded, Failed, Unknown - # For example not able to pull image will be a phase of Pending, but the container status will be ErrImagePull - def get_status(self, tank_index: int, service: ServiceType) -> RunningStatus: - pod_name = self.get_pod_name(tank_index, service) - pod = self.get_pod(pod_name) - # Possible states: - # 1. pod not found? 
- # -> STOPPED - # 2. pod phase Succeeded? - # -> STOPPED - # 3. pod phase Failed? - # -> FAILED - # 4. pod phase Unknown? - # -> UNKNOWN - # Pod phase is now "Running" or "Pending" - # -> otherwise we need a bug fix, return UNKNOWN - # - # The pod is ready if all containers are ready. - # 5. Pod not ready? - # -> PENDING - # 6. Pod ready? - # -> RUNNING - # - # Note: we don't know anything about deleted pods so we can't return a status for them. - # TODO: we could use a kubernetes job to keep the result 🤔 - - if pod is None: - return RunningStatus.STOPPED - - assert pod.status, "Could not get pod status" - assert pod.status.phase, "Could not get pod status.phase" - if pod.status.phase == "Succeeded": - return RunningStatus.STOPPED - if pod.status.phase == "Failed": - return RunningStatus.FAILED - if pod.status.phase == "Unknown": - return RunningStatus.UNKNOWN - if pod.status.phase == "Pending": - return RunningStatus.PENDING - - assert pod.status.phase in ("Running", "Pending"), f"Unknown pod phase {pod.status.phase}" - - # a pod is ready if all containers are ready - ready = True - for container in pod.status.container_statuses: - if container.ready is not True: - ready = False - break - return RunningStatus.RUNNING if ready else RunningStatus.PENDING - - def exec_run(self, tank_index: int, service: ServiceType, cmd: str): - pod_name = self.get_pod_name(tank_index, service) - exec_cmd = ["/bin/sh", "-c", f"{cmd}"] - self.log.debug(f"Running {exec_cmd=:} on {tank_index=:}") - if service == ServiceType.BITCOIN: - container = BITCOIN_CONTAINER_NAME - if service == ServiceType.LIGHTNING: - container = LN_CONTAINER_NAME - if service == ServiceType.CIRCUITBREAKER: - container = LN_CB_CONTAINER_NAME - result = stream( - self.client.connect_get_namespaced_pod_exec, - pod_name, - self.namespace, - container=container, - command=exec_cmd, - stderr=True, - stdin=False, - stdout=True, - tty=False, - # Avoid preloading the content to keep JSON intact - _preload_content=False, - ) - # TODO: stream result is just a string, so there is no error code to check - # ideally, we use a method where we can check for an error code, otherwise we will - # need to check for errors in the string (meh) - # - # if result.exit_code != 0: - # raise Exception( - # f"Command failed with exit code {result.exit_code}: {result.output.decode('utf-8')}" - # ) - result.run_forever() - result = result.read_all() - return result - - def get_bitcoin_debug_log(self, tank_index: int): - pod_name = self.get_pod_name(tank_index, ServiceType.BITCOIN) - logs = self.client.read_namespaced_pod_log( - name=pod_name, - namespace=self.namespace, - container=BITCOIN_CONTAINER_NAME, - ) - return logs - - def ln_cli(self, tank: Tank, command: list[str]): - if tank.lnnode is None: - raise Exception("No LN node configured for tank") - cmd = tank.lnnode.generate_cli_command(command) - self.log.debug(f"Running lncli {cmd=:} on {tank.index=:}") - return self.exec_run(tank.index, ServiceType.LIGHTNING, cmd) - - def ln_pub_key(self, tank) -> str: - if tank.lnnode is None: - raise Exception("No LN node configured for tank") - self.log.debug(f"Getting pub key for tank {tank.index}") - return tank.lnnode.get_pub_key() - - def get_bitcoin_cli(self, tank: Tank, method: str, params=None): - if params: - cmd = f"bitcoin-cli -regtest -rpcuser={tank.rpc_user} -rpcport={tank.rpc_port} -rpcpassword={tank.rpc_password} {method} {' '.join(map(str, params))}" - else: - cmd = f"bitcoin-cli -regtest -rpcuser={tank.rpc_user} -rpcport={tank.rpc_port} 
-rpcpassword={tank.rpc_password} {method}" - self.log.debug(f"Running bitcoin-cli {cmd=:} on {tank.index=:}") - return self.exec_run(tank.index, ServiceType.BITCOIN, cmd) - - def get_messages( - self, - a_index: int, - b_index: int, - bitcoin_network: str = "regtest", - ): - b_pod = self.get_pod(self.get_pod_name(b_index, ServiceType.BITCOIN)) - b_service = self.get_service(self.get_service_name(b_index, ServiceType.BITCOIN)) - subdir = "/" if bitcoin_network == "main" else f"{bitcoin_network}/" - base_dir = f"/root/.bitcoin/{subdir}message_capture" - cmd = f"ls {base_dir}" - self.log.debug(f"Running {cmd=:} on {a_index=:}") - dirs = self.exec_run( - a_index, - ServiceType.BITCOIN, - cmd, - ) - dirs = dirs.splitlines() - self.log.debug(f"Got dirs: {dirs}") - messages = [] - - for dir_name in dirs: - if b_pod.status.pod_ip in dir_name or b_service.spec.cluster_ip in dir_name: - for file, outbound in [["msgs_recv.dat", False], ["msgs_sent.dat", True]]: - # Fetch the file contents from the container - file_path = f"{base_dir}/{dir_name}/{file}" - blob = self.get_file(a_index, ServiceType.BITCOIN, f"{file_path}") - # Parse the blob - json = parse_raw_messages(blob, outbound) - messages = messages + json - - messages.sort(key=lambda x: x["time"]) - return messages - - def logs_grep(self, pattern: str, network: str, k8s_timestamps=False, no_sort=False): - compiled_pattern = re.compile(pattern) - matching_logs = [] - pods = self.client.list_namespaced_pod(self.namespace) - relevant_pods = [pod for pod in pods.items if "warnet" in pod.metadata.name] - - for pod in relevant_pods: - try: - log_stream = self.client.read_namespaced_pod_log( - name=pod.metadata.name, - container=BITCOIN_CONTAINER_NAME, - namespace=self.namespace, - timestamps=k8s_timestamps, - _preload_content=False, - ) - for log_entry in log_stream: - log_entry_str = log_entry.decode("utf-8").strip() - if compiled_pattern.search(log_entry_str): - matching_logs.append((log_entry_str, pod.metadata.name)) - except ApiException as e: - print(f"Error fetching logs for pod {pod.metadata.name}: {e}") - - sorted_logs = matching_logs if no_sort else sorted(matching_logs, key=lambda x: x[0]) - # Prepend pod names - formatted_logs = [f"{pod_name}: {log}" for log, pod_name in sorted_logs] - - return "\n".join(formatted_logs) - - def generate_deployment_file(self, warnet): - """ - TODO: implement this - """ - pass - - def create_bitcoind_container(self, tank: Tank) -> client.V1Container: - self.log.debug(f"Creating bitcoind container for tank {tank.index}") - container_name = BITCOIN_CONTAINER_NAME - container_image = None - - # Prebuilt image - if tank.image: - container_image = tank.image - # On-demand built image - elif "/" and "#" in tank.version: - # We don't have docker installed on the RPC server, where this code will be run from, - # and it's currently unclear to me if having the RPC pod build images is a good idea. - # Don't support this for now in CI by disabling in the workflow. - - # This can be re-enabled by enabling in the workflow file and installing docker and - # docker-buildx on the rpc server image. 
- - # it's a git branch, building step is necessary - repo, branch = tank.version.split("#") - build_image( - repo, - branch, - LOCAL_REGISTRY, - branch, - tank.DEFAULT_BUILD_ARGS + tank.build_args, - arches="amd64", - ) - # Prebuilt major version - else: - container_image = f"{DOCKER_REGISTRY_CORE}:{tank.version}" - - peers = [ - self.get_service_name(dst_index, ServiceType.BITCOIN) for dst_index in tank.init_peers - ] - bitcoind_options = tank.get_bitcoin_conf(peers) - container_env = [client.V1EnvVar(name="BITCOIN_ARGS", value=bitcoind_options)] - - bitcoind_container = client.V1Container( - name=container_name, - image=container_image, - env=container_env, - liveness_probe=client.V1Probe( - failure_threshold=3, - initial_delay_seconds=5, - period_seconds=5, - timeout_seconds=1, - _exec=client.V1ExecAction(command=["pidof", "bitcoind"]), - ), - readiness_probe=client.V1Probe( - failure_threshold=1, - initial_delay_seconds=0, - period_seconds=1, - timeout_seconds=1, - tcp_socket=client.V1TCPSocketAction(port=tank.rpc_port), - ), - security_context=client.V1SecurityContext( - privileged=True, - capabilities=client.V1Capabilities(add=["NET_ADMIN", "NET_RAW"]), - ), - ) - self.log.debug( - f"Created bitcoind container for tank {tank.index} using {bitcoind_options=:}" - ) - return bitcoind_container - - def create_prometheus_container(self, tank) -> client.V1Container: - env = [ - client.V1EnvVar(name="BITCOIN_RPC_HOST", value="127.0.0.1"), - client.V1EnvVar(name="BITCOIN_RPC_PORT", value=str(tank.rpc_port)), - client.V1EnvVar(name="BITCOIN_RPC_USER", value=tank.rpc_user), - client.V1EnvVar(name="BITCOIN_RPC_PASSWORD", value=tank.rpc_password), - ] - if tank.metrics is not None: - env.append( - client.V1EnvVar(name="METRICS", value=tank.metrics), - ) - return client.V1Container( - name="prometheus", image="bitcoindevproject/bitcoin-exporter:latest", env=env - ) - - def check_logging_crds_installed(self): - logging_crd_name = "servicemonitors.monitoring.coreos.com" - api = client.ApiextensionsV1Api() - crds = api.list_custom_resource_definition() - return bool(any(crd.metadata.name == logging_crd_name for crd in crds.items)) - - def apply_prometheus_service_monitors(self, tanks): - for tank in tanks: - if not tank.exporter: - continue - - tank_name = self.get_pod_name(tank.index, ServiceType.BITCOIN) - - service_monitor = { - "apiVersion": "monitoring.coreos.com/v1", - "kind": "ServiceMonitor", - "metadata": { - "name": tank_name, - "namespace": MAIN_NAMESPACE, - "labels": { - "app.kubernetes.io/name": "bitcoind-metrics", - "release": "prometheus", - }, - }, - "spec": { - "endpoints": [{"port": "prometheus-metrics"}], - "selector": {"matchLabels": {"app": tank_name}}, - }, - } - # Create the custom resource using the dynamic client - sc_crd = self.dynamic_client.resources.get( - api_version="monitoring.coreos.com/v1", kind="ServiceMonitor" - ) - sc_crd.create(body=service_monitor, namespace=MAIN_NAMESPACE) - - # attempts to delete the service monitors whether they exist or not - def remove_prometheus_service_monitors(self, tanks): - for tank in tanks: - try: - self.dynamic_client.resources.get( - api_version="monitoring.coreos.com/v1", kind="ServiceMonitor" - ).delete( - name=f"warnet-tank-{tank.index:06d}", - namespace=MAIN_NAMESPACE, - ) - except (ResourceNotFoundError, NotFoundError): - continue - - def get_lnnode_hostname(self, index: int) -> str: - return f"{self.get_service_name(index, ServiceType.LIGHTNING)}.{self.namespace}" - - def create_ln_container(self, tank, 
bitcoind_service_name, volume_mounts) -> client.V1Container: - # These args are appended to the Dockerfile `ENTRYPOINT ["lnd"]` - bitcoind_rpc_host = f"{bitcoind_service_name}.{self.namespace}" - lightning_dns = self.get_lnnode_hostname(tank.index) - args = tank.lnnode.get_conf(lightning_dns, bitcoind_rpc_host) - self.log.debug(f"Creating lightning container for tank {tank.index} using {args=:}") - lightning_ready_probe = "" - if tank.lnnode.impl == "lnd": - lightning_ready_probe = "lncli --network=regtest getinfo" - elif tank.lnnode.impl == "cln": - lightning_ready_probe = "lightning-cli --network=regtest getinfo" - else: - raise Exception( - f"Lightning node implementation {tank.lnnode.impl} for tank {tank.index} not supported" - ) - lightning_container = client.V1Container( - name=LN_CONTAINER_NAME, - image=tank.lnnode.image, - args=args.split(" "), - env=[ - client.V1EnvVar(name="LN_IMPL", value=tank.lnnode.impl), - ], - readiness_probe=client.V1Probe( - failure_threshold=1, - success_threshold=3, - initial_delay_seconds=10, - period_seconds=2, - timeout_seconds=2, - _exec=client.V1ExecAction(command=["/bin/sh", "-c", lightning_ready_probe]), - ), - security_context=client.V1SecurityContext( - privileged=True, - capabilities=client.V1Capabilities(add=["NET_ADMIN", "NET_RAW"]), - ), - volume_mounts=volume_mounts, - ) - self.log.debug(f"Created lightning container for tank {tank.index}") - return lightning_container - - def create_circuitbreaker_container(self, tank, volume_mounts) -> client.V1Container: - self.log.debug(f"Creating circuitbreaker container for tank {tank.index}") - cb_container = client.V1Container( - name=LN_CB_CONTAINER_NAME, - image=tank.lnnode.cb, - args=[ - "--network=regtest", - f"--rpcserver=127.0.0.1:{tank.lnnode.rpc_port}", - f"--tlscertpath={LND_MOUNT_PATH}/tls.cert", - f"--macaroonpath={LND_MOUNT_PATH}/data/chain/bitcoin/regtest/admin.macaroon", - ], - security_context=client.V1SecurityContext( - privileged=True, - capabilities=client.V1Capabilities(add=["NET_ADMIN", "NET_RAW"]), - ), - volume_mounts=volume_mounts, - ) - self.log.debug(f"Created circuitbreaker container for tank {tank.index}") - return cb_container - - def create_pod_object( - self, - tank: Tank, - containers: list[client.V1Container], - volumes: list[client.V1Volume], - name: str, - ) -> client.V1Pod: - # Create and return a Pod object - # TODO: pass a custom namespace , e.g. 
different warnet sims can be deployed into diff namespaces - - return client.V1Pod( - api_version="v1", - kind="Pod", - metadata=client.V1ObjectMeta( - name=name, - namespace=self.namespace, - labels={ - "app": name, - "network": tank.warnet.network_name, - }, - ), - spec=client.V1PodSpec( - # Might need some more thinking on the pod restart policy, setting to Never for now - # This means if a node has a problem it dies - restart_policy="OnFailure", - containers=containers, - volumes=volumes, - ), - ) - - def get_tank_ipv4(self, index: int) -> str | None: - pod_name = self.get_pod_name(index, ServiceType.BITCOIN) - pod = self.get_pod(pod_name) - if pod: - return pod.status.pod_ip - else: - return None - - def get_tank_dns_addr(self, index: int) -> str | None: - service_name = self.get_service_name(index, ServiceType.BITCOIN) - try: - self.client.read_namespaced_service(name=service_name, namespace=self.namespace) - except ApiValueError as e: - self.log.info(ApiValueError(f"dns addr request for {service_name} raised {str(e)}")) - return None - return service_name - - def get_tank_ip_addr(self, index: int) -> str | None: - service_name = self.get_service_name(index, ServiceType.BITCOIN) - try: - endpoints = self.client.read_namespaced_endpoints( - name=service_name, namespace=self.namespace - ) - except ApiValueError as e: - self.log.info(f"ip addr request for {service_name} raised {str(e)}") - return None - - if len(endpoints.subsets) == 0: - raise Exception(f"{service_name}'s endpoint does not have an initial subset") - initial_subset = endpoints.subsets[0] - - if len(initial_subset.addresses) == 0: - raise Exception(f"{service_name}'s initial subset does not have an initial address") - initial_address = initial_subset.addresses[0] - - return str(initial_address.ip) - - def create_bitcoind_service(self, tank) -> client.V1Service: - service_name = self.get_service_name(tank.index, ServiceType.BITCOIN) - self.log.debug(f"Creating bitcoind service {service_name} for tank {tank.index}") - service = client.V1Service( - api_version="v1", - kind="Service", - metadata=client.V1ObjectMeta( - name=service_name, - labels={ - "app": self.get_pod_name(tank.index, ServiceType.BITCOIN), - "network": tank.warnet.network_name, - }, - ), - spec=client.V1ServiceSpec( - selector={"app": self.get_pod_name(tank.index, ServiceType.BITCOIN)}, - publish_not_ready_addresses=True, - ports=[ - client.V1ServicePort(port=18444, target_port=18444, name="p2p"), - client.V1ServicePort(port=tank.rpc_port, target_port=tank.rpc_port, name="rpc"), - client.V1ServicePort( - port=tank.zmqblockport, target_port=tank.zmqblockport, name="zmqblock" - ), - client.V1ServicePort( - port=tank.zmqtxport, target_port=tank.zmqtxport, name="zmqtx" - ), - client.V1ServicePort( - port=PROMETHEUS_METRICS_PORT, - target_port=PROMETHEUS_METRICS_PORT, - name="prometheus-metrics", - ), - ], - ), - ) - self.log.debug(f"Created bitcoind service {service_name} for tank {tank.index}") - return service - - def create_lightning_service(self, tank) -> client.V1Service: - service_name = self.get_service_name(tank.index, ServiceType.LIGHTNING) - self.log.debug(f"Creating lightning service {service_name} for tank {tank.index}") - service = client.V1Service( - api_version="v1", - kind="Service", - metadata=client.V1ObjectMeta( - name=service_name, - labels={ - "app": self.get_pod_name(tank.index, ServiceType.LIGHTNING), - "network": tank.warnet.network_name, - }, - ), - spec=client.V1ServiceSpec( - selector={"app": self.get_pod_name(tank.index, 
ServiceType.LIGHTNING)}, - cluster_ip="None", - ports=[ - client.V1ServicePort( - port=tank.lnnode.rpc_port, target_port=tank.lnnode.rpc_port, name="rpc" - ), - ], - publish_not_ready_addresses=True, - ), - ) - self.log.debug(f"Created lightning service {service_name} for tank {tank.index}") - return service - - def deploy_pods(self, warnet): - # TODO: this is pretty hack right now, ideally it should mirror - # a similar workflow to the docker backend: - # 1. read graph file, turn graph file into k8s resources, deploy the resources - tank_resource_files = [] - self.log.debug("Deploying pods") - for tank in warnet.tanks: - # Create and deploy bitcoind pod and service - bitcoind_container = self.create_bitcoind_container(tank) - bitcoind_pod = self.create_pod_object( - tank, [bitcoind_container], [], self.get_pod_name(tank.index, ServiceType.BITCOIN) - ) - - if tank.exporter and self.check_logging_crds_installed(): - prometheus_container = self.create_prometheus_container(tank) - bitcoind_pod.spec.containers.append(prometheus_container) - - bitcoind_service = self.create_bitcoind_service(tank) - self.client.create_namespaced_pod(namespace=self.namespace, body=bitcoind_pod) - # delete the service if it already exists, ignore 404 - try: - self.client.delete_namespaced_service( - name=bitcoind_service.metadata.name, namespace=self.namespace - ) - except ApiException as e: - if e.status != 404: - raise e - self.client.create_namespaced_service(namespace=self.namespace, body=bitcoind_service) - - # Create and deploy a lightning pod - if tank.lnnode: - conts = [] - vols = [] - volume_mounts = [] - if tank.lnnode.cb: - # Create a shared volume between containers in the pod - volume_name = f"ln-cb-data-{tank.index}" - vols.append( - client.V1Volume(name=volume_name, empty_dir=client.V1EmptyDirVolumeSource()) - ) - volume_mounts.append( - client.V1VolumeMount( - name=volume_name, - mount_path=LND_MOUNT_PATH, - ) - ) - # Add circuit breaker container - conts.append(self.create_circuitbreaker_container(tank, volume_mounts)) - # Add lightning container - conts.append( - self.create_ln_container(tank, bitcoind_service.metadata.name, volume_mounts) - ) - # Put it all together in a pod - lnd_pod = self.create_pod_object( - tank, conts, vols, self.get_pod_name(tank.index, ServiceType.LIGHTNING) - ) - self.client.create_namespaced_pod(namespace=self.namespace, body=lnd_pod) - # Create service for the pod - lightning_service = self.create_lightning_service(tank) - try: - self.client.delete_namespaced_service( - name=lightning_service.metadata.name, namespace=self.namespace - ) - except ApiException as e: - if e.status != 404: - raise e - self.client.create_namespaced_service( - namespace=self.namespace, body=lightning_service - ) - - # add metrics scraping for tanks configured to export metrics - if self.check_logging_crds_installed(): - self.apply_prometheus_service_monitors(warnet.tanks) - - for service_name in warnet.services: - try: - self.service_from_json(SERVICES[service_name]) - except Exception as e: - self.log.error(f"Error starting service: {service_name}\n{e}") - - self.log.debug("Containers and services created. 
Configuring IP addresses") - # now that the pods have had a second to create, - # get the ips and set them on the tanks - - # TODO: this is really hacky, should probably just update the generate_ipv4 function at some point - # by moving it into the base class - for tank in warnet.tanks: - pod_ip = None - while not pod_ip: - pod_name = self.get_pod_name(tank.index, ServiceType.BITCOIN) - pod = self.get_pod(pod_name) - if pod is None or pod.status is None or getattr(pod.status, "pod_ip", None) is None: - self.log.info("Waiting for pod response or pod IP...") - time.sleep(3) - continue - pod_ip = pod.status.pod_ip - - tank._ipv4 = pod_ip - self.log.debug(f"Tank {tank.index} created") - - with open(warnet.config_dir / "warnet-tanks.yaml", "w") as f: - for pod in tank_resource_files: - yaml.dump(pod.to_dict(), f) - f.write("---\n") # separator for multiple resources - self.log.info("Pod definitions saved to warnet-tanks.yaml") - - def wait_for_healthy_tanks(self, warnet, timeout=30): - """ - Wait for healthy status on all bitcoind nodes - """ - pass - - def service_from_json(self, obj): - env = [] - for pair in obj.get("environment", []): - name, value = pair.split("=") - env.append(client.V1EnvVar(name=name, value=value)) - volume_mounts = [] - volumes = [] - for vol in obj.get("config_files", []): - volume_name, mount_path = vol.split(":") - volume_name = volume_name.replace("/", "") - volume_mounts.append(client.V1VolumeMount(name=volume_name, mount_path=mount_path)) - volumes.append( - client.V1Volume(name=volume_name, empty_dir=client.V1EmptyDirVolumeSource()) - ) - - service_container = client.V1Container( - name=self.get_service_pod_name(obj["container_name_suffix"]), - image=obj["image"], - env=env, - security_context=client.V1SecurityContext( - privileged=True, - capabilities=client.V1Capabilities(add=["NET_ADMIN", "NET_RAW"]), - ), - volume_mounts=volume_mounts, - ) - sidecar_container = client.V1Container( - name="sidecar", - image="pinheadmz/sidecar:latest", - volume_mounts=volume_mounts, - ports=[client.V1ContainerPort(container_port=22)], - ) - service_pod = client.V1Pod( - api_version="v1", - kind="Pod", - metadata=client.V1ObjectMeta( - name=self.get_service_pod_name(obj["container_name_suffix"]), - namespace=self.namespace, - labels={ - "app": self.get_service_pod_name(obj["container_name_suffix"]), - "network": self.network_name, - }, - ), - spec=client.V1PodSpec( - restart_policy="OnFailure", - containers=[service_container, sidecar_container], - volumes=volumes, - ), - ) - - # Do not ever change this variable name. 
xoxo, --Zip - service_service = client.V1Service( - api_version="v1", - kind="Service", - metadata=client.V1ObjectMeta( - name=self.get_service_service_name(obj["container_name_suffix"]), - labels={ - "app": self.get_service_pod_name(obj["container_name_suffix"]), - "network": self.network_name, - }, - ), - spec=client.V1ServiceSpec( - selector={"app": self.get_service_pod_name(obj["container_name_suffix"])}, - publish_not_ready_addresses=True, - ports=[ - client.V1ServicePort(name="ssh", port=22, target_port=22), - ], - ), - ) - - self.client.create_namespaced_pod(namespace=self.namespace, body=service_pod) - self.client.create_namespaced_service(namespace=self.namespace, body=service_service) - - def write_service_config(self, source_path: str, service_name: str, destination_path: str): - obj = SERVICES[service_name] - container_name = "sidecar" - # Copy the archive from our local drive (Warnet RPC container/pod) - # to the destination service's sidecar container via ssh - self.log.info( - f"Copying local {source_path} to remote {destination_path} for {service_name}" - ) - subprocess.run( - [ - "scp", - "-o", - "StrictHostKeyChecking=accept-new", - source_path, - f"root@{self.get_service_service_name(obj['container_name_suffix'])}.{self.namespace}:/arbitrary_filename.tar", - ] - ) - self.log.info(f"Finished copying tarball for {service_name}, unpacking...") - # Unpack the archive - stream( - self.client.connect_get_namespaced_pod_exec, - self.get_service_pod_name(obj["container_name_suffix"]), - self.namespace, - container=container_name, - command=["/bin/sh", "-c", f"tar -xf /arbitrary_filename.tar -C {destination_path}"], - stderr=True, - stdin=False, - stdout=True, - tty=False, - _preload_content=False, - ) - self.log.info(f"Finished unpacking config data for {service_name} to {destination_path}") diff --git a/src/warnet/cli/graph.py b/src/warnet/cli/graph.py index 45128d603..1e75862e5 100644 --- a/src/warnet/cli/graph.py +++ b/src/warnet/cli/graph.py @@ -5,7 +5,8 @@ import click import networkx as nx from rich import print -from warnet.utils import DEFAULT_TAG, create_cycle_graph, validate_graph_schema + +from .util import DEFAULT_TAG, create_cycle_graph, validate_graph_schema @click.group(name="graph") diff --git a/src/warnet/cli/main.py b/src/warnet/cli/main.py index 3aa7e8dce..a1dda555c 100644 --- a/src/warnet/cli/main.py +++ b/src/warnet/cli/main.py @@ -5,11 +5,11 @@ import click from rich import print as richprint -from .bitcoin import bitcoin -from .cluster import cluster +# from .bitcoin import bitcoin from .graph import graph from .image import image -from .ln import ln + +# from .ln import ln from .network import network from .scenarios import scenarios @@ -21,11 +21,10 @@ def cli(): pass -cli.add_command(bitcoin) -cli.add_command(cluster) +# cli.add_command(bitcoin) cli.add_command(graph) cli.add_command(image) -cli.add_command(ln) +# cli.add_command(ln) cli.add_command(network) cli.add_command(scenarios) diff --git a/src/warnet/cli/network.py b/src/warnet/cli/network.py index b81cbaec6..ff104e620 100644 --- a/src/warnet/cli/network.py +++ b/src/warnet/cli/network.py @@ -1,44 +1,17 @@ -import base64 # noqa: I001 -import json -from pathlib import Path +import tempfile +import xml.etree.ElementTree as ET from importlib.resources import files +from pathlib import Path import click +import networkx as nx +import yaml from rich import print -from rich.console import Console -from rich.table import Table -from .rpc import rpc_call # noqa: I001 from .util import run_command - 
DEFAULT_GRAPH_FILE = files("graphs").joinpath("default.graphml") - - -def print_repr(wn: dict) -> None: - if not isinstance(wn, dict): - print("Error, cannot print_repr of non-dict") - return - console = Console() - - # Warnet table - warnet_table = Table(show_header=True, header_style="bold") - for header in wn["warnet_headers"]: - warnet_table.add_column(header) - for row in wn["warnet"]: - warnet_table.add_row(*[str(cell) for cell in row]) - - # Tank table - tank_table = Table(show_header=True, header_style="bold") - for header in wn["tank_headers"]: - tank_table.add_column(header) - for row in wn["tanks"]: - tank_table.add_row(*[str(cell) for cell in row]) - - console.print("Warnet:") - console.print(warnet_table) - console.print("\nTanks:") - console.print(tank_table) +WAR_MANIFESTS = files("manifests") @click.group(name="network") @@ -46,37 +19,40 @@ def network(): """Network commands""" +# High-level network operations @network.command() @click.argument("graph_file", default=DEFAULT_GRAPH_FILE, type=click.Path()) -@click.option("--force", default=False, is_flag=True, type=bool) @click.option("--network", default="warnet", show_default=True) -def start(graph_file: Path, force: bool, network: str): +@click.option("--logging/--no-logging", default=False) +def start(graph_file: Path, logging: bool, network: str): """ Start a warnet with topology loaded from a into [network] """ - try: - encoded_graph_file = "" - with open(graph_file, "rb") as graph_file_buffer: - encoded_graph_file = base64.b64encode(graph_file_buffer.read()).decode("utf-8") - except Exception as e: - print(f"Error encoding graph file: {e}") - return + graph = read_graph_file(graph_file) + kubernetes_yaml = generate_kubernetes_yaml(graph) - result = rpc_call( - "network_from_file", - {"graph_file": encoded_graph_file, "force": force, "network": network}, - ) - assert isinstance(result, dict) - print_repr(result) + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as temp_file: + yaml.dump_all(kubernetes_yaml, temp_file) + temp_file_path = temp_file.name - -@network.command() -@click.option("--network", default="warnet", show_default=True) -def up(network: str): - """ - Bring up a previously-stopped warnet named [network] - """ - print(rpc_call("network_up", {"network": network})) + try: + if deploy_base_configurations() and apply_kubernetes_yaml(temp_file_path): + print(f"Warnet '{network}' started successfully.") + if not set_kubectl_context(network): + print( + "Warning: Failed to set kubectl context. You may need to manually switch to the warnet namespace." 
+ ) + if not logging: + print("Skipping install of logging charts") + else: + if setup_logging_helm(): + print("Helm charts installed successfully.") + else: + print("Failed to install Helm charts.") + else: + print(f"Failed to start warnet '{network}'.") + finally: + Path(temp_file_path).unlink() @network.command() @@ -85,90 +61,226 @@ def down(network: str): """ Bring down a running warnet named [network] """ + if delete_namespace(network) and delete_namespace("warnet-logging"): + print(f"Warnet '{network}' has been successfully brought down and the namespaces deleted.") + else: + print(f"Failed to bring down warnet '{network}' or delete the namespaces.") - running_scenarios = rpc_call("scenarios_list_running", {}) - assert isinstance(running_scenarios, list) - if running_scenarios: - for scenario in running_scenarios: - pid = scenario.get("pid") - if pid: - try: - params = {"pid": pid} - rpc_call("scenarios_stop", params) - except Exception as e: - print( - f"Exception when stopping scenario: {scenario} with PID {scenario.pid}: {e}" - ) - print("Continuing with shutdown...") - continue - print(rpc_call("network_down", {"network": network})) + +@network.command() +@click.option("--follow", "-f", is_flag=True, help="Follow logs") +def logs(follow: bool): + """Get Kubernetes logs from the RPC server""" + command = "kubectl logs rpc-0" + if follow: + command += " --follow" + run_command(command, stream_output=follow) @network.command() -@click.option("--network", default="warnet", show_default=True) -def info(network: str): +@click.argument("graph_file", default=DEFAULT_GRAPH_FILE, type=click.Path()) +@click.option("--output", "-o", default="warnet-deployment.yaml", help="Output YAML file") +def generate_yaml(graph_file: Path, output: str): """ - Get info about a warnet named [network] + Generate a Kubernetes YAML file from a graph file for deploying warnet nodes. """ - result = rpc_call("network_info", {"network": network}) - assert isinstance(result, dict), "Result is not a dict" # Make mypy happy - print_repr(result) + graph = read_graph_file(graph_file) + kubernetes_yaml = generate_kubernetes_yaml(graph) + + with open(output, "w") as f: + yaml.dump_all(kubernetes_yaml, f) + + print(f"Kubernetes YAML file generated: {output}") @network.command() -@click.option("--network", default="warnet", show_default=True) -def status(network: str): +@click.argument("graph_file", default=DEFAULT_GRAPH_FILE, type=click.Path(exists=True)) +def connect(graph_file: Path): """ - Get status of a warnet named [network] + Connect nodes based on the edges defined in the graph file. 
""" - result = rpc_call("network_status", {"network": network}) - assert isinstance(result, list), "Result is not a list" # Make mypy happy - for tank in result: - lightning_status = "" - circuitbreaker_status = "" - if "lightning_status" in tank: - lightning_status = f"\tLightning: {tank['lightning_status']}" - if "circuitbreaker_status" in tank: - circuitbreaker_status = f"\tCircuit Breaker: {tank['circuitbreaker_status']}" - print( - f"Tank: {tank['tank_index']} \tBitcoin: {tank['bitcoin_status']}{lightning_status}{circuitbreaker_status}" + tree = ET.parse(graph_file) + root = tree.getroot() + edges = root.findall(".//{http://graphml.graphdrawing.org/xmlns}edge") + + for edge in edges: + source = edge.get("source") + target = edge.get("target") + command = f"kubectl exec -it warnet-node-{source} -- bitcoin-cli -rpcuser=user -rpcpassword=password addnode warnet-node-{target}-service:8333 add" + + print(f"Connecting node {source} to node {target}") + if run_command(command, stream_output=True): + print(f"Successfully connected node {source} to node {target}") + else: + print(f"Failed to connect node {source} to node {target}") + + print("All connections attempted.") + + +# Kubernetes object generation +def generate_kubernetes_yaml(graph: nx.Graph) -> list: + kubernetes_objects = [create_namespace()] + + for node, data in graph.nodes(data=True): + config = generate_node_config(node, data) + kubernetes_objects.extend( + [ + create_config_map(node, config), + create_node_deployment(node, data), + create_node_service(node), + ] ) + return kubernetes_objects -@network.command() -@click.option("--network", default="warnet", show_default=True) -def connected(network: str): - """ - Indicate whether the all of the edges in the gaph file are connected in [network] - """ - print(rpc_call("network_connected", {"network": network})) +def create_namespace() -> dict: + return {"apiVersion": "v1", "kind": "Namespace", "metadata": {"name": "warnet"}} -@network.command() -@click.option("--network", default="warnet", show_default=True) -@click.option("--activity", type=str) -@click.option("--exclude", type=str, default="[]") -def export(network: str, activity: str, exclude: str): + +def create_node_deployment(node: int, data: dict) -> dict: + image = data.get("image", "bitcoindevproject/bitcoin:27.0") + version = data.get("version", "27.0") + + return { + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": f"warnet-node-{node}", + "namespace": "warnet", + "labels": {"app": "warnet", "node": str(node)}, + }, + "spec": { + "containers": [ + { + "name": "bitcoin", + "image": image, + "env": [ + {"name": "BITCOIN_VERSION", "value": version}, + ], + "volumeMounts": [ + { + "name": "config", + "mountPath": "/root/.bitcoin/bitcoin.conf", + "subPath": "bitcoin.conf", + } + ], + } + ], + "volumes": [{"name": "config", "configMap": {"name": f"bitcoin-config-node-{node}"}}], + }, + } + + +def create_node_service(node: int) -> dict: + return { + "apiVersion": "v1", + "kind": "Service", + "metadata": {"name": f"warnet-node-{node}-service", "namespace": "warnet"}, + "spec": { + "selector": {"app": "warnet", "node": str(node)}, + "ports": [{"port": 8333, "targetPort": 8333}], + }, + } + + +def create_config_map(node: int, config: str) -> dict: + return { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": { + "name": f"bitcoin-config-node-{node}", + "namespace": "warnet", + }, + "data": {"bitcoin.conf": config}, + } + + +# Utility functions +def read_graph_file(graph_file: Path) -> nx.Graph: + with 
open(graph_file) as f: + return nx.parse_graphml(f.read()) + + +def generate_node_config(node: int, data: dict) -> str: + base_config = """ +regtest=1 +checkmempool=0 +acceptnonstdtxn=1 +debuglogfile=0 +logips=1 +logtimemicros=1 +capturemessages=1 +fallbackfee=0.00001000 +listen=1 + +[regtest] +rpcuser=user +rpcpassword=password +rpcport=18443 +rpcallowip=0.0.0.0/0 +rpcbind=0.0.0.0 + +zmqpubrawblock=tcp://0.0.0.0:28332 +zmqpubrawtx=tcp://0.0.0.0:28333 +""" + node_specific_config = data.get("bitcoin_config", "") + return f"{base_config}\n{node_specific_config.replace(",", "\n")}" + + +def set_kubectl_context(namespace: str): """ - Export all [network] data for a "simln" service running in a container - on the network. Optionally add JSON string [activity] to simln config. - Optionally provide a list of tank indexes to [exclude]. - Returns True on success. + Set the default kubectl context to the specified namespace. """ - exclude = json.loads(exclude) - print( - rpc_call("network_export", {"network": network, "activity": activity, "exclude": exclude}) - ) + command = f"kubectl config set-context --current --namespace={namespace}" + result = run_command(command, stream_output=True) + if result: + print(f"Kubectl context set to namespace: {namespace}") + else: + print(f"Failed to set kubectl context to namespace: {namespace}") + return result -@network.command() -@click.option("--follow", "-f", is_flag=True, help="Follow logs") -def logs(follow: bool): - """Get Kubernetes logs from the RPC server""" - command = "kubectl logs rpc-0" - stream_output = False - if follow: - command += " --follow" - stream_output = True +def deploy_base_configurations(): + base_configs = [ + "namespace.yaml", + "rbac-config.yaml", + ] + + for config in base_configs: + command = f"kubectl apply -f {WAR_MANIFESTS}/{config}" + if not run_command(command, stream_output=True): + print(f"Failed to apply {config}") + return False + return True + + +def apply_kubernetes_yaml(yaml_file: str): + command = f"kubectl apply -f {yaml_file}" + return run_command(command, stream_output=True) + + +def delete_namespace(namespace: str): + command = f"kubectl delete namespace {namespace}" + return run_command(command, stream_output=True) + + +def setup_logging_helm(): + """ + Run the required Helm commands for setting up Grafana, Prometheus, and Loki. 
+ """ + helm_commands = [ + "helm repo add grafana https://grafana.github.io/helm-charts", + "helm repo add prometheus-community https://prometheus-community.github.io/helm-charts", + "helm repo update", + f"helm upgrade --install --namespace warnet-logging --create-namespace --values {WAR_MANIFESTS}/loki_values.yaml loki grafana/loki --version 5.47.2", + "helm upgrade --install --namespace warnet-logging promtail grafana/promtail", + "helm upgrade --install --namespace warnet-logging prometheus prometheus-community/kube-prometheus-stack --namespace warnet-logging --set grafana.enabled=false", + f"helm upgrade --install --namespace warnet-logging loki-grafana grafana/grafana --values {WAR_MANIFESTS}/grafana_values.yaml", + ] - run_command(command, stream_output=stream_output) + for command in helm_commands: + if not run_command(command, stream_output=True): + print(f"Failed to run Helm command: {command}") + return False + return True diff --git a/src/warnet/cli/rpc.py b/src/warnet/cli/rpc.py index 9380ede0c..4ecd6bada 100644 --- a/src/warnet/cli/rpc.py +++ b/src/warnet/cli/rpc.py @@ -5,7 +5,8 @@ import requests from jsonrpcclient.requests import request from jsonrpcclient.responses import Error, Ok, parse -from warnet.server import WARNET_SERVER_PORT + +WARNET_SERVER_PORT = 9276 class JSONRPCException(Exception): diff --git a/src/warnet/cli/util.py b/src/warnet/cli/util.py index 80db73cd9..718ee6fb9 100644 --- a/src/warnet/cli/util.py +++ b/src/warnet/cli/util.py @@ -1,5 +1,22 @@ +import json +import logging import os +import random import subprocess +from importlib.resources import files +from pathlib import Path + +import networkx as nx +from jsonschema import validate + +logger = logging.getLogger("utils") + +SUPPORTED_TAGS = ["27.0", "26.0", "25.1", "24.2", "23.2", "22.2"] +DEFAULT_TAG = SUPPORTED_TAGS[0] +WEIGHTED_TAGS = [ + tag for index, tag in enumerate(reversed(SUPPORTED_TAGS)) for _ in range(index + 1) +] +SRC_DIR = files("warnet") def run_command(command, stream_output=False, env=None): @@ -40,3 +57,166 @@ def run_command(command, stream_output=False, env=None): return False print(result.stdout) return True + + +def create_cycle_graph(n: int, version: str, bitcoin_conf: str | None, random_version: bool): + try: + # Use nx.MultiDiGraph() so we get directed edges (source->target) + # and still allow parallel edges (L1 p2p connections + LN channels) + graph = nx.generators.cycle_graph(n, nx.MultiDiGraph()) + except TypeError as e: + msg = f"Failed to create graph: {e}" + logging.error(msg) + return msg + + # Graph is a simply cycle graph with all nodes connected in a loop, including both ends. 
+ # Ensure each node has at least 8 outbound connections by making 7 more outbound connections + for src_node in graph.nodes(): + logging.debug(f"Creating additional connections for node {src_node}") + for _ in range(8): + # Choose a random node to connect to + # Make sure it's not the same node and they aren't already connected in either direction + potential_nodes = [ + dst_node + for dst_node in range(n) + if dst_node != src_node + and not graph.has_edge(dst_node, src_node) + and not graph.has_edge(src_node, dst_node) + ] + if potential_nodes: + chosen_node = random.choice(potential_nodes) + graph.add_edge(src_node, chosen_node) + logging.debug(f"Added edge: {src_node}:{chosen_node}") + logging.debug(f"Node {src_node} edges: {graph.edges(src_node)}") + + # parse and process conf file + conf_contents = "" + if bitcoin_conf is not None: + conf = Path(bitcoin_conf) + if conf.is_file(): + with open(conf) as f: + # parse INI style conf then dump using for_graph + conf_dict = parse_bitcoin_conf(f.read()) + conf_contents = dump_bitcoin_conf(conf_dict, for_graph=True) + + # populate our custom fields + for i, node in enumerate(graph.nodes()): + if random_version: + graph.nodes[node]["version"] = random.choice(WEIGHTED_TAGS) + else: + # One node demoing the image tag + if i == 1: + graph.nodes[node]["image"] = f"bitcoindevproject/bitcoin:{version}" + else: + graph.nodes[node]["version"] = version + graph.nodes[node]["bitcoin_config"] = conf_contents + graph.nodes[node]["tc_netem"] = "" + graph.nodes[node]["build_args"] = "" + graph.nodes[node]["exporter"] = False + graph.nodes[node]["collect_logs"] = False + graph.nodes[node]["resources"] = None + + convert_unsupported_attributes(graph) + return graph + + +def convert_unsupported_attributes(graph: nx.Graph): + # Sometimes networkx complains about invalid types when writing the graph + # (it just generated itself!). Try to convert them here just in case. + for _, node_data in graph.nodes(data=True): + for key, value in node_data.items(): + if isinstance(value, set): + node_data[key] = list(value) + elif isinstance(value, int | float | str): + continue + else: + node_data[key] = str(value) + + for _, _, edge_data in graph.edges(data=True): + for key, value in edge_data.items(): + if isinstance(value, set): + edge_data[key] = list(value) + elif isinstance(value, int | float | str): + continue + else: + edge_data[key] = str(value) + + +def load_schema(): + with open(SRC_DIR / "graph_schema.json") as schema_file: + return json.load(schema_file) + + +def validate_graph_schema(graph: nx.Graph): + """ + Validate a networkx.Graph against the node schema + """ + graph_schema = load_schema() + validate(instance=graph.graph, schema=graph_schema["graph"]) + for n in list(graph.nodes): + validate(instance=graph.nodes[n], schema=graph_schema["node"]) + for e in list(graph.edges): + validate(instance=graph.edges[e], schema=graph_schema["edge"]) + + +def parse_bitcoin_conf(file_content): + """ + Custom parser for INI-style bitcoin.conf + + Args: + - file_content (str): The content of the INI-style file. + + Returns: + - dict: A dictionary representation of the file content. + Key-value pairs are stored as tuples so one key may have + multiple values. Sections are represented as arrays of these tuples. 
+ """ + current_section = None + result = {current_section: []} + + for line in file_content.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + + if line.startswith("[") and line.endswith("]"): + current_section = line[1:-1] + result[current_section] = [] + elif "=" in line: + key, value = line.split("=", 1) + result[current_section].append((key.strip(), value.strip())) + + return result + + +def dump_bitcoin_conf(conf_dict, for_graph=False): + """ + Converts a dictionary representation of bitcoin.conf content back to INI-style string. + + Args: + - conf_dict (dict): A dictionary representation of the file content. + + Returns: + - str: The INI-style string representation of the input dictionary. + """ + result = [] + + # Print global section at the top first + values = conf_dict[None] + for sub_key, sub_value in values: + result.append(f"{sub_key}={sub_value}") + + # Then print any named subsections + for section, values in conf_dict.items(): + if section is not None: + result.append(f"\n[{section}]") + else: + continue + for sub_key, sub_value in values: + result.append(f"{sub_key}={sub_value}") + + if for_graph: + return ",".join(result) + + # Terminate file with newline + return "\n".join(result) + "\n" diff --git a/src/warnet/utils.py b/src/warnet/cli/utils.py similarity index 99% rename from src/warnet/utils.py rename to src/warnet/cli/utils.py index 23dad566b..c9d43f616 100644 --- a/src/warnet/utils.py +++ b/src/warnet/cli/utils.py @@ -435,6 +435,7 @@ def create_cycle_graph(n: int, version: str, bitcoin_conf: str | None, random_ve graph.nodes[node]["build_args"] = "" graph.nodes[node]["exporter"] = False graph.nodes[node]["collect_logs"] = False + graph.nodes[node]["resources"] = None convert_unsupported_attributes(graph) return graph diff --git a/src/warnet/cln.py b/src/warnet/cln.py deleted file mode 100644 index 53ed5ffa1..000000000 --- a/src/warnet/cln.py +++ /dev/null @@ -1,198 +0,0 @@ -import io -import tarfile - -from warnet.backend.kubernetes_backend import KubernetesBackend -from warnet.services import ServiceType -from warnet.utils import exponential_backoff, generate_ipv4_addr, handle_json - -from .lnchannel import LNChannel, LNPolicy -from .lnnode import LNNode -from .status import RunningStatus - -CLN_CONFIG_BASE = " ".join( - [ - "--network=regtest", - "--database-upgrade=true", - "--bitcoin-retry-timeout=600", - "--bind-addr=0.0.0.0:9735", - "--developer", - "--dev-fast-gossip", - "--log-level=debug", - ] -) - - -class CLNNode(LNNode): - def __init__(self, warnet, tank, backend: KubernetesBackend, options): - self.warnet = warnet - self.tank = tank - self.backend = backend - self.image = options["ln_image"] - self.cb = options["cb_image"] - self.ln_config = options["ln_config"] - self.ipv4 = generate_ipv4_addr(self.warnet.subnet) - self.rpc_port = 10009 - self.impl = "cln" - - @property - def status(self) -> RunningStatus: - return super().status - - @property - def cb_status(self) -> RunningStatus: - return super().cb_status - - def get_conf(self, ln_container_name, tank_container_name) -> str: - conf = CLN_CONFIG_BASE - conf += f" --alias={self.tank.index}" - conf += f" --grpc-port={self.rpc_port}" - conf += f" --bitcoin-rpcuser={self.tank.rpc_user}" - conf += f" --bitcoin-rpcpassword={self.tank.rpc_password}" - conf += f" --bitcoin-rpcconnect={tank_container_name}" - conf += f" --bitcoin-rpcport={self.tank.rpc_port}" - conf += f" --announce-addr=dns:{ln_container_name}:9735" - return conf - - @exponential_backoff(max_retries=20, 
max_delay=300) - @handle_json - def lncli(self, cmd) -> dict: - cli = "lightning-cli" - cmd = f"{cli} --network=regtest {cmd}" - return self.backend.exec_run(self.tank.index, ServiceType.LIGHTNING, cmd) - - def getnewaddress(self): - return self.lncli("newaddr")["bech32"] - - def get_pub_key(self): - res = self.lncli("getinfo") - return res["id"] - - def getURI(self): - res = self.lncli("getinfo") - if len(res["address"]) < 1: - return None - return f'{res["id"]}@{res["address"][0]["address"]}:{res["address"][0]["port"]}' - - def get_wallet_balance(self) -> int: - res = self.lncli("listfunds") - return int(sum(o["amount_msat"] for o in res["outputs"]) / 1000) - - # returns the channel point in the form txid:output_index - def open_channel_to_tank(self, index: int, channel_open_data: str) -> str: - tank = self.warnet.tanks[index] - [pubkey, host] = tank.lnnode.getURI().split("@") - res = self.lncli(f"fundchannel id={pubkey} {channel_open_data}") - if "txid" not in res or "outnum" not in res: - raise ValueError(f"Error opening channel to tank: {res}") - return f"{res['txid']}:{res['outnum']}" - - def update_channel_policy(self, chan_point: str, policy: str) -> str: - return self.lncli(f"setchannel {chan_point} {policy}") - - def get_graph_nodes(self) -> list[str]: - return list(n["nodeid"] for n in self.lncli("listnodes")["nodes"]) - - def get_graph_channels(self) -> list[LNChannel]: - cln_channels = self.lncli("listchannels")["channels"] - # CLN lists channels twice, once for each direction. This finds the unique channel ids. - short_channel_ids = {chan["short_channel_id"]: chan for chan in cln_channels}.keys() - channels = [] - for short_channel_id in short_channel_ids: - nodes = [ - chans for chans in cln_channels if chans["short_channel_id"] == short_channel_id - ] - # CLN has only heard about one side of the channel - if len(nodes) == 1: - channels.append(self.lnchannel_from_json(nodes[0], None)) - continue - channels.append(self.lnchannel_from_json(nodes[0], nodes[1])) - return channels - - @staticmethod - def lnchannel_from_json(node1: object, node2: object) -> LNChannel: - if not node1: - raise ValueError("node1 can't be None") - - node2_policy = ( - LNPolicy( - min_htlc=node2["htlc_minimum_msat"], - max_htlc=node2["htlc_maximum_msat"], - base_fee_msat=node2["base_fee_millisatoshi"], - fee_rate_milli_msat=node2["fee_per_millionth"], - ) - if node2 is not None - else None - ) - - return LNChannel( - node1_pub=node1["source"], - node2_pub=node1["destination"], - capacity_msat=node1["amount_msat"], - short_chan_id=node1["short_channel_id"], - node1_policy=LNPolicy( - min_htlc=node1["htlc_minimum_msat"], - max_htlc=node1["htlc_maximum_msat"], - base_fee_msat=node1["base_fee_millisatoshi"], - fee_rate_milli_msat=node1["fee_per_millionth"], - ), - node2_policy=node2_policy, - ) - - def get_peers(self) -> list[str]: - return list(p["id"] for p in self.lncli("listpeers")["peers"]) - - def connect_to_tank(self, index): - return super().connect_to_tank(index) - - def generate_cli_command(self, command: list[str]): - network = f"--network={self.tank.warnet.bitcoin_network}" - cmd = f"{network} {' '.join(command)}" - cmd = f"lightning-cli {cmd}" - return cmd - - def export(self, config: object, tar_file): - # Retrieve the credentials - ca_cert = self.backend.get_file( - self.tank.index, - ServiceType.LIGHTNING, - "/root/.lightning/regtest/ca.pem", - ) - client_cert = self.backend.get_file( - self.tank.index, - ServiceType.LIGHTNING, - "/root/.lightning/regtest/client.pem", - ) - client_key = 
self.backend.get_file( - self.tank.index, - ServiceType.LIGHTNING, - "/root/.lightning/regtest/client-key.pem", - ) - name = f"ln-{self.tank.index}" - ca_cert_filename = f"{name}_ca_cert.pem" - client_cert_filename = f"{name}_client_cert.pem" - client_key_filename = f"{name}_client_key.pem" - host = self.backend.get_lnnode_hostname(self.tank.index) - - # Add the files to the in-memory tar archive - tarinfo1 = tarfile.TarInfo(name=ca_cert_filename) - tarinfo1.size = len(ca_cert) - fileobj1 = io.BytesIO(ca_cert) - tar_file.addfile(tarinfo=tarinfo1, fileobj=fileobj1) - tarinfo2 = tarfile.TarInfo(name=client_cert_filename) - tarinfo2.size = len(client_cert) - fileobj2 = io.BytesIO(client_cert) - tar_file.addfile(tarinfo=tarinfo2, fileobj=fileobj2) - tarinfo3 = tarfile.TarInfo(name=client_key_filename) - tarinfo3.size = len(client_key) - fileobj3 = io.BytesIO(client_key) - tar_file.addfile(tarinfo=tarinfo3, fileobj=fileobj3) - - config["nodes"].append( - { - "id": name, - "address": f"https://{host}:{self.rpc_port}", - "ca_cert": f"/simln/{ca_cert_filename}", - "client_cert": f"/simln/{client_cert_filename}", - "client_key": f"/simln/{client_key_filename}", - } - ) diff --git a/src/warnet/lnchannel.py b/src/warnet/lnchannel.py deleted file mode 100644 index 2d17460fb..000000000 --- a/src/warnet/lnchannel.py +++ /dev/null @@ -1,148 +0,0 @@ -import logging - - -class LNPolicy: - def __init__( - self, - min_htlc: int, - max_htlc: int, - base_fee_msat: int, - fee_rate_milli_msat: int, - time_lock_delta: int = 0, - ) -> None: - self.min_htlc = min_htlc - self.max_htlc = max_htlc - self.base_fee_msat = base_fee_msat - self.fee_rate_milli_msat = fee_rate_milli_msat - self.time_lock_delta = time_lock_delta - - def __str__(self) -> str: - return ( - f"LNPolicy(min_htlc={self.min_htlc}, " - f"max_htlc={self.max_htlc}, " - f"base_fee={self.base_fee_msat}, " - f"fee_rate={self.fee_rate_milli_msat}, " - f"time_lock_delta={self.time_lock_delta})" - ) - - -class LNChannel: - def __init__( - self, - node1_pub: str, - node2_pub: str, - capacity_msat: int = 0, - short_chan_id: str = "", - node1_policy: LNPolicy = None, - node2_policy: LNPolicy = None, - ) -> None: - # Ensure that the node with the lower pubkey is node1 - if node1_pub > node2_pub: - node1_pub, node2_pub = node2_pub, node1_pub - node1_policy, node2_policy = node2_policy, node1_policy - self.node1_pub = node1_pub - self.node2_pub = node2_pub - self.capacity_msat = capacity_msat - self.short_chan_id = short_chan_id - self.node1_policy = node1_policy - self.node2_policy = node2_policy - self.logger = logging.getLogger("lnchan") - - def __str__(self) -> str: - return ( - f"LNChannel(short_chan_id={self.short_chan_id}, " - f"capacity_msat={self.capacity_msat}, " - f"node1_pub={self.node1_pub[:8]}..., " - f"node2_pub={self.node2_pub[:8]}..., " - f"node1_policy=({self.node1_policy.__str__()}), " - f"node2_policy=({self.node2_policy.__str__()}))" - ) - - # Only used to compare warnet channels imported from a mainnet source file - # because pubkeys are unpredictable and node 1/2 might be swapped - def flip(self) -> "LNChannel": - return LNChannel( - # Keep the old pubkeys so the constructor doesn't just flip it back - node1_pub=self.node1_pub, - node2_pub=self.node2_pub, - capacity_msat=self.capacity_msat, - short_chan_id=self.short_chan_id, - node1_policy=self.node2_policy, - node2_policy=self.node1_policy, - ) - - def policy_match(self, ch2: "LNChannel") -> bool: - assert isinstance(ch2, LNChannel) - - node1_policy_match = False - node2_policy_match = 
False - - if self.node1_policy is None and ch2.node1_policy is None: - node1_policy_match = True - - if self.node2_policy is None and ch2.node2_policy is None: - node2_policy_match = True - - def compare_attributes(attr1, attr2, min_value=0, attr_name=""): - if attr1 == 0 or attr2 == 0: - return True - result = max(int(attr1), min_value) == max(int(attr2), min_value) - if not result: - self.logger.debug(f"Mismatch in {attr_name}: {attr1} != {attr2}") - return result - - if self.node1_policy is not None and ch2.node1_policy is not None: - attributes_to_compare = [ - ( - self.node1_policy.time_lock_delta, - ch2.node1_policy.time_lock_delta, - 18, - "node1_time_lock_delta", - ), - (self.node1_policy.min_htlc, ch2.node1_policy.min_htlc, 1, "node1_min_htlc"), - ( - self.node1_policy.base_fee_msat, - ch2.node1_policy.base_fee_msat, - 0, - "node1_base_fee_msat", - ), - ( - self.node1_policy.fee_rate_milli_msat, - ch2.node1_policy.fee_rate_milli_msat, - 0, - "node1_fee_rate_milli_msat", - ), - ] - node1_policy_match = all(compare_attributes(*attrs) for attrs in attributes_to_compare) - - if self.node2_policy is not None and ch2.node2_policy is not None: - attributes_to_compare = [ - ( - self.node2_policy.time_lock_delta, - ch2.node2_policy.time_lock_delta, - 18, - "node2_time_lock_delta", - ), - (self.node2_policy.min_htlc, ch2.node2_policy.min_htlc, 1, "node2_min_htlc"), - ( - self.node2_policy.base_fee_msat, - ch2.node2_policy.base_fee_msat, - 0, - "node2_base_fee_msat", - ), - ( - self.node2_policy.fee_rate_milli_msat, - ch2.node2_policy.fee_rate_milli_msat, - 0, - "node2_fee_rate_milli_msat", - ), - ] - node2_policy_match = all(compare_attributes(*attrs) for attrs in attributes_to_compare) - - return node1_policy_match and node2_policy_match - - def channel_match(self, ch2: "LNChannel") -> bool: - if self.capacity_msat != ch2.capacity_msat: - self.logger.debug(f"Capacity mismatch: {self.capacity_msat} != {ch2.capacity_msat}") - return False - return self.policy_match(ch2) diff --git a/src/warnet/lnd.py b/src/warnet/lnd.py deleted file mode 100644 index 3282d8253..000000000 --- a/src/warnet/lnd.py +++ /dev/null @@ -1,191 +0,0 @@ -import io -import tarfile - -from warnet.backend.kubernetes_backend import KubernetesBackend -from warnet.services import ServiceType -from warnet.utils import exponential_backoff, generate_ipv4_addr, handle_json - -from .lnchannel import LNChannel, LNPolicy -from .lnnode import LNNode, lnd_to_cl_scid -from .status import RunningStatus - -LND_CONFIG_BASE = " ".join( - [ - "--noseedbackup", - "--norest", - "--debuglevel=debug", - "--accept-keysend", - "--bitcoin.active", - "--bitcoin.regtest", - "--bitcoin.node=bitcoind", - "--maxpendingchannels=64", - "--trickledelay=1", - ] -) - - -class LNDNode(LNNode): - def __init__(self, warnet, tank, backend: KubernetesBackend, options): - self.warnet = warnet - self.tank = tank - self.backend = backend - self.image = options["ln_image"] - self.cb = options["cb_image"] - self.ln_config = options["ln_config"] - self.ipv4 = generate_ipv4_addr(self.warnet.subnet) - self.rpc_port = 10009 - self.impl = "lnd" - - @property - def status(self) -> RunningStatus: - return super().status - - @property - def cb_status(self) -> RunningStatus: - return super().cb_status - - def get_conf(self, ln_container_name, tank_container_name) -> str: - conf = LND_CONFIG_BASE - conf += f" --bitcoind.rpcuser={self.tank.rpc_user}" - conf += f" --bitcoind.rpcpass={self.tank.rpc_password}" - conf += f" 
--bitcoind.rpchost={tank_container_name}:{self.tank.rpc_port}" - conf += f" --bitcoind.zmqpubrawblock=tcp://{tank_container_name}:{self.tank.zmqblockport}" - conf += f" --bitcoind.zmqpubrawtx=tcp://{tank_container_name}:{self.tank.zmqtxport}" - conf += f" --rpclisten=0.0.0.0:{self.rpc_port}" - conf += f" --alias={self.tank.index}" - conf += f" --externalhosts={ln_container_name}" - conf += f" --tlsextradomain={ln_container_name}" - conf += " " + self.ln_config - return conf - - @exponential_backoff(max_retries=20, max_delay=300) - @handle_json - def lncli(self, cmd) -> dict: - cli = "lncli" - cmd = f"{cli} --network=regtest {cmd}" - return self.backend.exec_run(self.tank.index, ServiceType.LIGHTNING, cmd) - - def getnewaddress(self): - return self.lncli("newaddress p2wkh")["address"] - - def get_pub_key(self): - res = self.lncli("getinfo") - return res["identity_pubkey"] - - def getURI(self): - res = self.lncli("getinfo") - if len(res["uris"]) < 1: - return None - return res["uris"][0] - - def get_wallet_balance(self) -> int: - res = self.lncli("walletbalance")["confirmed_balance"] - return res - - # returns the channel point in the form txid:output_index - def open_channel_to_tank(self, index: int, channel_open_data: str) -> str: - tank = self.warnet.tanks[index] - [pubkey, host] = tank.lnnode.getURI().split("@") - txid = self.lncli(f"openchannel --node_key={pubkey} --connect={host} {channel_open_data}")[ - "funding_txid" - ] - # Why doesn't LND return the output index as well? - # Do they charge by the RPC call or something?! - pending = self.lncli("pendingchannels") - for chan in pending["pending_open_channels"]: - if txid in chan["channel"]["channel_point"]: - return chan["channel"]["channel_point"] - raise Exception(f"Opened channel with txid {txid} not found in pending channels") - - def update_channel_policy(self, chan_point: str, policy: str) -> str: - ret = self.lncli(f"updatechanpolicy --chan_point={chan_point} {policy}") - if len(ret["failed_updates"]) == 0: - return ret - else: - raise Exception(ret) - - def get_graph_nodes(self) -> list[str]: - return list(n["pub_key"] for n in self.lncli("describegraph")["nodes"]) - - def get_graph_channels(self) -> list[LNChannel]: - edges = self.lncli("describegraph")["edges"] - return [self.lnchannel_from_json(edge) for edge in edges] - - @staticmethod - def lnchannel_from_json(edge: object) -> LNChannel: - node1_policy = ( - LNPolicy( - min_htlc=int(edge["node1_policy"]["min_htlc"]), - max_htlc=int(edge["node1_policy"]["max_htlc_msat"]), - base_fee_msat=int(edge["node1_policy"]["fee_base_msat"]), - fee_rate_milli_msat=int(edge["node1_policy"]["fee_rate_milli_msat"]), - time_lock_delta=int(edge["node1_policy"]["time_lock_delta"]), - ) - if edge["node1_policy"] - else None - ) - - node2_policy = ( - LNPolicy( - min_htlc=int(edge["node2_policy"]["min_htlc"]), - max_htlc=int(edge["node2_policy"]["max_htlc_msat"]), - base_fee_msat=int(edge["node2_policy"]["fee_base_msat"]), - fee_rate_milli_msat=int(edge["node2_policy"]["fee_rate_milli_msat"]), - time_lock_delta=int(edge["node2_policy"]["time_lock_delta"]), - ) - if edge["node2_policy"] - else None - ) - - return LNChannel( - node1_pub=edge["node1_pub"], - node2_pub=edge["node2_pub"], - capacity_msat=(int(edge["capacity"]) * 1000), - short_chan_id=lnd_to_cl_scid(edge["channel_id"]), - node1_policy=node1_policy, - node2_policy=node2_policy, - ) - - def get_peers(self) -> list[str]: - return list(p["pub_key"] for p in self.lncli("listpeers")["peers"]) - - def connect_to_tank(self, index): - 
return super().connect_to_tank(index) - - def generate_cli_command(self, command: list[str]): - network = f"--network={self.tank.warnet.bitcoin_network}" - cmd = f"{network} {' '.join(command)}" - cmd = f"lncli {cmd}" - return cmd - - def export(self, config: object, tar_file): - # Retrieve the credentials - macaroon = self.backend.get_file( - self.tank.index, - ServiceType.LIGHTNING, - "/root/.lnd/data/chain/bitcoin/regtest/admin.macaroon", - ) - cert = self.backend.get_file(self.tank.index, ServiceType.LIGHTNING, "/root/.lnd/tls.cert") - name = f"ln-{self.tank.index}" - macaroon_filename = f"{name}_admin.macaroon" - cert_filename = f"{name}_tls.cert" - host = self.backend.get_lnnode_hostname(self.tank.index) - - # Add the files to the in-memory tar archive - tarinfo1 = tarfile.TarInfo(name=macaroon_filename) - tarinfo1.size = len(macaroon) - fileobj1 = io.BytesIO(macaroon) - tar_file.addfile(tarinfo=tarinfo1, fileobj=fileobj1) - tarinfo2 = tarfile.TarInfo(name=cert_filename) - tarinfo2.size = len(cert) - fileobj2 = io.BytesIO(cert) - tar_file.addfile(tarinfo=tarinfo2, fileobj=fileobj2) - - config["nodes"].append( - { - "id": name, - "address": f"https://{host}:{self.rpc_port}", - "macaroon": f"/simln/{macaroon_filename}", - "cert": f"/simln/{cert_filename}", - } - ) diff --git a/src/warnet/lnnode.py b/src/warnet/lnnode.py deleted file mode 100644 index deda5da20..000000000 --- a/src/warnet/lnnode.py +++ /dev/null @@ -1,99 +0,0 @@ -from abc import ABC, abstractmethod - -from warnet.backend.kubernetes_backend import KubernetesBackend -from warnet.services import ServiceType -from warnet.utils import exponential_backoff, handle_json - -from .status import RunningStatus - - -class LNNode(ABC): - @abstractmethod - def __init__(self, warnet, tank, backend: KubernetesBackend, options): - pass - - @property - def status(self) -> RunningStatus: - return self.warnet.container_interface.get_status(self.tank.index, ServiceType.LIGHTNING) - - @property - def cb_status(self) -> RunningStatus: - if not self.cb: - return None - return self.warnet.container_interface.get_status( - self.tank.index, ServiceType.CIRCUITBREAKER - ) - - @abstractmethod - def get_conf(self, ln_container_name, tank_container_name) -> str: - pass - - @exponential_backoff(max_retries=20, max_delay=300) - @handle_json - @abstractmethod - def lncli(self, cmd) -> dict: - pass - - @abstractmethod - def getnewaddress(self): - pass - - @abstractmethod - def get_pub_key(self): - pass - - @abstractmethod - def getURI(self): - pass - - @abstractmethod - def get_wallet_balance(self) -> int: - pass - - @abstractmethod - def open_channel_to_tank(self, index: int, channel_open_data: str) -> str: - """Return the channel point in the form txid:output_index""" - pass - - @abstractmethod - def update_channel_policy(self, chan_point: str, policy: str) -> str: - pass - - @abstractmethod - def get_graph_nodes(self) -> list[str]: - pass - - @abstractmethod - def get_graph_channels(self) -> list[dict]: - pass - - @abstractmethod - def get_peers(self) -> list[str]: - pass - - def connect_to_tank(self, index): - tank = self.warnet.tanks[index] - uri = tank.lnnode.getURI() - res = self.lncli(f"connect {uri}") - return res - - @abstractmethod - def generate_cli_command(self, command: list[str]): - pass - - @abstractmethod - def export(self, config: object, tar_file): - pass - - -def lnd_to_cl_scid(id) -> str: - s = int(id, 10) - block = s >> 40 - tx = s >> 16 & 0xFFFFFF - output = s & 0xFFFF - return f"{block}x{tx}x{output}" - - -def cl_to_lnd_scid(s) -> 
int: - s = [int(i) for i in s.split("x")] - return (s[0] << 40) | (s[1] << 16) | s[2] diff --git a/src/warnet/server.py b/src/warnet/server.py deleted file mode 100644 index b3fbda6dd..000000000 --- a/src/warnet/server.py +++ /dev/null @@ -1,608 +0,0 @@ -import argparse -import base64 -import importlib -import io -import json -import logging -import logging.config -import os -import pkgutil -import platform -import shutil -import subprocess -import sys -import tarfile -import tempfile -import threading -import time -import traceback -from datetime import datetime - -import warnet.scenarios as scenarios -from flask import Flask, jsonify, request -from flask_jsonrpc.app import JSONRPC -from flask_jsonrpc.exceptions import ServerError -from warnet import SRC_DIR - -from .services import ServiceType -from .utils import gen_config_dir -from .warnet import Warnet - -WARNET_SERVER_PORT = 9276 -CONFIG_DIR_ALREADY_EXISTS = 32001 - - -class Server: - def __init__(self): - system = os.name - if system == "nt" or platform.system() == "Windows": - self.basedir = os.path.join(os.path.expanduser("~"), "warnet") - elif system == "posix" or platform.system() == "Linux" or platform.system() == "Darwin": - self.basedir = os.environ.get("XDG_STATE_HOME") - if self.basedir is None: - # ~/.warnet/warnet.log - self.basedir = os.path.join(os.environ["HOME"], ".warnet") - else: - # XDG_STATE_HOME / warnet / warnet.log - self.basedir = os.path.join(self.basedir, "warnet") - else: - raise NotImplementedError("Unsupported operating system") - - self.running_scenarios = [] - - self.app = Flask(__name__) - self.jsonrpc = JSONRPC(self.app, "/api") - - self.log_file_path = os.path.join(self.basedir, "warnet.log") - self.setup_global_exception_handler() - self.setup_logging() - self.setup_rpc() - self.warnets: dict = dict() - self.logger.info("Started server") - - # register a well known /-/healthy endpoint for liveness tests - # we regard warnet as healthy if the http server is up - # /-/healthy and /-/ready are often used (e.g. by the prometheus server) - self.app.add_url_rule("/-/healthy", view_func=self.healthy) - - # This is set while we bring a warnet up, which may include building a new image - # After warnet is up this will be released. 
- # This is used to delay api calls which rely on and image being built dynamically - # before the config dir is populated with the deployment info - self.image_build_lock = threading.Lock() - - def setup_global_exception_handler(self): - """ - Use flask to log traceback of unhandled exceptions - """ - - @self.app.errorhandler(Exception) - def handle_exception(e): - trace = traceback.format_exc() - self.logger.error(f"Unhandled exception: {e}\n{trace}") - response = { - "jsonrpc": "2.0", - "error": { - "code": -32603, - "message": "Internal server error", - "data": str(e), - }, - "id": request.json.get("id", None) if request.json else None, - } - return jsonify(response), 500 - - def healthy(self): - return "warnet is healthy" - - def setup_logging(self): - os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True) - with open(SRC_DIR / "logging_config.json") as f: - logging_config = json.load(f) - logging_config["handlers"]["file"]["filename"] = str(self.log_file_path) - logging.config.dictConfig(logging_config) - self.logger = logging.getLogger("server") - self.scenario_logger = logging.getLogger("scenario") - self.logger.info("Logging started") - - def log_request(): - if "healthy" in request.path: - return # No need to log all these - if not request.path.startswith("/api"): - self.logger.debug(request.path) - else: - self.logger.debug(request.json) - - def build_check(): - timeout = 600 - check_interval = 10 - time_elapsed = 0 - - while time_elapsed < timeout: - # Attempt to acquire the lock without blocking - lock_acquired = self.image_build_lock.acquire(blocking=False) - # If we get the lock, release it and continue - if lock_acquired: - self.image_build_lock.release() - return - # Otherwise wait before trying again - else: - time.sleep(check_interval) - time_elapsed += check_interval - - # If we've reached here, the lock wasn't acquired in time - raise Exception( - f"Failed to acquire the build lock within {timeout} seconds, aborting RPC." - ) - - self.app.before_request(log_request) - self.app.before_request(build_check) - - def setup_rpc(self): - # Tanks - self.jsonrpc.register(self.tank_bcli) - self.jsonrpc.register(self.tank_lncli) - self.jsonrpc.register(self.tank_debug_log) - self.jsonrpc.register(self.tank_messages) - self.jsonrpc.register(self.tank_ln_pub_key) - # Scenarios - self.jsonrpc.register(self.scenarios_available) - self.jsonrpc.register(self.scenarios_run) - self.jsonrpc.register(self.scenarios_run_file) - self.jsonrpc.register(self.scenarios_stop) - self.jsonrpc.register(self.scenarios_list_running) - # Networks - self.jsonrpc.register(self.network_up) - self.jsonrpc.register(self.network_from_file) - self.jsonrpc.register(self.network_down) - self.jsonrpc.register(self.network_info) - self.jsonrpc.register(self.network_status) - self.jsonrpc.register(self.network_connected) - self.jsonrpc.register(self.network_export) - # Debug - self.jsonrpc.register(self.generate_deployment) - self.jsonrpc.register(self.exec_run) - # Logs - self.jsonrpc.register(self.logs_grep) - - def scenario_log(self, proc): - while not proc.stdout: - time.sleep(0.1) - for line in proc.stdout: - self.scenario_logger.info(line.decode().rstrip()) - - def get_warnet(self, network: str) -> Warnet: - """ - Will get a warnet from the cache if it exists. - Otherwise it will create the network using from_network() and save it - to the cache before returning it. 
- """ - if network in self.warnets: - return self.warnets[network] - wn = Warnet.from_network(network) - if isinstance(wn, Warnet): - self.warnets[network] = wn - return wn - raise ServerError(f"Could not find warnet {network}") - - def tank_bcli( - self, node: int, method: str, params: list[str] | None = None, network: str = "warnet" - ) -> str: - """ - Call bitcoin-cli on in [network] - """ - wn = self.get_warnet(network) - try: - return wn.container_interface.get_bitcoin_cli(wn.tanks[node], method, params) - except Exception as e: - msg = f"Sever error calling bitcoin-cli {method}: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def tank_lncli(self, node: int, command: list[str], network: str = "warnet") -> str: - """ - Call lightning cli on in [network] - """ - wn = self.get_warnet(network) - try: - return wn.container_interface.ln_cli(wn.tanks[node], command) - except Exception as e: - msg = f"Error calling lncli: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def tank_ln_pub_key(self, node: int, network: str = "warnet") -> str: - """ - Get lightning pub key on in [network] - """ - wn = self.get_warnet(network) - try: - return wn.container_interface.ln_pub_key(wn.tanks[node]) - except Exception as e: - msg = f"Error getting pub key: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def tank_debug_log(self, network: str, node: int) -> str: - """ - Fetch the Bitcoin Core debug log from - """ - wn = Warnet.from_network(network) - try: - return wn.container_interface.get_bitcoin_debug_log(wn.tanks[node].index) - except Exception as e: - msg = f"Error fetching debug logs: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def tank_messages(self, network: str, node_a: int, node_b: int) -> str: - """ - Fetch messages sent between and . 
- """ - wn = self.get_warnet(network) - try: - messages = [ - msg - for msg in wn.container_interface.get_messages( - wn.tanks[node_a].index, wn.tanks[node_b].index, wn.bitcoin_network - ) - if msg is not None - ] - if not messages: - msg = f"No messages found between {node_a} and {node_b}" - self.logger.error(msg) - raise ServerError(message=msg) - - messages_str_list = [] - - for message in messages: - # Check if 'time' key exists and its value is a number - if not (message.get("time") and isinstance(message["time"], int | float)): - continue - - timestamp = datetime.utcfromtimestamp(message["time"] / 1e6).strftime( - "%Y-%m-%d %H:%M:%S" - ) - direction = ">>>" if message.get("outbound", False) else "<<<" - msgtype = message.get("msgtype", "") - body_dict = message.get("body", {}) - - if not isinstance(body_dict, dict): # messages will be in dict form - continue - - body_str = ", ".join(f"{key}: {value}" for key, value in body_dict.items()) - messages_str_list.append(f"{timestamp} {direction} {msgtype} {body_str}") - - result_str = "\n".join(messages_str_list) - - return result_str - - except Exception as e: - msg = f"Error fetching messages between nodes {node_a} and {node_b}: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def network_export(self, network: str, activity: str | None, exclude: list[int]) -> bool: - """ - Export all data for a simln container running on the network - """ - wn = self.get_warnet(network) - if "simln" not in wn.services: - raise Exception("No simln service in network") - - # JSON object that will eventually be written to simln config file - config = {"nodes": []} - if activity: - config["activity"] = json.loads(activity) - # In-memory file to build tar archive - tar_buffer = io.BytesIO() - with tarfile.open(fileobj=tar_buffer, mode="w") as tar_file: - # tank LN nodes add their credentials to tar archive - wn.export(config, tar_file, exclude=exclude) - # write config file - config_bytes = json.dumps(config).encode("utf-8") - config_stream = io.BytesIO(config_bytes) - tarinfo = tarfile.TarInfo(name="sim.json") - tarinfo.size = len(config_bytes) - tar_file.addfile(tarinfo=tarinfo, fileobj=config_stream) - - # Write the archive to the RPC server's config directory - source_file = wn.config_dir / "simln.tar" - with open(source_file, "wb") as output: - tar_buffer.seek(0) - output.write(tar_buffer.read()) - - # Copy the archive to the "emptydir" volume in the simln pod - wn.container_interface.write_service_config(source_file, "simln", "/simln/") - return True - - def scenarios_available(self) -> list[tuple]: - """ - List available scenarios in the Warnet Test Framework - """ - try: - scenario_list = [] - for s in pkgutil.iter_modules(scenarios.__path__): - module_name = f"warnet.scenarios.{s.name}" - try: - m = importlib.import_module(module_name) - if hasattr(m, "cli_help"): - scenario_list.append((s.name, m.cli_help())) - except ModuleNotFoundError as e: - print(f"Module not found: {module_name}, error: {e}") - raise - return scenario_list - except Exception as e: - msg = f"Error listing scenarios: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def _start_scenario( - self, - scenario_path: str, - scenario_name: str, - additional_args: list[str], - network: str, - ) -> str: - try: - run_cmd = [sys.executable, scenario_path] + additional_args + [f"--network={network}"] - self.logger.debug(f"Running {run_cmd}") - proc = subprocess.Popen( - run_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - t = 
threading.Thread(target=lambda: self.scenario_log(proc)) - t.daemon = True - t.start() - self.running_scenarios.append( - { - "pid": proc.pid, - "cmd": f"{scenario_name} {' '.join(additional_args)}", - "proc": proc, - "network": network, - } - ) - return f"Running scenario {scenario_name} with PID {proc.pid} in the background..." - except Exception as e: - msg = f"Error running scenario: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def scenarios_run_file( - self, - scenario_base64: str, - scenario_name: str, - additional_args: list[str], - network: str = "warnet", - ) -> str: - # Extract just the filename without path and extension - with tempfile.NamedTemporaryFile( - prefix=scenario_name, - suffix=".py", - delete=False, - ) as temp_file: - scenario_path = temp_file.name - temp_file.write(base64.b64decode(scenario_base64)) - - if not os.path.exists(scenario_path): - raise ServerError(f"Scenario not found at {scenario_path}.") - - return self._start_scenario(scenario_path, scenario_name, additional_args, network) - - def scenarios_run( - self, scenario: str, additional_args: list[str], network: str = "warnet" - ) -> str: - # Use importlib.resources to get the scenario path - scenario_package = "warnet.scenarios" - scenario_filename = f"{scenario}.py" - - # Ensure the scenario file exists within the package - with importlib.resources.path(scenario_package, scenario_filename) as scenario_path: - scenario_path = str(scenario_path) # Convert Path object to string - - if not os.path.exists(scenario_path): - raise ServerError(f"Scenario {scenario} not found at {scenario_path}.") - - return self._start_scenario(scenario_path, scenario, additional_args, network) - - def scenarios_stop(self, pid: int) -> str: - matching_scenarios = [sc for sc in self.running_scenarios if sc["pid"] == pid] - if matching_scenarios: - matching_scenarios[0]["proc"].terminate() # sends SIGTERM - # Remove from running list - self.running_scenarios = [sc for sc in self.running_scenarios if sc["pid"] != pid] - return f"Stopped scenario with PID {pid}." - else: - msg = f"Could not find scenario with PID {pid}" - self.logger.error(msg) - raise ServerError(message=msg) - - def scenarios_list_running(self) -> list[dict]: - running = [ - { - "pid": sc["pid"], - "cmd": sc["cmd"], - "active": sc["proc"].poll() is None, - "return_code": sc["proc"].returncode, - "network": sc["network"], - } - for sc in self.running_scenarios - ] - return running - - def network_up(self, network: str = "warnet") -> str: - def thread_start(server: Server, network): - try: - wn = server.get_warnet(network) - wn.apply_network_conditions() - wn.wait_for_health() - server.logger.info( - f"Successfully resumed warnet named '{network}' from config dir {wn.config_dir}" - ) - except Exception as e: - trace = traceback.format_exc() - server.logger.error(f"Unhandled exception bringing network up: {e}\n{trace}") - - try: - t = threading.Thread(target=lambda: thread_start(self, network)) - t.daemon = True - t.start() - return "Resuming warnet..." 
- except Exception as e: - msg = f"Error bring up warnet: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def network_from_file( - self, graph_file: str, force: bool = False, network: str = "warnet" - ) -> dict: - """ - Run a warnet with topology loaded from a - """ - - def thread_start(server: Server, network): - with server.image_build_lock: - try: - wn = server.get_warnet(network) - wn.generate_deployment() - wn.warnet_build() - wn.warnet_up() - wn.wait_for_health() - wn.apply_network_conditions() - self.logger.info("Warnet started successfully") - except Exception as e: - trace = traceback.format_exc() - self.logger.error(f"Unhandled exception starting warnet: {e}\n{trace}") - - config_dir = gen_config_dir(network) - if config_dir.exists(): - if force: - shutil.rmtree(config_dir) - else: - message = f"Config dir {config_dir} already exists, not overwriting existing warnet without --force" - self.logger.error(message) - raise ServerError(message=message, code=CONFIG_DIR_ALREADY_EXISTS) - - try: - self.warnets[network] = Warnet.from_graph_file( - graph_file, - config_dir, - network, - ) - t = threading.Thread(target=lambda: thread_start(self, network)) - t.daemon = True - t.start() - return self.warnets[network]._warnet_dict_representation() - except Exception as e: - msg = f"Error bring up warnet: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def network_down(self, network: str = "warnet") -> str: - """ - Stop all containers in . - """ - wn = self.get_warnet(network) - try: - wn.warnet_down() - return "Stopping warnet" - except Exception as e: - msg = f"Error bringing warnet down: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def network_info(self, network: str = "warnet") -> dict: - """ - Get info about a warnet network named - """ - wn = self.get_warnet(network) - return wn._warnet_dict_representation() - - def network_status(self, network: str = "warnet") -> list[dict]: - """ - Get running status of a warnet network named - """ - try: - wn = self.get_warnet(network) - stats = [] - for tank in wn.tanks: - status = {"tank_index": tank.index, "bitcoin_status": tank.status.name.lower()} - if tank.lnnode is not None: - status["lightning_status"] = tank.lnnode.status.name.lower() - if tank.lnnode.cb is not None: - status["circuitbreaker_status"] = tank.lnnode.cb_status.name.lower() - stats.append(status) - return stats - except Exception as e: - msg = f"Error getting network status: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def network_connected(self, network: str = "warnet") -> bool: - """ - Indicate whether all of the graph edges are connected in - """ - try: - wn = self.get_warnet(network) - return wn.network_connected() - except Exception as e: - self.logger.error(f"{e}") - return False - - def generate_deployment(self, graph_file: str, network: str = "warnet") -> str: - """ - Generate the deployment file for a graph file - """ - try: - config_dir = gen_config_dir(network) - if config_dir.exists(): - message = f"Config dir {config_dir} already exists, not overwriting existing warnet without --force" - self.logger.error(message) - raise ServerError(message=message, code=CONFIG_DIR_ALREADY_EXISTS) - wn = self.get_warnet(network) - wn.generate_deployment() - if not wn.deployment_file or not wn.deployment_file.is_file(): - raise ServerError(f"No deployment file found at {wn.deployment_file}") - with open(wn.deployment_file) as f: - return f.read() - except Exception as e: - msg 
= f"Error generating deployment file: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def logs_grep( - self, pattern: str, network: str = "warnet", k8s_timestamps=False, no_sort=False - ) -> str: - """ - Grep the logs from the fluentd container for a regex pattern - """ - try: - wn = self.get_warnet(network) - return wn.container_interface.logs_grep(pattern, network, k8s_timestamps, no_sort) - except Exception as e: - msg = f"Error grepping logs using pattern {pattern}: {e}" - self.logger.error(msg) - raise ServerError(message=msg) from e - - def exec_run(self, index: int, service_type: int, cmd: str, network: str = "warnet") -> str: - """ - Execute an arbitrary command in an arbitrary container, - identified by tank index and ServiceType - """ - wn = self.get_warnet(network) - return wn.container_interface.exec_run(index, ServiceType(service_type), cmd) - - -def run_server(): - parser = argparse.ArgumentParser(description="Run the server") - parser.add_argument( - "--dev", action="store_true", help="Run in development mode with debug enabled" - ) - args = parser.parse_args() - debug_mode = args.dev - server = Server() - server.app.run(host="0.0.0.0", port=WARNET_SERVER_PORT, debug=debug_mode) - - -if __name__ == "__main__": - run_server() diff --git a/src/warnet/services.py b/src/warnet/services.py deleted file mode 100644 index 562813432..000000000 --- a/src/warnet/services.py +++ /dev/null @@ -1,36 +0,0 @@ -from enum import Enum - -FO_CONF_NAME = "fork_observer_config.toml" -AO_CONF_NAME = "addrman_observer_config.toml" -GRAFANA_PROVISIONING = "grafana-provisioning" -PROM_CONF_NAME = "prometheus.yml" - - -class ServiceType(Enum): - BITCOIN = 1 - LIGHTNING = 2 - CIRCUITBREAKER = 3 - - -SERVICES = { - # "forkobserver": { - # "image": "b10c/fork-observer:latest", - # "container_name_suffix": "fork-observer", - # "warnet_port": "23001", - # "container_port": "2323", - # "config_files": [f"{FO_CONF_NAME}:/app/config.toml"], - # }, - # "addrmanobserver": { - # "image": "b10c/addrman-observer:latest", - # "container_name_suffix": "addrman-observer", - # "warnet_port": "23005", - # "container_port": "3882", - # "config_files": [f"{AO_CONF_NAME}:/app/config.toml"], - # }, - "simln": { - "image": "bitcoindevproject/simln:0.2.0", - "container_name_suffix": "simln", - "environment": ["LOG_LEVEL=debug", "SIMFILE_PATH=/simln/sim.json"], - "config_files": ["simln/:/simln"], - }, -} diff --git a/src/warnet/status.py b/src/warnet/status.py deleted file mode 100644 index ac83d4140..000000000 --- a/src/warnet/status.py +++ /dev/null @@ -1,9 +0,0 @@ -from enum import Enum - - -class RunningStatus(Enum): - PENDING = 1 - RUNNING = 2 - STOPPED = 3 - FAILED = 4 - UNKNOWN = 5 diff --git a/src/warnet/tank.py b/src/warnet/tank.py deleted file mode 100644 index ac04d3f70..000000000 --- a/src/warnet/tank.py +++ /dev/null @@ -1,198 +0,0 @@ -""" -Tanks are containerized bitcoind nodes -""" - -import logging - -from .services import ServiceType -from .status import RunningStatus -from .utils import ( - SUPPORTED_TAGS, - exponential_backoff, - generate_ipv4_addr, - sanitize_tc_netem_command, -) - -CONTAINER_PREFIX_PROMETHEUS = "prometheus_exporter" - -logger = logging.getLogger("tank") - -CONFIG_BASE = " ".join( - [ - "-regtest=1", - "-checkmempool=0", - "-acceptnonstdtxn=1", - "-debuglogfile=0", - "-logips=1", - "-logtimemicros=1", - "-capturemessages=1", - "-rpcallowip=0.0.0.0/0", - "-rpcbind=0.0.0.0", - "-fallbackfee=0.00001000", - "-listen=1", - ] -) - - -class Tank: - DEFAULT_BUILD_ARGS = 
"--disable-tests --with-incompatible-bdb --without-gui --disable-bench --disable-fuzz-binary --enable-suppress-external-warnings --enable-debug " - - def __init__(self, index: int, warnet): - from warnet.lnnode import LNNode - - self.index = index - self.warnet = warnet - self.network_name = warnet.network_name - self.bitcoin_network = warnet.bitcoin_network - self.version: str = "" - self.image: str = "" - self.bitcoin_config = "" - self.netem = None - self.exporter = False - self.metrics = None - self.collect_logs = False - self.build_args = "" - self.lnnode: LNNode | None = None - self.rpc_port = 18443 - self.rpc_user = "warnet_user" - self.rpc_password = "2themoon" - self.zmqblockport = 28332 - self.zmqtxport = 28333 - self._suffix = None - self._ipv4 = None - self._exporter_name = None - # index of integers imported from graph file - # indicating which tanks to initially connect to - self.init_peers = [] - - def _parse_version(self, version): - if not version: - return - if version not in SUPPORTED_TAGS and not ("/" in version and "#" in version): - raise Exception( - f"Unsupported version: can't be generated from Docker images: {self.version}" - ) - self.version = version - - def parse_graph_node(self, node): - # Dynamically parse properties based on the schema - graph_properties = {} - for property, specs in self.warnet.graph_schema["node"]["properties"].items(): - value = node.get(property, specs.get("default")) - if property == "version": - self._parse_version(value) - setattr(self, property, value) - graph_properties[property] = value - - if self.version and self.image: - raise Exception( - f"Tank has {self.version=:} and {self.image=:} supplied and can't be built. Provide one or the other." - ) - - # Special handling for complex properties - if "ln" in node: - options = { - "impl": node["ln"], - "cb_image": node.get("ln_cb_image", None), - "ln_config": node.get("ln_config", ""), - } - from warnet.cln import CLNNode - from warnet.lnd import LNDNode - - if options["impl"] == "lnd": - options["ln_image"] = node.get("ln_image", "lightninglabs/lnd:v0.18.0-beta") - self.lnnode = LNDNode(self.warnet, self, self.warnet.container_interface, options) - elif options["impl"] == "cln": - options["ln_image"] = node.get("ln_image", "elementsproject/lightningd:v23.11") - self.lnnode = CLNNode(self.warnet, self, self.warnet.container_interface, options) - else: - raise Exception(f"Unsupported Lightning Network implementation: {options['impl']}") - - if "metrics" in node: - self.metrics = node["metrics"] - - logger.debug( - f"Parsed graph node: {self.index} with attributes: {[f'{key}={value}' for key, value in graph_properties.items()]}" - ) - - @classmethod - def from_graph_node(cls, index, warnet, tank=None): - assert index is not None - index = int(index) - self = tank - if self is None: - self = cls(index, warnet) - node = warnet.graph.nodes[index] - self.parse_graph_node(node) - return self - - @property - def suffix(self): - if self._suffix is None: - self._suffix = f"{self.index:06}" - return self._suffix - - @property - def ipv4(self): - if self._ipv4 is None: - self._ipv4 = generate_ipv4_addr(self.warnet.subnet) - return self._ipv4 - - @property - def exporter_name(self): - if self._exporter_name is None: - self._exporter_name = f"{self.network_name}-{CONTAINER_PREFIX_PROMETHEUS}-{self.suffix}" - return self._exporter_name - - @property - def status(self) -> RunningStatus: - return self.warnet.container_interface.get_status(self.index, ServiceType.BITCOIN) - - @exponential_backoff() - 
def exec(self, cmd: str): - return self.warnet.container_interface.exec_run(self.index, ServiceType.BITCOIN, cmd=cmd) - - def get_dns_addr(self) -> str: - dns_addr = self.warnet.container_interface.get_tank_dns_addr(self.index) - return dns_addr - - def get_ip_addr(self) -> str: - ip_addr = self.warnet.container_interface.get_tank_ip_addr(self.index) - return ip_addr - - def get_bitcoin_conf(self, nodes: list[str]) -> str: - conf = CONFIG_BASE - conf += f" -rpcuser={self.rpc_user}" - conf += f" -rpcpassword={self.rpc_password}" - conf += f" -rpcport={self.rpc_port}" - conf += f" -zmqpubrawblock=tcp://0.0.0.0:{self.zmqblockport}" - conf += f" -zmqpubrawtx=tcp://0.0.0.0:{self.zmqtxport}" - conf += " " + self.bitcoin_config - for node in nodes: - conf += f" -addnode={node}" - return conf - - def apply_network_conditions(self): - if self.netem is None: - return - - if not sanitize_tc_netem_command(self.netem): - logger.warning( - f"Not applying unsafe tc-netem conditions to tank {self.index}: `{self.netem}`" - ) - return - - # Apply the network condition to the container - try: - self.exec(self.netem) - logger.info( - f"Successfully applied network conditions to tank {self.index}: `{self.netem}`" - ) - except Exception as e: - logger.error( - f"Error applying network conditions to tank {self.index}: `{self.netem}` ({e})" - ) - - def export(self, config: object, tar_file): - if self.lnnode is not None: - self.lnnode.export(config, tar_file) diff --git a/src/warnet/warnet.py b/src/warnet/warnet.py deleted file mode 100644 index 67cd52002..000000000 --- a/src/warnet/warnet.py +++ /dev/null @@ -1,283 +0,0 @@ -""" -Warnet is the top-level class for a simulated network. -""" - -import base64 -import json -import logging -from pathlib import Path - -import networkx - -from .backend.kubernetes_backend import KubernetesBackend -from .tank import Tank -from .utils import gen_config_dir, load_schema, validate_graph_schema - -logger = logging.getLogger("warnet") - - -class Warnet: - def __init__(self, config_dir, network_name: str): - self.config_dir: Path = config_dir - self.config_dir.mkdir(parents=True, exist_ok=True) - self.container_interface = KubernetesBackend(config_dir, network_name) - self.bitcoin_network: str = "regtest" - self.network_name: str = "warnet" - self.subnet: str = "100.0.0.0/8" - self.graph: networkx.Graph | None = None - self.graph_name = "graph.graphml" - self.tanks: list[Tank] = [] - self.deployment_file: Path | None = None - self.graph_schema = load_schema() - self.services = [] - - def _warnet_dict_representation(self) -> dict: - repr = {} - # Warnet - repr["warnet_headers"] = [ - "Temp dir", - "Bitcoin network", - "Docker network", - "Subnet", - "Graph", - ] - repr["warnet"] = [ - [ - str(self.config_dir), - self.bitcoin_network, - self.network_name, - self.subnet, - str(self.graph), - ] - ] - - # Tanks - tank_headers = [ - "Index", - "Version", - "IPv4", - "bitcoin conf", - "tc_netem", - "LN", - "LN Image", - "LN IPv4", - ] - has_ln = any(tank.lnnode and tank.lnnode.impl for tank in self.tanks) - tanks = [] - for tank in self.tanks: - tank_data = [ - tank.index, - tank.version if tank.version else tank.image, - tank.ipv4, - tank.bitcoin_config, - tank.netem, - ] - if has_ln: - tank_data.extend( - [ - tank.lnnode.impl if tank.lnnode else "", - tank.lnnode.image if tank.lnnode else "", - tank.lnnode.ipv4 if tank.lnnode else "", - ] - ) - tanks.append(tank_data) - if not has_ln: - tank_headers.remove("LN") - tank_headers.remove("LN IPv4") - - repr["tank_headers"] = tank_headers 
- repr["tanks"] = tanks - - return repr - - @classmethod - def from_graph_file( - cls, - base64_graph: str, - config_dir: Path, - network: str = "warnet", - ): - self = cls(config_dir, network) - destination = self.config_dir / self.graph_name - destination.parent.mkdir(parents=True, exist_ok=True) - graph_file = base64.b64decode(base64_graph) - with open(destination, "wb") as f: - f.write(graph_file) - self.network_name = network - self.graph = networkx.parse_graphml( - graph_file.decode("utf-8"), node_type=int, force_multigraph=True - ) - validate_graph_schema(self.graph) - self.tanks_from_graph() - if "services" in self.graph.graph: - self.services = self.graph.graph["services"].split() - logger.info(f"Created Warnet using directory {self.config_dir}") - return self - - @classmethod - def from_graph(cls, graph, network="warnet"): - self = cls(Path(), network) - self.graph = graph - validate_graph_schema(self.graph) - self.tanks_from_graph() - if "services" in self.graph.graph: - self.services = self.graph.graph["services"].split() - logger.info(f"Created Warnet using directory {self.config_dir}") - return self - - @classmethod - def from_network(cls, network_name): - config_dir = gen_config_dir(network_name) - self = cls(config_dir, network_name) - self.network_name = network_name - # Get network graph edges from graph file (required for network restarts) - self.graph = networkx.read_graphml( - Path(self.config_dir / self.graph_name), node_type=int, force_multigraph=True - ) - validate_graph_schema(self.graph) - self.tanks_from_graph() - if "services" in self.graph.graph: - self.services = self.graph.graph["services"].split() - for tank in self.tanks: - tank._ipv4 = self.container_interface.get_tank_ipv4(tank.index) - return self - - def tanks_from_graph(self): - if not self.graph: - return - for node_id in self.graph.nodes(): - if int(node_id) != len(self.tanks): - raise Exception( - f"Node ID in graph must be incrementing integers (got '{node_id}', expected '{len(self.tanks)}')" - ) - tank = Tank.from_graph_node(node_id, self) - # import edges as list of destinations to connect to - for edge in self.graph.edges(data=True): - (src, dst, data) = edge - # Ignore LN edges for now - if "channel_open" in data: - continue - if src == node_id: - tank.init_peers.append(int(dst)) - self.tanks.append(tank) - logger.info(f"Imported {len(self.tanks)} tanks from graph") - - def apply_network_conditions(self): - for tank in self.tanks: - tank.apply_network_conditions() - - def warnet_build(self): - self.container_interface.build() - - def get_ln_node_from_tank(self, index): - return self.tanks[index].lnnode - - def warnet_up(self): - self.container_interface.up(self) - - def warnet_down(self): - self.container_interface.down(self) - - def generate_deployment(self): - self.container_interface.generate_deployment_file(self) - - # if "forkobserver" in self.services: - # self.write_fork_observer_config() - # if "addrmanobserver" in self.services: - # self.write_addrman_observer_config() - # if "grafana" in self.services: - # self.write_grafana_config() - # if "prometheus" in self.services: - # self.write_prometheus_config() - - # def write_fork_observer_config(self): - # src = FO_CONF_NAME - # dst = self.config_dir / FO_CONF_NAME - # shutil.copy(src, dst) - # with open(dst, "a") as f: - # for tank in self.tanks: - # f.write( - # f""" - # [[networks.nodes]] - # id = {tank.index} - # name = "Node {tank.index}" - # description = "Warnet tank {tank.index}" - # rpc_host = "{tank.ipv4}" - # rpc_port = 
{tank.rpc_port} - # rpc_user = "{tank.rpc_user}" - # rpc_password = "{tank.rpc_password}" - # """ - # ) - # logger.info(f"Wrote file: {dst}") - - # def write_addrman_observer_config(self): - # src = AO_CONF_NAME - # dst = self.config_dir / AO_CONF_NAME - # shutil.copy(src, dst) - # with open(dst, "a") as f: - # for tank in self.tanks: - # f.write( - # f""" - # [[nodes]] - # id = {tank.index} - # name = "node-{tank.index}" - # rpc_host = "{tank.ipv4}" - # rpc_port = {tank.rpc_port} - # rpc_user = "{tank.rpc_user}" - # rpc_password = "{tank.rpc_password}" - # """ - # ) - # logger.info(f"Wrote file: {dst}") - - # def write_grafana_config(self): - # src = GRAFANA_PROVISIONING - # dst = self.config_dir / GRAFANA_PROVISIONING - # shutil.copytree(src, dst, dirs_exist_ok=True) - # logger.info(f"Wrote directory: {dst}") - - # def write_prometheus_config(self): - # scrape_configs = [ - # { - # "job_name": "cadvisor", - # "scrape_interval": "15s", - # "static_configs": [{"targets": [f"{self.network_name}_cadvisor:8080"]}], - # } - # ] - # for tank in self.tanks: - # if tank.exporter: - # scrape_configs.append( - # { - # "job_name": tank.exporter_name, - # "scrape_interval": "5s", - # "static_configs": [{"targets": [f"{tank.exporter_name}:9332"]}], - # } - # ) - # config = {"global": {"scrape_interval": "15s"}, "scrape_configs": scrape_configs} - # prometheus_path = self.config_dir / PROM_CONF_NAME - # try: - # with open(prometheus_path, "w") as file: - # yaml.dump(config, file) - # logger.info(f"Wrote file: {prometheus_path}") - # except Exception as e: - # logger.error(f"An error occurred while writing to {prometheus_path}: {e}") - - def export(self, config: object, tar_file, exclude: list[int]): - for tank in self.tanks: - if tank.index not in exclude: - tank.export(config, tar_file) - - def wait_for_health(self): - self.container_interface.wait_for_healthy_tanks(self) - - def network_connected(self): - for tank in self.tanks: - peerinfo = json.loads(self.container_interface.get_bitcoin_cli(tank, "getpeerinfo")) - manuals = 0 - for peer in peerinfo: - if peer["connection_type"] == "manual": - manuals += 1 - # Even if more edges are specifed, bitcoind only allows - # 8 manual outbound connections - if min(8, len(tank.init_peers)) > manuals: - return False - return True
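
Usage sketch (illustrative only, not part of the diff above): assuming the helpers added to
src/warnet/cli/util.py are importable as warnet.cli.util after this change, the bitcoin.conf
parser and dumper round-trip as shown; the for_graph form is the comma-joined string that
create_cycle_graph stores in each node's "bitcoin_config" field.

    # Hypothetical example; module path assumed from the file layout in this diff.
    from warnet.cli.util import dump_bitcoin_conf, parse_bitcoin_conf

    conf_text = "debug=1\n[regtest]\nrpcport=18443\n"
    conf = parse_bitcoin_conf(conf_text)
    # {None: [("debug", "1")], "regtest": [("rpcport", "18443")]}

    print(dump_bitcoin_conf(conf))                  # INI text: global keys first, then [regtest]
    print(dump_bitcoin_conf(conf, for_graph=True))  # single comma-joined string for graph nodes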