Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions camunda/client/engine_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
logger = logging.getLogger(__name__)

ENGINE_LOCAL_BASE_URL = "http://localhost:8080/engine-rest"
ENGINE_REST_AUTH = None


class EngineClient:

def __init__(self, engine_base_url=ENGINE_LOCAL_BASE_URL):
def __init__(self, engine_base_url=ENGINE_LOCAL_BASE_URL, engine_rest_auth=ENGINE_REST_AUTH):
self.engine_base_url = engine_base_url
self.rest_auth = engine_rest_auth

def get_start_process_instance_url(self, process_key, tenant_id=None):
if tenant_id:
Expand All @@ -39,15 +41,24 @@ def start_process(self, process_key, variables, tenant_id=None, business_key=Non
if business_key:
body["businessKey"] = business_key

response = requests.post(url, headers=self._get_headers(), json=body)
raise_exception_if_not_ok(response)
if self.rest_auth:
response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body)
raise_exception_if_not_ok(response)
else:
response = requests.post(url, headers=self._get_headers(), json=body)
raise_exception_if_not_ok(response)
return response.json()

def get_process_instance(self, process_key=None, variables=frozenset([]), tenant_ids=frozenset([])):
url = f"{self.engine_base_url}/process-instance"
url_params = self.__get_process_instance_url_params(process_key, tenant_ids, variables)
response = requests.get(url, headers=self._get_headers(), params=url_params)
raise_exception_if_not_ok(response)

if self.rest_auth:
response = requests.get(url, auth=self.rest_auth, headers=self._get_headers(), params=url_params)
raise_exception_if_not_ok(response)
else:
response = requests.get(url, headers=self._get_headers(), params=url_params)
raise_exception_if_not_ok(response)
return response.json()

@staticmethod
Expand Down Expand Up @@ -98,8 +109,12 @@ def correlate_message(self, message_name, process_instance_id=None, tenant_id=No

body = {k: v for k, v in body.items() if v is not None}

response = requests.post(url, headers=self._get_headers(), json=body)
raise_exception_if_not_ok(response)
if self.rest_auth:
response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body)
raise_exception_if_not_ok(response)
else:
response = requests.post(url, headers=self._get_headers(), json=body)
raise_exception_if_not_ok(response)
return response.json()

def get_jobs(self,
Expand Down Expand Up @@ -129,16 +144,25 @@ def get_jobs(self,
params["withException"] = "true"
if tenant_ids:
params["tenantIdIn"] = ','.join(tenant_ids)
response = requests.get(url, params=params, headers=self._get_headers())
raise_exception_if_not_ok(response)

if self.rest_auth:
response = requests.get(url, auth=self.rest_auth, params=params, headers=self._get_headers())
raise_exception_if_not_ok(response)
else:
response = requests.get(url, params=params, headers=self._get_headers())
raise_exception_if_not_ok(response)
return response.json()

def set_job_retry(self, job_id, retries=1):
url = f"{self.engine_base_url}/job/{job_id}/retries"
body = {"retries": retries}

response = requests.put(url, headers=self._get_headers(), json=body)
raise_exception_if_not_ok(response)
if self.rest_auth:
response = requests.put(url, auth=self.rest_auth, headers=self._get_headers(), json=body)
raise_exception_if_not_ok(response)
else:
response = requests.put(url, headers=self._get_headers(), json=body)
raise_exception_if_not_ok(response)
return response.status_code == HTTPStatus.NO_CONTENT

def get_process_instance_variable(self, process_instance_id, variable_name, with_meta=False):
Expand Down
39 changes: 30 additions & 9 deletions camunda/client/external_task_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import requests

from camunda.client.engine_client import ENGINE_LOCAL_BASE_URL
from camunda.client.engine_client import ENGINE_REST_AUTH
from camunda.utils.log_utils import log_with_context
from camunda.utils.response_utils import raise_exception_if_not_ok
from camunda.utils.utils import str_to_list
Expand All @@ -24,7 +25,7 @@ class ExternalTaskClient:
"includeExtensionProperties": True # enables Camunda Extension Properties
}

def __init__(self, worker_id, engine_base_url=ENGINE_LOCAL_BASE_URL, config=None):
def __init__(self, worker_id, engine_base_url=ENGINE_LOCAL_BASE_URL, engine_rest_auth=ENGINE_REST_AUTH, config=None):
config = config if config is not None else {}
self.worker_id = worker_id
self.external_task_base_url = engine_base_url + "/external-task"
Expand All @@ -33,6 +34,7 @@ def __init__(self, worker_id, engine_base_url=ENGINE_LOCAL_BASE_URL, config=None
self.is_debug = config.get('isDebug', False)
self.http_timeout_seconds = self.config.get('httpTimeoutMillis') / 1000
self._log_with_context(f"Created External Task client with config: {self.config}")
self.rest_auth = engine_rest_auth

def get_fetch_and_lock_url(self):
return f"{self.external_task_base_url}/fetchAndLock"
Expand All @@ -49,8 +51,13 @@ def fetch_and_lock(self, topic_names, process_variables=None):
if self.is_debug:
self._log_with_context(f"trying to fetch and lock with request payload: {body}")
http_timeout_seconds = self.__get_fetch_and_lock_http_timeout_seconds()
response = requests.post(url, headers=self._get_headers(), json=body, timeout=http_timeout_seconds)
raise_exception_if_not_ok(response)

if self.rest_auth:
response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=http_timeout_seconds)
raise_exception_if_not_ok(response)
else:
response = requests.post(url, headers=self._get_headers(), json=body, timeout=http_timeout_seconds)
raise_exception_if_not_ok(response)

resp_json = response.json()
if self.is_debug:
Expand Down Expand Up @@ -83,8 +90,12 @@ def complete(self, task_id, global_variables, local_variables=None):
"localVariables": Variables.format(local_variables)
}

response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
raise_exception_if_not_ok(response)
if self.rest_auth:
response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
raise_exception_if_not_ok(response)
else:
response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
raise_exception_if_not_ok(response)
return response.status_code == HTTPStatus.NO_CONTENT

def get_task_complete_url(self, task_id):
Expand All @@ -102,8 +113,13 @@ def failure(self, task_id, error_message, error_details, retries, retry_timeout)
if error_details:
body["errorDetails"] = error_details

response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
raise_exception_if_not_ok(response)
if self.rest_auth:
response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
raise_exception_if_not_ok(response)
else:
response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
raise_exception_if_not_ok(response)

return response.status_code == HTTPStatus.NO_CONTENT

def get_task_failure_url(self, task_id):
Expand All @@ -122,8 +138,13 @@ def bpmn_failure(self, task_id, error_code, error_message, variables=None):
if self.is_debug:
self._log_with_context(f"trying to report bpmn error with request payload: {body}")

resp = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
resp.raise_for_status()
if self.rest_auth:
resp = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
resp.raise_for_status()
else:
resp = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds)
resp.raise_for_status()

return resp.status_code == HTTPStatus.NO_CONTENT

def get_task_bpmn_error_url(self, task_id):
Expand Down
6 changes: 3 additions & 3 deletions camunda/external_task/external_task_worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import time

from camunda.client.external_task_client import ExternalTaskClient, ENGINE_LOCAL_BASE_URL
from camunda.client.external_task_client import ExternalTaskClient, ENGINE_LOCAL_BASE_URL, ENGINE_REST_AUTH
from camunda.external_task.external_task import ExternalTask
from camunda.external_task.external_task_executor import ExternalTaskExecutor
from camunda.utils.log_utils import log_with_context
Expand All @@ -10,10 +10,10 @@
class ExternalTaskWorker:
DEFAULT_SLEEP_SECONDS = 300

def __init__(self, worker_id, base_url=ENGINE_LOCAL_BASE_URL, config=None):
def __init__(self, worker_id, base_url=ENGINE_LOCAL_BASE_URL, rest_auth=ENGINE_REST_AUTH, config=None):
config = config if config is not None else {} # To avoid to have a mutable default for a parameter
self.worker_id = worker_id
self.client = ExternalTaskClient(self.worker_id, base_url, config)
self.client = ExternalTaskClient(self.worker_id, base_url, rest_auth, config)
self.executor = ExternalTaskExecutor(self.worker_id, self.client)
self.config = config
self._log_with_context(f"Created new External Task Worker with config: {self.config}")
Expand Down