Skip to content

Commit 3eff591

Browse files
[computes] fixed session-type extraction for connectors
The problem is that connector-based query execution is not able to reuse a session to fetch results. The frontend is sending the correct session_id, but our session-fetching logic broke when computes were implemented. We now look for the session_type in compute['name'] for computes, connector['name'] for connectors, and then snippet['type'] for old config-file-based Hive/Impala sessions. Change-Id: I386f3af1b0057e078e7b804f447f401e976b433a
1 parent c1875de commit 3eff591

File tree

1 file changed

+20
-45
lines changed

1 file changed

+20
-45
lines changed

desktop/libs/notebook/src/notebook/connectors/hiveserver2.py

Lines changed: 20 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
# limitations under the License.
1717

1818
from __future__ import division
19-
from future import standard_library
20-
standard_library.install_aliases()
2119
from builtins import next, object
2220
import binascii
2321
import copy
@@ -44,6 +42,9 @@
4442
from notebook.connectors.base import Api, QueryError, QueryExpired, OperationTimeout, OperationNotSupported, _get_snippet_name, Notebook, \
4543
get_interpreter, patch_snippet_for_connector
4644

45+
from future import standard_library
46+
standard_library.install_aliases()
47+
4748
if sys.version_info[0] > 2:
4849
from urllib.parse import quote as urllib_quote, unquote as urllib_unquote
4950
from django.utils.translation import gettext as _
@@ -179,7 +180,6 @@ class HS2Api(Api):
179180
def get_properties(lang='hive'):
180181
return ImpalaConfiguration.PROPERTIES if lang == 'impala' else HiveConfiguration.PROPERTIES
181182

182-
183183
@query_error_handler
184184
def create_session(self, lang='hive', properties=None):
185185
application = 'beeswax' if lang == 'hive' or lang == 'llap' else lang
@@ -246,7 +246,6 @@ def create_session(self, lang='hive', properties=None):
246246

247247
return response
248248

249-
250249
@query_error_handler
251250
def close_session(self, session):
252251
app_name = session.get('type')
@@ -281,7 +280,6 @@ def close_session(self, session):
281280

282281
return response
283282

284-
285283
def close_session_idle(self, notebook, session):
286284
idle = True
287285
response = {'result': []}
@@ -317,16 +315,14 @@ def execute(self, notebook, snippet):
317315
db = self._get_db(snippet, interpreter=self.interpreter)
318316

319317
statement = self._get_current_statement(notebook, snippet)
320-
compute = snippet.get('compute', {})
321-
session_type = compute['name'] if is_compute(snippet) and compute.get('name') else snippet['type']
318+
session_type = self._find_session_type(snippet)
322319
session = self._get_session(notebook, session_type)
323320

324321
query = self._prepare_hql_query(snippet, statement['statement'], session)
325322
_session = self._get_session_by_id(notebook, session_type)
326323

327-
328324
try:
329-
if statement.get('statement_id') == 0: # TODO: move this to client
325+
if statement.get('statement_id') == 0: # TODO: move this to client
330326
if query.database and not statement['statement'].lower().startswith('set'):
331327
result = db.use(query.database, session=_session)
332328
if result.session:
@@ -356,6 +352,14 @@ def execute(self, notebook, snippet):
356352

357353
return response
358354

355+
def _find_session_type(self, snippet):
356+
compute = snippet.get('compute', {})
357+
if is_compute(snippet) and compute.get('name'):
358+
return compute['name']
359+
connector = snippet.get('connector', {})
360+
if connector and connector.get('name'):
361+
return connector['name']
362+
return snippet.get('type')
359363

360364
@query_error_handler
361365
def check_status(self, notebook, snippet):
@@ -384,7 +388,6 @@ def check_status(self, notebook, snippet):
384388

385389
return response
386390

387-
388391
@query_error_handler
389392
def fetch_result(self, notebook, snippet, rows, start_over):
390393
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -411,7 +414,6 @@ def fetch_result(self, notebook, snippet, rows, start_over):
411414
'type': 'table'
412415
}
413416

414-
415417
@query_error_handler
416418
def fetch_result_size(self, notebook, snippet):
417419
resp = {
@@ -440,7 +442,6 @@ def fetch_result_size(self, notebook, snippet):
440442

441443
return resp
442444

443-
444445
@query_error_handler
445446
def cancel(self, notebook, snippet):
446447
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -449,15 +450,13 @@ def cancel(self, notebook, snippet):
449450
db.cancel_operation(handle)
450451
return {'status': 0}
451452

452-
453453
@query_error_handler
454454
def get_log(self, notebook, snippet, startFrom=None, size=None):
455455
db = self._get_db(snippet, interpreter=self.interpreter)
456456

457457
handle = self._get_handle(snippet)
458458
return db.get_log(handle, start_over=startFrom == 0)
459459

460-
461460
@query_error_handler
462461
def close_statement(self, notebook, snippet):
463462
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -472,7 +471,6 @@ def close_statement(self, notebook, snippet):
472471
raise e
473472
return {'status': 0}
474473

475-
476474
def can_start_over(self, notebook, snippet):
477475
try:
478476
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -484,13 +482,12 @@ def can_start_over(self, notebook, snippet):
484482
raise e
485483
return can_start_over
486484

487-
488485
@query_error_handler
489486
def progress(self, notebook, snippet, logs=''):
490487
patch_snippet_for_connector(snippet)
491488

492489
if snippet['dialect'] == 'hive':
493-
match = re.search('Total jobs = (\d+)', logs, re.MULTILINE)
490+
match = re.search(r'Total jobs = (\d+)', logs, re.MULTILINE)
494491
total = int(match.group(1)) if match else 1
495492

496493
started = logs.count('Starting Job')
@@ -499,13 +496,12 @@ def progress(self, notebook, snippet, logs=''):
499496
progress = int((started + ended) * 100 / (total * 2))
500497
return max(progress, 5) # Return 5% progress as a minimum
501498
elif snippet['dialect'] == 'impala':
502-
match = re.findall('(\d+)% Complete', logs, re.MULTILINE)
499+
match = re.findall(r'(\d+)% Complete', logs, re.MULTILINE)
503500
# Retrieve the last reported progress percentage if it exists
504501
return int(match[-1]) if match and isinstance(match, list) else 0
505502
else:
506503
return 50
507504

508-
509505
@query_error_handler
510506
def get_jobs(self, notebook, snippet, logs):
511507
jobs = []
@@ -552,7 +548,6 @@ def get_jobs(self, notebook, snippet, logs):
552548

553549
return jobs
554550

555-
556551
@query_error_handler
557552
def autocomplete(self, snippet, database=None, table=None, column=None, nested=None, operation=None):
558553
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -577,7 +572,6 @@ def autocomplete(self, snippet, database=None, table=None, column=None, nested=N
577572

578573
return resp
579574

580-
581575
@query_error_handler
582576
def get_sample_data(self, snippet, database=None, table=None, column=None, is_async=False, operation=None):
583577
try:
@@ -586,7 +580,6 @@ def get_sample_data(self, snippet, database=None, table=None, column=None, is_as
586580
except QueryServerException as ex:
587581
raise QueryError(ex.message)
588582

589-
590583
@query_error_handler
591584
def explain(self, notebook, snippet):
592585
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -613,7 +606,6 @@ def explain(self, notebook, snippet):
613606
'statement': statement,
614607
}
615608

616-
617609
@query_error_handler
618610
def export_data_as_hdfs_file(self, snippet, target_file, overwrite):
619611
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -626,8 +618,7 @@ def export_data_as_hdfs_file(self, snippet, target_file, overwrite):
626618

627619
return '/filebrowser/view=%s' % urllib_quote(
628620
urllib_quote(target_file.encode('utf-8'), safe=SAFE_CHARACTERS_URI_COMPONENTS)
629-
) # Quote twice, because of issue in the routing on client
630-
621+
) # Quote twice, because of issue in the routing on client
631622

632623
def export_data_as_table(self, notebook, snippet, destination, is_temporary=False, location=None):
633624
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -654,7 +645,6 @@ def export_data_as_table(self, notebook, snippet, destination, is_temporary=Fals
654645

655646
return hql, success_url
656647

657-
658648
def export_large_data_to_hdfs(self, notebook, snippet, destination):
659649
response = self._get_current_statement(notebook, snippet)
660650
session = self._get_session(notebook, snippet['type'])
@@ -684,7 +674,6 @@ def export_large_data_to_hdfs(self, notebook, snippet, destination):
684674

685675
return hql, success_url
686676

687-
688677
def upgrade_properties(self, lang='hive', properties=None):
689678
upgraded_properties = copy.deepcopy(self.get_properties(lang))
690679

@@ -708,7 +697,6 @@ def upgrade_properties(self, lang='hive', properties=None):
708697

709698
return upgraded_properties
710699

711-
712700
def _get_session(self, notebook, type='hive'):
713701
session = next((session for session in notebook['sessions'] if session['type'] == type), None)
714702
return session
@@ -723,7 +711,6 @@ def _get_session_by_id(self, notebook, type='hive'):
723711
filters['owner'] = self.user
724712
return Session.objects.get(**filters)
725713

726-
727714
def _get_hive_execution_engine(self, notebook, snippet):
728715
# Get hive.execution.engine from snippet properties, if none, then get from session
729716
properties = snippet['properties']
@@ -746,7 +733,6 @@ def _get_hive_execution_engine(self, notebook, snippet):
746733

747734
return engine
748735

749-
750736
def _prepare_hql_query(self, snippet, statement, session):
751737
settings = snippet['properties'].get('settings', None)
752738
file_resources = snippet['properties'].get('files', None)
@@ -775,7 +761,6 @@ def _prepare_hql_query(self, snippet, statement, session):
775761
database=database
776762
)
777763

778-
779764
def get_browse_query(self, snippet, database, table, partition_spec=None):
780765
db = self._get_db(snippet, interpreter=self.interpreter)
781766
table = db.get_table(database, table)
@@ -789,7 +774,6 @@ def get_browse_query(self, snippet, database, table, partition_spec=None):
789774
else:
790775
return db.get_select_star_query(database, table, limit=100)
791776

792-
793777
def _get_handle(self, snippet):
794778
try:
795779
handle = snippet['result']['handle'].copy()
@@ -805,7 +789,6 @@ def _get_handle(self, snippet):
805789

806790
return HiveServerQueryHandle(**handle)
807791

808-
809792
def _get_db(self, snippet, is_async=False, interpreter=None):
810793
if interpreter and interpreter.get('dialect'):
811794
dialect = interpreter['dialect']
@@ -828,7 +811,6 @@ def _get_db(self, snippet, is_async=False, interpreter=None):
828811
# Note: name is not used if interpreter is present
829812
return dbms.get(self.user, query_server=get_query_server_config(name=name, connector=interpreter))
830813

831-
832814
def _parse_job_counters(self, job_id):
833815
# Attempt to fetch total records from the job's Hive counter
834816
total_records, total_size = None, None
@@ -864,7 +846,6 @@ def _parse_job_counters(self, job_id):
864846

865847
return total_records, total_size
866848

867-
868849
def _get_hive_result_size(self, notebook, snippet):
869850
total_records, total_size, msg = None, None, None
870851
engine = self._get_hive_execution_engine(notebook, snippet).lower()
@@ -879,8 +860,8 @@ def _get_hive_result_size(self, notebook, snippet):
879860
else:
880861
msg = _('Hive query did not execute any jobs.')
881862
elif engine == 'spark':
882-
total_records_re = "RECORDS_OUT_0: (?P<total_records>\d+)"
883-
total_size_re = "Spark Job\[[a-z0-9-]+\] Metrics[A-Za-z0-9:\s]+ResultSize: (?P<total_size>\d+)"
863+
total_records_re = r"RECORDS_OUT_0: (?P<total_records>\d+)"
864+
total_size_re = r"Spark Job\[[a-z0-9-]+\] Metrics[A-Za-z0-9:\s]+ResultSize: (?P<total_size>\d+)"
884865
total_records_match = re.search(total_records_re, logs, re.MULTILINE)
885866
total_size_match = re.search(total_size_re, logs, re.MULTILINE)
886867

@@ -891,7 +872,6 @@ def _get_hive_result_size(self, notebook, snippet):
891872

892873
return total_records, total_size, msg
893874

894-
895875
def _get_impala_result_size(self, notebook, snippet):
896876
total_records_match = None
897877
total_records, total_size, msg = None, None, None
@@ -904,7 +884,7 @@ def _get_impala_result_size(self, notebook, snippet):
904884

905885
fragment = self._get_impala_query_profile(server_url, query_id=query_id)
906886
total_records_re = \
907-
"Coordinator Fragment F\d\d.+?RowsReturned: \d+(?:.\d+[KMB])? \((?P<total_records>\d+)\).*?(Averaged Fragment F\d\d)"
887+
r"Coordinator Fragment F\d\d.+?RowsReturned: \d+(?:.\d+[KMB])? \((?P<total_records>\d+)\).*?(Averaged Fragment F\d\d)"
908888
total_records_match = re.search(total_records_re, fragment, re.MULTILINE | re.DOTALL)
909889

910890
if total_records_match:
@@ -917,7 +897,6 @@ def _get_impala_result_size(self, notebook, snippet):
917897

918898
return total_records, total_size, msg
919899

920-
921900
def _get_impala_query_id(self, snippet):
922901
guid = None
923902
if 'result' in snippet and 'handle' in snippet['result'] and 'guid' in snippet['result']['handle']:
@@ -929,7 +908,6 @@ def _get_impala_query_id(self, snippet):
929908
LOG.warning('Snippet does not contain a valid result handle, cannot extract Impala query ID.')
930909
return guid
931910

932-
933911
def _get_impala_query_profile(self, server_url, query_id):
934912
api = get_impalad_api(user=self.user, url=server_url)
935913

@@ -944,18 +922,15 @@ def _get_impala_query_profile(self, server_url, query_id):
944922

945923
return profile
946924

947-
948925
def _get_impala_profile_plan(self, query_id, profile):
949-
query_plan_re = "Query \(id=%(query_id)s\):.+?Execution Profile %(query_id)s" % {'query_id': query_id}
926+
query_plan_re = r"Query \(id=%(query_id)s\):.+?Execution Profile %(query_id)s" % {'query_id': query_id}
950927
query_plan_match = re.search(query_plan_re, profile, re.MULTILINE | re.DOTALL)
951928
return query_plan_match.group() if query_plan_match else None
952929

953-
954930
def describe_column(self, notebook, snippet, database=None, table=None, column=None):
955931
db = self._get_db(snippet, interpreter=self.interpreter)
956932
return db.get_table_columns_stats(database, table, column)
957933

958-
959934
def describe_table(self, notebook, snippet, database=None, table=None):
960935
db = self._get_db(snippet, interpreter=self.interpreter)
961936
tb = db.get_table(database, table)

0 commit comments

Comments (0)