1- """
2- generate answer from image module
3- """
41import base64
2+ import asyncio
53from typing import List , Optional
6- import requests
4+ import aiohttp
75from .base_node import BaseNode
86from ..utils .logging import get_logger
97
@@ -22,10 +20,46 @@ def __init__(
     ):
         super().__init__(node_name, "node", input, output, 2, node_config)
 
-    def execute(self, state: dict) -> dict:
+    async def process_image(self, session, api_key, image_data, user_prompt):
+        # Convert image data to base64
+        base64_image = base64.b64encode(image_data).decode('utf-8')
+
+        headers = {
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {api_key}"
+        }
+
+        payload = {
+            "model": self.node_config["config"]["llm"]["model"],
+            "messages": [
+                {
+                    "role": "user",
+                    "content": [
+                        {
+                            "type": "text",
+                            "text": user_prompt
+                        },
+                        {
+                            "type": "image_url",
+                            "image_url": {
+                                "url": f"data:image/jpeg;base64,{base64_image}"
+                            }
+                        }
+                    ]
+                }
+            ],
+            "max_tokens": 300
+        }
+
+        async with session.post("https://api.openai.com/v1/chat/completions",
+                                headers=headers, json=payload) as response:
+            result = await response.json()
+            return result.get('choices', [{}])[0].get('message', {}).get('content', 'No response')
+
+    async def execute_async(self, state: dict) -> dict:
         """
         Processes images from the state, generates answers,
-        consolidates the results, and updates the state.
+        consolidates the results, and updates the state asynchronously.
         """
         self.logger.info(f"--- Executing {self.node_name} Node ---")
 
@@ -39,54 +73,27 @@ def execute(self, state: dict) -> dict:
                              is not supported. Supported models are:
                              {', '.join(supported_models)}.""")
 
-        if self.node_config["config"]["llm"]["model"].startswith("gpt"):
-            api_key = self.node_config.get("config", {}).get("llm", {}).get("api_key", "")
+        api_key = self.node_config.get("config", {}).get("llm", {}).get("api_key", "")
 
-            for image_data in images:
-                base64_image = base64.b64encode(image_data).decode('utf-8')
+        async with aiohttp.ClientSession() as session:
+            tasks = [
+                self.process_image(session, api_key, image_data,
+                                   state.get("user_prompt", "Extract information from the image"))
+                for image_data in images
+            ]
 
-                headers = {
-                    "Content-Type": "application/json",
-                    "Authorization": f"Bearer {api_key}"
-                }
-
-                payload = {
-                    "model": self.node_config["config"]["llm"]["model"],
-                    "messages": [
-                        {
-                            "role": "user",
-                            "content": [
-                                {
-                                    "type": "text",
-                                    "text": state.get("user_prompt",
-                                                      "Extract information from the image")
-                                },
-                                {
-                                    "type": "image_url",
-                                    "image_url": {
-                                        "url": f"data:image/jpeg;base64,{base64_image}"
-                                    }
-                                }
-                            ]
-                        }
-                    ],
-                    "max_tokens": 300
-                }
+            analyses = await asyncio.gather(*tasks)
 
-                response = requests.post("https://api.openai.com/v1/chat/completions",
-                                         headers=headers,
-                                         json=payload,
-                                         timeout=10)
-                result = response.json()
+        consolidated_analysis = " ".join(analyses)
 
-                response_text = result.get('choices',
-                                           [{}])[0].get('message', {}).get('content', 'No response')
-                analyses.append(response_text)
+        state['answer'] = {
+            "consolidated_analysis": consolidated_analysis
+        }
 
-        consolidated_analysis = " ".join(analyses)
+        return state
 
-        state['answer'] = {
-            "consolidated_analysis": consolidated_analysis
-        }
-
-        return state
+    def execute(self, state: dict) -> dict:
+        """
+        Wrapper to run the asynchronous execute_async function in a synchronous context.
+        """
+        return asyncio.run(self.execute_async(state))
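
A note on the synchronous `execute` wrapper added at the end of this diff: `asyncio.run` always starts a fresh event loop, so calling `execute` from code that is already running inside an event loop (for example a Jupyter notebook or an async web framework) raises a `RuntimeError`. Below is a minimal sketch of a more defensive way to bridge sync and async callers; the helper name `_run_coroutine` and the thread-pool fallback are illustrative assumptions, not part of this patch.

import asyncio
import concurrent.futures

def _run_coroutine(coro):
    """Run a coroutine to completion whether or not an event loop is already running.

    Illustrative sketch only; not part of the diff above.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: let asyncio create and tear one down.
        return asyncio.run(coro)
    # A loop is already running, so execute the coroutine on a private loop
    # inside a worker thread and block until it finishes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()

With such a helper, `execute` could return `_run_coroutine(self.execute_async(state))` instead of calling `asyncio.run` directly.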