Skip to content

Commit 5c95e14

Browse files
authored
feat: Adds fragment exporter tool | NPG-0000 (#546)
# Description Please include a summary of the changes and the related issue. Please also include relevant motivation and context. List any dependencies that are required for this change. Fixes # (issue) _if applicable_ ## Type of change Please delete options that are not relevant. - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] This change requires a documentation update ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration - [ ] Test A - [ ] Test B **Test Configuration**: _if applicable_ - Firmware version: - Hardware: - Toolchain: - SDK: **List of new dependencies**: _if applicable_ Provide a list of newly added dependencies in this PR, with the description of what are they doing and why do we need them. - Crate A: description - Crate B: description ## Checklist - [ ] My code follows the style guidelines of this project - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes - [ ] Any dependent changes have been merged and published in downstream modules
1 parent 60932e4 commit 5c95e14

File tree

16 files changed

+1096
-1
lines changed

16 files changed

+1096
-1
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ jobs:
105105
with:
106106
deployment_repo: input-output-hk/catalyst-world
107107
# NOTE: For new services being deployed, this list must be updated
108-
images: cat-data-service migrations voting-node
108+
images: cat-data-service fragment-exporter migrations voting-node
109109
environment: dev
110110
tag: ${{ env.TAG }}
111111
token: ${{ steps.setup.outputs.token }}

nix/automation/devshells.nix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
# Misc tools
4343
jq
44+
poetry
4445
python310Packages.pylddwrap
4546
];
4647
};
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Set the Earthly version to 0.7
2+
VERSION 0.7
3+
4+
# Use current debian stable with python
5+
FROM python:3.11-slim-bookworm
6+
7+
poetry:
8+
WORKDIR /work
9+
10+
ENV POETRY_HOME=/tmp/poetry
11+
ENV PATH=$POETRY_HOME/bin:$PATH
12+
13+
RUN apt-get update && \
14+
apt-get install -y --no-install-recommends curl
15+
16+
RUN curl -sSL https://install.python-poetry.org | python3 -
17+
18+
COPY pyproject.toml .
19+
COPY poetry.lock .
20+
21+
RUN poetry install --only main --no-root
22+
23+
src:
24+
FROM +poetry
25+
26+
COPY --dir fragment_exporter README.md .
27+
28+
check:
29+
FROM +src
30+
31+
COPY --dir tests tests
32+
33+
RUN poetry install --only dev
34+
RUN poetry run black --check .
35+
RUN poetry run ruff check .
36+
RUN poetry run pytest -v
37+
38+
build:
39+
FROM +check
40+
41+
RUN poetry export --without-hashes -f requirements.txt --output requirements.txt
42+
RUN poetry build --no-cache -f wheel
43+
44+
SAVE ARTIFACT dist
45+
SAVE ARTIFACT requirements.txt
46+
47+
docker:
48+
ARG tag="latest"
49+
ARG registry
50+
51+
WORKDIR /app
52+
53+
COPY +build/dist .
54+
COPY +build/requirements.txt .
55+
COPY entry.sh /app
56+
57+
RUN chmod +x entry.sh
58+
RUN pip3 install --no-cache -r requirements.txt
59+
RUN pip3 install --no-cache *.whl
60+
61+
ENTRYPOINT ["/app/entry.sh"]
62+
63+
SAVE IMAGE --push ${registry}fragment-exporter:$tag
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Fragment Exporter
2+
3+
This is a small utility for exporting fragment logs from Jormungandr leader
4+
nodes to `stdout`. This exporter is intended to be run alongside the leaders and
5+
scraped by Loki in order to trace fragments logs for leader nodes.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#!/usr/bin/env bash
2+
3+
# Enable strict mode
4+
set +x
5+
set -o errexit
6+
set -o pipefail
7+
set -o nounset
8+
set -o functrace
9+
set -o errtrace
10+
set -o monitor
11+
set -o posix
12+
shopt -s dotglob
13+
14+
check_env_vars() {
15+
local env_vars=("$@")
16+
17+
# Iterate over the array and check if each variable is set
18+
for var in "${env_vars[@]}"; do
19+
echo "Checking $var"
20+
if [ -z "${!var}" ]; then
21+
echo ">>> Error: $var is required and not set."
22+
exit 1
23+
fi
24+
done
25+
}
26+
27+
debug_sleep() {
28+
if [ -n "${DEBUG_SLEEP:-}" ]; then
29+
echo "DEBUG_SLEEP is set. Sleeping for ${DEBUG_SLEEP} seconds..."
30+
sleep "${DEBUG_SLEEP}"
31+
fi
32+
}
33+
34+
echo ">>> Starting entrypoint script..."
35+
36+
# Check if all required environment variables are set
37+
REQUIRED_ENV=(
38+
"LEADER_IP_PREFIX"
39+
"LEADER_PORT"
40+
"FRAGMENT_EXPORTER_INDEX_PATH"
41+
)
42+
echo ">>> Checking required env vars..."
43+
check_env_vars "${REQUIRED_ENV[@]}"
44+
45+
# Calculate the correct leader IP address (e.g. <prefix>.<index>)
46+
HOSTNAME=$(hostname)
47+
export FRAGMENT_EXPORTER_URL="http://${LEADER_IP_PREFIX}.${HOSTNAME##*-}:${LEADER_PORT}"
48+
49+
# Print out the environment variables
50+
echo ">>> Using FRAGMENT_EXPORTER_URL: ${FRAGMENT_EXPORTER_URL}"
51+
echo ">>> Using FRAGMENT_EXPORTER_INDEX_PATH: ${FRAGMENT_EXPORTER_INDEX_PATH}"
52+
53+
# Sleep if DEBUG_SLEEP is set
54+
debug_sleep
55+
56+
# Define the command to be executed
57+
CMD_TO_RUN="fragment_exporter"
58+
59+
# Add $* to the command so that additional flags can be passed
60+
ARGS="$*"
61+
CMD="$CMD_TO_RUN $ARGS"
62+
63+
# Expand the command with arguments and capture the exit code
64+
eval "$CMD"

utilities/fragment-exporter/fragment_exporter/__init__.py

Whitespace-only changes.
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from urllib.parse import urljoin
2+
3+
import requests
4+
import ijson
5+
import json
6+
import hashlib
7+
from .index import Index
8+
from .node import Node
9+
10+
11+
class Exporter:
12+
"""Exports fragments from a node to stdout.
13+
14+
Attributes:
15+
url: The URL of the node
16+
index: A index for storing fragments
17+
node: The node to export from
18+
"""
19+
20+
def __init__(self, url: str, index: Index, node: Node):
21+
self.url = urljoin(url, "/api/v0/fragment/logs")
22+
self.index = index
23+
self.node = node
24+
25+
def run(self):
26+
"""Runs the exporter."""
27+
# Purge the index if the node has been restarted
28+
current_block_height = self.node.get_last_block_height()
29+
previous_block_height = self.index.get_last_block_height()
30+
if current_block_height < previous_block_height:
31+
print("Node has been restarted. Purging index.")
32+
self.index.purge()
33+
34+
# Update the last block height
35+
self.index.set_last_block_height(current_block_height)
36+
37+
# Get the latest fragments
38+
response = requests.get(self.url, stream=True)
39+
response.raise_for_status()
40+
fragments = ijson.items(response.raw, "item")
41+
42+
# Find and print new fragments
43+
for fragment in fragments:
44+
if self.index.exists(hash(fragment)):
45+
continue
46+
47+
print(json.dumps(fragment))
48+
self.index.insert(hash(fragment))
49+
50+
51+
def hash(fragment):
52+
"""Returns the hash of a fragment.
53+
54+
Args:
55+
fragment: The fragment to hash
56+
57+
Returns:
58+
The hash of the fragment
59+
"""
60+
return hashlib.sha256(json.dumps(fragment).encode("utf-8")).hexdigest()
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import sqlite3
2+
import os
3+
4+
5+
class Index:
6+
"""Represents a local index of fragments.
7+
8+
Attributes:
9+
db_path: The path to the index database
10+
"""
11+
12+
def __init__(self, path: str):
13+
self.db_path = os.path.join(path, "index.db")
14+
self._init_db()
15+
16+
def _init_db(self):
17+
"""Initializes the index database."""
18+
with sqlite3.connect(self.db_path) as conn:
19+
conn.execute(
20+
"""
21+
CREATE TABLE IF NOT EXISTS fragments (
22+
fragment_id TEXT PRIMARY KEY
23+
)
24+
"""
25+
)
26+
conn.execute(
27+
"""
28+
CREATE TABLE IF NOT EXISTS node_stats (
29+
key TEXT PRIMARY KEY,
30+
value INTEGER
31+
)
32+
"""
33+
)
34+
conn.commit()
35+
36+
def insert(self, fragment_id: str):
37+
"""Inserts a fragment into the index.
38+
39+
Args:
40+
fragment_id: The ID of the fragment to insert
41+
"""
42+
with sqlite3.connect(self.db_path) as conn:
43+
conn.execute(
44+
"""
45+
INSERT OR IGNORE INTO fragments (fragment_id) VALUES (?)
46+
""",
47+
(fragment_id,),
48+
)
49+
conn.commit()
50+
51+
def exists(self, fragment_id: str) -> bool:
52+
"""Checks if a fragment exists in the index.
53+
54+
Args:
55+
fragment_id: The ID of the fragment to check
56+
57+
Returns:
58+
True if the fragment exists, False otherwise
59+
"""
60+
with sqlite3.connect(self.db_path) as conn:
61+
cur = conn.execute(
62+
"""
63+
SELECT 1 FROM fragments WHERE fragment_id = ?
64+
""",
65+
(fragment_id,),
66+
)
67+
return cur.fetchone() is not None
68+
69+
def set_last_block_height(self, height: int):
70+
"""Sets the last block height in the index.
71+
72+
Args:
73+
height: The last block height
74+
"""
75+
with sqlite3.connect(self.db_path) as conn:
76+
conn.execute(
77+
"""
78+
INSERT OR REPLACE INTO node_stats
79+
(key, value)
80+
VALUES
81+
('last_block_height', ?)
82+
""",
83+
(height,),
84+
)
85+
conn.commit()
86+
87+
def get_last_block_height(self) -> int:
88+
"""Gets the last block height from the index.
89+
90+
Returns:
91+
The last block height
92+
"""
93+
with sqlite3.connect(self.db_path) as conn:
94+
cur = conn.execute(
95+
"""
96+
SELECT value FROM node_stats WHERE key = 'last_block_height'
97+
"""
98+
)
99+
result = cur.fetchone()
100+
return result[0] if result else 0
101+
102+
def purge(self):
103+
"""Purges the index."""
104+
with sqlite3.connect(self.db_path) as conn:
105+
conn.execute(
106+
"""
107+
DELETE FROM fragments
108+
"""
109+
)
110+
conn.commit()
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import argparse
2+
import os
3+
import sys
4+
import time
5+
6+
from loguru import logger
7+
8+
from .exporter import Exporter
9+
from .index import Index
10+
from .node import Node
11+
12+
13+
def main():
14+
logger.remove()
15+
logger.add(sys.stderr, serialize=True)
16+
parser = argparse.ArgumentParser(description="Exports node fragment logs to stdout")
17+
parser.add_argument(
18+
"--url",
19+
type=str,
20+
default=os.environ.get("FRAGMENT_EXPORTER_URL"),
21+
help="URL of the node to export from",
22+
required=not bool(os.environ.get("FRAGMENT_EXPORTER_URL")),
23+
)
24+
parser.add_argument(
25+
"--index-path",
26+
type=str,
27+
default=os.environ.get("FRAGMENT_EXPORTER_INDEX_PATH"),
28+
help="A local path to a directory where the index file will be stored",
29+
required=not bool(os.environ.get("FRAGMENT_EXPORTER_INDEX_PATH")),
30+
)
31+
parser.add_argument(
32+
"--interval",
33+
type=int,
34+
default=int(os.environ.get("FRAGMENT_EXPORTER_INTERVAL", 3600)),
35+
help="Interval in seconds between consecutive export runs",
36+
)
37+
38+
args = parser.parse_args()
39+
if not os.path.exists(args.index_path):
40+
logger.error(f"Path {args.index_path} does not exist")
41+
sys.exit(1)
42+
43+
node = Node(args.url)
44+
index = Index(args.index_path)
45+
exporter = Exporter(args.url, index, node)
46+
47+
while True:
48+
logger.info("Entering main control loop")
49+
50+
try:
51+
logger.info("Running exporter")
52+
exporter.run()
53+
except Exception as e:
54+
logger.error(f"Error encountered during export: {e}")
55+
finally:
56+
logger.info(f"Sleeping for {args.interval} seconds")
57+
time.sleep(args.interval)
58+
59+
60+
if __name__ == "__main__":
61+
main()

0 commit comments

Comments
 (0)