Skip to content
2 changes: 2 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4088,6 +4088,8 @@ def _process_module_builtin_defaults():
)
_process_module_definition("tornado.routing", "newrelic.hooks.framework_tornado", "instrument_tornado_routing")
_process_module_definition("tornado.web", "newrelic.hooks.framework_tornado", "instrument_tornado_web")
_process_module_definition("pyzeebe.client.client", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_client_client")
_process_module_definition("pyzeebe.worker.job_executor", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_worker_job_executor")


def _process_module_entry_points():
Expand Down
6 changes: 6 additions & 0 deletions newrelic/core/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@
"response.headers.contentType",
"response.status",
"server.address",
"zeebe.client.bpmnProcessId",
"zeebe.client.messageName",
"zeebe.client.correlationKey",
"zeebe.client.messageId",
"zeebe.client.resourceCount",
"zeebe.client.resourceFile",
}

MAX_NUM_USER_ATTRIBUTES = 128
Expand Down
111 changes: 111 additions & 0 deletions newrelic/hooks/external_pyzeebe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging

from newrelic.api.application import application_instance
from newrelic.api.web_transaction import WebTransaction
from newrelic.api.function_trace import FunctionTrace
from newrelic.api.transaction import current_transaction
from newrelic.common.object_wrapper import wrap_function_wrapper

_logger = logging.getLogger(__name__)

CLIENT_ATTRIBUTES_WARNING_LOG_MSG = "Exception occurred in PyZeebe instrumentation: Failed to add client attributes."


# Adds client method params as txn or span attributes
def _add_client_input_attributes(method_name, trace, args, kwargs):
try:
if method_name in ("run_process", "run_process_with_result"):
bpmn_id = kwargs.get("bpmn_process_id", args[0] if args else None)
if bpmn_id:
trace._add_agent_attribute("zeebe.client.bpmnProcessId", bpmn_id)
elif method_name == "publish_message":
msg_name = kwargs.get("name", args[0] if args else None)
if msg_name:
trace._add_agent_attribute("zeebe.client.messageName", msg_name)
correlation_key = kwargs.get("correlation_key", args[1] if args and len(args) > 1 else None)
if correlation_key:
trace._add_agent_attribute("zeebe.client.correlationKey", correlation_key)
message_id = kwargs.get("message_id")
if message_id and len(args) > 4:
message_id = args[4]
if message_id:
trace._add_agent_attribute("zeebe.client.messageId", message_id)
elif method_name == "deploy_resource":
resources = list(args)
if len(resources) == 1 and isinstance(resources[0], (list, tuple)):
resources = list(resources[0])
if resources:
trace._add_agent_attribute("zeebe.client.resourceCount", len(resources))
if len(resources) == 1:
trace._add_agent_attribute("zeebe.client.resourceFile", str(resources[0]))
except Exception:
_logger.warning(CLIENT_ATTRIBUTES_WARNING_LOG_MSG, exc_info=True)


# Async wrapper that instruments router/worker annotations`
async def _nr_wrapper_execute_one_job(wrapped, instance, args, kwargs):
job = args[0] if args else kwargs.get("job")
process_id = getattr(job, "bpmn_process_id", None) or "UnknownProcess"
task_type = getattr(job, "type", None) or "UnknownType"
txn_name = f"{process_id}/{task_type}"

with WebTransaction(application_instance(), txn_name, group="ZeebeTask") as txn:
if job is not None:
if hasattr(job, "key"):
txn.add_custom_attribute("zeebe.job.key", job.key)
if hasattr(job, "type"):
txn.add_custom_attribute("zeebe.job.type", job.type)
if hasattr(job, "bpmn_process_id"):
txn.add_custom_attribute("zeebe.job.bpmnProcessId", job.bpmn_process_id)
if hasattr(job, "process_instance_key"):
txn.add_custom_attribute("zeebe.job.processInstanceKey", job.process_instance_key)
if hasattr(job, "element_id"):
txn.add_custom_attribute("zeebe.job.elementId", job.element_id)

return await wrapped(*args, **kwargs)


# Async wrapper that instruments a ZeebeClient method.
def _nr_client_wrapper(method_name):
async def wrapper(wrapped, instance, args, kwargs):
txn = current_transaction()
if txn:
# add_fn = add_custom_span_attribute
with FunctionTrace(name=method_name, group="ZeebeClient") as trace:
_add_client_input_attributes(method_name, trace, args, kwargs)
return await wrapped(*args, **kwargs)
else:
return wrapped(*args, **kwargs)

return wrapper


# Instrument JobExecutor.execute_one_job to create a background transaction per job (invoked from @router.task or @worker.task annotations)
def instrument_pyzeebe_worker_job_executor(module):
if hasattr(module, "JobExecutor"):
wrap_function_wrapper(module, "JobExecutor.execute_one_job", _nr_wrapper_execute_one_job)


# Instrument ZeebeClient methods to trace client calls.
def instrument_pyzeebe_client_client(module):
target_methods = ("run_process", "run_process_with_result", "deploy_resource", "publish_message")

for method_name in target_methods:
if hasattr(module, "ZeebeClient"):
if hasattr(module.ZeebeClient, method_name):
wrap_function_wrapper(module, f"ZeebeClient.{method_name}", _nr_client_wrapper(method_name))
102 changes: 102 additions & 0 deletions tests/external_pyzeebe/_mocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from types import SimpleNamespace

from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter

# Dummy response objects with only required fields
DummyCreateProcessInstanceResponse = SimpleNamespace(process_instance_key=12345)

DummyCreateProcessInstanceWithResultResponse = SimpleNamespace(
process_instance_key=45678, variables={"result": "success"}
)

DummyDeployResourceResponse = SimpleNamespace(key=67890, deployments=[], tenant_id=None)

DummyPublishMessageResponse = SimpleNamespace(key=99999, tenant_id=None)


# Dummy RPC stub coroutines
async def dummy_create_process_instance(
self, bpmn_process_id: str, variables: dict = None, version: int = -1, tenant_id: str = None
):
"""Simulate ZeebeAdapter.create_process_instance"""
return DummyCreateProcessInstanceResponse


async def dummy_create_process_instance_with_result(
self,
bpmn_process_id: str,
variables: dict = None,
version: int = -1,
timeout: int = 0,
variables_to_fetch=None,
tenant_id: str = None,
):
"""Simulate ZeebeAdapter.create_process_instance_with_result"""
return DummyCreateProcessInstanceWithResultResponse


async def dummy_deploy_resource(*resource_file_path: str, tenant_id: str = None):
"""Simulate ZeebeAdapter.deploy_resource"""
# Create dummy deployment metadata for each provided resource path
deployments = [
SimpleNamespace(
resource_name=str(path),
bpmn_process_id="dummy_process",
process_definition_key=123,
version=1,
tenant_id=tenant_id if tenant_id is not None else None,
)
for path in resource_file_path
]
# Create a dummy response with a list of deployments
return SimpleNamespace(
deployment_key=333333, deployments=deployments, tenant_id=tenant_id if tenant_id is not None else None
)


async def dummy_publish_message(
self,
name: str,
correlation_key: str,
variables: dict = None,
time_to_live_in_milliseconds: int = 60000,
message_id: str = None,
tenant_id: str = None,
):
"""Simulate ZeebeAdapter.publish_message"""
# Return the dummy response (contains message key)
return SimpleNamespace(key=999999, tenant_id=tenant_id if tenant_id is not None else None)


async def dummy_complete_job(self, job_key: int, variables: dict):
"""Simulate JobExecutor.complete_job"""
self._last_complete = {"job_key": job_key, "variables": variables}
return None


class DummyZeebeAdapter(ZeebeAdapter):
"""Simulate a ZeebeAdapter so JobExecutor can be instatiated w/o gRPC channel"""

def __init__(self):
self.completed_job_key = None
self.completed_job_vars = None

async def complete_job(self, job_key: int, variables: dict):
self.completed_job_key = job_key
self.completed_job_vars = variables
return None
29 changes: 29 additions & 0 deletions tests/external_pyzeebe/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from testing_support.fixture.event_loop import event_loop as loop
from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture

_default_settings = {
"package_reporting.enabled": False, # Turn off package reporting for testing as it causes slow downs.
"transaction_tracer.explain_threshold": 0.0,
"transaction_tracer.transaction_threshold": 0.0,
"transaction_tracer.stack_trace_threshold": 0.0,
"debug.log_data_collector_payloads": True,
"debug.record_transaction_failure": True,
}

collector_agent_registration = collector_agent_registration_fixture(
app_name="Python Agent Test (external_pyzeebe)", default_settings=_default_settings
)
28 changes: 28 additions & 0 deletions tests/external_pyzeebe/test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
targetNamespace="http://example.com/bpmn"
xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL BPMN20.xsd">

<!-- Define the process with a unique id and name -->
<process id="dummyProcess" name="Dummy Process" isExecutable="true">
<!-- Start Event -->
<startEvent id="StartEvent_1" name="Start"/>

<!-- A simple Service Task representing work -->
<serviceTask id="ServiceTask_1" name="Perform Work"/>

<!-- End Event -->
<endEvent id="EndEvent_1" name="End"/>

<!-- Sequence Flows connecting Start → Service Task → End -->
<sequenceFlow id="Flow_1" sourceRef="StartEvent_1" targetRef="ServiceTask_1"/>
<sequenceFlow id="Flow_2" sourceRef="ServiceTask_1" targetRef="EndEvent_1"/>
</process>

<!-- (Optional) BPMNDiagram section can be added for graphical layout, but omitted here -->
</definitions>
91 changes: 91 additions & 0 deletions tests/external_pyzeebe/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from _mocks import (
dummy_create_process_instance,
dummy_create_process_instance_with_result,
dummy_deploy_resource,
dummy_publish_message,
)
from pyzeebe import ZeebeClient, create_insecure_channel
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
from testing_support.validators.validate_span_events import validate_span_events
from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics

from newrelic.api.background_task import background_task

client = ZeebeClient(create_insecure_channel())


@validate_transaction_metrics(
"test_zeebe_client:run_process", rollup_metrics=[("ZeebeClient/run_process", 1)], background_task=True
)
@validate_span_events(exact_agents={"zeebe.client.bpmnProcessId": "test_process"}, count=1)
def test_run_process(monkeypatch, loop):
monkeypatch.setattr(ZeebeAdapter, "create_process_instance", dummy_create_process_instance)

@background_task(name="test_zeebe_client:run_process")
async def _test():
response = await client.run_process("test_process")
assert response.process_instance_key == 12345

loop.run_until_complete(_test())


@validate_transaction_metrics(
"test_zeebe_client:run_process_with_result",
rollup_metrics=[("ZeebeClient/run_process_with_result", 1)],
background_task=True,
)
@validate_span_events(exact_agents={"zeebe.client.bpmnProcessId": "test_process"}, count=1)
def test_run_process_with_result(monkeypatch, loop):
monkeypatch.setattr(ZeebeAdapter, "create_process_instance_with_result", dummy_create_process_instance_with_result)

@background_task(name="test_zeebe_client:run_process_with_result")
async def _test():
result = await client.run_process_with_result("test_process")
assert result.process_instance_key == 45678
assert result.variables == {"result": "success"}

loop.run_until_complete(_test())


@validate_transaction_metrics(
"test_zeebe_client:deploy_resource", rollup_metrics=[("ZeebeClient/deploy_resource", 1)], background_task=True
)
@validate_span_events(exact_agents={"zeebe.client.resourceCount": 1, "zeebe.client.resourceFile": "test.bpmn"}, count=1)
def test_deploy_resource(monkeypatch, loop):
monkeypatch.setattr(ZeebeAdapter, "deploy_resource", dummy_deploy_resource)

@background_task(name="test_zeebe_client:deploy_resource")
async def _test():
result = await client.deploy_resource("test.bpmn")
assert result.deployment_key == 333333

loop.run_until_complete(_test())


@validate_transaction_metrics(
"test_zeebe_client:publish_message", rollup_metrics=[("ZeebeClient/publish_message", 1)], background_task=True
)
@validate_span_events(exact_agents={"zeebe.client.messageName": "test_message", "zeebe.client.correlationKey": "999999", "zeebe.client.messageId": "abc123"}, count=1)
def test_publish_message(monkeypatch, loop):
monkeypatch.setattr(ZeebeAdapter, "publish_message", dummy_publish_message)

@background_task(name="test_zeebe_client:publish_message")
async def _test():
result = await client.publish_message(name="test_message", correlation_key="999999", message_id="abc123")
assert result.key == 999999

loop.run_until_complete(_test())
Loading