@@ -41,6 +41,29 @@ def __init__(self,
4141 def set_address (self , address ):
4242 self .address = address
4343
44+ def bind_table (self , data : Data , callback = None ):
45+ conf = data .config
46+ conf ['file' ] = os .path .join (str (self ._data_base_dir ), conf .get ('file' ))
47+ path = Path (conf .get ('file' ))
48+ if not path .exists ():
49+ raise Exception ('The file is obtained from the fate flow client machine, but it does not exist, '
50+ f'please check the path: { path } ' )
51+ response = self ._client .table .bind_path (path = str (path ),
52+ namespace = data .namespace ,
53+ name = data .table_name )
54+ try :
55+ if callback is not None :
56+ callback (response )
57+ status = str (response ['message' ]).lower ()
58+ else :
59+ status = response ["message" ]
60+ code = response ["code" ]
61+ if code != 0 :
62+ raise RuntimeError (f"Return code { code } != 0, bind path failed" )
63+ except BaseException :
64+ raise ValueError (f"Bind path failed, response={ response } " )
65+ return status
66+
4467 def transform_local_file_to_dataframe (self , data : Data , callback = None , output_path = None ):
4568 #data_warehouse = self.upload_data(data, callback, output_path)
4669 #status = self.transform_to_dataframe(data.namespace, data.table_name, data_warehouse, callback)
@@ -82,44 +105,6 @@ def upload_file_and_convert_to_dataframe(self, data: Data, callback=None, output
82105 self ._awaiting (job_id , "local" , 0 )
83106 return status
84107
85- """def upload_data(self, data: Data, callback=None, output_path=None):
86- response, file_path = self._upload_data(data, output_path=output_path)
87- try:
88- if callback is not None:
89- callback(response)
90- code = response["code"]
91- if code != 0:
92- raise ValueError(f"Return code {code}!=0")
93-
94- namespace = response["data"]["namespace"]
95- name = response["data"]["name"]
96- job_id = response["job_id"]
97- except BaseException:
98- raise ValueError(f"Upload data fails, response={response}")
99- # self.monitor_status(job_id, role=self.role, party_id=self.party_id)
100- self._awaiting(job_id, "local", 0)
101-
102- return dict(namespace=namespace, name=name)
103-
104- def transform_to_dataframe(self, namespace, table_name, data_warehouse, callback=None):
105- response = self._client.data.dataframe_transformer(namespace=namespace,
106- name=table_name,
107- data_warehouse=data_warehouse)
108-
109- try:
110- if callback is not None:
111- callback(response)
112- status = self._awaiting(response["job_id"], "local", 0)
113- status = str(status).lower()
114- else:
115- status = response["retmsg"]
116-
117- except Exception as e:
118- raise RuntimeError(f"upload data failed") from e
119- job_id = response["job_id"]
120- self._awaiting(job_id, "local", 0)
121- return status"""
122-
123108 def delete_data (self , data : Data ):
124109 try :
125110 table_name = data .config ['table_name' ] if data .config .get (
@@ -154,27 +139,6 @@ def _awaiting(self, job_id, role, party_id, callback=None):
154139 callback (response )
155140 time .sleep (1 )
156141
157- """def _upload_data(self, data, output_path=None, verbose=0, destroy=1):
158- conf = data.config
159- # if conf.get("engine", {}) != "PATH":
160- if output_path is not None:
161- conf['file'] = os.path.join(os.path.abspath(output_path), os.path.basename(conf.get('file')))
162- else:
163- if _config.data_switch is not None:
164- conf['file'] = os.path.join(str(self._cache_directory), os.path.basename(conf.get('file')))
165- else:
166- conf['file'] = os.path.join(str(self._data_base_dir), conf.get('file'))
167- path = Path(conf.get('file'))
168- if not path.exists():
169- raise Exception('The file is obtained from the fate flow client machine, but it does not exist, '
170- f'please check the path: {path}')
171- response = self._client.data.upload(file=str(path),
172- head=data.head,
173- meta=data.meta,
174- extend_sid=data.extend_sid,
175- partitions=data.partitions)
176- return response, conf["file"]"""
177-
178142 def _output_data_table (self , job_id , role , party_id , task_name ):
179143 response = self ._client .output .data_table (job_id , role = role , party_id = party_id , task_name = task_name )
180144 if response .get ("code" ) is not None :
@@ -223,7 +187,7 @@ def get_version(self):
223187 """def _add_notes(self, job_id, role, party_id, notes):
224188 data = dict(job_id=job_id, role=role, party_id=party_id, notes=notes)
225189 response = AddNotesResponse(self._post(url='job/update', json=data))
226- return response"""
190+ return response
227191
228192 def _table_bind(self, data):
229193 response = self._post(url='table/bind', json=data)
@@ -235,6 +199,7 @@ def _table_bind(self, data):
235199 except Exception as e:
236200 raise RuntimeError(f"table bind error: {response}") from e
237201 return response
202+ """
238203
239204
240205class Status (object ):
0 commit comments