1+ import atexit
2+
13from concurrent .futures import Executor , ProcessPoolExecutor
24
35from opentelemetry .instrumentation ._blobupload .api import Blob
79
810
911def _with_content_type (blob : Blob ) -> Blob :
12+ """Returns a variant of the Blob with the content type auto-detected if needed."""
1013 if blob .content_type is not None :
1114 return blob
1215 content_type = detect_content_type (blob .raw_bytes )
1316 return Blob (blob .raw_bytes , content_type = content_type , labels = blob .labels )
1417
1518
1619def _UploadAction (object ):
20+ """Represents the work to be done in the background to upload a blob."""
1721
1822 def __init__ (self , simple_uploader , uri , blob ):
1923 self ._simple_uploader = simple_uploader
@@ -24,15 +28,49 @@ def __call__(self):
2428 self ._simple_uploader .upload_sync (self ._uri , self ._blob )
2529
2630
27- def _create_default_executor ():
31+ def _create_default_executor_no_cleanup ():
32+ """Instantiates an executor subject to configuration."""
33+ # Potential future enhancement: allow the default executor to be
34+ # configured using environment variables (e.g. to select between
35+ # threads or processes, to choose number of workers, etc.)
36+ #
37+ # It is because of this potential future enhancement, that we
38+ # have moved this logic into a separate function despite it
39+ # being currently logically quite simple.
2840 return ProcessPoolExecutor ()
2941
3042
43+ def _create_default_executor ():
44+ """Creates an executor and registers appropriate cleanup."""
45+ result = _create_default_executor_no_cleanup ()
46+ def _cleanup ():
47+ result .shutdown ()
48+ atexit .register (_cleanup )
49+ return result
50+
51+ # Global default executor so that multiple uses of the adaptor
52+ # do not waste resources creating many duplicative executors.
53+ # Used in the '_get_or_create_default_executor' function below.
54+ _default_executor = None
55+
56+
57+ def _get_or_create_default_executor ():
58+ """Return or lazily instantiate a shared default executor."""
59+ global _default_executor
60+ if _default_executor is None :
61+ _default_executor = _create_default_executor ()
62+ return _default_executor
63+
64+
3165class _SimpleBlobUploaderAdaptor (BlobUploader ):
66+ """Implementation of 'BlobUploader' wrapping a 'SimpleBlobUploader'.
67+
68+ This implements the core of the function 'blob_uploader_from_simple_blob_uploader'.
69+ """
3270
33- def __init__ (self , simple_uploader , executor = None ):
71+ def __init__ (self , simple_uploader : SimpleBlobUploader , executor : Optional [ Executor ] = None ):
3472 self ._simple_uploader = simple_uploader
35- self ._executor = executor or _create_default_executor ()
73+ self ._executor = executor or _get_or_create_default_executor ()
3674
3775 def upload_async (self , blob : Blob ) -> str :
3876 full_blob = _with_content_type (blob )
@@ -41,9 +79,18 @@ def upload_async(self, blob: Blob) -> str:
4179 return uri
4280
4381 def _do_in_background (self , action ):
82+ self ._executor .submit (action )
4483
4584
4685
4786def blob_uploader_from_simple_blob_uploader (simple_uploader : SimpleBlobUploader ) -> BlobUploader :
87+ """Implements a 'BlobUploader' using the supplied 'SimpleBlobUploader'.
88+
89+ The purpose of this function is to allow backend implementations/vendors to be able to
90+ implement their logic much more simply, using synchronous uploading interfaces.
91+
92+ This function takes care of the nitty gritty details necessary to supply an asynchronous
93+ interface on top of the simpler logic supplied by the backend system.
94+ """
4895 return _SimpleBlobUploaderAdaptor (simple_uploader )
4996
0 commit comments