diff --git a/newrelic/config.py b/newrelic/config.py index fd01d94ab3..c80b973587 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -4088,6 +4088,8 @@ def _process_module_builtin_defaults(): ) _process_module_definition("tornado.routing", "newrelic.hooks.framework_tornado", "instrument_tornado_routing") _process_module_definition("tornado.web", "newrelic.hooks.framework_tornado", "instrument_tornado_web") + _process_module_definition("pyzeebe.client.client", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_client_client") + _process_module_definition("pyzeebe.worker.job_executor", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_worker_job_executor") def _process_module_entry_points(): diff --git a/newrelic/core/attribute.py b/newrelic/core/attribute.py index d8a3162251..6bdd1f93ef 100644 --- a/newrelic/core/attribute.py +++ b/newrelic/core/attribute.py @@ -96,6 +96,12 @@ "response.headers.contentType", "response.status", "server.address", + "zeebe.client.bpmnProcessId", + "zeebe.client.messageName", + "zeebe.client.correlationKey", + "zeebe.client.messageId", + "zeebe.client.resourceCount", + "zeebe.client.resourceFile", } MAX_NUM_USER_ATTRIBUTES = 128 diff --git a/newrelic/hooks/external_pyzeebe.py b/newrelic/hooks/external_pyzeebe.py new file mode 100644 index 0000000000..304f85f63d --- /dev/null +++ b/newrelic/hooks/external_pyzeebe.py @@ -0,0 +1,111 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging + +from newrelic.api.application import application_instance +from newrelic.api.web_transaction import WebTransaction +from newrelic.api.function_trace import FunctionTrace +from newrelic.api.transaction import current_transaction +from newrelic.common.object_wrapper import wrap_function_wrapper + +_logger = logging.getLogger(__name__) + +CLIENT_ATTRIBUTES_WARNING_LOG_MSG = "Exception occurred in PyZeebe instrumentation: Failed to add client attributes." + + +# Adds client method params as txn or span attributes +def _add_client_input_attributes(method_name, trace, args, kwargs): + try: + if method_name in ("run_process", "run_process_with_result"): + bpmn_id = kwargs.get("bpmn_process_id", args[0] if args else None) + if bpmn_id: + trace._add_agent_attribute("zeebe.client.bpmnProcessId", bpmn_id) + elif method_name == "publish_message": + msg_name = kwargs.get("name", args[0] if args else None) + if msg_name: + trace._add_agent_attribute("zeebe.client.messageName", msg_name) + correlation_key = kwargs.get("correlation_key", args[1] if args and len(args) > 1 else None) + if correlation_key: + trace._add_agent_attribute("zeebe.client.correlationKey", correlation_key) + message_id = kwargs.get("message_id") + if message_id and len(args) > 4: + message_id = args[4] + if message_id: + trace._add_agent_attribute("zeebe.client.messageId", message_id) + elif method_name == "deploy_resource": + resources = list(args) + if len(resources) == 1 and isinstance(resources[0], (list, tuple)): + resources = list(resources[0]) + if resources: + trace._add_agent_attribute("zeebe.client.resourceCount", len(resources)) + if len(resources) == 1: + trace._add_agent_attribute("zeebe.client.resourceFile", str(resources[0])) + except Exception: + _logger.warning(CLIENT_ATTRIBUTES_WARNING_LOG_MSG, exc_info=True) + + +# Async wrapper that instruments router/worker annotations` +async def _nr_wrapper_execute_one_job(wrapped, instance, args, kwargs): + job = args[0] if args else kwargs.get("job") + process_id = getattr(job, "bpmn_process_id", None) or "UnknownProcess" + task_type = getattr(job, "type", None) or "UnknownType" + txn_name = f"{process_id}/{task_type}" + + with WebTransaction(application_instance(), txn_name, group="ZeebeTask") as txn: + if job is not None: + if hasattr(job, "key"): + txn.add_custom_attribute("zeebe.job.key", job.key) + if hasattr(job, "type"): + txn.add_custom_attribute("zeebe.job.type", job.type) + if hasattr(job, "bpmn_process_id"): + txn.add_custom_attribute("zeebe.job.bpmnProcessId", job.bpmn_process_id) + if hasattr(job, "process_instance_key"): + txn.add_custom_attribute("zeebe.job.processInstanceKey", job.process_instance_key) + if hasattr(job, "element_id"): + txn.add_custom_attribute("zeebe.job.elementId", job.element_id) + + return await wrapped(*args, **kwargs) + + +# Async wrapper that instruments a ZeebeClient method. +def _nr_client_wrapper(method_name): + async def wrapper(wrapped, instance, args, kwargs): + txn = current_transaction() + if txn: + # add_fn = add_custom_span_attribute + with FunctionTrace(name=method_name, group="ZeebeClient") as trace: + _add_client_input_attributes(method_name, trace, args, kwargs) + return await wrapped(*args, **kwargs) + else: + return wrapped(*args, **kwargs) + + return wrapper + + +# Instrument JobExecutor.execute_one_job to create a background transaction per job (invoked from @router.task or @worker.task annotations) +def instrument_pyzeebe_worker_job_executor(module): + if hasattr(module, "JobExecutor"): + wrap_function_wrapper(module, "JobExecutor.execute_one_job", _nr_wrapper_execute_one_job) + + +# Instrument ZeebeClient methods to trace client calls. +def instrument_pyzeebe_client_client(module): + target_methods = ("run_process", "run_process_with_result", "deploy_resource", "publish_message") + + for method_name in target_methods: + if hasattr(module, "ZeebeClient"): + if hasattr(module.ZeebeClient, method_name): + wrap_function_wrapper(module, f"ZeebeClient.{method_name}", _nr_client_wrapper(method_name)) diff --git a/tests/external_pyzeebe/_mocks.py b/tests/external_pyzeebe/_mocks.py new file mode 100644 index 0000000000..0de7ec17b8 --- /dev/null +++ b/tests/external_pyzeebe/_mocks.py @@ -0,0 +1,102 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from types import SimpleNamespace + +from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter + +# Dummy response objects with only required fields +DummyCreateProcessInstanceResponse = SimpleNamespace(process_instance_key=12345) + +DummyCreateProcessInstanceWithResultResponse = SimpleNamespace( + process_instance_key=45678, variables={"result": "success"} +) + +DummyDeployResourceResponse = SimpleNamespace(key=67890, deployments=[], tenant_id=None) + +DummyPublishMessageResponse = SimpleNamespace(key=99999, tenant_id=None) + + +# Dummy RPC stub coroutines +async def dummy_create_process_instance( + self, bpmn_process_id: str, variables: dict = None, version: int = -1, tenant_id: str = None +): + """Simulate ZeebeAdapter.create_process_instance""" + return DummyCreateProcessInstanceResponse + + +async def dummy_create_process_instance_with_result( + self, + bpmn_process_id: str, + variables: dict = None, + version: int = -1, + timeout: int = 0, + variables_to_fetch=None, + tenant_id: str = None, +): + """Simulate ZeebeAdapter.create_process_instance_with_result""" + return DummyCreateProcessInstanceWithResultResponse + + +async def dummy_deploy_resource(*resource_file_path: str, tenant_id: str = None): + """Simulate ZeebeAdapter.deploy_resource""" + # Create dummy deployment metadata for each provided resource path + deployments = [ + SimpleNamespace( + resource_name=str(path), + bpmn_process_id="dummy_process", + process_definition_key=123, + version=1, + tenant_id=tenant_id if tenant_id is not None else None, + ) + for path in resource_file_path + ] + # Create a dummy response with a list of deployments + return SimpleNamespace( + deployment_key=333333, deployments=deployments, tenant_id=tenant_id if tenant_id is not None else None + ) + + +async def dummy_publish_message( + self, + name: str, + correlation_key: str, + variables: dict = None, + time_to_live_in_milliseconds: int = 60000, + message_id: str = None, + tenant_id: str = None, +): + """Simulate ZeebeAdapter.publish_message""" + # Return the dummy response (contains message key) + return SimpleNamespace(key=999999, tenant_id=tenant_id if tenant_id is not None else None) + + +async def dummy_complete_job(self, job_key: int, variables: dict): + """Simulate JobExecutor.complete_job""" + self._last_complete = {"job_key": job_key, "variables": variables} + return None + + +class DummyZeebeAdapter(ZeebeAdapter): + """Simulate a ZeebeAdapter so JobExecutor can be instatiated w/o gRPC channel""" + + def __init__(self): + self.completed_job_key = None + self.completed_job_vars = None + + async def complete_job(self, job_key: int, variables: dict): + self.completed_job_key = job_key + self.completed_job_vars = variables + return None diff --git a/tests/external_pyzeebe/conftest.py b/tests/external_pyzeebe/conftest.py new file mode 100644 index 0000000000..35d6bc5700 --- /dev/null +++ b/tests/external_pyzeebe/conftest.py @@ -0,0 +1,29 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from testing_support.fixture.event_loop import event_loop as loop +from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture + +_default_settings = { + "package_reporting.enabled": False, # Turn off package reporting for testing as it causes slow downs. + "transaction_tracer.explain_threshold": 0.0, + "transaction_tracer.transaction_threshold": 0.0, + "transaction_tracer.stack_trace_threshold": 0.0, + "debug.log_data_collector_payloads": True, + "debug.record_transaction_failure": True, +} + +collector_agent_registration = collector_agent_registration_fixture( + app_name="Python Agent Test (external_pyzeebe)", default_settings=_default_settings +) diff --git a/tests/external_pyzeebe/test.bpmn b/tests/external_pyzeebe/test.bpmn new file mode 100644 index 0000000000..7cdf1e410d --- /dev/null +++ b/tests/external_pyzeebe/test.bpmn @@ -0,0 +1,28 @@ + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/external_pyzeebe/test_client.py b/tests/external_pyzeebe/test_client.py new file mode 100644 index 0000000000..d99213ad72 --- /dev/null +++ b/tests/external_pyzeebe/test_client.py @@ -0,0 +1,91 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from _mocks import ( + dummy_create_process_instance, + dummy_create_process_instance_with_result, + dummy_deploy_resource, + dummy_publish_message, +) +from pyzeebe import ZeebeClient, create_insecure_channel +from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter +from testing_support.validators.validate_span_events import validate_span_events +from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics + +from newrelic.api.background_task import background_task + +client = ZeebeClient(create_insecure_channel()) + + +@validate_transaction_metrics( + "test_zeebe_client:run_process", rollup_metrics=[("ZeebeClient/run_process", 1)], background_task=True +) +@validate_span_events(exact_agents={"zeebe.client.bpmnProcessId": "test_process"}, count=1) +def test_run_process(monkeypatch, loop): + monkeypatch.setattr(ZeebeAdapter, "create_process_instance", dummy_create_process_instance) + + @background_task(name="test_zeebe_client:run_process") + async def _test(): + response = await client.run_process("test_process") + assert response.process_instance_key == 12345 + + loop.run_until_complete(_test()) + + +@validate_transaction_metrics( + "test_zeebe_client:run_process_with_result", + rollup_metrics=[("ZeebeClient/run_process_with_result", 1)], + background_task=True, +) +@validate_span_events(exact_agents={"zeebe.client.bpmnProcessId": "test_process"}, count=1) +def test_run_process_with_result(monkeypatch, loop): + monkeypatch.setattr(ZeebeAdapter, "create_process_instance_with_result", dummy_create_process_instance_with_result) + + @background_task(name="test_zeebe_client:run_process_with_result") + async def _test(): + result = await client.run_process_with_result("test_process") + assert result.process_instance_key == 45678 + assert result.variables == {"result": "success"} + + loop.run_until_complete(_test()) + + +@validate_transaction_metrics( + "test_zeebe_client:deploy_resource", rollup_metrics=[("ZeebeClient/deploy_resource", 1)], background_task=True +) +@validate_span_events(exact_agents={"zeebe.client.resourceCount": 1, "zeebe.client.resourceFile": "test.bpmn"}, count=1) +def test_deploy_resource(monkeypatch, loop): + monkeypatch.setattr(ZeebeAdapter, "deploy_resource", dummy_deploy_resource) + + @background_task(name="test_zeebe_client:deploy_resource") + async def _test(): + result = await client.deploy_resource("test.bpmn") + assert result.deployment_key == 333333 + + loop.run_until_complete(_test()) + + +@validate_transaction_metrics( + "test_zeebe_client:publish_message", rollup_metrics=[("ZeebeClient/publish_message", 1)], background_task=True +) +@validate_span_events(exact_agents={"zeebe.client.messageName": "test_message", "zeebe.client.correlationKey": "999999", "zeebe.client.messageId": "abc123"}, count=1) +def test_publish_message(monkeypatch, loop): + monkeypatch.setattr(ZeebeAdapter, "publish_message", dummy_publish_message) + + @background_task(name="test_zeebe_client:publish_message") + async def _test(): + result = await client.publish_message(name="test_message", correlation_key="999999", message_id="abc123") + assert result.key == 999999 + + loop.run_until_complete(_test()) diff --git a/tests/external_pyzeebe/test_job_executor.py b/tests/external_pyzeebe/test_job_executor.py new file mode 100644 index 0000000000..ca6e47a9a1 --- /dev/null +++ b/tests/external_pyzeebe/test_job_executor.py @@ -0,0 +1,79 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio + +from _mocks import DummyZeebeAdapter +from pyzeebe import Job, JobStatus, ZeebeTaskRouter +from pyzeebe.worker.job_executor import JobController, JobExecutor +from pyzeebe.worker.task_state import TaskState +from testing_support.validators.validate_custom_parameters import validate_custom_parameters +from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics + +# Set up a router with a dummy async task +router = ZeebeTaskRouter() + + +@router.task(task_type="testTask") +async def dummy_task(x: int) -> dict: + """ + Simulate a task function that reads input variable x + """ + return {"result": x} + + +@validate_transaction_metrics(group="ZeebeTask", name="test_process/testTask") +@validate_custom_parameters( + required_params=[ + ("zeebe.job.key", 123), + ("zeebe.job.type", "testTask"), + ("zeebe.job.bpmnProcessId", "test_process"), + ("zeebe.job.processInstanceKey", 456), + ("zeebe.job.elementId", "service_task_123"), + ] +) +def test_execute_one_job(loop): + dummy_adapter = DummyZeebeAdapter() + + # Build a Job with fixed values + job = Job( + key=123, + type="testTask", # must match router.task(task_type="testTask") + bpmn_process_id="test_process", + process_instance_key=456, + process_definition_version=1, + process_definition_key=789, + element_id="service_task_123", + element_instance_key=321, + custom_headers={}, + worker="test_worker", + retries=3, + deadline=0, + variables={"x": 33}, + status=JobStatus.Running, + ) + + # JobExecutor constructor params init + task_obj = router.get_task("testTask") + assert task_obj is not None + jobs_queue = asyncio.Queue() + task_state = TaskState() + + job_executor = JobExecutor(task_obj, jobs_queue, task_state, dummy_adapter) + + # Build a JobController for completion logic. + job_controller = JobController(job, dummy_adapter) + + loop.run_until_complete(job_executor.execute_one_job(job, job_controller)) + assert job.variables["x"] == 33 diff --git a/tox.ini b/tox.ini index c9ca588577..aed58212aa 100644 --- a/tox.ini +++ b/tox.ini @@ -114,6 +114,7 @@ envlist = python-external_http-{py37,py38,py39,py310,py311,py312,py313}, python-external_httplib-{py37,py38,py39,py310,py311,py312,py313,pypy310}, python-external_httplib2-{py37,py38,py39,py310,py311,py312,py313,pypy310}, + python-external_pyzeebe-{py39,py310,py311,py312,pypy310}, python-external_requests-{py37,py38,py39,py310,py311,py312,py313,pypy310}, python-external_urllib3-{py37,py38,py39,py310,py311,py312,py313,pypy310}-urllib3latest, python-external_urllib3-{py37,py312,py313,pypy310}-urllib30126, @@ -317,6 +318,7 @@ deps = external_httpx: httpx[http2] external_requests: urllib3 external_requests: requests + external_pyzeebe: pyzeebe external_urllib3-urllib30126: urllib3<1.27 external_urllib3-urllib3latest: urllib3 framework_aiohttp-aiohttp03: aiohttp<4 @@ -520,6 +522,7 @@ changedir = external_httplib: tests/external_httplib external_httplib2: tests/external_httplib2 external_httpx: tests/external_httpx + external_pyzeebe: tests/external_pyzeebe external_requests: tests/external_requests external_urllib3: tests/external_urllib3 framework_aiohttp: tests/framework_aiohttp