|
11 | 11 | from ddtrace.appsec.trace_utils import track_user_login_failure_event |
12 | 12 | from ddtrace.appsec.trace_utils import track_user_login_success_event |
13 | 13 | from ddtrace.appsec.trace_utils import track_user_signup_event |
| 14 | +from ddtrace.appsec.track_user_sdk import track_user |
14 | 15 | from ddtrace.contrib.internal.trace_utils import set_user |
15 | 16 | from ddtrace.ext import user |
16 | 17 | import tests.appsec.rules as rules |
@@ -93,6 +94,7 @@ def test_track_user_login_event_success_in_span_without_metadata(self): |
93 | 94 | assert ( |
94 | 95 | user_span.get_tag(user.SESSION_ID) == "test_session_id" and parent_span.get_tag(user.SESSION_ID) is None |
95 | 96 | ) |
| 97 | + user_span.finish() |
96 | 98 |
|
97 | 99 | def test_track_user_login_event_success_auto_mode_safe(self): |
98 | 100 | with asm_context(tracer=self.tracer, span_name="test_success1", config=config_asm): |
@@ -239,6 +241,31 @@ def test_set_user_blocked(self): |
239 | 241 | assert span.get_tag("usr.id") == str(self._BLOCKED_USER) |
240 | 242 | assert is_blocked(span) |
241 | 243 |
|
| 244 | + def test_track_user_blocked(self): |
| 245 | + with asm_context(tracer=self.tracer, span_name="fake_span", config=config_good_rules) as span: |
| 246 | + track_user( |
| 247 | + self.tracer, |
| 248 | + user_id=self._BLOCKED_USER, |
| 249 | + session_id="usr.session_id", |
| 250 | + metadata={ |
| 251 | + "email": "usr.email", |
| 252 | + "name": "usr.name", |
| 253 | + "session_id": "usr.session_id", |
| 254 | + "role": "usr.role", |
| 255 | + "scope": "usr.scope", |
| 256 | + }, |
| 257 | + ) |
| 258 | + assert span.get_tag(user.ID) |
| 259 | + assert span.get_tag(user.EMAIL) |
| 260 | + assert span.get_tag(user.SESSION_ID) |
| 261 | + assert span.get_tag(user.NAME) |
| 262 | + assert span.get_tag(user.ROLE) |
| 263 | + assert span.get_tag(user.SCOPE) |
| 264 | + assert span.get_tag(user.SESSION_ID) |
| 265 | + assert span.get_tag(APPSEC.AUTO_LOGIN_EVENTS_COLLECTION_MODE) == LOGIN_EVENTS_MODE.SDK |
| 266 | + assert span.get_tag("usr.id") == str(self._BLOCKED_USER) |
| 267 | + assert is_blocked(span) |
| 268 | + |
242 | 269 | def test_no_span_doesnt_raise(self): |
243 | 270 | from ddtrace.trace import tracer |
244 | 271 |
|
|
0 commit comments