-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy pathtelemetry.py
More file actions
319 lines (256 loc) · 10.1 KB
/
telemetry.py
File metadata and controls
319 lines (256 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Telemetry implementation for PyAirbyte.
We track some basic telemetry to help us understand how PyAirbyte is used. You can opt out of
telemetry at any time by setting the environment variable DO_NOT_TRACK to any non-empty value.
If you are able to provide telemetry, it is greatly appreciated. Telemetry helps us understand how
the library is used, what features are working. We also use this telemetry to prioritize bug fixes
and improvements to the connectors themselves, focusing first on connectors that are (1) most used
and (2) report the most sync failures as a percentage of total attempted syncs.
Your privacy and security are our priority. We do not track any PII (personally identifiable
information), nor do we track anything that _could_ contain PII without first hashing the data
using a one-way hash algorithm. We only track the minimum information necessary to understand how
PyAirbyte is used, and to dedupe users to determine how many users or use cases there are.
Here is what is tracked:
- The version of PyAirbyte.
- The Python version.
- The OS.
- The source type (venv or local install).
- The source name and version number.
- The state of the sync (started, failed, succeeded).
- The cache type (Snowflake, Postgres, etc.).
- The number of records processed.
- The application hash, which is a hash of either the notebook name or Python script name.
- Flags to help us understand if PyAirbyte is running on CI, Google Colab, or another environment.
"""
from __future__ import annotations
import datetime
import os
import sys
from contextlib import suppress
from enum import Enum
from functools import lru_cache
from pathlib import Path
from typing import Any, cast
import requests
import ulid
import yaml
from airbyte import exceptions as exc
from airbyte._util import meta
from airbyte._util.connector_info import (
ConnectorRuntimeInfo,
WriterRuntimeInfo,
)
from airbyte._util.hashing import one_way_hash
from airbyte.constants import AIRBYTE_OFFLINE_MODE
from airbyte.version import get_version
# When truthy, `_setup_analytics()` prints any collected setup issues to stderr.
DEBUG = True
"""Enable debug mode for telemetry code."""
# Sent as the basic-auth username on the tracking request. The AIRBYTE_TRACKING_KEY environment
# variable overrides it; an unset or empty value falls back to the built-in key.
PYAIRBYTE_APP_TRACKING_KEY = (
    os.environ.get("AIRBYTE_TRACKING_KEY", "") or "cukeSffc0G6gFQehKDhhzSurDzVSZ2OP"
)
"""This key corresponds globally to the "PyAirbyte" application."""
# Generated once, when this module is imported.
PYAIRBYTE_SESSION_ID = str(ulid.ULID())
"""Unique identifier for the current invocation of PyAirbyte.
This is used to determine the order of operations within a specific session.
It is not a unique identifier for the user.
"""
# Name of the environment variable; its value is checked for truthiness, so an empty value
# does not opt out.
DO_NOT_TRACK = "DO_NOT_TRACK"
"""Environment variable to opt-out of telemetry."""
_ENV_ANALYTICS_ID = "AIRBYTE_ANALYTICS_ID"  # Allows user to override the anonymous user ID
# YAML file in the user's home directory that persists the anonymous user ID between runs.
_ANALYTICS_FILE = Path.home() / ".airbyte" / "analytics.yml"
# Module-level cache for the anonymous user ID; None until it has been resolved.
_ANALYTICS_ID: str | bool | None = None
# Placeholder string. Not referenced in this part of the file; presumably used by importers.
UNKNOWN = "unknown"
def _setup_analytics() -> str | bool:
    """Set up the analytics file if it doesn't exist.

    Return the anonymous user ID or False if the user has opted out.

    Resolution order:

    1. If the `DO_NOT_TRACK` environment variable is set to a non-empty value, or
       `AIRBYTE_OFFLINE_MODE` is truthy, return False without touching the filesystem.
    2. If the analytics file holds an ID and no override is set (or the override matches),
       return the stored ID.
    3. Otherwise use the override from the environment, or generate a new ULID, and try to
       write it to the analytics file.

    Problems met along the way are collected and, when `DEBUG` is truthy, printed to stderr.
    Failure to read or write the file never raises.
    """
    anonymous_user_id: str | None = None
    issues: list[str] = []

    if os.environ.get(DO_NOT_TRACK) or AIRBYTE_OFFLINE_MODE:
        # User has opted out of tracking.
        return False

    if _ENV_ANALYTICS_ID in os.environ:
        # If the user has chosen to override their analytics ID, use that value and
        # remember it for future invocations.
        anonymous_user_id = os.environ[_ENV_ANALYTICS_ID]

    if not _ANALYTICS_FILE.exists():
        # This is a one-time message to inform the user that we are tracking anonymous usage stats.
        print(
            "Thank you for using PyAirbyte!\n"
            "Anonymous usage reporting is currently enabled. For more information, please"
            " see https://docs.airbyte.com/telemetry",
            file=sys.stderr,
        )

    if _ANALYTICS_FILE.exists():
        analytics: dict | None = None
        try:
            # Reading is inside the `try` so an unreadable file is reported as an issue
            # instead of raising while the module is being imported.
            analytics = yaml.safe_load(_ANALYTICS_FILE.read_text())
        except Exception as ex:
            # `append`, not `+=`: extending a list with a string adds it one character at a time.
            issues.append(f"File appears corrupted. Error was: {ex!s}")

        # `safe_load` may return any YAML value (string, list, ...); only a mapping is usable.
        if isinstance(analytics, dict) and "anonymous_user_id" in analytics:
            # The analytics ID was successfully located.
            stored_id = analytics["anonymous_user_id"]
            if not anonymous_user_id:
                return stored_id

            if anonymous_user_id == stored_id:
                # Values match, no need to update the file.
                return stored_id

            issues.append("Provided analytics ID did not match the file. Rewriting the file.")
            print(
                f"Received a user-provided analytics ID override in the '{_ENV_ANALYTICS_ID}' "
                "environment variable.",
                file=sys.stderr,
            )

    # File is missing, incomplete, or stale. Create a new one.
    anonymous_user_id = anonymous_user_id or str(ulid.ULID())
    try:
        _ANALYTICS_FILE.parent.mkdir(exist_ok=True, parents=True)
        _ANALYTICS_FILE.write_text(
            "# This file is used by Airbyte to track anonymous usage statistics.\n"
            "# For more information or to opt out, please see\n"
            "# - https://docs.airbyte.com/operator-guides/telemetry\n"
            f"anonymous_user_id: {anonymous_user_id}\n"
        )
    except Exception:
        # Failed to create the analytics file. Likely due to a read-only filesystem.
        # Best effort: the ID is still returned for this session.
        issues.append("Failed to write the analytics file. Check filesystem permissions.")

    if DEBUG and issues:
        nl = "\n"
        print(
            f"One or more issues occurred when configuring usage tracking:\n{nl.join(issues)}",
            file=sys.stderr,
        )

    return anonymous_user_id
def _get_analytics_id() -> str | None:
    """Return the anonymous analytics ID, or None when tracking is opted out.

    Uses the module-level `_ANALYTICS_ID` when it is not None; otherwise calls
    `_setup_analytics()`. A result of False (opt-out) is reported to callers as None.
    """
    analytics_id: str | bool | None = (
        _ANALYTICS_ID if _ANALYTICS_ID is not None else _setup_analytics()
    )
    return None if analytics_id is False else cast("str", analytics_id)
# Resolve the analytics ID once, at import time. `_get_analytics_id()` returns None when
# `_setup_analytics()` reports an opt-out, so in that case this value stays None.
_ANALYTICS_ID = _get_analytics_id()
class EventState(str, Enum):
    """Outcome states reported in the `state` property of a telemetry event.

    Members are also `str` instances, so they serialize as their plain string value.
    """

    STARTED = "started"
    FAILED = "failed"
    SUCCEEDED = "succeeded"
    CANCELED = "canceled"
class EventType(str, Enum):
    """Kinds of telemetry events; sent as the `event` field of the tracking request.

    Members are also `str` instances, so they serialize as their plain string value.
    """

    INSTALL = "install"
    SYNC = "sync"
    VALIDATE = "validate"
    CHECK = "check"
@lru_cache
def get_env_flags() -> dict[str, Any]:
    """Return the environment flags that are set, keyed by flag name.

    The result is cached by `lru_cache`, so the `meta` probes run only on the first call.
    Flags whose value is False or None are left out of the returned dict.
    """
    detected: dict[str, bool | str] = {}
    detected["CI"] = meta.is_ci()
    detected["LANGCHAIN"] = meta.is_langchain()
    detected["MCP"] = meta.is_mcp_mode()

    # The first matching notebook runtime wins; False means none was detected.
    notebook_runtime: bool | str
    if meta.is_colab():
        notebook_runtime = "GOOGLE_COLAB"
    elif meta.is_jupyter():
        notebook_runtime = "JUPYTER"
    elif meta.is_vscode_notebook():
        notebook_runtime = "VS_CODE"
    else:
        notebook_runtime = False
    detected["NOTEBOOK_RUNTIME"] = notebook_runtime

    # Drop these flags if value is False or None
    active_flags: dict[str, Any] = {}
    for flag_name, flag_value in detected.items():
        if flag_value is None or flag_value is False:
            continue
        active_flags[flag_name] = flag_value
    return active_flags
def send_telemetry(
    *,
    source: ConnectorRuntimeInfo | None,
    destination: ConnectorRuntimeInfo | None,
    cache: WriterRuntimeInfo | None,
    state: EventState,
    event_type: EventType,
    number_of_records: int | None = None,
    exception: Exception | None = None,
) -> None:
    """Send one telemetry event to the Segment tracking endpoint.

    Does nothing when the `DO_NOT_TRACK` environment variable is set to a non-empty value or
    `AIRBYTE_OFFLINE_MODE` is truthy. Any exception raised while sending is suppressed, so
    this function never raises because of network problems.

    Args:
        source: Source connector info, added to the payload via `to_dict()` when truthy.
        destination: Destination connector info, added via `to_dict()` when truthy.
        cache: Cache/writer info, added via `to_dict()` when truthy.
        state: Value reported in the `state` property of the event.
        event_type: Value sent as the event name.
        number_of_records: Record count; included whenever it is not None (including 0).
        exception: Optional error. `AirbyteError` instances contribute their
            `safe_logging_dict()`; for any other exception only the class name is sent.
    """
    # If DO_NOT_TRACK is set, we don't send any telemetry
    if os.environ.get(DO_NOT_TRACK) or AIRBYTE_OFFLINE_MODE:
        return

    # Upper bound, in seconds, on how long a telemetry request may hold up the caller.
    # Without a timeout, `requests` can wait indefinitely on an unresponsive endpoint.
    request_timeout_seconds = 10

    payload_props: dict[str, str | int | dict] = {
        "session_id": PYAIRBYTE_SESSION_ID,
        "state": state,
        "version": get_version(),
        "python_version": meta.get_python_version(),
        "os": meta.get_os(),
        "application_hash": one_way_hash(meta.get_application_name()),
        "flags": get_env_flags(),
    }

    if source:
        payload_props["source"] = source.to_dict()

    if destination:
        payload_props["destination"] = destination.to_dict()

    if cache:
        payload_props["cache"] = cache.to_dict()

    if exception:
        if isinstance(exception, exc.AirbyteError):
            payload_props["exception"] = exception.safe_logging_dict()
        else:
            # Only the class name is sent for non-Airbyte exceptions, never the message.
            payload_props["exception"] = {"class": type(exception).__name__}

    if number_of_records is not None:
        payload_props["number_of_records"] = number_of_records

    # Suppress exceptions if host is unreachable, network is unavailable, or the request
    # times out.
    with suppress(Exception):
        # The response is intentionally ignored; telemetry is best effort.
        _ = requests.post(
            "https://api.segment.io/v1/track",
            auth=(PYAIRBYTE_APP_TRACKING_KEY, ""),
            json={
                "anonymousId": _get_analytics_id(),
                "event": event_type,
                "properties": payload_props,
                "timestamp": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
            },
            timeout=request_timeout_seconds,
        )
def log_config_validation_result(
    name: str,
    state: EventState,
    exception: Exception | None = None,
) -> None:
    """Report the outcome of a config validation as a `VALIDATE` telemetry event.

    A name beginning with "destination-" is reported as the destination; any other name is
    reported as the source.
    """
    connector_info = ConnectorRuntimeInfo(name=name)
    is_destination = name.startswith("destination-")
    send_telemetry(
        source=None if is_destination else connector_info,
        destination=connector_info if is_destination else None,
        cache=None,
        state=state,
        event_type=EventType.VALIDATE,
        exception=exception,
    )
def log_connector_check_result(
    name: str,
    state: EventState,
    exception: Exception | None = None,
) -> None:
    """Report the outcome of a connector `check` as a `CHECK` telemetry event.

    A name beginning with "destination-" is reported as the destination; any other name is
    reported as the source.
    """
    connector_info = ConnectorRuntimeInfo(name=name)
    is_destination = name.startswith("destination-")
    send_telemetry(
        source=None if is_destination else connector_info,
        destination=connector_info if is_destination else None,
        cache=None,
        state=state,
        event_type=EventType.CHECK,
        exception=exception,
    )
def log_install_state(
    name: str,
    state: EventState,
    exception: Exception | None = None,
) -> None:
    """Report a connector installation as an `INSTALL` telemetry event.

    The connector is always reported in the `source` slot, whatever its name.
    """
    installed_connector = ConnectorRuntimeInfo(name=name)
    send_telemetry(
        event_type=EventType.INSTALL,
        state=state,
        exception=exception,
        source=installed_connector,
        destination=None,
        cache=None,
    )