@@ -179,7 +179,6 @@ class HS2Api(Api):
179179 def get_properties (lang = 'hive' ):
180180 return ImpalaConfiguration .PROPERTIES if lang == 'impala' else HiveConfiguration .PROPERTIES
181181
182-
183182 @query_error_handler
184183 def create_session (self , lang = 'hive' , properties = None ):
185184 application = 'beeswax' if lang == 'hive' or lang == 'llap' else lang
@@ -246,7 +245,6 @@ def create_session(self, lang='hive', properties=None):
246245
247246 return response
248247
249-
250248 @query_error_handler
251249 def close_session (self , session ):
252250 app_name = session .get ('type' )
@@ -281,7 +279,6 @@ def close_session(self, session):
281279
282280 return response
283281
284-
285282 def close_session_idle (self , notebook , session ):
286283 idle = True
287284 response = {'result' : []}
@@ -317,16 +314,14 @@ def execute(self, notebook, snippet):
317314 db = self ._get_db (snippet , interpreter = self .interpreter )
318315
319316 statement = self ._get_current_statement (notebook , snippet )
320- compute = snippet .get ('compute' , {})
321- session_type = compute ['name' ] if is_compute (snippet ) and compute .get ('name' ) else snippet ['type' ]
317+ session_type = self ._find_session_type (snippet )
322318 session = self ._get_session (notebook , session_type )
323319
324320 query = self ._prepare_hql_query (snippet , statement ['statement' ], session )
325321 _session = self ._get_session_by_id (notebook , session_type )
326322
327-
328323 try :
329- if statement .get ('statement_id' ) == 0 : # TODO: move this to client
324+ if statement .get ('statement_id' ) == 0 : # TODO: move this to client
330325 if query .database and not statement ['statement' ].lower ().startswith ('set' ):
331326 result = db .use (query .database , session = _session )
332327 if result .session :
@@ -356,6 +351,14 @@ def execute(self, notebook, snippet):
356351
357352 return response
358353
def _find_session_type(self, snippet):
    """Work out which session type a snippet should run against.

    Lookup order:
      1. the ``name`` of ``snippet['compute']`` when ``is_compute(snippet)``
         holds and that name is truthy;
      2. the ``name`` of ``snippet['connector']`` when a connector is present
         and that name is truthy;
      3. ``snippet.get('type')`` (``None`` when the key is absent).
    """
    compute_cfg = snippet.get('compute', {})
    # is_compute() is consulted first; the name is only read when it holds.
    compute_name = compute_cfg.get('name') if is_compute(snippet) else None
    if compute_name:
        return compute_name

    connector_cfg = snippet.get('connector', {})
    # A missing/empty connector is skipped without touching its 'name'.
    connector_name = connector_cfg.get('name') if connector_cfg else None
    if connector_name:
        return connector_name

    return snippet.get('type')
359362
360363 @query_error_handler
361364 def check_status (self , notebook , snippet ):
@@ -384,7 +387,6 @@ def check_status(self, notebook, snippet):
384387
385388 return response
386389
387-
388390 @query_error_handler
389391 def fetch_result (self , notebook , snippet , rows , start_over ):
390392 db = self ._get_db (snippet , interpreter = self .interpreter )
@@ -411,7 +413,6 @@ def fetch_result(self, notebook, snippet, rows, start_over):
411413 'type' : 'table'
412414 }
413415
414-
415416 @query_error_handler
416417 def fetch_result_size (self , notebook , snippet ):
417418 resp = {
@@ -440,7 +441,6 @@ def fetch_result_size(self, notebook, snippet):
440441
441442 return resp
442443
443-
444444 @query_error_handler
445445 def cancel (self , notebook , snippet ):
446446 db = self ._get_db (snippet , interpreter = self .interpreter )
@@ -449,15 +449,13 @@ def cancel(self, notebook, snippet):
449449 db .cancel_operation (handle )
450450 return {'status' : 0 }
451451
452-
@query_error_handler
def get_log(self, notebook, snippet, startFrom=None, size=None):
    """Return the log of the operation referenced by the snippet.

    The log is requested from the beginning only when ``startFrom`` is
    exactly ``0``. ``notebook`` and ``size`` are accepted but not used by
    this method.
    """
    client = self._get_db(snippet, interpreter=self.interpreter)
    operation_handle = self._get_handle(snippet)

    from_beginning = startFrom == 0
    return client.get_log(operation_handle, start_over=from_beginning)
459458
460-
461459 @query_error_handler
462460 def close_statement (self , notebook , snippet ):
463461 db = self ._get_db (snippet , interpreter = self .interpreter )
@@ -472,7 +470,6 @@ def close_statement(self, notebook, snippet):
472470 raise e
473471 return {'status' : 0 }
474472
475-
476473 def can_start_over (self , notebook , snippet ):
477474 try :
478475 db = self ._get_db (snippet , interpreter = self .interpreter )
@@ -484,13 +481,12 @@ def can_start_over(self, notebook, snippet):
484481 raise e
485482 return can_start_over
486483
487-
488484 @query_error_handler
489485 def progress (self , notebook , snippet , logs = '' ):
490486 patch_snippet_for_connector (snippet )
491487
492488 if snippet ['dialect' ] == 'hive' :
493- match = re .search ('Total jobs = (\d+)' , logs , re .MULTILINE )
489+ match = re .search (r 'Total jobs = (\d+)' , logs , re .MULTILINE )
494490 total = int (match .group (1 )) if match else 1
495491
496492 started = logs .count ('Starting Job' )
@@ -499,13 +495,12 @@ def progress(self, notebook, snippet, logs=''):
499495 progress = int ((started + ended ) * 100 / (total * 2 ))
500496 return max (progress , 5 ) # Return 5% progress as a minimum
501497 elif snippet ['dialect' ] == 'impala' :
502- match = re .findall ('(\d+)% Complete' , logs , re .MULTILINE )
498+ match = re .findall (r '(\d+)% Complete' , logs , re .MULTILINE )
503499 # Retrieve the last reported progress percentage if it exists
504500 return int (match [- 1 ]) if match and isinstance (match , list ) else 0
505501 else :
506502 return 50
507503
508-
509504 @query_error_handler
510505 def get_jobs (self , notebook , snippet , logs ):
511506 jobs = []
@@ -552,7 +547,6 @@ def get_jobs(self, notebook, snippet, logs):
552547
553548 return jobs
554549
555-
556550 @query_error_handler
557551 def autocomplete (self , snippet , database = None , table = None , column = None , nested = None , operation = None ):
558552 db = self ._get_db (snippet , interpreter = self .interpreter )
@@ -577,7 +571,6 @@ def autocomplete(self, snippet, database=None, table=None, column=None, nested=N
577571
578572 return resp
579573
580-
581574 @query_error_handler
582575 def get_sample_data (self , snippet , database = None , table = None , column = None , is_async = False , operation = None ):
583576 try :
@@ -586,7 +579,6 @@ def get_sample_data(self, snippet, database=None, table=None, column=None, is_as
586579 except QueryServerException as ex :
587580 raise QueryError (ex .message )
588581
589-
590582 @query_error_handler
591583 def explain (self , notebook , snippet ):
592584 db = self ._get_db (snippet , interpreter = self .interpreter )
@@ -613,7 +605,6 @@ def explain(self, notebook, snippet):
613605 'statement' : statement ,
614606 }
615607
616-
617608 @query_error_handler
618609 def export_data_as_hdfs_file (self , snippet , target_file , overwrite ):
619610 db = self ._get_db (snippet , interpreter = self .interpreter )
@@ -626,8 +617,7 @@ def export_data_as_hdfs_file(self, snippet, target_file, overwrite):
626617
627618 return '/filebrowser/view=%s' % urllib_quote (
628619 urllib_quote (target_file .encode ('utf-8' ), safe = SAFE_CHARACTERS_URI_COMPONENTS )
629- ) # Quote twice, because of issue in the routing on client
630-
620+ ) # Quote twice, because of issue in the routing on client
631621
632622 def export_data_as_table (self , notebook , snippet , destination , is_temporary = False , location = None ):
633623 db = self ._get_db (snippet , interpreter = self .interpreter )
@@ -654,7 +644,6 @@ def export_data_as_table(self, notebook, snippet, destination, is_temporary=Fals
654644
655645 return hql , success_url
656646
657-
658647 def export_large_data_to_hdfs (self , notebook , snippet , destination ):
659648 response = self ._get_current_statement (notebook , snippet )
660649 session = self ._get_session (notebook , snippet ['type' ])
@@ -684,7 +673,6 @@ def export_large_data_to_hdfs(self, notebook, snippet, destination):
684673
685674 return hql , success_url
686675
687-
688676 def upgrade_properties (self , lang = 'hive' , properties = None ):
689677 upgraded_properties = copy .deepcopy (self .get_properties (lang ))
690678
@@ -708,7 +696,6 @@ def upgrade_properties(self, lang='hive', properties=None):
708696
709697 return upgraded_properties
710698
711-
712699 def _get_session (self , notebook , type = 'hive' ):
713700 session = next ((session for session in notebook ['sessions' ] if session ['type' ] == type ), None )
714701 return session
@@ -723,7 +710,6 @@ def _get_session_by_id(self, notebook, type='hive'):
723710 filters ['owner' ] = self .user
724711 return Session .objects .get (** filters )
725712
726-
727713 def _get_hive_execution_engine (self , notebook , snippet ):
728714 # Get hive.execution.engine from snippet properties, if none, then get from session
729715 properties = snippet ['properties' ]
@@ -746,7 +732,6 @@ def _get_hive_execution_engine(self, notebook, snippet):
746732
747733 return engine
748734
749-
750735 def _prepare_hql_query (self , snippet , statement , session ):
751736 settings = snippet ['properties' ].get ('settings' , None )
752737 file_resources = snippet ['properties' ].get ('files' , None )
@@ -775,7 +760,6 @@ def _prepare_hql_query(self, snippet, statement, session):
775760 database = database
776761 )
777762
778-
779763 def get_browse_query (self , snippet , database , table , partition_spec = None ):
780764 db = self ._get_db (snippet , interpreter = self .interpreter )
781765 table = db .get_table (database , table )
@@ -789,7 +773,6 @@ def get_browse_query(self, snippet, database, table, partition_spec=None):
789773 else :
790774 return db .get_select_star_query (database , table , limit = 100 )
791775
792-
793776 def _get_handle (self , snippet ):
794777 try :
795778 handle = snippet ['result' ]['handle' ].copy ()
@@ -805,7 +788,6 @@ def _get_handle(self, snippet):
805788
806789 return HiveServerQueryHandle (** handle )
807790
808-
809791 def _get_db (self , snippet , is_async = False , interpreter = None ):
810792 if interpreter and interpreter .get ('dialect' ):
811793 dialect = interpreter ['dialect' ]
@@ -828,7 +810,6 @@ def _get_db(self, snippet, is_async=False, interpreter=None):
828810 # Note: name is not used if interpreter is present
829811 return dbms .get (self .user , query_server = get_query_server_config (name = name , connector = interpreter ))
830812
831-
832813 def _parse_job_counters (self , job_id ):
833814 # Attempt to fetch total records from the job's Hive counter
834815 total_records , total_size = None , None
@@ -864,7 +845,6 @@ def _parse_job_counters(self, job_id):
864845
865846 return total_records , total_size
866847
867-
868848 def _get_hive_result_size (self , notebook , snippet ):
869849 total_records , total_size , msg = None , None , None
870850 engine = self ._get_hive_execution_engine (notebook , snippet ).lower ()
@@ -879,8 +859,8 @@ def _get_hive_result_size(self, notebook, snippet):
879859 else :
880860 msg = _ ('Hive query did not execute any jobs.' )
881861 elif engine == 'spark' :
882- total_records_re = "RECORDS_OUT_0: (?P<total_records>\d+)"
883- total_size_re = "Spark Job\[[a-z0-9-]+\] Metrics[A-Za-z0-9:\s]+ResultSize: (?P<total_size>\d+)"
862+ total_records_re = r "RECORDS_OUT_0: (?P<total_records>\d+)"
863+ total_size_re = r "Spark Job\[[a-z0-9-]+\] Metrics[A-Za-z0-9:\s]+ResultSize: (?P<total_size>\d+)"
884864 total_records_match = re .search (total_records_re , logs , re .MULTILINE )
885865 total_size_match = re .search (total_size_re , logs , re .MULTILINE )
886866
@@ -891,7 +871,6 @@ def _get_hive_result_size(self, notebook, snippet):
891871
892872 return total_records , total_size , msg
893873
894-
895874 def _get_impala_result_size (self , notebook , snippet ):
896875 total_records_match = None
897876 total_records , total_size , msg = None , None , None
@@ -904,7 +883,7 @@ def _get_impala_result_size(self, notebook, snippet):
904883
905884 fragment = self ._get_impala_query_profile (server_url , query_id = query_id )
906885 total_records_re = \
907- "Coordinator Fragment F\d\d.+?RowsReturned: \d+(?:.\d+[KMB])? \((?P<total_records>\d+)\).*?(Averaged Fragment F\d\d)"
886+ r "Coordinator Fragment F\d\d.+?RowsReturned: \d+(?:.\d+[KMB])? \((?P<total_records>\d+)\).*?(Averaged Fragment F\d\d)"
908887 total_records_match = re .search (total_records_re , fragment , re .MULTILINE | re .DOTALL )
909888
910889 if total_records_match :
@@ -917,7 +896,6 @@ def _get_impala_result_size(self, notebook, snippet):
917896
918897 return total_records , total_size , msg
919898
920-
921899 def _get_impala_query_id (self , snippet ):
922900 guid = None
923901 if 'result' in snippet and 'handle' in snippet ['result' ] and 'guid' in snippet ['result' ]['handle' ]:
@@ -929,7 +907,6 @@ def _get_impala_query_id(self, snippet):
929907 LOG .warning ('Snippet does not contain a valid result handle, cannot extract Impala query ID.' )
930908 return guid
931909
932-
933910 def _get_impala_query_profile (self , server_url , query_id ):
934911 api = get_impalad_api (user = self .user , url = server_url )
935912
@@ -944,18 +921,15 @@ def _get_impala_query_profile(self, server_url, query_id):
944921
945922 return profile
946923
947-
948924 def _get_impala_profile_plan (self , query_id , profile ):
949- query_plan_re = "Query \(id=%(query_id)s\):.+?Execution Profile %(query_id)s" % {'query_id' : query_id }
925+ query_plan_re = r "Query \(id=%(query_id)s\):.+?Execution Profile %(query_id)s" % {'query_id' : query_id }
950926 query_plan_match = re .search (query_plan_re , profile , re .MULTILINE | re .DOTALL )
951927 return query_plan_match .group () if query_plan_match else None
952928
953-
def describe_column(self, notebook, snippet, database=None, table=None, column=None):
    """Return the column statistics reported by the snippet's database client.

    Delegates to ``get_table_columns_stats(database, table, column)`` on the
    client obtained for ``snippet``. ``notebook`` is not used here.
    """
    client = self._get_db(snippet, interpreter=self.interpreter)
    column_stats = client.get_table_columns_stats(database, table, column)
    return column_stats
957932
958-
959933 def describe_table (self , notebook , snippet , database = None , table = None ):
960934 db = self ._get_db (snippet , interpreter = self .interpreter )
961935 tb = db .get_table (database , table )
0 commit comments