Skip to content

Commit c0b6911

Browse files
P403n1x87mabdinur
andauthored
refactor(rcm): remove code duplication in subscriber service (#7535)
We remove some code duplication from the subscriber service class and reuse the logic already implemented for the periodic service class. ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [ ] Title is accurate. - [ ] No unnecessary changes are introduced. - [ ] Description motivates each change. - [ ] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [ ] Testing strategy adequately addresses listed risk(s). - [ ] Change is maintainable (easy to change, telemetry, documentation). - [ ] Release note makes sense to a user of the library. - [ ] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [ ] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [ ] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. - [ ] This PR doesn't touch any of that. Co-authored-by: Munir Abdinur <[email protected]>
1 parent a45b659 commit c0b6911

File tree

7 files changed

+36
-71
lines changed

7 files changed

+36
-71
lines changed
Lines changed: 18 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import os
2-
import time
32
from typing import TYPE_CHECKING
43

54
from ddtrace.internal.logger import get_logger
6-
from ddtrace.internal.periodic import PeriodicThread
5+
from ddtrace.internal.periodic import PeriodicService
76
from ddtrace.internal.remoteconfig.utils import get_poll_interval_seconds
87

98

@@ -19,80 +18,42 @@
1918
log = get_logger(__name__)
2019

2120

22-
class RemoteConfigSubscriber(object):
23-
_th_worker = None
24-
21+
class RemoteConfigSubscriber(PeriodicService):
2522
def __init__(self, data_connector, callback, name):
2623
# type: (PublisherSubscriberConnector, Callable, str) -> None
24+
super().__init__(get_poll_interval_seconds() / 2)
25+
2726
self._data_connector = data_connector
28-
self.is_running = False
2927
self._callback = callback
3028
self._name = name
31-
log.debug("[%s] Subscriber %s init", os.getpid(), self._name)
32-
self.interval = get_poll_interval_seconds() / 2
29+
30+
log.debug("[PID %d] %s initialized", os.getpid(), self)
3331

3432
def _exec_callback(self, data, test_tracer=None):
3533
# type: (SharedDataType, Optional[Tracer]) -> None
3634
if data:
37-
log.debug("[%s] Subscriber %s _exec_callback: %s", os.getpid(), self._name, str(data)[:50])
35+
log.debug("[PID %d] %s _exec_callback: %s", os.getpid(), self, str(data)[:50])
3836
self._callback(data, test_tracer=test_tracer)
3937

4038
def _get_data_from_connector_and_exec(self, test_tracer=None):
4139
# type: (Optional[Tracer]) -> None
4240
data = self._data_connector.read()
4341
self._exec_callback(data, test_tracer=test_tracer)
4442

45-
def _worker(self):
46-
self.is_running = True
43+
def periodic(self):
4744
try:
45+
log.debug("[PID %d | PPID %d] %s is getting data", os.getpid(), os.getppid(), self)
4846
self._get_data_from_connector_and_exec()
47+
log.debug("[PID %d | PPID %d] %s got data", os.getpid(), os.getppid(), self)
4948
except Exception:
50-
log.debug("[%s][P: %s] Subscriber %s get an error", os.getpid(), os.getppid(), self._name, exc_info=True)
51-
time.sleep(self.interval)
52-
53-
def start(self):
54-
if not self.is_running and self._th_worker is None:
55-
log.debug("[%s][P: %s] Subscriber %s starts %s", os.getpid(), os.getppid(), self._name, self.is_running)
56-
self._th_worker = PeriodicThread(
57-
target=self._worker,
58-
interval=self.interval,
59-
on_shutdown=self.stop,
60-
name="%s:%s" % (self.__class__.__module__, self.__class__.__name__),
61-
)
62-
self._th_worker.start()
63-
else:
64-
is_alive = False
65-
if self._th_worker is not None:
66-
is_alive = self._th_worker.is_alive()
67-
log.debug(
68-
"[%s][P: %s] Subscriber %s is trying to start but the subscriber status is %s "
69-
"and subscriber thread is %s (is alive: %s)",
70-
os.getpid(),
71-
os.getppid(),
72-
self._name,
73-
self.is_running,
74-
self._th_worker,
75-
is_alive,
76-
)
49+
log.error("[PID %d | PPID %d] %s while getting data", os.getpid(), os.getppid(), self, exc_info=True)
7750

78-
def force_restart(self, join):
79-
self.is_running = False
80-
self.stop(join)
81-
log.debug(
82-
"[%s][P: %s] Subscriber %s worker restarts. Status: %s",
83-
os.getpid(),
84-
os.getppid(),
85-
self._name,
86-
self.is_running,
87-
)
51+
def force_restart(self, join=False):
52+
self.stop()
53+
if join:
54+
self.join()
8855
self.start()
56+
log.debug("[PID %d | PPID %d] %s restarted", os.getpid(), os.getppid(), self)
8957

90-
def stop(self, join=False):
91-
# type: (bool) -> None
92-
if self._th_worker:
93-
self.is_running = False
94-
self._th_worker.stop()
95-
if join:
96-
self._th_worker.join()
97-
log.debug("[%s][P: %s] Subscriber %s. Stopped", os.getpid(), os.getppid(), self._name)
98-
self._th_worker = None
58+
def __str__(self):
59+
return f"Subscriber {self._name}"

ddtrace/internal/remoteconfig/client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from ddtrace.internal.logger import get_logger
2828
from ddtrace.internal.remoteconfig.constants import REMOTE_CONFIG_AGENT_ENDPOINT
2929
from ddtrace.internal.runtime import container
30+
from ddtrace.internal.service import ServiceStatus
3031
from ddtrace.internal.utils.time import parse_isoformat
3132

3233
from ..utils.formats import parse_tags_str
@@ -254,7 +255,7 @@ def get_pubsubs(self):
254255
def is_subscriber_running(self, pubsub_to_check):
255256
# type: (PubSub) -> bool
256257
for pubsub in self.get_pubsubs():
257-
if pubsub_to_check._subscriber is pubsub._subscriber and pubsub._subscriber.is_running:
258+
if pubsub_to_check._subscriber is pubsub._subscriber and pubsub._subscriber.status == ServiceStatus.RUNNING:
258259
return True
259260
return False
260261

ddtrace/internal/remoteconfig/worker.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,7 @@ def __enter__(self):
174174

175175
def __exit__(self, *args):
176176
# type: (...) -> None
177-
self.stop_subscribers(True)
178-
self.disable()
177+
self.disable(join=True)
179178

180179

181180
remoteconfig_poller = RemoteConfigPoller()

tests/conftest.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from ddtrace.internal.compat import parse
2727
from ddtrace.internal.remoteconfig.client import RemoteConfigClient
2828
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
29+
from ddtrace.internal.service import ServiceStatusError
2930
from ddtrace.internal.telemetry import TelemetryWriter
3031
from ddtrace.internal.utils.formats import parse_tags_str
3132
from tests import utils
@@ -373,7 +374,10 @@ def _stop_remote_config_worker():
373374

374375
@pytest.fixture
375376
def remote_config_worker():
376-
remoteconfig_poller.disable(join=True)
377+
try:
378+
remoteconfig_poller.disable(join=True)
379+
except ServiceStatusError:
380+
pass
377381
remoteconfig_poller._client = RemoteConfigClient()
378382
try:
379383
yield

tests/debugging/probe/test_remoteconfig.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,11 @@ def remove_probes(self, *probe_ids):
4545
class SyncProbeRCAdapter(ProbeRCAdapter):
4646
def __init__(self, *args, **kwargs):
4747
super(SyncProbeRCAdapter, self).__init__(*args, **kwargs)
48-
# Prevent the worker thread from starting. We call methods manually.
49-
self._subscriber.is_running = True
48+
# Make the subscriber worker thread a no-op. We call methods manually.
49+
self._subscriber.periodic = self.periodic
50+
51+
def periodic(self):
52+
pass
5053

5154

5255
def config_metadata(config_id=None):

tests/internal/remoteconfig/test_remoteconfig.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,6 @@ def _reload_features(self, features, test_tracer=None):
233233
mock_pubsub = RCMockPubSub(None, callback._reload_features)
234234
rc.register(ASM_FEATURES_PRODUCT, mock_pubsub)
235235

236-
mock_pubsub.start_subscriber()
237236
rc._online()
238237
mock_send_request.assert_called()
239238
sleep(0.5)
@@ -263,7 +262,6 @@ def _reload_features(self, features, test_tracer=None):
263262
with RemoteConfigPoller() as rc:
264263
mock_pubsub = RCMockPubSub(None, callback._reload_features)
265264
rc.register(ASM_FEATURES_PRODUCT, mock_pubsub)
266-
mock_pubsub.start_subscriber()
267265
rc._online()
268266
mock_send_request.assert_called_once()
269267
sleep(0.5)
@@ -353,7 +351,6 @@ def _reload_features(self, features, test_tracer=None):
353351
mock_send_request.return_value = get_mock_encoded_msg_with_signed_errors(msg, path, signed_errors)
354352
mock_pubsub = RCMockPubSub(None, callback._reload_features)
355353
rc.register(ASM_FEATURES_PRODUCT, mock_pubsub)
356-
mock_pubsub.start_subscriber()
357354
rc._online()
358355
mock_send_request.assert_called()
359356
sleep(0.5)
@@ -376,7 +373,6 @@ def _reload_features(self, features, test_tracer=None):
376373
with RemoteConfigPoller() as rc:
377374
mock_pubsub = RCMockPubSub(None, callback._reload_features)
378375
rc.register(ASM_FEATURES_PRODUCT, mock_pubsub)
379-
mock_pubsub.start_subscriber()
380376
for _ in range(0, 2):
381377
msg = b'{"asm":{"enabled":true}}'
382378
expires_date = datetime.datetime.strftime(

tests/internal/remoteconfig/test_remoteconfig_subscriber.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import mock
55

66
from ddtrace.internal.remoteconfig._subscribers import RemoteConfigSubscriber
7+
from ddtrace.internal.service import ServiceStatus
78
from tests.internal.remoteconfig.utils import MockConnector
89
from tests.utils import override_global_config
910

@@ -12,12 +13,12 @@ def test_subscriber_thread():
1213
with override_global_config(dict(_remote_config_poll_interval=0.1)):
1314
mock_callback = mock.MagicMock()
1415
subscriber = RemoteConfigSubscriber(MockConnector({"example": "data"}), mock_callback, "TEST_DATA")
15-
assert not subscriber.is_running
16+
assert subscriber.status is ServiceStatus.STOPPED
1617

1718
subscriber.start()
1819
sleep(0.15)
19-
assert subscriber.is_running
20+
assert subscriber.status is ServiceStatus.RUNNING
2021
mock_callback.assert_called_with({"example": "data"}, test_tracer=None)
2122

22-
subscriber.stop()
23-
assert not subscriber.is_running
23+
subscriber.stop(join=True)
24+
assert subscriber.status is ServiceStatus.STOPPED

0 commit comments

Comments
 (0)