11
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
- import time
14
+ from contextlib import contextmanager , nullcontext
15
15
from pathlib import Path
16
- from typing import List
16
+ from typing import Generator , List
17
17
18
18
import yaml
19
19
from snowflake .cli ._plugins .stage .manager import StageManager
20
20
from snowflake .cli .api .artifacts .upload import sync_artifacts_with_stage
21
21
from snowflake .cli .api .commands .utils import parse_key_value_variables
22
22
from snowflake .cli .api .console .console import cli_console
23
- from snowflake .cli .api .constants import DEFAULT_SIZE_LIMIT_MB , PatternMatchingType
23
+ from snowflake .cli .api .constants import (
24
+ DEFAULT_SIZE_LIMIT_MB ,
25
+ ObjectType ,
26
+ PatternMatchingType ,
27
+ )
24
28
from snowflake .cli .api .exceptions import CliError
25
29
from snowflake .cli .api .identifiers import FQN
26
30
from snowflake .cli .api .project .project_paths import ProjectPaths
27
31
from snowflake .cli .api .project .schemas .entities .common import PathMapping
28
- from snowflake .cli .api .project .util import unquote_identifier
29
32
from snowflake .cli .api .secure_path import SecurePath
30
33
from snowflake .cli .api .sql_execution import SqlExecutionMixin
31
34
from snowflake .cli .api .stage_path import StagePath
35
+ from snowflake .cli .api .utils .path_utils import is_stage_path
32
36
33
37
MANIFEST_FILE_NAME = "manifest.yml"
34
38
DCM_PROJECT_TYPE = "dcm_project"
35
39
36
40
37
41
class DCMProjectManager (SqlExecutionMixin ):
42
+ @contextmanager
43
+ def _collect_output (
44
+ self , project_identifier : FQN , output_path : str
45
+ ) -> Generator [str , None , None ]:
46
+ """
47
+ Context manager for handling output path - creates temporary stage for local paths,
48
+ downloads files after execution, and ensures proper cleanup.
49
+
50
+ Args:
51
+ project_identifier: The DCM project identifier
52
+ output_path: Either a stage path (@stage/path) or local directory path
53
+
54
+ Yields:
55
+ str: The effective output path to use in the DCM command
56
+ """
57
+ temp_stage_for_local_output = None
58
+ stage_manager = StageManager ()
59
+
60
+ if should_download_files := not is_stage_path (output_path ):
61
+ temp_stage_fqn = FQN .from_resource (
62
+ ObjectType .DCM_PROJECT , project_identifier , "OUTPUT_TMP_STAGE"
63
+ )
64
+ stage_manager .create (temp_stage_fqn , temporary = True )
65
+ effective_output_path = StagePath .from_stage_str (temp_stage_fqn .identifier )
66
+ temp_stage_for_local_output = (temp_stage_fqn .identifier , Path (output_path ))
67
+ else :
68
+ effective_output_path = StagePath .from_stage_str (output_path )
69
+
70
+ yield effective_output_path .absolute_path ()
71
+
72
+ if should_download_files :
73
+ assert temp_stage_for_local_output is not None
74
+ stage_path , local_path = temp_stage_for_local_output
75
+ stage_manager .get_recursive (stage_path = stage_path , dest_path = local_path )
76
+ cli_console .step (f"Plan output saved to: { local_path .resolve ()} " )
77
+ else :
78
+ cli_console .step (f"Plan output saved to: { output_path } " )
79
+
38
80
def execute (
39
81
self ,
40
82
project_identifier : FQN ,
@@ -45,28 +87,31 @@ def execute(
45
87
alias : str | None = None ,
46
88
output_path : str | None = None ,
47
89
):
90
+ with self ._collect_output (project_identifier , output_path ) if (
91
+ output_path and dry_run
92
+ ) else nullcontext () as output_stage :
93
+ query = f"EXECUTE DCM PROJECT { project_identifier .sql_identifier } "
94
+ if dry_run :
95
+ query += " PLAN"
96
+ else :
97
+ query += " DEPLOY"
98
+ if alias :
99
+ query += f' AS "{ alias } "'
100
+ if configuration or variables :
101
+ query += f" USING"
102
+ if configuration :
103
+ query += f" CONFIGURATION { configuration } "
104
+ if variables :
105
+ query += StageManager .parse_execute_variables (
106
+ parse_key_value_variables (variables )
107
+ ).removeprefix (" using" )
108
+ stage_path = StagePath .from_stage_str (from_stage )
109
+ query += f" FROM { stage_path .absolute_path ()} "
110
+ if output_stage is not None :
111
+ query += f" OUTPUT_PATH { output_stage } "
112
+ result = self .execute_query (query = query )
48
113
49
- query = f"EXECUTE DCM PROJECT { project_identifier .sql_identifier } "
50
- if dry_run :
51
- query += " PLAN"
52
- else :
53
- query += " DEPLOY"
54
- if alias :
55
- query += f' AS "{ alias } "'
56
- if configuration or variables :
57
- query += f" USING"
58
- if configuration :
59
- query += f" CONFIGURATION { configuration } "
60
- if variables :
61
- query += StageManager .parse_execute_variables (
62
- parse_key_value_variables (variables )
63
- ).removeprefix (" using" )
64
- stage_path = StagePath .from_stage_str (from_stage )
65
- query += f" FROM { stage_path .absolute_path ()} "
66
- if output_path :
67
- output_stage_path = StagePath .from_stage_str (output_path )
68
- query += f" OUTPUT_PATH { output_stage_path .absolute_path ()} "
69
- return self .execute_query (query = query )
114
+ return result
70
115
71
116
def create (self , project_identifier : FQN ) -> None :
72
117
query = f"CREATE DCM PROJECT { project_identifier .sql_identifier } "
@@ -114,10 +159,9 @@ def sync_local_files(project_identifier: FQN) -> str:
114
159
definitions .append (MANIFEST_FILE_NAME )
115
160
116
161
with cli_console .phase (f"Uploading definition files" ):
117
- unquoted_name = unquote_identifier (project_identifier .name )
118
- stage_fqn = FQN .from_string (
119
- f"DCM_{ unquoted_name } _{ int (time .time ())} _TMP_STAGE"
120
- ).using_context ()
162
+ stage_fqn = FQN .from_resource (
163
+ ObjectType .DCM_PROJECT , project_identifier , "_TMP_STAGE"
164
+ )
121
165
sync_artifacts_with_stage (
122
166
project_paths = ProjectPaths (project_root = Path .cwd ()),
123
167
stage_root = stage_fqn .identifier ,
0 commit comments