Skip to content

Commit 59e8dae

Browse files
Manul from Pathwayembe-pwjanchorowskiXGendredxtrous
committed
Release 0.7.1
Co-authored-by: Michał Bartoszkiewicz <embe@pathway.com> Co-authored-by: Jan Chorowski <janek@pathway.com> Co-authored-by: Xavier Gendre <xavier@pathway.com> Co-authored-by: Adrian Kosowski <adrian@pathway.com> Co-authored-by: Jakub Kowalski <kuba@pathway.com> Co-authored-by: Sergey Kulik <sergey@pathway.com> Co-authored-by: Mateusz Lewandowski <mateusz@pathway.com> Co-authored-by: Mohamed Malhou <mohamed@pathway.com> Co-authored-by: Krzysztof Nowicki <krzysiek@pathway.com> Co-authored-by: Richard Pelgrim <richard.pelgrim@pathway.com> Co-authored-by: Kamil Piechowiak <kamil@pathway.com> Co-authored-by: Paweł Podhajski <pawel.podhajski@pathway.com> Co-authored-by: Olivier Ruas <olivier@pathway.com> Co-authored-by: Przemysław Uznański <przemek@pathway.com> Co-authored-by: Sebastian Włudzik <sebastian.wludzik@pathway.com> GitOrigin-RevId: 50c042a9e5b04e8ecc09ccb3f80086f6e598cdff
1 parent 4c570e1 commit 59e8dae

27 files changed

+778
-118
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
55
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66
## [Unreleased]
77

8+
## [0.7.1] - 2023-11-17
9+
10+
### Added
11+
12+
- Experimental Google Drive input connector.
13+
- Stateful deduplication function (`pw.stateful.deduplicate`) allowing alerting on significant changes.
14+
- The ability to split data into batches in `pw.debug.table_from_markdown` and `pw.debug.table_from_pandas`.
15+
816
## [0.7.0] - 2023-11-16
917

1018
### Added

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pathway"
3-
version = "0.7.0"
3+
version = "0.7.1"
44
edition = "2021"
55
publish = false
66
rust-version = "1.72.0"
@@ -13,7 +13,6 @@ crate-type = ["cdylib", "lib"]
1313
[dev-dependencies]
1414
assert_matches = "1.5.0"
1515
eyre = "0.6.8"
16-
tempfile = "3.8.1"
1716

1817
[dependencies]
1918
arc-swap = "1.6.0"
@@ -59,6 +58,7 @@ serde_json = "1.0"
5958
serde_with = "3.4.0"
6059
smallvec = { version = "1.11.1", features = ["union", "const_generics"] }
6160
syn = { version = "2.0.38", features = ["default", "full", "visit", "visit-mut"] } # Hack to keep features unified between normal and build deps
61+
tempfile = "3.8.1"
6262
thiserror = "1.0.50"
6363
timely = { path = "./external/timely-dataflow/timely", features = ["bincode"] }
6464
tokio = "1.33.0"

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ dependencies = [
3131
"diskcache >= 5.2.1",
3232
"exceptiongroup >= 1.1.3; python_version < '3.11'",
3333
"boto3 >= 1.26.76",
34+
"google-api-python-client >= 2.108.0",
3435
]
3536

3637
[project.optional-dependencies]

python/pathway/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,16 @@
7979
unwrap,
8080
)
8181
from pathway.internals.api import PathwayType as Type
82-
from pathway.stdlib import graphs, indexing, ml, ordered, statistical, temporal, utils
82+
from pathway.stdlib import (
83+
graphs,
84+
indexing,
85+
ml,
86+
ordered,
87+
stateful,
88+
statistical,
89+
temporal,
90+
utils,
91+
)
8392
from pathway.stdlib.utils.async_transformer import AsyncTransformer
8493
from pathway.stdlib.utils.pandas_transformer import pandas_transformer
8594

@@ -161,6 +170,7 @@
161170
"Duration",
162171
"Json",
163172
"table_transformer",
173+
"stateful",
164174
]
165175

166176

python/pathway/debug/__init__.py

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,27 @@ def table_to_pandas(table: Table):
120120
return res
121121

122122

123+
def _validate_dataframe(df: pd.DataFrame) -> None:
124+
for pseudocolumn in api.PANDAS_PSEUDOCOLUMNS:
125+
if pseudocolumn in df.columns:
126+
if not pd.api.types.is_integer_dtype(df[pseudocolumn].dtype):
127+
raise ValueError(f"Column {pseudocolumn} has to contain integers only.")
128+
if api.TIME_PSEUDOCOLUMN in df.columns:
129+
if any(df[api.TIME_PSEUDOCOLUMN] < 0):
130+
raise ValueError(
131+
f"Column {api.TIME_PSEUDOCOLUMN} cannot contain negative times."
132+
)
133+
if any(df[api.TIME_PSEUDOCOLUMN] % 2 == 1):
134+
warn("timestamps are required to be even; all timestamps will be doubled")
135+
df[api.TIME_PSEUDOCOLUMN] = 2 * df[api.TIME_PSEUDOCOLUMN]
136+
137+
if api.DIFF_PSEUDOCOLUMN in df.columns:
138+
if any((df[api.DIFF_PSEUDOCOLUMN] != 1) & (df[api.DIFF_PSEUDOCOLUMN] != -1)):
139+
raise ValueError(
140+
f"Column {api.DIFF_PSEUDOCOLUMN} can only have 1 and -1 values."
141+
)
142+
143+
123144
@runtime_type_check
124145
@trace_user_frame
125146
def table_from_pandas(
@@ -128,14 +149,27 @@ def table_from_pandas(
128149
unsafe_trusted_ids: bool = False,
129150
schema: type[Schema] | None = None,
130151
) -> Table:
152+
"""
153+
A function for creating a table from a pandas DataFrame. If it contains a special
154+
column ``__time__``, rows will be split into batches with timestamps from the column.
155+
A special column ``__diff__`` can be used to set an event type - with ``1`` treated
156+
as inserting the row and ``-1`` as removing it.
157+
"""
131158
if id_from is not None and schema is not None:
132159
raise ValueError("parameters `schema` and `id_from` are mutually exclusive")
133160

161+
ordinary_columns_names = [
162+
column for column in df.columns if column not in api.PANDAS_PSEUDOCOLUMNS
163+
]
134164
if schema is None:
135-
schema = schema_from_pandas(df, id_from=id_from)
136-
elif list(df.columns) != schema.column_names():
165+
schema = schema_from_pandas(
166+
df, id_from=id_from, exclude_columns=api.PANDAS_PSEUDOCOLUMNS
167+
)
168+
elif ordinary_columns_names != schema.column_names():
137169
raise ValueError("schema does not match given dataframe")
138170

171+
_validate_dataframe(df)
172+
139173
return table_from_datasource(
140174
PandasDataSource(
141175
schema=schema,
@@ -168,18 +202,28 @@ def _markdown_to_pandas(table_def):
168202
).convert_dtypes()
169203

170204

171-
def parse_to_table(
205+
def table_from_markdown(
172206
table_def,
173207
id_from=None,
174208
unsafe_trusted_ids=False,
175209
schema: type[Schema] | None = None,
176210
) -> Table:
211+
"""
212+
A function for creating a table from its definition in markdown. If it contains a special
213+
column ``__time__``, rows will be split into batches with timestamps from the column.
214+
A special column ``__diff__`` can be used to set an event type - with ``1`` treated
215+
as inserting the row and ``-1`` as removing it.
216+
"""
177217
df = _markdown_to_pandas(table_def)
178218
return table_from_pandas(
179219
df, id_from=id_from, unsafe_trusted_ids=unsafe_trusted_ids, schema=schema
180220
)
181221

182222

223+
# XXX: clean this up
224+
parse_to_table = table_from_markdown
225+
226+
183227
@runtime_type_check
184228
def table_from_parquet(
185229
path: str | PathLike,
@@ -205,10 +249,6 @@ def table_to_parquet(table: Table, filename: str | PathLike):
205249
return df.to_parquet(filename)
206250

207251

208-
# XXX: clean this up
209-
table_from_markdown = parse_to_table
210-
211-
212252
class _EmptyConnectorSubject(ConnectorSubject):
213253
def run(self):
214254
pass
@@ -352,7 +392,7 @@ def table_from_pandas(
352392
"""
353393
if schema is None:
354394
schema = schema_from_pandas(
355-
df, exclude_columns=["_time", "_diff", "_worker"]
395+
df, exclude_columns={"_time", "_diff", "_worker"}
356396
)
357397
schema, api_schema = read_schema(schema=schema)
358398
value_fields: list[api.ValueField] = api_schema["value_fields"]

python/pathway/engine.pyi

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,18 @@ class LegacyTable:
104104
class Table:
105105
"""Table with tuples containing values from multiple columns."""
106106

107+
class InputRow:
108+
"""Row of data for static_table"""
109+
110+
def __init__(
111+
self,
112+
key: Pointer,
113+
value: list[Value],
114+
time: int = 0,
115+
diff: int = 1,
116+
shard: int | None = None,
117+
) -> None: ...
118+
107119
class MissingValueError(BaseException):
108120
"Marker class to indicate missing attributes"
109121

@@ -374,7 +386,7 @@ class Scope:
374386
def static_table(
375387
self,
376388
universe: Universe,
377-
rows: Iterable[tuple[Pointer, list[Value]]],
389+
rows: Iterable[InputRow],
378390
dt: DType,
379391
) -> Table: ...
380392
def map_column(
@@ -617,6 +629,15 @@ class SnapshotAccess(Enum):
617629
REPLAY: SnapshotAccess
618630
FULL: SnapshotAccess
619631

632+
class DataEventType(Enum):
633+
INSERT: DataEventType
634+
DELETE: DataEventType
635+
UPSERT: DataEventType
636+
637+
class SessionType(Enum):
638+
NATIVE: SessionType
639+
UPSERT: SessionType
640+
620641
class SnapshotEvent:
621642
@staticmethod
622643
def insert(key: Pointer, values: list[Value]) -> SnapshotEvent: ...

python/pathway/internals/api.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ def ids_from_pandas(
7171
return {k: ref_scalar(*args) for (k, *args) in df[id_from].itertuples()}
7272

7373

74+
TIME_PSEUDOCOLUMN = "__time__"
75+
DIFF_PSEUDOCOLUMN = "__diff__"
76+
SHARD_PSEUDOCOLUMN = "__shard__"
77+
PANDAS_PSEUDOCOLUMNS = {TIME_PSEUDOCOLUMN, DIFF_PSEUDOCOLUMN, SHARD_PSEUDOCOLUMN}
78+
79+
7480
def static_table_from_pandas(
7581
scope,
7682
df: pd.DataFrame,
@@ -79,17 +85,19 @@ def static_table_from_pandas(
7985
) -> Table:
8086
ids = ids_from_pandas(df, connector_properties, id_from)
8187

82-
all_data: list[tuple[Pointer, list[Value]]] = [(key, []) for key in ids.values()]
83-
8488
data = {}
8589
for c in df.columns:
86-
data[c] = {ids[k]: denumpify(v) for k, v in df[c].items()}
90+
data[c] = [denumpify(v) for _, v in df[c].items()]
91+
# df[c].items() is used because df[c].values is a numpy array
92+
ordinary_columns = [
93+
column for column in df.columns if column not in PANDAS_PSEUDOCOLUMNS
94+
]
8795

8896
if connector_properties is None:
8997
column_properties = []
90-
for c in df.columns:
98+
for c in ordinary_columns:
9199
dtype: type = int
92-
for v in data[c].values():
100+
for v in data[c]:
93101
if v is not None:
94102
dtype = type(v)
95103
break
@@ -99,13 +107,19 @@ def static_table_from_pandas(
99107
connector_properties = ConnectorProperties(column_properties=column_properties)
100108

101109
assert len(connector_properties.column_properties) == len(
102-
df.columns
110+
ordinary_columns
103111
), "prrovided connector properties do not match the dataframe"
104112

105-
for c in df.columns:
106-
for (key, values), (column_key, value) in zip(
107-
all_data, data[c].items(), strict=True
108-
):
109-
assert key == column_key
110-
values.append(value)
111-
return scope.static_table(all_data, connector_properties)
113+
input_data: list[InputRow] = []
114+
for i, index in enumerate(df.index):
115+
key = ids[index]
116+
values = [data[c][i] for c in ordinary_columns]
117+
time = data[TIME_PSEUDOCOLUMN][i] if TIME_PSEUDOCOLUMN in data else 0
118+
diff = data[DIFF_PSEUDOCOLUMN][i] if DIFF_PSEUDOCOLUMN in data else 1
119+
if diff not in [-1, 1]:
120+
raise ValueError(f"Column {DIFF_PSEUDOCOLUMN} can only contain 1 and -1.")
121+
shard = data[SHARD_PSEUDOCOLUMN][i] if SHARD_PSEUDOCOLUMN in data else None
122+
input_row = InputRow(key, values, time=time, diff=diff, shard=shard)
123+
input_data.append(input_row)
124+
125+
return scope.static_table(input_data, connector_properties)

python/pathway/internals/schema.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def schema_from_pandas(
8181
*,
8282
id_from: list[str] | None = None,
8383
name: str | None = None,
84-
exclude_columns: list[str] = [],
84+
exclude_columns: set[str] = set(),
8585
) -> type[Schema]:
8686
if name is None:
8787
name = "schema_from_pandas(" + str(dframe.columns) + ")"

python/pathway/io/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
debezium,
66
elasticsearch,
77
fs,
8+
gdrive,
89
http,
910
jsonlines,
1011
kafka,
@@ -42,4 +43,5 @@
4243
"subscribe",
4344
"s3",
4445
"s3_csv",
46+
"gdrive",
4547
]

0 commit comments

Comments
 (0)