Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4088,6 +4088,7 @@ def _process_module_builtin_defaults():
)
_process_module_definition("tornado.routing", "newrelic.hooks.framework_tornado", "instrument_tornado_routing")
_process_module_definition("tornado.web", "newrelic.hooks.framework_tornado", "instrument_tornado_web")
_process_module_definition("pyzeebe", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe")


def _process_module_entry_points():
Expand Down
136 changes: 136 additions & 0 deletions newrelic/hooks/external_pyzeebe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from newrelic.api.application import application_instance
from newrelic.common.object_wrapper import wrap_function_wrapper
from newrelic.api.transaction import current_transaction, add_custom_attribute
from newrelic.api.time_trace import add_custom_span_attribute
from newrelic.api.background_task import BackgroundTask
from newrelic.api.function_trace import FunctionTrace

# Adds client method params as txn or span attributes
def _add_client_input_attributes(method_name, add_attr, args, kwargs):
if method_name in ("run_process", "run_process_with_result"):
bpmn_id = kwargs.get("bpmn_process_id", args[0] if args else None)
if bpmn_id is not None:
add_attr("zeebe.client.bpmnProcessId", bpmn_id)
elif method_name == "publish_message":
msg_name = kwargs.get("name", args[0] if args else None)
if msg_name is not None:
add_attr("zeebe.client.messageName", msg_name)
correlation_key = kwargs.get("correlation_key", args[1] if args and len(args) > 1 else None)
if correlation_key is not None:
add_attr("zeebe.client.correlationKey", correlation_key)
message_id = kwargs.get("message_id")
if message_id is None and len(args) > 4:
message_id = args[4]
if message_id is not None:
add_attr("zeebe.client.messageId", message_id)
elif method_name == "deploy_resource":
resources = list(args)
if len(resources) == 1 and isinstance(resources[0], (list, tuple)):
resources = list(resources[0])
if resources:
add_attr("zeebe.client.resourceCount", len(resources))
if len(resources) == 1:
try:
add_attr("zeebe.client.resourceFile", str(resources[0]))
except Exception:
add_attr("zeebe.client.resourceFile", repr(resources[0]))


# Async wrapper that instruments a ZeebeClient method.
def _nr_client_wrapper(method_name):
async def wrapper(wrapped, instance, args, kwargs):
txn = current_transaction()
if txn:
add_fn = add_custom_span_attribute
# Trace within an existing txn
with FunctionTrace(name=method_name, group="ZeebeClient"):
_add_client_input_attributes(method_name, add_fn, args, kwargs)
result = await wrapped(*args, **kwargs)
return result
else:
add_fn = add_custom_attribute
# No active txn: start a background task
with BackgroundTask(application_instance(), name=method_name, group="ZeebeClient"):
_add_client_input_attributes(method_name, add_fn, args, kwargs)
result = await wrapped(*args, **kwargs)
return result
return wrapper

# Instrument JobExecutor.execute_one_job to create a background transaction per job (invoked from @router.task or @worker.task annotations)
def instrument_pyzeebe_worker_job_executor(job_exec_module):
async def _nr_wrapper_execute_one_job(wrapped, instance, args, kwargs):
job = args[0] if args else kwargs.get("job")
process_id = getattr(job, "bpmn_process_id", None) or "UnknownProcess"
task_type = getattr(job, "type", None) or "UnknownType"
txn_name = f"{process_id}/{task_type}"

with BackgroundTask(application_instance(), txn_name, group="ZeebeTask"):
if job is not None:
if hasattr(job, "key"):
add_custom_attribute("zeebe.job.key", job.key)
if hasattr(job, "type"):
add_custom_attribute("zeebe.job.type", job.type)
if hasattr(job, "bpmn_process_id"):
add_custom_attribute("zeebe.job.bpmnProcessId", job.bpmn_process_id)
if hasattr(job, "process_instance_key"):
add_custom_attribute("zeebe.job.processInstanceKey", job.process_instance_key)
if hasattr(job, "element_id"):
add_custom_attribute("zeebe.job.elementId", job.element_id)

result = await wrapped(*args, **kwargs)
return result

wrap_function_wrapper(
job_exec_module,
"JobExecutor.execute_one_job",
_nr_wrapper_execute_one_job
)


# Instrument ZeebeClient methods to trace client calls.
def instrument_pyzeebe_client_methods(client_module):
target_methods = (
"run_process",
"run_process_with_result",
"deploy_resource",
"publish_message",
)

for method_name in target_methods:
if hasattr(client_module.ZeebeClient, method_name):
wrap_function_wrapper(
client_module,
f"ZeebeClient.{method_name}",
_nr_client_wrapper(method_name)
)


def instrument_pyzeebe(module):
# Worker instrumentation
worker_pkg = getattr(module, 'worker', None)
if worker_pkg is not None:
job_exec_mod = getattr(worker_pkg, 'job_executor', None)
if job_exec_mod is not None:
instrument_pyzeebe_worker_job_executor(job_exec_mod)

# Client instrumentation
client_pkg = getattr(module, 'client', None)
if client_pkg is not None:
client_mod = getattr(client_pkg, 'client', None)
if client_mod is not None:
instrument_pyzeebe_client_methods(client_mod)
32 changes: 32 additions & 0 deletions tests/external_pyzeebe/_dummy_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

class DummyProcessResult:
def __init__(self, key: int):
self.process_instance_key = key

class ZeebeClient:
async def run_process(self, *args, **kwargs):
return DummyProcessResult(key=12345)

async def run_process_with_result(self, *args, **kwargs):
return DummyProcessResult(key=45678)

async def deploy_resource(self, *args, **kwargs):
return {"deployment_key": 33333}

async def publish_message(self, name: str, correlation_key: str = "", variables: dict = None):
return {"message_key": 56789}


29 changes: 29 additions & 0 deletions tests/external_pyzeebe/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture

_default_settings = {
"package_reporting.enabled": False, # Turn off package reporting for testing as it causes slow downs.
"transaction_tracer.explain_threshold": 0.0,
"transaction_tracer.transaction_threshold": 0.0,
"transaction_tracer.stack_trace_threshold": 0.0,
"debug.log_data_collector_payloads": True,
"debug.record_transaction_failure": True,
}

collector_agent_registration = collector_agent_registration_fixture(
app_name="Python Agent Test (external_pyzeebe)", default_settings=_default_settings
)
83 changes: 83 additions & 0 deletions tests/external_pyzeebe/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import asyncio
import sys, types
import _dummy_client as dummy_client

from newrelic.api.background_task import background_task

# Force import system to use dummy ZeebeClient
pyzeebe_mod = types.ModuleType("pyzeebe")
pyzeebe_client_pkg = types.ModuleType("pyzeebe.client")

pyzeebe_mod.client = pyzeebe_client_pkg
pyzeebe_client_pkg.client = dummy_client

sys.modules["pyzeebe"] = pyzeebe_mod
sys.modules["pyzeebe.client"] = pyzeebe_client_pkg
sys.modules["pyzeebe.client.client"] = dummy_client

from pyzeebe.client.client import ZeebeClient
from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics

Check failure on line 35 in tests/external_pyzeebe/test_client.py

View workflow job for this annotation

GitHub Actions / MegaLinter

Ruff (E402)

tests/external_pyzeebe/test_client.py:35:1: E402 Module level import not at top of file
@validate_transaction_metrics(

Check failure on line 36 in tests/external_pyzeebe/test_client.py

View workflow job for this annotation

GitHub Actions / MegaLinter

Ruff (E402)

tests/external_pyzeebe/test_client.py:36:1: E402 Module level import not at top of file
"test_client_methods:function_trace",
rollup_metrics=[
("ZeebeClient/run_process", 1),
("ZeebeClient/run_process_with_result", 1),
("ZeebeClient/deploy_resource", 1),
("ZeebeClient/publish_message", 1)
],
background_task=True,
)
def test_client_methods_as_function_traces():
@background_task(name="test_client_methods:function_trace")
def _test():
client = ZeebeClient()

#run_process
result_1 = asyncio.run(client.run_process("DummyProcess"))
assert hasattr(result_1, "process_instance_key")
assert result_1.process_instance_key == 12345

#run_process_with_result
result_2 = asyncio.run(client.run_process_with_result("DummyProcess"))
assert hasattr(result_2, "process_instance_key")
assert result_2.process_instance_key == 45678

# deploy_resource
result_3 = asyncio.run(client.deploy_resource("test-workflow.bpmn"))
assert result_3["deployment_key"] == 33333

# publish_message
result_4 = asyncio.run(client.publish_message("test_message"))
assert result_4["message_key"] == 56789
_test()


# # TODO: fix failure
# @validate_transaction_metrics(
# "ZeebeClient/run_process",
# rollup_metrics=[
# ("OtherTransaction/ZeebeClient/run_process", 1),
# ],
# background_task=True,
# )
# def test_run_process_as_background_tx():
# client = ZeebeClient()
# result = asyncio.run(client.run_process("DummyProcess"))
# assert hasattr(result, "process_instance_key")
# assert result.process_instance_key == 12345
3 changes: 3 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ envlist =
python-external_http-{py37,py38,py39,py310,py311,py312,py313},
python-external_httplib-{py37,py38,py39,py310,py311,py312,py313,pypy310},
python-external_httplib2-{py37,py38,py39,py310,py311,py312,py313,pypy310},
python-external_pyzeebe-{py37,py38,py39,py310,py311,py312,py313,pypy310},
python-external_requests-{py37,py38,py39,py310,py311,py312,py313,pypy310},
python-external_urllib3-{py37,py38,py39,py310,py311,py312,py313,pypy310}-urllib3latest,
python-external_urllib3-{py37,py312,py313,pypy310}-urllib30126,
Expand Down Expand Up @@ -317,6 +318,7 @@ deps =
external_httpx: httpx[http2]
external_requests: urllib3
external_requests: requests
external_pyzeebe: pyzeebe
external_urllib3-urllib30126: urllib3<1.27
external_urllib3-urllib3latest: urllib3
framework_aiohttp-aiohttp03: aiohttp<4
Expand Down Expand Up @@ -520,6 +522,7 @@ changedir =
external_httplib: tests/external_httplib
external_httplib2: tests/external_httplib2
external_httpx: tests/external_httpx
external_pyzeebe: tests/external_pyzeebe
external_requests: tests/external_requests
external_urllib3: tests/external_urllib3
framework_aiohttp: tests/framework_aiohttp
Expand Down