@@ -595,6 +595,163 @@ async def _get_rai_attack_objectives(
595595
596596 return selected_prompts
597597
async def _apply_xpia_prompts(self, objectives_list: List, target_type_str: str) -> List:
    """Wrap baseline attack objectives in XPIA prompts for the indirect jailbreak strategy.

    XPIA prompts are wrapper structures that contain:
    - content: benign user query to trigger tool use
    - context: attack vehicle with {attack_text} placeholder
    - context_type: modality for formatting (email, document, html, code)
    - tool_name: name for mock tool

    Wrapper prompts are fetched for ``target_type_str``. When that yields
    nothing and the target is ``"agent"``, a second fetch is made for
    ``"model"``. For every objective whose first message is a dict with a
    ``"content"`` key, one wrapper is picked at random and the message is
    rewritten in place.

    :param objectives_list: Objectives to wrap. Each entry is expected to
        expose a ``"messages"`` sequence; entries without one are skipped.
    :param target_type_str: Target type passed to the RAI client
        (``"agent"`` triggers the model-type fallback).
    :return: The same ``objectives_list`` object. On any error, or when no
        wrapper prompts are available, it is returned without further changes.
    """
    self.logger.debug(
        f"Applying XPIA prompts to objectives for indirect jailbreak (target_type={target_type_str} )"
    )

    async def fetch_xpia_prompts(target: str):
        # The decorated coroutine stays zero-argument, as at the original call
        # site; the target is captured by the closure. Routing both the primary
        # and the fallback fetch through here gives them the same retry policy.
        @self.retry_manager.create_retry_decorator(context="xpia_prompts")
        async def fetch_with_retry():
            return await self.generated_rai_client.get_attack_objectives(
                risk_type=None,
                risk_category="xpia",
                application_scenario="",
                strategy=None,
                language=self.language.value,
                scan_session_id=self.scan_session_id,
                target=target,
            )

        return await fetch_with_retry()

    try:
        xpia_prompts = await fetch_xpia_prompts(target_type_str)

        # If no agent XPIA prompts and we're trying agent, fallback to model
        if not xpia_prompts and target_type_str == "agent":
            self.logger.debug("No agent-type XPIA prompts available, falling back to model-type XPIA prompts")
            try:
                xpia_prompts = await fetch_xpia_prompts("model")
                if xpia_prompts:
                    self.logger.debug(f"Fetched {len(xpia_prompts)} model-type XPIA wrapper prompts as fallback")
            except Exception as fallback_error:
                # Best effort: a failed fallback is logged and handled by the
                # "no prompts" branch below.
                self.logger.error(f"Error fetching model-type XPIA prompts as fallback: {str(fallback_error)} ")

        if not xpia_prompts:
            self.logger.warning("No XPIA prompts available (even after fallback), returning objectives unchanged")
            return objectives_list

        self.logger.debug(f"Fetched {len(xpia_prompts)} XPIA wrapper prompts")

        # Apply XPIA wrapping to each baseline objective
        for objective in objectives_list:
            if "messages" not in objective or not objective["messages"]:
                continue
            message = objective["messages"][0]
            if not isinstance(message, dict) or "content" not in message:
                continue
            # Randomly select an XPIA wrapper prompt for this objective
            self._wrap_message_with_xpia(message, random.choice(xpia_prompts))

    except Exception as e:
        self.logger.error(f"Error applying XPIA prompts: {str(e)} ")
        self.logger.warning("XPIA prompt application failed, returning original objectives")

    return objectives_list

def _wrap_message_with_xpia(self, message: dict, xpia_prompt: dict) -> None:
    """Rewrite ``message`` in place so its attack is carried by ``xpia_prompt``.

    The message's ``"content"`` becomes the wrapper's user query, and its
    ``"context"`` becomes a list whose first entry is the wrapper context with
    the baseline attack injected. ``"context_type"`` and ``"tool_name"`` are
    also set at message level from the wrapper.

    :param message: First message of an objective; must contain ``"content"``.
    :param xpia_prompt: One wrapper prompt as returned by the RAI client.
    """
    # Get the baseline attack content to inject
    baseline_attack_content = message["content"]
    # Preserve the original baseline context if it exists
    baseline_context = message.get("context", "")

    preserved_contexts = []
    if baseline_context:
        context_dict = {"content": baseline_context}
        if message.get("tool_name"):
            context_dict["tool_name"] = message["tool_name"]
        if message.get("context_type"):
            context_dict["context_type"] = message["context_type"]

        if "context_type" in context_dict or "tool_name" in context_dict:
            # Baseline context has agent fields - keep it as its own context entry
            preserved_contexts.append(context_dict)
            self.logger.debug(
                f"Found baseline context with agent fields: tool_name={context_dict.get('tool_name')} , context_type={context_dict.get('context_type')} "
            )
        else:
            # Baseline context has no agent fields - embed it in the attack text
            baseline_attack_content = f"{baseline_attack_content} \n \n Context:\n {baseline_context} "
            self.logger.debug("Embedded 1 baseline context(s) without agent fields into attack content")

    # A missing, None or empty "messages" list is treated as an empty wrapper
    # rather than raising IndexError and aborting the remaining objectives.
    wrapper_messages = xpia_prompt.get("messages") or [{}]
    xpia_message = wrapper_messages[0]

    # Extract XPIA wrapper structure
    user_query = xpia_message.get("content", "")
    # `or ""` also covers an explicit None, which would break the `in` test below.
    attack_vehicle_context = xpia_message.get("context") or ""
    context_type = xpia_message.get("context_type") or "text"
    tool_name = xpia_message.get("tool_name", "")

    # Inject baseline attack (with any embedded context) into the {attack_text} placeholder
    if "{attack_text}" in attack_vehicle_context:
        injected_context = attack_vehicle_context.replace("{attack_text}", baseline_attack_content)
    else:
        # No placeholder found, append to end
        injected_context = f"{attack_vehicle_context} \n \n {baseline_attack_content} "

    # Apply modality-based formatting
    formatted_context = format_content_by_modality(injected_context, context_type)

    # Update the message with benign user query
    message["content"] = user_query

    # Build the contexts list: XPIA context first, then baseline contexts with agent fields
    contexts = [{"content": formatted_context, "context_type": context_type, "tool_name": tool_name}]
    if preserved_contexts:
        contexts.extend(preserved_contexts)
        self.logger.debug(f"Preserved {len(preserved_contexts)} baseline context(s) with agent fields")

    message["context"] = contexts
    # Keep at message level for backward compat (XPIA primary)
    message["context_type"] = context_type
    message["tool_name"] = tool_name

    self.logger.debug(
        f"Wrapped baseline attack in XPIA: total contexts={len(contexts)} , xpia_tool={tool_name} , xpia_type={context_type} "
    )
754+
598755 async def _apply_jailbreak_prefixes (self , objectives_list : List ) -> List :
599756 """Apply jailbreak prefixes to objectives."""
600757 self .logger .debug ("Applying jailbreak prefixes to objectives" )
0 commit comments