1+ from utils .helperFunctions import helperFunctions
12import asyncio
23import json
34import pytz
1011from genie .serializers import NotebookObjectSerializer , NotebookRunLogsSerializer
1112from workflows .models import Workflow , WorkflowNotebookMap
1213from utils .apiResponse import ApiResponse
13- from utils .zeppelinAPI import Zeppelin , ZeppelinAPI
14+ from utils .zeppelinAPI import ZeppelinAPI
1415from genie .tasks import runNotebookJob as runNotebookJobTask
1516from django .conf import settings
1617
@@ -28,27 +29,29 @@ class NotebookJobServices:
2829 Class containing services related to NotebookJob model
2930 """
3031 @staticmethod
31- async def _fetchNotebookStatuses (notebooks : list ):
32+ async def _fetchNotebookStatuses (notebooks : list , workspaceId : int = 0 ):
3233 """
3334 Async method to fetch notebook status details for multiple notebooks
3435 Returns a dict with notebook ids as keys
3536 :param notebooks: List of notebook describing dicts each containing the 'id' field
3637 """
38+ workspaceName = helperFunctions .getWorkspaceName (workspaceId )
3739 notebookStatuses = {}
38- for future in asyncio .as_completed ([Zeppelin .getNotebookStatus (notebook ["id" ]) for notebook in notebooks ]):
40+ for future in asyncio .as_completed ([ZeppelinAPI ( workspaceName ) .getNotebookStatus (notebook ["id" ]) for notebook in notebooks ]):
3941 status = await future
4042 notebookStatuses [status ["id" ]] = status
4143 return notebookStatuses
4244
4345 @staticmethod
44- def getNotebooks (offset : int = 0 , limit : int = None , searchQuery : str = None , sorter : dict = None , _filter : dict = None ):
46+ def getNotebooks (offset : int = 0 , limit : int = None , searchQuery : str = None , sorter : dict = None , _filter : dict = None , workspaceId : int = 0 ):
4547 """
4648 Service to fetch and serialize NotebookJob objects
4749 Number of NotebookObjects fetched is stored as the constant GET_NOTEBOOKOJECTS_LIMIT
4850 :param offset: Offset for fetching NotebookJob objects
4951 """
52+ workspaceName = helperFunctions .getWorkspaceName (workspaceId )
5053 res = ApiResponse (message = "Error retrieving notebooks" )
51- notebooks = Zeppelin .getAllNotebooks ()
54+ notebooks = ZeppelinAPI ( workspaceName ) .getAllNotebooks ()
5255 if searchQuery :
5356 notebooks = NotebookJobServices .search (notebooks , "path" , searchQuery )
5457 if sorter .get ('order' , False ):
@@ -159,12 +162,13 @@ def sortingOnNotebook(notebooks, sorter, _filter):
159162 return notebooks
160163
161164 @staticmethod
162- def archivedNotebooks ():
165+ def archivedNotebooks (workspaceId : int = 0 ):
163166 """
164167 Get archived notebooks
165168 """
169+ workspaceName = helperFunctions .getWorkspaceName (workspaceId )
166170 res = ApiResponse (message = "Error retrieving archived notebooks" )
167- notebooks = Zeppelin .getAllNotebooks ("~Trash" )
171+ notebooks = ZeppelinAPI ( workspaceName ) .getAllNotebooks ("~Trash" )
168172 if notebooks :
169173 res .update (True , "Archived notebooks retrieved successfully" , notebooks )
170174 return res
@@ -183,10 +187,11 @@ def getNotebookObject(notebookObjId: int):
183187
184188
185189 @staticmethod
186- def getNotebooksLight ():
190+ def getNotebooksLight (workspaceId : int = 0 ):
187191 """ Gets concise notebook data"""
192+ workspaceName = helperFunctions .getWorkspaceName (workspaceId )
188193 res = ApiResponse (message = "Error retrieving notebooks" )
189- notebooks = Zeppelin .getAllNotebooks ()
194+ notebooks = ZeppelinAPI ( workspaceName ) .getAllNotebooks ()
190195 res .update (True , "Notebooks retrieved successfully" , notebooks )
191196 return res
192197
@@ -235,7 +240,7 @@ def _prepareNotebookJson(notebookTemplate: NotebookTemplate, payload: dict):
235240
236241
237242 @staticmethod
238- def addNotebook (payload : dict ):
243+ def addNotebook (payload : dict , workspaceId : int = 0 ):
239244 """
240245 Service to create and add a template based notebook
241246 :param payload: Dict containing notebook template info
@@ -244,28 +249,30 @@ def addNotebook(payload: dict):
244249 defaultPayload = payload .copy ()
245250 notebookTemplate = NotebookTemplate .objects .get (id = payload .get ("notebookTemplateId" , 0 ))
246251 notebook , connection = NotebookJobServices ._prepareNotebookJson (notebookTemplate , payload )
247- notebookZeppelinId = Zeppelin .addNotebook (notebook )
252+ workspaceName = helperFunctions .getWorkspaceName (workspaceId )
253+ notebookZeppelinId = ZeppelinAPI (workspaceName ).addNotebook (notebook )
248254 if notebookZeppelinId :
249255 NotebookObject .objects .create (notebookZeppelinId = notebookZeppelinId , connection = connection , notebookTemplate = notebookTemplate , defaultPayload = defaultPayload )
250256 res .update (True , "Notebook added successfully" )
251257 return res
252258
253259 @staticmethod
254- def editNotebook (notebookObjId : int , payload : dict ):
260+ def editNotebook (notebookObjId : int , payload : dict , workspaceId : int = 0 ):
255261 """
256262 Service to update a template based notebook
257263 :param notebookObjId: ID of the NotebookObject to be edited
258264 :param payload: Dict containing notebook template info
259265 """
266+ workspaceName = helperFunctions .getWorkspaceName (workspaceId )
260267 res = ApiResponse (message = "Error updating notebook" )
261268 defaultPayload = payload .copy ()
262269 notebookObject = NotebookObject .objects .get (id = notebookObjId )
263270 notebook , connection = NotebookJobServices ._prepareNotebookJson (notebookObject .notebookTemplate , payload )
264271
265- updateSuccess = Zeppelin .updateNotebookParagraphs (notebookObject .notebookZeppelinId , notebook )
272+ updateSuccess = ZeppelinAPI ( workspaceName ) .updateNotebookParagraphs (notebookObject .notebookZeppelinId , notebook )
266273 if updateSuccess :
267274 if defaultPayload .get ("name" ):
268- Zeppelin .renameNotebook (notebookObject .notebookZeppelinId , defaultPayload .get ("name" ))
275+ ZeppelinAPI ( workspaceName ) .renameNotebook (notebookObject .notebookZeppelinId , defaultPayload .get ("name" ))
269276 notebookObject .defaultPayload = defaultPayload
270277 notebookObject .connection = connection
271278 notebookObject .save ()
@@ -313,13 +320,13 @@ def deleteNotebookJob(notebookId: int):
313320 return res
314321
315322 @staticmethod
316- def runNotebookJob (notebookId : str ):
323+ def runNotebookJob (notebookId : str , workspaceId : int = 0 ):
317324 """
318325 Service to run notebook job
319326 """
320327 res = ApiResponse ("Error in running notebook" )
321328 notebookRunLogs = NotebookRunLogs .objects .create (notebookId = notebookId , status = NOTEBOOK_STATUS_QUEUED , runType = "Manual" )
322- runNotebookJobTask .delay (notebookId = notebookId , notebookRunLogsId = notebookRunLogs .id , runType = "Manual" )
329+ runNotebookJobTask .delay (notebookId = notebookId , notebookRunLogsId = notebookRunLogs .id , runType = "Manual" , workspaceId = workspaceId )
323330 res .update (True , "Notebook triggered successfully" , None )
324331 return res
325332
@@ -341,56 +348,61 @@ def stopNotebookJob(notebookId: str):
341348 return res
342349
343350 @staticmethod
344- def clearNotebookResults (notebookId : str ):
351+ def clearNotebookResults (notebookId : str , workspaceId : int = 0 ):
345352 """
346353 Service to clear notebook job
347354 """
355+ workspaceName = helperFunctions .getWorkspaceName (workspaceId )
348356 res = ApiResponse (message = "Error in clearing notebook" )
349- response = Zeppelin .clearNotebookResults (notebookId )
357+ response = ZeppelinAPI ( workspaceName ) .clearNotebookResults (notebookId )
350358 if response :
351359 res .update (True , "Notebook cleared successfully" , None )
352360 return res
353361
354362 @staticmethod
355- def cloneNotebook (notebookId : str , payload : dict ):
363+ def cloneNotebook (notebookId : str , payload : dict , workspaceId : int = 0 ):
356364 """
357365 Service to clone notebook job
358366 """
367+ workspaceName = helperFunctions .getWorkspaceName (workspaceId )
359368 res = ApiResponse (message = "Error in cloning notebook" )
360- response = Zeppelin .cloneNotebook (notebookId , json .dumps (payload ))
369+ response = ZeppelinAPI ( workspaceName ) .cloneNotebook (notebookId , json .dumps (payload ))
361370 if response :
362371 res .update (True , "Notebook cloned successfully" , None )
363372 return res
364373
365374 @staticmethod
366- def archiveNotebook (notebookId : str , notebookName : str ):
375+ def archiveNotebook (notebookId : str , notebookName : str , workspaceId : int = 0 ):
367376 """
368377 Service to run notebook
369378 """
379+ workspaceName = helperFunctions .getWorkspaceName (workspaceId )
370380 res = ApiResponse (message = "Error in archiving notebook" )
371- response = Zeppelin .renameNotebook (notebookId , "~Trash/" + notebookName )
381+ response = ZeppelinAPI ( workspaceName ) .renameNotebook (notebookId , "~Trash/" + notebookName )
372382 if response :
373383 res .update (True , "Notebook archived successfully" , None )
374384 return res
375385
376386 @staticmethod
377- def unarchiveNotebook (notebookId : str , notebookName : str ):
387+ def unarchiveNotebook (notebookId : str , notebookName : str , workspaceId : int = 0 ):
378388 """
379389 Service to unarchive notebook
380390 """
391+ workspaceName = helperFunctions .getWorkspaceName (workspaceId )
381392 res = ApiResponse (message = "Error in archiving notebook" )
382- response = Zeppelin .renameNotebook (notebookId , notebookName )
393+ response = ZeppelinAPI ( workspaceName ) .renameNotebook (notebookId , notebookName )
383394 if response :
384395 res .update (True , "Notebook unarchived successfully" , None )
385396 return res
386397
387398 @staticmethod
388- def deleteNotebook (notebookId : str ):
399+ def deleteNotebook (notebookId : str , workspaceId : int = 0 ):
389400 """
390401 Service to run notebook job
391402 """
403+ workspaceName = helperFunctions .getWorkspaceName (workspaceId )
392404 res = ApiResponse (message = "Error in deleting notebook" )
393- response = Zeppelin .deleteNotebook (notebookId )
405+ response = ZeppelinAPI ( workspaceName ) .deleteNotebook (notebookId )
394406 if response :
395407 NotebookObject .objects .filter (notebookZeppelinId = notebookId ).delete ()
396408 res .update (True , "Notebook deleted successfully" , None )
0 commit comments