|
265 | 265 | "outputs": [], |
266 | 266 | "source": [ |
267 | 267 | "#The following code to get raw movies data is commented out in favour of getting pre-vectorised data\n", |
| 268 | + "#To vectorize the raw data from storage_file_url yourself, uncomment the two lines below and set vectorizeFlag=True\n", |
268 | 269 | "#data = urllib.request.urlopen(storage_file_url)\n", |
269 | | - "#data = json.load(data)" |
| 270 | + "#data = json.load(data)\n" |
270 | 271 | ] |
271 | 272 | }, |
272 | 273 | { |
|
276 | 277 | "metadata": {}, |
277 | 278 | "outputs": [], |
278 | 279 | "source": [ |
| 280 | + "vectorizeFlag=False\n", |
| 281 | + "\n", |
279 | 282 | "import asyncio\n", |
280 | 283 | "import time\n", |
281 | 284 | "from concurrent.futures import ThreadPoolExecutor\n", |
282 | 285 | "\n", |
283 | 286 | "async def generate_vectors(items, vector_property):\n", |
284 | | - " for item in items:\n", |
285 | | - " vectorArray = await generate_embeddings(item['overview'])\n", |
286 | | - " item[vector_property] = vectorArray\n", |
| 287 | + " # Get the event loop so the synchronous generate_embeddings can run in its default thread pool\n", |
| 288 | + " loop = asyncio.get_event_loop()\n", |
| 289 | + " \n", |
| 290 | + " # Define a function to call generate_embeddings using run_in_executor\n", |
| 291 | + " async def generate_embedding_for_item(item):\n", |
| 292 | + " try:\n", |
| 293 | + " # Offload the sync generate_embeddings to a thread\n", |
| 294 | + " vectorArray = await loop.run_in_executor(None, generate_embeddings, item['overview'])\n", |
| 295 | + " item[vector_property] = vectorArray\n", |
| 296 | + " except Exception as e:\n", |
| 297 | + " # Log the failure and continue; this item is left without its vector\n", |
| 298 | + " logging.error(f\"Error generating embedding for item: {item['overview'][:50]}...\", exc_info=True)\n", |
| 299 | + " \n", |
| 300 | + " # Create tasks for all the items to generate embeddings concurrently\n", |
| 301 | + " tasks = [generate_embedding_for_item(item) for item in items]\n", |
| 302 | + " \n", |
| 303 | + " # Run all the tasks concurrently and wait for their completion\n", |
| 304 | + " await asyncio.gather(*tasks)\n", |
| 305 | + " \n", |
287 | 306 | " return items\n", |
288 | 307 | "\n", |
289 | | - "async def insert_data():\n", |
| 308 | + "async def insert_data(vectorize=False):\n", |
290 | 309 | " start_time = time.time() # Record the start time\n", |
291 | 310 | " \n", |
| 311 | + " # If vectorize flag is True, generate vectors for the data\n", |
| 312 | + " if vectorize:\n", |
| 313 | + " print(\"Vectorizing data, please wait...\")\n", |
| 314 | + " global data\n", |
| 315 | + " data = await generate_vectors(data, \"vector\")\n", |
| 316 | + "\n", |
292 | 317 | " counter = 0\n", |
293 | 318 | " tasks = []\n", |
294 | 319 | " max_concurrency = 5 # Adjust this value to control the level of concurrency\n", |
|
318 | 343 | " print(f\"All {counter} documents inserted!\")\n", |
319 | 344 | " print(f\"Time taken: {duration:.2f} seconds ({duration:.3f} milliseconds)\")\n", |
320 | 345 | "\n", |
321 | | - "# Run the async function\n", |
322 | | - "await insert_data()\n", |
323 | | - " " |
| 346 | + "# Run the async function with the vectorize flag set to True or False as needed\n", |
| 347 | + "await insert_data(vectorizeFlag) # or await insert_data() for default\n" |
324 | 348 | ] |
325 | 349 | }, |
326 | 350 | { |
|
0 commit comments