Skip to content

Commit abcb59e

Browse files
vertex-sdk-bot authored and copybara-github committed
No public description
PiperOrigin-RevId: 781578088
1 parent 6e5c421 commit abcb59e

File tree

2 files changed

+251
-0
lines changed

2 files changed

+251
-0
lines changed

vertexai/_genai/_evals_data_converters.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class EvalDatasetSchema(_common.CaseInSensitiveEnum):
3535
GEMINI = "gemini"
3636
FLATTEN = "flatten"
3737
OPENAI = "openai"
38+
OBSERVABILITY = "observability"
3839
UNKNOWN = "unknown"
3940

4041

@@ -442,6 +443,157 @@ def convert(self, raw_data: list[dict[str, Any]]) -> types.EvaluationDataset:
442443
return types.EvaluationDataset(eval_cases=eval_cases)
443444

444445

446+
class _ObservabilityDataConverter(_EvalDataConverter):
447+
"""Converter for dataset in GCP Observability GenAI format."""
448+
449+
def _message_to_content(self, message: dict[str, Any]) -> genai_types.Content:
450+
"""Converts Obs GenAI message format to Content."""
451+
parts = []
452+
message_parts = message.get("parts", [])
453+
if isinstance(message_parts, list):
454+
for message_part in message_parts:
455+
part = None
456+
part_type = message_part.get("type", "")
457+
match part_type:
458+
case "text":
459+
part = genai_types.Part(text=message_part.content)
460+
461+
case "blob":
462+
part = genai_types.Part(inline_data=genai_types.Blob(
463+
data=message_part.data,
464+
mime_type=message_part.mime_type
465+
))
466+
467+
case "file_data":
468+
part = genai_types.Part(file_data=genai_types.FileData(
469+
file_uri=message_part.file_uri,
470+
mime_type=message_part.mime_type
471+
))
472+
473+
case _:
474+
logger.warning(
475+
"Unrecgonized message part type of '%s' found."
476+
"Skipping part.",
477+
part_type
478+
)
479+
480+
if part is not None:
481+
parts.append(part)
482+
483+
return genai_types.Content(
484+
parts=parts,
485+
role=message.get("role", "")
486+
)
487+
488+
def _parse_messages(
489+
self,
490+
eval_case_id: str,
491+
input_dict: dict[str, Any],
492+
output_dict: dict[str, Any],
493+
system_dict: Optional[dict[str, Any]] = None
494+
) -> types.EvalCase:
495+
"""Parses a set of messages into an EvalCase."""
496+
497+
# System message
498+
system_instruction = None
499+
if system_dict is not None:
500+
system_msgs = system_dict.get("messages", [])
501+
if system_msgs:
502+
system_instruction = self._message_to_content(system_msgs[0])
503+
504+
# Input message
505+
prompt = None
506+
conversation_history = []
507+
input_msgs = input_dict.get("messages", [])
508+
if input_msgs:
509+
# Extract latest message as prompt
510+
prompt = self._message_to_content(input_msgs[-1])
511+
512+
# All previous messages are history
513+
if len(input_msgs) > 1:
514+
for msg in input_msgs[:-1]:
515+
conversation_history.append(genai_types.Message(
516+
content=self._message_to_content(msg)
517+
))
518+
519+
# Output message
520+
responses = []
521+
output_msgs = output_dict.get("messages", [])
522+
if output_msgs:
523+
response = genai_types.ResponseCandidate(
524+
response=self._message_to_content(output_msgs[0])
525+
)
526+
responses.append(response)
527+
528+
return types.EvalCase(
529+
eval_case_id=eval_case_id,
530+
prompt=prompt,
531+
responses=responses,
532+
system_instruction=system_instruction,
533+
conversation_history=conversation_history,
534+
reference=None,
535+
)
536+
537+
def _load_raw_data(self, data: Any, case_index: int) -> dict[Any, str]:
538+
"""Loads raw data into dict if possible."""
539+
if isinstance(data, str):
540+
try:
541+
loaded_json = json.loads(data)
542+
if isinstance(loaded_json, dict):
543+
return loaded_json
544+
else:
545+
logger.warning(
546+
"Decoded response JSON is not a dictionary for case"
547+
" %s. Type: %s",
548+
case_index,
549+
type(loaded_json),
550+
)
551+
except json.JSONDecodeError:
552+
logger.warning(
553+
"Could not decode response JSON string for case %s."
554+
" Treating as empty response.",
555+
case_index,
556+
)
557+
elif isinstance(data, dict):
558+
return data
559+
560+
@override
561+
def convert(self, raw_data: list[dict[str, Any]]) -> types.EvaluationDataset:
562+
"""Converts a list of GCP Observability GenAI data into an EvaluationDataset."""
563+
eval_cases = []
564+
565+
for i, item in enumerate(raw_data):
566+
eval_case_id = f"observability_eval_case_{i}"
567+
568+
if "input" not in item or "output" not in item:
569+
logger.warning(
570+
"Skipping case %s due to missing 'input' or 'output' key.",
571+
i
572+
)
573+
continue
574+
575+
input_data = item.get("input", {})
576+
input_dict = self._load_raw_data(input_data, i)
577+
578+
output_data = item.get("output", {})
579+
output_dict = self._load_raw_data(output_data, i)
580+
581+
system_dict = None
582+
if "system" in item:
583+
system_data = item.get("system", {})
584+
system_dict = self._load_raw_data(system_data, i)
585+
586+
eval_case = self._parse_messages(
587+
eval_case_id,
588+
input_dict,
589+
output_dict,
590+
system_dict
591+
)
592+
eval_cases.append(eval_case)
593+
594+
return types.EvaluationDataset(eval_cases=eval_cases)
595+
596+
445597
def auto_detect_dataset_schema(
446598
raw_dataset: list[dict[str, Any]],
447599
) -> Union[EvalDatasetSchema, str]:
@@ -476,6 +628,11 @@ def auto_detect_dataset_schema(
476628
if "role" in messages_list[0] and "content" in messages_list[0]:
477629
return EvalDatasetSchema.OPENAI
478630

631+
if "format" in keys:
632+
format_content = first_item.get("format", "")
633+
if isinstance(format_content, str) and format_content == "observability":
634+
return EvalDatasetSchema.OBSERVABILITY
635+
479636
if {"prompt", "response"}.issubset(keys) or {
480637
"response",
481638
"reference",
@@ -489,6 +646,7 @@ def auto_detect_dataset_schema(
489646
EvalDatasetSchema.GEMINI: _GeminiEvalDataConverter,
490647
EvalDatasetSchema.FLATTEN: _FlattenEvalDataConverter,
491648
EvalDatasetSchema.OPENAI: _OpenAIDataConverter,
649+
EvalDatasetSchema.OBSERVABILITY: _ObservabilityDataConverter,
492650
}
493651

494652

vertexai/_genai/types.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6155,6 +6155,99 @@ def _check_pandas_installed(cls, data: Any) -> Any:
61556155
)
61566156
return data
61576157

6158+
@classmethod
def load_from_sources(
    cls,
    input_source: str,
    output_source: str,
    system_source: Optional[str] = None,
    client: Optional[Any] = None,
) -> "EvaluationDataset":
    """Loads an Observability-format evaluation dataset from GCS objects.

    Args:
        input_source: gs:// URI of the input-messages object.
        output_source: gs:// URI of the output-messages object.
        system_source: Optional gs:// URI of the system-messages object.
        client: Optional SDK client whose credentials are reused for GCS.
            # NOTE(review): reaches into private attrs
            # client._api_client._credentials — confirm this stays stable.

    Returns:
        An EvaluationDataset whose eval_dataset_df holds a single row with
        format="observability" and the raw downloaded strings.

    Raises:
        TypeError: If any provided source is not a gs:// URI.
        IOError: If any GCS object cannot be downloaded.
        ImportError: If google-cloud-storage or pandas is not installed.
    """
    if (
        not input_source.startswith("gs://")
        or not output_source.startswith("gs://")
        or (
            system_source is not None
            and not system_source.startswith("gs://")
        )
    ):
        raise TypeError("Only GCS sources are supported.")

    try:
        from google.cloud import storage

        storage_client = storage.Client(
            credentials=client._api_client._credentials if client else None
        )

        def _read_gcs(source: str) -> str:
            # Single download path; the original duplicated this logic
            # three times.
            try:
                bucket_name, blob_path = source[len("gs://") :].split("/", 1)
                blob = storage_client.bucket(bucket_name).blob(blob_path)
                return blob.download_as_bytes().decode("utf-8")
            except Exception as e:
                raise IOError(
                    f"Failed to read from GCS path {source}: {e}"
                ) from e

        input_str = _read_gcs(input_source)
        output_str = _read_gcs(output_source)
        # BUG FIX: the original error message for this branch interpolated
        # system_str (the still-empty result buffer) instead of the failing
        # path system_source; _read_gcs now reports the real path.
        system_str = "" if system_source is None else _read_gcs(system_source)

    except ImportError as e:
        raise ImportError(
            "Reading from GCS requires the 'google-cloud-storage'"
            " library. Please install it with 'pip install"
            " google-cloud-aiplatform[evaluation]'."
        ) from e

    try:
        import pandas as pd
    except ImportError as e:
        raise ImportError("Pandas DataFrame library is required.") from e

    # Single-row frame; the "format" column is what
    # auto_detect_dataset_schema keys on to pick the observability converter.
    eval_dataset_df = pd.DataFrame(
        {
            "format": ["observability"],
            "input": [input_str],
            "output": [output_str],
            "system": [system_str],
        }
    )

    return EvaluationDataset(eval_dataset_df=eval_dataset_df)
6250+
61586251
def show(self) -> None:
61596252
"""Shows the evaluation dataset."""
61606253
from . import _evals_visualization

0 commit comments

Comments
 (0)