From f7c3c85184037f08dba446054eeb0ca641f86e28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Werenne?= Date: Mon, 25 May 2020 18:04:57 +0200 Subject: [PATCH 1/2] API change for fuzzy join --- dataikuapi/dss/recipe.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dataikuapi/dss/recipe.py b/dataikuapi/dss/recipe.py index c768e60e..4f5323bd 100644 --- a/dataikuapi/dss/recipe.py +++ b/dataikuapi/dss/recipe.py @@ -475,6 +475,13 @@ class JoinRecipeCreator(VirtualInputsSingleOutputRecipeCreator): def __init__(self, name, project): VirtualInputsSingleOutputRecipeCreator.__init__(self, 'join', name, project) +class FuzzyJoinRecipeCreator(VirtualInputsSingleOutputRecipeCreator): + """ + Create a FuzzyJoin recipe + """ + def __init__(self, name, project): + VirtualInputsSingleOutputRecipeCreator.__init__(self, 'fuzzyjoin', name, project) + class StackRecipeCreator(VirtualInputsSingleOutputRecipeCreator): """ Create a Stack recipe From 223017a5e7dd3691b20ede4d9fa8f3d05636f200 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Werenne?= Date: Wed, 17 Jun 2020 16:03:29 +0200 Subject: [PATCH 2/2] merge conflicts --- dataikuapi/dss/app.py | 19 +- dataikuapi/dss/dataset.py | 392 ++++++++++++- dataikuapi/dss/flow.py | 145 ++++- dataikuapi/dss/managedfolder.py | 4 + dataikuapi/dss/ml.py | 224 ++++++-- dataikuapi/dss/plugin.py | 132 ++++- dataikuapi/dss/project.py | 236 ++++++-- dataikuapi/dss/recipe.py | 916 ++++++++++++++++++++++++++++--- dataikuapi/dss/savedmodel.py | 27 +- dataikuapi/dss/tools/__init__.py | 0 dataikuapi/dss/tools/codegen.py | 386 +++++++++++++ dataikuapi/dss/utils.py | 176 ++++++ 12 files changed, 2446 insertions(+), 211 deletions(-) create mode 100644 dataikuapi/dss/tools/__init__.py create mode 100644 dataikuapi/dss/tools/codegen.py diff --git a/dataikuapi/dss/app.py b/dataikuapi/dss/app.py index 3998bd57..15af2d3e 100644 --- a/dataikuapi/dss/app.py +++ b/dataikuapi/dss/app.py @@ -1,4 +1,5 @@ import sys +import re import os.path as osp from .future import DSSFuture from dataikuapi.utils import DataikuException @@ -40,7 +41,8 @@ def create_instance(self, instance_key, instance_name, wait=True): return future def make_random_project_key(self): - return "%s_tmp_%s" % (self.app_id, random_string(10)) + slugified_app_id = re.sub(r'[^A-Za-z_0-9]+', '_', self.app_id) + return "%s_tmp_%s" % (slugified_app_id, random_string(10)) def create_temporary_instance(self): """ @@ -77,15 +79,17 @@ def get_instance(self, instance_key): def get_manifest(self): raw_data = self.client._perform_json("GET", "/apps/%s/" % self.app_id) - return DSSAppManifest(self.client, raw_data) + project_key = self.app_id[8:] if self.app_id.startswith('PROJECT_') else None + return DSSAppManifest(self.client, raw_data, project_key) class DSSAppManifest(object): - def __init__(self, client, raw_data): + def __init__(self, client, raw_data, project_key=None): """The manifest for an app. 
Do not create this class directly""" self.client = client self.raw_data = raw_data + self.project_key = project_key def get_raw(self): return self.raw_data @@ -97,6 +101,13 @@ def get_runnable_scenarios(self): """Return the scenario identifiers that are declared as actions for this app""" return [x["scenarioId"] for x in self.get_all_actions() if x["type"] == "SCENARIO_RUN"] + def save(self): + """Saves the changes to this manifest object back to the template project""" + if self.project_key is None: + raise Exception("This manifest object wasn't created from a project, cannot be saved back") + self.client._perform_empty("PUT", "/projects/%s/app-manifest" % self.project_key, body=self.raw_data) + + class DSSAppInstance(object): def __init__(self, client, project_key): @@ -124,7 +135,7 @@ def __init__(self, client, project_key): def close(self): - self.get_as_project().delete() + self.get_as_project().delete(drop_data=True) def __enter__(self,): return self diff --git a/dataikuapi/dss/dataset.py b/dataikuapi/dss/dataset.py index 7d8ca1b4..47dec896 100644 --- a/dataikuapi/dss/dataset.py +++ b/dataikuapi/dss/dataset.py @@ -1,11 +1,53 @@ from ..utils import DataikuException from ..utils import DataikuUTF8CSVReader from ..utils import DataikuStreamedHttpUTF8CSVReader -import json +from .future import DSSFuture +import json, warnings +from .utils import DSSTaggableObjectListItem, DSSTaggableObjectSettings from .future import DSSFuture from .metrics import ComputedMetrics from .discussion import DSSObjectDiscussions from .statistics import DSSStatisticsWorksheet +from . import recipe + +class DSSDatasetListItem(DSSTaggableObjectListItem): + """An item in a list of datasets. Do not instantiate this class""" + def __init__(self, client, data): + super(DSSDatasetListItem, self).__init__(data) + self.client = client + + def to_dataset(self): + """Gets the :class:`DSSDataset` corresponding to this dataset""" + return DSSDataset(self.client, self._data["projectKey"], self._data["name"]) + + @property + def name(self): + return self._data["name"] + @property + def id(self): + return self._data["name"] + @property + def type(self): + return self._data["type"] + @property + def schema(self): + return self._data["schema"] + + @property + def connection(self): + """Returns the connection on which this dataset is attached, or None if there is no connection for this dataset""" + if not "params" in self._data: + return None + return self._data["params"].get("connection", None) + + def get_column(self, column): + """ + Returns the schema column given a name. 
+ :param str column: Column to find + :return a dict of the column settings or None if column does not exist + """ + matched = [col for col in self.schema["columns"] if col["name"] == column] + return None if len(matched) == 0 else matched[0] class DSSDataset(object): """ @@ -13,9 +55,14 @@ class DSSDataset(object): """ def __init__(self, client, project_key, dataset_name): self.client = client + self.project = client.get_project(project_key) self.project_key = project_key self.dataset_name = dataset_name + @property + def name(self): + return self.dataset_name + ######################################################## # Dataset deletion ######################################################## @@ -36,28 +83,67 @@ def delete(self, drop_data=False): # Dataset definition ######################################################## - def get_definition(self): + def get_settings(self): """ - Get the definition of the dataset + Returns the settings of this dataset as a :class:`DSSDatasetSettings`, or one of its subclasses. - Returns: - the definition, as a JSON object + Know subclasses of :class:`DSSDatasetSettings` include :class:`FSLikeDatasetSettings` + and :class:`SQLDatasetSettings` + + You must use :meth:`~DSSDatasetSettings.save()` on the returned object to make your changes effective + on the dataset. + + .. code-block:: python + + # Example: activating discrete partitioning on a SQL dataset + dataset = project.get_dataset("my_database_table") + settings = dataset.get_settings() + settings.add_discrete_partitioning_dimension("country") + settings.save() + + :rtype: :class:`DSSDatasetSettings` """ + data = self.client._perform_json("GET", "/projects/%s/datasets/%s" % (self.project_key, self.dataset_name)) + + if data["type"] in self.__class__.FS_TYPES: + return FSLikeDatasetSettings(self, data) + elif data["type"] in self.__class__.SQL_TYPES: + return SQLDatasetSettings(self, data) + else: + return DSSDatasetSettings(self, data) + + + def get_definition(self): + """ + Deprecated. Use :meth:`get_settings` + Get the raw settings of the dataset as a dict + :rtype: dict + """ + warnings.warn("Dataset.get_definition is deprecated, please use get_settings", DeprecationWarning) return self.client._perform_json( "GET", "/projects/%s/datasets/%s" % (self.project_key, self.dataset_name)) def set_definition(self, definition): """ + Deprecated. Use :meth:`get_settings` and :meth:`DSSDatasetSettings.save` Set the definition of the dataset - Args: - definition: the definition, as a JSON object. You should only set a definition object - that has been retrieved using the get_definition call. + :param definition: the definition, as a dict. You should only set a definition object + that has been retrieved using the get_definition call. 
""" + warnings.warn("Dataset.set_definition is deprecated, please use get_settings", DeprecationWarning) return self.client._perform_json( "PUT", "/projects/%s/datasets/%s" % (self.project_key, self.dataset_name), body=definition) + def exists(self): + """Returns whether this dataset exists""" + try: + self.get_metadata() + return True + except Exception as e: + return False + ######################################################## # Dataset metadata ######################################################## @@ -174,6 +260,30 @@ def copy_to(self, target, sync_schema=True, write_mode="OVERWRITE"): # Dataset actions ######################################################## + def build(self, job_type="NON_RECURSIVE_FORCED_BUILD", partitions=None, wait=True, no_fail=False): + """ + Starts a new job to build this dataset and wait for it to complete. + Raises if the job failed. + + .. code-block:: python + + job = dataset.build() + print("Job %s done" % job.id) + + :param job_type: The job type. One of RECURSIVE_BUILD, NON_RECURSIVE_FORCED_BUILD or RECURSIVE_FORCED_BUILD + :param partitions: If the dataset is partitioned, a list of partition ids to build + :param no_fail: if True, does not raise if the job failed. + :return: the :class:`dataikuapi.dss.job.DSSJob` job handle corresponding to the built job + :rtype: :class:`dataikuapi.dss.job.DSSJob` + """ + jd = self.project.new_job(job_type) + jd.with_output(self.dataset_name, partition=partitions) + if wait: + return jd.start_and_wait() + else: + return jd.start() + + def synchronize_hive_metastore(self): """ Synchronize this dataset with the Hive metastore @@ -238,6 +348,85 @@ def uploaded_list_files(self): """ return self.client._perform_json("GET", "/projects/%s/datasets/%s/uploaded/files" % (self.project_key, self.dataset_name)) + ######################################################## + # Lab and ML + # Don't forget to synchronize with DSSProject.* + ######################################################## + + def create_prediction_ml_task(self, target_variable, + ml_backend_type="PY_MEMORY", + guess_policy="DEFAULT", + prediction_type=None, + wait_guess_complete=True): + + """Creates a new prediction task in a new visual analysis lab + for a dataset. + + :param string input_dataset: the dataset to use for training/testing the model + :param string target_variable: the variable to predict + :param string ml_backend_type: ML backend to use, one of PY_MEMORY, MLLIB or H2O + :param string guess_policy: Policy to use for setting the default parameters. Valid values are: DEFAULT, SIMPLE_FORMULA, DECISION_TREE, EXPLANATORY and PERFORMANCE + :param string prediction_type: The type of prediction problem this is. If not provided the prediction type will be guessed. Valid values are: BINARY_CLASSIFICATION, REGRESSION, MULTICLASS + :param boolean wait_guess_complete: if False, the returned ML task will be in 'guessing' state, i.e. analyzing the input dataset to determine feature handling and algorithms. 
You should wait for the guessing to be completed by calling
+            ``wait_guess_complete`` on the returned object before doing anything
+            else (in particular calling ``train`` or ``get_settings``)
+        """
+        return self.project.create_prediction_ml_task(self.dataset_name,
+             target_variable = target_variable, ml_backend_type = ml_backend_type,
+             guess_policy = guess_policy, prediction_type = prediction_type, wait_guess_complete = wait_guess_complete)
+
+    def create_clustering_ml_task(self, input_dataset,
+                 ml_backend_type = "PY_MEMORY",
+                 guess_policy = "KMEANS"):
+        """Creates a new clustering task in a new visual analysis lab
+        for a dataset.
+
+        The returned ML task will be in 'guessing' state, i.e. analyzing
+        the input dataset to determine feature handling and algorithms.
+
+        You should wait for the guessing to be completed by calling
+        ``wait_guess_complete`` on the returned object before doing anything
+        else (in particular calling ``train`` or ``get_settings``)
+
+        :param string ml_backend_type: ML backend to use, one of PY_MEMORY, MLLIB or H2O
+        :param string guess_policy: Policy to use for setting the default parameters. Valid values are: KMEANS and ANOMALY_DETECTION
+        """
+        return self.project.create_clustering_ml_task(self.dataset_name,
+            ml_backend_type = ml_backend_type, guess_policy = guess_policy)
+
+    def create_analysis(self):
+        """
+        Creates a new visual analysis lab
+        """
+        return self.project.create_analysis(self.dataset_name)
+
+    def list_analyses(self, as_type="listitems"):
+        """
+        List the visual analyses on this dataset
+        :param str as_type: How to return the list. Supported values are "listitems" and "objects".
+        :returns: The list of the analyses. If "as_type" is "listitems", each one as a dict,
+                  If "as_type" is "objects", each one as a :class:`dataikuapi.dss.analysis.DSSAnalysis`
+        :rtype: list
+        """
+        analysis_list = [al for al in self.project.list_analyses() if self.dataset_name == al.get('inputDataset')]
+
+        if as_type == "listitems" or as_type == "listitem":
+            return analysis_list
+        elif as_type == "objects" or as_type == "object":
+            return [self.project.get_analysis(item["analysisId"]) for item in analysis_list]
+        else:
+            raise ValueError("Unknown as_type")
+
+    def delete_analyses(self, drop_data=False):
+        """
+        Deletes all analyses that have this dataset as input dataset.
Also deletes + ML tasks that are part of the analysis + + :param: bool drop_data: whether to drop data for all ML tasks in the analysis + """ + [analysis.delete(drop_data=drop_data) for analysis in self.list_analyses(as_type="objects")] ######################################################## # Statistics worksheets @@ -346,6 +535,175 @@ def get_object_discussions(self): """ return DSSObjectDiscussions(self.client, self.project_key, "DATASET", self.dataset_name) + ######################################################## + # Test / Autofill + ######################################################## + + FS_TYPES = ["Filesystem", "UploadedFiles", "FilesInFolder", + "HDFS", "S3", "Azure", "GCS", "FTP", "SCP", "SFTP"] + # HTTP is FSLike but not FS + + SQL_TYPES = ["JDBC", "PostgreSQL", "MySQL", "Vertica", "Snowflake", "Redshift", + "Greenplum", "Teradata", "Oracle", "SQLServer", "SAPHANA", "Netezza", + "BigQuery", "Athena", "hiveserver2"] + + def test_and_detect(self, infer_storage_types=False): + settings = self.get_settings() + + if settings.type in self.__class__.FS_TYPES: + future_resp = self.client._perform_json("POST", + "/projects/%s/datasets/%s/actions/testAndDetectSettings/fsLike"% (self.project_key, self.dataset_name), + body = {"detectPossibleFormats" : True, "inferStorageTypes" : infer_storage_types }) + + return DSSFuture(self.client, future_resp.get('jobId', None), future_resp) + elif settings.type in self.__class__.SQL_TYPES: + return self.client._perform_json("POST", + "/projects/%s/datasets/%s/actions/testAndDetectSettings/externalSQL"% (self.project_key, self.dataset_name)) + else: + raise ValueError("don't know how to test/detect on dataset type:%s" % settings.type) + + def autodetect_settings(self, infer_storage_types=False): + settings = self.get_settings() + + if settings.type in self.__class__.FS_TYPES: + future = self.test_and_detect(infer_storage_types) + result = future.wait_for_result() + + if not "format" in result or not result["format"]["ok"]: + raise DataikuException("Format detection failed, complete response is " + json.dumps(result)) + + settings.get_raw()["formatType"] = result["format"]["type"] + settings.get_raw()["formatParams"] = result["format"]["params"] + settings.get_raw()["schema"] = result["format"]["schemaDetection"]["newSchema"] + + return settings + + elif settings.type in self.__class__.SQL_TYPES: + result = self.test_and_detect() + + if not "schemaDetection" in result: + raise DataikuException("Format detection failed, complete response is " + json.dumps(result)) + + settings.get_raw()["schema"] = result["schemaDetection"]["newSchema"] + return settings + + else: + raise ValueError("don't know how to test/detect on dataset type:%s" % settings.type) + + def get_as_core_dataset(self): + import dataiku + return dataiku.Dataset("%s.%s" % (self.project_key, self.dataset_name)) + + ######################################################## + # Creation of recipes + ######################################################## + + def new_code_recipe(self, type, code=None, recipe_name=None): + """ + Starts creation of a new code recipe taking this dataset as input + :param str type: Type of the recipe ('python', 'r', 'pyspark', 'sparkr', 'sql', 'sparksql', 'hive', ...) 
+ :param str code: The code of the recipe + """ + + if type == "python": + builder = recipe.PythonRecipeCreator(recipe_name, self.project) + else: + builder = recipe.CodeRecipeCreator(recipe_name, type, self.project) + builder.with_input(self.dataset_name) + if code is not None: + builder.with_script(code) + return builder + + def new_recipe(self, type, recipe_name=None): + """ + Starts creation of a new recipe taking this dataset as input. + For more details, please see :meth:`dataikuapi.dss.project.DSSProject.new_recipe` + + :param str type: Type of the recipe + """ + builder = self.project.new_recipe(type=type, name=recipe_name) + builder.with_input(self.dataset_name) + return builder + +class DSSDatasetSettings(DSSTaggableObjectSettings): + def __init__(self, dataset, settings): + super(DSSDatasetSettings, self).__init__(settings) + self.dataset = dataset + self.settings = settings + + def get_raw(self): + """Get the raw dataset settings as a dict""" + return self.settings + + def get_raw_params(self): + """Get the type-specific params, as a raw dict""" + return self.settings["params"] + + @property + def type(self): + return self.settings["type"] + + def remove_partitioning(self): + self.settings["partitioning"] = {"dimensions" : []} + + def add_discrete_partitioning_dimension(self, dim_name): + self.settings["partitioning"]["dimensions"].append({"name": dim_name, "type": "value"}) + + def add_time_partitioning_dimension(self, dim_name, period="DAY"): + self.settings["partitioning"]["dimensions"].append({"name": dim_name, "type": "time", "params":{"period": period}}) + + def add_raw_schema_column(self, column): + self.settings["schema"]["columns"].append(column) + + def save(self): + self.dataset.client._perform_empty( + "PUT", "/projects/%s/datasets/%s" % (self.dataset.project_key, self.dataset.dataset_name), + body=self.settings) + +class FSLikeDatasetSettings(DSSDatasetSettings): + def __init__(self, dataset, settings): + super(FSLikeDatasetSettings, self).__init__(dataset, settings) + + def set_connection_and_path(self, connection, path): + self.settings["params"]["connection"] = connection + self.settings["params"]["path"] = path + + def get_raw_format_params(self): + """Get the raw format parameters as a dict""" + return self.settings["formatParams"] + + def set_format(self, format_type, format_params = None): + if format_params is None: + format_params = {} + self.settings["formatType"] = format_type + self.settings["formatParams"] = format_params + + def set_csv_format(self, separator=",", style="excel", skip_rows_before=0, header_row=True, skip_rows_after=0): + format_params = { + "style" : style, + "separator": separator, + "skipRowsBeforeHeader": skip_rows_before, + "parseHeaderRow": header_row, + "skipRowsAfterHeader": skip_rows_after + } + self.set_format("csv", format_params) + + def set_partitioning_file_pattern(self, pattern): + self.settings["partitioning"]["filePathPattern"] = pattern + +class SQLDatasetSettings(DSSDatasetSettings): + def __init__(self, dataset, settings): + super(SQLDatasetSettings, self).__init__(dataset, settings) + + def set_table(self, connection, schema, table): + """Sets this SQL dataset in 'table' mode, targeting a particular table of a connection""" + self.settings["params"].update({ + "connection": connection, + "mode": "table", + "schema": schema, + "table": table + }) + class DSSManagedDatasetCreationHelper(object): def __init__(self, project, dataset_name): @@ -382,15 +740,27 @@ def with_copy_partitioning_from(self, dataset_ref, 
object_type='DATASET'): self.creation_settings["partitioningOptionId"] = "copy:%s:%s" % (code, dataset_ref) return self - def create(self): + def create(self, overwrite=False): """ Executes the creation of the managed dataset according to the selected options - + :param overwrite: If the dataset being created already exists, delete it first (removing data) :return: The :class:`DSSDataset` corresponding to the newly created dataset """ + if overwrite and self.already_exists(): + self.project.get_dataset(self.dataset_name).delete(drop_data = True) + self.project.client._perform_json("POST", "/projects/%s/datasets/managed" % self.project.project_key, body = { "name": self.dataset_name, "creationSettings": self.creation_settings }) - return DSSDataset(self.project.client, self.project.project_key, self.dataset_name) \ No newline at end of file + return DSSDataset(self.project.client, self.project.project_key, self.dataset_name) + + def already_exists(self): + """Returns whether this managed dataset already exists""" + dataset = self.project.get_dataset(self.dataset_name) + try: + dataset.get_metadata() + return True + except Exception as e: + return False diff --git a/dataikuapi/dss/flow.py b/dataikuapi/dss/flow.py index 24136897..e4cc141a 100644 --- a/dataikuapi/dss/flow.py +++ b/dataikuapi/dss/flow.py @@ -1,13 +1,17 @@ from .utils import AnyLoc -from .recipe import DSSRecipeDefinitionAndPayload +from .dataset import DSSDataset +from .recipe import DSSRecipe, DSSRecipeDefinitionAndPayload from .future import DSSFuture -import logging +import logging, json class DSSProjectFlow(object): def __init__(self, client, project): self.client = client self.project = project + def get_graph(self): + data = self.client._perform_json("GET", "/projects/%s/flow/graph/" % (self.project.project_key)) + return DSSProjectFlowGraph(self, data) def replace_input_computable(self, current_ref, new_ref, type="DATASET"): """ @@ -37,7 +41,8 @@ def replace_input_computable(self, current_ref, new_ref, type="DATASET"): settings.save() for recipe in self.project.list_recipes(): - fake_rap = DSSRecipeDefinitionAndPayload({"recipe" : recipe}) + recipe_handle = self.project.get_recipe(recipe["name"]) + fake_rap = DSSRecipeDefinitionAndPayload(recipe_handle, {"recipe" : recipe}) if fake_rap.has_input(current_ref): logging.info("Recipe %s has %s as input, performing the replacement by %s"% \ (recipe["name"], current_ref, new_ref)) @@ -81,6 +86,140 @@ def propagate_schema(self, dataset_name, rebuild=False, recipe_update_options={} return DSSFuture(self.client,update_future.get('jobId', None), update_future) +class DSSProjectFlowGraph(object): + + def __init__(self, flow, data): + self.flow = flow + self.data = data + self.nodes = data["nodes"] + + def get_source_computables(self, as_type="dict"): + """ + Returns the list of source computables. + :param as_type: How to return the source computables. Possible values are "dict" and "object" + + :return: if as_type=dict, each computable is returned as a dict containing at least "ref" and "type". 
if as_type=object, each computable is returned as a :class:`dataikuapi.dss.dataset.DSSDataset`,
+            :class:`dataikuapi.dss.managedfolder.DSSManagedFolder`,
+            :class:`dataikuapi.dss.savedmodel.DSSSavedModel`, or streaming endpoint
+        """
+        ret = []
+        for node in self.nodes.values():
+            if len(node["predecessors"]) == 0 and node["type"].startswith("COMPUTABLE"):
+                ret.append(node)
+        return self._convert_nodes_list(ret, as_type)
+
+    def get_source_recipes(self, as_type="dict"):
+        """
+        Returns the list of source recipes.
+        :param as_type: How to return the source recipes. Possible values are "dict" and "object"
+
+        :return: if as_type=dict, each recipe is returned as a dict containing at least "ref" and "type".
+            if as_type=object, each recipe is returned as a :class:`dataikuapi.dss.recipe.DSSRecipe`
+        """
+        ret = []
+        for node in self.nodes.values():
+            if len(node["predecessors"]) == 0 and node["type"] == "RUNNABLE_RECIPE":
+                ret.append(node)
+        return self._convert_nodes_list(ret, as_type)
+
+    def get_source_datasets(self):
+        """
+        Returns the list of source datasets for this project.
+        :rtype: list of :class:`dataikuapi.dss.dataset.DSSDataset`
+        """
+        return [self._get_object_from_graph_node(x) for x in self.get_source_computables() if x["type"] == "COMPUTABLE_DATASET"]
+
+    def get_successor_recipes(self, node, as_type="dict"):
+        """
+        Returns a list of recipes that are successors of a graph node
+
+        :param node: Either a name or :class:`dataikuapi.dss.dataset.DSSDataset` object
+        :param as_type: How to return the successor recipes. Possible values are "dict" and "object"
+        :return: if as_type=dict, each recipe is returned as a dict containing at least "ref" and "type".
+            if as_type=object, each recipe is returned as a :class:`dataikuapi.dss.recipe.DSSRecipe`
+        """
+        if isinstance(node, DSSDataset):
+            node = node.dataset_name
+
+        computable = self.nodes.get(node, None)
+        if computable is None:
+            raise ValueError("Computable %s not found in Flow graph" % node)
+
+        runnables = [self.nodes[x] for x in computable["successors"]]
+        return self._convert_nodes_list(runnables, as_type)
+
+    def get_successor_computables(self, node, as_type="dict"):
+        """
+        Returns a list of computables that are successors of a given graph node
+
+        :param as_type: How to return the successor computables. Possible values are "dict" and "object"
+        :return: if as_type=dict, each computable is returned as a dict containing at least "ref" and "type".
+ if as_type=object, each computable is returned as a :class:`dataikuapi.dss.recipe.DSSRecipe`, + """ + if isinstance(node, DSSRecipe): + node = node.recipe_name + runnable = self.nodes.get(node, None) + if runnable is None: + raise ValueError("Runnable %s not found in Flow graph" % node) + + computables = [self.nodes[x] for x in runnable["successors"]] + return self._convert_nodes_list(computables, as_type) + + def _convert_nodes_list(self, nodes, as_type): + if as_type == "object" or as_type == "objects": + return [self._get_object_from_graph_node(node) for node in nodes] + else: + return nodes + + def _get_object_from_graph_node(self, node): + if node["type"] == "COMPUTABLE_DATASET": + return DSSDataset(self.flow.client, self.flow.project.project_key, node["ref"]) + elif node["type"] == "RUNNABLE_RECIPE": + return DSSRecipe(self.flow.client, self.flow.project.project_key, node["ref"]) + else: + # TODO + raise Exception("unsupported node type %s" % node["type"]) + + def get_items_in_traversal_order(self, as_type="dict"): + ret = [] + def add_to_set(node): + #print("*** Adding: %s" % node["ref"]) + ret.append(node) + def in_set(obj): + for candidate in ret: + if candidate["type"] == obj["type"] and candidate["ref"] == obj["ref"]: + return True + return False + + def add_from(graph_node): + #print("Add from %s" % graph_node["ref"]) + # To keep traversal order, we recurse to predecessors first + for predecessor_ref in graph_node["predecessors"]: + #print(" Pred = %s " % predecessor_ref) + predecessor_node = self.nodes[predecessor_ref] + if not in_set(predecessor_node): + add_from(predecessor_node) + + # Then add ourselves + if not in_set(graph_node): + add_to_set(graph_node) + + # Then recurse to successors + for successor_ref in graph_node["successors"]: + #print(" Succ = %s " % successor_ref) + successor_node = self.nodes[successor_ref] + if not in_set(successor_node): + add_from(successor_node) + + for source_computable in self.get_source_computables(): + add_from(source_computable) + + for source_recipe in self.get_source_recipes(): + add_from(source_recipe) + + return self._convert_nodes_list(ret, as_type) + class DSSFlowTool(object): """ Handle to interact with a flow tool diff --git a/dataikuapi/dss/managedfolder.py b/dataikuapi/dss/managedfolder.py index f8e18ee7..09fc69fb 100644 --- a/dataikuapi/dss/managedfolder.py +++ b/dataikuapi/dss/managedfolder.py @@ -15,6 +15,10 @@ def __init__(self, client, project_key, odb_id): self.project_key = project_key self.odb_id = odb_id + @property + def id(self): + return self.odb_id + ######################################################## # Managed folder deletion ######################################################## diff --git a/dataikuapi/dss/ml.py b/dataikuapi/dss/ml.py index 387d8848..80f2ca4a 100644 --- a/dataikuapi/dss/ml.py +++ b/dataikuapi/dss/ml.py @@ -1,7 +1,7 @@ from ..utils import DataikuException from ..utils import DataikuUTF8CSVReader from ..utils import DataikuStreamedHttpUTF8CSVReader -import json +import json, warnings import time from .metrics import ComputedMetrics from .utils import DSSDatasetSelectionBuilder, DSSFilterBuilder @@ -10,10 +10,21 @@ class PredictionSplitParamsHandler(object): """Object to modify the train/test splitting params.""" + SPLIT_PARAMS_KEY = 'splitParams' + def __init__(self, mltask_settings): """Do not call directly, use :meth:`DSSMLTaskSettings.get_split_params`""" self.mltask_settings = mltask_settings + def get_raw(self): + """Gets the raw settings of the prediction split configuration. 
This returns a reference to the raw settings, not a copy, + so changes made to the returned object will be reflected when saving. + + :rtype: dict + """ + return self.mltask_settings[PredictionSplitParamsHandler.SPLIT_PARAMS_KEY] + + def set_split_random(self, train_ratio = 0.8, selection = None, dataset_name=None): """ Sets the train/test split to random splitting of an extract of a single dataset @@ -22,7 +33,7 @@ def set_split_random(self, train_ratio = 0.8, selection = None, dataset_name=Non :param object selection: A :class:`~dataikuapi.dss.utils.DSSDatasetSelectionBuilder` to build the settings of the extract of the dataset. May be None (won't be changed) :param str dataset_name: Name of dataset to split. If None, the main dataset used to create the visual analysis will be used. """ - sp = self.mltask_settings["splitParams"] + sp = self.mltask_settings[PredictionSplitParamsHandler.SPLIT_PARAMS_KEY] sp["ttPolicy"] = "SPLIT_SINGLE_DATASET" if selection is not None: if isinstance(selection, DSSDatasetSelectionBuilder): @@ -36,6 +47,8 @@ def set_split_random(self, train_ratio = 0.8, selection = None, dataset_name=Non if dataset_name is not None: sp["ssdDatasetSmartName"] = dataset_name + return self + def set_split_kfold(self, n_folds = 5, selection = None, dataset_name=None): """ Sets the train/test split to k-fold splitting of an extract of a single dataset @@ -44,7 +57,7 @@ def set_split_kfold(self, n_folds = 5, selection = None, dataset_name=None): :param object selection: A :class:`~dataikuapi.dss.utils.DSSDatasetSelectionBuilder` to build the settings of the extract of the dataset. May be None (won't be changed) :param str dataset_name: Name of dataset to split. If None, the main dataset used to create the visual analysis will be used. """ - sp = self.mltask_settings["splitParams"] + sp = self.mltask_settings[PredictionSplitParamsHandler.SPLIT_PARAMS_KEY] sp["ttPolicy"] = "SPLIT_SINGLE_DATASET" if selection is not None: if isinstance(selection, DSSDatasetSelectionBuilder): @@ -58,6 +71,8 @@ def set_split_kfold(self, n_folds = 5, selection = None, dataset_name=None): if dataset_name is not None: sp["ssdDatasetSmartName"] = dataset_name + return self + def set_split_explicit(self, train_selection, test_selection, dataset_name=None, test_dataset_name=None, train_filter=None, test_filter=None): """ Sets the train/test split to explicit extract of one or two dataset(s) @@ -69,7 +84,7 @@ def set_split_explicit(self, train_selection, test_selection, dataset_name=None, :param object train_filter: A :class:`~dataikuapi.dss.utils.DSSFilterBuilder` to build the settings of the filter of the train dataset. May be None (won't be changed) :param object test_filter: A :class:`~dataikuapi.dss.utils.DSSFilterBuilder` to build the settings of the filter of the test dataset. May be None (won't be changed) """ - sp = self.mltask_settings["splitParams"] + sp = self.mltask_settings[PredictionSplitParamsHandler.SPLIT_PARAMS_KEY] if dataset_name is None: raise Exception("For explicit splitting a dataset_name is mandatory") if test_dataset_name is None or test_dataset_name == dataset_name: @@ -108,44 +123,15 @@ def set_split_explicit(self, train_selection, test_selection, dataset_name=None, else: test_split["filter"] = test_filter + return self -class DSSMLTaskSettings(object): - """ - Object to read and modify the settings of a ML task. 
- - Do not create this object directly, use :meth:`DSSMLTask.get_settings()` instead - """ - def __init__(self, client, project_key, analysis_id, mltask_id, mltask_settings): - self.client = client - self.project_key = project_key - self.analysis_id = analysis_id - self.mltask_id = mltask_id - self.mltask_settings = mltask_settings - - def get_raw(self): - """ - Gets the raw settings of this ML Task. This returns a reference to the raw settings, not a copy, - so changes made to the returned object will be reflected when saving. - - :rtype: dict - """ - return self.mltask_settings - - def get_split_params(self): - """ - Gets an object to modify train/test splitting params. - - :rtype: :class:`PredictionSplitParamsHandler` + def set_time_ordering(self, feature_name, ascending=True): """ - return PredictionSplitParamsHandler(self.mltask_settings) - - def split_ordered_by(self, feature_name, ascending=True): - """ - Uses a variable to sort the data for train/test split and hyperparameter optimization + Uses a variable to sort the data for train/test split and hyperparameter optimization by time :param str feature_name: Name of the variable to use :param bool ascending: True iff the test set is expected to have larger time values than the train set """ - self.remove_ordered_split() + self.unset_time_ordering() if not feature_name in self.mltask_settings["preprocessing"]["per_feature"]: raise ValueError("Feature %s doesn't exist in this ML task, can't use as time" % feature_name) self.mltask_settings['time']['enabled'] = True @@ -160,9 +146,11 @@ def split_ordered_by(self, feature_name, ascending=True): elif self.mltask_settings['modeling']['gridSearchParams']['mode'] == "SHUFFLE": self.mltask_settings['modeling']['gridSearchParams']['mode'] = "TIME_SERIES_SINGLE_SPLIT" - def remove_ordered_split(self): + return self + + def unset_time_ordering(self): """ - Remove time-based ordering. + Remove time-based ordering for train/test split and hyperparameter optimization """ self.mltask_settings['time']['enabled'] = False self.mltask_settings['time']['timeVariable'] = None @@ -174,6 +162,31 @@ def remove_ordered_split(self): elif self.mltask_settings['modeling']['gridSearchParams']['mode'] == "TIME_SERIES_SINGLE_SPLIT": self.mltask_settings['modeling']['gridSearchParams']['mode'] = "SHUFFLE" + return self + + +class DSSMLTaskSettings(object): + """ + Object to read and modify the settings of a ML task. + + Do not create this object directly, use :meth:`DSSMLTask.get_settings()` instead + """ + def __init__(self, client, project_key, analysis_id, mltask_id, mltask_settings): + self.client = client + self.project_key = project_key + self.analysis_id = analysis_id + self.mltask_id = mltask_id + self.mltask_settings = mltask_settings + + def get_raw(self): + """ + Gets the raw settings of this ML Task. This returns a reference to the raw settings, not a copy, + so changes made to the returned object will be reflected when saving. + + :rtype: dict + """ + return self.mltask_settings + def get_feature_preprocessing(self, feature_name): """ Gets the feature preprocessing params for a particular feature. 
This returns a reference to the @@ -214,27 +227,6 @@ def use_feature(self, feature_name): """ self.get_feature_preprocessing(feature_name)["role"] = "INPUT" - def use_sample_weighting(self, feature_name): - """ - Uses a feature as sample weight - :param str feature_name: Name of the feature to use - """ - self.remove_sample_weighting() - if not feature_name in self.mltask_settings["preprocessing"]["per_feature"]: - raise ValueError("Feature %s doesn't exist in this ML task, can't use as weight" % feature_name) - self.mltask_settings['weight']['weightMethod'] = 'SAMPLE_WEIGHT' - self.mltask_settings['weight']['sampleWeightVariable'] = feature_name - self.mltask_settings['preprocessing']['per_feature'][feature_name]['role'] = 'WEIGHT' - - def remove_sample_weighting(self): - """ - Remove sample weighting. If a feature was used as weight, it's set back to being an input feature - """ - self.mltask_settings['weight']['weightMethod'] = 'NO_WEIGHTING' - for feature_name in self.mltask_settings['preprocessing']['per_feature']: - if self.mltask_settings['preprocessing']['per_feature'][feature_name]['role'] == 'WEIGHT': - self.mltask_settings['preprocessing']['per_feature'][feature_name]['role'] = 'INPUT' - def get_algorithm_settings(self, algorithm_name): """ Gets the training settings for a particular algorithm. This returns a reference to the @@ -360,6 +352,118 @@ class DSSPredictionMLTaskSettings(DSSMLTaskSettings): "KERAS_CODE" : "keras" } + class PredictionTypes: + BINARY = "BINARY_CLASSIFICATION" + REGRESSION = "REGRESSION" + MULTICLASS = "MULTICLASS" + + def __init__(self, client, project_key, analysis_id, mltask_id, mltask_settings): + DSSMLTaskSettings.__init__(self, client, project_key, analysis_id, mltask_id, mltask_settings) + + if self.get_prediction_type() not in [self.PredictionTypes.BINARY, self.PredictionTypes.REGRESSION, self.PredictionTypes.MULTICLASS]: + raise ValueError("Unknown prediction type: {}".format(self.prediction_type)) + + self.classification_prediction_types = [self.PredictionTypes.BINARY, self.PredictionTypes.MULTICLASS] + + def get_prediction_type(self): + return self.mltask_settings['predictionType'] + + @property + def split_params(self): + """ + Gets a handle to modify train/test splitting params. + + :rtype: :class:`PredictionSplitParamsHandler` + """ + return self.get_split_params() + + def get_split_params(self): + """ + Gets a handle to modify train/test splitting params. + + :rtype: :class:`PredictionSplitParamsHandler` + """ + return PredictionSplitParamsHandler(self.mltask_settings) + + def split_ordered_by(self, feature_name, ascending=True): + """ + Deprecated. Use split_params.set_time_ordering() + """ + warnings.warn("split_ordered_by() is deprecated, please use split_params.set_time_ordering() instead", DeprecationWarning) + self.split_params.set_time_ordering(feature_name, ascending=ascending) + + return self + + def remove_ordered_split(self): + """ + Deprecated. Use split_params.unset_time_ordering() + """ + warnings.warn("remove_ordered_split() is deprecated, please use split_params.unset_time_ordering() instead", DeprecationWarning) + self.split_params.unset_time_ordering() + + return self + + def use_sample_weighting(self, feature_name): + """ + Deprecated. 
use set_weighting() + """ + warnings.warn("use_sample_weighting() is deprecated, please use set_weighting() instead", DeprecationWarning) + return self.set_weighting(method='SAMPLE_WEIGHT', feature_name=feature_name, ) + + def set_weighting(self, method, feature_name=None): + """ + Sets the method to weight samples. + + If there was a WEIGHT feature declared previously, it will be set back as an INPUT feature first. + + :param str method: Method to use. One of NO_WEIGHTING, SAMPLE_WEIGHT (must give a feature name), + CLASS_WEIGHT or CLASS_AND_SAMPLE_WEIGHT (must give a feature name) + :param str feature_name: Name of the feature to use as sample weight + """ + + # First, if there was a WEIGHT feature, restore it as INPUT + for feature_name in self.mltask_settings['preprocessing']['per_feature']: + if self.mltask_settings['preprocessing']['per_feature'][feature_name]['role'] == 'WEIGHT': + self.mltask_settings['preprocessing']['per_feature'][feature_name]['role'] = 'INPUT' + + if method == "NO_WEIGHTING": + self.mltask_settings['weight']['weightMethod'] = method + + elif method == "SAMPLE_WEIGHT": + if not feature_name in self.mltask_settings["preprocessing"]["per_feature"]: + raise ValueError("Feature %s doesn't exist in this ML task, can't use as weight" % feature_name) + + self.mltask_settings['weight']['weightMethod'] = method + self.mltask_settings['weight']['sampleWeightVariable'] = feature_name + self.mltask_settings['preprocessing']['per_feature'][feature_name]['role'] = 'WEIGHT' + + elif method == "CLASS_WEIGHT": + if self.get_prediction_type() not in self.classification_prediction_types: + raise ValueError("Weighting method: {} not compatible with prediction type: {}, should be in {}".format(method, self.get_prediction_type(), self.classification_prediction_types)) + + self.mltask_settings['weight']['weightMethod'] = method + + elif method == "CLASS_AND_SAMPLE_WEIGHT": + if self.get_prediction_type() not in self.classification_prediction_types: + raise ValueError("Weighting method: {} not compatible with prediction type: {}, should be in {}".format(method, self.get_prediction_type(), self.classification_prediction_types)) + if not feature_name in self.mltask_settings["preprocessing"]["per_feature"]: + raise ValueError("Feature %s doesn't exist in this ML task, can't use as weight" % feature_name) + + self.mltask_settings['weight']['weightMethod'] = method + self.mltask_settings['weight']['sampleWeightVariable'] = feature_name + self.mltask_settings['preprocessing']['per_feature'][feature_name]['role'] = 'WEIGHT' + + else: + raise ValueError("Unknown weighting method: {}".format(method)) + + return self + + def remove_sample_weighting(self): + """ + Deprecated. 
Use unset_weighting() instead + """ + warnings.warn("remove_sample_weighting() is deprecated, please use set_weigthing(method=\"NO_WEIGHTING\") instead", DeprecationWarning) + return self.unset_weighting() class DSSClusteringMLTaskSettings(DSSMLTaskSettings): __doc__ = [] diff --git a/dataikuapi/dss/plugin.py b/dataikuapi/dss/plugin.py index de661e05..ecc1f3cc 100644 --- a/dataikuapi/dss/plugin.py +++ b/dataikuapi/dss/plugin.py @@ -1,11 +1,5 @@ -from .dataset import DSSDataset -from .recipe import DSSRecipe -from .managedfolder import DSSManagedFolder -from .savedmodel import DSSSavedModel -from .job import DSSJob -from .scenario import DSSScenario -from .apiservice import DSSAPIService -import sys +from dataikuapi.utils import DataikuException + class DSSPluginSettings(object): """ @@ -30,6 +24,7 @@ def save(self): """Saves the settings to DSS""" self.client._perform_empty("POST", "/plugins/%s/settings" % (self.plugin_id), body=self.settings) + class DSSPlugin(object): """ A plugin on the DSS instance @@ -116,6 +111,41 @@ def update_from_git(self, repository_url, checkout = "master", subpath=None): }) return self.client.get_future(ret["jobId"]) + ######################################################## + # Plugin uninstall/delete + ######################################################## + + def list_usages(self, project_key=None): + """ + Get the list of usages of the plugin. + + :param str project_key: optional key of project where to look for usages. Default is None and looking in all projects. + :return: a :class:`DSSPluginUsages` + """ + return DSSPluginUsages( + self.client._perform_json("GET", "/plugins/{pluginId}/actions/listUsages".format(pluginId=self.plugin_id), + params={"projectKey": project_key}) + ) + + def delete(self, force=False): + """ + Delete a plugin. + + If not forced (default), pre-deletion checks will be run (as by :func:`prepare_delete` and the deletion will be + performed if and only if no usage of the plugin is detected and no error occurred during usages analysis. + + :param bool force: if True, plugin will be deleted even if usages are found or errors occurred during usages + analysis. Default is False. + :return: a :class:`dataikuapi.dssfuture.DSSFuture` + """ + + params = { + "force": force + } + ret = self.client._perform_json("POST", "/plugins/{pluginId}/actions/delete".format(pluginId=self.plugin_id), + body=params) + return self.client.get_future(ret.get("jobId", None)) + ######################################################## # Managing the dev plugin's contents ######################################################## @@ -147,4 +177,88 @@ def put_file(self, path, f): data = f.read() # eat it all, because making it work with a path variable and a MultifilePart in swing looks complicated return self.client._perform_empty("POST", "/plugins/%s/contents/%s" % (self.plugin_id, path), raw_body=data) - \ No newline at end of file + +class DSSPluginUsage(object): + """ + Information on a usage of an element of a plugin. + + Has the following properties: + - element_kind: webapps, python-formats,... + - element_type: type name of the element + - object_id: id of the object using the plugin element. Can be None. + - object_type: type of the object using the plugin element. Can be None. + - project_key: project key of the object using the plugin element. Can be None. + """ + def __init__(self, data): + """ + Instantiate a DSSPluginUsage from the dict of its properties. 
+ :param dict data: dict of properties + """ + self.element_kind = data["elementKind"] + self.element_type = data["elementType"] + self.object_id = data.get("objectId", None) + self.object_type = data.get("objectType", None) + self.project_key = data.get("projectKey", None) + + +class DSSMissingType(object): + """ + Information on a type not found while analyzing usages of a plugin. + + Has the following properties: + - missing type: the missing type + - object_id: id of the object depending on the missing type. Can be None. + - object_type: type of the object depending on the missing type. Can be None. + - project_key: project key of the object depending on the missing type. Can be None. + """ + def __init__(self, data): + """ + Instantiate a DSSMissingType from the dict of its properties + :param dict data: dictionary of properties + """ + self.missing_type = data["missingType"] + self.object_id = data.get("objectId", None) + self.object_type = data.get("objectType", None) + self.project_key = data.get("projectKey", None) + + +class DSSPluginUsages(object): + """ + Information on the usages of a plugin. + + Has the following properties: + - usages (list of :class:`DSSPluginUsage`) + - missing_types (a list of :class:`DSSMissingType`). + + Some custom types may not be found during usage analysis, typically when a plugin was removed + but is still used. This prevents some detailed analysis and may hide some uses. + This information is provided in missing_types. + """ + def __init__(self, data): + """ + Initialize a DSSPluginUsages from a dict of its properties + + :param dict data: the usages as json dict + """ + self._data = data + self.usages = [] + self.missing_types = [] + for json_usage in data.get("usages", []): + self.usages.append(DSSPluginUsage(json_usage)) + for json_missing_type in data.get("missingTypes"): + self.missing_types.append(DSSMissingType(json_missing_type)) + + def get_raw(self): + """ + Get plugin usages as a dictionary. + :rtype: dict + """ + return self._data + + def maybe_used(self): + """ + Returns true if the plugin maybe in use, as usages of the plugin were found, or errors + encountered during analysis. + :return: + """ + return not (not self.usages and not self.missing_types) diff --git a/dataikuapi/dss/project.py b/dataikuapi/dss/project.py index 16f4dbcf..c7fecb15 100644 --- a/dataikuapi/dss/project.py +++ b/dataikuapi/dss/project.py @@ -1,13 +1,12 @@ -import time -from .dataset import DSSDataset, DSSManagedDatasetCreationHelper +import time, warnings, sys, os.path as osp +from .dataset import DSSDataset, DSSDatasetListItem, DSSManagedDatasetCreationHelper from .recipe import DSSRecipe +from . import recipe from .managedfolder import DSSManagedFolder from .savedmodel import DSSSavedModel from .job import DSSJob, DSSJobWaiter from .scenario import DSSScenario from .apiservice import DSSAPIService -import sys -import os.path as osp from .future import DSSFuture from .notebook import DSSNotebook from .macro import DSSMacro @@ -209,15 +208,22 @@ def set_permissions(self, permissions): # Datasets ######################################################## - def list_datasets(self): + def list_datasets(self, as_type="listitems"): """ - List the datasets in this project - - :returns: The list of the datasets, each one as a dictionary. Each dataset dict contains at least a `name` field which is the name of the dataset - :rtype: list of dicts + List the datasets in this project. + + :param str as_type: How to return the list. 
Supported values are "listitems" and "objects".
+        :returns: The list of the datasets. If "as_type" is "listitems", each one as a :class:`dataset.DSSDatasetListItem`.
+            If "as_type" is "objects", each one as a :class:`dataset.DSSDataset`
+        :rtype: list
         """
-        return self.client._perform_json(
-            "GET", "/projects/%s/datasets/" % self.project_key)
+        items = self.client._perform_json("GET", "/projects/%s/datasets/" % self.project_key)
+        if as_type == "listitems" or as_type == "listitem":
+            return [DSSDatasetListItem(self.client, item) for item in items]
+        elif as_type == "objects" or as_type == "object":
+            return [DSSDataset(self.client, self.project_key, item["name"]) for item in items]
+        else:
+            raise ValueError("Unknown as_type")

     def get_dataset(self, dataset_name):
         """
@@ -267,6 +273,68 @@ def create_dataset(self, dataset_name, type,
                 body = obj)
         return DSSDataset(self.client, self.project_key, dataset_name)

+    def create_upload_dataset(self, dataset_name, connection=None):
+        obj = {
+            "name" : dataset_name,
+            "projectKey" : self.project_key,
+            "type" : "UploadedFiles",
+            "params" : {}
+        }
+        if connection is not None:
+            obj["params"]["uploadConnection"] = connection
+        self.client._perform_json("POST", "/projects/%s/datasets/" % self.project_key,
+                       body = obj)
+        return DSSDataset(self.client, self.project_key, dataset_name)
+
+    def create_filesystem_dataset(self, dataset_name, connection, path_in_connection):
+        return self.create_fslike_dataset(dataset_name, "Filesystem", connection, path_in_connection)
+
+    def create_s3_dataset(self, dataset_name, connection, path_in_connection, bucket=None):
+        """
+        Creates a new external S3 dataset in the project and returns a :class:`~dataikuapi.dss.dataset.DSSDataset` to interact with it.
+
+        The created dataset does not have its format and schema initialized; it is recommended to use
+        :meth:`~dataikuapi.dss.dataset.DSSDataset.autodetect_settings` on the returned object
+
+        :param dataset_name: Name of the dataset to create.
Must not already exist + :rtype: `~dataikuapi.dss.dataset.DSSDataset` + """ + extra_params = {} + if bucket is not None: + extra_params["bucket"] = bucket + return self.create_fslike_dataset(dataset_name, "S3", connection, path_in_connection, extra_params) + + def create_fslike_dataset(self, dataset_name, dataset_type, connection, path_in_connection, extra_params=None): + body = { + "name" : dataset_name, + "projectKey" : self.project_key, + "type" : dataset_type, + "params" : { + "connection" : connection, + "path": path_in_connection + } + } + if extra_params is not None: + body["params"].update(extra_params) + self.client._perform_json("POST", "/projects/%s/datasets/" % self.project_key, body = body) + return DSSDataset(self.client, self.project_key, dataset_name) + + def create_sql_table_dataset(self, dataset_name, type, connection, table, schema): + obj = { + "name" : dataset_name, + "projectKey" : self.project_key, + "type" : type, + "params" : { + "connection" : connection, + "mode": "table", + "table" : table, + "schema" : schema + } + } + self.client._perform_json("POST", "/projects/%s/datasets/" % self.project_key, + body = obj) + return DSSDataset(self.client, self.project_key, dataset_name) + def new_managed_dataset_creation_helper(self, dataset_name): """ Creates a helper class to create a managed dataset in the project @@ -277,7 +345,8 @@ def new_managed_dataset_creation_helper(self, dataset_name): return DSSManagedDatasetCreationHelper(self, dataset_name) ######################################################## - # ML + # Lab and ML + # Don't forget to synchronize with DSSDataset.* ######################################################## def create_prediction_ml_task(self, input_dataset, target_variable, @@ -522,7 +591,7 @@ def start_job(self, definition): def start_job_and_wait(self, definition, no_fail=False): """ - Create a new job. Wait the end of the job to complete. + Starts a new job and waits for it to complete. Args: definition: the definition for the job to create. The definition must contain the type of job (RECURSIVE_BUILD, @@ -535,8 +604,27 @@ def start_job_and_wait(self, definition, no_fail=False): waiter = DSSJobWaiter(job) return waiter.wait(no_fail) + def new_job(self, job_type='NON_RECURSIVE_FORCED_BUILD'): + """ + Create a job to be run + + You need to add outputs to the job (i.e. what you want to build) before running it. + + .. code-block:: python + + job_builder = project.new_job() + job_builder.with_output("mydataset") + complete_job = job_builder.start_and_wait() + print("Job %s done" % complete_job.id) + + :rtype: :class:`JobDefinitionBuilder` + """ + return JobDefinitionBuilder(self, job_type) + def new_job_definition_builder(self, job_type='NON_RECURSIVE_FORCED_BUILD'): - return JobDefinitionBuilder(self.project_key, job_type) + """Deprecated. 
Please use :meth:`new_job`""" + warnings.warn("new_job_definition_builder is deprecated, please use new_job", DeprecationWarning) + return JobDefinitionBuilder(self, job_type) ######################################################## # Variables @@ -750,13 +838,9 @@ def list_recipes(self): def get_recipe(self, recipe_name): """ - Get a handle to interact with a specific recipe - - Args: - recipe_name: the name of the desired recipe - - Returns: - A :class:`dataikuapi.dss.recipe.DSSRecipe` recipe handle + Gets a :class:`dataikuapi.dss.recipe.DSSRecipe` handle to interact with a recipe + :param str recipe_name: The name of the recipe + :rtype :class:`dataikuapi.dss.recipe.DSSRecipe` """ return DSSRecipe(self.client, self.project_key, recipe_name) @@ -783,6 +867,68 @@ def create_recipe(self, recipe_proto, creation_settings): body = definition)['name'] return DSSRecipe(self.client, self.project_key, recipe_name) + def new_recipe(self, type, name=None): + """ + Initializes the creation of a new recipe. Returns a :class:`dataikuapi.dss.recipe.DSSRecipeCreator` + or one of its subclasses to complete the creation of the recipe. + + Usage example: + + .. code-block:: python + + grouping_recipe_builder = project.new_recipe("grouping") + grouping_recipe_builder.with_input("dataset_to_group_on") + # Create a new managed dataset for the output in the "filesystem_managed" connection + grouping_recipe_builder.with_new_output("grouped_dataset", "filesystem_managed") + grouping_recipe_builder.with_group_key("column") + recipe = grouping_recipe_builder.build() + + # After the recipe is created, you can edit its settings + recipe_settings = recipe.get_settings() + recipe_settings.set_column_aggregations("value", sum=True) + recipe_settings.save() + + # And you may need to apply new schemas to the outputs + recipe.compute_schema_updates().apply() + + :param str type: Type of the recipe + :param str name: Optional, base name for the new recipe. 
+ :rtype: :class:`dataikuapi.dss.recipe.DSSRecipeCreator` + """ + + if type == "grouping": + return recipe.GroupingRecipeCreator(name, self) + elif type == "window": + return recipe.WindowRecipeCreator(name, self) + elif type == "sync": + return recipe.SyncRecipeCreator(name, self) + elif type == "sort": + return recipe.SortRecipeCreator(name, self) + elif type == "topn": + return recipe.TopNRecipeCreator(name, self) + elif type == "distinct": + return recipe.DistinctRecipeCreator(name, self) + elif type == "join": + return recipe.JoinRecipeCreator(name, self) + elif type == "vstack": + return recipe.StackRecipeCreator(name, self) + elif type == "sampling": + return recipe.SamplingRecipeCreator(name, self) + elif type == "split": + return recipe.SplitRecipeCreator(name, self) + elif type == "prepare" or type == "shaker": + return recipe.PrepareRecipeCreator(name, self) + elif type == "prediction_scoring": + return recipe.PredictionScoringRecipeCreator(name, self) + elif type == "clustering_scoring": + return recipe.ClusteringScoringRecipeCreator(name, self) + elif type == "download": + return recipe.DownloadRecipeCreator(name, self) + elif type == "sql_query": + return recipe.SQLQueryRecipeCreator(name, self) + elif type in ["python", "r", "sql_script", "pyspark", "sparkr", "spark_scala", "shell"]: + return recipe.CodeRecipeCreator(name, type, self) + ######################################################## # Flow ######################################################## @@ -965,7 +1111,7 @@ def to_schema_table_pair(x): def get_app_manifest(self): raw_data = self.client._perform_json("GET", "/projects/%s/app-manifest" % self.project_key) - return DSSAppManifest(self.client, raw_data) + return DSSAppManifest(self.client, raw_data, self.project_key) class TablesImportDefinition(object): @@ -1141,16 +1287,11 @@ def save(self): body = self.settings) class JobDefinitionBuilder(object): - def __init__(self, project_key, job_type="NON_RECURSIVE_FORCED_BUILD"): - """ - Create a helper to build a job definition - - :param project_key: the project in which the job is launched - :param job_type: the build type for the job RECURSIVE_BUILD, NON_RECURSIVE_FORCED_BUILD, - RECURSIVE_FORCED_BUILD, RECURSIVE_MISSING_ONLY_BUILD - - """ - self.project_key = project_key + """ + Helper to run a job. Do not create this class directly, use :meth:`DSSProject.new_job` + """ + def __init__(self, project, job_type="NON_RECURSIVE_FORCED_BUILD"): + self.project = project self.definition = {'type':job_type, 'refreshHiveMetastore':False, 'outputs':[]} def with_type(self, job_type): @@ -1173,10 +1314,39 @@ def with_refresh_metastore(self, refresh_metastore): def with_output(self, name, object_type=None, object_project_key=None, partition=None): """ - Adds an item to build in the definition + Adds an item to build in this job """ self.definition['outputs'].append({'type':object_type, 'id':name, 'projectKey':object_project_key, 'partition':partition}) return self def get_definition(self): + """Gets the internal definition for this job""" return self.definition + + def start(self): + """ + Starts the job, and return a :doc:`dataikuapi.dss.job.DSSJob` handle to interact with it. 
+ + You need to wait for the returned job to complete + + :return: the :class:`dataikuapi.dss.job.DSSJob` job handle + :rtype: :class:`dataikuapi.dss.job.DSSJob` + """ + job_def = self.project.client._perform_json("POST", + "/projects/%s/jobs/" % self.project.project_key, body = self.definition) + return DSSJob(self.project.client, self.project.project_key, job_def['id']) + + def start_and_wait(self, no_fail=False): + """ + Starts the job, waits for it to complete and returns a a :doc:`dataikuapi.dss.job.DSSJob` handle to interact with it + + Raises if the job failed. + + :param no_fail: if True, does not raise if the job failed. + :return: the :class:`dataikuapi.dss.job.DSSJob` job handle + :rtype: :class:`dataikuapi.dss.job.DSSJob` + """ + job = self.start() + waiter = DSSJobWaiter(job) + waiter.wait(no_fail) + return job diff --git a/dataikuapi/dss/recipe.py b/dataikuapi/dss/recipe.py index 4f5323bd..7a05af44 100644 --- a/dataikuapi/dss/recipe.py +++ b/dataikuapi/dss/recipe.py @@ -1,19 +1,78 @@ from ..utils import DataikuException +from .utils import DSSTaggableObjectSettings from .discussion import DSSObjectDiscussions -import json +import json, logging, warnings + +##################################################### +# Base classes +##################################################### class DSSRecipe(object): """ - A handle to an existing recipe on the DSS instance + A handle to an existing recipe on the DSS instance. + Do not create this directly, use :meth:`dataikuapi.dss.project.DSSProject.get_recipe` """ def __init__(self, client, project_key, recipe_name): self.client = client self.project_key = project_key self.recipe_name = recipe_name - ######################################################## - # Dataset deletion - ######################################################## + @property + def name(self): + """The name of the recipe""" + return self.recipe_name + + def compute_schema_updates(self): + """ + Computes which updates are required to the outputs of this recipe. + The required updates are returned as a :class:`RequiredSchemaUpdates` object, which then + allows you to :meth:`~RequiredSchemaUpdates.apply` the changes. + + Usage example: + + .. code-block:: python + + required_updates = recipe.compute_schema_updates() + if required_updates.any_action_required(): + print("Some schemas will be updated") + + # Note that you can call apply even if no changes are required. This will be noop + required_updates.apply() + """ + data = self.client._perform_json( + "GET", "/projects/%s/recipes/%s/schema-update" % (self.project_key, self.recipe_name)) + return RequiredSchemaUpdates(self, data) + + def run(self, job_type="NON_RECURSIVE_FORCED_BUILD", partitions=None, wait=True, no_fail=False): + """ + Starts a new job to run this recipe and wait for it to complete. + Raises if the job failed. + + .. code-block:: python + + job = recipe.run() + print("Job %s done" % job.id) + + :param job_type: The job type. One of RECURSIVE_BUILD, NON_RECURSIVE_FORCED_BUILD or RECURSIVE_FORCED_BUILD + :param partitions: If the outputs are partitioned, a list of partition ids to build + :param no_fail: if True, does not raise if the job failed. 
+ :return: the :class:`dataikuapi.dss.job.DSSJob` job handle corresponding to the built job + :rtype: :class:`dataikuapi.dss.job.DSSJob` + """ + + settings = self.get_settings() + output_refs = settings.get_flat_output_refs() + + if len(output_refs) == 0: + raise Exception("recipe has no outputs, can't run it") + + jd = self.client.get_project(self.project_key).new_job(job_type) + jd.with_output(output_refs[0], partition=partitions) + + if wait: + return jd.start_and_wait() + else: + return jd.start() def delete(self): """ @@ -22,59 +81,92 @@ def delete(self): return self.client._perform_empty( "DELETE", "/projects/%s/recipes/%s" % (self.project_key, self.recipe_name)) - ######################################################## - # Recipe definition - ######################################################## + def get_settings(self): + """ + Gets the settings of the recipe, as a :class:`DSSRecipeSettings` or one of its subclasses. + + Some recipes have a dedicated class for the settings, with additional helpers to read and modify the settings - def get_definition_and_payload(self): + Once you are done modifying the returned settings object, you can call :meth:`~DSSRecipeSettings.save` on it + in order to save the modifications to the DSS recipe """ - Gets the definition of the recipe + data = self.client._perform_json( + "GET", "/projects/%s/recipes/%s" % (self.project_key, self.recipe_name)) + type = data["recipe"]["type"] + + if type == "grouping": + return GroupingRecipeSettings(self, data) + elif type == "window": + return WindowRecipeSettings(self, data) + elif type == "sync": + return SyncRecipeSettings(self, data) + elif type == "sort": + return SortRecipeSettings(self, data) + elif type == "topn": + return TopNRecipeSettings(self, data) + elif type == "distinct": + return DistinctRecipeSettings(self, data) + elif type == "join": + return JoinRecipeSettings(self, data) + elif type == "vstack": + return StackRecipeSettings(self, data) + elif type == "sampling": + return SamplingRecipeSettings(self, data) + elif type == "split": + return SplitRecipeSettings(self, data) + elif type == "prepare" or type == "shaker": + return PrepareRecipeSettings(self, data) + #elif type == "prediction_scoring": + #elif type == "clustering_scoring": + elif type == "download": + return DownloadRecipeSettings(self, data) + #elif type == "sql_query": + # return WindowRecipeSettings(self, data) + elif type in ["python", "r", "sql_script", "pyspark", "sparkr", "spark_scala", "shell"]: + return CodeRecipeSettings(self, data) + else: + return DSSRecipeSettings(self, data) - :returns: the definition, as a :py:class:`DSSRecipeDefinitionAndPayload` object, containing the recipe definition itself and its payload - :rtype: :py:class:`DSSRecipeDefinitionAndPayload` + def get_definition_and_payload(self): """ + Deprecated. Use :meth:`get_settings` + """ + warnings.warn("Recipe.get_definition_and_payload is deprecated, please use get_settings", DeprecationWarning) + data = self.client._perform_json( "GET", "/projects/%s/recipes/%s" % (self.project_key, self.recipe_name)) - return DSSRecipeDefinitionAndPayload(data) + return DSSRecipeDefinitionAndPayload(self, data) def set_definition_and_payload(self, definition): """ - Sets and saves the definition of the recipe - - :param definition object: the definition, as a :py:class:`DSSRecipeDefinitionAndPayload` object. You should only set a definition object - that has been retrieved using the :py:meth:get_definition_and_payload call. + Deprecated. 
Use :meth:`get_settings` and :meth:`DSSRecipeSettings.save` """ + warnings.warn("Recipe.set_definition_and_payload is deprecated, please use get_settings", DeprecationWarning) + definition._payload_to_str() return self.client._perform_json( "PUT", "/projects/%s/recipes/%s" % (self.project_key, self.recipe_name), body=definition.data) - ######################################################## - # Recipe status - ######################################################## - def get_status(self): """ Gets the status of this recipe (status messages, engines status, ...) - :return: an object to interact + :return: a :class:`dataikuapi.dss.recipe.DSSRecipeStatus` object to interact with the status :rtype: :class:`dataikuapi.dss.recipe.DSSRecipeStatus` """ data = self.client._perform_json( "GET", "/projects/%s/recipes/%s/status" % (self.project_key, self.recipe_name)) return DSSRecipeStatus(self.client, data) - ######################################################## - # Recipe metadata - ######################################################## def get_metadata(self): """ Get the metadata attached to this recipe. The metadata contains label, description checklists, tags and custom metadata of the recipe - Returns: - a dict object. For more information on available metadata, please see - https://doc.dataiku.com/dss/api/5.0/rest/ + :returns: a dict. For more information on available metadata, please see + https://doc.dataiku.com/dss/api/8.0/rest/ + :rtype dict """ return self.client._perform_json( "GET", "/projects/%s/recipes/%s/metadata" % (self.project_key, self.recipe_name)) @@ -82,18 +174,13 @@ def get_metadata(self): def set_metadata(self, metadata): """ Set the metadata on this recipe. - - Args: - metadata: the new state of the metadata for the recipe. You should only set a metadata object + :params dict metadata: the new state of the metadata for the recipe. You should only set a metadata object that has been retrieved using the get_metadata call. """ return self.client._perform_json( "PUT", "/projects/%s/recipes/%s/metadata" % (self.project_key, self.recipe_name), body=metadata) - ######################################################## - # Discussions - ######################################################## def get_object_discussions(self): """ Get a handle to manage discussions on the recipe @@ -103,7 +190,11 @@ def get_object_discussions(self): """ return DSSObjectDiscussions(self.client, self.project_key, "RECIPE", self.recipe_name) + class DSSRecipeStatus(object): + """Status of a recipce. + Do not create that directly, use :meth:`DSSRecipe.get_status`""" + def __init__(self, client, data): """Do not call that directly, use :meth:`dataikuapi.dss.recipe.DSSRecipe.get_status`""" self.client = client @@ -152,64 +243,124 @@ def get_status_messages(self): """ return self.data["allMessagesForFrontend"]["messages"] -class DSSRecipeDefinitionAndPayload(object): + +class DSSRecipeSettings(DSSTaggableObjectSettings): """ - Definition for a recipe, that is, the recipe definition itself and its payload + Settings of a recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` """ - def __init__(self, data): + def __init__(self, recipe, data): + super(DSSRecipeSettings, self).__init__(data["recipe"]) + self.recipe = recipe self.data = data + self.recipe_settings = self.data["recipe"] + self._str_payload = self.data.get("payload", None) + self._obj_payload = None + + def save(self): + """ + Saves back the recipe in DSS. 
+ """ + self._payload_to_str() + return self.recipe.client._perform_json( + "PUT", "/projects/%s/recipes/%s" % (self.recipe.project_key, self.recipe.recipe_name), + body=self.data) + + @property + def type(self): + return self.recipe_settings["type"] + + @property + def str_payload(self): + """The raw "payload" of the recipe, as a string""" + self._payload_to_str() + return self._str_payload + @str_payload.setter + def str_payload(self, payload): + self._str_payload = payload + self._obj_payload = None + + @property + def obj_payload(self): + """The raw "payload" of the recipe, as a dict""" + self._payload_to_obj() + return self._obj_payload + + @property + def raw_params(self): + """The raw 'params' field of the recipe settings, as a dict""" + return self.recipe_settings["params"] + + def _payload_to_str(self): + if self._obj_payload is not None: + self._str_payload = json.dumps(self._obj_payload) + self._obj_payload = None + if self._str_payload is not None: + self.data["payload"] = self._str_payload + + def _payload_to_obj(self): + if self._str_payload is not None: + self._obj_payload = json.loads(self._str_payload) + self._str_payload = None def get_recipe_raw_definition(self): """ - Get the recipe definition as a raw JSON object + Get the recipe definition as a raw dict + :rtype dict """ - return self.data.get('recipe', None) + return self.recipe_settings def get_recipe_inputs(self): """ - Get the list of inputs of this recipe + Get a structured dict of inputs to this recipe + :rtype dict """ - return self.data.get('recipe').get('inputs') + return self.recipe_settings.get('inputs') def get_recipe_outputs(self): """ - Get the list of outputs of this recipe + Get a structured dict of outputs of this recipe + :rtype dict """ - return self.data.get('recipe').get('outputs') + return self.recipe_settings.get('outputs') def get_recipe_params(self): """ - Get the parameters of this recipe, as a raw JSON object + Get the parameters of this recipe, as a dict + :rtype dict """ - return self.data.get('recipe').get('params') + return self.recipe_settings.get('params') def get_payload(self): """ - Get the payload or script of this recipe, as a raw string + Get the payload or script of this recipe, as a string + :rtype string """ - return self.data.get('payload', None) + self._payload_to_str() + return self._str_payload def get_json_payload(self): """ - Get the payload or script of this recipe, as a JSON object + Get the payload or script of this recipe, parsed from JSON, as a dict + :rtype dict """ - return json.loads(self.data.get('payload', None)) + self._payload_to_obj() + return self._obj_payload def set_payload(self, payload): """ - Set the raw payload of this recipe - + Set the payload of this recipe :param str payload: the payload, as a string """ - self.data['payload'] = payload + self._str_payload = payload + self._obj_payload = None def set_json_payload(self, payload): """ - Set the raw payload of this recipe - + Set the payload of this recipe :param dict payload: the payload, as a dict. 
The payload will be converted to a JSON string internally """ - self.data['payload'] = json.dumps(payload) + self._str_payload = None + self._obj_payload = payload def has_input(self, input_ref): """Returns whether this recipe has a given ref as input""" @@ -245,6 +396,108 @@ def replace_output(self, current_output_ref, new_output_ref): if item.get("ref", None) == current_output_ref: item["ref"] = new_output_ref + def add_input(self, role, ref, partition_deps=None): + if partition_deps is None: + partition_deps = [] + self._get_or_create_input_role(role)["items"].append({"ref": ref, "deps": partition_deps}) + + def add_output(self, role, ref, append_mode=False): + self._get_or_create_output_role(role)["items"].append({"ref": ref, "appendMode": append_mode}) + + def _get_or_create_input_role(self, role): + inputs = self.get_recipe_inputs() + if not role in inputs: + role_obj = {"items": []} + inputs[role] = role_obj + return inputs[role] + + def _get_or_create_output_role(self, role): + outputs = self.get_recipe_outputs() + if not role in outputs: + role_obj = {"items": []} + outputs[role] = role_obj + return outputs[role] + + def _get_flat_inputs(self): + ret = [] + for role_key, role_obj in self.get_recipe_inputs().items(): + for item in role_obj["items"]: + ret.append((role_key, item)) + return ret + + def _get_flat_outputs(self): + ret = [] + for role_key, role_obj in self.get_recipe_outputs().items(): + for item in role_obj["items"]: + ret.append((role_key, item)) + return ret + + def get_flat_input_refs(self): + """ + Returns a list of all input refs of this recipe, regardless of the input role + :rtype list of strings + """ + ret = [] + for role_key, role_obj in self.get_recipe_inputs().items(): + for item in role_obj["items"]: + ret.append(item["ref"]) + return ret + + def get_flat_output_refs(self): + """ + Returns a list of all output refs of this recipe, regardless of the output role + :rtype list of strings + """ + ret = [] + for role_key, role_obj in self.get_recipe_outputs().items(): + for item in role_obj["items"]: + ret.append(item["ref"]) + return ret + + +# Old name, deprecated +class DSSRecipeDefinitionAndPayload(DSSRecipeSettings): + """ + Deprecated. Settings of a recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + pass + +class RequiredSchemaUpdates(object): + """ + Representation of the updates required to the schema of the outputs of a recipe. 
+ Do not create this class directly, use :meth:`DSSRecipe.compute_schema_updates` + """ + + def __init__(self, recipe, data): + self.recipe = recipe + self.data = data + self.drop_and_recreate = True + self.synchronize_metastore = True + + def any_action_required(self): + return self.data["totalIncompatibilities"] > 0 + + def apply(self): + results = [] + for computable in self.data["computables"]: + osu = { + "computableType": computable["type"], + # dirty + "computableId": computable["type"] == "DATASET" and computable["datasetName"] or computable["id"], + "newSchema": computable["newSchema"], + "dropAndRecreate": self.drop_and_recreate, + "synchronizeMetastore" : self.synchronize_metastore + } + + results.append(self.recipe.client._perform_json("POST", + "/projects/%s/recipes/%s/actions/updateOutputSchema" % (self.recipe.project_key, self.recipe.recipe_name), + body=osu)) + return results + +##################################################### +# Recipes creation infrastructure +##################################################### + class DSSRecipeCreator(object): """ Helper to create new recipes @@ -264,6 +517,9 @@ def __init__(self, type, name, project): self.creation_settings = { } + def set_name(self, name): + self.recipe_proto["name"] = name + def _build_ref(self, object_id, project_key=None): if project_key is not None and project_key != self.project.project_key: return project_key + '.' + object_id @@ -286,6 +542,13 @@ def _with_output(self, dataset_name, append=False, role="main"): role_obj["items"].append({'ref':self._build_ref(dataset_name, None), 'appendMode': append}) return self + def _get_input_refs(self): + ret = [] + for role_key, role_obj in self.recipe_proto["inputs"].items(): + for item in role_obj["items"]: + ret.append(item["ref"]) + return ret + def with_input(self, dataset_name, project_key=None, role="main"): """ Add an existing object as input to the recipe-to-be-created @@ -311,8 +574,12 @@ def with_output(self, dataset_name, append=False, role="main"): return self._with_output(dataset_name, append, role) def build(self): + """Deprecated. Use create()""" + return self.create() + + def create(self): """ - Create a new recipe in the project, and return a handle to interact with it. + Creates the new recipe in the project, and return a handle to interact with it. Returns: A :class:`dataikuapi.dss.recipe.DSSRecipe` recipe handle @@ -320,6 +587,9 @@ def build(self): self._finish_creation_settings() return self.project.create_recipe(self.recipe_proto, self.creation_settings) + def set_raw_mode(self): + self.creation_settings["rawCreation"] = True + def _finish_creation_settings(self): pass @@ -349,27 +619,33 @@ def with_existing_output(self, dataset_name, append=False): self._with_output(dataset_name, append) return self - def with_new_output(self, name, connection_id, typeOptionId=None, format_option_id=None, override_sql_schema=None, partitioning_option_id=None, append=False, object_type='DATASET'): + def with_new_output(self, name, connection_id, typeOptionId=None, format_option_id=None, override_sql_schema=None, partitioning_option_id=None, append=False, object_type='DATASET', overwrite=False): """ Create a new dataset as output to the recipe-to-be-created. 
The dataset is not created immediately, - but when the recipe is created (ie in the build() method) + but when the recipe is created (ie in the create() method) :param str name: name of the dataset or identifier of the managed folder :param str connection_id: name of the connection to create the dataset on :param str typeOptionId: sub-type of dataset, for connection where the type could be ambiguous. Typically, this is SCP or SFTP, for SSH connection - :param str format_option_id: name of a format preset relevant for the dataset type. Possible values are: CSV_ESCAPING_NOGZIP_FORHIVE, - CSV_UNIX_GZIP, CSV_EXCEL_GZIP, CSV_EXCEL_GZIP_BIGQUERY, CSV_NOQUOTING_NOGZIP_FORPIG, PARQUET_HIVE, - AVRO, ORC + :param str format_option_id: name of a format preset relevant for the dataset type. Possible values are: CSV_ESCAPING_NOGZIP_FORHIVE, + CSV_UNIX_GZIP, CSV_EXCEL_GZIP, CSV_EXCEL_GZIP_BIGQUERY, CSV_NOQUOTING_NOGZIP_FORPIG, PARQUET_HIVE, + AVRO, ORC :param override_sql_schema: schema to force dataset, for SQL dataset. If left empty, will be autodetected :param str partitioning_option_id: to copy the partitioning schema of an existing dataset 'foo', pass a value of 'copy:dataset:foo' :param append: whether the recipe should append or overwrite the output when running (note: not available for all dataset types) :param str object_type: DATASET or MANAGED_FOLDER + :param overwrite: If the dataset being created already exists, overwrite it (and delete data) """ if object_type == 'DATASET': assert self.create_output_dataset is None + + dataset = self.project.get_dataset(name) + if overwrite and dataset.exists(): + dataset.delete(drop_data=True) + self.create_output_dataset = True self.output_dataset_settings = {'connectionId':connection_id,'typeOptionId':typeOptionId,'specificSettings':{'formatOptionId':format_option_id, 'overrideSQLSchema':override_sql_schema},'partitioningOptionId':partitioning_option_id} self._with_output(name, append) @@ -407,11 +683,97 @@ def _finish_creation_settings(self): super(VirtualInputsSingleOutputRecipeCreator, self)._finish_creation_settings() self.creation_settings['virtualInputs'] = self.virtual_inputs -######################## -# -# actual recipe creators -# -######################## + +##################################################### +# Per-recipe-type classes: Visual recipes +##################################################### + +class GroupingRecipeSettings(DSSRecipeSettings): + """ + Settings of a grouping recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + def clear_grouping_keys(self): + """Removes all grouping keys from this grouping recipe""" + self.obj_payload["keys"] = [] + + def add_grouping_key(self, column): + """ + Adds grouping on a column + :param str column: Column to group on + """ + self.obj_payload["keys"].append({"column":column}) + + def set_global_count_enabled(self, enabled): + self.obj_payload["globalCount"] = enabled + + def get_or_create_column_settings(self, column): + """ + Gets a dict representing the aggregations to perform on a column. 
+ Creates it and adds it to the potential aggregations if it does not already exists + :param str column: The column name + :rtype dict + """ + found = None + for gv in self.obj_payload["values"]: + if gv["column"] == column: + found = gv + break + if found is None: + found = {"column" : column} + self.obj_payload["values"].append(found) + return found + + def set_column_aggregations(self, column, type, min=False, max=False, count=False, count_distinct=False, + sum=False,concat=False,stddev=False,avg=False): + """ + Sets the basic aggregations on a column. + Returns the dict representing the aggregations on the column + + :param str column: The column name + :param str type: The type of the column (as a DSS schema type name) + :rtype dict + """ + cs = self.get_or_create_column_settings(column) + cs["type"] = type + cs["min"] = min + cs["max"] = max + cs["count"] = count + cs["countDistinct"] = count_distinct + cs["sum"] = sum + cs["concat"] = concat + cs["stddev"] = stddev + return cs + +class GroupingRecipeCreator(SingleOutputRecipeCreator): + """ + Create a Group recipe + """ + def __init__(self, name, project): + SingleOutputRecipeCreator.__init__(self, 'grouping', name, project) + self.group_key = None + + def with_group_key(self, group_key): + """ + Set a column as the first grouping key. Only a single grouping key may be set + at recipe creation time. For additional groupings, get the recipe settings + + :param str group_key: name of a column in the input dataset + """ + self.group_key = group_key + return self + + def _finish_creation_settings(self): + super(GroupingRecipeCreator, self)._finish_creation_settings() + if self.group_key is not None: + self.creation_settings['groupKey'] = self.group_key + + +class WindowRecipeSettings(DSSRecipeSettings): + """ + Settings of a window recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + pass # TODO: Write helpers for window + class WindowRecipeCreator(SingleOutputRecipeCreator): """ Create a Window recipe @@ -419,6 +781,13 @@ class WindowRecipeCreator(SingleOutputRecipeCreator): def __init__(self, name, project): SingleOutputRecipeCreator.__init__(self, 'window', name, project) + +class SyncRecipeSettings(DSSRecipeSettings): + """ + Settings of a sync recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + pass # TODO: Write helpers for sync + class SyncRecipeCreator(SingleOutputRecipeCreator): """ Create a Sync recipe @@ -426,6 +795,13 @@ class SyncRecipeCreator(SingleOutputRecipeCreator): def __init__(self, name, project): SingleOutputRecipeCreator.__init__(self, 'sync', name, project) + +class SortRecipeSettings(DSSRecipeSettings): + """ + Settings of a sort recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + pass # TODO: Write helpers for sort + class SortRecipeCreator(SingleOutputRecipeCreator): """ Create a Sort recipe @@ -433,6 +809,13 @@ class SortRecipeCreator(SingleOutputRecipeCreator): def __init__(self, name, project): SingleOutputRecipeCreator.__init__(self, 'sort', name, project) + +class TopNRecipeSettings(DSSRecipeSettings): + """ + Settings of a topn recipe. 
Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + pass # TODO: Write helpers for topn + class TopNRecipeCreator(DSSRecipeCreator): """ Create a TopN recipe @@ -440,6 +823,13 @@ class TopNRecipeCreator(DSSRecipeCreator): def __init__(self, name, project): DSSRecipeCreator.__init__(self, 'topn', name, project) + +class DistinctRecipeSettings(DSSRecipeSettings): + """ + Settings of a distinct recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + pass # TODO: Write helpers for distinct + class DistinctRecipeCreator(SingleOutputRecipeCreator): """ Create a Distinct recipe @@ -447,26 +837,151 @@ class DistinctRecipeCreator(SingleOutputRecipeCreator): def __init__(self, name, project): SingleOutputRecipeCreator.__init__(self, 'distinct', name, project) -class GroupingRecipeCreator(SingleOutputRecipeCreator): + +class PrepareRecipeSettings(DSSRecipeSettings): """ - Create a Group recipe + Settings of a prepare recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + pass + + @property + def raw_steps(self): + """ + Returns a raw list of the steps of this prepare recipe. + You can modify the returned list. + + Each step is a dict of settings. The precise settings for each step are not documented + """ + return self.obj_payload["steps"] + + def add_processor_step(self, type, params): + step = { + "metaType": "PROCESSOR", + "type": type, + "params": params + } + self.raw_steps.append(step) + + def add_filter_on_bad_meaning(self, meaning, columns): + params = { + "action" : "REMOVE_ROW", + "type" : meaning + } + if isinstance(columns, basestring): + params["appliesTo"] = "SINGLE_COLUMN" + params["columns"] = [columns] + elif isinstance(columns, list): + params["appliesTo"] = "COLUMNS" + params["columns"] = columns + +class PrepareRecipeCreator(SingleOutputRecipeCreator): + """ + Create a Prepare recipe """ def __init__(self, name, project): - SingleOutputRecipeCreator.__init__(self, 'grouping', name, project) - self.group_key = None + SingleOutputRecipeCreator.__init__(self, 'shaker', name, project) - def with_group_key(self, group_key): + +class JoinRecipeSettings(DSSRecipeSettings): + """ + Settings of a join recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + + In order to enable self-joins, join recipes are based on a concept of "virtual inputs". + Every join, computed pre-join column, pre-join filter, ... is based on one virtual input, and + each virtual input references an input of the recipe, by index + + For example, if a recipe has inputs A and B and declares two joins: + - A->B + - A->A(based on a computed column) + + There are 3 virtual inputs: + * 0: points to recipe input 0 (i.e. dataset A) + * 1: points to recipe input 1 (i.e. dataset B) + * 2: points to recipe input 0 (i.e. 
dataset A) and includes the computed column + + * The first join is between virtual inputs 0 and 1 + * The second join is between virtual inputs 0 and 2 + """ + pass # TODO: Write helpers for join + + @property + def raw_virtual_inputs(self): + """ + Returns the raw list of virtual inputs + :rtype list of dict """ - Set a column as grouping key + return self.obj_payload["virtualInputs"] - :param str group_key: name of a column in the input + @property + def raw_joins(self): """ - self.group_key = group_key - return self + Returns the raw list of joins + :rtype list of dict + """ + return self.obj_payload["joins"] - def _finish_creation_settings(self): - super(GroupingRecipeCreator, self)._finish_creation_settings() - self.creation_settings['groupKey'] = self.group_key + def add_virtual_input(self, input_dataset_index): + """ + Adds a virtual input pointing to the specified input dataset of the recipe + (referenced by index in the inputs list) + """ + self.raw_virtual_inputs.append({"index": input_dataset_index}) + + def add_pre_join_computed_column(self, virtual_input_index, computed_column): + """ + Adds a computed column to a virtual input + + Use :class:`dataikuapi.dss.utils.DSSComputedColumn` to build the computed_column object + """ + self.raw_virtual_inputs[virtual_input_index]["computedColumns"].append(computed_column) + + def add_join(self, join_type="LEFT", input1=0, input2=1): + """ + Adds a join between two virtual inputs. The join is initialized with no condition. + + Use :meth:`add_condition_to_join` on the return value to add a join condition (for example column equality) + to the join + + :returns the newly added join as a dict + :rtype dict + """ + jp = self.obj_payload + if not "joins" in jp: + jp["joins"] = [] + join = { + "conditionsMode": "AND", + "on": [], + "table1": input1, + "table2": input2, + "type": join_type + } + jp["joins"].append(join) + return join + + def add_condition_to_join(self, join, type="EQ", column1=None, column2=None): + """ + Adds a condition to a join + :param str column1: Name of "left" column + :param str column2: Name of "right" column + """ + cond = { + "type" : type, + "column1": {"name": column1, "table": join["table1"]}, + "column2": {"name": column2, "table": join["table2"]}, + } + join["on"].append(cond) + return cond + + def add_post_join_computed_column(self, computed_column): + """ + Adds a post-join computed column + + Use :class:`dataikuapi.dss.utils.DSSComputedColumn` to build the computed_column object + """ + self.obj_payload["computedColumns"].append(computed_column) + + def set_post_filter(self, postfilter): + self.obj_payload["postFilter"] = postfilter class JoinRecipeCreator(VirtualInputsSingleOutputRecipeCreator): """ @@ -482,6 +997,12 @@ class FuzzyJoinRecipeCreator(VirtualInputsSingleOutputRecipeCreator): def __init__(self, name, project): VirtualInputsSingleOutputRecipeCreator.__init__(self, 'fuzzyjoin', name, project) +class StackRecipeSettings(DSSRecipeSettings): + """ + Settings of a stack recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + pass # TODO: Write helpers for stack + class StackRecipeCreator(VirtualInputsSingleOutputRecipeCreator): """ Create a Stack recipe @@ -489,6 +1010,13 @@ class StackRecipeCreator(VirtualInputsSingleOutputRecipeCreator): def __init__(self, name, project): VirtualInputsSingleOutputRecipeCreator.__init__(self, 'vstack', name, project) + +class SamplingRecipeSettings(DSSRecipeSettings): + """ + Settings of a sampling recipe. 
Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + pass # TODO: Write helpers for sampling + class SamplingRecipeCreator(SingleOutputRecipeCreator): """ Create a Sample/Filter recipe @@ -496,6 +1024,94 @@ class SamplingRecipeCreator(SingleOutputRecipeCreator): def __init__(self, name, project): SingleOutputRecipeCreator.__init__(self, 'sampling', name, project) + +class SplitRecipeSettings(DSSRecipeSettings): + """ + Settings of a split recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + pass # TODO: Write helpers for split + +class SplitRecipeCreator(DSSRecipeCreator): + """ + Create a Split recipe + """ + def __init__(self, name, project): + DSSRecipeCreator.__init__(self, "split", name, project) + + def _finish_creation_settings(self): + pass + + +class DownloadRecipeSettings(DSSRecipeSettings): + """ + Settings of a download recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + pass # TODO: Write helpers for download + +class DownloadRecipeCreator(SingleOutputRecipeCreator): + """ + Create a Download recipe + """ + def __init__(self, name, project): + SingleOutputRecipeCreator.__init__(self, 'download', name, project) + + +##################################################### +# Per-recipe-type classes: Code recipes +##################################################### + +class CodeRecipeSettings(DSSRecipeSettings): + """ + Settings of a code recipe. Do not create this directly, use :meth:`DSSRecipe.get_settings` + """ + def get_code(self): + """ + Returns the code of the recipe as a string + :rtype string + """ + self._payload_to_str() + return self._str_payload + + def set_code(self, code): + """ + Updates the code of the recipe + :param str code: The new code as a string + """ + self.set_payload(code) + + def get_code_env_settings(self): + """ + Returns the code env settings for this recipe + :rtype dict + """ + rp = self.get_recipe_params() + if not "envSelection" in rp: + raise ValueError("This recipe kind does not seem to take a code env selection") + return rp["envSelection"] + + def set_code_env(self, code_env=None, inherit=False, use_builtin=False): + """ + Sets the code env to use for this recipe. + + Exactly one of `code_env`, `inherit` or `use_builtin` must be passed + + :param str code_env: The name of a code env + :param bool inherit: Use the project's default code env + :param bool use_builtin: Use the builtin code env + """ + rp = self.get_recipe_params() + if not "envSelection" in rp: + raise ValueError("This recipe kind does not seem to take a code env selection") + + if code_env is not None: + rp["envSelection"] = {"envMode": "EXPLICIT_ENV", "envName": "code_env"} + elif inherit: + rp["envSelection"] = {"envMode": "INHERIT"} + elif use_builtin: + rp["envSelection"] = {"envMode": "USE_BUILTIN_MODE"} + else: + raise ValueError("No env setting selected") + class CodeRecipeCreator(DSSRecipeCreator): def __init__(self, name, type, project): """ @@ -515,10 +1131,110 @@ def with_script(self, script): self.script = script return self + def with_new_output_dataset(self, name, connection, + type=None, format=None, + copy_partitioning_from="FIRST_INPUT", + append=False, overwrite=False): + """ + Create a new managed dataset as output to the recipe-to-be-created. 
The dataset is created immediately + + :param str name: name of the dataset to create + :param str connection_id: name of the connection to create the dataset on + :param str type: type of dataset, for connection where the type could be ambiguous. Typically, + this is SCP or SFTP, for SSH connection + :param str format: name of a format preset relevant for the dataset type. Possible values are: CSV_ESCAPING_NOGZIP_FORHIVE, + CSV_UNIX_GZIP, CSV_EXCEL_GZIP, CSV_EXCEL_GZIP_BIGQUERY, CSV_NOQUOTING_NOGZIP_FORPIG, PARQUET_HIVE, + AVRO, ORC. If None, uses the default + :param str copy_partitioning_from: Whether to copy the partitioning from another thing. + Use None for not partitioning the output, "FIRST_INPUT" to copy from the first input of the recipe, + "dataset:XXX" to copy from a dataset name, or "folder:XXX" to copy from a folder id + :param append: whether the recipe should append or overwrite the output when running (note: not available for all dataset types) + :param overwrite: If the dataset being created already exists, overwrite it (and delete data) + """ + + ch = self.project.new_managed_dataset_creation_helper(name) + ch.with_store_into(connection, type_option_id=type, format_option_id=format) + + # FIXME: can't manage input folder + if copy_partitioning_from == "FIRST_INPUT": + inputs = self._get_input_refs() + if len(inputs) == 0: + logging.warn("No input declared yet, can't copy partitioning from first input") + else: + self.creation_settings["partitioningOptionId"] = "copy:dataset:%s" % (inputs[0]) + elif copy_partitioning_from is not None: + self.creation_settings["partitioningOptionId"] = "copy:%s" % copy_partitioning_from + + ch.create(overwrite=overwrite) + + self.with_output(name, append=append) + return self + def _finish_creation_settings(self): super(CodeRecipeCreator, self)._finish_creation_settings() self.creation_settings['script'] = self.script +class PythonRecipeCreator(CodeRecipeCreator): + """ + Creates a Python recipe. + A Python recipe can be defined either by its complete code, like a normal Python recipe, or + by a function signature. 
+ + When using a function, the function must take as arguments: + * A list of dataframes corresponding to the dataframes of the input datasets + * Optional named arguments corresponding to arguments passed to the creator + """ + + def __init__(self, name, project): + DSSRecipeCreator.__init__(self, "python", name, project) + + DEFAULT_RECIPE_CODE_TMPL = """ +# This code is autogenerated by PythonRecipeCreator function mode +import dataiku, dataiku.recipe, json +from {module_name} import {fname} +input_datasets = dataiku.recipe.get_inputs_as_datasets() +output_datasets = dataiku.recipe.get_outputs_as_datasets() +params = json.loads('{params_json}') + +logging.info("Reading %d input datasets as dataframes" % len(input_datasets)) +input_dataframes = [ds.get_dataframe() for ds in input_datasets] + +logging.info("Calling user function {fname}") +function_input = input_dataframes if len(input_dataframes) > 1 else input_dataframes[0] +output_dataframes = {fname}(function_input, **params) + +if not isinstance(output_dataframes, list): + output_dataframes = [output_dataframes] + +if not len(output_dataframes) == len(output_datasets): + raise Exception("Code function {fname}() returned %d dataframes but recipe expects %d output datasets", \\ + (len(output_dataframes), len(output_datasets))) +output = list(zip(output_datasets, output_dataframes)) +for ds, df in output: + logging.info("Writing function result to dataset %s" % ds.name) + ds.write_with_schema(df) +""" + + def with_function_name(self, module_name, function_name, custom_template=None, **function_args): + """ + Defines this recipe as being a functional recipe calling a function name from a module name + """ + script_tmpl = PythonRecipeCreator.DEFAULT_RECIPE_CODE_TMPL if custom_template is None else custom_template + + if function_args is None: + function_args = {} + + code = script_tmpl.format(module_name=module_name, fname=function_name, params_json = json.dumps(function_args)) + self.with_script(code) + + return self + + def with_function(self, fn, custom_template=None, **function_args): + import inspect + #TODO: add in documentation that relative imports wont work + module_name = inspect.getmodule(fn).__name__ + fname = fn.__name__ + return self.with_function_name(module_name, fname, custom_template, **function_args) class SQLQueryRecipeCreator(SingleOutputRecipeCreator): """ @@ -527,15 +1243,10 @@ class SQLQueryRecipeCreator(SingleOutputRecipeCreator): def __init__(self, name, project): SingleOutputRecipeCreator.__init__(self, 'sql_query', name, project) -class SplitRecipeCreator(DSSRecipeCreator): - """ - Create a Split recipe - """ - def __init__(self, name, project): - DSSRecipeCreator.__init__(self, "split", name, project) - def _finish_creation_settings(self): - pass +##################################################### +# Per-recipe-type classes: ML recipes +##################################################### class PredictionScoringRecipeCreator(SingleOutputRecipeCreator): """ @@ -607,3 +1318,36 @@ class DownloadRecipeCreator(SingleOutputRecipeCreator): """ def __init__(self, name, project): SingleOutputRecipeCreator.__init__(self, 'download', name, project) + + +class RequiredSchemaUpdates(object): + """ + Representation of the updates required to the schema of the outputs of a recipe. 
+ Do not create this class directly, use :meth:`DSSRecipe.compute_schema_updates` + """ + + def __init__(self, recipe, data): + self.recipe = recipe + self.data = data + self.drop_and_recreate = True + self.synchronize_metastore = True + + def any_action_required(self): + return self.data["totalIncompatibilities"] > 0 + + def apply(self): + results = [] + for computable in self.data["computables"]: + osu = { + "computableType": computable["type"], + # dirty + "computableId": computable["type"] == "DATASET" and computable["datasetName"] or computable["id"], + "newSchema": computable["newSchema"], + "dropAndRecreate": self.drop_and_recreate, + "synchronizeMetastore" : self.synchronize_metastore + } + + results.append(self.recipe.client._perform_json("POST", + "/projects/%s/recipes/%s/actions/updateOutputSchema" % (self.recipe.project_key, self.recipe.recipe_name), + body=osu)) + return results diff --git a/dataikuapi/dss/savedmodel.py b/dataikuapi/dss/savedmodel.py index d98391df..ce75d074 100644 --- a/dataikuapi/dss/savedmodel.py +++ b/dataikuapi/dss/savedmodel.py @@ -1,10 +1,8 @@ -from ..utils import DataikuException -from ..utils import DataikuUTF8CSVReader -from ..utils import DataikuStreamedHttpUTF8CSVReader -import json from .ml import DSSTrainedPredictionModelDetails, DSSTrainedClusteringModelDetails from .metrics import ComputedMetrics -from .discussion import DSSObjectDiscussions +from .ml import DSSTrainedClusteringModelDetails +from .ml import DSSTrainedPredictionModelDetails + class DSSSavedModel(object): """ @@ -69,6 +67,25 @@ def set_active_version(self, version_id): self.client._perform_empty( "POST", "/projects/%s/savedmodels/%s/versions/%s/actions/setActive" % (self.project_key, self.sm_id, version_id)) + def delete_versions(self, versions, remove_intermediate=True): + """ + Delete version(s) of the saved model + + :param versions: list of versions to delete + :type versions: list[str] + :param remove_intermediate: also remove intermediate versions (default: True). In the case of a partitioned + model, an intermediate version is created every time a partition has finished training. 
+ :type remove_intermediate: bool + """ + if not isinstance(versions, list): + versions = [versions] + body = { + "versions": versions, + "removeIntermediate": remove_intermediate + } + self.client._perform_empty( + "POST", "/projects/%s/savedmodels/%s/actions/delete-versions" % (self.project_key, self.sm_id), + body=body) ######################################################## # Metrics ######################################################## diff --git a/dataikuapi/dss/tools/__init__.py b/dataikuapi/dss/tools/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dataikuapi/dss/tools/codegen.py b/dataikuapi/dss/tools/codegen.py new file mode 100644 index 00000000..06371d93 --- /dev/null +++ b/dataikuapi/dss/tools/codegen.py @@ -0,0 +1,386 @@ +import json, copy, re +from dataikuapi.dss.recipe import * +from dataikuapi.dss.dataset import * + +class _IndentContext(object): + def __init__(self, flow_code_generator): + self.flow_code_generator = flow_code_generator + + def __enter__(self): + self.flow_code_generator.cur_indent += 1 + + def __exit__(self, b, c, d): + self.flow_code_generator.cur_indent -= 1 + + +def slugify(name): + return re.sub("[^A-Za-z0-9_]", "_", name) + +class FlowCodeGenerator(object): + def __init__(self): + self.code = "" + self.cur_indent = 0 + self.remove_metrics_on_datasets = False + self.remove_display_width_on_prepare = False + + def set_options(self, remove_metrics_on_datasets=False, remove_display_width_on_prepare=False): + self.remove_metrics_on_datasets = remove_metrics_on_datasets + self.remove_display_width_on_prepare = remove_display_width_on_prepare + pass + + def generate_code_for_dataset(self, dataset): + entrypoint_name = "create_dataset_%s" % dataset.dataset_name + self._generate_code_for_dataset(dataset, entrypoint_name) + return self.code + + def generate_code_for_recipe(self, recipe): + entrypoint_name = "create_recipe_%s" % recipe.recipe_name + self._generate_code_for_recipe(recipe, entrypoint_name) + return self.code + + def generate_code_for_project(self, project, entrypoint_name = None): + self.gen("import json") + self.gen("") + + if entrypoint_name is None: + entrypoint_name = "create_flow_for_project" + + self.gen("def %s(project):" % entrypoint_name) + + flow_graph = project.get_flow().get_graph() + flow_items = flow_graph.get_items_in_traversal_order(as_type="object") + + entrypoints_to_call = [] + with _IndentContext(self) as ic: + for item in flow_items: + if isinstance(item, DSSDataset): + entrypoint_name = "create_dataset_%s" % slugify(item.dataset_name) + self._generate_code_for_dataset(item, entrypoint_name) + else: + entrypoint_name = "create_recipe_%s" % slugify(item.recipe_name) + self._generate_code_for_recipe(item, entrypoint_name) + entrypoints_to_call.append(entrypoint_name) + + self.gen("") + self.gen("# Actual creation of the Flow from the individual functions") + for ep in entrypoints_to_call: + self.gen("%s(project)" % ep) + + return self.code + + def _generate_code_for_dataset(self, dataset, entrypoint_name): + self.gen("def %s(project):" % entrypoint_name) + settings = dataset.get_settings() + raw = settings.get_raw() + + templates = dataset.client._perform_json("GET", "/projects/X/datasets/templates") + + do_not_copy = [ + "projectKey", + "name", + "type", + "versionTag", + "creationTag", + "schema" + ] + + self.gen(" # Base dataset params") + + if raw["type"] == "UploadedFiles": + self.gen(" dataset = project.create_upload_dataset(\"%s\")" % dataset.dataset_name) + self.gen(" settings = 
dataset.get_settings()") + self.lf() + + self.codegen_object_fields_explicit(settings.get_raw(), templates["dataset"], ["params"], "settings.get_raw()") + do_not_copy.append("params") + + elif raw["type"] in DSSDataset.FS_TYPES: + srcp = raw["params"] + self.gen(" dataset = project.create_fslike_dataset(\"%s\", \"%s\", \"%s\", \"%s\")" % \ + (dataset.dataset_name, raw["type"], srcp["connection"], srcp.get("path", "/"))) + self.gen(" settings = dataset.get_settings()") + self.lf() + + self.codegen_object_fields(srcp, templates["abstractFSConfig"], + ["connection", "path"], "settings.get_raw_params()") + do_not_copy.append("params") + elif raw["type"] in DSSDataset.SQL_TYPES: + self.gen(" dataset = project.create_dataset(\"%s\", \"%s\")" % (dataset.dataset_name, raw["type"])) + self.gen(" settings = dataset.get_settings()") + self.lf() + + srcp = raw["params"] + if srcp.get("mode", None) == "table": + self.gen(" settings.set_table(%s, %s, %s)" % \ + (self.objstr(srcp.get("connection")), self.objstr(srcp.get("schema")), + self.objstr(srcp.get("table")))) + + self.codegen_object_fields(srcp, templates["abstractSQLConfig"], + ["mode", "connection", "schema", "table"], "settings.get_raw_params()") + do_not_copy.append("params") + else: + self.gen(" dataset = project.create_dataset(\"%s\", \"%s\")" % (dataset.dataset_name, raw["type"])) + self.gen(" settings = dataset.get_settings()") + + self.codegen_object_fields_explicit(settings.get_raw(), templates["dataset"], ["params"], "settings.get_raw()") + do_not_copy.append("params") + + # Copy of format params + if "formatType" in raw: + self.lf() + self.gen(" # Format params") + + handled = False + + if raw["formatType"] == "csv": + csv_format = raw["formatParams"] + self.gen(" settings.set_csv_format(separator=\"%s\", style=\"%s\", skip_rows_before=%d, header_row=%s, skip_rows_after=%d)"%\ + (csv_format.get("separator", None), csv_format["style"], csv_format["skipRowsBeforeHeader"],\ + csv_format["parseHeaderRow"], csv_format["skipRowsAfterHeader"])) + + self.codegen_object_fields(csv_format, templates["csvFormat"], + ["separator", "style", "skipRowsBeforeHeader", + "parseHeaderRow", "skipRowsAfterHeader", "probableNumberOfRecords"], + "settings.get_raw_format_params()") + else: + self.codegen_object_fields_explicit(settings.get_raw(), templates["dataset"], ["formatType", "formatParams"], + "settings.get_raw()") + do_not_copy.extend(["formatType", "formatParams"]) + + self.lf() + self.gen(" # Schema") + for column in settings.get_raw()["schema"]["columns"]: + self.gen(" settings.add_raw_schema_column(%s)" % column) + + if not self.remove_metrics_on_datasets: + self.lf() + self.gen(" # Metrics") + self.codegen_object_fields(settings.get_raw()["metrics"], templates["dataset"]["metrics"], [], "settings.get_raw()[\"metrics\"]") + do_not_copy.append("metrics") + + self.lf() + self.gen(" # Other dataset params") + self.codegen_object_fields(settings.get_raw(), templates["dataset"], do_not_copy, "settings.get_raw()") + + self.lf() + self.gen(" settings.save()") + self.lf() + + def _generate_code_for_recipe(self, recipe, entrypoint_name): + logging.info("Codegen for recipe %s" % entrypoint_name) + self.gen("def %s(project):" % entrypoint_name) + settings = recipe.get_settings() + raw = settings.get_recipe_raw_definition() + + templates = recipe.client._perform_json("GET", "/projects/%s/recipes/templates" % recipe.project_key) + + template = {"tags":[], "optionalDependencies": False, "redispatchPartitioning": False, + "maxRunningActivities": 0, 
"neverRecomputeExistingPartitions" : False, + "customFields":{}, "customMeta": {"kv":{}}, "checklists" : {"checklists":[]}} + + do_not_copy = [ + "projectKey", + "name", + "type", + "versionTag", + "creationTag", + "inputs", + "outputs" + ] + + self.gen(" # Create the recipe as a blank recipe") + self.gen(" builder = project.new_recipe(\"%s\", \"%s\")" % (raw["type"], recipe.recipe_name)) + self.gen(" builder.set_raw_mode()") + self.gen(" recipe = builder.create()") + self.lf() + self.gen(" # Setup the recipe") + self.gen(" settings = recipe.get_settings()" ) + + self.lf() + self.gen(" # Recipe inputs/outputs") + for (input_role, input_item) in settings._get_flat_inputs(): + if len(input_item["deps"]) > 0: + self.gen(" settings.add_input(%s, %s, %s)" % (self.objstr(input_role), + self.objstr(input_item["ref"]), + self.objstr(input_item["deps"]))) + else: + self.gen(" settings.add_input(%s, %s)" % (self.objstr(input_role), + self.objstr(input_item["ref"]))) + for (output_role, output_item) in settings._get_flat_outputs(): + self.gen(" settings.add_output(%s, %s, %s)" % (self.objstr(output_role), + self.objstr(output_item["ref"]), + self.objstr(output_item["appendMode"]))) + self.lf() + + types_with_obj_payload = ["join", "grouping", "shaker"] + + # A bit of "classical cleanup" + # Remove the dirty "map" in Spark read params + if settings.type in types_with_obj_payload and "engineParams" in settings.obj_payload: + rp = settings.obj_payload["engineParams"].get("spark", {}).get("readParams", {}) + if rp.get("mode", "?") == "AUTO": + rp["map"] = {} + if raw is not None and "params" in raw and "engineParams" in raw["params"]: + rp = raw["params"]["engineParams"].get("spark", {}).get("readParams", {}) + if rp.get("mode", "?") == "AUTO": + rp["map"] = {} + + # And some per-type cleanup + def cleanup_grouping(): + for grouping_key in settings.obj_payload.get("keys", []): + for item in ["count", "last", "min", "max", "sum", "countDistinct", "stddev", "avg", "concat", "first"]: + if item in grouping_key and grouping_key[item] == False: + del grouping_key[item] + for aggregation in settings.obj_payload.get("values", []): + for item in ["count", "last", "min", "max", "sum", "countDistinct", "stddev", "avg", "concat", "first", + "concatDistinct", "$idx", "sum2", "firstLastNotNull"]: + if item in aggregation and aggregation.get(item, None) == False: + del aggregation[item] + + def cleanup_join(): + for vi in settings.raw_virtual_inputs: + if not vi["preFilter"]["enabled"]: + del vi["preFilter"] + + def cleanup_shaker(): + if self.remove_display_width_on_prepare: + if "columnWidthsByName" in settings.obj_payload: + del settings.obj_payload["columnWidthsByName"] + + cleanup_by_type = { + "grouping": cleanup_grouping, + "join": cleanup_join, + "shaker": cleanup_shaker + } + + if settings.type in cleanup_by_type: + cleanup_by_type[settings.type]() + + # Output payload, either globally for code + if isinstance(settings, CodeRecipeSettings): + code = settings.get_code() + self.gen(" # Recipe code") + self.gen(" settings.set_payload(%s)" % self.payloadstr(code)) + + # per-field for recipes with JSON payload + elif settings.type in types_with_obj_payload: + prefix_by_type = { + "join": "Join details", + "shaker": "Prepare script" + } + self.gen(" # %s" % (prefix_by_type.get(settings.type, "Recipe payload"))) + payload = settings.obj_payload + payload_template = templates["payloadsByType"][settings.type] + self.gen(" settings.set_payload(\"{}\")") + self.codegen_object_fields(payload, payload_template, [], 
"settings.obj_payload") + + # Or as string for others + elif len(settings.get_payload()) > 0: + self.gen(" # Recipe payload") + self.gen(" settings.set_payload(%s)" % self.payloadstr(settings.get_payload())) + + # Then params + if settings.type in templates["paramsByType"] and "params" in raw: + self.lf() + self.gen(" # Type-specific parameters") + self.codegen_object_fields(raw["params"], templates["paramsByType"][settings.type], [], + "settings.raw_params") + do_not_copy.append("params") + + # And finally other recipe fields that are not params + self.lf() + self.gen(" # Other parameters") + self.codegen_object_fields(raw, template, do_not_copy, "settings.get_recipe_raw_definition()") + self.lf() + self.gen(" settings.save()") + self.lf() + + # Helpers + + def gen(self, code): + self.code += "%s%s\n" % (" " * (4 * self.cur_indent), code) + + def lf(self): + self.code += "\n" + + def payloadstr(self, payload): + if payload.endswith("\n"): + return "\"\"\"%s\"\"\"" % payload + else: + return "\"\"\"%s\n\"\"\"" % payload + + def objstr(self, obj): + return ObjectFieldFormatter(self.cur_indent + 1).format(obj) + + def codegen_object_fields_explicit(self, object, template, copy, prefix): + for key in copy: + if not key in object: + continue + self.codegen_object_field(object, key, template, prefix) + + def codegen_object_fields(self, object, template, do_not_copy, prefix): + for key in object.keys(): + if key in do_not_copy: + continue + self.codegen_object_field(object, key, template, prefix) + + def codegen_object_field(self, object, key, template, prefix): + value_for_key = object[key] + default_value_for_key = template.get(key, None) + + if default_value_for_key is not None and value_for_key == default_value_for_key: + #print("Skipping value equal to default: %s" % key) + return + else: + #print("Not equal for %s" % key) + #print("Template: %s" % default_value_for_key) + #print("Real: %s" % value_for_key) + self.gen(" %s[\"%s\"] = %s" % ( prefix, key, self.objstr(value_for_key))) + + +class ObjectFieldFormatter(object): + def __init__(self, base_indent): + self.formatters = { + dict: self.__class__.format_dict, + list: self.__class__.format_list, + } + self.sp = " " + self.base_indent = base_indent + self.no_pretty_level = 0 + + def format(self, value, depth=0): + base_repr = repr(value) + if len(base_repr) < 25: + # This entire value is very small, don't bother pretty-priting it + return base_repr + elif type(value) in self.formatters: + return self.formatters[type(value)](self, value, depth) + else: + return base_repr + + def format_dict(self, value, depth): + indent = self.base_indent + depth + if depth > 2 or self.no_pretty_level > 0: + return repr(value) + else: + items = [ + "\n" + self.sp * (indent + 1) + repr(key) + ': ' + self.format(value[key], depth + 1) + for key in value + ] + return '{%s}' % (','.join(items) + "\n" + self.sp * indent) + + def format_list(self, value, depth): + indent = self.base_indent + depth + if depth > 2 or self.no_pretty_level > 0 or len(value) == 0: + return repr(value) + else: + # Big array, don't pretty-print inner + if len(value) > 3: + self.no_pretty_level += 1 + items = [ + "\n" + self.sp * (indent + 1) + self.format(item, depth + 1) + for item in value + ] + if len(value) > 3: + self.no_pretty_level -= 1 + return '[%s]' % (','.join(items) + "\n" + self.sp * indent) \ No newline at end of file diff --git a/dataikuapi/dss/utils.py b/dataikuapi/dss/utils.py index 5a2b1c4c..faa25acd 100644 --- a/dataikuapi/dss/utils.py +++ b/dataikuapi/dss/utils.py 
@@ -41,6 +41,131 @@ def with_selected_partitions(self, ids): self.selection["selectedPartitions"] = ids return self +class DSSComputedColumn(object): + + @staticmethod + def formula(name, formula, type="double"): + return {"expr": formula, "mode": "GREL", "name": name, "type": type} + +import sys +if sys.version_info > (3,4): + from enum import Enum +else: + class Enum(object): + pass + +class DSSFilterOperator(Enum): + EMPTY_ARRAY = "empty array" + NOT_EMPTY_ARRAY = "not empty array" + CONTAINS_ARRAY = "array contains" + NOT_EMPTY = "not empty" + EMPTY = "is empty" + NOT_EMPTY_STRING = "not empty string" + EMPTY_STRING = "empty string" + IS_TRUE = "true" + IS_FALSE = "false" + EQUALS_STRING = "== [string]" + EQUALS_CASE_INSENSITIVE_STRING = "== [string]i" + NOT_EQUALS_STRING = "!= [string]" + SAME = "== [NaNcolumn]" + DIFFERENT = "!= [NaNcolumn]" + EQUALS_NUMBER = "== [number]" + NOT_EQUALS_NUMBER = "!= [number]" + GREATER_NUMBER = "> [number]" + LESS_NUMBER = "< [number]" + GREATER_OR_EQUAL_NUMBER = ">= [number]" + LESS_OR_EQUAL_NUMBER = "<= [number]" + EQUALS_DATE = "== [date]" + GREATER_DATE = "> [date]" + GREATER_OR_EQUAL_DATE = ">= [date]" + LESS_DATE = "< [date]" + LESS_OR_EQUAL_DATE = "<= [date]" + BETWEEN_DATE = ">< [date]" + EQUALS_COL = "== [column]" + NOT_EQUALS_COL = "!= [column]" + GREATER_COL = "> [column]" + LESS_COL = "< [column]" + GREATER_OR_EQUAL_COL = ">= [column]" + LESS_OR_EQUAL_COL = "<= [column]" + CONTAINS_STRING = "contains" + REGEX = "regex" + +class DSSFilter(object): + """Helper class to build filter objects for use in visual recipes""" + @staticmethod + def of_single_condition(column, operator, string = None, num = None, date = None, time = None, date2 = None, time2 = None, unit = None): + return { + "enabled": True, + "uiData": { + 'conditions': [DSSFilter.condition(column, operator, string, num, date, time, date2, time2, unit)], + "mode": "&&" + } + } + + @staticmethod + def of_and_conditions(conditions): + return { + "enabled": True, + "uiData": { + 'conditions': conditions, + "mode": "&&" + } + } + + @staticmethod + def of_or_conditions(conditions): + return { + "enabled": True, + "uiData": { + 'conditions': conditions, + "mode": "||" + } + } + + @staticmethod + def of_formula(formula): + return { + "enabled": True, + "uiData": { + "mode": "CUSTOM" + }, + "expression" : formula + } + + @staticmethod + def of_sql_expression(sql_expression): + return { + "enabled": True, + "uiData": { + "mode": "SQL" + }, + "expression" : sql_expression + } + + @staticmethod + def condition(column, operator, string = None, num = None, date = None, time = None, date2 = None, time2 = None, unit = None): + if isinstance(operator, DSSFilterOperator): + operator = operator.value + cond = { + "input": column, + "operator": operator + } + if string is not None: + cond["string"] = string + if num is not None: + cond["num"] = num + if date is not None: + cond["date"] = date + if time is not None: + cond["time"] = time + if date2 is not None: + cond["date2"] = date2 + if time2 is not None: + cond["time2"] = time2 + if unit is not None: + cond["unit"] = unit + + return cond class DSSFilterBuilder(object): """ @@ -88,3 +213,54 @@ def from_full(ref): return AnyLoc(elts[0], elts[1]) else: raise Exception("Cannot parse object id, it's not a full id") + + +class DSSTaggableObjectListItem(dict): + """An item in a list of taggable objects. 
Do not instantiate this class""" + def __init__(self, data): + super(DSSTaggableObjectListItem, self).__init__(data) + self._data = data + + @property + def tags(self): + return self._data["tags"] + +class DSSTaggableObjectSettings(object): + def __init__(self, taggable_object_data): + self._tod = taggable_object_data + + @property + def tags(self): + """The tags of the object, as a list of strings""" + return self._tod["tags"] + + @tags.setter + def tags(self, tags): + self._tod["tags"] = tags + + @property + def description(self): + """The description of the object as a string""" + return self._tod.get("description", None) + + @description.setter + def description(self, description): + self._tod["description"] = description + + @property + def short_description(self): + """The short description of the object as a string""" + return self._tod.get("shortDesc", None) + + @short_description.setter + def short_description(self, short_description): + self._tod["shortDesc"] = short_description + + @property + def custom_fields(self): + """The custom fields of the object as a dict. Returns None if there are no custom fields""" + return self._tod.get("customFields", None) + + @custom_fields.setter + def custom_fields(self, custom_fields): + self._tod["customFields"] = custom_fields \ No newline at end of file
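
The sketches below illustrate how the API surface touched by this patch is meant to be called from client code; they are not part of the patch itself. In all of them the instance URL, API key, project key and dataset/recipe names are placeholders to adapt to your own DSS instance. First, the FuzzyJoinRecipeCreator: since the new_recipe() dispatcher has no "fuzzyjoin" branch, the creator is instantiated directly.

.. code-block:: python

    import dataikuapi
    from dataikuapi.dss.recipe import FuzzyJoinRecipeCreator

    # Placeholder connection details
    client = dataikuapi.DSSClient("http://localhost:11200", "YOUR_API_KEY")
    project = client.get_project("MYPROJECT")

    # new_recipe() does not dispatch the "fuzzyjoin" type, so use the creator directly
    builder = FuzzyJoinRecipeCreator("fuzzyjoin_customers", project)
    builder.with_input("customers")         # assumed existing dataset
    builder.with_input("web_sessions")      # assumed existing dataset
    builder.with_new_output("customers_enriched", "filesystem_managed")
    recipe = builder.create()

    # The matching conditions are then edited through the recipe settings payload
    settings = recipe.get_settings()
    print(settings.type)   # "fuzzyjoin"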
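
A minimal sketch of the reworked job builder, which is now obtained through DSSProject.new_job() and carries the project handle itself; the dataset name is assumed to exist.

.. code-block:: python

    import dataikuapi

    client = dataikuapi.DSSClient("http://localhost:11200", "YOUR_API_KEY")   # placeholders
    project = client.get_project("MYPROJECT")

    # Build a single dataset with a forced, non-recursive build
    job_builder = project.new_job("NON_RECURSIVE_FORCED_BUILD")
    job_builder.with_output("customers_enriched")      # assumed existing dataset
    job_builder.with_refresh_metastore(True)

    # start_and_wait() raises if the job fails, unless no_fail=True is passed
    job = job_builder.start_and_wait()
    print("Job %s finished" % job.id)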
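
A sketch combining the JoinRecipeSettings helpers with the new DSSFilter and DSSFilterOperator utilities, assuming the project already contains a join recipe named "compute_customers_enriched" whose two virtual inputs are declared.

.. code-block:: python

    import dataikuapi
    from dataikuapi.dss.utils import DSSFilter, DSSFilterOperator

    client = dataikuapi.DSSClient("http://localhost:11200", "YOUR_API_KEY")   # placeholders
    project = client.get_project("MYPROJECT")

    settings = project.get_recipe("compute_customers_enriched").get_settings()

    # Declare a LEFT join between virtual inputs 0 and 1, on equality of customer_id
    join = settings.add_join(join_type="LEFT", input1=0, input2=1)
    settings.add_condition_to_join(join, type="EQ", column1="customer_id", column2="customer_id")

    # Keep only post-join rows where amount > 0
    settings.set_post_filter(
        DSSFilter.of_single_condition("amount", DSSFilterOperator.GREATER_NUMBER, num=0))

    settings.save()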
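
A sketch of the CodeRecipeCreator additions. with_new_output_dataset() creates the managed output dataset immediately and relies on the project's managed-dataset creation helper, which lives outside this file; the recipe script shown here is an assumption, not something generated by the API.

.. code-block:: python

    import dataikuapi

    client = dataikuapi.DSSClient("http://localhost:11200", "YOUR_API_KEY")   # placeholders
    project = client.get_project("MYPROJECT")

    builder = project.new_recipe("python", "compute_customers_clean")
    builder.with_input("customers")   # assumed existing dataset
    # Create the managed output immediately; fails if a dataset with that name already exists
    builder.with_new_output_dataset("customers_clean", "filesystem_managed",
                                    copy_partitioning_from=None)

    # Keep the script unindented so it stays valid when DSS runs the recipe
    script = "\n".join([
        "import dataiku",
        "df = dataiku.Dataset('customers').get_dataframe()",
        "dataiku.Dataset('customers_clean').write_with_schema(df)",
    ])
    builder.with_script(script)
    recipe = builder.create()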
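
The new dataikuapi.dss.tools.codegen module can serialize a whole Flow back into Python code. A small sketch; the output file name is arbitrary.

.. code-block:: python

    import dataikuapi
    from dataikuapi.dss.tools.codegen import FlowCodeGenerator

    client = dataikuapi.DSSClient("http://localhost:11200", "YOUR_API_KEY")   # placeholders
    project = client.get_project("MYPROJECT")

    generator = FlowCodeGenerator()
    # Optional: drop per-dataset metrics and prepare-recipe display widths from the generated code
    generator.set_options(remove_metrics_on_datasets=True, remove_display_width_on_prepare=True)

    # One create_* function per dataset/recipe, plus a create_flow_for_project(project) entry point
    code = generator.generate_code_for_project(project)
    with open("flow_for_myproject.py", "w") as f:
        f.write(code)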
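
Since DSSRecipeSettings now derives from DSSTaggableObjectSettings, tags and descriptions can be edited together with the rest of the settings. A brief sketch with a placeholder recipe name.

.. code-block:: python

    import dataikuapi

    client = dataikuapi.DSSClient("http://localhost:11200", "YOUR_API_KEY")   # placeholders
    project = client.get_project("MYPROJECT")

    settings = project.get_recipe("compute_customers_enriched").get_settings()
    settings.tags = ["api-generated", "fuzzy-join"]
    settings.short_description = "Enriches customers with web session data"
    settings.save()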
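
Finally, a sketch of the PrepareRecipeSettings step helpers. As the docstring notes, the per-step params are not documented; the processor type and params used here (a column rename) are an assumption based on typical prepare scripts, not something defined by this patch.

.. code-block:: python

    import dataikuapi

    client = dataikuapi.DSSClient("http://localhost:11200", "YOUR_API_KEY")   # placeholders
    project = client.get_project("MYPROJECT")

    settings = project.get_recipe("prepare_customers").get_settings()   # a prepare (shaker) recipe

    # Append a processor step; "ColumnRenamer" and its params are assumed, adjust as needed
    settings.add_processor_step("ColumnRenamer",
                                {"renamings": [{"from": "fname", "to": "first_name"}]})
    settings.save()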