Skip to content

Commit 97bf7fd

Browse files
authored
Add Profiling function (#155)
1 parent d1f286f commit 97bf7fd

21 files changed

+854
-135
lines changed

docs/EnvVars.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,12 @@ Environment Variable | Description | Default
3131
| `SW_KAFKA_REPORTER_TOPIC_LOG` | Specifying Kafka topic name for Log data. | `skywalking-logs` |
3232
| `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset |
3333
| `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | `512` |
34-
| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
34+
| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `True` |
3535
| `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` |
36+
| `SW_AGENT_PROFILE_MAX_PARALLEL` | The number of parallel monitor segment count. | `5` |
37+
| `SW_AGENT_PROFILE_DURATION` | The maximum monitor segment time(minutes), if current segment monitor time out of limit, then stop it. | `10` |
38+
| `SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH` | The number of max dump thread stack depth | `500` |
39+
| `SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE` | The number of snapshot transport to backend buffer size | `50` |
3640
| `SW_AGENT_LOG_REPORTER_ACTIVE` | If `True`, Python agent will report collected logs to the OAP or Satellite. Otherwise, it disables the feature. | `False` |
3741
| `SW_AGENT_LOG_REPORTER_BUFFER_SIZE` | The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. | `10000` |
3842
| `SW_AGENT_LOG_REPORTER_LEVEL` | This config specifies the logger levels of concern, any logs with a level below the config will be ignored. | `WARNING` |

skywalking/agent/__init__.py

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,18 @@
2727
from skywalking import loggings
2828
from skywalking.agent.protocol import Protocol
2929
from skywalking.command import command_service
30-
from skywalking.config import profile_active, profile_task_query_interval
3130
from skywalking.loggings import logger
31+
from skywalking import profile
32+
from skywalking.profile.profile_task import ProfileTask
33+
from skywalking.profile.snapshot import TracingThreadSnapshot
3234

3335
if TYPE_CHECKING:
3436
from skywalking.trace.context import Segment
3537

3638
__started = False
3739
__protocol = Protocol() # type: Protocol
3840
__heartbeat_thread = __report_thread = __log_report_thread = __query_profile_thread = __command_dispatch_thread \
39-
= __queue = __log_queue = __finished = None
41+
= __send_profile_thread = __queue = __log_queue = __snapshot_queue = __finished = None
4042

4143

4244
def __heartbeat():
@@ -69,14 +71,24 @@ def __report_log():
6971
__finished.wait(0)
7072

7173

74+
def __send_profile_snapshot():
75+
while not __finished.is_set():
76+
try:
77+
__protocol.send_snapshot(__snapshot_queue)
78+
except Exception as exc:
79+
logger.error(str(exc))
80+
81+
__finished.wait(0.5)
82+
83+
7284
def __query_profile_command():
7385
while not __finished.is_set():
7486
try:
7587
__protocol.query_profile_commands()
7688
except Exception as exc:
7789
logger.error(str(exc))
7890

79-
__finished.wait(profile_task_query_interval)
91+
__finished.wait(config.get_profile_task_interval)
8092

8193

8294
def __command_dispatch():
@@ -86,13 +98,12 @@ def __command_dispatch():
8698

8799
def __init_threading():
88100
global __heartbeat_thread, __report_thread, __log_report_thread, __query_profile_thread, \
89-
__command_dispatch_thread, __queue, __log_queue, __finished
101+
__command_dispatch_thread, __send_profile_thread, __queue, __log_queue, __snapshot_queue, __finished
90102

91103
__queue = Queue(maxsize=config.max_buffer_size)
92104
__finished = Event()
93105
__heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
94106
__report_thread = Thread(name='ReportThread', target=__report, daemon=True)
95-
__query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True)
96107
__command_dispatch_thread = Thread(name="CommandDispatchThread", target=__command_dispatch, daemon=True)
97108

98109
__heartbeat_thread.start()
@@ -104,13 +115,18 @@ def __init_threading():
104115
__log_report_thread = Thread(name='LogReportThread', target=__report_log, daemon=True)
105116
__log_report_thread.start()
106117

107-
if profile_active:
118+
if config.profile_active:
119+
__snapshot_queue = Queue(maxsize=config.profile_snapshot_transport_buffer_size)
120+
121+
__query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True)
108122
__query_profile_thread.start()
109123

124+
__send_profile_thread = Thread(name='SendProfileSnapShotThread', target=__send_profile_snapshot, daemon=True)
125+
__send_profile_thread.start()
126+
110127

111128
def __init():
112129
global __protocol
113-
114130
if config.protocol == 'grpc':
115131
from skywalking.agent.protocol.grpc import GrpcProtocol
116132
__protocol = GrpcProtocol()
@@ -132,9 +148,15 @@ def __init():
132148
def __fini():
133149
__protocol.report(__queue, False)
134150
__queue.join()
151+
135152
if config.log_reporter_active:
136153
__protocol.report_log(__log_queue, False)
137154
__log_queue.join()
155+
156+
if config.profile_active:
157+
__protocol.send_snapshot(__snapshot_queue, False)
158+
__snapshot_queue.join()
159+
138160
__finished.set()
139161

140162

@@ -175,6 +197,7 @@ def start():
175197

176198
loggings.init()
177199
config.finalize()
200+
profile.init()
178201

179202
__init()
180203

@@ -210,3 +233,17 @@ def archive_log(log_data: 'LogData'):
210233
__log_queue.put(log_data, block=False)
211234
except Full:
212235
logger.warning('the queue is full, the log will be abandoned')
236+
237+
238+
def add_profiling_snapshot(snapshot: TracingThreadSnapshot):
239+
try:
240+
__snapshot_queue.put(snapshot)
241+
except Full:
242+
logger.warning('the snapshot queue is full, the snapshot will be abandoned')
243+
244+
245+
def notify_profile_finish(task: ProfileTask):
246+
try:
247+
__protocol.notify_profile_task_finish(task)
248+
except Exception as e:
249+
logger.error("notify profile task finish to backend fail. " + str(e))

skywalking/agent/protocol/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,9 @@ def report_log(self, queue: Queue, block: bool = True):
4040

4141
def query_profile_commands(self):
4242
pass
43+
44+
def send_snapshot(self, queue: Queue, block: bool = True):
45+
pass
46+
47+
def notify_profile_task_finish(self, task):
48+
pass

skywalking/agent/protocol/grpc.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
2525
from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
2626
from skywalking.protocol.logging.Logging_pb2 import LogData
27+
from skywalking.protocol.profile.Profile_pb2 import ThreadSnapshot, ThreadStack
2728

2829
from skywalking import config
2930
from skywalking.agent import Protocol
@@ -32,6 +33,8 @@
3233
GrpcProfileTaskChannelService, GrpcLogDataReportService
3334
from skywalking.loggings import logger
3435
from skywalking.trace.segment import Segment
36+
from skywalking.profile.snapshot import TracingThreadSnapshot
37+
from skywalking.profile.profile_task import ProfileTask
3538

3639

3740
class GrpcProtocol(Protocol):
@@ -52,7 +55,7 @@ def __init__(self):
5255
self.channel.subscribe(self._cb, try_to_connect=True)
5356
self.service_management = GrpcServiceManagementClient(self.channel)
5457
self.traces_reporter = GrpcTraceSegmentReportService(self.channel)
55-
self.profile_query = GrpcProfileTaskChannelService(self.channel)
58+
self.profile_channel = GrpcProfileTaskChannelService(self.channel)
5659
self.log_reporter = GrpcLogDataReportService(self.channel)
5760

5861
def _cb(self, state):
@@ -61,7 +64,10 @@ def _cb(self, state):
6164

6265
def query_profile_commands(self):
6366
logger.debug("query profile commands")
64-
self.profile_query.do_query()
67+
self.profile_channel.do_query()
68+
69+
def notify_profile_task_finish(self, task: ProfileTask):
70+
self.profile_channel.finish(task)
6571

6672
def heartbeat(self):
6773
try:
@@ -163,3 +169,33 @@ def generator():
163169
self.log_reporter.report(generator())
164170
except grpc.RpcError:
165171
self.on_error()
172+
173+
def send_snapshot(self, queue: Queue, block: bool = True):
174+
start = time()
175+
176+
def generator():
177+
while True:
178+
try:
179+
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
180+
if timeout <= 0:
181+
return
182+
snapshot = queue.get(block=block, timeout=timeout) # type: TracingThreadSnapshot
183+
except Empty:
184+
return
185+
186+
queue.task_done()
187+
188+
transform_snapshot = ThreadSnapshot(
189+
taskId=str(snapshot.task_id),
190+
traceSegmentId=str(snapshot.trace_segment_id),
191+
time=int(snapshot.time),
192+
sequence=int(snapshot.sequence),
193+
stack=ThreadStack(codeSignatures=snapshot.stack_list)
194+
)
195+
196+
yield transform_snapshot
197+
198+
try:
199+
self.profile_channel.send(generator())
200+
except grpc.RpcError:
201+
self.on_error()

skywalking/client/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,6 @@ def report(self, generator):
3737
class ProfileTaskChannelService(object):
3838
def do_query(self):
3939
raise NotImplementedError()
40+
41+
def send(self, generator):
42+
raise NotImplementedError()

skywalking/client/grpc.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,19 @@
1818
import grpc
1919
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
2020
from skywalking.protocol.language_agent.Tracing_pb2_grpc import TraceSegmentReportServiceStub
21+
from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub
22+
from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery, ProfileTaskFinishReport
2123
from skywalking.protocol.logging.Logging_pb2_grpc import LogReportServiceStub
2224
from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
2325
from skywalking.protocol.management.Management_pb2_grpc import ManagementServiceStub
24-
from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery
25-
from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub
2626

2727
from skywalking import config
2828
from skywalking.client import ServiceManagementClient, TraceSegmentReportService, ProfileTaskChannelService, \
2929
LogDataReportService
3030
from skywalking.command import command_service
3131
from skywalking.loggings import logger
3232
from skywalking.profile import profile_task_execution_service
33+
from skywalking.profile.profile_task import ProfileTask
3334

3435

3536
class GrpcServiceManagementClient(ServiceManagementClient):
@@ -73,7 +74,7 @@ def report(self, generator):
7374

7475
class GrpcProfileTaskChannelService(ProfileTaskChannelService):
7576
def __init__(self, channel: grpc.Channel):
76-
self.task_stub = ProfileTaskStub(channel)
77+
self.profile_stub = ProfileTaskStub(channel)
7778

7879
def do_query(self):
7980

@@ -83,5 +84,16 @@ def do_query(self):
8384
lastCommandTime=profile_task_execution_service.get_last_command_create_time()
8485
)
8586

86-
commands = self.task_stub.getProfileTaskCommands(query)
87+
commands = self.profile_stub.getProfileTaskCommands(query)
8788
command_service.receive_command(commands)
89+
90+
def send(self, generator):
91+
self.profile_stub.collectSnapshot(generator)
92+
93+
def finish(self, task: ProfileTask):
94+
finish_report = ProfileTaskFinishReport(
95+
service=config.service_name,
96+
serviceInstance=config.service_instance,
97+
taskId=task.task_id
98+
)
99+
self.profile_stub.reportTaskFinish(finish_report)

skywalking/command/command_service.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,38 +30,38 @@
3030
class CommandService:
3131

3232
def __init__(self):
33-
self.__commands = queue.Queue() # type: queue.Queue
33+
self._commands = queue.Queue() # type: queue.Queue
3434
# don't execute same command twice
35-
self.__command_serial_number_cache = CommandSerialNumberCache()
35+
self._command_serial_number_cache = CommandSerialNumberCache()
3636

3737
def dispatch(self):
3838
while True:
3939
# block until a command is available
40-
command = self.__commands.get() # type: BaseCommand
40+
command = self._commands.get() # type: BaseCommand
4141
if not self.__is_command_executed(command):
4242
command_executor_service.execute(command)
43-
self.__command_serial_number_cache.add(command.serial_number)
43+
self._command_serial_number_cache.add(command.serial_number)
4444

4545
def __is_command_executed(self, command: BaseCommand):
46-
return self.__command_serial_number_cache.contains(command.serial_number)
46+
return self._command_serial_number_cache.contains(command.serial_number)
4747

4848
def receive_command(self, commands: Commands):
4949
for command in commands.commands:
5050
try:
5151
base_command = CommandDeserializer.deserialize(command)
52-
logger.debug("Received command [{%s} {%s}]", base_command.command, base_command.serial_number)
52+
logger.debug("received command [{%s} {%s}]", base_command.command, base_command.serial_number)
5353

5454
if self.__is_command_executed(base_command):
55-
logger.warning("Command[{%s}] is executed, ignored.", base_command.command)
55+
logger.warning("command[{%s}] is executed, ignored.", base_command.command)
5656
continue
5757

5858
try:
59-
self.__commands.put(base_command)
59+
self._commands.put(base_command)
6060
except queue.Full:
61-
logger.warning("Command[{%s}, {%s}] cannot add to command list. because the command list is full.",
61+
logger.warning("command[{%s}, {%s}] cannot add to command list. because the command list is full.",
6262
base_command.command, base_command.serial_number)
6363
except UnsupportedCommandException as e:
64-
logger.warning("Received unsupported command[{%s}].", e.command.command)
64+
logger.warning("received unsupported command[{%s}].", e.command.command)
6565

6666

6767
class CommandSerialNumberCache:

skywalking/command/executors/profile_task_command_executor.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@
1717

1818
from skywalking.command.executors.command_executor import CommandExecutor
1919
from skywalking.command.profile_task_command import ProfileTaskCommand
20-
from skywalking.loggings import logger
21-
from skywalking.profile import profile_task_execution_service
20+
from skywalking import profile
2221
from skywalking.profile.profile_task import ProfileTask
2322

2423

2524
class ProfileTaskCommandExecutor(CommandExecutor):
2625

2726
def execute(self, command: ProfileTaskCommand):
28-
logger.debug("ProfileTaskCommandExecutor start execute ProfileTaskCommand [{%s}]", command.serial_number)
29-
3027
profile_task = ProfileTask(task_id=command.task_id,
3128
first_span_op_name=command.endpoint_name,
3229
duration=command.duration,
@@ -36,4 +33,4 @@ def execute(self, command: ProfileTaskCommand):
3633
start_time=command.start_time,
3734
create_time=command.create_time)
3835

39-
profile_task_execution_service.add_profile_task(profile_task)
36+
profile.profile_task_execution_service.add_profile_task(profile_task)

skywalking/config.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,15 @@
6464
kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments" # type: str
6565
kafka_topic_log = os.getenv('SW_KAFKA_REPORTER_TOPIC_LOG') or "skywalking-logs" # type: str
6666
celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
67-
profile_active = True if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \
68-
os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'True' else False # type: bool
69-
profile_task_query_interval = int(os.getenv('SW_PROFILE_TASK_QUERY_INTERVAL') or '20')
67+
68+
# profile configs
69+
get_profile_task_interval = int(os.getenv('SW_PROFILE_TASK_QUERY_INTERVAL') or '20') # type: int
70+
profile_active = False if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \
71+
os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'False' else True # type: bool
72+
profile_max_parallel = int(os.getenv("SW_AGENT_PROFILE_MAX_PARALLEL") or '5') # type: int
73+
profile_duration = int(os.getenv('SW_AGENT_PROFILE_DURATION') or '10') # type: int
74+
profile_dump_max_stack_depth = int(os.getenv('SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH') or '500') # type: int
75+
profile_snapshot_transport_buffer_size = int(os.getenv('SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE') or '50')
7076

7177
# NOTE - Log reporting requires a separate channel, will merge in the future.
7278
log_reporter_active = True if os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') and \

skywalking/profile/__init__.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515
# limitations under the License.
1616
#
1717

18-
from skywalking.profile.profile_task_execution_service import ProfileTaskExecutionService
18+
profile_task_execution_service = None
1919

20-
profile_task_execution_service = ProfileTaskExecutionService()
20+
21+
def init():
22+
from skywalking.profile.profile_service import ProfileTaskExecutionService
23+
24+
global profile_task_execution_service
25+
if profile_task_execution_service:
26+
return
27+
28+
profile_task_execution_service = ProfileTaskExecutionService()

0 commit comments

Comments
 (0)