diff --git a/camunda/client/engine_client.py b/camunda/client/engine_client.py index e5e0449..07dc68a 100644 --- a/camunda/client/engine_client.py +++ b/camunda/client/engine_client.py @@ -11,12 +11,14 @@ logger = logging.getLogger(__name__) ENGINE_LOCAL_BASE_URL = "http://localhost:8080/engine-rest" +ENGINE_REST_AUTH = None class EngineClient: - def __init__(self, engine_base_url=ENGINE_LOCAL_BASE_URL): + def __init__(self, engine_base_url=ENGINE_LOCAL_BASE_URL, engine_rest_auth=ENGINE_REST_AUTH): self.engine_base_url = engine_base_url + self.rest_auth = engine_rest_auth def get_start_process_instance_url(self, process_key, tenant_id=None): if tenant_id: @@ -39,15 +41,24 @@ def start_process(self, process_key, variables, tenant_id=None, business_key=Non if business_key: body["businessKey"] = business_key - response = requests.post(url, headers=self._get_headers(), json=body) - raise_exception_if_not_ok(response) + if self.rest_auth: + response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) + else: + response = requests.post(url, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) return response.json() def get_process_instance(self, process_key=None, variables=frozenset([]), tenant_ids=frozenset([])): url = f"{self.engine_base_url}/process-instance" url_params = self.__get_process_instance_url_params(process_key, tenant_ids, variables) - response = requests.get(url, headers=self._get_headers(), params=url_params) - raise_exception_if_not_ok(response) + + if self.rest_auth: + response = requests.get(url, auth=self.rest_auth, headers=self._get_headers(), params=url_params) + raise_exception_if_not_ok(response) + else: + response = requests.get(url, headers=self._get_headers(), params=url_params) + raise_exception_if_not_ok(response) return response.json() @staticmethod @@ -98,8 +109,12 @@ def correlate_message(self, message_name, process_instance_id=None, tenant_id=No body = {k: v for k, v in body.items() if v is not None} - response = requests.post(url, headers=self._get_headers(), json=body) - raise_exception_if_not_ok(response) + if self.rest_auth: + response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) + else: + response = requests.post(url, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) return response.json() def get_jobs(self, @@ -129,16 +144,25 @@ def get_jobs(self, params["withException"] = "true" if tenant_ids: params["tenantIdIn"] = ','.join(tenant_ids) - response = requests.get(url, params=params, headers=self._get_headers()) - raise_exception_if_not_ok(response) + + if self.rest_auth: + response = requests.get(url, auth=self.rest_auth, params=params, headers=self._get_headers()) + raise_exception_if_not_ok(response) + else: + response = requests.get(url, params=params, headers=self._get_headers()) + raise_exception_if_not_ok(response) return response.json() def set_job_retry(self, job_id, retries=1): url = f"{self.engine_base_url}/job/{job_id}/retries" body = {"retries": retries} - response = requests.put(url, headers=self._get_headers(), json=body) - raise_exception_if_not_ok(response) + if self.rest_auth: + response = requests.put(url, auth=self.rest_auth, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) + else: + response = requests.put(url, headers=self._get_headers(), json=body) + raise_exception_if_not_ok(response) return response.status_code == HTTPStatus.NO_CONTENT def get_process_instance_variable(self, process_instance_id, variable_name, with_meta=False): diff --git a/camunda/client/external_task_client.py b/camunda/client/external_task_client.py index 31473f2..13d1adb 100644 --- a/camunda/client/external_task_client.py +++ b/camunda/client/external_task_client.py @@ -4,6 +4,7 @@ import requests from camunda.client.engine_client import ENGINE_LOCAL_BASE_URL +from camunda.client.engine_client import ENGINE_REST_AUTH from camunda.utils.log_utils import log_with_context from camunda.utils.response_utils import raise_exception_if_not_ok from camunda.utils.utils import str_to_list @@ -24,7 +25,7 @@ class ExternalTaskClient: "includeExtensionProperties": True # enables Camunda Extension Properties } - def __init__(self, worker_id, engine_base_url=ENGINE_LOCAL_BASE_URL, config=None): + def __init__(self, worker_id, engine_base_url=ENGINE_LOCAL_BASE_URL, engine_rest_auth=ENGINE_REST_AUTH, config=None): config = config if config is not None else {} self.worker_id = worker_id self.external_task_base_url = engine_base_url + "/external-task" @@ -33,6 +34,7 @@ def __init__(self, worker_id, engine_base_url=ENGINE_LOCAL_BASE_URL, config=None self.is_debug = config.get('isDebug', False) self.http_timeout_seconds = self.config.get('httpTimeoutMillis') / 1000 self._log_with_context(f"Created External Task client with config: {self.config}") + self.rest_auth = engine_rest_auth def get_fetch_and_lock_url(self): return f"{self.external_task_base_url}/fetchAndLock" @@ -49,8 +51,13 @@ def fetch_and_lock(self, topic_names, process_variables=None): if self.is_debug: self._log_with_context(f"trying to fetch and lock with request payload: {body}") http_timeout_seconds = self.__get_fetch_and_lock_http_timeout_seconds() - response = requests.post(url, headers=self._get_headers(), json=body, timeout=http_timeout_seconds) - raise_exception_if_not_ok(response) + + if self.rest_auth: + response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=http_timeout_seconds) + raise_exception_if_not_ok(response) + else: + response = requests.post(url, headers=self._get_headers(), json=body, timeout=http_timeout_seconds) + raise_exception_if_not_ok(response) resp_json = response.json() if self.is_debug: @@ -83,8 +90,12 @@ def complete(self, task_id, global_variables, local_variables=None): "localVariables": Variables.format(local_variables) } - response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) - raise_exception_if_not_ok(response) + if self.rest_auth: + response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + raise_exception_if_not_ok(response) + else: + response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + raise_exception_if_not_ok(response) return response.status_code == HTTPStatus.NO_CONTENT def get_task_complete_url(self, task_id): @@ -102,8 +113,13 @@ def failure(self, task_id, error_message, error_details, retries, retry_timeout) if error_details: body["errorDetails"] = error_details - response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) - raise_exception_if_not_ok(response) + if self.rest_auth: + response = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + raise_exception_if_not_ok(response) + else: + response = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + raise_exception_if_not_ok(response) + return response.status_code == HTTPStatus.NO_CONTENT def get_task_failure_url(self, task_id): @@ -122,8 +138,13 @@ def bpmn_failure(self, task_id, error_code, error_message, variables=None): if self.is_debug: self._log_with_context(f"trying to report bpmn error with request payload: {body}") - resp = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) - resp.raise_for_status() + if self.rest_auth: + resp = requests.post(url, auth=self.rest_auth, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + resp.raise_for_status() + else: + resp = requests.post(url, headers=self._get_headers(), json=body, timeout=self.http_timeout_seconds) + resp.raise_for_status() + return resp.status_code == HTTPStatus.NO_CONTENT def get_task_bpmn_error_url(self, task_id): diff --git a/camunda/external_task/external_task_worker.py b/camunda/external_task/external_task_worker.py index ea8ac85..7a8c7eb 100644 --- a/camunda/external_task/external_task_worker.py +++ b/camunda/external_task/external_task_worker.py @@ -1,6 +1,6 @@ import time -from camunda.client.external_task_client import ExternalTaskClient, ENGINE_LOCAL_BASE_URL +from camunda.client.external_task_client import ExternalTaskClient, ENGINE_LOCAL_BASE_URL, ENGINE_REST_AUTH from camunda.external_task.external_task import ExternalTask from camunda.external_task.external_task_executor import ExternalTaskExecutor from camunda.utils.log_utils import log_with_context @@ -10,10 +10,10 @@ class ExternalTaskWorker: DEFAULT_SLEEP_SECONDS = 300 - def __init__(self, worker_id, base_url=ENGINE_LOCAL_BASE_URL, config=None): + def __init__(self, worker_id, base_url=ENGINE_LOCAL_BASE_URL, rest_auth=ENGINE_REST_AUTH, config=None): config = config if config is not None else {} # To avoid to have a mutable default for a parameter self.worker_id = worker_id - self.client = ExternalTaskClient(self.worker_id, base_url, config) + self.client = ExternalTaskClient(self.worker_id, base_url, rest_auth, config) self.executor = ExternalTaskExecutor(self.worker_id, self.client) self.config = config self._log_with_context(f"Created new External Task Worker with config: {self.config}") diff --git a/camunda/variables/tests/test_variables.py b/camunda/variables/tests/test_variables.py index 8b08577..af23fcd 100644 --- a/camunda/variables/tests/test_variables.py +++ b/camunda/variables/tests/test_variables.py @@ -51,3 +51,11 @@ def test_to_dict_returns_variables_as_dict(self): "var2": {"value": True}, "var3": {"value": "string"}}) self.assertDictEqual({"var1": 1, "var2": True, "var3": "string"}, variables.to_dict()) + + def test_json_returns_variables_as_json(self): + variables = {"list": ["a", "b", "c"], "obj": [{"z":2, "h":3}, {"t":5, "s":9}], "zobj": {"az": [0, 1, 2], "zot": [True, False, False]}} + formatted_vars = Variables.format(variables) + self.assertDictEqual({"list": {"type": "json", "value": "[\"a\", \"b\", \"c\"]"}, + "obj": {"type": "json", "value": "[{\"z\": 2, \"h\": 3}, {\"t\": 5, \"s\": 9}]"}, + "zobj": {"type": "json", "value": "{\"az\": [0, 1, 2], \"zot\": [true, false, false]}"}}, formatted_vars) + diff --git a/camunda/variables/variables.py b/camunda/variables/variables.py index 59e200b..46ea4d6 100644 --- a/camunda/variables/variables.py +++ b/camunda/variables/variables.py @@ -25,10 +25,13 @@ def format(cls, variables): """ formatted_vars = {} if variables: - formatted_vars = { - k: v if isinstance(v, dict) else {"value": v} - for k, v in variables.items() - } + for i in variables.keys(): + if type(variables[i]) in [bool, int, float, str]: + formatted_vars[i] = {"value": variables[i]} + elif type(variables[i]) == dict and "value" in variables[i] and type(variables[i]['value']) in [bool, int, float, str]: + formatted_vars[i] = variables[i] + else: + formatted_vars[i] = {"value": json.dumps(variables[i]), "type": "json"} return formatted_vars def to_dict(self): @@ -41,5 +44,8 @@ def to_dict(self): """ result = {} for k, v in self.variables.items(): - result[k] = v["value"] + if 'type' in v and v['type'] == "Json": + result[k] = json.loads(v["value"]) + else: + result[k] = v["value"] return result