@@ -94,6 +94,175 @@ async def run_migrations():
     click.echo("Memory migrations completed successfully.")
 
 
+@cli.command()
+@click.option(
+    "--batch-size",
+    default=1000,
+    help="Number of keys to process in each batch",
+)
+@click.option(
+    "--dry-run",
+    is_flag=True,
+    help="Only count keys without migrating",
+)
+def migrate_working_memory(batch_size: int, dry_run: bool):
+    """
+    Migrate working memory keys from string format to JSON format.
+
+    This command migrates all working memory keys stored in the old string
+    format (JSON serialized as a string) to the new native Redis JSON format.
+
+    Use --dry-run to see how many keys need migration without making changes.
+    """
+    import asyncio
+    import time
+
+    from agent_memory_server.utils.keys import Keys
+    from agent_memory_server.working_memory import (
+        set_migration_complete,
+    )
+
+    configure_logging()
+
+    async def run_migration():
+        import json as json_module
+
+        redis = await get_redis_conn()
+
+        # Scan for string keys only using _type filter (much faster)
+        string_keys = []
+        cursor = 0
+        pattern = Keys.working_memory_key("*")
+
+        click.echo("Scanning for working memory keys (string type only)...")
+        scan_start = time.time()
+
+        while True:
+            # Use _type="string" to only get string keys directly
+            cursor, keys = await redis.scan(
+                cursor, match=pattern, count=batch_size, _type="string"
+            )
+
+            if keys:
+                string_keys.extend(keys)
+
+            if cursor == 0:
+                break
+
+        scan_time = time.time() - scan_start
+
+        click.echo(f"Scan completed in {scan_time:.2f}s")
+        click.echo(f"  String format (need migration): {len(string_keys)}")
+
+        if not string_keys:
+            click.echo("\nNo keys need migration. All done!")
+            # Mark migration as complete
+            await set_migration_complete(redis)
+            return
+
+        if dry_run:
+            click.echo("\n--dry-run specified, no changes made.")
+            return
+
+        # Migrate keys in batches using pipeline
+        click.echo(f"\nMigrating {len(string_keys)} keys...")
+        migrate_start = time.time()
+        migrated = 0
+        errors = 0
+
+        # Process in batches
+        for batch_start in range(0, len(string_keys), batch_size):
+            batch_keys = string_keys[batch_start : batch_start + batch_size]
+
+            # Read all string data and TTLs in a pipeline
+            read_pipe = redis.pipeline()
+            for key in batch_keys:
+                read_pipe.get(key)
+                read_pipe.ttl(key)
+            results = await read_pipe.execute()
+
+            # Parse results (alternating: data, ttl, data, ttl, ...)
+            migrations = []  # List of (key, data, ttl) tuples
+            for i, key in enumerate(batch_keys):
+                string_data = results[i * 2]
+                ttl = results[i * 2 + 1]
+
+                if string_data is None:
+                    continue
+
+                try:
+                    if isinstance(string_data, bytes):
+                        string_data = string_data.decode("utf-8")
+                    data = json_module.loads(string_data)
+                    migrations.append((key, data, ttl))
+                except Exception as e:
+                    errors += 1
+                    logger.error(f"Failed to parse key {key}: {e}")
+
+            # Execute migrations in a pipeline (delete + json.set + expire if needed)
+            if migrations:
+                write_pipe = redis.pipeline()
+                for key, data, ttl in migrations:
+                    write_pipe.delete(key)
+                    write_pipe.json().set(key, "$", data)
+                    if ttl > 0:
+                        write_pipe.expire(key, ttl)
+
+                try:
+                    await write_pipe.execute()
+                    migrated += len(migrations)
+                except Exception as e:
+                    # If batch fails, try one by one
+                    logger.warning(
+                        f"Batch migration failed, retrying individually: {e}"
+                    )
+                    for key, data, ttl in migrations:
+                        try:
+                            await redis.delete(key)
+                            await redis.json().set(key, "$", data)
+                            if ttl > 0:
+                                await redis.expire(key, ttl)
+                            migrated += 1
+                        except Exception as e2:
+                            errors += 1
+                            logger.error(f"Failed to migrate key {key}: {e2}")
+
+            # Progress update
+            total_processed = batch_start + len(batch_keys)
+            if total_processed % 10000 == 0 or total_processed == len(string_keys):
+                elapsed = time.time() - migrate_start
+                rate = migrated / elapsed if elapsed > 0 else 0
+                remaining = len(string_keys) - total_processed
+                eta = remaining / rate if rate > 0 else 0
+                click.echo(
+                    f"  Migrated {migrated}/{len(string_keys)} "
+                    f"({rate:.0f} keys/sec, ETA: {eta:.0f}s)"
+                )
+
+        migrate_time = time.time() - migrate_start
+        rate = migrated / migrate_time if migrate_time > 0 else 0
+
+        click.echo(f"\nMigration completed in {migrate_time:.2f}s")
+        click.echo(f"  Migrated: {migrated}")
+        click.echo(f"  Errors: {errors}")
+        click.echo(f"  Rate: {rate:.0f} keys/sec")
+
+        if errors == 0:
+            # Mark migration as complete
+            await set_migration_complete(redis)
+            click.echo("\nMigration status set to complete.")
+            click.echo(
+                "\n💡 Tip: Set WORKING_MEMORY_MIGRATION_COMPLETE=true to skip "
+                "startup checks permanently."
+            )
+        else:
+            click.echo(
+                "\nMigration completed with errors. Run again to retry failed keys."
+            )
+
+    asyncio.run(run_migration())
+
+
 @cli.command()
 @click.option("--port", default=settings.port, help="Port to run the server on")
 @click.option("--host", default="0.0.0.0", help="Host to run the server on")
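A minimal usage sketch (not part of the diff): the new command can be driven in-process with Click's test runner. The `agent_memory_server.cli` import path, the `migrate-working-memory` command name (Click's default underscore-to-dash conversion), and a reachable Redis instance are assumptions based on the code above.

    # Hypothetical usage sketch: exercise the new command via Click's CliRunner.
    # Assumes the CLI group is importable as agent_memory_server.cli:cli and that
    # a Redis instance with the JSON module is reachable via the app settings.
    from click.testing import CliRunner

    from agent_memory_server.cli import cli

    runner = CliRunner()

    # Dry run: count keys still stored in the old string format, change nothing.
    result = runner.invoke(cli, ["migrate-working-memory", "--dry-run"])
    print(result.output)

    # Real migration with a larger batch size for big keyspaces.
    result = runner.invoke(cli, ["migrate-working-memory", "--batch-size", "5000"])
    print(result.output)

Because the scan only selects keys whose Redis type is still string, rerunning the command after a partial failure simply retries the keys that were not converted.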