diff --git a/Makefile b/Makefile
index 3ad7669..f2afeb5 100644
--- a/Makefile
+++ b/Makefile
@@ -14,8 +14,7 @@ $(eval pytest := $(venvpath)/bin/pytest)
$(eval bumpversion := $(venvpath)/bin/bumpversion)
$(eval twine := $(venvpath)/bin/twine)
$(eval sphinx := $(venvpath)/bin/sphinx-build)
-$(eval black := $(venvpath)/bin/black)
-$(eval isort := $(venvpath)/bin/isort)
+$(eval ruff := $(venvpath)/bin/ruff)
# Setup Python virtualenv
@@ -54,8 +53,10 @@ test-coverage: install-tests
# Formatting
# ----------
format: install-releasetools
- $(isort) grafana_wtf test
- $(black) .
+ $(ruff) format
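+ # ERA (commented-out code), F401/F841 (unused imports/variables), and T20 (print)
+ # findings are ignored below so the autofix pass never removes code.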
+ $(ruff) check --fix --ignore=ERA --ignore=F401 --ignore=F841 --ignore=T20 .
# -------
diff --git a/grafana_wtf/commands.py b/grafana_wtf/commands.py
index 9e83eb4..c479e71 100644
--- a/grafana_wtf/commands.py
+++ b/grafana_wtf/commands.py
@@ -180,7 +180,7 @@ def run():
export CACHE_TTL=infinite
grafana-wtf find geohash
- """
+ """ # noqa: E501
# Parse command line arguments
options = normalize_options(docopt(run.__doc__, version=f"{__appname__} {__version__}"))
@@ -225,7 +225,8 @@ def run():
# Sanity checks
if grafana_url is None:
raise DocoptExit(
- 'No Grafana URL given. Please use "--grafana-url" option or environment variable "GRAFANA_URL".'
+ 'No Grafana URL given. Please use "--grafana-url" option '
+ 'or environment variable "GRAFANA_URL".'
)
log.info(f"Grafana location: {grafana_url}")
@@ -273,7 +274,8 @@ def run():
# Sanity checks.
if output_format.startswith("tab") and options.sql:
raise DocoptExit(
- f"Options --format={output_format} and --sql can not be used together, only data output is supported."
+ f"Options --format={output_format} and --sql can not be used together, "
+ f"only data output is supported."
)
entries = engine.log(dashboard_uid=options.dashboard_uid)
@@ -319,7 +321,9 @@ def run():
output_results(output_format, results)
if options.explore and options.dashboards:
- results = engine.explore_dashboards(with_data_details=options.data_details, queries_only=options.queries_only)
+ results = engine.explore_dashboards(
+ with_data_details=options.data_details, queries_only=options.queries_only
+ )
output_results(output_format, results)
if options.explore and options.permissions:
diff --git a/grafana_wtf/core.py b/grafana_wtf/core.py
index 131f64e..bf3ac57 100644
--- a/grafana_wtf/core.py
+++ b/grafana_wtf/core.py
@@ -36,9 +36,9 @@
class GrafanaEngine:
-
# Configure a larger HTTP request pool.
- # TODO: Review the pool settings and eventually adjust according to concurrency level or other parameters.
+    # TODO: Review the pool settings and possibly adjust them according
+    # to the concurrency level or other parameters.
# https://urllib3.readthedocs.io/en/latest/advanced-usage.html#customizing-pool-behavior
# https://laike9m.com/blog/requests-secret-pool_connections-and-pool_maxsize,89/
session_args = dict(pool_connections=100, pool_maxsize=100, retries=5)
@@ -52,7 +52,9 @@ def __init__(self, grafana_url, grafana_token=None):
self.concurrency = 5
- self.grafana = self.grafana_client_factory(self.grafana_url, grafana_token=self.grafana_token)
+ self.grafana = self.grafana_client_factory(
+ self.grafana_url, grafana_token=self.grafana_token
+ )
self.set_user_agent()
self.data = GrafanaDataModel()
self.finder = JsonPathFinder()
@@ -66,14 +68,18 @@ def set_session(self, session):
def enable_cache(self, expire_after=60, drop_cache=False):
if expire_after is None:
- log.info(f"Response cache will never expire (infinite caching)")
+ log.info("Response cache will never expire (infinite caching)")
elif expire_after == 0:
- log.info(f"Response cache will expire immediately (expire_after=0)")
+ log.info("Response cache will expire immediately (expire_after=0)")
else:
log.info(f"Response cache will expire after {expire_after} seconds")
session = CachedSession(
- cache_name=__appname__, expire_after=expire_after, use_cache_dir=True, wal=True, **self.session_args
+ cache_name=__appname__,
+ expire_after=expire_after,
+ use_cache_dir=True,
+ wal=True,
+ **self.session_args,
)
self.set_session(session)
self.set_user_agent()
@@ -86,7 +92,7 @@ def enable_cache(self, expire_after=60, drop_cache=False):
return self
def clear_cache(self):
- log.info(f"Clearing cache")
+ log.info("Clearing cache")
requests_cache.clear()
def enable_concurrency(self, concurrency):
@@ -171,7 +177,12 @@ def scan_notifications(self):
if Version(self.grafana.version) < Version("11"):
self.data.notifications = self.grafana.notifications.lookup_channels()
else:
- warnings.warn("Notification channel scanning support for Grafana 11 is not implemented yet", UserWarning)
+ warnings.warn(
+ "Notification channel scanning support for Grafana 11 is not implemented yet",
+ UserWarning,
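+            # stacklevel=2 attributes the warning to the caller (Bugbear B028).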
+ stacklevel=2,
+ )
def scan_datasources(self):
log.info("Scanning datasources")
@@ -185,7 +195,8 @@ def scan_datasources(self):
if isinstance(ex, GrafanaUnauthorizedError):
log.error(
self.get_red_message(
- "Please use --grafana-token or GRAFANA_TOKEN " "for authenticating with Grafana"
+ "Please use --grafana-token or GRAFANA_TOKEN "
+ "for authenticating with Grafana"
)
)
@@ -207,7 +218,7 @@ def scan_dashboards(self, dashboard_uids=None):
except GrafanaClientError as ex:
self.handle_grafana_error(ex)
- return
+ return None
if self.progressbar:
self.start_progressbar(len(self.data.dashboard_list))
@@ -221,7 +232,9 @@ def scan_dashboards(self, dashboard_uids=None):
self.taqadum.close()
# Improve determinism by returning stable sort order.
- self.data.dashboards = munchify(sorted(self.data.dashboards, key=lambda x: x["dashboard"]["uid"]))
+ self.data.dashboards = munchify(
+ sorted(self.data.dashboards, key=lambda x: x["dashboard"]["uid"])
+ )
return self.data.dashboards
@@ -231,7 +244,9 @@ def handle_grafana_error(self, ex):
log.error(self.get_red_message(message))
if isinstance(ex, GrafanaUnauthorizedError):
log.error(
- self.get_red_message("Please use --grafana-token or GRAFANA_TOKEN " "for authenticating with Grafana")
+ self.get_red_message(
+ "Please use --grafana-token or GRAFANA_TOKEN for authenticating with Grafana"
+ )
)
def fetch_dashboard(self, dashboard_info):
@@ -270,10 +285,13 @@ async def execute_parallel(self):
# for response in await asyncio.gather(*tasks):
# pass
+ @staticmethod
+ def get_red_message(message):
+ return colored.stylize(message, colored.fg("red") + colored.attr("bold"))
+
class GrafanaWtf(GrafanaEngine):
def info(self):
-
response = OrderedDict(
grafana=OrderedDict(
version=self.version,
@@ -308,7 +326,9 @@ def info(self):
# Count numbers of panels, annotations and variables for all dashboards.
try:
- dashboard_summary = OrderedDict(dashboard_panels=0, dashboard_annotations=0, dashboard_templating=0)
+ dashboard_summary = OrderedDict(
+ dashboard_panels=0, dashboard_annotations=0, dashboard_templating=0
+ )
for dbdetails in self.dashboard_details():
# TODO: Should there any deduplication be applied when counting those entities?
dashboard_summary["dashboard_panels"] += len(dbdetails.panels)
@@ -324,7 +344,9 @@ def info(self):
def build_info(self):
response = None
error = None
- error_template = f"The request to {self.grafana_url.rstrip('/')}/api/frontend/settings failed"
+ error_template = (
+ f"The request to {self.grafana_url.rstrip('/')}/api/frontend/settings failed"
+ )
try:
response = self.grafana.client.GET("/frontend/settings")
if not isinstance(response, dict):
@@ -353,7 +375,9 @@ def dashboard_details(self):
yield DashboardDetails(dashboard=dashboard)
def search(self, expression):
- log.info('Searching Grafana at "{}" for expression "{}"'.format(self.grafana_url, expression))
+ log.info(
+ 'Searching Grafana at "{}" for expression "{}"'.format(self.grafana_url, expression)
+ )
results = Munch(datasources=[], dashboard_list=[], dashboards=[])
@@ -370,7 +394,9 @@ def search(self, expression):
def replace(self, expression, replacement, dry_run: bool = False):
if dry_run:
log.info("Dry-run mode enabled, skipping any actions")
- log.info(f'Replacing "{expression}" by "{replacement}" within Grafana at "{self.grafana_url}"')
+ log.info(
+ f'Replacing "{expression}" by "{replacement}" within Grafana at "{self.grafana_url}"'
+ )
for dashboard in self.data.dashboards:
payload_before = json.dumps(dashboard)
payload_after = payload_before.replace(expression, replacement)
@@ -433,29 +459,27 @@ def search_items(self, expression, items, results):
if effective_item:
results.append(effective_item)
- @staticmethod
- def get_red_message(message):
- return colored.stylize(message, colored.fg("red") + colored.attr("bold"))
-
def get_dashboard_versions(self, dashboard_id):
# https://grafana.com/docs/http_api/dashboard_versions/
get_dashboard_versions_path = "/dashboards/id/%s/versions" % dashboard_id
- r = self.grafana.dashboard.client.GET(get_dashboard_versions_path)
- return r
+ return self.grafana.dashboard.client.GET(get_dashboard_versions_path)
def explore_datasources(self):
# Prepare indexes, mapping dashboards by uid, datasources by name
# as well as dashboards to datasources and vice versa.
ix = Indexer(engine=self)
- # Compute list of exploration items, associating datasources with the dashboards that use them.
+ # Compute list of exploration items, associating
+ # datasources with the dashboards that use them.
results_used = []
results_unused = []
for datasource in ix.datasources:
ds_identifier = datasource.get("uid", datasource.get("name"))
dashboard_uids = ix.datasource_dashboard_index.get(ds_identifier, [])
dashboards = list(map(ix.dashboard_by_uid.get, dashboard_uids))
- item = DatasourceExplorationItem(datasource=datasource, used_in=dashboards, grafana_url=self.grafana_url)
+ item = DatasourceExplorationItem(
+ datasource=datasource, used_in=dashboards, grafana_url=self.grafana_url
+ )
# Format results in a more compact form, using only a subset of all the attributes.
result = item.format_compact()
@@ -466,16 +490,18 @@ def explore_datasources(self):
if result not in results_unused:
results_unused.append(result)
- results_used = sorted(results_used, key=lambda x: x["datasource"]["name"] or x["datasource"]["uid"])
- results_unused = sorted(results_unused, key=lambda x: x["datasource"]["name"] or x["datasource"]["uid"])
+ results_used = sorted(
+ results_used, key=lambda x: x["datasource"]["name"] or x["datasource"]["uid"]
+ )
+ results_unused = sorted(
+ results_unused, key=lambda x: x["datasource"]["name"] or x["datasource"]["uid"]
+ )
- response = OrderedDict(
+ return OrderedDict(
used=results_used,
unused=results_unused,
)
- return response
-
def explore_dashboards(self, with_data_details: bool = False, queries_only: bool = False):
# Prepare indexes, mapping dashboards by uid, datasources by name
# as well as dashboards to datasources and vice versa.
@@ -484,7 +510,8 @@ def explore_dashboards(self, with_data_details: bool = False, queries_only: bool
# Those dashboard names or uids will be ignored.
ignore_dashboards = ["-- Grafana --", "-- Mixed --", "grafana", "-- Dashboard --"]
- # Compute list of exploration items, looking for dashboards with missing data sources.
+ # Compute list of exploration items, looking
+ # for dashboards with missing data sources.
results = []
for uid in sorted(ix.dashboard_by_uid):
dashboard = ix.dashboard_by_uid[uid]
@@ -597,13 +624,17 @@ def channels_list_by_uid(self, channel_uid):
for dashboard in dashboards:
for panel in dashboard["dashboard"].get("panels", []):
if "alert" in panel and panel["alert"]["notifications"]:
- related_panels += self.extract_channel_related_information(channel_uid, dashboard, panel)
+ related_panels += self.extract_channel_related_information(
+ channel_uid, dashboard, panel
+ )
# Some dashboards have a deeper nested structure
elif "panels" in panel:
for subpanel in panel["panels"]:
if "alert" in subpanel and subpanel["alert"]["notifications"]:
- related_panels += self.extract_channel_related_information(channel_uid, dashboard, subpanel)
+ related_panels += self.extract_channel_related_information(
+ channel_uid, dashboard, subpanel
+ )
if related_panels:
channel["related_panels"] = related_panels
return channel
@@ -613,7 +644,9 @@ def extract_channel_related_information(channel_uid, dashboard, panel):
related_information = []
for notification in panel["alert"]["notifications"]:
if "uid" in notification and notification["uid"] == channel_uid:
- related_information.append({"dashboard": dashboard["dashboard"]["title"], "panel": panel["title"]})
+ related_information.append(
+ {"dashboard": dashboard["dashboard"]["title"], "panel": panel["title"]}
+ )
return related_information
diff --git a/grafana_wtf/model.py b/grafana_wtf/model.py
index 40c1945..917cc1a 100644
--- a/grafana_wtf/model.py
+++ b/grafana_wtf/model.py
@@ -120,7 +120,9 @@ def _format_data_node_compact(item: Dict) -> Dict:
def queries_only(self):
"""
- Return a representation of data details information, only where query expressions are present.
+ Return a representation of data details information.
+
+        Only items that contain query expressions are included.
"""
# All attributes containing query-likes.
attributes_query_likes = ["expr", "jql", "query", "rawSql", "target"]
@@ -145,7 +147,11 @@ def transform(section):
continue
# Unnest items with nested "query" slot.
for slot in ["query", "target"]:
- if slot in new_item and isinstance(new_item[slot], dict) and "query" in new_item[slot]:
+ if (
+ slot in new_item
+ and isinstance(new_item[slot], dict)
+ and "query" in new_item[slot]
+ ):
new_item["query"] = new_item[slot]["query"]
new_items.append(new_item)
return new_items
@@ -196,6 +202,8 @@ def validate(cls, data: dict):
{data}
""".strip(),
UserWarning,
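+            # Report the warning at the caller's location (Bugbear B028).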
+ stacklevel=2,
)
del data["datasource"]
diff --git a/grafana_wtf/report/data.py b/grafana_wtf/report/data.py
index 840cfd6..6bb9139 100644
--- a/grafana_wtf/report/data.py
+++ b/grafana_wtf/report/data.py
@@ -28,7 +28,7 @@ def serialize_results(output_format: str, results: List):
class DataSearchReport(TabularSearchReport):
- def __init__(self, grafana_url, verbose=False, format=None):
+    def __init__(self, grafana_url, verbose=False, format=None):  # noqa: A002
self.grafana_url = grafana_url
self.verbose = verbose
self.format = format
@@ -42,7 +42,11 @@ def display(self, expression, result):
grafana=self.grafana_url,
expression=expression,
),
- datasources=self.get_output_items("Datasource", result.datasources, self.compute_url_datasource),
- dashboards=self.get_output_items("Dashboard", result.dashboards, self.compute_url_dashboard),
+ datasources=self.get_output_items(
+ "Datasource", result.datasources, self.compute_url_datasource
+ ),
+ dashboards=self.get_output_items(
+ "Dashboard", result.dashboards, self.compute_url_dashboard
+ ),
)
output_results(self.format, output)
diff --git a/grafana_wtf/report/tabular.py b/grafana_wtf/report/tabular.py
index bd38bf6..bf46b9a 100644
--- a/grafana_wtf/report/tabular.py
+++ b/grafana_wtf/report/tabular.py
@@ -13,7 +13,8 @@ def get_table_format(output_format):
if output_format is not None and output_format.startswith("tabular"):
try:
tablefmt = output_format.split(":")[1]
- except:
+        except IndexError:
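+            # No ":<tablefmt>" suffix given, fall back to the default "psql" style.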
tablefmt = "psql"
return tablefmt
@@ -29,7 +29,7 @@ def output_items(self, label, items, url_callback):
print(tabulate(items_rows, headers="keys", tablefmt=self.format))
def get_output_items(self, label, items, url_callback):
- items_rows = [
+ return [
{
"Type": label,
"Name": self.get_item_name(item),
@@ -37,7 +37,6 @@ def get_output_items(self, label, items, url_callback):
}
for item in items
]
- return items_rows
def get_bibdata_dict(self, item, **kwargs):
# Sanity checks.
@@ -87,8 +86,7 @@ def __init__(self, data):
def render(self, output_format: str):
table_format = get_table_format(output_format)
entries = self.compact_table(self.to_table(self.data), output_format)
- output = tabulate(entries, headers="keys", tablefmt=table_format)
- return output
+ return tabulate(entries, headers="keys", tablefmt=table_format)
@staticmethod
def to_table(entries):
@@ -105,7 +103,7 @@ def to_table(entries):
yield item
@staticmethod
- def compact_table(entries, format):
+    def compact_table(entries, format):  # noqa: A002
seperator = "\n"
if format.endswith("pipe"):
seperator = "
"
diff --git a/grafana_wtf/report/textual.py b/grafana_wtf/report/textual.py
index cb16193..df8a9f2 100644
--- a/grafana_wtf/report/textual.py
+++ b/grafana_wtf/report/textual.py
@@ -20,7 +20,11 @@ def __init__(self, grafana_url, verbose=False):
def display(self, expression, result):
expression = expression or "*"
- print('Searching for expression "{}" at Grafana instance {}'.format(_m(expression), self.grafana_url))
+ print(
+ 'Searching for expression "{}" at Grafana instance {}'.format(
+ _m(expression), self.grafana_url
+ )
+ )
self.output_items("Data Sources", result.datasources, self.compute_url_datasource)
self.output_items("Dashboards", result.dashboards, self.compute_url_dashboard)
@@ -73,7 +77,7 @@ def output_items(self, label, items, url_callback):
# Output dashboard matches.
print()
- subsection = f"Global"
+ subsection = "Global"
print(_ss(subsection))
print("-" * len(subsection))
for match in dashboard_matches:
@@ -103,7 +107,9 @@ def output_items(self, label, items, url_callback):
@staticmethod
def format_match(match):
- return "{path}: {value}".format(path=_k(match.full_path), value=_m(str(match.value).strip()))
+ return "{path}: {value}".format(
+ path=_k(match.full_path), value=_m(str(match.value).strip())
+ )
def get_panel(self, node):
"""
@@ -111,11 +117,12 @@ def get_panel(self, node):
"""
while node:
last_node = node
- node = getattr(node, "context")
+ node = node.context
if node is None:
break
if str(node.path) == "panels":
return last_node.value
+ return None
def get_bibdata_panel(self, panel, baseurl, **kwargs):
"""
@@ -148,7 +155,7 @@ def get_bibdata_dashboard(self, item, **kwargs):
# Sanity checks.
if "dashboard" not in item.data:
- return
+ return None
bibdata = OrderedDict()
bibdata["Title"] = _v(item.data.dashboard.title)
@@ -164,8 +171,7 @@ def format_where(self, item):
key_word = "keys"
if len(keys) <= 1:
key_word = "key"
- answer = "Found in {key_word}: {keys}".format(keys=_k(", ".join(keys)), key_word=key_word)
- return answer
+ return "Found in {key_word}: {keys}".format(keys=_k(", ".join(keys)), key_word=key_word)
def compute_url_datasource(self, datasource):
return urljoin(self.grafana_url, "/datasources/edit/{}".format(datasource.data.id))
diff --git a/grafana_wtf/util.py b/grafana_wtf/util.py
index 67a2169..a4b07a7 100644
--- a/grafana_wtf/util.py
+++ b/grafana_wtf/util.py
@@ -42,7 +42,7 @@ def normalize_options(options):
normalized = {}
for key, value in options.items():
# Add primary variant.
- key = key.strip("--<>")
+        key = key.strip("--<>")  # noqa: B005
normalized[key] = value
# Add secondary variant.
@@ -55,7 +55,7 @@ def normalize_options(options):
def read_list(data, separator=","):
if data is None:
return []
- result = list(map(lambda x: x.strip(), data.split(separator)))
+ result = [x.strip() for x in data.split(separator)]
if len(result) == 1 and not result[0]:
result = []
return result
@@ -119,7 +119,9 @@ class OrderedDumper(Dumper):
pass
def _dict_representer(dumper, data):
- return dumper.represent_mapping(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, data.items())
+ return dumper.represent_mapping(
+ yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, data.items()
+ )
OrderedDumper.add_representer(OrderedDict, _dict_representer)
return yaml.dump(data, stream, OrderedDumper, **kwds)
@@ -155,8 +157,8 @@ def as_bool(value: str) -> bool:
}
try:
return _STR_BOOLEAN_MAPPING[value.lower()]
- except KeyError:
- raise ValueError(f"invalid truth value {value}")
+ except KeyError as ex:
+ raise ValueError(f"invalid truth value {value}") from ex
def format_dict(data) -> str:
@@ -199,8 +201,7 @@ def filter_with_sql(data: trecord, view_name: str, expression: str) -> trecord:
import duckdb
import pandas as pd
- df = pd.DataFrame.from_records(data)
- duckdb.register(view_name, df)
+ frame = pd.DataFrame.from_records(data)
+ duckdb.register(view_name, frame)
results = duckdb.sql(expression)
- entries = results.to_df().to_dict(orient="records")
- return entries
+ return results.to_df().to_dict(orient="records")
diff --git a/pyproject.toml b/pyproject.toml
index 3c81f6c..c191627 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,9 +1,41 @@
-[tool.black]
-line-length = 120
+[tool.ruff]
+line-length = 100
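+# Note: the maximum line length decreases from 120 (previously Black) to 100.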
-[tool.isort]
-profile = "black"
-src_paths = ["grafana_wtf", "tests"]
+lint.select = [
+    # flake8-builtins
+    "A",
+    # flake8-bugbear
+    "B",
+    # flake8-comprehensions
+    "C4",
+    # pycodestyle errors
+    "E",
+    # eradicate (commented-out code)
+    "ERA",
+    # Pyflakes
+    "F",
+    # isort
+    "I",
+    # pandas-vet
+    "PD",
+    # flake8-return
+    "RET",
+    # flake8-bandit
+    "S",
+    # flake8-print
+    "T20",
+    # pycodestyle warnings
+    "W",
+    # flake8-2020
+    "YTT",
+]
+
+lint.ignore = [
+    "C408",  # Unnecessary `dict()` call (rewrite as a literal)
+    "RET505",  # Unnecessary `else` after `return` statement
+]
+lint.per-file-ignores."tests/*" = [ "S101" ]  # Use of `assert` detected
[tool.pytest.ini_options]
addopts = """
diff --git a/requirements-release.txt b/requirements-release.txt
index ec2e99b..a293305 100644
--- a/requirements-release.txt
+++ b/requirements-release.txt
@@ -1,6 +1,5 @@
bump2version<2
twine<7
keyring<26
-black<25
-isort<6
+ruff<0.10
build<2
diff --git a/setup.py b/setup.py
index 6d4ad40..26fe2a5 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,9 @@
here = os.path.abspath(os.path.dirname(__file__))
README = open(os.path.join(here, "README.rst")).read()
-no_linux_on_arm = "platform_system != 'Linux' or (platform_machine != 'armv7l' and platform_machine != 'aarch64')"
+no_linux_on_arm = (
+ "platform_system != 'Linux' or (platform_machine != 'armv7l' and platform_machine != 'aarch64')"
+)
requires = [
# Core
diff --git a/tests/conftest.py b/tests/conftest.py
index 20e4884..3ce0129 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -50,8 +50,7 @@ def docker_grafana(docker_services):
"""
docker_services.start("grafana")
public_port = docker_services.wait_for_service("grafana", 3000)
- url = "http://admin:admin@{docker_services.docker_ip}:{public_port}".format(**locals())
- return url
+ return "http://admin:admin@{docker_services.docker_ip}:{public_port}".format(**locals())
@pytest.fixture
@@ -108,7 +107,7 @@ def mkresponse(response):
else:
return {"uid": response["uid"], "type": response["type"]}
- def _create_datasource(name: str, type: str = "testdata", access: str = "proxy", **kwargs):
+    def _create_datasource(name: str, type: str = "testdata", access: str = "proxy", **kwargs):  # noqa: A002, E501
# Reuse existing datasource.
try:
response = grafana.datasource.get_datasource_by_name(name)
@@ -131,7 +130,9 @@ def _create_datasource(name: str, type: str = "testdata", access: str = "proxy",
# TODO: Mimic the original response in order to make the removal work.
# `{'datasource': {'id': 5, 'uid': 'u9wNRyEnk', 'orgId': 1, ...`.
if not re.match(
- "Client Error 409: Data source with (the )?same name already exists", str(ex), re.IGNORECASE
+ "Client Error 409: Data source with (the )?same name already exists",
+ str(ex),
+ re.IGNORECASE,
):
raise
@@ -199,7 +200,8 @@ def _create_folder(title: str, uid: str = None):
except GrafanaClientError as ex:
# TODO: Mimic the original response in order to make the removal work.
error_exists = re.match(
- "Client Error 409: a folder or dashboard in the general folder with the same name already exists",
+ "Client Error 409: a folder or dashboard in the "
+ "general folder with the same name already exists",
str(ex),
re.IGNORECASE,
)
@@ -292,7 +294,7 @@ def _ldi_resources(dashboards: List[Union[Path, str]] = None):
uid="PDF2762CDFF14A314",
url="http://localhost:8086/",
user="root",
- password="root",
+ password="root", # noqa: S106
database="ldi_v2",
secureJsonData={"password": "root"},
)
@@ -320,22 +322,24 @@ def grafana_version(docker_grafana):
Return Grafana version number.
"""
engine = GrafanaWtf(grafana_url=docker_grafana, grafana_token=None)
- grafana_version = engine.version
- return grafana_version
+ return engine.version
def mkdashboard(title: str, datasources: Optional[List[str]] = None):
"""
Build dashboard with multiple panels, each with a different data source.
"""
- # datasource = grafanalib.core.DataSourceInput(name="foo", label="foo", pluginId="foo", pluginName="foo")
+ # datasource = grafanalib.core.DataSourceInput(
+ # name="foo", label="foo", pluginId="foo", pluginName="foo")
datasources = datasources or []
# Build dashboard object model.
panels = []
for datasource in datasources:
- panel = grafanalib.core.Panel(dataSource=datasource, gridPos={"h": 1, "w": 24, "x": 0, "y": 0})
+ panel = grafanalib.core.Panel(
+ dataSource=datasource, gridPos={"h": 1, "w": 24, "x": 0, "y": 0}
+ )
panels.append(panel.panel_json(overrides={}))
dashboard = grafanalib.core.Dashboard(title=title, panels=panels)
@@ -343,8 +347,7 @@ def mkdashboard(title: str, datasources: Optional[List[str]] = None):
dashboard_json = StringIO()
write_dashboard(dashboard, dashboard_json)
dashboard_json.seek(0)
- dashboard = json.loads(dashboard_json.read())
- return dashboard
+ return json.loads(dashboard_json.read())
clean_environment()
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 776615d..5591ba6 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -1,11 +1,3 @@
-import warnings
-
-import grafana_client
-from grafana_client.elements.plugin import get_plugin_by_id
-from munch import munchify
-from verlib2 import version
-
-warnings.filterwarnings("ignore", category=DeprecationWarning, module=".*docopt.*")
import json
import logging
import re
@@ -13,8 +5,12 @@
import sys
import docopt
+import grafana_client
import pytest
import yaml
+from grafana_client.elements.plugin import get_plugin_by_id
+from munch import munchify
+from verlib2 import version
import grafana_wtf.commands
from tests.conftest import mkdashboard
@@ -24,7 +20,10 @@ def set_command(command, more_options="", cache=False):
cache_option = ""
if cache is False:
cache_option = "--cache-ttl=0"
- command = f'grafana-wtf --grafana-url="http://localhost:33333" {cache_option} {more_options} {command}'
+ command = (
+ f'grafana-wtf --grafana-url="http://localhost:33333" '
+ f"{cache_option} {more_options} {command}"
+ )
sys.argv = shlex.split(command)
@@ -37,7 +36,10 @@ def test_failure_grafana_url_missing():
# Verify output.
assert ex.match(
- re.escape('No Grafana URL given. Please use "--grafana-url" option or environment variable "GRAFANA_URL".')
+ re.escape(
+ 'No Grafana URL given. Please use "--grafana-url" option '
+ 'or environment variable "GRAFANA_URL".'
+ )
)
@@ -48,7 +50,10 @@ def test_find_textual_empty(docker_grafana, capsys):
captured = capsys.readouterr()
# Verify output.
- assert 'Searching for expression "foobar" at Grafana instance http://localhost:33333' in captured.out
+ assert (
+ 'Searching for expression "foobar" at Grafana instance http://localhost:33333'
+ in captured.out
+ )
assert "Data Sources: 0 hits" in captured.out
assert "Dashboards: 0 hits" in captured.out
@@ -72,7 +77,12 @@ def test_find_textual_select_empty(docker_grafana, capsys, caplog):
def test_find_textual_dashboard_success(ldi_resources, capsys):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Run command and capture output.
set_command("find ldi_readings")
@@ -80,26 +90,43 @@ def test_find_textual_dashboard_success(ldi_resources, capsys):
captured = capsys.readouterr()
# Verify output.
- assert 'Searching for expression "ldi_readings" at Grafana instance http://localhost:33333' in captured.out
+ assert (
+ 'Searching for expression "ldi_readings" at Grafana instance http://localhost:33333'
+ in captured.out
+ )
assert "Dashboards: 2 hits" in captured.out
assert "luftdaten-info-generic-trend" in captured.out
assert "Title luftdaten.info generic trend" in captured.out
assert "Folder Testdrive" in captured.out
assert "UID ioUrPwQiz" in captured.out
- assert "Dashboard http://localhost:33333/d/jpVsQxRja/luftdaten-info-generic-trend-v33" in captured.out
+ assert (
+ "Dashboard http://localhost:33333/d/jpVsQxRja/luftdaten-info-generic-trend-v33"
+ in captured.out
+ )
assert (
"Variables http://localhost:33333/d/jpVsQxRja/luftdaten-info-generic-trend-v33?editview=templating"
in captured.out
)
- assert "View http://localhost:33333/d/jpVsQxRja/luftdaten-info-generic-trend-v33?viewPanel=17" in captured.out
- assert "Edit http://localhost:33333/d/jpVsQxRja/luftdaten-info-generic-trend-v33?editPanel=17" in captured.out
+ assert (
+ "View http://localhost:33333/d/jpVsQxRja/luftdaten-info-generic-trend-v33?viewPanel=17"
+ in captured.out
+ )
+ assert (
+ "Edit http://localhost:33333/d/jpVsQxRja/luftdaten-info-generic-trend-v33?editPanel=17"
+ in captured.out
+ )
assert "dashboard.panels.[1].targets.[0].measurement: ldi_readings" in captured.out
assert "dashboard.panels.[7].panels.[0].targets.[0].measurement: ldi_readings" in captured.out
def test_find_textual_datasource_success(ldi_resources, capsys):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Run command and capture output.
set_command("find ldi_v2")
@@ -107,7 +134,10 @@ def test_find_textual_datasource_success(ldi_resources, capsys):
captured = capsys.readouterr()
# Verify output.
- assert 'Searching for expression "ldi_v2" at Grafana instance http://localhost:33333' in captured.out
+ assert (
+ 'Searching for expression "ldi_v2" at Grafana instance http://localhost:33333'
+ in captured.out
+ )
assert "Data Sources: 1 hits" in captured.out
assert "name: ldi_v2" in captured.out
@@ -121,7 +151,12 @@ def test_find_textual_datasource_success(ldi_resources, capsys):
def test_find_tabular_dashboard_success(ldi_resources, capsys):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Run command and capture output.
set_command("find ldi_readings", "--format=tabular:pipe")
@@ -129,14 +164,17 @@ def test_find_tabular_dashboard_success(ldi_resources, capsys):
captured = capsys.readouterr()
# Verify output.
- assert 'Searching for expression "ldi_readings" at Grafana instance http://localhost:33333' in captured.out
+ assert (
+ 'Searching for expression "ldi_readings" at Grafana instance http://localhost:33333'
+ in captured.out
+ )
reference_table = """
| Type | Name | Title | Folder | UID | Created | Updated | Created by | Datasources | URL |
|:-----------|:---------------------------------|:---------------------------------|:----------|:----------|:---------------------|:---------------------|:-------------|:--------------------------------------------------------------------------------------|:--------------------------------------------------------------------|
| Dashboards | luftdaten-info-generic-trend-v27 | luftdaten.info generic trend v27 | Testdrive | ioUrPwQiz | xxxx-xx-xxTxx:xx:xxZ | xxxx-xx-xxTxx:xx:xxZ | admin | -- Grafana --,ldi_v2,weatherbase | http://localhost:33333/d/ioUrPwQiz/luftdaten-info-generic-trend-v27 |
| Dashboards | luftdaten-info-generic-trend-v33 | luftdaten.info generic trend v33 | Testdrive | jpVsQxRja | xxxx-xx-xxTxx:xx:xxZ | xxxx-xx-xxTxx:xx:xxZ | admin | -- Grafana --,{'type': 'influxdb', 'uid': 'PDF2762CDFF14A314'},{'uid': 'weatherbase'} | http://localhost:33333/d/jpVsQxRja/luftdaten-info-generic-trend-v33 |
- """.strip()
+ """.strip() # noqa: E501
output_table = captured.out[captured.out.find("| Type") :]
output_table_normalized = re.sub(
@@ -148,7 +186,12 @@ def test_find_tabular_dashboard_success(ldi_resources, capsys):
def test_find_format_json(ldi_resources, capsys):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Run command and capture output.
set_command("find ldi_readings --format=json")
@@ -162,7 +205,12 @@ def test_find_format_json(ldi_resources, capsys):
def test_find_format_yaml(ldi_resources, capsys):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Run command and capture output.
set_command("find ldi_readings --format=yaml")
@@ -176,7 +224,12 @@ def test_find_format_yaml(ldi_resources, capsys):
def test_replace_dashboard_success(ldi_resources, capsys):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Rename references from "ldi_v2" to "ldi_v3".
set_command("replace ldi_v2 ldi_v3")
@@ -187,7 +240,10 @@ def test_replace_dashboard_success(ldi_resources, capsys):
set_command("find ldi_v3")
grafana_wtf.commands.run()
captured = capsys.readouterr()
- assert 'Searching for expression "ldi_v3" at Grafana instance http://localhost:33333' in captured.out
+ assert (
+ 'Searching for expression "ldi_v3" at Grafana instance http://localhost:33333'
+ in captured.out
+ )
# TODO: Expand renaming to data sources.
assert "Data Sources: 0 hits" in captured.out
@@ -207,7 +263,12 @@ def test_replace_dashboard_success(ldi_resources, capsys):
def test_replace_dashboard_dry_run_success(ldi_resources, capsys):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Rename references from "ldi_v2" to "ldi_v3".
set_command("replace ldi_v2 ldi_v3 --dry-run")
@@ -236,7 +297,12 @@ def test_log_empty(docker_grafana, capsys, caplog):
def test_log_all(ldi_resources, capsys, caplog):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Run command and capture output.
set_command("log")
@@ -252,7 +318,12 @@ def test_log_all(ldi_resources, capsys, caplog):
def test_log_json_success(ldi_resources, capsys, caplog):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Run command and capture output.
set_command("log ioUrPwQiz")
@@ -285,7 +356,12 @@ def test_log_json_success(ldi_resources, capsys, caplog):
def test_log_tabular_success(ldi_resources, capsys, caplog):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Run command and capture output.
set_command("log ioUrPwQiz", "--format=tabular:pipe")
@@ -298,16 +374,23 @@ def test_log_tabular_success(ldi_resources, capsys, caplog):
reference = """
    | Notes: n/a<br/>[Testdrive » luftdaten.info generic trend v27](http://localhost:33333/d/ioUrPwQiz/luftdaten-info-generic-trend-v27) | User: admin<br/>Date: xxxx-xx-xxTxx:xx:xxZ |
- """.strip()
+ """.strip() # noqa: E501
first_item_raw = str.splitlines(captured.out)[-1]
- first_item_normalized = re.sub("(.*)Date: .+|(.*)", r"\1Date: xxxx-xx-xxTxx:xx:xxZ |\2", first_item_raw, 1)
+    first_item_normalized = re.sub(
+        "(.*)Date: .+|(.*)", r"\1Date: xxxx-xx-xxTxx:xx:xxZ |\2", first_item_raw, count=1
+    )
assert first_item_normalized == reference
def test_log_yaml_success(ldi_resources, capsys, caplog):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Run command and capture output.
set_command("log")
@@ -322,7 +405,12 @@ def test_log_yaml_success(ldi_resources, capsys, caplog):
def test_log_filter_sql(ldi_resources, capsys, caplog):
# Only provision specific dashboard(s).
- ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json", "tests/grafana/dashboards/ldi-v33.json"])
+ ldi_resources(
+ dashboards=[
+ "tests/grafana/dashboards/ldi-v27.json",
+ "tests/grafana/dashboards/ldi-v33.json",
+ ]
+ )
# Run command and capture output.
set_command(
@@ -338,13 +426,11 @@ def test_log_filter_sql(ldi_resources, capsys, caplog):
grafana_wtf.commands.run()
captured = capsys.readouterr()
- assert set(captured.out.strip().split("\n")) == set(
- [
- "- url: http://localhost:33333/d/ioUrPwQiz/luftdaten-info-generic-trend-v27",
- "- url: http://localhost:33333/d/jpVsQxRja/luftdaten-info-generic-trend-v33",
- "- url: http://localhost:33333/dashboards/f/testdrive/testdrive",
- ]
- )
+ assert set(captured.out.strip().split("\n")) == {
+ "- url: http://localhost:33333/d/ioUrPwQiz/luftdaten-info-generic-trend-v27",
+ "- url: http://localhost:33333/d/jpVsQxRja/luftdaten-info-generic-trend-v33",
+ "- url: http://localhost:33333/dashboards/f/testdrive/testdrive",
+ }
def test_explore_datasources_used(create_datasource, create_dashboard, capsys, caplog):
@@ -404,7 +490,7 @@ def test_explore_dashboards_grafana6(grafana_version, ldi_resources, capsys, cap
# Only for Grafana 6.
if not grafana_version.startswith("6."):
- raise pytest.skip(f"Grafana 6 only")
+ raise pytest.skip("Grafana 6 only")
# Only provision specific dashboard.
ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v27.json"])
@@ -439,7 +525,7 @@ def test_explore_dashboards_grafana7up(grafana_version, ldi_resources, capsys, c
# Only for Grafana 7.
if version.parse(grafana_version) < version.parse("7"):
- raise pytest.skip(f"Grafana >= 7 only")
+ raise pytest.skip("Grafana >= 7 only")
# Only provision specific dashboard.
ldi_resources(dashboards=["tests/grafana/dashboards/ldi-v33.json"])
@@ -501,7 +587,9 @@ def test_explore_dashboards_data_details(ldi_resources, capsys, caplog):
)
-def test_explore_dashboards_empty_annotations(grafana_version, create_datasource, create_dashboard, capsys, caplog):
+def test_explore_dashboards_empty_annotations(
+ grafana_version, create_datasource, create_dashboard, capsys, caplog
+):
# Create a dashboard with an anomalous value in the "annotations" slot.
dashboard = mkdashboard(title="foo")
dashboard["annotations"]["list"] = None
@@ -598,7 +686,7 @@ def test_plugins_status_datasource(grafana_version, docker_grafana, capsys, capl
Verify the plugin status (metrics endpoint) on a 3rd-party "datasource" plugin.
"""
if version.parse(grafana_version) < version.parse("8"):
- raise pytest.skip(f"Plugin status inquiry only works on Grafana 8 and newer")
+ raise pytest.skip("Plugin status inquiry only works on Grafana 8 and newer")
# Before conducting a plugin status test, install a non-internal one.
grafana = grafana_client.GrafanaApi.from_url(url=docker_grafana, timeout=15)
@@ -617,7 +705,9 @@ def test_plugins_status_datasource(grafana_version, docker_grafana, capsys, capl
assert len(data) >= 28
# Proof the output is correct.
- plugin = munchify(get_plugin_by_id(plugin_list=data, plugin_id="yesoreyeram-infinity-datasource"))
+ plugin = munchify(
+ get_plugin_by_id(plugin_list=data, plugin_id="yesoreyeram-infinity-datasource")
+ )
assert "go_gc_duration_seconds" in plugin.metrics
@@ -626,7 +716,7 @@ def test_plugins_status_app(grafana_version, docker_grafana, capsys, caplog):
Verify the plugin status (metrics endpoint and health check) on a 3rd-party "app" plugin.
"""
if version.parse(grafana_version) < version.parse("10"):
- raise pytest.skip(f"Plugin status inquiry only works on Grafana 10 and newer")
+ raise pytest.skip("Plugin status inquiry only works on Grafana 10 and newer")
# Before conducting a plugin status test, install a non-internal one.
grafana = grafana_client.GrafanaApi.from_url(url=docker_grafana, timeout=15)
@@ -645,7 +735,9 @@ def test_plugins_status_app(grafana_version, docker_grafana, capsys, caplog):
assert len(data) >= 28
# Proof the output is correct.
- plugin = munchify(get_plugin_by_id(plugin_list=data, plugin_id="aws-datasource-provisioner-app"))
+ plugin = munchify(
+ get_plugin_by_id(plugin_list=data, plugin_id="aws-datasource-provisioner-app")
+ )
assert plugin.id == "aws-datasource-provisioner-app"
assert "process_virtual_memory_max_bytes" in plugin.metrics
@@ -658,7 +750,7 @@ def test_plugins_install_uninstall(grafana_version, docker_grafana, capsys, capl
Verify the plugin status when installing/uninstalling a plugin.
"""
if version.parse(grafana_version) < version.parse("8"):
- raise pytest.skip(f"Plugin status inquiry only works on Grafana 8 and newer")
+ raise pytest.skip("Plugin status inquiry only works on Grafana 8 and newer")
plugin_name = "yesoreyeram-infinity-datasource"
diff --git a/tests/test_core.py b/tests/test_core.py
index e962da4..6fd3fa7 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -3,7 +3,7 @@
import pytest
from munch import Munch
-from grafana_wtf.core import Indexer, GrafanaWtf
+from grafana_wtf.core import GrafanaWtf, Indexer
def test_collect_datasource_items_variable_all():