# See the License for the specific language governing permissions and
# limitations under the License.

-import re
 import json
-import time
 import logging
+import re
 import threading
+import time
 from builtins import object

 from django.core.cache import caches
   hiveserver2_use_ssl,
 )
 from beeswax.models import QUERY_TYPES, Compute, QueryHistory
-from desktop.conf import CLUSTER_ID, has_connectors
+from desktop.conf import has_connectors
 from desktop.lib.django_util import format_preserving_redirect
 from desktop.lib.exceptions_renderable import PopupException
 from desktop.lib.parameterization import substitute_variables
@@ -108,7 +108,6 @@ def get_zk_hs2():
   # Filter nodes that match the expected pattern before sorting
   sequence_nodes = [x for x in hiveservers if re.search(r'sequence=\d+', x)]
   LOG.info("Selecting Hive server via the following sequence_nodes {0}".format(sequence_nodes))
-  # ['serverUri=dw-dev18:10000;version=3.1.2;sequence=0000000052', 'serverUri=dw-dev17:10000;version=3.1.2;sequence=0000000051']

   if sequence_nodes:
     # Sort the filtered list based on the sequence number
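
As context for the hunk above — a minimal, runnable sketch (not part of this commit) of picking a HiveServer2 from ZooKeeper sequence nodes in the format the removed comment documented; the node strings and the choice of sort end are illustrative assumptions:

```python
import re

# Illustrative znode names ('serverUri=<host:port>;version=<v>;sequence=<NNN>'),
# matching the format shown in the comment this commit removes.
sequence_nodes = [
  'serverUri=dw-dev18:10000;version=3.1.2;sequence=0000000052',
  'serverUri=dw-dev17:10000;version=3.1.2;sequence=0000000051',
]

# Sort by the numeric suffix of 'sequence=NNN'; picking the lowest sequence
# (the oldest registration) is an assumption, not necessarily what Hue does.
sequence_nodes.sort(key=lambda node: int(re.search(r'sequence=(\d+)', node).group(1)))
server_uri = re.search(r'serverUri=([^;]+)', sequence_nodes[0]).group(1)
host, port = server_uri.split(':')  # ('dw-dev17', '10000')
```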
@@ -148,20 +147,20 @@ def get(user, query_server=None, cluster=None):
       from impala.dbms import ImpalaDbms
       from impala.server import ImpalaServerClient
       DBMS_CACHE[user.id][query_server['server_name']] = ImpalaDbms(
-        HiveServerClientCompatible(ImpalaServerClient(query_server, user)),
-        QueryHistory.SERVER_TYPE[1][0]
+      HiveServerClientCompatible(ImpalaServerClient(query_server, user)),
+      QueryHistory.SERVER_TYPE[1][0]
       )
     elif query_server['server_name'] == 'hms':
       from beeswax.server.hive_metastore_server import HiveMetastoreClient
       DBMS_CACHE[user.id][query_server['server_name']] = HiveServer2Dbms(
-        HiveMetastoreClient(query_server, user),
-        QueryHistory.SERVER_TYPE[1][0]
+      HiveMetastoreClient(query_server, user),
+      QueryHistory.SERVER_TYPE[1][0]
       )
     else:
       from beeswax.server.hive_server2_lib import HiveServerClient
       DBMS_CACHE[user.id][query_server['server_name']] = HiveServer2Dbms(
-        HiveServerClientCompatible(HiveServerClient(query_server, user)),
-        QueryHistory.SERVER_TYPE[1][0]
+      HiveServerClientCompatible(HiveServerClient(query_server, user)),
+      QueryHistory.SERVER_TYPE[1][0]
       )
   elif RESET_HS2_QUERY_SERVER:
     from beeswax.server.hive_server2_lib import HiveServerClient, HiveServerClientCompatible
@@ -205,10 +204,10 @@ def get_query_server_config(name='beeswax', connector=None):
       cache.set(
         "llap",
         json.dumps({
-            "host": llap_servers["addresses"][0]["host"],
-            "port": llap_servers["addresses"][0]["port"]
-          }),
-          CACHE_TIMEOUT.get()
+          "host": llap_servers["addresses"][0]["host"],
+          "port": llap_servers["addresses"][0]["port"]
+        }),
+        CACHE_TIMEOUT.get()
       )
     else:
       LOG.error("Hive LLAP endpoint not found, reverting to config values")
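
For reference — a minimal sketch (not part of this commit) of the read side of the "llap" cache entry written above; the JSON shape and the "llap" key follow the `cache.set(...)` call, while the cache alias here is an assumption:

```python
import json

from django.core.cache import caches

cache = caches['default']  # assumption: whichever alias the write above used

cached = cache.get('llap')
if cached:
  # Shape written above: {"host": ..., "port": ...}
  endpoint = json.loads(cached)
  active_host, active_port = endpoint['host'], endpoint['port']
else:
  # Entry missing or expired (CACHE_TIMEOUT): fall back to the configured
  # HiveServer2 host/port, mirroring the LOG.error branch above.
  active_host, active_port = None, None
```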
@@ -296,45 +295,45 @@ def get_query_server_config(name='beeswax', connector=None):
   elif name == 'hms':
     kerberos_principal = get_hiveserver2_kerberos_principal(HIVE_SERVER_HOST.get())
     query_server = {
-        'server_name': 'hms',
-        'server_host': HIVE_METASTORE_HOST.get() if not cluster_config else cluster_config.get('server_host'),
-        'server_port': HIVE_METASTORE_PORT.get(),
-        'principal': kerberos_principal,
-        'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
-        'auth_username': AUTH_USERNAME.get(),
-        'auth_password': AUTH_PASSWORD.get(),
-        'use_sasl': HIVE_USE_SASL.get()
+      'server_name': 'hms',
+      'server_host': HIVE_METASTORE_HOST.get(),
+      'server_port': HIVE_METASTORE_PORT.get(),
+      'principal': kerberos_principal,
+      'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
+      'auth_username': AUTH_USERNAME.get(),
+      'auth_password': AUTH_PASSWORD.get(),
+      'use_sasl': HIVE_USE_SASL.get()
     }
   else:
     kerberos_principal = get_hiveserver2_kerberos_principal(activeEndpoint["host"])
     query_server = {
-        'server_name': 'beeswax' if name != 'hplsql' else 'hplsql',
-        'server_host': activeEndpoint["host"],
-        'server_port': LLAP_SERVER_PORT.get() if name == 'llap' else int(activeEndpoint["port"]),
-        'principal': kerberos_principal,
-        'http_url': '%(protocol)s://%(host)s:%(port)s/%(end_point)s' % {
-          'protocol': 'https' if hiveserver2_use_ssl() else 'http',
-          'host': activeEndpoint["host"],
-          'port': activeEndpoint["port"],
-          'end_point': hiveserver2_thrift_http_path()
-        },
-        'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
-        'auth_username': AUTH_USERNAME.get(),
-        'auth_password': AUTH_PASSWORD.get(),
-        'use_sasl': HIVE_USE_SASL.get(),
-        'close_sessions': CLOSE_SESSIONS.get(),
-        'has_session_pool': has_session_pool(),
-        'max_number_of_sessions': MAX_NUMBER_OF_SESSIONS.get()
-      }
+      'server_name': 'beeswax' if name != 'hplsql' else 'hplsql',
+      'server_host': activeEndpoint["host"],
+      'server_port': LLAP_SERVER_PORT.get() if name == 'llap' else int(activeEndpoint["port"]),
+      'principal': kerberos_principal,
+      'http_url': '%(protocol)s://%(host)s:%(port)s/%(end_point)s' % {
+        'protocol': 'https' if hiveserver2_use_ssl() else 'http',
+        'host': activeEndpoint["host"],
+        'port': activeEndpoint["port"],
+        'end_point': hiveserver2_thrift_http_path()
+      },
+      'transport_mode': 'http' if hiveserver2_transport_mode() == 'HTTP' else 'socket',
+      'auth_username': AUTH_USERNAME.get(),
+      'auth_password': AUTH_PASSWORD.get(),
+      'use_sasl': HIVE_USE_SASL.get(),
+      'close_sessions': CLOSE_SESSIONS.get(),
+      'has_session_pool': has_session_pool(),
+      'max_number_of_sessions': MAX_NUMBER_OF_SESSIONS.get()
+    }

   if name == 'sparksql':  # Extends Hive as very similar
     from spark.conf import SQL_SERVER_HOST as SPARK_SERVER_HOST, SQL_SERVER_PORT as SPARK_SERVER_PORT, USE_SASL as SPARK_USE_SASL

     query_server.update({
-        'server_name': 'sparksql',
-        'server_host': SPARK_SERVER_HOST.get(),
-        'server_port': SPARK_SERVER_PORT.get(),
-        'use_sasl': SPARK_USE_SASL.get()
+      'server_name': 'sparksql',
+      'server_host': SPARK_SERVER_HOST.get(),
+      'server_port': SPARK_SERVER_PORT.get(),
+      'use_sasl': SPARK_USE_SASL.get()
     })

   if not query_server.get('dialect'):
@@ -375,28 +374,28 @@ def get_query_server_config_via_connector(connector):
     auth_password = dbms_conf.AUTH_PASSWORD.get()

   return {
-      'is_compute': True,
-      'dialect': compute['dialect'],
-      'server_name': compute_name,
-      'server_host': server_host,
-      'server_port': server_port,
-      # For connectors/computes, the auth details are not available
-      # from configs and needs patching before submitting requests
-      'principal': 'TODO',
-      'auth_username': compute['options'].get('auth_username', auth_username),
-      'auth_password': compute['options'].get('auth_password', auth_password),
-
-      'impersonation_enabled': impersonation_enabled,
-      'use_sasl': str(compute['options'].get('use_sasl', True)).upper() == 'TRUE',
-      'SESSION_TIMEOUT_S': 15 * 60,
-      'querycache_rows': 1000,
-      'QUERY_TIMEOUT_S': 15 * 60,
-      'transport_mode': compute['options'].get('transport_mode', 'http'),
-      'http_url': compute['options'].get('http_url', 'http://%s:%s/cliservice' % (server_host, server_port)),
-
-      'close_sessions': str(compute['options'].get('close_sessions', True)).upper() == 'TRUE',
-      'has_session_pool': str(compute['options'].get('has_session_pool', False)).upper() == 'TRUE',
-      'max_number_of_sessions': compute['options'].get('has_session_pool', -1)
+    'is_compute': True,
+    'dialect': compute['dialect'],
+    'server_name': compute_name,
+    'server_host': server_host,
+    'server_port': server_port,
+    # For connectors/computes, the auth details are not available
+    # from configs and needs patching before submitting requests
+    'principal': 'TODO',
+    'auth_username': compute['options'].get('auth_username', auth_username),
+    'auth_password': compute['options'].get('auth_password', auth_password),
+
+    'impersonation_enabled': impersonation_enabled,
+    'use_sasl': str(compute['options'].get('use_sasl', True)).upper() == 'TRUE',
+    'SESSION_TIMEOUT_S': 15 * 60,
+    'querycache_rows': 1000,
+    'QUERY_TIMEOUT_S': 15 * 60,
+    'transport_mode': compute['options'].get('transport_mode', 'http'),
+    'http_url': compute['options'].get('http_url', 'http://%s:%s/cliservice' % (server_host, server_port)),
+
+    'close_sessions': str(compute['options'].get('close_sessions', True)).upper() == 'TRUE',
+    'has_session_pool': str(compute['options'].get('has_session_pool', False)).upper() == 'TRUE',
+    'max_number_of_sessions': compute['options'].get('has_session_pool', -1)
   }

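Standalone illustration (not part of this commit) of the boolean coercion used by the connector options above; the payload is hypothetical:

```python
# Connector options may carry booleans or strings ('true', 'True', 'TRUE'),
# so values are normalized via str(...).upper() before comparison.
options = {'use_sasl': 'true'}  # hypothetical connector payload

use_sasl = str(options.get('use_sasl', True)).upper() == 'TRUE'
close_sessions = str(options.get('close_sessions', True)).upper() == 'TRUE'

assert use_sasl is True
assert close_sessions is True  # missing key falls back to the True default
```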
@@ -426,7 +425,7 @@ def __init__(self, client, server_type):
     self.client = client
     self.server_type = server_type
     self.server_name = self.client.query_server.get('dialect') if self.client.query_server['server_name'].isdigit() \
-        else self.client.query_server['server_name']
+      else self.client.query_server['server_name']

   @classmethod
   def to_matching_wildcard(cls, identifier=None):
@@ -514,10 +513,10 @@ def _get_tables_via_sparksql(self, database, table_names='*'):

       # We get back: database | tableName | isTemporary
       return [{
-          'name': row[1],
-          'type': 'VIEW' if row[2] else 'TABLE',
-          'comment': ''
-        }
+        'name': row[1],
+        'type': 'VIEW' if row[2] else 'TABLE',
+        'comment': ''
+      }
         for row in result.rows()
       ]
     else:
@@ -529,10 +528,10 @@ def get_table(self, database, table_name):
     except QueryServerException as e:
       LOG.debug("Seems like %s.%s could be a Kudu table" % (database, table_name))
       if 'java.lang.ClassNotFoundException' in e.message and [
-          prop
-          for prop in self.get_table_properties(database, table_name, property_name='storage_handler').rows()
-          if 'KuduStorageHandler' in prop[0]
-          ]:
+        prop
+        for prop in self.get_table_properties(database, table_name, property_name='storage_handler').rows()
+        if 'KuduStorageHandler' in prop[0]
+      ]:
         query_server = get_query_server_config('impala')
         db = get(self.client.user, query_server)
         table = db.get_table(database, table_name)
@@ -630,9 +629,9 @@ def execute_statement(self, hql):

   def fetch(self, query_handle, start_over=False, rows=None):
     no_start_over_support = [
-        config_variable
-        for config_variable in self.get_default_configuration(False)
-        if config_variable.key == 'support_start_over' and config_variable.value == 'false'
+      config_variable
+      for config_variable in self.get_default_configuration(False)
+      if config_variable.key == 'support_start_over' and config_variable.value == 'false'
     ]
     if no_start_over_support:
       start_over = False
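
A runnable sketch (not part of this commit) of the `no_start_over_support` guard above, with a stand-in for the configuration objects that `get_default_configuration()` is assumed to return:

```python
# Stand-in for the server config variables (assumed .key/.value attributes).
class ConfigVariable(object):
  def __init__(self, key, value):
    self.key, self.value = key, value

defaults = [ConfigVariable('support_start_over', 'false')]  # hypothetical response

no_start_over_support = [
  config_variable
  for config_variable in defaults
  if config_variable.key == 'support_start_over' and config_variable.value == 'false'
]

start_over = True
if no_start_over_support:
  start_over = False  # server cannot rewind result sets; never re-fetch from row 0
```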
@@ -830,28 +829,28 @@ def get_table_columns_stats(self, database, table, column):
         return self._extract_impala_column(data)
       else:
         return [
-            {'col_name': data[2][0]},
-            {'data_type': data[2][1]},
-            {'min': data[2][2]},
-            {'max': data[2][3]},
-            {'num_nulls': data[2][4]},
-            {'distinct_count': data[2][5]},
-            {'avg_col_len': data[2][6]},
-            {'max_col_len': data[2][7]},
-            {'num_trues': data[2][8]},
-            {'num_falses': data[2][9]}
+          {'col_name': data[2][0]},
+          {'data_type': data[2][1]},
+          {'min': data[2][2]},
+          {'max': data[2][3]},
+          {'num_nulls': data[2][4]},
+          {'distinct_count': data[2][5]},
+          {'avg_col_len': data[2][6]},
+          {'max_col_len': data[2][7]},
+          {'num_trues': data[2][8]},
+          {'num_falses': data[2][9]}
         ]
     else:
       return []

   def _extract_impala_column(self, col):
     return [
-        {'col_name': col[0]},
-        {'data_type': col[1]},
-        {'distinct_count': col[2]},
-        {'num_nulls': col[3]},
-        {'max_col_len': col[4]},
-        {'avg_col_len': col[5]},
+      {'col_name': col[0]},
+      {'data_type': col[1]},
+      {'distinct_count': col[2]},
+      {'num_nulls': col[3]},
+      {'max_col_len': col[4]},
+      {'avg_col_len': col[5]},
     ]

   def get_table_properties(self, database, table, property_name=None):
@@ -890,7 +889,7 @@ def get_top_terms(self, database, table, column, limit=30, prefix=None):
       GROUP BY %(column)s
       ORDER BY ct DESC
       LIMIT %(limit)s''' % {
-        'database': database, 'table': table, 'column': column, 'prefix_match': prefix_match, 'limit': limit,
+      'database': database, 'table': table, 'column': column, 'prefix_match': prefix_match, 'limit': limit,
     }

     query = hql_query(hql)
@@ -1048,7 +1047,7 @@ def create_table_as_a_select(self, request, query_history, target_database, targ
       except Exception as double_trouble:
         LOG.exception('Failed to drop table "%s" as well: %s' % (target_table, double_trouble))
       raise ex
-    url = format_preserving_redirect(request, reverse('metastore:index'))
+

     return query_history

@@ -1121,17 +1120,17 @@ def execute_and_watch(self, query, design=None, query_history=None):
     hql_query = query.hql_query
     if query_history is None:
       query_history = QueryHistory.build(
-          owner=self.client.user,
-          query=hql_query,
-          server_host='%(server_host)s' % self.client.query_server,
-          server_port='%(server_port)d' % self.client.query_server,
-          server_name='%(server_name)s' % self.client.query_server,
-          server_type=self.server_type,
-          last_state=QueryHistory.STATE.submitted.value,
-          design=design,
-          notify=query.query.get('email_notify', False),
-          query_type=query.query['type'],
-          statement_number=0
+        owner=self.client.user,
+        query=hql_query,
+        server_host='%(server_host)s' % self.client.query_server,
+        server_port='%(server_port)d' % self.client.query_server,
+        server_name='%(server_name)s' % self.client.query_server,
+        server_type=self.server_type,
+        last_state=QueryHistory.STATE.submitted.value,
+        design=design,
+        notify=query.query.get('email_notify', False),
+        query_type=query.query['type'],
+        statement_number=0
       )
       query_history.save()

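Aside (not part of this commit): the `'%(key)s' % dict` idiom used in `QueryHistory.build(...)` above formats single keys out of the query_server dict; note that `%d` still yields a string, so the port is stored as text:

```python
query_server = {  # hypothetical values
  'server_host': 'hs2.example.com',
  'server_port': 10000,
  'server_name': 'beeswax',
}

server_host = '%(server_host)s' % query_server  # 'hs2.example.com'
server_port = '%(server_port)d' % query_server  # '10000' (a str, despite %d)
```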
@@ -1240,9 +1239,6 @@ def get_functions(self, prefix=None, database=None):
     '''
     Not using self.client.get_functions() as pretty limited. More comments there.
     '''
-    result = None
-
-    function_filter = "'%s*'" % prefix if prefix else ''

     if self.client.query_server['dialect'] == 'impala':
       if database is None:
@@ -1300,7 +1296,7 @@ def get_primary_keys(self, database_name, table_name, catalog_name=None):
     )

   def get_foreign_keys(self, parent_catalog_name=None, parent_database_name=None, parent_table_name=None, foreign_catalog_name=None,
-      foreign_database_name=None, foreign_table_name=None):
+                       foreign_database_name=None, foreign_table_name=None):

     return self.client.get_foreign_keys(
       parent_catalog_name=parent_catalog_name,