diff --git a/camunda/client/engine_client.py b/camunda/client/engine_client.py index e5e0449..07dc68a 100644 --- a/camunda/client/engine_client.py +++ b/camunda/client/engine_client.py @@ -11,12 +11,14 @@ logger = logging.getLogger(__name__) ENGINE_LOCAL_BASE_URL = "http://localhost:8080/engine-rest" +ENGINE_REST_AUTH = None class EngineClient: - def __init__(self, engine_base_url=ENGINE_LOCAL_BASE_URL): + def __init__(self, engine_base_url=ENGINE_LOCAL_BASE_URL, engine_rest_auth=ENGINE_REST_AUTH): self.engine_base_url = engine_base_url + self.rest_auth = engine_rest_auth def get_start_process_instance_url(self, process_key, tenant_id=None): if tenant_id: @@ -39,15 +41,24 @@ def start_process(self, process_key, variables, tenant_id=None, business_key=Non if business_key: body["businessKey"] = business_key - response = requests.post(url, headers=self._get_headers(), json=body) - raise_exception_if_not_ok(response) + if self.rest_auth: + response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) + else: + response = requests.post(url, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) return response.json() def get_process_instance(self, process_key=None, variables=frozenset([]), tenant_ids=frozenset([])): url = f"{self.engine_base_url}/process-instance" url_params = self.__get_process_instance_url_params(process_key, tenant_ids, variables) - response = requests.get(url, headers=self._get_headers(), params=url_params) - raise_exception_if_not_ok(response) + + if self.rest_auth: + response = requests.get(url, auth=self.rest_auth, headers=self._get_headers(), params=url_params) + raise_exception_if_not_ok(response) + else: + response = requests.get(url, headers=self._get_headers(), params=url_params) + raise_exception_if_not_ok(response) return response.json() @staticmethod @@ -98,8 +109,12 @@ def correlate_message(self, message_name, process_instance_id=None, tenant_id=No body = {k: v for k, v in body.items() if v is not None} - response = requests.post(url, headers=self._get_headers(), json=body) - raise_exception_if_not_ok(response) + if self.rest_auth: + response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) + else: + response = requests.post(url, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) return response.json() def get_jobs(self, @@ -129,16 +144,25 @@ def get_jobs(self, params["withException"] = "true" if tenant_ids: params["tenantIdIn"] = ','.join(tenant_ids) - response = requests.get(url, params=params, headers=self._get_headers()) - raise_exception_if_not_ok(response) + + if self.rest_auth: + response = requests.get(url, auth=self.rest_auth, params=params, headers=self._get_headers()) + raise_exception_if_not_ok(response) + else: + response = requests.get(url, params=params, headers=self._get_headers()) + raise_exception_if_not_ok(response) return response.json() def set_job_retry(self, job_id, retries=1): url = f"{self.engine_base_url}/job/{job_id}/retries" body = {"retries": retries} - response = requests.put(url, headers=self._get_headers(), json=body) - raise_exception_if_not_ok(response) + if self.rest_auth: + response = requests.put(url, auth=self.rest_auth, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) + else: + response = requests.put(url, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) return response.status_code == HTTPStatus.NO_CONTENT def get_process_instance_variable(self, process_instance_id, variable_name, with_meta=False): diff --git a/camunda/client/external_task_client.py b/camunda/client/external_task_client.py index 31473f2..13d1adb 100644 --- a/camunda/client/external_task_client.py +++ b/camunda/client/external_task_client.py @@ -4,6 +4,7 @@ import requests from camunda.client.engine_client import ENGINE_LOCAL_BASE_URL +from camunda.client.engine_client import ENGINE_REST_AUTH from camunda.utils.log_utils import log_with_context from camunda.utils.response_utils import raise_exception_if_not_ok from camunda.utils.utils import str_to_list @@ -24,7 +25,7 @@ class ExternalTaskClient: "includeExtensionProperties": True # enables Camunda Extension Properties } - def __init__(self, worker_id, engine_base_url=ENGINE_LOCAL_BASE_URL, config=None): + def __init__(self, worker_id, engine_base_url=ENGINE_LOCAL_BASE_URL, engine_rest_auth=ENGINE_REST_AUTH, config=None): config = config if config is not None else {} self.worker_id = worker_id self.external_task_base_url = engine_base_url + "/external-task" @@ -33,6 +34,7 @@ def __init__(self, worker_id, engine_base_url=ENGINE_LOCAL_BASE_URL, config=None self.is_debug = config.get('isDebug', False) self.http_timeout_seconds = self.config.get('httpTimeoutMillis') / 1000 self._log_with_context(f"Created External Task client with config: {self.config}") + self.rest_auth = engine_rest_auth def get_fetch_and_lock_url(self): return f"{self.external_task_base_url}/fetchAndLock" @@ -49,8 +51,13 @@ def fetch_and_lock(self, topic_names, process_variables=None): if self.is_debug: self._log_with_context(f"trying to fetch and lock with request payload: {body}") http_timeout_seconds = self.__get_fetch_and_lock_http_timeout_seconds() - response = requests.post(url, headers=self._get_headers(), json=body, timeout=http_timeout_seconds) - raise_exception_if_not_ok(response) + + if self.rest_auth: + response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=http_timeout_seconds) + raise_exception_if_not_ok(response) + else: + response = requests.post(url, headers=self._get_headers(), json=body, timeout=http_timeout_seconds) + raise_exception_if_not_ok(response) resp_json = response.json() if self.is_debug: @@ -83,8 +90,12 @@ def complete(self, task_id, global_variables, local_variables=None): "localVariables": Variables.format(local_variables) } - response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) - raise_exception_if_not_ok(response) + if self.rest_auth: + response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + raise_exception_if_not_ok(response) + else: + response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + raise_exception_if_not_ok(response) return response.status_code == HTTPStatus.NO_CONTENT def get_task_complete_url(self, task_id): @@ -102,8 +113,13 @@ def failure(self, task_id, error_message, error_details, retries, retry_timeout) if error_details: body["errorDetails"] = error_details - response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) - raise_exception_if_not_ok(response) + if self.rest_auth: + response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + raise_exception_if_not_ok(response) + else: + response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + raise_exception_if_not_ok(response) + return response.status_code == HTTPStatus.NO_CONTENT def get_task_failure_url(self, task_id): @@ -122,8 +138,13 @@ def bpmn_failure(self, task_id, error_code, error_message, variables=None): if self.is_debug: self._log_with_context(f"trying to report bpmn error with request payload: {body}") - resp = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) - resp.raise_for_status() + if self.rest_auth: + resp = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + resp.raise_for_status() + else: + resp = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + resp.raise_for_status() + return resp.status_code == HTTPStatus.NO_CONTENT def get_task_bpmn_error_url(self, task_id): diff --git a/camunda/external_task/external_task_worker.py b/camunda/external_task/external_task_worker.py index ea8ac85..7a8c7eb 100644 --- a/camunda/external_task/external_task_worker.py +++ b/camunda/external_task/external_task_worker.py @@ -1,6 +1,6 @@ import time -from camunda.client.external_task_client import ExternalTaskClient, ENGINE_LOCAL_BASE_URL +from camunda.client.external_task_client import ExternalTaskClient, ENGINE_LOCAL_BASE_URL, ENGINE_REST_AUTH from camunda.external_task.external_task import ExternalTask from camunda.external_task.external_task_executor import ExternalTaskExecutor from camunda.utils.log_utils import log_with_context @@ -10,10 +10,10 @@ class ExternalTaskWorker: DEFAULT_SLEEP_SECONDS = 300 - def __init__(self, worker_id, base_url=ENGINE_LOCAL_BASE_URL, config=None): + def __init__(self, worker_id, base_url=ENGINE_LOCAL_BASE_URL, rest_auth=ENGINE_REST_AUTH, config=None): config = config if config is not None else {} # To avoid to have a mutable default for a parameter self.worker_id = worker_id - self.client = ExternalTaskClient(self.worker_id, base_url, config) + self.client = ExternalTaskClient(self.worker_id, base_url, rest_auth, config) self.executor = ExternalTaskExecutor(self.worker_id, self.client) self.config = config self._log_with_context(f"Created new External Task Worker with config: {self.config}")