Skip to content

Commit f6f0c0a

Browse files
fix pipeline async awaiting: stop awaiting Redis pipeline command-queueing calls (e.g. `pipeline.json().set(...)`, `pipeline.delete(...)`), which only buffer commands locally; keep awaiting `pipeline.execute()`, which actually sends the buffered commands
1 parent f2cfa6e commit f6f0c0a

File tree

2 files changed

+26
-30
lines changed

2 files changed

+26
-30
lines changed

langgraph/checkpoint/redis/aio.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,14 @@ async def _write_obj_tx(
6363
exists: int = await pipe.exists(key)
6464
if upsert_case:
6565
if exists:
66-
await pipe.json().set(key, "$.channel", write_obj["channel"])
67-
await pipe.json().set(key, "$.type", write_obj["type"])
68-
await pipe.json().set(key, "$.blob", write_obj["blob"])
66+
pipe.json().set(key, "$.channel", write_obj["channel"])
67+
pipe.json().set(key, "$.type", write_obj["type"])
68+
pipe.json().set(key, "$.blob", write_obj["blob"])
6969
else:
70-
await pipe.json().set(key, "$", write_obj)
70+
pipe.json().set(key, "$", write_obj)
7171
else:
7272
if not exists:
73-
await pipe.json().set(key, "$", write_obj)
73+
pipe.json().set(key, "$", write_obj)
7474

7575

7676
class AsyncRedisSaver(
@@ -604,12 +604,12 @@ async def aput(
604604
pipeline = self._redis.pipeline(transaction=True)
605605

606606
# Add checkpoint data to pipeline
607-
await pipeline.json().set(checkpoint_key, "$", checkpoint_data)
607+
pipeline.json().set(checkpoint_key, "$", checkpoint_data)
608608

609609
if blobs:
610610
# Add all blob operations to the pipeline
611611
for key, data in blobs:
612-
await pipeline.json().set(key, "$", data)
612+
pipeline.json().set(key, "$", data)
613613

614614
# Execute all operations atomically
615615
await pipeline.execute()
@@ -660,7 +660,7 @@ async def aput(
660660
else:
661661
# For non-cluster mode, use pipeline
662662
pipeline = self._redis.pipeline(transaction=True)
663-
await pipeline.json().set(checkpoint_key, "$", checkpoint_data)
663+
pipeline.json().set(checkpoint_key, "$", checkpoint_data)
664664
await pipeline.execute()
665665
except Exception:
666666
# If this also fails, we just propagate the original cancellation
@@ -788,20 +788,18 @@ async def aput_writes(
788788
exists = await self._redis.exists(key)
789789
if exists:
790790
# Update existing key
791-
await pipeline.json().set(
792-
key, "$.channel", write_obj["channel"]
793-
)
794-
await pipeline.json().set(key, "$.type", write_obj["type"])
795-
await pipeline.json().set(key, "$.blob", write_obj["blob"])
791+
pipeline.json().set(key, "$.channel", write_obj["channel"])
792+
pipeline.json().set(key, "$.type", write_obj["type"])
793+
pipeline.json().set(key, "$.blob", write_obj["blob"])
796794
else:
797795
# Create new key
798-
await pipeline.json().set(key, "$", write_obj)
796+
pipeline.json().set(key, "$", write_obj)
799797
created_keys.append(key)
800798
else:
801799
# For non-upsert case, only set if key doesn't exist
802800
exists = await self._redis.exists(key)
803801
if not exists:
804-
await pipeline.json().set(key, "$", write_obj)
802+
pipeline.json().set(key, "$", write_obj)
805803
created_keys.append(key)
806804

807805
# Execute all operations atomically

langgraph/checkpoint/redis/ashallow.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,11 @@
9090
async def _write_obj_tx(pipe: Pipeline, key: str, write_obj: dict[str, Any]) -> None:
9191
exists: int = await pipe.exists(key)
9292
if exists:
93-
await pipe.json().set(key, "$.channel", write_obj["channel"])
94-
await pipe.json().set(key, "$.type", write_obj["type"])
95-
await pipe.json().set(key, "$.blob", write_obj["blob"])
93+
pipe.json().set(key, "$.channel", write_obj["channel"])
94+
pipe.json().set(key, "$.type", write_obj["type"])
95+
pipe.json().set(key, "$.blob", write_obj["blob"])
9696
else:
97-
await pipe.json().set(key, "$", write_obj)
97+
pipe.json().set(key, "$", write_obj)
9898

9999

100100
class AsyncShallowRedisSaver(BaseRedisSaver[AsyncRedis, AsyncSearchIndex]):
@@ -240,7 +240,7 @@ async def aput(
240240
)
241241

242242
# Add checkpoint data to pipeline
243-
await pipeline.json().set(checkpoint_key, "$", checkpoint_data)
243+
pipeline.json().set(checkpoint_key, "$", checkpoint_data)
244244

245245
# Before storing the new blobs, clean up old ones that won't be needed
246246
# - Get a list of all blob keys for this thread_id and checkpoint_ns
@@ -274,7 +274,7 @@ async def aput(
274274
continue
275275
else:
276276
# This is an old version, delete it
277-
await pipeline.delete(blob_key)
277+
pipeline.delete(blob_key)
278278

279279
# Store the new blob values
280280
blobs = self._dump_blobs(
@@ -287,7 +287,7 @@ async def aput(
287287
if blobs:
288288
# Add all blob data to pipeline
289289
for key, data in blobs:
290-
await pipeline.json().set(key, "$", data)
290+
pipeline.json().set(key, "$", data)
291291

292292
# Execute all operations atomically
293293
await pipeline.execute()
@@ -571,7 +571,7 @@ async def aput_writes(
571571

572572
# If the write is for a different checkpoint_id, delete it
573573
if key_checkpoint_id != checkpoint_id:
574-
await pipeline.delete(write_key)
574+
pipeline.delete(write_key)
575575

576576
# Add new writes to the pipeline
577577
upsert_case = all(w[0] in WRITES_IDX_MAP for w in writes)
@@ -589,17 +589,15 @@ async def aput_writes(
589589
exists = await self._redis.exists(key)
590590
if exists:
591591
# Update existing key
592-
await pipeline.json().set(
593-
key, "$.channel", write_obj["channel"]
594-
)
595-
await pipeline.json().set(key, "$.type", write_obj["type"])
596-
await pipeline.json().set(key, "$.blob", write_obj["blob"])
592+
pipeline.json().set(key, "$.channel", write_obj["channel"])
593+
pipeline.json().set(key, "$.type", write_obj["type"])
594+
pipeline.json().set(key, "$.blob", write_obj["blob"])
597595
else:
598596
# Create new key
599-
await pipeline.json().set(key, "$", write_obj)
597+
pipeline.json().set(key, "$", write_obj)
600598
else:
601599
# For shallow implementation, always set the full object
602-
await pipeline.json().set(key, "$", write_obj)
600+
pipeline.json().set(key, "$", write_obj)
603601

604602
# Execute all operations atomically
605603
await pipeline.execute()

Comments (0)