Skip to content

Commit 3b26f81

Browse files
bindipankhudi, bindipankhudi, aaronsteers
authored
Feat: Telemetry - Added "install" event type to track failed installs (#115)
Co-authored-by: bindipankhudi <[email protected]>
Co-authored-by: Aaron ("AJ") Steers <[email protected]>
1 parent 2f483ec commit 3b26f81

File tree

4 files changed

+71
-29
lines changed

4 files changed

+71
-29
lines changed

airbyte/_util/telemetry.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,12 +166,17 @@ def _get_analytics_id() -> str | None:
166166
_ANALYTICS_ID = _get_analytics_id()
167167

168168

169-
class SyncState(str, Enum):
169+
class EventState(str, Enum):
170170
STARTED = "started"
171171
FAILED = "failed"
172172
SUCCEEDED = "succeeded"
173173

174174

175+
class EventType(str, Enum):
176+
INSTALL = "install"
177+
SYNC = "sync"
178+
179+
175180
@dataclass
176181
class CacheTelemetryInfo:
177182
type: str
@@ -198,6 +203,14 @@ def from_source(cls, source: Source) -> SourceTelemetryInfo:
198203
version=source.executor.reported_version,
199204
)
200205

206+
@classmethod
207+
def from_name(cls, name: str) -> SourceTelemetryInfo:
208+
return cls(
209+
name=name,
210+
executor_type="unknown",
211+
version="unknown",
212+
)
213+
201214

202215
def one_way_hash(
203216
string_to_hash: Any, # noqa: ANN401 # Allow Any type
@@ -229,9 +242,10 @@ def get_env_flags() -> dict[str, Any]:
229242

230243

231244
def send_telemetry(
232-
source: Source,
245+
source: Source | str,
233246
cache: CacheBase | None,
234-
state: SyncState,
247+
state: EventState,
248+
event_type: EventType,
235249
number_of_records: int | None = None,
236250
exception: Exception | None = None,
237251
) -> None:
@@ -241,7 +255,6 @@ def send_telemetry(
241255

242256
payload_props: dict[str, str | int | dict] = {
243257
"session_id": PYAIRBYTE_SESSION_ID,
244-
"source": asdict(SourceTelemetryInfo.from_source(source)),
245258
"cache": asdict(CacheTelemetryInfo.from_cache(cache)),
246259
"state": state,
247260
"version": get_version(),
@@ -250,6 +263,12 @@ def send_telemetry(
250263
"application_hash": one_way_hash(meta.get_application_name()),
251264
"flags": get_env_flags(),
252265
}
266+
267+
if isinstance(source, str):
268+
payload_props["source"] = asdict(SourceTelemetryInfo.from_name(source))
269+
else:
270+
payload_props["source"] = asdict(SourceTelemetryInfo.from_source(source))
271+
253272
if exception:
254273
if isinstance(exception, exc.AirbyteError):
255274
payload_props["exception"] = exception.safe_logging_dict()
@@ -267,7 +286,7 @@ def send_telemetry(
267286
auth=(PYAIRBYTE_APP_TRACKING_KEY, ""),
268287
json={
269288
"anonymousId": _get_analytics_id(),
270-
"event": "sync",
289+
"event": event_type,
271290
"properties": payload_props,
272291
"timestamp": datetime.datetime.utcnow().isoformat(), # noqa: DTZ003
273292
},

airbyte/sources/base.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@
2929
from airbyte import exceptions as exc
3030
from airbyte._util import protocol_util
3131
from airbyte._util.name_normalizers import normalize_records
32-
from airbyte._util.telemetry import (
33-
SyncState,
34-
send_telemetry,
35-
)
32+
from airbyte._util.telemetry import EventState, EventType, send_telemetry
3633
from airbyte.caches.util import get_default_cache
3734
from airbyte.datasets._lazy import LazyDataset
3835
from airbyte.progress import progress
@@ -515,7 +512,8 @@ def _log_sync_start(
515512
send_telemetry(
516513
source=self,
517514
cache=cache,
518-
state=SyncState.STARTED,
515+
state=EventState.STARTED,
516+
event_type=EventType.SYNC,
519517
)
520518

521519
def _log_sync_success(
@@ -528,8 +526,9 @@ def _log_sync_success(
528526
send_telemetry(
529527
source=self,
530528
cache=cache,
531-
state=SyncState.SUCCEEDED,
529+
state=EventState.SUCCEEDED,
532530
number_of_records=self._processed_records,
531+
event_type=EventType.SYNC,
533532
)
534533

535534
def _log_sync_failure(
@@ -541,11 +540,12 @@ def _log_sync_failure(
541540
"""Log the failure of a sync operation."""
542541
print(f"Failed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.")
543542
send_telemetry(
544-
state=SyncState.FAILED,
543+
state=EventState.FAILED,
545544
source=self,
546545
cache=cache,
547546
number_of_records=self._processed_records,
548547
exception=exception,
548+
event_type=EventType.SYNC,
549549
)
550550

551551
def read(

airbyte/sources/util.py

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from airbyte import exceptions as exc
1212
from airbyte._executor import PathExecutor, VenvExecutor
13+
from airbyte._util.telemetry import EventState, EventType, send_telemetry
1314
from airbyte.sources.base import Source
1415
from airbyte.sources.registry import ConnectorMetadata, get_connector_metadata
1516

@@ -106,28 +107,48 @@ def get_source(
106107
metadata: ConnectorMetadata | None = None
107108
try:
108109
metadata = get_connector_metadata(name)
109-
except exc.AirbyteConnectorNotRegisteredError:
110+
except exc.AirbyteConnectorNotRegisteredError as ex:
110111
if not pip_url:
112+
_log_install_state(name, state=EventState.FAILED, exception=ex)
111113
# We don't have a pip url or registry entry, so we can't install the connector
112114
raise
113115

114-
executor = VenvExecutor(
115-
name=name,
116-
metadata=metadata,
117-
target_version=version,
118-
pip_url=pip_url,
119-
)
120-
if install_if_missing:
121-
executor.ensure_installation()
116+
try:
117+
executor = VenvExecutor(
118+
name=name,
119+
metadata=metadata,
120+
target_version=version,
121+
pip_url=pip_url,
122+
)
123+
if install_if_missing:
124+
executor.ensure_installation()
122125

123-
return Source(
124-
name=name,
125-
config=config,
126-
streams=streams,
127-
executor=executor,
128-
)
126+
return Source(
127+
name=name,
128+
config=config,
129+
streams=streams,
130+
executor=executor,
131+
)
132+
except Exception as e:
133+
_log_install_state(name, state=EventState.FAILED, exception=e)
134+
raise
129135

130136

131137
__all__ = [
132138
"get_source",
133139
]
140+
141+
142+
def _log_install_state(
143+
name: str,
144+
state: EventState,
145+
exception: Exception | None = None,
146+
) -> None:
147+
"""Log an install event."""
148+
send_telemetry(
149+
source=name,
150+
cache=None,
151+
state=state,
152+
event_type=EventType.INSTALL,
153+
exception=exception,
154+
)

tests/unit_tests/test_anonymous_usage_stats.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ def test_telemetry_track(monkeypatch):
3434
telemetry.send_telemetry(
3535
source=source_test,
3636
cache=cache,
37-
state="started",
37+
state=telemetry.EventState.STARTED,
3838
number_of_records=0,
39+
event_type=telemetry.EventType.SYNC,
3940
)
4041

4142
# Check that one request was made
@@ -78,8 +79,9 @@ def test_do_not_track(monkeypatch, do_not_track):
7879
telemetry.send_telemetry(
7980
source=source_test,
8081
cache=cache,
81-
state="started",
82+
state=telemetry.EventState.STARTED,
8283
number_of_records=0,
84+
event_type=telemetry.EventType.SYNC,
8385
)
8486

8587
# Check that zero requests were made, because DO_NOT_TRACK is set

0 commit comments

Comments
 (0)