Skip to content

Commit d756d69

Browse files
authored
Periodically report instance properties (#279)
1 parent 20ce46a commit d756d69

File tree

16 files changed

+213
-130
lines changed

16 files changed

+213
-130
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
- Avoid reporting meaningless tracecontext with logs when there's no active span, UI will now show empty traceID (#272)
4242
- Fix exception handler in profile_context (#273)
4343
- Add namespace suffix to service name (#275)
44+
- Add periodical instance property report to prevent data loss (#279)
4445

4546
- Docs:
4647
- New documentation on how to test locally (#222)

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ endif
2727

2828
.PHONY: env
2929
env: poetry gen
30-
poetry install
30+
poetry install --all-extras
3131
poetry run pip install --upgrade pip
3232

3333
.PHONY: poetry poetry-fallback

docs/en/setup/Configuration.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ export SW_AGENT_YourConfiguration=YourValue
3636
| Configuration | Environment Variable | Type | Default Value | Description |
3737
| :------------ | :------------ | :------------ | :------------ | :------------ |
3838
| heartbeat_period | SW_AGENT_HEARTBEAT_PERIOD | <class 'int'> | 30 | The agent will exchange heartbeat message with SkyWalking OAP backend every `period` seconds |
39-
| service_instance_property_report_factor | SW_AGENT_SERVICE_INSTANCE_PROPERTY_REPORT_FACTOR | <class 'int'> | 10 | The agent will report service instance properties every `factor * heartbeat period` seconds default: 10*30 = 300 seconds (TODO) |
40-
| experimental_fork_support | SW_AGENT_EXPERIMENTAL_FORK_SUPPORT | <class 'bool'> | False | The agent will try to restart itself in any os.fork()-ed child process. Important note: it's not suitable for large numbered, short-lived processes such as multiprocessing.Pool, as each one will introduce overhead and create numerous instances in SkyWalking dashboard in format of `service_instance-child-<pid>` (TODO) |
39+
| collector_properties_report_period_factor | SW_AGENT_COLLECTOR_PROPERTIES_REPORT_PERIOD_FACTOR | <class 'int'> | 10 | The agent will report service instance properties every `factor * heartbeat period` seconds default: 10*30 = 300 seconds |
40+
| instance_properties_json | SW_AGENT_INSTANCE_PROPERTIES_JSON | <class 'str'> | | A custom JSON string to be reported as service instance properties, e.g. `{"key": "value"}` |
41+
| experimental_fork_support | SW_AGENT_EXPERIMENTAL_FORK_SUPPORT | <class 'bool'> | False | The agent will try to restart itself in any os.fork()-ed child process. Important Note: it's not suitable for short-lived processes as each one will introduce overhead and create a new instance in SkyWalking dashboard in format of `service_instance-child-<pid>` (TODO) |
4142
| queue_timeout | SW_AGENT_QUEUE_TIMEOUT | <class 'int'> | 1 | DANGEROUS - This option controls the interval of each bulk report from telemetry data queues Do not modify unless you have evaluated its impact given your service load. |
4243
### SW_PYTHON Auto Instrumentation CLI
4344
| Configuration | Environment Variable | Type | Default Value | Description |

skywalking/agent/__init__.py

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
#
1717

1818
import atexit
19-
import os
2019
from queue import Queue, Full
2120
from threading import Thread, Event
2221
from typing import TYPE_CHECKING
@@ -61,7 +60,7 @@ def __report():
6160

6261
while not __finished.is_set():
6362
try:
64-
__protocol.report(__queue) # is blocking actually, blocks for max config.queue_timeout seconds
63+
__protocol.report_segment(__queue) # is blocking actually, blocks for max config.queue_timeout seconds
6564
wait = base
6665
except Exception as exc:
6766
logger.error(str(exc))
@@ -89,7 +88,7 @@ def __send_profile_snapshot():
8988

9089
while not __finished.is_set():
9190
try:
92-
__protocol.send_snapshot(__snapshot_queue)
91+
__protocol.report_snapshot(__snapshot_queue)
9392
wait = base
9493
except Exception as exc:
9594
logger.error(str(exc))
@@ -198,39 +197,20 @@ def __init():
198197

199198

200199
def __fini():
201-
__protocol.report(__queue, False)
200+
__protocol.report_segment(__queue, False)
202201
__queue.join()
203202

204203
if config.log_reporter_active:
205204
__protocol.report_log(__log_queue, False)
206205
__log_queue.join()
207206

208207
if config.profiler_active:
209-
__protocol.send_snapshot(__snapshot_queue, False)
208+
__protocol.report_snapshot(__snapshot_queue, False)
210209
__snapshot_queue.join()
211210

212211
__finished.set()
213212

214213

215-
def __fork_before():
216-
if config.protocol != 'http':
217-
logger.warning(f'fork() not currently supported with {config.protocol} protocol')
218-
219-
# TODO: handle __queue and __finished correctly (locks, mutexes, etc...), need to lock before fork and unlock after
220-
# if possible, or ensure they are not locked in threads (end threads and restart after fork?)
221-
222-
__protocol.fork_before()
223-
224-
225-
def __fork_after_in_parent():
226-
__protocol.fork_after_in_parent()
227-
228-
229-
def __fork_after_in_child():
230-
__protocol.fork_after_in_child()
231-
__init_threading()
232-
233-
234214
def start():
235215
global __started
236216
if __started:
@@ -256,10 +236,6 @@ def start():
256236

257237
atexit.register(__fini)
258238

259-
if (hasattr(os, 'register_at_fork')):
260-
os.register_at_fork(before=__fork_before, after_in_parent=__fork_after_in_parent,
261-
after_in_child=__fork_after_in_child)
262-
263239

264240
def stop():
265241
atexit.unregister(__fini)

skywalking/agent/protocol/__init__.py

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,38 +19,31 @@
1919
from queue import Queue
2020

2121

22-
class BaseProtocol(ABC):
22+
class Protocol(ABC):
2323
@abstractmethod
2424
def heartbeat(self):
2525
raise NotImplementedError()
2626

2727
@abstractmethod
28-
def report(self, queue: Queue, block: bool = True):
28+
def report_segment(self, queue: Queue, block: bool = True):
2929
raise NotImplementedError()
3030

3131
@abstractmethod
3232
def report_log(self, queue: Queue, block: bool = True):
3333
raise NotImplementedError()
3434

35-
36-
class Protocol(BaseProtocol):
37-
def fork_before(self):
38-
pass
39-
40-
def fork_after_in_parent(self):
41-
pass
42-
43-
def fork_after_in_child(self):
44-
pass
45-
35+
@abstractmethod
4636
def report_meter(self, queue: Queue, block: bool = True):
47-
pass
37+
raise NotImplementedError()
4838

49-
def query_profile_commands(self):
50-
pass
39+
@abstractmethod
40+
def report_snapshot(self, queue: Queue, block: bool = True):
41+
raise NotImplementedError()
5142

52-
def send_snapshot(self, queue: Queue, block: bool = True):
53-
pass
43+
@abstractmethod
44+
def query_profile_commands(self):
45+
raise NotImplementedError()
5446

47+
@abstractmethod
5548
def notify_profile_task_finish(self, task):
56-
pass
49+
raise NotImplementedError()

skywalking/agent/protocol/grpc.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def on_error(self):
9090
self.channel.unsubscribe(self._cb)
9191
self.channel.subscribe(self._cb, try_to_connect=True)
9292

93-
def report(self, queue: Queue, block: bool = True):
93+
def report_segment(self, queue: Queue, block: bool = True):
9494
start = None
9595

9696
def generator():
@@ -214,7 +214,6 @@ def generator():
214214

215215
if logger_debug_enabled:
216216
logger.debug('Reporting Meter')
217-
218217
yield meter_data
219218

220219
try:
@@ -223,7 +222,7 @@ def generator():
223222
self.on_error()
224223
raise
225224

226-
def send_snapshot(self, queue: Queue, block: bool = True):
225+
def report_snapshot(self, queue: Queue, block: bool = True):
227226
start = None
228227

229228
def generator():
@@ -255,7 +254,7 @@ def generator():
255254
yield transform_snapshot
256255

257256
try:
258-
self.profile_channel.send(generator())
257+
self.profile_channel.report(generator())
259258
except grpc.RpcError:
260259
self.on_error()
261260
raise

skywalking/agent/protocol/http.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,13 @@ def __init__(self):
3333
self.traces_reporter = HttpTraceSegmentReportService()
3434
self.log_reporter = HttpLogDataReportService()
3535

36-
def fork_after_in_child(self):
37-
self.service_management.fork_after_in_child()
38-
self.traces_reporter.fork_after_in_child()
39-
4036
def heartbeat(self):
4137
if not self.properties_sent:
4238
self.service_management.send_instance_props()
4339
self.properties_sent = True
4440
self.service_management.send_heart_beat()
4541

46-
def report(self, queue: Queue, block: bool = True):
42+
def report_segment(self, queue: Queue, block: bool = True):
4743
start = None
4844

4945
def generator():
@@ -102,3 +98,16 @@ def generator():
10298
self.log_reporter.report(generator=generator())
10399
except Exception:
104100
pass
101+
102+
# meter support requires OAP side HTTP handler to be implemented
103+
def report_meter(self, queue: Queue, block: bool = True):
104+
...
105+
106+
def report_snapshot(self, queue: Queue, block: bool = True):
107+
...
108+
109+
def query_profile_commands(self):
110+
...
111+
112+
def notify_profile_task_finish(self, task):
113+
...

skywalking/agent/protocol/kafka.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def __init__(self):
4545
def heartbeat(self):
4646
self.service_management.send_heart_beat()
4747

48-
def report(self, queue: Queue, block: bool = True):
48+
def report_segment(self, queue: Queue, block: bool = True):
4949
start = None
5050

5151
def generator():
@@ -162,3 +162,13 @@ def generator():
162162
yield meter_data
163163

164164
self.meter_reporter.report(generator=generator())
165+
166+
# TODO: implement profiling for kafka
167+
def report_snapshot(self, queue: Queue, block: bool = True):
168+
...
169+
170+
def query_profile_commands(self):
171+
...
172+
173+
def notify_profile_task_finish(self, task):
174+
...

skywalking/client/__init__.py

Lines changed: 97 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,34 +14,123 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616
#
17+
import os
18+
import json
19+
import platform
20+
import socket
21+
from abc import ABC, abstractmethod
22+
from typing import List
1723

24+
from skywalking import config
25+
from skywalking.loggings import logger
26+
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
1827

19-
class ServiceManagementClient(object):
20-
def send_instance_props(self):
28+
29+
class ServiceManagementClient(ABC):
30+
"""
31+
Used to register service and instance to OAP.
32+
"""
33+
34+
def __init__(self):
35+
self.sent_properties_counter = 0
36+
37+
@abstractmethod
38+
def send_instance_props(self) -> None:
39+
"""
40+
Unique to each protocol, send instance properties to OAP.
41+
"""
42+
raise NotImplementedError()
43+
44+
def refresh_instance_props(self) -> None:
45+
"""
46+
Periodically refresh the instance properties to prevent loss on OAP TTL records expiration.
47+
Default: 30 * 10 seconds
48+
"""
49+
self.sent_properties_counter += 1
50+
if self.sent_properties_counter % config.collector_properties_report_period_factor == 0:
51+
self.send_instance_props()
52+
53+
@staticmethod
54+
def get_instance_properties() -> List[dict]:
55+
"""
56+
Get current running Python interpreter's system properties.
57+
Returns: [{'key': str, 'value': str}, ...]
58+
"""
59+
try:
60+
properties = [
61+
{'key': 'language', 'value': 'python'},
62+
{'key': 'OS Name', 'value': os.name},
63+
{'key': 'Process No.', 'value': str(os.getpid())},
64+
{'key': 'hostname', 'value': socket.gethostname()},
65+
{'key': 'ipv4', 'value': '; '.join(socket.gethostbyname_ex(socket.gethostname())[2])},
66+
{'key': 'python_implementation', 'value': platform.python_implementation()},
67+
{'key': 'python_version', 'value': platform.python_version()},
68+
]
69+
70+
except Exception as e: # noqa
71+
logger.exception('Failed to get OS info, fallback to basic properties.')
72+
properties = [
73+
{'key': 'language', 'value': 'python'},
74+
{'key': 'Process No.', 'value': str(os.getpid())},
75+
]
76+
77+
namespace = config.namespace
78+
if namespace:
79+
properties.append({'key': 'namespace', 'value': namespace})
80+
81+
instance_properties_json = config.instance_properties_json
82+
if instance_properties_json:
83+
# load instance properties from json string
84+
json_properties = json.loads(instance_properties_json)
85+
for key, value in json_properties.items():
86+
properties.append({'key': key, 'value': value})
87+
88+
return properties
89+
90+
def get_instance_properties_proto(self) -> List[KeyStringValuePair]:
91+
"""
92+
Converts to protobuf format.
93+
Returns: [KeyStringValuePair, ...]
94+
"""
95+
return [KeyStringValuePair(key=prop['key'], value=prop['value']) for prop in self.get_instance_properties()]
96+
97+
def send_heart_beat(self) -> None:
98+
"""
99+
Each protocol must implement this method to send heart beat to OAP.
100+
Returns: None
101+
"""
21102
raise NotImplementedError()
22103

23-
def send_heart_beat(self):
104+
105+
class Service(ABC):
106+
@abstractmethod
107+
def report(self, segment: bytes) -> None:
24108
raise NotImplementedError()
25109

26110

27-
class TraceSegmentReportService(object):
111+
class TraceSegmentReportService(Service):
112+
@abstractmethod
28113
def report(self, generator):
29114
raise NotImplementedError()
30115

31116

32-
class MeterReportService(object):
117+
class MeterReportService(Service):
118+
@abstractmethod
33119
def report(self, generator):
34120
raise NotImplementedError()
35121

36122

37-
class LogDataReportService(object):
123+
class LogDataReportService(Service):
124+
@abstractmethod
38125
def report(self, generator):
39126
raise NotImplementedError()
40127

41128

42-
class ProfileTaskChannelService(object):
129+
class ProfileTaskChannelService(Service):
130+
@abstractmethod
43131
def do_query(self):
44132
raise NotImplementedError()
45133

46-
def send(self, generator):
134+
@abstractmethod
135+
def report(self, generator):
47136
raise NotImplementedError()

0 commit comments

Comments
 (0)