Skip to content

Camunda Pyzeebe Instrumentation #1385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4088,6 +4088,8 @@ def _process_module_builtin_defaults():
)
_process_module_definition("tornado.routing", "newrelic.hooks.framework_tornado", "instrument_tornado_routing")
_process_module_definition("tornado.web", "newrelic.hooks.framework_tornado", "instrument_tornado_web")
_process_module_definition("pyzeebe.client.client", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_client_client")
_process_module_definition("pyzeebe.worker.job_executor", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_worker_job_executor")


def _process_module_entry_points():
Expand Down
6 changes: 6 additions & 0 deletions newrelic/core/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@
"response.headers.contentType",
"response.status",
"server.address",
"zeebe.client.bpmnProcessId",
"zeebe.client.messageName",
"zeebe.client.correlationKey",
"zeebe.client.messageId",
"zeebe.client.resourceCount",
"zeebe.client.resourceFile",
}

MAX_NUM_USER_ATTRIBUTES = 128
Expand Down
141 changes: 141 additions & 0 deletions newrelic/hooks/external_pyzeebe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging

from newrelic.api.application import application_instance
from newrelic.common.object_wrapper import wrap_function_wrapper
from newrelic.api.transaction import current_transaction, add_custom_attribute
from newrelic.api.time_trace import add_custom_span_attribute
from newrelic.api.background_task import BackgroundTask
from newrelic.api.function_trace import FunctionTrace

_logger = logging.getLogger(__name__)

CLIENT_ATTRIBUTES_WARNING_LOG_MSG = "Exception occurred in PyZeebe instrumentation: Failed to add client attributes."

# Adds client method params as txn or span attributes
def _add_client_input_attributes(method_name, txn, args, kwargs):
try:
if method_name in ("run_process", "run_process_with_result"):
bpmn_id = kwargs.get("bpmn_process_id", args[0] if args else None)
if bpmn_id:
txn._add_agent_attribute("zeebe.client.bpmnProcessId", bpmn_id)
#add_attr("zeebe.client.bpmnProcessId", bpmn_id)
elif method_name == "publish_message":
msg_name = kwargs.get("name", args[0] if args else None)
if msg_name:
txn._add_agent_attribute("zeebe.client.messageName", msg_name)
#add_attr("zeebe.client.messageName", msg_name)
correlation_key = kwargs.get("correlation_key", args[1] if args and len(args) > 1 else None)
if correlation_key:
txn._add_agent_attribute("zeebe.client.correlationKey", correlation_key)
#add_attr("zeebe.client.correlationKey", correlation_key)
message_id = kwargs.get("message_id")
if message_id and len(args) > 4:
message_id = args[4]
if message_id:
txn._add_agent_attribute("zeebe.client.messageId", message_id)
#add_attr("zeebe.client.messageId", message_id)
elif method_name == "deploy_resource":
resources = list(args)
if len(resources) == 1 and isinstance(resources[0], (list, tuple)):
resources = list(resources[0])
if resources:
txn._add_agent_attribute("zeebe.client.resourceCount", len(resources))
#add_attr("zeebe.client.resourceCount", len(resources))
if len(resources) == 1:
try:
txn._add_agent_attribute("zeebe.client.resourceFile", str(resources[0]))
#add_attr("zeebe.client.resourceFile", str(resources[0]))
except Exception:
txn._add_agent_attribute("zeebe.client.resourceFile", str(resources[0]))
#add_attr("zeebe.client.resourceFile", repr(resources[0]))
except Exception:
_logger.warning(CLIENT_ATTRIBUTES_WARNING_LOG_MSG, exc_info=True)

# Async wrapper that instruments router/worker annotations`
async def _nr_wrapper_execute_one_job(wrapped, instance, args, kwargs):
job = args[0] if args else kwargs.get("job")
process_id = getattr(job, "bpmn_process_id", None) or "UnknownProcess"
task_type = getattr(job, "type", None) or "UnknownType"
txn_name = f"{process_id}/{task_type}"

with BackgroundTask(application_instance(), txn_name, group="ZeebeTask"):
if job is not None:
if hasattr(job, "key"):
add_custom_attribute("zeebe.job.key", job.key)
if hasattr(job, "type"):
add_custom_attribute("zeebe.job.type", job.type)
if hasattr(job, "bpmn_process_id"):
add_custom_attribute("zeebe.job.bpmnProcessId", job.bpmn_process_id)
if hasattr(job, "process_instance_key"):
add_custom_attribute("zeebe.job.processInstanceKey", job.process_instance_key)
if hasattr(job, "element_id"):
add_custom_attribute("zeebe.job.elementId", job.element_id)

result = await wrapped(*args, **kwargs)
return result

# Async wrapper that instruments a ZeebeClient method.
def _nr_client_wrapper(method_name):
async def wrapper(wrapped, instance, args, kwargs):
txn = current_transaction()
if txn:
# add_fn = add_custom_span_attribute
_add_client_input_attributes(method_name, txn, args, kwargs)
with FunctionTrace(name=method_name, group="ZeebeClient"):
result = await wrapped(*args, **kwargs)
return result
else:
# add_fn = add_custom_attribute
created_txn = BackgroundTask(application=application_instance(), name=method_name, group="ZeebeClient")
created_txn.__enter__()
_add_client_input_attributes(method_name, created_txn, args, kwargs)
try:
result = await wrapped(*args, **kwargs)
finally:
created_txn.__exit__(None, None, None)

return result

return wrapper

# Instrument JobExecutor.execute_one_job to create a background transaction per job (invoked from @router.task or @worker.task annotations)
def instrument_pyzeebe_worker_job_executor(module):
if hasattr(module, "JobExecutor"):
wrap_function_wrapper(
module,
"JobExecutor.execute_one_job",
_nr_wrapper_execute_one_job
)

# Instrument ZeebeClient methods to trace client calls.
def instrument_pyzeebe_client_client(module):
target_methods = (
"run_process",
"run_process_with_result",
"deploy_resource",
"publish_message",
)

for method_name in target_methods:
if hasattr(module, "ZeebeClient"):
if hasattr(module.ZeebeClient, method_name):
wrap_function_wrapper(
module,
f"ZeebeClient.{method_name}",
_nr_client_wrapper(method_name)
)
75 changes: 75 additions & 0 deletions tests/external_pyzeebe/_mocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from types import SimpleNamespace
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter

# Dummy response objects with only required fields
DummyCreateProcessInstanceResponse = SimpleNamespace(
process_instance_key=12345
)

DummyCreateProcessInstanceWithResultResponse = SimpleNamespace(
process_instance_key=45678,
variables={"result": "success"}
)

DummyDeployResourceResponse = SimpleNamespace(
key=67890,
deployments=[],
tenant_id=None
)

DummyPublishMessageResponse = SimpleNamespace(
key=99999,
tenant_id=None
)

# Dummy RPC stub coroutines
async def dummy_create_process_instance(self, bpmn_process_id: str, variables: dict = None, version: int = -1, tenant_id: str = None):
"""Simulate ZeebeAdapter.create_process_instance"""
return DummyCreateProcessInstanceResponse

async def dummy_create_process_instance_with_result(self, bpmn_process_id: str, variables: dict = None, version: int = -1, timeout: int = 0, variables_to_fetch=None, tenant_id: str = None):
"""Simulate ZeebeAdapter.create_process_instance_with_result"""
return DummyCreateProcessInstanceWithResultResponse

async def dummy_deploy_resource(*resource_file_path: str, tenant_id: str = None):
"""Simulate ZeebeAdapter.deploy_resource"""
# Create dummy deployment metadata for each provided resource path
deployments = []
for path in resource_file_path:
deployments.append(SimpleNamespace(
resource_name=str(path),
bpmn_process_id="dummy_process",
process_definition_key=123,
version=1,
tenant_id=tenant_id if tenant_id is not None else None
))
# Create a dummy response with a list of deployments

Check failure on line 46 in tests/external_pyzeebe/_mocks.py

View workflow job for this annotation

GitHub Actions / MegaLinter

Ruff (PERF401)

tests/external_pyzeebe/_mocks.py:40:9: PERF401 Use a list comprehension to create a transformed list
return SimpleNamespace(
deployment_key=333333,
deployments=deployments,
tenant_id=tenant_id if tenant_id is not None else None
)

async def dummy_publish_message(self, name: str, correlation_key: str, variables: dict = None, time_to_live_in_milliseconds: int = 60000, message_id: str = None, tenant_id: str = None):
"""Simulate ZeebeAdapter.publish_message"""
# Return the dummy response (contains message key)
return SimpleNamespace(
key=999999,
tenant_id=tenant_id if tenant_id is not None else None
)

async def dummy_complete_job(self, job_key: int, variables: dict):
"""Simulate JobExecutor.complete_job"""
setattr(self, "_last_complete", {"job_key": job_key, "variables": variables})
return None

class DummyZeebeAdapter(ZeebeAdapter):
"""Simulate a ZeebeAdapter so JobExecutor can be instatiated w/o gRPC channel"""
def __init__(self):
self.completed_job_key = None
self.completed_job_vars = None
async def complete_job(self, job_key: int, variables: dict):
self.completed_job_key = job_key
self.completed_job_vars = variables
return None

31 changes: 31 additions & 0 deletions tests/external_pyzeebe/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import grpc
from pyzeebe import ZeebeClient, create_insecure_channel
from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture

_default_settings = {
"package_reporting.enabled": False, # Turn off package reporting for testing as it causes slow downs.
"transaction_tracer.explain_threshold": 0.0,
"transaction_tracer.transaction_threshold": 0.0,
"transaction_tracer.stack_trace_threshold": 0.0,
"debug.log_data_collector_payloads": True,
"debug.record_transaction_failure": True,
}

collector_agent_registration = collector_agent_registration_fixture(
app_name="Python Agent Test (external_pyzeebe)", default_settings=_default_settings
)
28 changes: 28 additions & 0 deletions tests/external_pyzeebe/test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
targetNamespace="http://example.com/bpmn"
xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL BPMN20.xsd">

<!-- Define the process with a unique id and name -->
<process id="dummyProcess" name="Dummy Process" isExecutable="true">
<!-- Start Event -->
<startEvent id="StartEvent_1" name="Start"/>

<!-- A simple Service Task representing work -->
<serviceTask id="ServiceTask_1" name="Perform Work"/>

<!-- End Event -->
<endEvent id="EndEvent_1" name="End"/>

<!-- Sequence Flows connecting Start → Service Task → End -->
<sequenceFlow id="Flow_1" sourceRef="StartEvent_1" targetRef="ServiceTask_1"/>
<sequenceFlow id="Flow_2" sourceRef="ServiceTask_1" targetRef="EndEvent_1"/>
</process>

<!-- (Optional) BPMNDiagram section can be added for graphical layout, but omitted here -->
</definitions>
Loading
Loading