2020import os
2121import threading
2222import weakref
23+ from abc import abstractmethod
2324from typing import (
2425 Generic ,
2526 Optional ,
27+ Protocol ,
2628 TypeVar ,
2729)
2830
@@ -42,10 +44,20 @@ class BatchExportStrategy(enum.Enum):
4244
4345
4446Telemetry = TypeVar ("Telemetry" )
45- Exporter = TypeVar ("Exporter" )
4647
4748
48- class BatchProcessor (Generic [Telemetry , Exporter ]):
49+ # TODO: Switch this to Exporter[Telemetry](Protocol) once only python 3.12+ is supported.
50+ class Exporter (Protocol ):
51+ @abstractmethod
52+ def export (self , data : list [Telemetry ]):
53+ raise NotImplementedError
54+
55+ @abstractmethod
56+ def shutdown (self ):
57+ raise NotImplementedError
58+
59+
60+ class BatchProcessor (Generic [Telemetry ]):
4961 def __init__ (
5062 self ,
5163 exporter : Exporter ,
@@ -134,7 +146,7 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
134146 iteration += 1
135147 token = attach (set_value (_SUPPRESS_INSTRUMENTATION_KEY , True ))
136148 try :
137- self ._exporter .export ( # pyright: ignore [reportAttributeAccessIssue]
149+ self ._exporter .export (
138150 [
139151 # Oldest records are at the back, so pop from there.
140152 self ._queue .pop ()
@@ -174,7 +186,7 @@ def shutdown(self):
174186 self ._worker_awaken .set ()
175187 # Main worker loop should exit after one final export call with flush all strategy.
176188 self ._worker_thread .join ()
177- self ._exporter .shutdown () # pyright: ignore [reportAttributeAccessIssue]
189+ self ._exporter .shutdown ()
178190
179191 def force_flush (self , timeout_millis : Optional [int ] = None ):
180192 if self ._shutdown :
0 commit comments