Skip to content

Camunda Pyzeebe Instrumentation #1385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4088,6 +4088,8 @@ def _process_module_builtin_defaults():
)
_process_module_definition("tornado.routing", "newrelic.hooks.framework_tornado", "instrument_tornado_routing")
_process_module_definition("tornado.web", "newrelic.hooks.framework_tornado", "instrument_tornado_web")
_process_module_definition("pyzeebe.client.client", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_client_client")
_process_module_definition("pyzeebe.worker.job_executor", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_worker_job_executor")


def _process_module_entry_points():
Expand Down
6 changes: 6 additions & 0 deletions newrelic/core/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@
"response.headers.contentType",
"response.status",
"server.address",
"zeebe.client.bpmnProcessId",
"zeebe.client.messageName",
"zeebe.client.correlationKey",
"zeebe.client.messageId",
"zeebe.client.resourceCount",
"zeebe.client.resourceFile",
}

MAX_NUM_USER_ATTRIBUTES = 128
Expand Down
111 changes: 111 additions & 0 deletions newrelic/hooks/external_pyzeebe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging

from newrelic.api.application import application_instance
from newrelic.api.web_transaction import WebTransaction
from newrelic.api.function_trace import FunctionTrace
from newrelic.api.transaction import current_transaction
from newrelic.common.object_wrapper import wrap_function_wrapper

_logger = logging.getLogger(__name__)

CLIENT_ATTRIBUTES_WARNING_LOG_MSG = "Exception occurred in PyZeebe instrumentation: Failed to add client attributes."


# Adds client method params as txn or span attributes
def _add_client_input_attributes(method_name, trace, args, kwargs):
try:
if method_name in ("run_process", "run_process_with_result"):
bpmn_id = kwargs.get("bpmn_process_id", args[0] if args else None)
if bpmn_id:
trace._add_agent_attribute("zeebe.client.bpmnProcessId", bpmn_id)
elif method_name == "publish_message":
msg_name = kwargs.get("name", args[0] if args else None)
if msg_name:
trace._add_agent_attribute("zeebe.client.messageName", msg_name)
correlation_key = kwargs.get("correlation_key", args[1] if args and len(args) > 1 else None)
if correlation_key:
trace._add_agent_attribute("zeebe.client.correlationKey", correlation_key)
message_id = kwargs.get("message_id")
if message_id and len(args) > 4:
message_id = args[4]
if message_id:
trace._add_agent_attribute("zeebe.client.messageId", message_id)
elif method_name == "deploy_resource":
resources = list(args)
if len(resources) == 1 and isinstance(resources[0], (list, tuple)):
resources = list(resources[0])
if resources:
trace._add_agent_attribute("zeebe.client.resourceCount", len(resources))
if len(resources) == 1:
trace._add_agent_attribute("zeebe.client.resourceFile", str(resources[0]))
except Exception:
_logger.warning(CLIENT_ATTRIBUTES_WARNING_LOG_MSG, exc_info=True)


# Async wrapper that instruments router/worker annotations`
async def _nr_wrapper_execute_one_job(wrapped, instance, args, kwargs):
job = args[0] if args else kwargs.get("job")
process_id = getattr(job, "bpmn_process_id", None) or "UnknownProcess"
task_type = getattr(job, "type", None) or "UnknownType"
txn_name = f"{process_id}/{task_type}"

with WebTransaction(application_instance(), txn_name, group="ZeebeTask") as txn:
if job is not None:
if hasattr(job, "key"):
txn.add_custom_attribute("zeebe.job.key", job.key)
if hasattr(job, "type"):
txn.add_custom_attribute("zeebe.job.type", job.type)
if hasattr(job, "bpmn_process_id"):
txn.add_custom_attribute("zeebe.job.bpmnProcessId", job.bpmn_process_id)
if hasattr(job, "process_instance_key"):
txn.add_custom_attribute("zeebe.job.processInstanceKey", job.process_instance_key)
if hasattr(job, "element_id"):
txn.add_custom_attribute("zeebe.job.elementId", job.element_id)

return await wrapped(*args, **kwargs)


# Async wrapper that instruments a ZeebeClient method.
def _nr_client_wrapper(method_name):
async def wrapper(wrapped, instance, args, kwargs):
txn = current_transaction()
if txn:
# add_fn = add_custom_span_attribute
with FunctionTrace(name=method_name, group="ZeebeClient") as trace:
_add_client_input_attributes(method_name, trace, args, kwargs)
return await wrapped(*args, **kwargs)
else:
return wrapped(*args, **kwargs)

return wrapper


# Instrument JobExecutor.execute_one_job to create a background transaction per job (invoked from @router.task or @worker.task annotations)
def instrument_pyzeebe_worker_job_executor(module):
if hasattr(module, "JobExecutor"):
wrap_function_wrapper(module, "JobExecutor.execute_one_job", _nr_wrapper_execute_one_job)


# Instrument ZeebeClient methods to trace client calls.
def instrument_pyzeebe_client_client(module):
target_methods = ("run_process", "run_process_with_result", "deploy_resource", "publish_message")

for method_name in target_methods:
if hasattr(module, "ZeebeClient"):
if hasattr(module.ZeebeClient, method_name):
wrap_function_wrapper(module, f"ZeebeClient.{method_name}", _nr_client_wrapper(method_name))
102 changes: 102 additions & 0 deletions tests/external_pyzeebe/_mocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from types import SimpleNamespace

from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter

# Dummy response objects with only required fields
DummyCreateProcessInstanceResponse = SimpleNamespace(process_instance_key=12345)

DummyCreateProcessInstanceWithResultResponse = SimpleNamespace(
process_instance_key=45678, variables={"result": "success"}
)

DummyDeployResourceResponse = SimpleNamespace(key=67890, deployments=[], tenant_id=None)

DummyPublishMessageResponse = SimpleNamespace(key=99999, tenant_id=None)


# Dummy RPC stub coroutines
async def dummy_create_process_instance(
self, bpmn_process_id: str, variables: dict = None, version: int = -1, tenant_id: str = None
):
"""Simulate ZeebeAdapter.create_process_instance"""
return DummyCreateProcessInstanceResponse


async def dummy_create_process_instance_with_result(
self,
bpmn_process_id: str,
variables: dict = None,
version: int = -1,
timeout: int = 0,
variables_to_fetch=None,
tenant_id: str = None,
):
"""Simulate ZeebeAdapter.create_process_instance_with_result"""
return DummyCreateProcessInstanceWithResultResponse


async def dummy_deploy_resource(*resource_file_path: str, tenant_id: str = None):
"""Simulate ZeebeAdapter.deploy_resource"""
# Create dummy deployment metadata for each provided resource path
deployments = [
SimpleNamespace(
resource_name=str(path),
bpmn_process_id="dummy_process",
process_definition_key=123,
version=1,
tenant_id=tenant_id if tenant_id is not None else None,
)
for path in resource_file_path
]
# Create a dummy response with a list of deployments
return SimpleNamespace(
deployment_key=333333, deployments=deployments, tenant_id=tenant_id if tenant_id is not None else None
)


async def dummy_publish_message(
self,
name: str,
correlation_key: str,
variables: dict = None,
time_to_live_in_milliseconds: int = 60000,
message_id: str = None,
tenant_id: str = None,
):
"""Simulate ZeebeAdapter.publish_message"""
# Return the dummy response (contains message key)
return SimpleNamespace(key=999999, tenant_id=tenant_id if tenant_id is not None else None)


async def dummy_complete_job(self, job_key: int, variables: dict):
"""Simulate JobExecutor.complete_job"""
self._last_complete = {"job_key": job_key, "variables": variables}
return None


class DummyZeebeAdapter(ZeebeAdapter):
"""Simulate a ZeebeAdapter so JobExecutor can be instatiated w/o gRPC channel"""

def __init__(self):
self.completed_job_key = None
self.completed_job_vars = None

async def complete_job(self, job_key: int, variables: dict):
self.completed_job_key = job_key
self.completed_job_vars = variables
return None
29 changes: 29 additions & 0 deletions tests/external_pyzeebe/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from testing_support.fixture.event_loop import event_loop as loop
from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture

_default_settings = {
"package_reporting.enabled": False, # Turn off package reporting for testing as it causes slow downs.
"transaction_tracer.explain_threshold": 0.0,
"transaction_tracer.transaction_threshold": 0.0,
"transaction_tracer.stack_trace_threshold": 0.0,
"debug.log_data_collector_payloads": True,
"debug.record_transaction_failure": True,
}

collector_agent_registration = collector_agent_registration_fixture(
app_name="Python Agent Test (external_pyzeebe)", default_settings=_default_settings
)
28 changes: 28 additions & 0 deletions tests/external_pyzeebe/test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
targetNamespace="http://example.com/bpmn"
xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL BPMN20.xsd">

<!-- Define the process with a unique id and name -->
<process id="dummyProcess" name="Dummy Process" isExecutable="true">
<!-- Start Event -->
<startEvent id="StartEvent_1" name="Start"/>

<!-- A simple Service Task representing work -->
<serviceTask id="ServiceTask_1" name="Perform Work"/>

<!-- End Event -->
<endEvent id="EndEvent_1" name="End"/>

<!-- Sequence Flows connecting Start → Service Task → End -->
<sequenceFlow id="Flow_1" sourceRef="StartEvent_1" targetRef="ServiceTask_1"/>
<sequenceFlow id="Flow_2" sourceRef="ServiceTask_1" targetRef="EndEvent_1"/>
</process>

<!-- (Optional) BPMNDiagram section can be added for graphical layout, but omitted here -->
</definitions>
91 changes: 91 additions & 0 deletions tests/external_pyzeebe/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from _mocks import (
dummy_create_process_instance,
dummy_create_process_instance_with_result,
dummy_deploy_resource,
dummy_publish_message,
)
from pyzeebe import ZeebeClient, create_insecure_channel
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
from testing_support.validators.validate_span_events import validate_span_events
from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics

from newrelic.api.background_task import background_task

client = ZeebeClient(create_insecure_channel())


@validate_transaction_metrics(
"test_zeebe_client:run_process", rollup_metrics=[("ZeebeClient/run_process", 1)], background_task=True
)
@validate_span_events(exact_agents={"zeebe.client.bpmnProcessId": "test_process"}, count=1)
def test_run_process(monkeypatch, loop):
monkeypatch.setattr(ZeebeAdapter, "create_process_instance", dummy_create_process_instance)

@background_task(name="test_zeebe_client:run_process")
async def _test():
response = await client.run_process("test_process")
assert response.process_instance_key == 12345

loop.run_until_complete(_test())


@validate_transaction_metrics(
"test_zeebe_client:run_process_with_result",
rollup_metrics=[("ZeebeClient/run_process_with_result", 1)],
background_task=True,
)
@validate_span_events(exact_agents={"zeebe.client.bpmnProcessId": "test_process"}, count=1)
def test_run_process_with_result(monkeypatch, loop):
monkeypatch.setattr(ZeebeAdapter, "create_process_instance_with_result", dummy_create_process_instance_with_result)

@background_task(name="test_zeebe_client:run_process_with_result")
async def _test():
result = await client.run_process_with_result("test_process")
assert result.process_instance_key == 45678
assert result.variables == {"result": "success"}

loop.run_until_complete(_test())


@validate_transaction_metrics(
"test_zeebe_client:deploy_resource", rollup_metrics=[("ZeebeClient/deploy_resource", 1)], background_task=True
)
@validate_span_events(exact_agents={"zeebe.client.resourceCount": 1, "zeebe.client.resourceFile": "test.bpmn"}, count=1)
def test_deploy_resource(monkeypatch, loop):
monkeypatch.setattr(ZeebeAdapter, "deploy_resource", dummy_deploy_resource)

@background_task(name="test_zeebe_client:deploy_resource")
async def _test():
result = await client.deploy_resource("test.bpmn")
assert result.deployment_key == 333333

loop.run_until_complete(_test())


@validate_transaction_metrics(
"test_zeebe_client:publish_message", rollup_metrics=[("ZeebeClient/publish_message", 1)], background_task=True
)
@validate_span_events(exact_agents={"zeebe.client.messageName": "test_message", "zeebe.client.correlationKey": "999999", "zeebe.client.messageId": "abc123"}, count=1)
def test_publish_message(monkeypatch, loop):
monkeypatch.setattr(ZeebeAdapter, "publish_message", dummy_publish_message)

@background_task(name="test_zeebe_client:publish_message")
async def _test():
result = await client.publish_message(name="test_message", correlation_key="999999", message_id="abc123")
assert result.key == 999999

loop.run_until_complete(_test())
Loading