-
Notifications
You must be signed in to change notification settings - Fork 145
Expand file tree
/
Copy path_plan_builder.py
More file actions
89 lines (83 loc) · 3.59 KB
/
_plan_builder.py
File metadata and controls
89 lines (83 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
#
from typing import Dict, List, Optional, Tuple
from snowflake.snowpark._internal.analyzer.expression import Attribute
from snowflake.snowpark._internal.analyzer.snowflake_plan import (
SnowflakePlan,
SnowflakePlanBuilder,
)
from snowflake.snowpark._internal.analyzer.snowflake_plan_node import LogicalPlan
from snowflake.snowpark._internal.utils import is_single_quoted
from snowflake.snowpark.mock._plan import MockExecutionPlan, MockFileOperation
from snowflake.snowpark.mock._stage_registry import SUPPORT_READ_OPTIONS
from snowflake.snowpark.mock._telemetry import LocalTestOOBTelemetryService
class MockSnowflakePlanBuilder(SnowflakePlanBuilder):
def create_temp_table(self, *args, **kwargs):
LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
external_feature_name="DataFrame.cache_result",
internal_feature_name="MockSnowflakePlanBuilder.create_temp_table",
raise_error=NotImplementedError,
)
def read_file(
self,
path: str,
format: str,
options: Dict[str, str],
schema: List[Attribute],
schema_to_cast: Optional[List[Tuple[str, str]]] = None,
transformations: Optional[List[str]] = None,
metadata_project: Optional[List[str]] = None,
metadata_schema: Optional[List[Attribute]] = None,
use_user_schema: bool = False,
) -> MockExecutionPlan:
if format.lower() not in SUPPORT_READ_OPTIONS.keys():
LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
external_feature_name=f"Reading {format} data into dataframe",
internal_feature_name="MockSnowflakePlanBuilder.read_file",
parameters_info={"format": str(format)},
raise_error=NotImplementedError,
)
return MockExecutionPlan(
source_plan=MockFileOperation(
session=self.session,
operator=MockFileOperation.Operator.READ_FILE,
stage_location=path,
format=format,
schema=schema,
options=options,
),
session=self.session,
)
def file_operation_plan(
self, command: str, file_name: str, stage_location: str, options: Dict[str, str]
) -> MockExecutionPlan:
if options.get("auto_compress", False):
LocalTestOOBTelemetryService.get_instance().log_not_supported_error(
external_feature_name="File operation PUT with auto_compress=True",
internal_feature_name="MockSnowflakePlanBuilder.file_operation_plan",
parameters_info={"auto_compress": "True", "command": str(command)},
raise_error=NotImplementedError,
)
return MockExecutionPlan(
source_plan=MockFileOperation(
session=self.session,
operator=MockFileOperation.Operator(command),
local_file_name=file_name,
stage_location=stage_location[1:-1]
if is_single_quoted(stage_location)
else stage_location,
options=options,
),
session=self.session,
)
def join_table_function(
self,
func: str,
child: SnowflakePlan,
source_plan: Optional[LogicalPlan],
left_cols: List[str],
right_cols: List[str],
use_constant_subquery_alias: bool,
) -> MockExecutionPlan:
return MockExecutionPlan(source_plan=source_plan, session=self.session)