-
Notifications
You must be signed in to change notification settings - Fork 543
Expand file tree
/
Copy pathdirect_file_operation_utils.py
More file actions
88 lines (68 loc) · 2.59 KB
/
direct_file_operation_utils.py
File metadata and controls
88 lines (68 loc) · 2.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .connection import SnowflakeConnection
import os
from abc import ABC, abstractmethod
from .constants import CMD_TYPE_UPLOAD
class FileOperationParserBase(ABC):
    """Abstract contract for the internal helpers that parse file operations.

    Both ``__init__`` and ``parse_file_operation`` are abstract, so only
    subclasses that override both can be instantiated.
    """

    @abstractmethod
    def __init__(self, connection):
        """Set up the parser for ``connection``; the base version does nothing."""

    @abstractmethod
    def parse_file_operation(
        self,
        stage_location,
        local_file_name,
        target_directory,
        command_type,
        options,
        has_source_from_stream=False,
    ):
        """Turn the file operation details into SQL and return its parsing result.

        The base implementation is a no-op that returns ``None``; how each
        argument is interpreted is left to the concrete subclass.
        """
class StreamDownloaderBase(ABC):
    """Abstract contract for the internal helpers that download a file as a stream.

    Both ``__init__`` and ``download_as_stream`` are abstract, so only
    subclasses that override both can be instantiated.
    """

    @abstractmethod
    def __init__(self, connection):
        """Set up the downloader for ``connection``; the base version does nothing."""

    @abstractmethod
    def download_as_stream(self, ret, decompress=False):
        """Download a file as a stream; the base version is a no-op returning ``None``.

        The meaning of ``ret`` and ``decompress`` is defined by the concrete
        subclass and is not visible from this interface.
        """
class FileOperationParser(FileOperationParserBase):
    """Parser that builds a SQL statement and has the server parse it."""

    def __init__(self, connection: SnowflakeConnection):
        """Keep ``connection``; its cursor is used later to submit statements."""
        self._connection = connection

    def parse_file_operation(
        self,
        stage_location,
        local_file_name,
        target_directory,
        command_type,
        options,
        has_source_from_stream=False,
    ):
        """Build the SQL for a file operation and return the server's parsing result.

        Args:
            stage_location: Bound as the single qmark parameter of the
                statement. When ``has_source_from_stream`` is true it is first
                split with ``os.path.split``; the head becomes the bound value
                and the tail becomes the file name.
            local_file_name: Source placed verbatim after ``PUT``. Ignored when
                ``has_source_from_stream`` is true.
            target_directory: Not used by this implementation.
            command_type: Only ``CMD_TYPE_UPLOAD`` is handled.
            options: Mapping rendered as space-separated ``key=value`` pairs at
                the end of the statement. A falsy value means no options.
            has_source_from_stream: If true, the file name is derived from
                ``stage_location`` and prefixed with ``file://``.

        Returns:
            Whatever ``cursor._execute_helper`` returns for the statement.

        Raises:
            NotImplementedError: If ``command_type`` is not ``CMD_TYPE_UPLOAD``.
        """
        # Options are rendered before the command type is checked, so a bad
        # ``options`` value surfaces first.
        option_pairs = [f"{key}={value}" for key, value in (options or {}).items()]
        option_clause = " ".join(option_pairs)

        if command_type != CMD_TYPE_UPLOAD:
            raise NotImplementedError(f"unsupported command type: {command_type}")

        source_uri = local_file_name
        if has_source_from_stream:
            # With a stream source, the file name is the last path component
            # of the given stage location.
            stage_location, stream_file_name = os.path.split(stage_location)
            source_uri = "file://" + stream_file_name

        statement = f"PUT {source_uri} ? {option_clause}"
        bind_values = [stage_location]

        with self._connection.cursor() as cursor:
            # Submit the statement as an internal query and hand back the
            # server's parsing result.
            bound_params = cursor._connection._process_params_qmarks(
                bind_values, cursor
            )
            return cursor._execute_helper(
                statement, binding_params=bound_params, is_internal=True
            )
class StreamDownloader(StreamDownloaderBase):
    """Placeholder stream downloader; downloading is not supported yet."""

    def __init__(self, connection):
        """Accept ``connection`` for interface compatibility; nothing is stored."""

    def download_as_stream(self, ret, decompress=False):
        """Always raise, because stream download is not implemented.

        Raises:
            NotImplementedError: On every call, whatever the arguments.
        """
        raise NotImplementedError("download_as_stream is not yet supported")