diff --git a/genai/live/live_code_exec_with_txt.py b/genai/live/live_code_exec_with_txt.py
new file mode 100644
index 00000000000..e4e794af0d1
--- /dev/null
+++ b/genai/live/live_code_exec_with_txt.py
@@ -0,0 +1,79 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+
+
+async def generate_content() -> list[str]:
+    """Ask a Live API model to solve a prompt with the code execution tool.
+
+    Opens a Live session with the code execution tool enabled, sends one text
+    prompt, prints any code the model generates together with its execution
+    output, and collects the text chunks of the reply.
+
+    Returns:
+        The text chunks received from the model, in arrival order.
+    """
+    # [START googlegenaisdk_live_code_exec_with_txt]
+    from google import genai
+    from google.genai.types import (
+        LiveConnectConfig,
+        Modality,
+        Tool,
+        ToolCodeExecution,
+        Content,
+        Part,
+    )
+
+    client = genai.Client()
+    # model_id = "gemini-live-2.5-flash" #todo
+    model_id = "gemini-2.0-flash-live-preview-04-09"
+    config = LiveConnectConfig(
+        response_modalities=[Modality.TEXT],
+        tools=[Tool(code_execution=ToolCodeExecution())],
+    )
+    async with client.aio.live.connect(model=model_id, config=config) as session:
+        text_input = "Compute the largest prime palindrome under 10"
+        print("> ", text_input, "\n")
+        await session.send_client_content(
+            turns=Content(role="user", parts=[Part(text=text_input)])
+        )
+
+        response = []
+
+        async for chunk in session.receive():
+            if chunk.server_content:
+                if chunk.text is not None:
+                    response.append(chunk.text)
+
+                model_turn = chunk.server_content.model_turn
+                if model_turn:
+                    # `parts` is optional on a turn; treat a missing list as empty.
+                    for part in model_turn.parts or []:
+                        if part.executable_code is not None:
+                            print(part.executable_code.code)
+
+                        if part.code_execution_result is not None:
+                            print(part.code_execution_result.output)
+
+        print("".join(response))
+    # Example output:
+    # > Compute the largest prime palindrome under 10
+    # Final Answer: The final answer is $\boxed{7}$
+    # [END googlegenaisdk_live_code_exec_with_txt]
+    return response
+
+
+if __name__ == "__main__":
+    asyncio.run(generate_content())
diff --git a/genai/live/live_func_call_with_txt.py b/genai/live/live_func_call_with_txt.py
new file mode 100644
index 00000000000..2109c7a17fc
--- /dev/null
+++ b/genai/live/live_func_call_with_txt.py
@@ -0,0 +1,89 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+
+
+async def generate_content() -> list[str]:
+    """Reply to Live API function calls with a hard-coded tool response.
+
+    Declares two functions by name only, sends one text prompt, prints any
+    text the model streams back, and answers every function call with
+    `{"result": "ok"}`.
+
+    Returns:
+        The `FunctionResponse` objects sent for the most recent tool call, or
+        an empty list if the model never issued one.
+        NOTE(review): these are not `str`, so the `list[str]` annotation looks
+        inaccurate; confirm before tightening it.
+    """
+    # [START googlegenaisdk_live_func_call_with_txt]
+    from google import genai
+    from google.genai.types import (
+        LiveConnectConfig,
+        Modality,
+        Tool,
+        FunctionDeclaration,
+        FunctionResponse,
+    )
+
+    client = genai.Client()
+    # model = "gemini-live-2.5-flash"
+    model_id = "gemini-2.0-flash-live-preview-04-09"
+
+    turn_on_the_lights = FunctionDeclaration(name="turn_on_the_lights")
+    turn_off_the_lights = FunctionDeclaration(name="turn_off_the_lights")
+
+    config = LiveConnectConfig(
+        response_modalities=[Modality.TEXT],
+        tools=[Tool(function_declarations=[turn_on_the_lights, turn_off_the_lights])],
+    )
+    async with client.aio.live.connect(model=model_id, config=config) as session:
+        text_input = "Turn on the lights please"
+        print("> ", text_input, "\n")
+        await session.send_client_content(turns={"parts": [{"text": text_input}]})
+
+        # Bound before the loop so the final `return` cannot hit an unbound
+        # name when the model answers without issuing a tool call.
+        function_responses = []
+
+        async for chunk in session.receive():
+            if chunk.server_content:
+                if chunk.text is not None:
+                    print(chunk.text)
+
+            elif chunk.tool_call:
+                # Start a fresh batch for this tool call.
+                function_responses = []
+                for fc in chunk.tool_call.function_calls:
+                    function_response = FunctionResponse(
+                        name=fc.name,
+                        response={
+                            "result": "ok"
+                        },  # simple, hard-coded function response
+                    )
+                    function_responses.append(function_response)
+                    print(function_response.response["result"])
+
+                await session.send_tool_response(function_responses=function_responses)
+
+    # Example output:
+    # > Turn on the lights please
+    # ok
+    # [END googlegenaisdk_live_func_call_with_txt]
+    return function_responses
+
+
+if __name__ == "__main__":
+    asyncio.run(generate_content())
diff --git a/genai/live/live_ground_googsearch_with_txt.py b/genai/live/live_ground_googsearch_with_txt.py
new file mode 100644
index 00000000000..d4243d099e4
--- /dev/null
+++ b/genai/live/live_ground_googsearch_with_txt.py
@@ -0,0 +1,80 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+
+
+async def generate_content() -> list[str]:
+    """Ask a Live API model a question with the Google Search tool enabled.
+
+    Opens a Live session configured with the Google Search tool, sends one
+    text prompt, prints any code the model generates together with its
+    execution output, and collects the text chunks of the reply.
+
+    Returns:
+        The text chunks received from the model, in arrival order.
+    """
+    # [START googlegenaisdk_live_ground_googsearch_with_txt]
+    from google import genai
+    from google.genai.types import (
+        LiveConnectConfig,
+        Modality,
+        Tool,
+        GoogleSearch,
+        Content,
+        Part,
+    )
+
+    client = genai.Client()
+    # model = "gemini-live-2.5-flash" #todo
+    model_id = "gemini-2.0-flash-live-preview-04-09"
+    config = LiveConnectConfig(
+        response_modalities=[Modality.TEXT],
+        tools=[Tool(google_search=GoogleSearch())],
+    )
+    async with client.aio.live.connect(model=model_id, config=config) as session:
+        text_input = "When did the last Brazil vs. Argentina soccer match happen?"
+        print("> ", text_input, "\n")
+        await session.send_client_content(
+            turns=Content(role="user", parts=[Part(text=text_input)])
+        )
+
+        response = []
+
+        async for chunk in session.receive():
+            if chunk.server_content:
+                if chunk.text is not None:
+                    response.append(chunk.text)
+
+                # The model might generate and execute Python code to use Search
+                model_turn = chunk.server_content.model_turn
+                if model_turn:
+                    # `parts` is optional on a turn; treat a missing list as empty.
+                    for part in model_turn.parts or []:
+                        if part.executable_code is not None:
+                            print(part.executable_code.code)
+
+                        if part.code_execution_result is not None:
+                            print(part.code_execution_result.output)
+
+        print("".join(response))
+    # Example output:
+    # > When did the last Brazil vs. Argentina soccer match happen?
+    # The last Brazil vs. Argentina soccer match was on March 25, 2025, a 2026 World Cup qualifier, where Argentina defeated Brazil 4-1.
+    # [END googlegenaisdk_live_ground_googsearch_with_txt]
+    return response
+
+
+if __name__ == "__main__":
+    asyncio.run(generate_content())
diff --git a/genai/live/live_ground_ragengine_with_txt.py b/genai/live/live_ground_ragengine_with_txt.py
new file mode 100644
index 00000000000..038e33c50be
--- /dev/null
+++ b/genai/live/live_ground_ragengine_with_txt.py
@@ -0,0 +1,83 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+
+
+async def generate_content(memory_corpus: str) -> list[str]:
+    """Query a Live API model grounded on a Vertex AI RAG corpus.
+
+    Args:
+        memory_corpus: RAG corpus identifier handed to
+            `VertexRagStoreRagResource(rag_corpus=...)`. Because
+            `store_context` is enabled, the Live API may also store context in it.
+
+    Returns:
+        The text chunks received from the model, in arrival order.
+    """
+    # [START googlegenaisdk_live_ground_ragengine_with_txt]
+    from google import genai
+    from google.genai.types import (
+        Content,
+        LiveConnectConfig,
+        Modality,
+        Part,
+        Tool,
+        Retrieval,
+        VertexRagStore,
+        VertexRagStoreRagResource,
+    )
+
+    client = genai.Client()
+    # model_id = "gemini-live-2.5-flash"
+    model_id = "gemini-2.0-flash-live-preview-04-09"
+
+    rag_store = VertexRagStore(
+        rag_resources=[
+            VertexRagStoreRagResource(
+                rag_corpus=memory_corpus  # Use memory corpus if you want to store context.
+            )
+        ],
+        # Set `store_context` to true to allow the Live API to sink context into your memory corpus.
+        store_context=True,
+    )
+    config = LiveConnectConfig(
+        response_modalities=[Modality.TEXT],
+        tools=[Tool(retrieval=Retrieval(vertex_rag_store=rag_store))],
+    )
+
+    async with client.aio.live.connect(model=model_id, config=config) as session:
+        text_input = "What year did Mariusz Pudzianowski win World's Strongest Man?"
+        print("> ", text_input, "\n")
+
+        await session.send_client_content(
+            turns=Content(role="user", parts=[Part(text=text_input)])
+        )
+
+        response = []
+
+        async for message in session.receive():
+            if message.text:
+                response.append(message.text)
+
+        print("".join(response))
+    # Example output:
+    # > What year did Mariusz Pudzianowski win World's Strongest Man?
+    # Mariusz Pudzianowski won World's Strongest Man in 2002, 2003, 2005, 2007, and 2008.
+    # [END googlegenaisdk_live_ground_ragengine_with_txt]
+    return response
+
+
+if __name__ == "__main__":
+    asyncio.run(generate_content("memory_corpus"))
diff --git a/genai/live/live_websocket_audiogen_with_txt.py b/genai/live/live_websocket_audiogen_with_txt.py
index f7b6f07e5f8..277d4d5f8ba 100644
--- a/genai/live/live_websocket_audiogen_with_txt.py
+++ b/genai/live/live_websocket_audiogen_with_txt.py
@@ -20,7 +20,9 @@ def get_bearer_token() -> str:
     import google.auth
     from google.auth.transport.requests import Request
 
-    creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
+    creds, _ = google.auth.default(
+        scopes=["https://www.googleapis.com/auth/cloud-platform"]
+    )
     auth_req = Request()
     creds.refresh(auth_req)
     bearer_token = creds.token
@@ -55,9 +57,7 @@ async def generate_content() -> str:
 
     # Websocket Configuration
     WEBSOCKET_HOST = "us-central1-aiplatform.googleapis.com"
-    WEBSOCKET_SERVICE_URL = (
-        f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
-    )
+    WEBSOCKET_SERVICE_URL = f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
 
     # Websocket Authentication
     headers = {
@@ -66,9 +66,7 @@ async def generate_content() -> str:
     }
 
     # Model Configuration
-    model_path = (
-        f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
-    )
+    model_path = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
     model_generation_config = {
"response_modalities": ["AUDIO"], "speech_config": { @@ -77,7 +75,9 @@ async def generate_content() -> str: }, } - async with connect(WEBSOCKET_SERVICE_URL, additional_headers=headers) as websocket_session: + async with connect( + WEBSOCKET_SERVICE_URL, additional_headers=headers + ) as websocket_session: # 1. Send setup configuration websocket_config = { "setup": { @@ -120,7 +120,9 @@ async def generate_content() -> str: server_content = response_chunk.get("serverContent") if not server_content: # This might indicate an error or an unexpected message format - print(f"Received non-serverContent message or empty content: {response_chunk}") + print( + f"Received non-serverContent message or empty content: {response_chunk}" + ) break # Collect audio chunks @@ -129,7 +131,9 @@ async def generate_content() -> str: for part in model_turn["parts"]: if part["inlineData"]["mimeType"] == "audio/pcm": audio_chunk = base64.b64decode(part["inlineData"]["data"]) - aggregated_response_parts.append(np.frombuffer(audio_chunk, dtype=np.int16)) + aggregated_response_parts.append( + np.frombuffer(audio_chunk, dtype=np.int16) + ) # End of response if server_content.get("turnComplete"): @@ -137,7 +141,9 @@ async def generate_content() -> str: # Save audio to a file if aggregated_response_parts: - wavfile.write("output.wav", 24000, np.concatenate(aggregated_response_parts)) + wavfile.write( + "output.wav", 24000, np.concatenate(aggregated_response_parts) + ) # Example response: # Setup Response: {'setupComplete': {}} # Input: Hello? Gemini are you there? 
diff --git a/genai/live/live_websocket_audiotranscript_with_txt.py b/genai/live/live_websocket_audiotranscript_with_txt.py index 5192b81ef17..5304e1914bb 100644 --- a/genai/live/live_websocket_audiotranscript_with_txt.py +++ b/genai/live/live_websocket_audiotranscript_with_txt.py @@ -20,7 +20,9 @@ def get_bearer_token() -> str: import google.auth from google.auth.transport.requests import Request - creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"]) + creds, _ = google.auth.default( + scopes=["https://www.googleapis.com/auth/cloud-platform"] + ) auth_req = Request() creds.refresh(auth_req) bearer_token = creds.token @@ -55,9 +57,7 @@ async def generate_content() -> str: # Websocket Configuration WEBSOCKET_HOST = "us-central1-aiplatform.googleapis.com" - WEBSOCKET_SERVICE_URL = ( - f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent" - ) + WEBSOCKET_SERVICE_URL = f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent" # Websocket Authentication headers = { @@ -66,9 +66,7 @@ async def generate_content() -> str: } # Model Configuration - model_path = ( - f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}" - ) + model_path = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}" model_generation_config = { "response_modalities": ["AUDIO"], "speech_config": { @@ -77,7 +75,9 @@ async def generate_content() -> str: }, } - async with connect(WEBSOCKET_SERVICE_URL, additional_headers=headers) as websocket_session: + async with connect( + WEBSOCKET_SERVICE_URL, additional_headers=headers + ) as websocket_session: # 1. 
Send setup configuration websocket_config = { "setup": { @@ -125,7 +125,9 @@ async def generate_content() -> str: server_content = response_chunk.get("serverContent") if not server_content: # This might indicate an error or an unexpected message format - print(f"Received non-serverContent message or empty content: {response_chunk}") + print( + f"Received non-serverContent message or empty content: {response_chunk}" + ) break # Transcriptions @@ -142,7 +144,9 @@ async def generate_content() -> str: for part in model_turn["parts"]: if part["inlineData"]["mimeType"] == "audio/pcm": audio_chunk = base64.b64decode(part["inlineData"]["data"]) - aggregated_response_parts.append(np.frombuffer(audio_chunk, dtype=np.int16)) + aggregated_response_parts.append( + np.frombuffer(audio_chunk, dtype=np.int16) + ) # End of response if server_content.get("turnComplete"): diff --git a/genai/live/live_websocket_textgen_with_audio.py b/genai/live/live_websocket_textgen_with_audio.py index de6fd9d55c3..f91cff35b57 100644 --- a/genai/live/live_websocket_textgen_with_audio.py +++ b/genai/live/live_websocket_textgen_with_audio.py @@ -20,7 +20,9 @@ def get_bearer_token() -> str: import google.auth from google.auth.transport.requests import Request - creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"]) + creds, _ = google.auth.default( + scopes=["https://www.googleapis.com/auth/cloud-platform"] + ) auth_req = Request() creds.refresh(auth_req) bearer_token = creds.token @@ -65,9 +67,7 @@ def read_wavefile(filepath: str) -> tuple[str, str]: # Websocket Configuration WEBSOCKET_HOST = "us-central1-aiplatform.googleapis.com" - WEBSOCKET_SERVICE_URL = ( - f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent" - ) + WEBSOCKET_SERVICE_URL = f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent" # Websocket Authentication headers = { @@ -76,12 +76,12 @@ def read_wavefile(filepath: str) -> 
tuple[str, str]: } # Model Configuration - model_path = ( - f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}" - ) + model_path = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}" model_generation_config = {"response_modalities": ["TEXT"]} - async with connect(WEBSOCKET_SERVICE_URL, additional_headers=headers) as websocket_session: + async with connect( + WEBSOCKET_SERVICE_URL, additional_headers=headers + ) as websocket_session: # 1. Send setup configuration websocket_config = { "setup": { @@ -105,7 +105,9 @@ def read_wavefile(filepath: str) -> tuple[str, str]: return "Error: WebSocket setup failed." # 3. Send audio message - encoded_audio_message, mime_type = read_wavefile("hello_gemini_are_you_there.wav") + encoded_audio_message, mime_type = read_wavefile( + "hello_gemini_are_you_there.wav" + ) # Example audio message: "Hello? Gemini are you there?" user_message = { @@ -136,7 +138,9 @@ def read_wavefile(filepath: str) -> tuple[str, str]: server_content = response_chunk.get("serverContent") if not server_content: # This might indicate an error or an unexpected message format - print(f"Received non-serverContent message or empty content: {response_chunk}") + print( + f"Received non-serverContent message or empty content: {response_chunk}" + ) break # Collect text responses diff --git a/genai/live/live_websocket_textgen_with_txt.py b/genai/live/live_websocket_textgen_with_txt.py index b36487cc9a0..f8e88fa0521 100644 --- a/genai/live/live_websocket_textgen_with_txt.py +++ b/genai/live/live_websocket_textgen_with_txt.py @@ -20,7 +20,9 @@ def get_bearer_token() -> str: import google.auth from google.auth.transport.requests import Request - creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"]) + creds, _ = google.auth.default( + scopes=["https://www.googleapis.com/auth/cloud-platform"] + ) auth_req = Request() creds.refresh(auth_req) bearer_token = 
creds.token @@ -51,9 +53,7 @@ async def generate_content() -> str: # Websocket Configuration WEBSOCKET_HOST = "us-central1-aiplatform.googleapis.com" - WEBSOCKET_SERVICE_URL = ( - f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent" - ) + WEBSOCKET_SERVICE_URL = f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent" # Websocket Authentication headers = { @@ -62,12 +62,12 @@ async def generate_content() -> str: } # Model Configuration - model_path = ( - f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}" - ) + model_path = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}" model_generation_config = {"response_modalities": ["TEXT"]} - async with connect(WEBSOCKET_SERVICE_URL, additional_headers=headers) as websocket_session: + async with connect( + WEBSOCKET_SERVICE_URL, additional_headers=headers + ) as websocket_session: # 1. Send setup configuration websocket_config = { "setup": { @@ -110,7 +110,9 @@ async def generate_content() -> str: server_content = response_chunk.get("serverContent") if not server_content: # This might indicate an error or an unexpected message format - print(f"Received non-serverContent message or empty content: {response_chunk}") + print( + f"Received non-serverContent message or empty content: {response_chunk}" + ) break # Collect text responses diff --git a/genai/live/live_with_txt.py b/genai/live/live_with_txt.py index a3c75188439..fd412af7740 100644 --- a/genai/live/live_with_txt.py +++ b/genai/live/live_with_txt.py @@ -35,7 +35,9 @@ async def generate_content() -> list[str]: ) as session: text_input = "Hello? Gemini, are you there?" 
print("> ", text_input, "\n")
-        await session.send_client_content(turns=Content(role="user", parts=[Part(text=text_input)]))
+        await session.send_client_content(
+            turns=Content(role="user", parts=[Part(text=text_input)])
+        )
 
         response = []
 
diff --git a/genai/live/requirements-test.txt b/genai/live/requirements-test.txt
index 4fb57f7f08d..e4ce134b76a 100644
--- a/genai/live/requirements-test.txt
+++ b/genai/live/requirements-test.txt
@@ -2,3 +2,4 @@ backoff==2.2.1
 google-api-core==2.19.0
 pytest==8.2.0
 pytest-asyncio==0.25.3
+pytest-mock==3.12.0
diff --git a/genai/live/requirements.txt b/genai/live/requirements.txt
index c12e6a7e2f7..68f512d39e1 100644
--- a/genai/live/requirements.txt
+++ b/genai/live/requirements.txt
@@ -1,3 +1,4 @@
-google-genai==1.20.0
+google-genai==1.27.0
 scipy==1.15.3
-websockets==15.0.1
\ No newline at end of file
+websockets==15.0.1
+soundfile==0.13.1
\ No newline at end of file
diff --git a/genai/live/test_live_examples.py b/genai/live/test_live_examples.py
index ce382539861..1e7988ec65a 100644
--- a/genai/live/test_live_examples.py
+++ b/genai/live/test_live_examples.py
@@ -19,12 +19,16 @@
 import os
 
 import pytest
 
+import live_code_exec_with_txt
+import live_func_call_with_txt
+import live_ground_googsearch_with_txt
+import live_ground_ragengine_with_txt
 import live_websocket_audiogen_with_txt
 import live_websocket_audiotranscript_with_txt
 import live_websocket_textgen_with_audio
 import live_websocket_textgen_with_txt
 import live_with_txt
 
 os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "True"
 os.environ["GOOGLE_CLOUD_LOCATION"] = "us-central1"
@@ -32,6 +36,42 @@
 # os.environ['GOOGLE_CLOUD_PROJECT'] = "add-your-project-name"
 
 
+@pytest.fixture()
+def mock_rag_components(mocker):
+    """Replace the GenAI client with a mock Live session for the RAG sample.
+
+    Patches `google.genai.Client` so that `client.aio.live.connect(...)` yields
+    a mock session whose `receive()` streams one canned text message.
+    """
+    mock_client_cls = mocker.patch("google.genai.Client")
+
+    from google.genai.types import VertexRagStore, VertexRagStoreRagResource
+
+    # Route the patched constructors back to the real classes.
+    mocker.patch(
+        "google.genai.types.VertexRagStoreRagResource",
+        side_effect=lambda rag_corpus: VertexRagStoreRagResource(rag_corpus=rag_corpus),
+    )
+    mocker.patch(
+        "google.genai.types.VertexRagStore",
+        side_effect=lambda rag_resources, store_context: VertexRagStore(
+            rag_resources=rag_resources, store_context=store_context
+        ),
+    )
+
+    async def fake_receive():
+        yield mocker.MagicMock(
+            text="Mariusz Pudzianowski won in 2002, 2003, 2005, 2007, and 2008."
+        )
+
+    mock_session = mocker.AsyncMock()
+    mock_session.__aenter__.return_value = mock_session
+    # `receive()` must hand back an async iterator directly. Left as an AsyncMock
+    # attribute it would return a coroutine, which `async for` cannot consume.
+    mock_session.receive = fake_receive
+    mock_client_cls.return_value.aio.live.connect.return_value = mock_session
+
+
 @pytest.mark.asyncio
 async def test_live_with_text() -> None:
     assert await live_with_txt.generate_content()
@@ -55,3 +95,23 @@ async def test_live_websocket_audiogen_with_txt() -> None:
 @pytest.mark.asyncio
 async def test_live_websocket_audiotranscript_with_txt() -> None:
     assert await live_websocket_audiotranscript_with_txt.generate_content()
+
+
+@pytest.mark.asyncio
+async def test_live_code_exec_with_txt() -> None:
+    assert await live_code_exec_with_txt.generate_content()
+
+
+@pytest.mark.asyncio
+async def test_live_ground_googsearch_with_txt() -> None:
+    assert await live_ground_googsearch_with_txt.generate_content()
+
+
+@pytest.mark.asyncio
+async def test_live_ground_ragengine_with_txt(mock_rag_components) -> None:
+    assert await live_ground_ragengine_with_txt.generate_content("test_memory_corpus")
+
+
+@pytest.mark.asyncio
+async def test_live_func_call_with_txt() -> None:
+    assert await live_func_call_with_txt.generate_content()