Skip to content

Commit e6bef27

Browse files
authored
Snow 2220426 streaming output (#2589)
* SNOW-2220426: streaming output * SNOW-2220426: streaming output release notes * SNOW-2220426: streaming output fix new lines handling * SNOW-2220426: streaming output fix windows paths in test * SNOW-2220426: streaming output drop deprecated function
1 parent 15c612e commit e6bef27

File tree

3 files changed

+632
-19
lines changed

3 files changed

+632
-19
lines changed

RELEASE-NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
* Bumped to `typer=0.17.3`. Improved displaying help messages.
2828
* Fixed using `ctx.var` in `snow sql` with Jinja templating.
2929
* Fixed issues when pasting content with trailing new lines.
30+
* Improved output handling with streaming
3031

3132

3233
# v3.11.0

src/snowflake/cli/_app/printing.py

Lines changed: 153 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from json import JSONEncoder
2323
from pathlib import Path
2424
from textwrap import indent
25-
from typing import TextIO
25+
from typing import Any, Dict, TextIO
2626

2727
from rich import box, get_console
2828
from rich import print as rich_print
@@ -61,13 +61,114 @@ def default(self, o):
6161
return list(o.result)
6262
if isinstance(o, (date, datetime, time)):
6363
return o.isoformat()
64-
if isinstance(o, (Path, Decimal)):
64+
if isinstance(o, Path):
65+
return o.as_posix()
66+
if isinstance(o, Decimal):
6567
return str(o)
6668
if isinstance(o, bytearray):
6769
return o.hex()
6870
return super().default(o)
6971

7072

73+
class StreamingJSONEncoder(JSONEncoder):
74+
"""Streaming JSON encoder that doesn't materialize generators into lists"""
75+
76+
def default(self, o):
77+
if isinstance(o, str):
78+
return sanitize_for_terminal(o)
79+
if isinstance(o, (ObjectResult, MessageResult)):
80+
return o.result
81+
if isinstance(o, (CollectionResult, MultipleResults)):
82+
raise TypeError(
83+
f"CollectionResult should be handled by streaming functions, not encoder"
84+
)
85+
if isinstance(o, (date, datetime, time)):
86+
return o.isoformat()
87+
if isinstance(o, Path):
88+
return o.as_posix()
89+
if isinstance(o, Decimal):
90+
return str(o)
91+
if isinstance(o, bytearray):
92+
return o.hex()
93+
return super().default(o)
94+
95+
96+
def _print_json_item_with_array_indentation(item: Any, indent: int):
97+
"""Print a JSON item with proper indentation for array context"""
98+
if indent:
99+
indented_output = json.dumps(item, cls=StreamingJSONEncoder, indent=indent)
100+
indented_lines = indented_output.split("\n")
101+
for i, line in enumerate(indented_lines):
102+
if i == 0:
103+
print(" " * indent + line, end="")
104+
else:
105+
print("\n" + " " * indent + line, end="")
106+
else:
107+
json.dump(item, sys.stdout, cls=StreamingJSONEncoder, separators=(",", ":"))
108+
109+
110+
def _stream_collection_as_json(result: CollectionResult, indent: int = 4):
111+
"""Stream a CollectionResult as a JSON array without loading all data into memory"""
112+
items = iter(result.result)
113+
try:
114+
first_item = next(items)
115+
except StopIteration:
116+
print("[]", end="")
117+
return
118+
119+
print("[")
120+
121+
_print_json_item_with_array_indentation(first_item, indent)
122+
123+
for item in items:
124+
print(",")
125+
_print_json_item_with_array_indentation(item, indent)
126+
127+
print("\n]", end="")
128+
129+
130+
def _stream_collection_as_csv(result: CollectionResult):
131+
"""Stream a CollectionResult as CSV without loading all data into memory"""
132+
items = iter(result.result)
133+
try:
134+
first_item = next(items)
135+
except StopIteration:
136+
return
137+
138+
fieldnames = list(first_item.keys())
139+
if not isinstance(first_item, dict):
140+
raise TypeError("CSV output requires dictionary items")
141+
142+
writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames, lineterminator="\n")
143+
writer.writeheader()
144+
_write_csv_row(writer, first_item)
145+
146+
for item in items:
147+
_write_csv_row(writer, item)
148+
149+
150+
def _write_csv_row(writer: csv.DictWriter, row_data: Dict[str, Any]):
151+
"""Write a single CSV row, handling special data types"""
152+
processed_row = {}
153+
for key, value in row_data.items():
154+
if isinstance(value, str):
155+
processed_row[key] = sanitize_for_terminal(value)
156+
elif isinstance(value, (date, datetime, time)):
157+
processed_row[key] = value.isoformat()
158+
elif isinstance(value, Path):
159+
processed_row[key] = value.as_posix()
160+
elif isinstance(value, Decimal):
161+
processed_row[key] = str(value)
162+
elif isinstance(value, bytearray):
163+
processed_row[key] = value.hex()
164+
elif value is None:
165+
processed_row[key] = ""
166+
else:
167+
processed_row[key] = str(value)
168+
169+
writer.writerow(processed_row)
170+
171+
71172
def _get_format_type() -> OutputFormat:
72173
output_format = get_cli_context().output_format
73174
if output_format:
@@ -110,12 +211,13 @@ def is_structured_format(output_format):
110211
def print_structured(
111212
result: CommandResult, output_format: OutputFormat = OutputFormat.JSON
112213
):
113-
"""Handles outputs like json, yml and other structured and parsable formats."""
214+
"""Handles outputs like json, csv and other structured and parsable formats with streaming."""
114215
printed_end_line = False
216+
115217
if isinstance(result, MultipleResults):
116218
if output_format == OutputFormat.CSV:
117219
for command_result in result.result:
118-
_print_csv_result(command_result)
220+
_print_csv_result_streaming(command_result)
119221
print(flush=True)
120222
printed_end_line = True
121223
else:
@@ -125,35 +227,67 @@ def print_structured(
125227
# instead of joining all the values into a JSON array or CSV entry set
126228
for r in result.result:
127229
if output_format == OutputFormat.CSV:
128-
_print_csv_result(r.result)
230+
_print_csv_result_streaming(r)
129231
else:
130-
json.dump(r, sys.stdout, cls=CustomJSONEncoder)
232+
json.dump(r, sys.stdout, cls=StreamingJSONEncoder)
131233
print(flush=True)
132234
printed_end_line = True
133235
else:
134236
if output_format == OutputFormat.CSV:
135-
_print_csv_result(result)
237+
_print_csv_result_streaming(result)
136238
printed_end_line = True
137239
else:
138-
json.dump(result, sys.stdout, cls=CustomJSONEncoder, indent=4)
240+
_print_json_result_streaming(result)
241+
139242
# Adds empty line at the end
140243
if not printed_end_line:
141244
print(flush=True)
142245

143246

144-
def _print_csv_result(result: CommandResult):
145-
data = json.loads(json.dumps(result, cls=CustomJSONEncoder))
247+
def _print_json_result_streaming(result: CommandResult):
248+
"""Print a single CommandResult as JSON with streaming support"""
249+
if isinstance(result, CollectionResult):
250+
_stream_collection_as_json(result, indent=4)
251+
elif isinstance(result, (ObjectResult, MessageResult)):
252+
json.dump(result, sys.stdout, cls=StreamingJSONEncoder, indent=4)
253+
else:
254+
json.dump(result, sys.stdout, cls=StreamingJSONEncoder, indent=4)
255+
256+
257+
def _print_object_result_as_csv(result: ObjectResult):
258+
"""Print an ObjectResult as a single-row CSV.
259+
260+
Converts the object's key-value pairs into a CSV with headers
261+
from the keys and a single data row from the values.
262+
"""
263+
data = result.result
146264
if isinstance(data, dict):
147-
writer = csv.DictWriter(sys.stdout, [*data], lineterminator="\n")
148-
writer.writeheader()
149-
writer.writerow(data)
150-
elif isinstance(data, list):
151-
if not data:
152-
return
153-
writer = csv.DictWriter(sys.stdout, [*data[0]], lineterminator="\n")
265+
writer = csv.DictWriter(
266+
sys.stdout, fieldnames=list(data.keys()), lineterminator="\n"
267+
)
154268
writer.writeheader()
155-
for entry in data:
156-
writer.writerow(entry)
269+
_write_csv_row(writer, data)
270+
271+
272+
def _print_message_result_as_csv(result: MessageResult):
273+
"""Print a MessageResult as CSV with a single 'message' column.
274+
275+
Creates a simple CSV structure with one column named 'message'
276+
containing the sanitized message text.
277+
"""
278+
writer = csv.DictWriter(sys.stdout, fieldnames=["message"], lineterminator="\n")
279+
writer.writeheader()
280+
writer.writerow({"message": sanitize_for_terminal(result.message)})
281+
282+
283+
def _print_csv_result_streaming(result: CommandResult):
284+
"""Print a single CommandResult as CSV with streaming support"""
285+
if isinstance(result, CollectionResult):
286+
_stream_collection_as_csv(result)
287+
elif isinstance(result, ObjectResult):
288+
_print_object_result_as_csv(result)
289+
elif isinstance(result, MessageResult):
290+
_print_message_result_as_csv(result)
157291

158292

159293
def _stream_json(result):

0 commit comments

Comments
 (0)