Skip to content

Commit bc46d22

Browse files
authored
Replace deprecated APIs with modern equivalents (#143)
1 parent 5340eb3 commit bc46d22

File tree

6 files changed

+48
-174
lines changed

6 files changed

+48
-174
lines changed

examples/projects/twitter/services/pathway-app/app/main.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,23 +91,26 @@ def get_data_table(dataset_path, poll_new_objects):
9191
"user": "postgres",
9292
"password": "changeme",
9393
}
94-
pw.io.postgres.write_snapshot(
94+
pw.io.postgres.write(
9595
author_meta,
9696
postgres_settings=postgres_settings,
9797
table_name="author_meta",
98-
primary_key=["tweet_to_author_id"],
98+
output_table_type="snapshot",
99+
primary_key=[author_meta.tweet_to_author_id],
99100
)
100-
pw.io.postgres.write_snapshot(
101+
pw.io.postgres.write(
101102
grouped,
102103
postgres_settings=postgres_settings,
103104
table_name="grouped",
104-
primary_key=["tweet_to_author_id", "time_bucket"],
105+
output_table_type="snapshot",
106+
primary_key=[grouped.tweet_to_author_id, grouped.time_bucket],
105107
)
106-
pw.io.postgres.write_snapshot(
108+
pw.io.postgres.write(
107109
tweet_pairs,
108110
postgres_settings=postgres_settings,
109111
table_name="tweet_pairs",
110-
primary_key=["tweet_from_id", "tweet_to_id"],
112+
output_table_type="snapshot",
113+
primary_key=[tweet_pairs.tweet_from_id, tweet_pairs.tweet_to_id],
111114
)
112115

113116
pw.run()

integration_tests/rag_evals/connector.py

Lines changed: 4 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
import json
2-
31
import httpx
4-
import requests
52

3+
from pathway.xpacks.llm.document_store import DocumentStoreClient
64
from pathway.xpacks.llm.question_answering import RAGClient, send_post_request
75

86

@@ -15,125 +13,8 @@ async def a_send_post_request(
1513
return response.json()
1614

1715

18-
class VectorStoreClient:
19-
"""
20-
A client you can use to query VectorStoreServer.
21-
22-
Please provide either the `url`, or `host` and `port`.
23-
24-
Args:
25-
host: host on which `VectorStoreServer </developers/api-docs/pathway-xpacks-llm/vectorstore#pathway.xpacks.llm.vector_store.VectorStoreServer>`_ listens
26-
port: port on which `VectorStoreServer </developers/api-docs/pathway-xpacks-llm/vectorstore#pathway.xpacks.llm.vector_store.VectorStoreServer>`_ listens
27-
url: url at which `VectorStoreServer </developers/api-docs/pathway-xpacks-llm/vectorstore#pathway.xpacks.llm.vector_store.VectorStoreServer>`_ listens
28-
timeout: timeout for the post requests in seconds
29-
""" # noqa
30-
31-
def __init__(
32-
self,
33-
host: str | None = None,
34-
port: int | None = None,
35-
url: str | None = None,
36-
timeout: int | None = 180,
37-
additional_headers: dict | None = None,
38-
):
39-
err = "Either (`host` and `port`) or `url` must be provided, but not both."
40-
if url is not None:
41-
if host or port:
42-
raise ValueError(err)
43-
self.url = url
44-
else:
45-
if host is None:
46-
raise ValueError(err)
47-
port = port or 80
48-
self.url = f"http://{host}:{port}"
49-
50-
self.timeout = timeout
51-
self.additional_headers = additional_headers or {}
52-
53-
def query(
54-
self,
55-
query: str,
56-
k: int = 3,
57-
metadata_filter: str | None = None,
58-
filepath_globpattern: str | None = None,
59-
) -> list[dict]:
60-
"""
61-
Perform a query to the vector store and fetch results.
62-
63-
Args:
64-
query:
65-
k: number of documents to be returned
66-
metadata_filter: optional string representing the metadata filtering query
67-
in the JMESPath format. The search will happen only for documents
68-
satisfying this filtering.
69-
filepath_globpattern: optional glob pattern specifying which documents
70-
will be searched for this query.
71-
"""
72-
73-
data = {"query": query, "k": k}
74-
if metadata_filter is not None:
75-
data["metadata_filter"] = metadata_filter
76-
if filepath_globpattern is not None:
77-
data["filepath_globpattern"] = filepath_globpattern
78-
url = self.url + "/v1/retrieve"
79-
response = requests.post(
80-
url,
81-
data=json.dumps(data),
82-
headers=self._get_request_headers(),
83-
timeout=self.timeout,
84-
)
85-
86-
responses = response.json()
87-
return sorted(responses, key=lambda x: x["dist"])
88-
89-
# Make an alias
90-
__call__ = query
91-
92-
def get_vectorstore_statistics(self):
93-
"""Fetch basic statistics about the vector store."""
94-
95-
url = self.url + "/v1/statistics"
96-
response = requests.post(
97-
url,
98-
json={},
99-
headers=self._get_request_headers(),
100-
timeout=self.timeout,
101-
)
102-
responses = response.json()
103-
return responses
104-
105-
def get_input_files(
106-
self,
107-
metadata_filter: str | None = None,
108-
filepath_globpattern: str | None = None,
109-
):
110-
"""
111-
Fetch information on documents in the vector store.
112-
113-
Args:
114-
metadata_filter: optional string representing the metadata filtering query
115-
in the JMESPath format. The search will happen only for documents
116-
satisfying this filtering.
117-
filepath_globpattern: optional glob pattern specifying which documents
118-
will be searched for this query.
119-
"""
120-
url = self.url + "/v1/inputs"
121-
response = requests.post(
122-
url,
123-
json={
124-
"metadata_filter": metadata_filter,
125-
"filepath_globpattern": filepath_globpattern,
126-
},
127-
headers=self._get_request_headers(),
128-
timeout=self.timeout,
129-
)
130-
responses = response.json()
131-
return responses
132-
133-
def _get_request_headers(self):
134-
request_headers = {"Content-Type": "application/json"}
135-
request_headers.update(self.additional_headers)
136-
return request_headers
16+
# Use DocumentStoreClient from pathway instead of local implementation
17+
VectorStoreClient = DocumentStoreClient
13718

13819

13920
class RagConnector:
@@ -142,7 +23,7 @@ class RagConnector:
14223
def __init__(self, base_url: str):
14324
self.base_url = base_url
14425

145-
self.index_client = VectorStoreClient(
26+
self.index_client = DocumentStoreClient(
14627
url=base_url,
14728
)
14829

integration_tests/webserver/test_llm_xpack.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717
import pathway as pw
1818
from pathway.internals.udfs.caches import InMemoryCache
1919
from pathway.tests.utils import wait_result_with_checker
20+
from pathway.xpacks.llm.document_store import DocumentStoreClient
2021
from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer, RAGClient
2122
from pathway.xpacks.llm.tests.mocks import FakeChatModel, fake_embeddings_model
2223
from pathway.xpacks.llm.tests.utils import build_vector_store, create_build_rag_app
23-
from pathway.xpacks.llm.vector_store import VectorStoreClient, VectorStoreServer
24+
from pathway.xpacks.llm.vector_store import VectorStoreServer
2425

2526
PATHWAY_HOST = "127.0.0.1"
2627

@@ -85,7 +86,7 @@ def test_similarity_search_without_metadata(tmp_path: pathlib.Path, port: int):
8586
with open(tmp_path / "file_one.txt", "w+") as f:
8687
f.write("foo")
8788

88-
client = VectorStoreClient(host=PATHWAY_HOST, port=port)
89+
client = DocumentStoreClient(host=PATHWAY_HOST, port=port)
8990

9091
def checker() -> bool:
9192
output = []
@@ -109,7 +110,7 @@ def test_vector_store_with_langchain(tmp_path: pathlib.Path, port) -> None:
109110
with open(tmp_path / "file_one.txt", "w+") as f:
110111
f.write("foo\n\nbar")
111112

112-
client = VectorStoreClient(host=PATHWAY_HOST, port=port)
113+
client = DocumentStoreClient(host=PATHWAY_HOST, port=port)
113114

114115
def checker() -> bool:
115116
output = []
@@ -281,7 +282,7 @@ def fake_embeddings_model(x: str) -> list[float]:
281282

282283
def checker() -> bool:
283284
try:
284-
client = VectorStoreClient(host=PATHWAY_HOST, port=port)
285+
client = DocumentStoreClient(host=PATHWAY_HOST, port=port)
285286
inputs = client.get_input_files()
286287

287288
assert len(inputs) == 1
@@ -529,7 +530,7 @@ def test_serve_callable_with_search(port: int):
529530

530531
@rag_app.serve_callable(route=f"/{TEST_ENDPOINT}")
531532
async def return_top_doc_text(query):
532-
vs_client = VectorStoreClient(host=PATHWAY_HOST, port=port)
533+
vs_client = DocumentStoreClient(host=PATHWAY_HOST, port=port)
533534
return vs_client.query(query, k=1)[0]["text"]
534535

535536
def checker() -> bool:

integration_tests/webserver/test_rest_connector.py

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,18 @@ def target() -> None:
4545
r.raise_for_status()
4646
assert r.text == '"TWO"', r.text
4747

48-
queries, response_writer = pw.io.http.rest_connector(
49-
host="127.0.0.1", port=port, schema=InputSchema, delete_completed_queries=True
50-
)
48+
if isinstance(port, str):
49+
queries, response_writer = pw.io.http.rest_connector(
50+
host="127.0.0.1",
51+
port=port,
52+
schema=InputSchema,
53+
delete_completed_queries=True,
54+
)
55+
else:
56+
webserver = pw.io.http.PathwayWebserver(host="127.0.0.1", port=port)
57+
queries, response_writer = pw.io.http.rest_connector(
58+
webserver=webserver, schema=InputSchema, delete_completed_queries=True
59+
)
5160
responses = logic(queries)
5261
response_writer(responses)
5362
pw.io.csv.write(queries, output_path)
@@ -89,9 +98,9 @@ def target() -> None:
8998
json={"query": "two"},
9099
).raise_for_status()
91100

101+
webserver = pw.io.http.PathwayWebserver(host="127.0.0.1", port=port)
92102
queries, response_writer = pw.io.http.rest_connector(
93-
host="127.0.0.1",
94-
port=port,
103+
webserver=webserver,
95104
schema=InputSchema,
96105
route="/endpoint",
97106
delete_completed_queries=True,
@@ -129,8 +138,9 @@ def target() -> None:
129138
json={"query": "two"},
130139
).raise_for_status()
131140

141+
webserver = pw.io.http.PathwayWebserver(host="127.0.0.1", port=port)
132142
queries, response_writer = pw.io.http.rest_connector(
133-
host="127.0.0.1", port=port, schema=InputSchema, delete_completed_queries=True
143+
webserver=webserver, schema=InputSchema, delete_completed_queries=True
134144
)
135145
responses = logic(queries)
136146
response_writer(responses)
@@ -160,8 +170,9 @@ def target() -> None:
160170
json={"k": 1, "v": 2},
161171
).raise_for_status()
162172

173+
webserver = pw.io.http.PathwayWebserver(host="127.0.0.1", port=port)
163174
queries, response_writer = pw.io.http.rest_connector(
164-
host="127.0.0.1", port=port, schema=InputSchema, delete_completed_queries=False
175+
webserver=webserver, schema=InputSchema, delete_completed_queries=False
165176
)
166177
response_writer(queries.select(query_id=queries.id, result=pw.this.v))
167178

@@ -193,39 +204,20 @@ def target() -> None:
193204

194205

195206
def test_server_fail_on_duplicate_port(tmp_path: pathlib.Path, port: int) -> None:
196-
output_path = tmp_path / "output.csv"
197-
198207
class InputSchema(pw.Schema):
199208
k: int
200209
v: int
201210

211+
webserver = pw.io.http.PathwayWebserver(host="127.0.0.1", port=port)
202212
queries, response_writer = pw.io.http.rest_connector(
203-
host="127.0.0.1", port=port, schema=InputSchema, delete_completed_queries=False
213+
webserver=webserver, schema=InputSchema, delete_completed_queries=False
204214
)
205215
response_writer(queries.select(query_id=queries.id, result=pw.this.v))
206216

207-
queries_dup, response_writer_dup = pw.io.http.rest_connector(
208-
host="127.0.0.1", port=port, schema=InputSchema, delete_completed_queries=False
209-
)
210-
response_writer_dup(queries_dup.select(query_id=queries_dup.id, result=pw.this.v))
211-
212-
sum = queries.groupby(pw.this.k).reduce(
213-
key=pw.this.k, sum=pw.reducers.sum(pw.this.v)
214-
)
215-
sum_dup = queries_dup.groupby(pw.this.k).reduce(
216-
key=pw.this.k, sum=pw.reducers.sum(pw.this.v)
217-
)
218-
219-
pw.io.csv.write(sum, output_path)
220-
pw.io.csv.write(sum_dup, output_path)
221-
222-
with pytest.raises(OSError) as exc_info:
223-
pw.run()
224-
error_message = str(exc_info.value)
225-
assert (
226-
"error while attempting to bind on address" in error_message
227-
or "Address already in use" in error_message
228-
)
217+
with pytest.raises(RuntimeError, match="Added route will never be executed"):
218+
queries_dup, response_writer_dup = pw.io.http.rest_connector(
219+
webserver=webserver, schema=InputSchema, delete_completed_queries=False
220+
)
229221

230222

231223
def _test_server_two_endpoints(

python/pathway/tests/test_column_properties.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,9 @@ def test_rest_connector(delete_completed_queries: bool):
216216
class TestSchema(pw.Schema):
217217
a: int
218218

219+
webserver = io.http.PathwayWebserver(host="127.0.0.1", port=30000)
219220
table, response_writer = io.http.rest_connector(
220-
host="127.0.0.1",
221-
port=30000, # server is not started, port number does not matter
221+
webserver=webserver,
222222
schema=TestSchema,
223223
delete_completed_queries=delete_completed_queries,
224224
)

python/pathway/xpacks/llm/question_answering.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,13 @@
1414
from pathway.xpacks.llm import Doc, llms, prompts
1515
from pathway.xpacks.llm.document_store import (
1616
DocumentStore,
17+
DocumentStoreClient,
1718
SlidesDocumentStore,
1819
_get_jmespath_filter,
1920
)
2021
from pathway.xpacks.llm.llms import BaseChat, prompt_chat_single_qa
2122
from pathway.xpacks.llm.mcp_server import McpServable, McpServer
22-
from pathway.xpacks.llm.vector_store import (
23-
SlidesVectorStoreServer,
24-
VectorStoreClient,
25-
VectorStoreServer,
26-
)
23+
from pathway.xpacks.llm.vector_store import SlidesVectorStoreServer, VectorStoreServer
2724

2825
if TYPE_CHECKING:
2926
from pathway.xpacks.llm.servers import QARestServer, QASummaryRestServer
@@ -1106,7 +1103,7 @@ def __init__(
11061103
self.timeout = timeout
11071104
self.additional_headers = additional_headers or {}
11081105

1109-
self.index_client = VectorStoreClient(
1106+
self.index_client = DocumentStoreClient(
11101107
url=self.url,
11111108
timeout=self.timeout,
11121109
additional_headers=self.additional_headers,

0 commit comments

Comments
 (0)