Skip to content

Commit 1433ba6

Browse files
authored
Merge pull request #92 from dataiku/fix/flowApi
Add Saved models and folders to flow read API
2 parents 6f95bc0 + 0447064 commit 1433ba6

File tree

1 file changed

+26
-22
lines changed

1 file changed

+26
-22
lines changed

dataikuapi/dss/flow.py

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def get_zone_of_object(self, obj):
5656
"""
5757
Finds the zone to which this object belongs.
5858
59-
If the object is not found in any specific zone, it belongs to the default zone, and the default
59+
If the object is not found in any specific zone, it belongs to the default zone, and the default
6060
zone is returned
6161
6262
:param object obj: A :class:`dataikuapi.dss.dataset.DSSDataset`, :class:`dataikuapi.dss.managedfolder.DSSManagedFolder`,
@@ -120,7 +120,7 @@ def start_tool(self, type, data={}):
120120
121121
:param type str: one of {COPY, CHECK_CONSISTENCY, PROPAGATE_SCHEMA} (tools) or {TAGS, CUSTOM_FIELDS, CONNECTIONS, COUNT_OF_RECORDS, FILESIZE, FILEFORMATS, RECIPES_ENGINES, RECIPES_CODE_ENVS, IMPALA_WRITE_MODE, HIVE_MODE, SPARK_ENGINE, SPARK_CONFIG, SPARK_PIPELINES, SQL_PIPELINES, PARTITIONING, PARTITIONS, SCENARIOS, CREATION, LAST_MODIFICATION, LAST_BUILD, RECENT_ACTIVITY, WATCH} (views)
122122
:param data dict: initial data for the tool (optional)
123-
123+
124124
:returns: a :class:`.flow.DSSFlowTool` handle to interact with the newly-created tool or view
125125
"""
126126
tool_id = self.client._perform_text("POST", "/projects/%s/flow/tools/" % self.project.project_key, params={'type':type}, body=data)
@@ -214,7 +214,7 @@ def mark_recipe_as_ok(self, name):
214214
"""Marks a recipe as always considered as OK during propagation"""
215215
self.settings["markAsOkRecipes"].append(name)
216216

217-
def set_grouping_update_options(self, recipe=None, remove_missing_aggregates=True, remove_missing_keys=True,
217+
def set_grouping_update_options(self, recipe=None, remove_missing_aggregates=True, remove_missing_keys=True,
218218
new_aggregates={}):
219219
"""
220220
Sets update options for grouping recipes
@@ -230,7 +230,7 @@ def set_grouping_update_options(self, recipe=None, remove_missing_aggregates=Tru
230230
else:
231231
self.settings["recipeUpdateOptions"]["byName"][recipe] = data
232232

233-
def set_window_update_options(self, recipe=None, remove_missing_aggregates=True, remove_missing_in_window=True,
233+
def set_window_update_options(self, recipe=None, remove_missing_aggregates=True, remove_missing_in_window=True,
234234
new_aggregates={}):
235235
"""
236236
Sets update options for window recipes
@@ -246,7 +246,7 @@ def set_window_update_options(self, recipe=None, remove_missing_aggregates=True,
246246
else:
247247
self.settings["recipeUpdateOptions"]["byName"][recipe] = data
248248

249-
def set_join_update_options(self, recipe=None, remove_missing_join_conditions=True, remove_missing_join_values=True,
249+
def set_join_update_options(self, recipe=None, remove_missing_join_conditions=True, remove_missing_join_values=True,
250250
new_selected_columns={}):
251251
"""
252252
Sets update options for join recipes
@@ -308,7 +308,7 @@ def _to_native_obj(self, zone_item):
308308
p = self.flow.project
309309
else:
310310
p = self.client.get_project(zone_item["projectKey"])
311-
311+
312312
if zone_item["objectType"] == "DATASET":
313313
return p.get_dataset(zone_item["objectId"])
314314
elif zone_item["objectType"] == "MANAGED_FOLDER":
@@ -346,7 +346,7 @@ def items(self):
346346
explicitly in a zone. To get the full list of items in a zone, including in the "default" zone, use
347347
the :meth:`get_graph` method.
348348
349-
@rtype list of zone items, either :class:`dataikuapi.dss.dataset.DSSDataset`,
349+
@rtype list of zone items, either :class:`dataikuapi.dss.dataset.DSSDataset`,
350350
:class:`dataikuapi.dss.managedfolder.DSSManagedFolder`,
351351
or :class:`dataikuapi.dss.savedmodel.DSSSavedModel` or :class:`dataiuapi.dss.recipe.DSSRecipe`
352352
"""
@@ -381,7 +381,7 @@ def shared(self):
381381
382382
This list is read-only, to modify it, use :meth:`add_shared` and :meth:`remove_shared`
383383
384-
@rtype list of shared zone items, either :class:`dataikuapi.dss.dataset.DSSDataset`,
384+
@rtype list of shared zone items, either :class:`dataikuapi.dss.dataset.DSSDataset`,
385385
:class:`dataikuapi.dss.managedfolder.DSSManagedFolder`,
386386
or :class:`dataikuapi.dss.savedmodel.DSSSavedModel` or :class:`dataiuapi.dss.recipe.DSSRecipe`
387387
"""
@@ -408,7 +408,7 @@ def get_raw(self):
408408
"""
409409
Gets the raw settings of the zone.
410410
411-
You cannot modify the `items` and `shared` elements through this class. Instead, use :meth:`DSSFlowZone.add_item` and
411+
You cannot modify the `items` and `shared` elements through this class. Instead, use :meth:`DSSFlowZone.add_item` and
412412
others
413413
"""
414414
return self._raw
@@ -432,7 +432,7 @@ def color(self, new_color):
432432
def save(self):
433433
"""Saves the settings of the zone"""
434434
self._zone.client._perform_empty("PUT", "/projects/%s/flow/zones/%s" % (self._zone.flow.project.project_key, self._zone.id),
435-
body=self._raw)
435+
body=self._raw)
436436

437437
class DSSProjectFlowGraph(object):
438438

@@ -445,8 +445,8 @@ def get_source_computables(self, as_type="dict"):
445445
"""
446446
Returns the list of source computables.
447447
:param as_type: How to return the source computables. Possible values are "dict" and "object"
448-
449-
:return: if as_type=dict, each computable is returned as a dict containing at least "ref" and "type".
448+
449+
:return: if as_type=dict, each computable is returned as a dict containing at least "ref" and "type".
450450
if as_type=object, each computable is returned as a :class:`dataikuapi.dss.dataset.DSSDataset`,
451451
:class:`dataikuapi.dss.managedfolder.DSSManagedFolder`,
452452
:class:`dataikuapi.dss.savedmodel.DSSSavedModel`, or streaming endpoint
@@ -461,8 +461,8 @@ def get_source_recipes(self, as_type="dict"):
461461
"""
462462
Returns the list of source recipes.
463463
:param as_type: How to return the source recipes. Possible values are "dict" and "object"
464-
465-
:return: if as_type=dict, each recipes is returned as a dict containing at least "ref" and "type".
464+
465+
:return: if as_type=dict, each recipes is returned as a dict containing at least "ref" and "type".
466466
if as_type=object, each computable is returned as a :class:`dataikuapi.dss.recipe.DSSRecipe`,
467467
"""
468468
ret = []
@@ -484,7 +484,7 @@ def get_successor_recipes(self, node, as_type="dict"):
484484
485485
:param node: Either a name or :class:`dataikuapi.dss.dataset.DSSDataset` object
486486
:param as_type: How to return the successor recipes. Possible values are "dict" and "object"
487-
:return if as_type=dict, each recipes is returned as a dict containing at least "ref" and "type".
487+
:return if as_type=dict, each recipes is returned as a dict containing at least "ref" and "type".
488488
if as_type=object, each computable is returned as a :class:`dataikuapi.dss.recipe.DSSRecipe`,
489489
"""
490490
if isinstance(node, DSSDataset):
@@ -496,13 +496,13 @@ def get_successor_recipes(self, node, as_type="dict"):
496496

497497
runnables = [self.nodes[x] for x in computable["successors"]]
498498
return self._convert_nodes_list(runnables, as_type)
499-
499+
500500
def get_successor_computables(self, node, as_type="dict"):
501501
"""
502502
Returns a list of computables that are a successor of a given graph node
503-
503+
504504
:param as_type: How to return the successor recipes. Possible values are "dict" and "object"
505-
:return if as_type=dict, each recipes is returned as a dict containing at least "ref" and "type".
505+
:return if as_type=dict, each recipes is returned as a dict containing at least "ref" and "type".
506506
if as_type=object, each computable is returned as a :class:`dataikuapi.dss.recipe.DSSRecipe`,
507507
"""
508508
if isinstance(node, DSSRecipe):
@@ -525,9 +525,13 @@ def _get_object_from_graph_node(self, node):
525525
return DSSDataset(self.flow.client, self.flow.project.project_key, node["ref"])
526526
elif node["type"] == "RUNNABLE_RECIPE":
527527
return DSSRecipe(self.flow.client, self.flow.project.project_key, node["ref"])
528+
elif node["type"] == "COMPUTABLE_FOLDER":
529+
return DSSManagedFolder(self.flow.client, self.flow.project.project_key, node["ref"])
530+
elif node["type"] == "COMPUTABLE_SAVED_MODEL":
531+
return DSSSavedModel(self.flow.client, self.flow.project.project_key, node["ref"])
528532
else:
529-
# TODO
530-
raise Exception("unsupported node type %s" % node["type"])
533+
# TODO add streaming elements
534+
raise Exception("unsupported node type: %s" % node["type"])
531535

532536
def get_items_in_traversal_order(self, as_type="dict"):
533537
ret = []
@@ -590,15 +594,15 @@ def get_state(self, options={}):
590594
:returns: the state, as a dict
591595
"""
592596
return self.client._perform_json("GET", "/projects/%s/flow/tools/%s/state" % (self.project_key, self.tool_id), body=options)
593-
597+
594598
def do(self, action):
595599
"""
596600
Perform a manual user action on the tool
597601
598602
:returns: the current state, as a dict
599603
"""
600604
return self.client._perform_json("PUT", "/projects/%s/flow/tools/%s/action" % (self.project_key, self.tool_id), body=action)
601-
605+
602606
def update(self, options={}):
603607
"""
604608
(for tools only) Start updating the tool state

0 commit comments

Comments
 (0)