Skip to content

Commit dd18b5d

Browse files
authored
feat: Add analytics tracking (#11)
1 parent e098496 commit dd18b5d

File tree

5 files changed

+103
-11
lines changed

5 files changed

+103
-11
lines changed

cloud2sql/__main__.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
from logging import getLogger
2+
from typing import Optional
23

34
from resotolib.args import Namespace, ArgumentParser
45
from resotolib.logger import setup_logger
56
from sqlalchemy import create_engine
67
from sqlalchemy.engine import Engine
8+
9+
from cloud2sql.analytics import PosthogEventSender, NoEventSender, AnalyticsEventSender
710
from cloud2sql.collect_plugins import collect_from_plugins
811

912
# Will fail in case snowflake is not installed - which is fine.
@@ -16,7 +19,11 @@
1619

1720

1821
def parse_args() -> Namespace:
19-
parser = ArgumentParser(epilog="Collect data from cloud providers and store it in a database")
22+
parser = ArgumentParser(
23+
description="Collect data from cloud providers and store it in a database",
24+
epilog="Synchronizes cloud data to a database",
25+
env_args_prefix="CLOUD2SQL_",
26+
)
2027
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
2128
parser.add_argument("--config", help="Path to config file", required=True)
2229
parser.add_argument(
@@ -30,30 +37,41 @@ def parse_args() -> Namespace:
3037
help="The database url. See https://docs.sqlalchemy.org/en/20/core/engines.html.",
3138
required=True,
3239
)
40+
parser.add_argument(
41+
"--analytics-opt-out",
42+
default=False,
43+
action="store_true",
44+
help="Do not send anonimized analytics data",
45+
)
3346
args = parser.parse_args()
3447
args.log_level = "CRITICAL" if args.show != "log" else "DEBUG" if args.debug else "INFO"
3548
return args # type: ignore
3649

3750

def collect(engine: Engine, args: Namespace, sender: AnalyticsEventSender) -> None:
    """
    Run all configured collector plugins and write their data via *engine*.

    Exceptions are caught, logged, and reported on stdout instead of being
    re-raised, so the caller (main) can still flush the analytics sender.

    :param engine: SQLAlchemy engine pointing at the target database.
    :param args: parsed command line arguments (config path, flags, ...).
    :param sender: analytics sender passed through to the plugins.
    """
    try:
        collect_from_plugins(engine, args, sender)
    except Exception as e:
        # Fix: the original `log.error("Error during collection", e)` passed
        # `e` as a %-format positional with no placeholder in the message,
        # which breaks log record formatting. Attach it as exc_info instead.
        log.error("Error during collection", exc_info=e)
        print(f"Error syncing data to database: {e}")
4457

4558

4659
def main() -> None:
    """
    CLI entry point: parse arguments, set up logging and analytics,
    then run the collection and always flush pending analytics events.
    """
    args = parse_args()
    sender: Optional[AnalyticsEventSender] = None
    try:
        setup_logger("resoto.cloud2sql", level=args.log_level, force=True)
        # Respect the opt-out flag: either a no-op sender or the real one.
        sender = NoEventSender() if args.analytics_opt_out else PosthogEventSender()
        engine = create_engine(args.db)
        collect(engine, args, sender)
    except Exception as e:
        if args.debug:  # re-raise to show the complete traceback
            # Fix: bare `raise` preserves the original traceback; `raise e`
            # would re-raise from this frame and truncate it.
            raise
        else:
            print(f"Error syncing data to database: {e}")
    finally:
        # Flush even on failure so already-captured events are not lost.
        # `sender` may still be None if PosthogEventSender() itself failed.
        if sender:
            sender.flush()
5775

5876

5977
if __name__ == "__main__":

cloud2sql/analytics.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import logging
2+
import uuid
3+
from abc import abstractmethod, ABC
4+
from datetime import datetime
5+
from typing import Union
6+
7+
from requests import get
8+
from posthog import Client
9+
10+
log = logging.getLogger("resoto.cloud2sql")
11+
12+
13+
class AnalyticsEventSender(ABC):
    """
    Base class for analytics event senders.

    Concrete subclasses decide where captured events go. ``flush`` is a
    no-op by default so senders without buffering need not override it.
    """

    @abstractmethod
    def capture(self, kind: str, **context: Union[str, int, float]) -> None:
        """Record a single analytics event of the given kind."""
        ...

    def flush(self) -> None:
        """Send any buffered events now; the default implementation does nothing."""
        ...
24+
25+
26+
class NoEventSender(AnalyticsEventSender):
    """
    Sender used when the user opts out of analytics: events are only
    written to the debug log and never leave the process.
    """

    def capture(self, kind: str, **context: Union[str, int, float]) -> None:
        # Fix: lazy %-style arguments — the message is only formatted when
        # DEBUG logging is actually enabled, unlike the previous f-string
        # which formatted eagerly on every call.
        log.debug("Capture event %s: %s", kind, context)
33+
34+
35+
class PosthogEventSender(AnalyticsEventSender):
    """
    Sender that forwards analytics events to a PostHog instance.

    The client starts without an API key; the public project key is fetched
    lazily from the CDN on the first capture and reused afterwards.
    """

    def __init__(self) -> None:
        # Empty api_key on purpose — resolved lazily in capture().
        self.client = Client(host="https://analytics.some.engineering", api_key="")
        # One random id per process run; used both as distinct_id and run_id.
        self.uid = uuid.uuid4()

    def _ensure_api_key(self) -> None:
        # Fetch the public key once and propagate it to the client and
        # every consumer thread it already spawned.
        if self.client.api_key == "":
            key = get("https://cdn.some.engineering/posthog/public_api_key").text.strip()
            self.client.api_key = key
            for consumer in self.client.consumers:
                consumer.api_key = key

    def capture(self, kind: str, **context: Union[str, int, float]) -> None:
        """Send one event named ``cloud2sql.<kind>`` with the given context."""
        self._ensure_api_key()
        event_properties = {**context, "source": "cloud2sql", "run_id": self.uid}
        self.client.capture(
            distinct_id=self.uid,
            event="cloud2sql." + kind,
            properties=event_properties,
            timestamp=datetime.now(),
        )

    def flush(self) -> None:
        """Block until the client has delivered all queued events."""
        self.client.flush()

cloud2sql/collect_plugins.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from rich.live import Live
2626
from sqlalchemy.engine import Engine
2727

28+
from cloud2sql.analytics import AnalyticsEventSender
2829
from cloud2sql.show_progress import CollectInfo
2930
from cloud2sql.sql import SqlUpdater, sql_updater
3031

@@ -56,7 +57,9 @@ def configure(path_to_config: Optional[str]) -> Json:
5657
return {}
5758

5859

59-
def collect(collector: BaseCollectorPlugin, engine: Engine, feedback: CoreFeedback, args: Namespace) -> None:
60+
def collect(
61+
collector: BaseCollectorPlugin, engine: Engine, feedback: CoreFeedback, args: Namespace
62+
) -> Tuple[str, int, int]:
6063
# collect cloud data
6164
feedback.progress_done(collector.cloud, 0, 1)
6265
collector.collect()
@@ -115,26 +118,27 @@ def collect(collector: BaseCollectorPlugin, engine: Engine, feedback: CoreFeedba
115118
ne_count += len(nodes)
116119
feedback.progress_done(syncdb, ne_count, node_edge_count, context=[collector.cloud])
117120
feedback.progress_done(collector.cloud, 1, 1)
121+
return collector.cloud, len(collector.graph.nodes), len(collector.graph.edges)
118122

119123

120124
def show_messages(core_messages: Queue[Json], end: Event) -> None:
    """
    Render collector progress messages live until *end* is set, then print
    the final rendered messages.
    """
    info = CollectInfo()
    while not end.is_set():
        # Suppress queue timeouts (and any handler error) so the loop keeps
        # polling until the end event is set.
        with Live(info.render(), transient=True), suppress(Exception):
            info.handle_message(core_messages.get(timeout=1))
    for line in info.rendered_messages():
        rich_print(line)
129132

130133

131-
def collect_from_plugins(engine: Engine, args: Namespace) -> None:
134+
def collect_from_plugins(engine: Engine, args: Namespace, sender: AnalyticsEventSender) -> None:
132135
# the multiprocessing manager is used to share data between processes
133136
mp_manager = multiprocessing.Manager()
134137
core_messages: Queue[Json] = mp_manager.Queue()
135138
feedback = CoreFeedback("cloud2sql", "collect", "collect", core_messages)
136139
raw_config = configure(args.config)
137140
all_collectors = collectors(raw_config, feedback)
141+
analytics = {"total": len(all_collectors), "engine": engine.dialect.name} | {name: 1 for name in all_collectors}
138142
end = Event()
139143
with ThreadPoolExecutor(max_workers=4) as executor:
140144
try:
@@ -144,7 +148,10 @@ def collect_from_plugins(engine: Engine, args: Namespace) -> None:
144148
for collector in all_collectors.values():
145149
futures.append(executor.submit(collect, collector, engine, feedback, args))
146150
for future in concurrent.futures.as_completed(futures):
147-
future.result()
151+
name, nodes, edges = future.result()
152+
analytics[f"{name}_nodes"] = nodes
153+
analytics[f"{name}_edges"] = edges
154+
sender.capture("collect", **analytics)
148155
# when all collectors are done, we can swap all temp tables
149156
swap_tables = "Make latest snapshot available"
150157
feedback.progress_done(swap_tables, 0, 1)
@@ -153,7 +160,9 @@ def collect_from_plugins(engine: Engine, args: Namespace) -> None:
153160
except Exception as e:
154161
# set end and wait for live to finish, otherwise the cursor is not reset
155162
end.set()
163+
sender.capture("error", error=str(e))
156164
sleep(1)
165+
sender.flush()
157166
log.error("An error occurred", exc_info=e)
158167
print(f"Encountered Error. Giving up. {e}")
159168
emergency_shutdown()

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ SQLAlchemy==1.4.45
55
PyYAML>=6.0
66
rich>=12.6.0
77
resotoclient>=1.2.1
8+
posthog>=2.2.0
9+
requests>=2.28.1
810

911
# bundle popular db drivers
1012
psycopg2-binary>=2.9.5 # postgres

tests/collect_plugins_test.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from sqlalchemy import MetaData, create_engine
55
from sqlalchemy.orm import Session
66

7+
from cloud2sql.analytics import NoEventSender
78
from cloud2sql.collect_plugins import collect_from_plugins
89

910

@@ -13,7 +14,7 @@ def test_collect() -> None:
1314
cfg = f"{tmp}/config.yml"
1415
with open(cfg, "w") as f:
1516
f.write("example: {}\n")
16-
collect_from_plugins(engine, Namespace(config=cfg, show="none"))
17+
collect_from_plugins(engine, Namespace(config=cfg, show="none"), NoEventSender())
1718
# get all tables
1819
metadata = MetaData()
1920
metadata.reflect(bind=engine)

0 commit comments

Comments
 (0)