Skip to content

Commit 32d205d

Browse files
authored
[Importer] Fix importer for Parquet, ORC, and Avro formats (#3736)
* [Importer] Fixing importer for Parquet, ORC, and Avro formats
* Fix lint issues
1 parent bb57a00 commit 32d205d

File tree

2 files changed

+194
-53
lines changed

2 files changed

+194
-53
lines changed

desktop/libs/indexer/src/indexer/indexers/sql.py

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,38 +14,27 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.import logging
1616

17-
from future import standard_library
18-
standard_library.install_aliases()
19-
from builtins import object
2017
import csv
21-
import logging
22-
import sys
23-
import urllib.request, urllib.error
2418
import uuid
25-
19+
import logging
20+
import urllib.error
21+
import urllib.request
22+
from builtins import object
2623
from collections import OrderedDict
24+
from urllib.parse import unquote as urllib_unquote, urlparse
2725

2826
from django.urls import reverse
27+
from django.utils.translation import gettext as _
2928

3029
from azure.abfs.__init__ import abfspath
30+
from desktop.lib import django_mako
31+
from desktop.lib.exceptions_renderable import PopupException
32+
from desktop.settings import BASE_DIR
3133
from hadoop.fs.hadoopfs import Hdfs
3234
from notebook.connectors.base import get_interpreter
3335
from notebook.models import make_notebook
3436
from useradmin.models import User
3537

36-
from desktop.lib import django_mako
37-
from desktop.lib.exceptions_renderable import PopupException
38-
from desktop.settings import BASE_DIR
39-
40-
if sys.version_info[0] > 2:
41-
from urllib.parse import urlparse, unquote as urllib_unquote
42-
from django.utils.translation import gettext as _
43-
else:
44-
from django.utils.translation import ugettext as _
45-
from urllib import unquote as urllib_unquote
46-
from urlparse import urlparse
47-
48-
4938
LOG = logging.getLogger()
5039

5140

@@ -60,6 +49,7 @@
6049
LOG.warning("Impala app is not enabled")
6150
impala_conf = None
6251

52+
6353
class SQLIndexer(object):
6454

6555
def __init__(self, user, fs):
@@ -139,16 +129,16 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
139129
"escapeChar" = "\\\\"
140130
''' % source['format']
141131

142-
use_temp_table = table_format in ('parquet', 'orc', 'kudu') or is_transactional or isIceberg
143-
if use_temp_table: # We'll be using a temp table to load data
132+
use_temp_table = table_format in ('parquet', 'orc', 'kudu', 'avro') or is_transactional or isIceberg
133+
if use_temp_table: # We'll be using a temp table to load data
144134
if load_data:
145135
table_name, final_table_name = 'hue__tmp_%s' % table_name, table_name
146136

147137
sql += '\n\nDROP TABLE IF EXISTS `%(database)s`.`%(table_name)s`;\n' % {
148138
'database': database,
149139
'table_name': table_name
150140
}
151-
else: # Manual
141+
else: # Manual
152142
row_format = ''
153143
file_format = table_format
154144
skip_header = False
@@ -159,8 +149,8 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
159149
collection_delimiter = None
160150
map_delimiter = None
161151

162-
if external or (load_data and table_format in ('parquet', 'orc', 'kudu')): # We'll use location to load data
163-
if not self.fs.isdir(external_path): # File selected
152+
if external or (load_data and table_format in ('parquet', 'orc', 'kudu', 'avro')): # We'll use location to load data
153+
if not self.fs.isdir(external_path): # File selected
164154
external_path, external_file_name = Hdfs.split(external_path)
165155

166156
if len(self.fs.listdir(external_path)) > 1:
@@ -171,7 +161,7 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
171161
self.fs.copy(source_path, external_path)
172162
else:
173163
self.fs.rename(source_path, external_path)
174-
elif load_data: # We'll use load data command
164+
elif load_data: # We'll use load data command
175165
parent_path = self.fs.parent_path(source_path)
176166
stats = self.fs.stats(parent_path)
177167
split = urlparse(source_path)
@@ -180,17 +170,16 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
180170
# check if the csv file is in encryption zone (encBit), then the scratch dir will be
181171
# in the same directory
182172
base_dir = parent_path if stats.encBit else self.fs.get_home_dir()
183-
user_scratch_dir = base_dir + '/.scratchdir/%s' % str(uuid.uuid4()) # Make sure it's unique.
173+
user_scratch_dir = base_dir + '/.scratchdir/%s' % str(uuid.uuid4()) # Make sure it's unique.
184174
self.fs.do_as_user(self.user, self.fs.mkdir, user_scratch_dir, 0o0777)
185175
self.fs.do_as_user(self.user, self.fs.rename, source['path'], user_scratch_dir)
186176
if editor_type == 'impala' and impala_conf and impala_conf.USER_SCRATCH_DIR_PERMISSION.get():
187177
self.fs.do_as_user(self.user, self.fs.chmod, user_scratch_dir, 0o0777, True)
188178
source_path = user_scratch_dir + '/' + source['path'].split('/')[-1]
189179

190-
if external_path.lower().startswith("abfs"): #this is to check if its using an ABFS path
180+
if external_path.lower().startswith("abfs"): # this is to check if its using an ABFS path
191181
external_path = abfspath(external_path)
192182

193-
194183
tbl_properties = OrderedDict()
195184
if skip_header:
196185
tbl_properties['skip.header.line.count'] = '1'
@@ -209,7 +198,7 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
209198
'serde_name': serde_name,
210199
'serde_properties': serde_properties,
211200
'file_format': file_format,
212-
'external': external or load_data and table_format in ('parquet', 'orc', 'kudu'),
201+
'external': external or load_data and table_format in ('parquet', 'orc', 'kudu', 'avro'),
213202
'path': external_path,
214203
'primary_keys': primary_keys if table_format == 'kudu' and not load_data else [],
215204
'tbl_properties': tbl_properties
@@ -269,7 +258,7 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
269258
extra_create_properties += "\nTBLPROPERTIES('transactional'='true', 'transactional_properties'='%s')" % \
270259
default_transactional_type
271260

272-
sql += '''\n\nCREATE TABLE `%(database)s`.`%(final_table_name)s`%(comment)s
261+
sql += '''\n\nCREATE %(table_type)sTABLE `%(database)s`.`%(final_table_name)s`%(comment)s
273262
%(extra_create_properties)s
274263
AS SELECT %(columns_list)s
275264
FROM `%(database)s`.`%(table_name)s`;''' % {
@@ -278,7 +267,8 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
278267
'table_name': table_name,
279268
'extra_create_properties': extra_create_properties,
280269
'columns_list': ', '.join(columns_list),
281-
'comment': ' COMMENT "%s"' % comment if comment else ''
270+
'comment': ' COMMENT "%s"' % comment if comment else '',
271+
'table_type': 'EXTERNAL ' if external and not is_transactional else ''
282272
}
283273
sql += '\n\nDROP TABLE IF EXISTS `%(database)s`.`%(table_name)s`;\n' % {
284274
'database': database,
@@ -377,17 +367,17 @@ def create_table_from_local_file(self, source, destination, start_time=-1):
377367
row = self.nomalize_booleans(row, columns)
378368
_csv_rows.append(tuple(row))
379369

380-
if _csv_rows: #sql for data insertion
370+
if _csv_rows: # sql for data insertion
381371
csv_rows = str(_csv_rows)[1:-1]
382372

383373
if dialect in ('hive', 'mysql'):
384-
sql += '''\nINSERT INTO %(database)s.%(table_name)s VALUES %(csv_rows)s;\n'''% {
374+
sql += '''\nINSERT INTO %(database)s.%(table_name)s VALUES %(csv_rows)s;\n''' % {
385375
'database': database,
386376
'table_name': table_name,
387377
'csv_rows': csv_rows
388378
}
389379
elif dialect == 'impala':
390-
sql += '''\nINSERT INTO %(database)s.%(table_name)s_tmp VALUES %(csv_rows)s;\n'''% {
380+
sql += '''\nINSERT INTO %(database)s.%(table_name)s_tmp VALUES %(csv_rows)s;\n''' % {
391381
'database': database,
392382
'table_name': table_name,
393383
'csv_rows': csv_rows,
@@ -396,12 +386,12 @@ def create_table_from_local_file(self, source, destination, start_time=-1):
396386
if dialect == 'impala':
397387
# casting from string to boolean is not allowed in impala so string -> int -> bool
398388
sql_ = ',\n'.join([
399-
' CAST ( `%(name)s` AS %(type)s ) `%(name)s`' % col if col['type'] != 'boolean' \
389+
' CAST ( `%(name)s` AS %(type)s ) `%(name)s`' % col if col['type'] != 'boolean'
400390
else ' CAST ( CAST ( `%(name)s` AS TINYINT ) AS boolean ) `%(name)s`' % col for col in columns
401391
])
402392

403393
sql += '''\nCREATE TABLE IF NOT EXISTS %(database)s.%(table_name)s
404-
AS SELECT\n%(sql_)s\nFROM %(database)s.%(table_name)s_tmp;\n\nDROP TABLE IF EXISTS %(database)s.%(table_name)s_tmp;'''% {
394+
AS SELECT\n%(sql_)s\nFROM %(database)s.%(table_name)s_tmp;\n\nDROP TABLE IF EXISTS %(database)s.%(table_name)s_tmp;''' % {
405395
'database': database,
406396
'table_name': table_name,
407397
'sql_': sql_
@@ -421,6 +411,7 @@ def create_table_from_local_file(self, source, destination, start_time=-1):
421411
is_task=True
422412
)
423413

414+
424415
def _create_database(request, source, destination, start_time):
425416
database = destination['name']
426417
comment = destination['description']
@@ -465,6 +456,7 @@ def _create_table(request, source, destination, start_time=-1, file_encoding=Non
465456
else:
466457
return notebook.execute(request, batch=False)
467458

459+
468460
def _create_table_from_local(request, source, destination, start_time=-1):
469461
notebook = SQLIndexer(user=request.user, fs=request.fs).create_table_from_local_file(source, destination, start_time)
470462

0 commit comments

Comments (0)