Skip to content

Commit 2b51a15

Browse files
committed
Completed writing unit tests for functionality implemented so far.
1 parent 41b7eea commit 2b51a15

File tree

10 files changed

+299
-24
lines changed

10 files changed

+299
-24
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from opentelemetry.instrumentation._blobupload.backend.google.gcs._gcs_impl import GcsBlobUploader
22

3+
34
__all__ = [
45
GcsBlobUploader
56
]
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from typing import Any, TypeAlias
2+
3+
import logging
4+
5+
_logger = logging.getLogger(__name__)
6+
_gcs_initialized = False
7+
_gcs_client_factory = None
8+
_gcs_blob_from_uri = None
9+
10+
11+
GcsClientType: TypeAlias = Any
12+
13+
14+
def set_gcs_client_factory(gcs_client_type, client_factory):
15+
global _gcs_initialized
16+
global _gcs_client_factory
17+
global GcsClientType
18+
if _gcs_initialized:
19+
_logger.warning('Replacing default GCS client factory')
20+
GcsClientType = gcs_client_type
21+
_gcs_client_factory = client_factory
22+
if _gcs_client_factory and _gcs_blob_from_uri:
23+
_gcs_initialized = True
24+
25+
26+
def set_gcs_blob_from_uri(blob_from_uri):
27+
global _gcs_initialized
28+
global _gcs_blob_from_uri
29+
if _gcs_initialized:
30+
_logger.warning('Replacing default GCS blob_from_uri method')
31+
_gcs_blob_from_uri = blob_from_uri
32+
if _gcs_client_factory and _gcs_blob_from_uri:
33+
_gcs_initialized = True
34+
35+
36+
def is_gcs_initialized():
37+
return _gcs_initialized
38+
39+
40+
def create_gcs_client():
41+
if _gcs_client_factory is not None:
42+
return _gcs_client_factory()
43+
return None
44+
45+
46+
def blob_from_uri(uri, client):
47+
if _gcs_blob_from_uri is not None:
48+
return _gcs_blob_from_uri(uri, client=client)
49+
return None
50+
51+
52+
try:
53+
from google.cloud.storage import Client as _GcsClient
54+
from google.cloud.storage.blob import Blob as _GcsBlob
55+
set_gcs_client_factory(_GcsClient, _GcsClient)
56+
set_gcs_blob_from_uri(getattr(_GcsBlob, 'from_uri', getattr(_GcsBlob, 'from_string')))
57+
_logger.debug('Found "google-cloud-storage" optional dependency and successfully registered it.')
58+
except ImportError:
59+
_logger.warning('Missing optional "google-cloud-storage" dependency.')

opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/backend/google/gcs/_gcs_impl.py

Lines changed: 53 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,42 @@
11
import io
22
import uuid
3+
import logging
34

4-
from google.cloud.storage import Client as GcsClient
5-
from google.cloud.storage import Blob as GcsBlob
5+
from typing import Optional, TypeAlias
66

77
from opentelemetry.instrumentation._blobupload.api import Blob
88
from opentelemetry.instrumentation._blobupload.api import BlobUploader
99
from opentelemetry.instrumentation._blobupload.utils import SimpleBlobUploader
1010
from opentelemetry.instrumentation._blobupload.utils import blob_uploader_from_simple_blob_uploader
11+
from opentelemetry.instrumentation._blobupload.backend.google.gcs import _gcs_client_wrapper
12+
13+
_logger = logging.getLogger(__name__)
14+
15+
GcsClient: TypeAlias = _gcs_client_wrapper.GcsClientType
16+
17+
18+
def _path_for_span(trace_id, span_id):
19+
if not trace_id or not span_id:
20+
return ''
21+
return 'traces/{}/spans/{}'.format(trace_id, span_id)
22+
23+
24+
def _path_for_event(trace_id, span_id, event_name):
25+
if not event_name:
26+
return ''
27+
span_path = _path_for_span(trace_id, span_id)
28+
if not span_path:
29+
return ''
30+
return '{}/events/{}'.format(span_path, event_name)
31+
32+
33+
def _path_for_span_event(trace_id, span_id, event_index):
34+
if event_index is None:
35+
return ''
36+
span_path = _path_for_span(trace_id, span_id)
37+
if not span_path:
38+
return ''
39+
return '{}/events/{}'.format(span_path, event_index)
1140

1241

1342
def _path_segment_from_labels(labels):
@@ -22,24 +51,19 @@ def _path_segment_from_labels(labels):
2251
...depending on the particular type of signal source.
2352
2453
"""
25-
segments = []
26-
target_segments = [
27-
('traces', 'trace_id', 'unknown'),
28-
('spans', 'span_id', 'unknown'),
29-
('events', 'event_index', None),
30-
]
31-
for segment_prefix, label_key, default_val in target_segments:
32-
label_value = labels.get(label_key) or default_val
33-
if label_value:
34-
segments.append(segment_prefix)
35-
segments.append(label_value)
36-
if ((labels.get('otel_type') in ['event', 'span_event']) and
37-
('events' not in segments)):
38-
event_name = labels.get('event_name') or 'unknown'
39-
segments.append('events')
40-
segments.append(event_name)
41-
return '/'.join(segments)
42-
54+
signal_type = labels.get('otel_type')
55+
if not signal_type or signal_type not in ['span', 'event', 'span_event']:
56+
return ''
57+
trace_id = labels.get('trace_id')
58+
span_id = labels.get('span_id')
59+
event_name = labels.get('event_name')
60+
event_index = labels.get('event_index')
61+
if signal_type == 'span':
62+
return _path_for_span(trace_id, span_id)
63+
elif signal_type == 'event':
64+
return _path_for_event(trace_id, span_id, event_name)
65+
elif signal_type == 'span_event':
66+
return _path_for_span_event(trace_id, span_id, event_index)
4367

4468

4569
class _SimpleGcsBlobUploader(SimpleBlobUploader):
@@ -52,15 +76,18 @@ def __init__(self, prefix: str, client:Optional[GcsClient]=None):
5276
if not prefix.endswith('/'):
5377
prefix = '{}/'.format(prefix)
5478
self._prefix = prefix
55-
self._client = client or GcsClient()
79+
self._client = client or _gcs_client_wrapper.create_gcs_client()
5680

5781
def generate_destination_uri(self, blob: Blob) -> str:
5882
origin_path = _path_segment_from_labels(blob.labels)
83+
if origin_path and not origin_path.endswith('/'):
84+
origin_path = '{}/'.format(origin_path)
5985
upload_id = uuid.uuid4().hex
60-
return '{}{}/uploads/{}'.format(self._prefix, origin_path, upload_id)
86+
return '{}{}uploads/{}'.format(self._prefix, origin_path, upload_id)
6187

6288
def upload_sync(self, uri: str, blob: Blob):
63-
gcs_blob = GcsBlob.from_string(uri, client=self._client)
89+
_logger.debug('Uploading blob: size: {} -> "{}"'.format(len(blob.raw_bytes), uri))
90+
gcs_blob = _gcs_client_wrapper.blob_from_uri(uri, client=self._client)
6491
gcs_blob.upload_from_file(
6592
io.BytesIO(blob.raw_bytes),
6693
content_type=blob.content_type)
@@ -69,9 +96,12 @@ def upload_sync(self, uri: str, blob: Blob):
6996
gcs_blob.metadata = metadata
7097

7198

99+
72100
class GcsBlobUploader(BlobUploader):
73101

74102
def __init__(self, prefix: str, client:Optional[GcsClient]=None):
103+
if not _gcs_client_wrapper.is_gcs_initialized():
104+
raise NotImplementedError("GcsBlobUploader implementation unavailable without 'google-cloud-storage' optional dependency.")
75105
simple_uploader = _SimpleGcsBlobUploader(prefix, client)
76106
self._delegate = blob_uploader_from_simple_blob_uploader(simple_uploader)
77107

opentelemetry-instrumentation/src/opentelemetry/instrumentation/_blobupload/utils/simple_blob_uploader_adaptor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def __call__(self):
3535
try:
3636
self._simple_uploader.upload_sync(self._uri, self._blob)
3737
except:
38-
_logger.error('Failed to upload blob to "{}".'.format(self._uri))
38+
_logger.exception('Failed to upload blob to "{}".'.format(self._uri))
3939

4040

4141
def _create_default_executor_no_cleanup():

opentelemetry-instrumentation/tests/_blobupload/api/test_blob.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
sys.path.append("../../../src")
66

77
import base64
8+
import logging
89
import unittest
910

1011

@@ -105,4 +106,5 @@ def assert_labels_equal(self, a, b):
105106

106107

107108
if __name__ == "__main__":
109+
logging.basicConfig(level=logging.DEBUG)
108110
unittest.main()

opentelemetry-instrumentation/tests/_blobupload/api/test_content_type.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
sys.path.append("../../../src")
66

77
import io
8+
import logging
89
import unittest
910

1011
from PIL import Image
@@ -56,4 +57,5 @@ def test_detects_png(self):
5657

5758

5859
if __name__ == "__main__":
60+
logging.basicConfig(level=logging.DEBUG)
5961
unittest.main()

opentelemetry-instrumentation/tests/_blobupload/api/test_labels.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import sys
55
sys.path.append("../../../src")
66

7+
import logging
78
import unittest
89

910
from opentelemetry.instrumentation._blobupload.api import (
@@ -68,4 +69,5 @@ def test_generate_labels_for_span_event(self):
6869

6970

7071
if __name__ == "__main__":
72+
logging.basicConfig(level=logging.DEBUG)
7173
unittest.main()

opentelemetry-instrumentation/tests/_blobupload/api/test_provider.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import sys
55
sys.path.append("../../../src")
66

7+
import logging
78
import unittest
89

910
from opentelemetry.instrumentation._blobupload.api import (
@@ -61,4 +62,5 @@ def get_blob_uploader(self, use_case):
6162

6263

6364
if __name__ == "__main__":
65+
logging.basicConfig(level=logging.DEBUG)
6466
unittest.main()

0 commit comments

Comments
 (0)