Skip to content

Commit a7f1a81

Browse files
authored
Merge branch 'master' into async
2 parents b7f48eb + 9db6571 commit a7f1a81

25 files changed

+321
-54
lines changed

.github/workflows/UnitTesting.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ jobs:
3636
python-version: ${{ env[matrix.python-version] }}
3737

3838
- name: Install tox
39-
run: pip install -U tox-factor
39+
run: pip install "tox<=3.27.1" -U tox-factor
4040

4141
- name: Cache tox environment
4242
# Preserves .tox directory between runs for faster installs

CHANGELOG.rst

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,20 @@ CHANGELOG
55
Unreleased
66
==========
77

8+
2.11.0
9+
==========
10+
* bugfix: Fix TypeError by patching register_default_jsonb from psycopg2 `PR350 https://github.com/aws/aws-xray-sdk-python/pull/350`
11+
* improvement: Add annotations `PR348 https://github.com/aws/aws-xray-sdk-python/pull/348`
12+
* bugfix: Use service parameter to match centralized sampling rules `PR 353 https://github.com/aws/aws-xray-sdk-python/pull/353`
13+
* bugfix: Implement PEP3134 to discover underlying problems with python3 `PR355 https://github.com/aws/aws-xray-sdk-python/pull/355`
14+
* improvement: Allow list TopicArn for SNS PublishBatch request `PR358 https://github.com/aws/aws-xray-sdk-python/pull/358`
15+
* bugfix: Version pinning flask-sqlalchemy version to 2.5.1 or less `PR360 https://github.com/aws/aws-xray-sdk-python/pull/360`
16+
* bugfix: Fix UnboundLocalError when aiohttp server raises a CancelledError `PR356 https://github.com/aws/aws-xray-sdk-python/pull/356`
17+
* improvement: Instrument httpx >= 0.20 `PR357 https://github.com/aws/aws-xray-sdk-python/pull/357`
18+
* improvement: [LambdaContext] persist original trace header `PR362 https://github.com/aws/aws-xray-sdk-python/pull/362`
19+
* bugfix: Run tests against Django 4.x `PR361 https://github.com/aws/aws-xray-sdk-python/pull/361`
20+
* improvement: Oversampling Mitigation `PR366 https://github.com/aws/aws-xray-sdk-python/pull/366`
21+
822
2.10.0
923
==========
1024
* bugfix: Only import future for py2. `PR343 <https://github.com/aws/aws-xray-sdk-python/pull/343>`_.

README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,46 @@ xray_recorder.end_subsegment()
124124
xray_recorder.end_segment()
125125
```
126126

127+
### Oversampling Mitigation
128+
To modify the sampling decision at the subsegment level, subsegments that inherit the decision of their direct parent (segment or subsegment) can be created using `xray_recorder.begin_subsegment()` and unsampled subsegments can be created using
129+
`xray_recorder.begin_subsegment_without_sampling()`.
130+
131+
The code snippet below demonstrates creating a sampled or unsampled subsegment based on the sampling decision of each SQS message processed by Lambda.
132+
133+
```python
134+
from aws_xray_sdk.core import xray_recorder
135+
from aws_xray_sdk.core.models.subsegment import Subsegment
136+
from aws_xray_sdk.core.utils.sqs_message_helper import SqsMessageHelper
137+
138+
def lambda_handler(event, context):
139+
140+
for message in event['Records']:
141+
if SqsMessageHelper.isSampled(message):
142+
subsegment = xray_recorder.begin_subsegment('sampled_subsegment')
143+
print('sampled - processing SQS message')
144+
145+
else:
146+
subsegment = xray_recorder.begin_subsegment_without_sampling('unsampled_subsegment')
147+
print('unsampled - processing SQS message')
148+
149+
xray_recorder.end_subsegment()
150+
```
151+
152+
The code snippet below demonstrates wrapping a downstream AWS SDK request with an unsampled subsegment.
153+
```python
154+
from aws_xray_sdk.core import xray_recorder, patch_all
155+
import boto3
156+
157+
patch_all()
158+
159+
def lambda_handler(event, context):
160+
subsegment = xray_recorder.begin_subsegment_without_sampling('unsampled_subsegment')
161+
client = boto3.client('sqs')
162+
print(client.list_queues())
163+
164+
xray_recorder.end_subsegment()
165+
```
166+
127167
### Capture
128168

129169
As a decorator:

aws_xray_sdk/core/async_recorder.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import time
2-
import six
32

43
from aws_xray_sdk.core.recorder import AWSXRayRecorder
54
from aws_xray_sdk.core.utils import stacktrace
@@ -82,10 +81,10 @@ async def record_subsegment_async(self, wrapped, instance, args, kwargs, name,
8281
try:
8382
return_value = await wrapped(*args, **kwargs)
8483
return return_value
85-
except Exception as exc:
86-
exception = exc
84+
except Exception as e:
85+
exception = e
8786
stack = stacktrace.get_stacktrace(limit=self._max_trace_back)
88-
six.raise_from(exc, exc)
87+
raise
8988
finally:
9089
# No-op if subsegment is `None` due to `LOG_ERROR`.
9190
if subsegment is not None:

aws_xray_sdk/core/context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class Context(object):
2727
2828
This data structure is thread-safe.
2929
"""
30-
def __init__(self, context_missing='RUNTIME_ERROR'):
30+
def __init__(self, context_missing='LOG_ERROR'):
3131

3232
self._local = threading.local()
3333
strategy = os.getenv(CXT_MISSING_STRATEGY_KEY, context_missing)

aws_xray_sdk/core/models/dummy_entities.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class DummySegment(Segment):
1111
the segment based on sampling rules.
1212
Adding data to a dummy segment becomes a no-op except for
1313
subsegments. This is to reduce the memory footprint of the SDK.
14-
A dummy segment will not be sent to the X-Ray daemon. Manually create
14+
A dummy segment will not be sent to the X-Ray daemon. Manually creating
1515
dummy segments is not recommended.
1616
"""
1717

aws_xray_sdk/core/models/entity.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ def add_subsegment(self, subsegment):
8181
"""
8282
self._check_ended()
8383
subsegment.parent_id = self.id
84+
85+
if not self.sampled and subsegment.sampled:
86+
log.warning("This sampled subsegment is being added to an unsampled parent segment/subsegment and will be orphaned.")
87+
8488
self.subsegments.append(subsegment)
8589

8690
def remove_subsegment(self, subsegment):

aws_xray_sdk/core/patcher.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import re
77
import sys
88
import wrapt
9-
import six
109

1110
from aws_xray_sdk import global_sdk_config
1211
from .utils.compat import PY2, is_classmethod, is_instance_method
@@ -110,9 +109,9 @@ def patch(modules_to_patch, raise_errors=True, ignore_module_patterns=None):
110109
def _patch_module(module_to_patch, raise_errors=True):
111110
try:
112111
_patch(module_to_patch)
113-
except Exception as exc:
112+
except Exception:
114113
if raise_errors:
115-
six.raise_from(exc, exc)
114+
raise
116115
log.debug('failed to patch module %s', module_to_patch)
117116

118117

aws_xray_sdk/core/recorder.py

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import os
55
import platform
66
import time
7-
import six
87

98
from aws_xray_sdk import global_sdk_config
109
from aws_xray_sdk.version import VERSION
@@ -275,16 +274,10 @@ def current_segment(self):
275274
else:
276275
return entity
277276

278-
def begin_subsegment(self, name, namespace='local'):
279-
"""
280-
Begin a new subsegment.
281-
If there is open subsegment, the newly created subsegment will be the
282-
child of latest opened subsegment.
283-
If not, it will be the child of the current open segment.
284-
285-
:param str name: the name of the subsegment.
286-
:param str namespace: currently can only be 'local', 'remote', 'aws'.
287-
"""
277+
def _begin_subsegment_helper(self, name, namespace='local', beginWithoutSampling=False):
278+
'''
279+
Helper method to begin_subsegment and begin_subsegment_without_sampling
280+
'''
288281
# Generating the parent dummy segment is necessary.
289282
# We don't need to store anything in context. Assumption here
290283
# is that we only work with recorder-level APIs.
@@ -295,16 +288,42 @@ def begin_subsegment(self, name, namespace='local'):
295288
if not segment:
296289
log.warning("No segment found, cannot begin subsegment %s." % name)
297290
return None
298-
299-
if not segment.sampled:
291+
292+
current_entity = self.get_trace_entity()
293+
if not current_entity.sampled or beginWithoutSampling:
300294
subsegment = DummySubsegment(segment, name)
301295
else:
302296
subsegment = Subsegment(name, namespace, segment)
303297

304298
self.context.put_subsegment(subsegment)
305-
306299
return subsegment
307300

301+
302+
303+
def begin_subsegment(self, name, namespace='local'):
304+
"""
305+
Begin a new subsegment.
306+
If there is open subsegment, the newly created subsegment will be the
307+
child of latest opened subsegment.
308+
If not, it will be the child of the current open segment.
309+
310+
:param str name: the name of the subsegment.
311+
:param str namespace: currently can only be 'local', 'remote', 'aws'.
312+
"""
313+
return self._begin_subsegment_helper(name, namespace)
314+
315+
316+
def begin_subsegment_without_sampling(self, name):
317+
"""
318+
Begin a new unsampled subsegment.
319+
If there is open subsegment, the newly created subsegment will be the
320+
child of latest opened subsegment.
321+
If not, it will be the child of the current open segment.
322+
323+
:param str name: the name of the subsegment.
324+
"""
325+
return self._begin_subsegment_helper(name, beginWithoutSampling=True)
326+
308327
def current_subsegment(self):
309328
"""
310329
Return the latest opened subsegment. In a multithreading environment,
@@ -436,10 +455,10 @@ def record_subsegment(self, wrapped, instance, args, kwargs, name,
436455
try:
437456
return_value = wrapped(*args, **kwargs)
438457
return return_value
439-
except Exception as exc:
440-
exception = exc
458+
except Exception as e:
459+
exception = e
441460
stack = stacktrace.get_stacktrace(limit=self.max_trace_back)
442-
six.raise_from(exc, exc)
461+
raise
443462
finally:
444463
# No-op if subsegment is `None` due to `LOG_ERROR`.
445464
if subsegment is not None:
@@ -487,7 +506,8 @@ def _send_segment(self):
487506

488507
def _stream_subsegment_out(self, subsegment):
489508
log.debug("streaming subsegments...")
490-
self.emitter.send_entity(subsegment)
509+
if subsegment.sampled:
510+
self.emitter.send_entity(subsegment)
491511

492512
def _load_sampling_rules(self, sampling_rules):
493513

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
SQS_XRAY_HEADER = "AWSTraceHeader"
2+
class SqsMessageHelper:
3+
4+
@staticmethod
5+
def isSampled(sqs_message):
6+
attributes = sqs_message['attributes']
7+
8+
if SQS_XRAY_HEADER not in attributes:
9+
return False
10+
11+
return 'Sampled=1' in attributes[SQS_XRAY_HEADER]

0 commit comments

Comments
 (0)