Skip to content

Camunda Pyzeebe Instrumentation #1385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions tests/external_pyzeebe/_dummy_client.py

This file was deleted.

75 changes: 75 additions & 0 deletions tests/external_pyzeebe/_mocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from types import SimpleNamespace
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter

# Dummy response objects with only required fields
DummyCreateProcessInstanceResponse = SimpleNamespace(
process_instance_key=12345
)

DummyCreateProcessInstanceWithResultResponse = SimpleNamespace(
process_instance_key=45678,
variables={"result": "success"}
)

DummyDeployResourceResponse = SimpleNamespace(
key=67890,
deployments=[],
tenant_id=None
)

DummyPublishMessageResponse = SimpleNamespace(
key=99999,
tenant_id=None
)

# Dummy RPC stub coroutines
async def dummy_create_process_instance(self, bpmn_process_id: str, variables: dict = None, version: int = -1, tenant_id: str = None):
"""Simulate ZeebeAdapter.create_process_instance"""
return DummyCreateProcessInstanceResponse

async def dummy_create_process_instance_with_result(self, bpmn_process_id: str, variables: dict = None, version: int = -1, timeout: int = 0, variables_to_fetch=None, tenant_id: str = None):
"""Simulate ZeebeAdapter.create_process_instance_with_result"""
return DummyCreateProcessInstanceWithResultResponse

async def dummy_deploy_resource(*resource_file_path: str, tenant_id: str = None):
"""Simulate ZeebeAdapter.deploy_resource"""
# Create dummy deployment metadata for each provided resource path
deployments = []
for path in resource_file_path:
deployments.append(SimpleNamespace(
resource_name=str(path),
bpmn_process_id="dummy_process",
process_definition_key=123,
version=1,
tenant_id=tenant_id if tenant_id is not None else None
))
# Create a dummy response with a list of deployments

Check failure on line 46 in tests/external_pyzeebe/_mocks.py

View workflow job for this annotation

GitHub Actions / MegaLinter

Ruff (PERF401)

tests/external_pyzeebe/_mocks.py:40:9: PERF401 Use a list comprehension to create a transformed list
return SimpleNamespace(
deployment_key=333333,
deployments=deployments,
tenant_id=tenant_id if tenant_id is not None else None
)

async def dummy_publish_message(self, name: str, correlation_key: str, variables: dict = None, time_to_live_in_milliseconds: int = 60000, message_id: str = None, tenant_id: str = None):
"""Simulate ZeebeAdapter.publish_message"""
# Return the dummy response (contains message key)
return SimpleNamespace(
key=999999,
tenant_id=tenant_id if tenant_id is not None else None
)

async def dummy_complete_job(self, job_key: int, variables: dict):
"""Simulate JobExecutor.complete_job"""
setattr(self, "_last_complete", {"job_key": job_key, "variables": variables})
return None

class DummyZeebeAdapter(ZeebeAdapter):
"""Simulate a ZeebeAdapter so JobExecutor can be instatiated w/o gRPC channel"""
def __init__(self):
self.completed_job_key = None
self.completed_job_vars = None
async def complete_job(self, job_key: int, variables: dict):
self.completed_job_key = job_key
self.completed_job_vars = variables
return None

2 changes: 2 additions & 0 deletions tests/external_pyzeebe/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.

import pytest
import grpc
from pyzeebe import ZeebeClient, create_insecure_channel
from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture

_default_settings = {
Expand Down
28 changes: 28 additions & 0 deletions tests/external_pyzeebe/test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
targetNamespace="http://example.com/bpmn"
xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL BPMN20.xsd">

<!-- Define the process with a unique id and name -->
<process id="dummyProcess" name="Dummy Process" isExecutable="true">
<!-- Start Event -->
<startEvent id="StartEvent_1" name="Start"/>

<!-- A simple Service Task representing work -->
<serviceTask id="ServiceTask_1" name="Perform Work"/>

<!-- End Event -->
<endEvent id="EndEvent_1" name="End"/>

<!-- Sequence Flows connecting Start → Service Task → End -->
<sequenceFlow id="Flow_1" sourceRef="StartEvent_1" targetRef="ServiceTask_1"/>
<sequenceFlow id="Flow_2" sourceRef="ServiceTask_1" targetRef="EndEvent_1"/>
</process>

<!-- (Optional) BPMNDiagram section can be added for graphical layout, but omitted here -->
</definitions>
110 changes: 65 additions & 45 deletions tests/external_pyzeebe/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,65 +14,85 @@

import pytest
import asyncio
import sys, types
import _dummy_client as dummy_client
from pyzeebe import ZeebeClient, create_insecure_channel
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
from _mocks import dummy_create_process_instance, dummy_create_process_instance_with_result, dummy_deploy_resource, dummy_publish_message

from newrelic.api.background_task import background_task
from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics
from testing_support.fixtures import validate_attributes

# Force import system to use dummy ZeebeClient
pyzeebe_mod = types.ModuleType("pyzeebe")
pyzeebe_client_pkg = types.ModuleType("pyzeebe.client")

pyzeebe_mod.client = pyzeebe_client_pkg
pyzeebe_client_pkg.client = dummy_client
client = ZeebeClient(create_insecure_channel())

sys.modules["pyzeebe"] = pyzeebe_mod
sys.modules["pyzeebe.client"] = pyzeebe_client_pkg
sys.modules["pyzeebe.client.client"] = dummy_client
@validate_transaction_metrics(
"test_zeebe_client:run_process",
rollup_metrics=[
("ZeebeClient/run_process", 1)
],
background_task=True
)
@validate_attributes("agent", ["zeebe.client.bpmnProcessId"])
def test_run_process(monkeypatch):
monkeypatch.setattr(ZeebeAdapter, "create_process_instance", dummy_create_process_instance)

from pyzeebe.client.client import ZeebeClient
from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics
from testing_support.fixtures import validate_attributes
@background_task(name="test_zeebe_client:run_process")
def _test():
response = asyncio.run(client.run_process("test_process"))
assert response.process_instance_key == 12345
_test()

AGENT_ATTRIBUTES = [
"zeebe.client.bpmnProcessId",
"zeebe.client.messageName",
"zeebe.client.correlationKey",
"zeebe.client.resourceCount",
"zeebe.client.resourceFile",
]

@validate_transaction_metrics(
"test_client_methods:function_trace",
"test_zeebe_client:run_process_with_result",
rollup_metrics=[
("ZeebeClient/run_process", 1),
("ZeebeClient/run_process_with_result", 1),
("ZeebeClient/deploy_resource", 1),
("ZeebeClient/publish_message", 1)
("ZeebeClient/run_process_with_result", 1)
],
background_task=True,
background_task=True
)
@validate_attributes("agent", AGENT_ATTRIBUTES)
def test_client_methods():
@background_task(name="test_client_methods:function_trace")
@validate_attributes("agent", ["zeebe.client.bpmnProcessId"])
def test_run_process_with_result(monkeypatch):
monkeypatch.setattr(ZeebeAdapter, "create_process_instance_with_result", dummy_create_process_instance_with_result)

@background_task(name="test_zeebe_client:run_process_with_result")
def _test():
client = ZeebeClient()

#run_process
result_1 = asyncio.run(client.run_process("DummyProcess"))
assert hasattr(result_1, "process_instance_key")
assert result_1.process_instance_key == 12345
result = asyncio.run(client.run_process_with_result("test_process"))
assert result.process_instance_key == 45678
assert result.variables == {"result": "success"}
_test()

#run_process_with_result
result_2 = asyncio.run(client.run_process_with_result("DummyProcess"))
assert hasattr(result_2, "process_instance_key")
assert result_2.process_instance_key == 45678

# deploy_resource
result_3 = asyncio.run(client.deploy_resource("test-workflow.bpmn"))
assert result_3["deployment_key"] == 33333
@validate_transaction_metrics(
"test_zeebe_client:deploy_resource",
rollup_metrics=[
("ZeebeClient/deploy_resource", 1)
],
background_task=True
)
@validate_attributes("agent", ["zeebe.client.resourceCount", "zeebe.client.resourceFile"])
def test_deploy_resource(monkeypatch):
monkeypatch.setattr(ZeebeAdapter, "deploy_resource", dummy_deploy_resource)

@background_task(name="test_zeebe_client:deploy_resource")
def _test():
result = asyncio.run(client.deploy_resource("test.bpmn"))
assert result.deployment_key == 333333
_test()


@validate_transaction_metrics(
"test_zeebe_client:publish_message",
rollup_metrics=[
("ZeebeClient/publish_message", 1)
],
background_task=True
)
@validate_attributes("agent", ["zeebe.client.messageName", "zeebe.client.correlationKey", "zeebe.client.messageId"])
def test_publish_message(monkeypatch):
monkeypatch.setattr(ZeebeAdapter, "publish_message", dummy_publish_message)

# publish_message
result_4 = asyncio.run(client.publish_message("test_message", correlation_key="12345"))
assert result_4["message_key"] == 56789
@background_task(name="test_zeebe_client:publish_message")
def _test():
result = asyncio.run(client.publish_message(name="test_message", correlation_key="999999", message_id="abc123"))
assert result.key == 999999
_test()
70 changes: 70 additions & 0 deletions tests/external_pyzeebe/test_job_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import asyncio
import pytest

from pyzeebe import Job, ZeebeTaskRouter, JobStatus
from pyzeebe.worker.job_executor import JobExecutor, JobController
from pyzeebe.worker.task_state import TaskState
from _mocks import DummyZeebeAdapter

# from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics
from testing_support.validators.validate_custom_parameters import validate_custom_parameters

# Set up a router with a dummy async task
router = ZeebeTaskRouter()

@router.task(task_type="testTask")
async def dummy_task(x: int) -> dict:
"""
Simulate a task function that reads input variable 'x'
and adds 1.
"""
return {"result": x }

# @validate_transaction_metrics(
# "ZeebeTask/test_process/testTask",
# rollup_metrics=[
# ("ZeebeTask/test_process/testTask", 1)
# ],
# background_task=True
# )
@validate_custom_parameters(required_params=[
("zeebe.job.key", 123),
("zeebe.job.type", "testTask"),
("zeebe.job.bpmnProcessId", "test_process"),
("zeebe.job.processInstanceKey", 456),
("zeebe.job.elementId", "service_task_123")
])
def test_execute_one_job():
dummy_adapter = DummyZeebeAdapter()

# Build a Job with fixed values
job = Job(
key=123,
type="testTask", # must match router.task(task_type="testTask")
bpmn_process_id="test_process",
process_instance_key=456,
process_definition_version=1,
process_definition_key=789,
element_id="service_task_123",
element_instance_key=321,
custom_headers={},
worker="test_worker",
retries=3,
deadline=0,
variables={"x": 33},
status=JobStatus.Running
)

# JobExecutor constructor params init
task_obj = router.get_task("testTask")
assert task_obj is not None
jobs_queue = asyncio.Queue()
task_state = TaskState()

job_executor = JobExecutor(task_obj, jobs_queue, task_state, dummy_adapter)

# Build a JobController for completion logic.
job_controller = JobController(job, dummy_adapter)

asyncio.run(job_executor.execute_one_job(job, job_controller))
assert job.variables["x"] == 33
Loading