-
Notifications
You must be signed in to change notification settings - Fork 146
Expand file tree
/
Copy pathframe.py
More file actions
1531 lines (1370 loc) · 67.6 KB
/
frame.py
File metadata and controls
1531 lines (1370 loc) · 67.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
#
import functools
from collections.abc import Hashable
from dataclasses import dataclass
from logging import getLogger
from types import MappingProxyType
from typing import Any, Callable, NamedTuple, Optional, Union
import pandas as native_pd
from pandas import DatetimeTZDtype
from pandas._typing import IndexLabel
from snowflake.snowpark._internal.analyzer.analyzer_utils import (
quote_name_without_upper_casing,
)
from snowflake.snowpark.column import Column as SnowparkColumn
from snowflake.snowpark.functions import (
array_construct,
col,
count,
count_distinct,
iff,
last_value,
max as max_,
)
from snowflake.snowpark.modin.plugin._internal.ordered_dataframe import (
OrderedDataFrame,
OrderingColumn,
)
from snowflake.snowpark.modin.plugin._internal.snowpark_pandas_types import (
SnowparkPandasType,
)
from snowflake.snowpark.modin.plugin._internal.type_utils import (
_get_timezone_from_timestamp_tz,
)
from snowflake.snowpark.modin.plugin._internal.utils import (
DEFAULT_DATA_COLUMN_LABEL,
INDEX_LABEL,
ROW_POSITION_COLUMN_LABEL,
append_columns,
assert_duplicate_free,
cache_result,
count_rows,
extract_pandas_label_from_snowflake_quoted_identifier,
fill_missing_levels_for_pandas_label,
from_pandas_label,
get_distinct_rows,
is_valid_snowflake_quoted_identifier,
snowpark_to_pandas_helper,
to_pandas_label,
)
from snowflake.snowpark.modin.plugin._typing import (
LabelIdentifierPair,
LabelTuple,
PandasLabelToSnowflakeIdentifierPair,
)
from snowflake.snowpark.modin.utils import MODIN_UNNAMED_SERIES_LABEL
from snowflake.snowpark.types import DataType
from snowflake.snowpark.window import Window
# Module-level logger named after this module.
logger = getLogger(__name__)
# Side prefixes. Their consumers are not visible in this part of the file;
# presumably used to name columns coming from the left/right frame of a
# two-frame operation (TODO confirm against callers).
LEFT_PREFIX = "left"
RIGHT_PREFIX = "right"
def _create_snowflake_quoted_identifier_to_snowpark_pandas_type(
data_column_snowflake_quoted_identifiers: list[str],
index_column_snowflake_quoted_identifiers: list[str],
data_column_types: Optional[list[Optional[SnowparkPandasType]]],
index_column_types: Optional[list[Optional[SnowparkPandasType]]],
) -> MappingProxyType[str, Optional[SnowparkPandasType]]:
"""
Helper method to create map from Snowflake quoted identifier to Snowpark pandas type.
Args:
data_column_snowflake_quoted_identifiers: Snowflake quoted identifiers of data columns.
index_column_snowflake_quoted_identifiers: Snowflake quoted identifiers of index columns.
data_column_types: Snowpark pandas types of data columns.
index_column_types: Snowpark pandas types of index columns.
Returns:
dict mapping each column's Snowflake quoted identifier to the column's Snowpark pandas type.
"""
if data_column_types is not None:
assert len(data_column_types) == len(
data_column_snowflake_quoted_identifiers
), (
f"The length of data_column_types {data_column_types} is different from the length of "
f"data_column_snowflake_quoted_identifiers {data_column_snowflake_quoted_identifiers}"
)
for t in data_column_types:
assert t is None or isinstance(
t, SnowparkPandasType
), f"wrong data_column_types value {t}"
if index_column_types is not None:
assert len(index_column_types) == len(
index_column_snowflake_quoted_identifiers
), (
f"The length of index_column_types {index_column_types} is different from the length of "
f"index_column_snowflake_quoted_identifiers {index_column_snowflake_quoted_identifiers}"
)
for t in index_column_types:
assert t is None or isinstance(
t, SnowparkPandasType
), f"wrong index_column_types value {t}"
return MappingProxyType(
{
k: v
for k, v in zip(
(
*data_column_snowflake_quoted_identifiers,
*index_column_snowflake_quoted_identifiers,
),
(
*(
data_column_types
if data_column_types is not None
else [None] * len(data_column_snowflake_quoted_identifiers)
),
*(
index_column_types
if index_column_types is not None
else [None] * len(index_column_snowflake_quoted_identifiers)
),
),
)
}
)
class UpdatedInternalFrameResult(NamedTuple):
"""Contains the updated internal frame and mapping from old ids to new ids."""
# The InternalFrame produced by the update.
frame: "InternalFrame"
# Mapping from each old id to its new id (presumably Snowflake quoted
# identifiers -- confirm against callers).
old_id_to_new_id_mappings: dict[str, str]
@dataclass(frozen=True)
class InternalFrame:
"""
Internal abstraction of the storage format that holds all information necessary to
represent a pandas.DataFrame within Snowflake.
"""
# OrderedDataFrame representation of the state of the data held by this internal frame.
# Ordering columns and the row position column are maintained by the OrderedDataFrame.
ordered_dataframe: OrderedDataFrame
# Map between pandas label and Snowflake quoted identifier.
# This map is maintained as an ordered tuple of pairs, which must be in the order of
# pandas index columns + pandas data columns.
# For a MultiIndex as df.columns, the pandas label will be a tuple for each column.
# An example of a MultiIndex as df.columns:
# pd.MultiIndex.from_tuples([('baz', 'A'), ('baz', 'B'), ('zoo', 'A'), ('zoo', 'B')])
# the pandas labels of data columns will be [('baz', 'A'), ('baz', 'B'), ('zoo', 'A'), ('zoo', 'B')]
label_to_snowflake_quoted_identifier: tuple[LabelIdentifierPair, ...]
# Number of index columns for the pandas dataframe, where the first num_index_columns elements
# of label_to_snowflake_quoted_identifier are the pandas index columns.
num_index_columns: int
# Pandas labels for the column index name or MultiIndex names, e.g., these labels are used to
# generate df.columns.names.
# The length of data_column_index_names equals the number of column index levels.
# For a 3-level MultiIndex, the value can be like ['A', 'B', 'C'].
data_column_index_names: tuple[LabelTuple, ...]
# Map from Snowflake quoted identifier to the cached Snowpark pandas data type.
# The type is None if we don't know the Snowpark pandas type.
# n.b. that we map to SnowparkPandasType rather than to DataType, because
# we don't want to try tracking regular Snowpark Python types at all.
# This map is a MappingProxyType so that it's immutable.
snowflake_quoted_identifier_to_snowpark_pandas_type: MappingProxyType[
str, Optional[SnowparkPandasType]
]
@classmethod
def create(
    cls,
    *,
    ordered_dataframe: OrderedDataFrame,
    data_column_pandas_labels: list[Hashable],
    data_column_pandas_index_names: list[Hashable],
    data_column_snowflake_quoted_identifiers: list[str],
    index_column_pandas_labels: list[Hashable],
    index_column_snowflake_quoted_identifiers: list[str],
    data_column_types: Optional[list[Optional[SnowparkPandasType]]],
    index_column_types: Optional[list[Optional[SnowparkPandasType]]],
) -> "InternalFrame":
    """
    Alternate constructor that assembles an InternalFrame from parallel lists.

    Args:
        ordered_dataframe: underlying ordered dataframe used
        data_column_pandas_labels: pandas hashable labels of the data columns.
        data_column_pandas_index_names: hashable labels for the column index names;
            its length is used as the number of levels of every data column label.
        data_column_snowflake_quoted_identifiers: Snowflake quoted identifiers of the
            data columns, positionally matching data_column_pandas_labels.
        index_column_pandas_labels: pandas labels of the index columns.
        index_column_snowflake_quoted_identifiers: Snowflake quoted identifiers of the
            index columns, positionally matching index_column_pandas_labels.
        data_column_types: optional list of optional Snowpark pandas types for the data columns.
        index_column_types: optional list of optional Snowpark pandas types for the index columns.

    Returns:
        A new InternalFrame whose label/identifier pairs are the index columns
        followed by the data columns.
    """
    assert len(data_column_snowflake_quoted_identifiers) == len(
        data_column_pandas_labels
    ), f"data column label identifier length mismatch, labels {data_column_pandas_labels}, identifiers {data_column_snowflake_quoted_identifiers}"
    assert len(index_column_snowflake_quoted_identifiers) == len(
        index_column_pandas_labels
    ), f"index column label identifier length mismatch, labels {index_column_pandas_labels}, identifiers {index_column_snowflake_quoted_identifiers}"

    # Index column labels are always flat, i.e. a single level.
    index_pairs: list[LabelIdentifierPair] = []
    for index_label, index_identifier in zip(
        index_column_pandas_labels, index_column_snowflake_quoted_identifiers
    ):
        index_pairs.append(
            LabelIdentifierPair(
                from_pandas_label(index_label, num_levels=1), index_identifier
            )
        )

    # Data column labels carry one entry per column index level.
    num_column_levels = len(data_column_pandas_index_names)
    data_pairs: list[LabelIdentifierPair] = []
    for data_label, data_identifier in zip(
        data_column_pandas_labels, data_column_snowflake_quoted_identifiers
    ):
        data_pairs.append(
            LabelIdentifierPair(
                from_pandas_label(data_label, num_levels=num_column_levels),
                data_identifier,
            )
        )

    # Each column index name is itself a flat, single-level label.
    column_index_names = tuple(
        from_pandas_label(name, num_levels=1)
        for name in data_column_pandas_index_names
    )
    type_by_identifier = _create_snowflake_quoted_identifier_to_snowpark_pandas_type(
        data_column_snowflake_quoted_identifiers,
        index_column_snowflake_quoted_identifiers,
        data_column_types,
        index_column_types,
    )
    return cls(
        ordered_dataframe=ordered_dataframe,
        label_to_snowflake_quoted_identifier=(*index_pairs, *data_pairs),
        num_index_columns=len(index_column_snowflake_quoted_identifiers),
        data_column_index_names=column_index_names,
        snowflake_quoted_identifier_to_snowpark_pandas_type=type_by_identifier,
    )
def __post_init__(self) -> None:
    """
    Check the invariants of a freshly constructed frame.

    Verifies, in order: at least one index column, a non-empty list of ordering
    columns, consistent data column label levels, that every index/data column
    identifier is a valid quoted identifier projected by the ordered dataframe,
    and that the index + data identifiers are duplicate free.

    Raises:
        AssertionError: if any of the checks fails.
    """
    assert (
        self.num_index_columns >= 1
    ), "At least 1 index column should be presented for the dataframe"
    # Ordering columns are what keep the row order of the dataframe deterministic.
    assert len(self.ordering_columns) > 0, "ordering_columns cannot be empty"
    self._validate_data_column_pandas_index_names()

    # Every identifier referenced by the metadata must exist in the ordered
    # dataframe, otherwise the frame does not describe a valid state.
    snowflake_quoted_identifiers = (
        self.ordered_dataframe.projected_column_snowflake_quoted_identifiers
    )
    column_category = "dataframe column"
    for label, quoted_identifier in self.label_to_snowflake_quoted_identifier:
        hashable_label = to_pandas_label(label)
        # Escape single quotes so the name reads correctly inside the messages below.
        escaped_name = quoted_identifier.replace("'", "\\'")
        # check 1: the identifier is properly quoted
        assert is_valid_snowflake_quoted_identifier(
            quoted_identifier
        ), f"Found not-quoted identifier for '{column_category}':'{escaped_name}'"
        # check 2: the identifier exists in the underlying dataframe
        assert quoted_identifier in snowflake_quoted_identifiers, (
            f"{column_category}={escaped_name} not found in snowpark dataframe "
            f"schema {snowflake_quoted_identifiers}, pandas_label={hashable_label}"
        )

    # Index and data columns together must not reuse an identifier.
    all_column_identifiers = (
        self.index_column_snowflake_quoted_identifiers
        + self.data_column_snowflake_quoted_identifiers
    )
    assert_duplicate_free(all_column_identifiers, "dataframe columns")
def _validate_data_column_pandas_index_names(self) -> None:
# the index on column (df.columns) must have a name (can be None)
assert (
len(self.data_column_pandas_index_names) >= 1
), "data_column_pandas_index_names cannot be empty"
# validate all labels are tuples with the same length
num_levels = len(self.data_column_pandas_index_names)
for label, _ in self.label_to_snowflake_quoted_identifier[
self.num_index_columns :
]:
assert num_levels == len(
label
), f"All tuples in data_column_pandas_labels must have the same length {num_levels}, but got {label}"
@property
def index_column_snowflake_quoted_identifiers(self) -> list[str]:
    """
    Snowflake quoted identifiers of the index columns.

    Returns:
        The identifiers of the first ``num_index_columns`` entries of
        ``label_to_snowflake_quoted_identifier``, in order.
    """
    index_pairs = self.label_to_snowflake_quoted_identifier[: self.num_index_columns]
    return [pair.snowflake_quoted_identifier for pair in index_pairs]
@property
def data_column_snowflake_quoted_identifiers(self) -> list[str]:
    """
    Snowflake quoted identifiers of the data columns.

    Returns:
        The identifiers of the entries of ``label_to_snowflake_quoted_identifier``
        that follow the first ``num_index_columns`` ones, in order.
    """
    data_pairs = self.label_to_snowflake_quoted_identifier[self.num_index_columns :]
    return [pair.snowflake_quoted_identifier for pair in data_pairs]
def get_snowflake_type(
    self, identifier: Union[str, list[str]]
) -> Union[DataType, list[DataType]]:
    """
    Look up the Snowflake type of one or several columns.

    Args:
        identifier: a single Snowflake quoted identifier, or a list of them.

    Returns:
        A list of types when a list was passed in, otherwise the single type
        of the given identifier. Types come from
        ``quoted_identifier_to_snowflake_type``.
    """
    wants_list = isinstance(identifier, list)
    requested = identifier if wants_list else [identifier]
    types = list(self.quoted_identifier_to_snowflake_type(requested).values())
    if wants_list:
        return types
    return types[0]
def quoted_identifier_to_snowflake_type(
    self, identifiers: Optional[list[str]] = None
) -> dict[str, DataType]:
    """
    Map Snowflake quoted identifiers to their types.

    A cached Snowpark pandas type takes precedence over the type reported by the
    ordered dataframe's schema. The schema is only read when at least one
    requested identifier has no cached type, or when no identifiers are given.

    Args:
        identifiers: if given, only these identifiers are returned, in the same
            order as the input. Otherwise every column in the ordered
            dataframe's schema is included.

    Return:
        A mapping from Snowflake quoted identifier to Snowflake type.
    """
    cached_type_by_identifier = (
        self.snowflake_quoted_identifier_to_snowpark_pandas_type
    )
    if identifiers is not None:
        # The ordered dataframe may include columns that are neither index nor
        # data columns of this frame, so a requested identifier is not
        # guaranteed to be a key of the cached type map.
        requested_cached_types = {
            quoted_id: cached_type_by_identifier.get(quoted_id, None)
            for quoted_id in identifiers
        }
        if None not in requested_cached_types.values():
            # everything requested is cached: no need to touch the schema
            return requested_cached_types

    schema_types = {}
    for field in self.ordered_dataframe.schema.fields:
        quoted_id = field.column_identifier.quoted_name
        schema_types[quoted_id] = (
            cached_type_by_identifier.get(quoted_id, None) or field.datatype
        )

    if identifiers is None:
        return schema_types
    # dicts preserve insertion order, so the result follows the order of `identifiers`
    return {quoted_id: schema_types[quoted_id] for quoted_id in identifiers}
@property
def index_column_pandas_labels(self) -> list[Hashable]:
    """
    pandas labels of the index columns.

    Returns:
        One label per index column, in order, each converted from its stored
        form with ``to_pandas_label``.
    """
    index_pairs = self.label_to_snowflake_quoted_identifier[: self.num_index_columns]
    return [to_pandas_label(pair.label) for pair in index_pairs]
@property
def data_column_pandas_labels(self) -> list[Hashable]:
    """
    pandas labels of the data columns.

    Returns:
        One label per data column, in order, each converted from its stored
        form with ``to_pandas_label``.
    """
    data_pairs = self.label_to_snowflake_quoted_identifier[self.num_index_columns :]
    return [to_pandas_label(pair.label) for pair in data_pairs]
@property
def ordering_column_snowflake_quoted_identifiers(self) -> list[str]:
    """
    Snowflake quoted identifiers of the ordering columns.

    Return:
        The value of the same-named attribute of the underlying ordered dataframe.
    """
    ordered_frame = self.ordered_dataframe
    return ordered_frame.ordering_column_snowflake_quoted_identifiers
@property
def ordering_columns(self) -> list[OrderingColumn]:
    """
    Ordering columns of this frame.

    Returns:
        The ``ordering_columns`` of the underlying ordered dataframe.
    """
    ordered_frame = self.ordered_dataframe
    return ordered_frame.ordering_columns
@property
def row_position_snowflake_quoted_identifier(self) -> Optional[str]:
    """
    Row position identifier as reported by the underlying ordered dataframe.

    Returns:
        ``ordered_dataframe.row_position_snowflake_quoted_identifier``; the
        annotation allows None.
    """
    ordered_frame = self.ordered_dataframe
    return ordered_frame.row_position_snowflake_quoted_identifier
@property
def row_count_snowflake_quoted_identifier(self) -> Optional[str]:
    """
    Row count identifier as reported by the underlying ordered dataframe.

    Returns:
        ``ordered_dataframe.row_count_snowflake_quoted_identifier``; the
        annotation allows None.
    """
    ordered_frame = self.ordered_dataframe
    return ordered_frame.row_count_snowflake_quoted_identifier
@property
def data_column_pandas_index_names(self) -> list[Hashable]:
    """
    pandas labels of the column index names (df.columns.names).

    Returns:
        One label per entry of ``data_column_index_names``, each converted
        with ``to_pandas_label``.
    """
    stored_names = self.data_column_index_names
    return [to_pandas_label(stored_name) for stored_name in stored_names]
def num_index_levels(self, *, axis: int = 0) -> int:
    """
    Number of index levels along `axis`.

    Args:
        axis: 0 for the row labels (number of index columns),
            1 for the column labels (number of column index names).

    Returns:
        Number of index levels for the given `axis`.

    Raises:
        ValueError: if `axis` is neither 0 nor 1.
    """
    if axis == 0:
        return self.num_index_columns
    if axis == 1:
        return len(self.data_column_pandas_index_names)
    raise ValueError("'axis' can only be 0 or 1")
def is_multiindex(self, *, axis: int = 0) -> bool:
    """
    Tell whether the frame has a MultiIndex along `axis`.

    Args:
        axis: 0 to check the row index (df.index), 1 to check the column
            index (df.columns).

    Returns:
        True when the axis has more than one index level.
    """
    level_count = self.num_index_levels(axis=axis)
    return level_count > 1
def is_unnamed_series(self) -> bool:
    """
    Tell whether this frame represents an unnamed series.

    That is the case when the frame has exactly one data column and the label
    of that column equals MODIN_UNNAMED_SERIES_LABEL.
    """
    data_labels = self.data_column_pandas_labels
    if len(data_labels) != 1:
        return False
    return data_labels[0] == MODIN_UNNAMED_SERIES_LABEL
@property
def data_columns_index(self) -> native_pd.Index:
    """
    Build the native pandas Index describing the column labels (df.columns).

    The index is constructed locally from the data column labels and column
    index names stored on this frame.

    Returns:
        A native pandas MultiIndex when the column axis has more than one
        level, otherwise a flat native pandas Index.
    """
    if not self.is_multiindex(axis=1):
        # tupleize_cols=False keeps tuple labels (e.g., [("A", "a"), ("B", "b")])
        # as plain objects; otherwise pandas would incorrectly build a MultiIndex.
        return native_pd.Index(
            self.data_column_pandas_labels,
            name=self.data_column_pandas_index_names[0],
            tupleize_cols=False,
        )
    column_labels = self.data_column_pandas_labels
    level_names = self.data_column_pandas_index_names
    return native_pd.MultiIndex.from_tuples(column_labels, names=level_names)
def index_columns_pandas_index(self, **kwargs: Any) -> native_pd.Index:
    """
    Materialize the row labels of this frame as a pandas index.

    Delegates to ``snowpark_to_pandas_helper`` with ``index_only=True``; the
    values are pulled eagerly because an index needs its values filled in.

    Args:
        **kwargs: forwarded unchanged to ``snowpark_to_pandas_helper``.

    Returns:
        The index (row labels) of the DataFrame.
    """
    pandas_index = snowpark_to_pandas_helper(
        self,
        index_only=True,
        **kwargs,
    )
    return pandas_index
def get_snowflake_quoted_identifiers_group_by_pandas_labels(
    self,
    pandas_labels: list[Hashable],
    include_index: bool = True,
    include_data: bool = True,
) -> list[tuple[str, ...]]:
    """
    Resolve pandas labels to the Snowflake quoted identifiers carrying them.

    A label may match several columns (or none), so each label resolves to a
    tuple of identifiers. Labels can refer to index columns, data columns, or
    both, depending on the include flags.

    Args:
        pandas_labels: A list of pandas labels.
        include_index: whether index columns are candidates for a match, default is True.
        include_data: whether data columns are candidates for a match, default is True.

    Returns:
        A list with one tuple per entry of `pandas_labels`, in the same order.
        Each tuple holds the Snowflake quoted identifiers of the candidate
        columns whose pandas label equals that entry.
    """
    all_pairs = self.label_to_snowflake_quoted_identifier
    # Pairs are ordered index columns first, then data columns, so the
    # candidate set is a contiguous slice.
    start = 0 if include_index else self.num_index_columns
    stop = len(all_pairs) if include_data else self.num_index_columns
    candidates = all_pairs[start:stop]
    return [
        tuple(
            pair.snowflake_quoted_identifier
            for pair in candidates
            if to_pandas_label(pair.label) == label
        )
        for label in pandas_labels
    ]
def parse_levels_to_integer_levels(
    self, levels: IndexLabel, allow_duplicates: bool, axis: int = 0
) -> list[int]:
    """
    Translate level specifiers into integer level numbers for the given axis.

    Args:
        levels: IndexLabel, can be int, level name, or sequence of such. None
            selects every level of the axis.
        allow_duplicates: whether duplicated levels are allowed in the result. When
            False, a level number is only added the first time it is seen.
            Otherwise different level values that map to the same level number
            each contribute an entry.
        axis: DataFrame axis the levels belong to. Defaults to 0. Allowed values
            are 0 or 1.

    Returns:
        List[int]
            Integer level numbers corresponding to the given levels, in the
            same order as given.

    Raises:
        IndexError: if an integer level is out of range for the axis.
        KeyError: if a string level is not a level name of the axis.
    """
    num_level = self.num_index_levels(axis=axis)
    if levels is None:
        return list(range(num_level))
    if not isinstance(levels, (tuple, list)):
        levels = [levels]

    result = []
    for key in levels:
        if isinstance(key, int):
            error_message = f"Too many levels: Index has only {num_level} level{'s' if num_level > 1 else ''}"
            # when key < 0, raise IndexError if key < -num_level as native pandas does
            # set key to a positive number as native pandas does
            if key < 0:
                key = key + num_level
                if key < 0:
                    raise IndexError(
                        f"{error_message}, {key - num_level} is not a valid level number"
                    )
            # when key > num_level - 1, raise IndexError as native pandas does
            elif key > num_level - 1:  # level starts from 0
                raise IndexError(f"{error_message}, not {key + 1}")
        elif isinstance(key, str):  # get level number from label
            # Names of the levels on the axis being searched; the same list is
            # used for the error message so it reports the right axis.
            level_names = (
                self.index_column_pandas_labels
                if axis == 0
                else self.data_column_pandas_index_names
            )
            try:
                key = level_names.index(key)
            # if key doesn't exist, a ValueError will be raised
            except ValueError:
                if num_level > 1:
                    raise KeyError(f"Level {key} not found") from None
                raise KeyError(
                    f"Requested level ({key}) does not match index name ({level_names[0]})"
                ) from None
        # do not add key in the result if the key is already in the result and duplication is not allowed
        if (key not in result) or allow_duplicates:
            result.append(key)
    return result
def get_pandas_labels_for_levels(self, levels: list[int]) -> list[Hashable]:
    """
    Look up the pandas labels of the index columns at the given integer levels.

    The result follows the order of `levels`; a level listed more than once
    yields its label more than once.
    """
    index_labels = self.index_column_pandas_labels
    return [index_labels[level] for level in levels]
def get_snowflake_identifiers_for_levels(self, levels: list[int]) -> list[str]:
    """
    Look up the Snowflake identifiers of the index columns at the given integer levels.

    The result follows the order of `levels`; a level listed more than once
    yields its identifier more than once.
    """
    index_identifiers = self.index_column_snowflake_quoted_identifiers
    return [index_identifiers[level] for level in levels]
def get_snowflake_identifiers_and_pandas_labels_from_levels(
    self, levels: list[int]
) -> tuple[
    list[Hashable],
    list[str],
    list[Optional[SnowparkPandasType]],
    list[Hashable],
    list[str],
    list[Optional[SnowparkPandasType]],
]:
    """
    Split the index columns into those selected by `levels` and the rest.

    Both groups keep the order of the index columns, not the order of
    `levels`, and each index column appears exactly once.

    Args:
        levels: A list of integers representing levels in the pandas Index.

    Returns:
        A tuple of 6 lists:
            1. pandas labels of index columns in `levels`.
            2. Snowflake quoted identifiers of index columns in `levels`.
            3. Snowpark pandas types of index columns in `levels`.
            4. pandas labels of index columns not in `levels`.
            5. Snowflake quoted identifiers of index columns not in `levels`.
            6. Snowpark pandas types of index columns not in `levels`.
    """
    # set membership instead of scanning the list for every index column
    selected_levels = set(levels)

    index_column_pandas_labels_in_levels = []
    index_column_snowflake_quoted_identifiers_in_levels = []
    index_column_types_in_levels = []
    index_column_pandas_labels_not_in_levels = []
    index_column_snowflake_quoted_identifiers_not_in_levels = []
    index_column_types_not_in_levels = []
    for idx, (identifier, label, column_type) in enumerate(
        zip(
            self.index_column_snowflake_quoted_identifiers,
            self.index_column_pandas_labels,
            self.cached_index_column_snowpark_pandas_types,
        )
    ):
        if idx in selected_levels:
            index_column_pandas_labels_in_levels.append(label)
            index_column_snowflake_quoted_identifiers_in_levels.append(identifier)
            index_column_types_in_levels.append(column_type)
        else:
            index_column_pandas_labels_not_in_levels.append(label)
            index_column_snowflake_quoted_identifiers_not_in_levels.append(
                identifier
            )
            index_column_types_not_in_levels.append(column_type)
    return (
        index_column_pandas_labels_in_levels,
        index_column_snowflake_quoted_identifiers_in_levels,
        index_column_types_in_levels,
        index_column_pandas_labels_not_in_levels,
        index_column_snowflake_quoted_identifiers_not_in_levels,
        index_column_types_not_in_levels,
    )
@functools.cached_property
def num_rows(self) -> int:
    """
    Number of rows in this frame.

    Computed with ``count_rows`` on the underlying ordered dataframe; as a
    cached property the value is computed once per instance and then reused.

    Returns:
        Number of rows in this frame.
    """
    row_count = count_rows(self.ordered_dataframe)
    return row_count
def has_unique_index(self, axis: Optional[int] = 0) -> bool:
    """
    Tell whether the index along `axis` has only unique values.

    For axis=1 the answer comes from the locally built column index. For any
    other value the uniqueness of the row index is evaluated by aggregating
    over the ordered dataframe and collecting the single result.

    Args:
        axis: {0, 1} defaults to 0

    Returns:
        True if index has unique values on specified axis, otherwise returns False.
    """
    if axis == 1:
        return self.data_columns_index.is_unique

    if self.num_index_columns == 1:
        index_col = col(self.index_column_snowflake_quoted_identifiers[0])
        # COUNT(DISTINCT) ignores NULL values. IFF(MAX(<col> IS NULL), 1, 0)
        # adds 1 when the column contains at least one NULL and 0 otherwise,
        # so NULL is counted as one more distinct value.
        distinct_values = count_distinct(index_col) + iff(
            max_(index_col.is_null()), 1, 0
        )
    else:
        # 'count_distinct' cannot be used directly on the columns because it
        # ignores null values. As a workaround the index columns are packed
        # into an ARRAY and 'count_distinct' is applied to that ARRAY.
        distinct_values = count_distinct(
            array_construct(*self.index_column_snowflake_quoted_identifiers)
        )

    is_unique_column = (distinct_values == count("*")).as_("is_unique")
    return self.ordered_dataframe.agg(is_unique_column).collect()[0][0]
def validate_no_duplicated_data_columns_mapped_for_labels(
    self,
    pandas_labels: list[Hashable],
    user_frame_identifier: Optional[str] = None,
) -> None:
    """
    Ensure none of the given pandas labels maps to more than one data column.

    Args:
        pandas_labels: pandas labels to check for duplicated column mappings
        user_frame_identifier: name of the frame to mention in the error message so
            the user can tell which input frame is at fault. For example, it can be
            the 'condition' or 'other' frame for the where API.

    Raises:
        ValueError: if for a pandas label, there exists more than one data column
            in this frame mapped to the label.
    """
    identifiers_per_label = (
        self.get_snowflake_quoted_identifiers_group_by_pandas_labels(
            pandas_labels=pandas_labels, include_index=False
        )
    )
    labels_with_duplication = [
        label
        for label, matched_identifiers in zip(pandas_labels, identifiers_per_label)
        if len(matched_identifiers) > 1
    ]
    if not labels_with_duplication:
        return

    # The error raised here intentionally differs from native pandas, which
    # raises ValueError("cannot reindex on an axis with duplicate labels") for
    # duplication in the condition frame and a message-less InvalidIndexError
    # for duplication in the other frame. Snowpark pandas tells the customer
    # exactly which labels are the problem.
    message = f"Multiple columns are mapped to each label in {labels_with_duplication} in DataFrame"
    if user_frame_identifier is not None:
        message += f" {user_frame_identifier}"
    raise ValueError(message)
@property
def cached_data_column_snowpark_pandas_types(
    self,
) -> list[Optional[SnowparkPandasType]]:
    """
    Cached Snowpark pandas types of this frame's data columns.

    The cached types may differ from the Snowpark types in the
    OrderedDataFrame for types that don't exist in Snowpark Python, like
    TimedeltaType. An entry is None when the type of that data column is
    unknown.

    Returns:
        One cached type (or None) per data column, in data column order.
    """
    cached_type_by_identifier = (
        self.snowflake_quoted_identifier_to_snowpark_pandas_type
    )
    data_pairs = self.label_to_snowflake_quoted_identifier[self.num_index_columns :]
    return [
        cached_type_by_identifier[pair.snowflake_quoted_identifier]
        for pair in data_pairs
    ]
@property
def cached_index_column_snowpark_pandas_types(
    self,
) -> list[Optional[SnowparkPandasType]]:
    """
    Cached Snowpark pandas types of this frame's index columns.

    The cached types may differ from the Snowpark types in the
    OrderedDataFrame for types that don't exist in Snowpark Python, like
    TimedeltaType. An entry is None when the type of that index column is
    unknown.

    Returns:
        One cached type (or None) per index column, in index column order.
    """
    cached_type_by_identifier = (
        self.snowflake_quoted_identifier_to_snowpark_pandas_type
    )
    index_pairs = self.label_to_snowflake_quoted_identifier[: self.num_index_columns]
    return [
        cached_type_by_identifier[pair.snowflake_quoted_identifier]
        for pair in index_pairs
    ]
def to_pandas(
    self, statement_params: Optional[dict[str, str]] = None, **kwargs: Any
) -> native_pd.DataFrame:
    """
    Materialize this InternalFrame as a native ``pandas.DataFrame``.

    The conversion itself is delegated to ``snowpark_to_pandas_helper``.

    Args:
        statement_params: Dictionary of statement level parameters to be set while executing this action.
        **kwargs: Extra keyword arguments, forwarded unchanged to ``snowpark_to_pandas_helper``.

    Returns:
        pandas.DataFrame
            The pandas representation of this InternalFrame.
    """
    native_frame = snowpark_to_pandas_helper(
        self, statement_params=statement_params, **kwargs
    )
    return native_frame
###########################################################################
# START: Internal Frame mutation APIs.
# APIs that create a new InternalFrame instance should only be added below.
def ensure_row_position_column(self) -> "InternalFrame":
    """
    Build a frame whose ordered dataframe has been passed through
    ``ensure_row_position_column()``.

    Labels, quoted identifiers and cached column types are carried over from
    this frame unchanged.

    Returns:
        A new InternalFrame instance backed by the ordered dataframe returned
        from ``ordered_dataframe.ensure_row_position_column()``.
    """
    ordered_dataframe_with_row_position = (
        self.ordered_dataframe.ensure_row_position_column()
    )
    return InternalFrame.create(
        ordered_dataframe=ordered_dataframe_with_row_position,
        data_column_pandas_labels=self.data_column_pandas_labels,
        data_column_snowflake_quoted_identifiers=self.data_column_snowflake_quoted_identifiers,
        data_column_pandas_index_names=self.data_column_pandas_index_names,
        data_column_types=self.cached_data_column_snowpark_pandas_types,
        index_column_pandas_labels=self.index_column_pandas_labels,
        index_column_snowflake_quoted_identifiers=self.index_column_snowflake_quoted_identifiers,
        index_column_types=self.cached_index_column_snowpark_pandas_types,
    )
def ensure_row_count_column(self) -> "InternalFrame":
    """
    Ensure row count column is computed for given internal frame.

    Labels, quoted identifiers and cached column types are carried over from
    this frame unchanged; only the backing ordered dataframe is replaced by
    the result of ``ordered_dataframe.ensure_row_count_column()``.

    Returns:
        A new InternalFrame instance whose ordered dataframe has the row count
        column ensured.
    """
    ordered_dataframe_with_row_count = (
        self.ordered_dataframe.ensure_row_count_column()
    )
    # Keyword order mirrors ensure_row_position_column and
    # persist_to_temporary_table so the three methods are easy to compare.
    return InternalFrame.create(
        ordered_dataframe=ordered_dataframe_with_row_count,
        data_column_pandas_labels=self.data_column_pandas_labels,
        data_column_snowflake_quoted_identifiers=self.data_column_snowflake_quoted_identifiers,
        data_column_pandas_index_names=self.data_column_pandas_index_names,
        data_column_types=self.cached_data_column_snowpark_pandas_types,
        index_column_pandas_labels=self.index_column_pandas_labels,
        index_column_snowflake_quoted_identifiers=self.index_column_snowflake_quoted_identifiers,
        index_column_types=self.cached_index_column_snowpark_pandas_types,
    )
def persist_to_temporary_table(self) -> "InternalFrame":
    """
    Persist the OrderedDataFrame behind this InternalFrame to a temporary table
    that lives for the duration of the session.

    Labels, quoted identifiers and cached column types are carried over from
    this frame unchanged.

    Returns:
        A new InternalFrame whose backing OrderedDataFrame is the result of
        ``cache_result`` applied to this frame's ordered dataframe.
    """
    persisted_ordered_dataframe = cache_result(self.ordered_dataframe)
    return InternalFrame.create(
        ordered_dataframe=persisted_ordered_dataframe,
        data_column_pandas_labels=self.data_column_pandas_labels,
        data_column_snowflake_quoted_identifiers=self.data_column_snowflake_quoted_identifiers,
        data_column_types=self.cached_data_column_snowpark_pandas_types,
        data_column_pandas_index_names=self.data_column_pandas_index_names,
        index_column_pandas_labels=self.index_column_pandas_labels,
        index_column_snowflake_quoted_identifiers=self.index_column_snowflake_quoted_identifiers,
        index_column_types=self.cached_index_column_snowpark_pandas_types,
    )
def append_column(
self,
pandas_label: Hashable,
value: SnowparkColumn,
value_type: Optional[SnowparkPandasType] = None,
) -> "InternalFrame":
"""
Append a column to this frame. The column is added at the end. For a frame with multiindex column, it
automatically fills the missing levels with None. For example, in a table with MultiIndex columns like
("A", "col1"), ("A", "col2"), ("B", "col1"), ("B", "col2"), appending a count column "cnt" will produce
a column labelled ("cnt", None).
Args:
pandas_label: pandas label for column to be inserted.
value: SnowparkColumn.
value_type: The optional SnowparkPandasType for the new column.
Returns:
A new InternalFrame with new column.
"""
# +---------------+---------------+---------------+---------------+ +---------------+
# | ("A", "col1") | ("A", "col2") | ("B", "col1") | ("B", "col2") | | "cnt" |
# +---------------+---------------+---------------+---------------+ + +---------------+
# | . . . | . . . | . . . | . . . | | . . . |
# +---------------+---------------+---------------+---------------+ +---------------+
#
# Appending a column "cnt" to the table below will produce the following table:
# +---------------+---------------+---------------+---------------+---------------+
# | ("A", "col1") | ("A", "col2") | ("B", "col1") | ("B", "col2") | ("cnt", None) |
# +---------------+---------------+---------------+---------------+---------------+
# | . . . | . . . | . . . | . . . | . . . |
# +---------------+---------------+---------------+---------------+---------------+
# Generate label for the column to be appended.
nlevels = self.num_index_levels(axis=1)
pandas_label = fill_missing_levels_for_pandas_label(
pandas_label, nlevels, 0, None
)
# Generate snowflake quoted identifier for new column to be added.
new_column_identifier = (
self.ordered_dataframe.generate_snowflake_quoted_identifiers(
pandas_labels=[pandas_label],
)[0]
)
new_ordered_dataframe = append_columns(
self.ordered_dataframe, new_column_identifier, value
)
return InternalFrame.create(
ordered_dataframe=new_ordered_dataframe,
data_column_pandas_labels=self.data_column_pandas_labels + [pandas_label],
data_column_snowflake_quoted_identifiers=self.data_column_snowflake_quoted_identifiers
+ [new_column_identifier],
data_column_pandas_index_names=self.data_column_pandas_index_names,
index_column_pandas_labels=self.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=self.index_column_snowflake_quoted_identifiers,