|
# Operator Description
# sem_map      Map each record using a natural language projection
# sem_filter   Keep records that match the natural language predicate
# sem_extract  Extract one or more attributes from each row
# sem_agg      Aggregate across all records (e.g. for summarization)
# sem_topk     Order the records by some natural language sorting criteria
# sem_join     Join two datasets based on a natural language predicate
| 8 | +import pathlib |
| 9 | +from typing import Type |
| 10 | + |
| 11 | +import pandas as pd |
| 12 | +from pydantic import BaseModel, Field |
| 13 | + |
| 14 | +from agentics import AG |
| 15 | +from agentics.core.atype import create_pydantic_model |
| 16 | + |
| 17 | + |
def normalize_input_data(source: AG | pd.DataFrame) -> AG:
    """Coerce *source* into a fresh ``AG``.

    Parameters
    ----------
    source : AG | pd.DataFrame
        Input collection. A ``DataFrame`` is converted via
        ``AG.from_dataframe``; an ``AG`` is defensively cloned so the
        caller's original object is never mutated downstream.

    Returns
    -------
    AG
        An ``AG`` independent of the caller's object.

    Raises
    ------
    ValueError
        If *source* is neither an ``AG`` nor a ``pd.DataFrame``.
    """
    # isinstance (rather than an exact type() comparison) also accepts
    # subclasses of DataFrame / AG, which the original rejected.
    if isinstance(source, pd.DataFrame):
        return AG.from_dataframe(source)
    if isinstance(source, AG):
        return source.clone()
    raise ValueError("source must be of type AG or pd.DataFrame")
| 25 | + |
| 26 | + |
async def sem_map(
    source: AG | pd.DataFrame,
    target_type: Type[BaseModel] | str,
    instructions: str,
    merge_output: bool = True,  ## Target, Merged
    **kwargs,
) -> AG | pd.DataFrame:
    """
    Agentics-native semantic map (LOTUS-style) from a source collection into a
    target schema; also covers LOTUS-style ``sem_extract``.

    Each source state is semantically transformed into the representation
    defined by ``target_type``, using natural-language ``instructions`` to
    guide the mapping. The mapped states can optionally be merged back into
    the original source states.

    Parameters
    ----------
    source : AG | pd.DataFrame
        Input collection. A ``DataFrame`` is converted to an ``AG`` first,
        and the result is converted back to a ``DataFrame`` on return.
    target_type : Type[pydantic.BaseModel] | str
        Target schema for the mapped output.
        - If a ``BaseModel`` subclass, it is used directly as the target
          ``atype``.
        - If a ``str``, a Pydantic model named after ``target_type`` is
          created dynamically with a single string field, using
          ``instructions`` as the field's semantic description.
    instructions : str
        Natural-language rubric describing how to produce the target fields
        from the source content (e.g., extraction rules, normalization
        requirements, labeling criteria). Also installed as the source's
        ``prompt_template``.
    merge_output : bool, default=True
        If True, merge the mapped output into the original source states and
        return the merged collection. If False, return only the mapped
        output produced by the transduction.
    **kwargs
        Additional keyword arguments forwarded to ``AG(...)`` when
        constructing the target agent graph (e.g., model/connection
        configuration, batching, caching, execution settings).

    Returns
    -------
    AG | pd.DataFrame
        The mapped (and optionally merged) collection, matching the
        container type of ``source``.

    Raises
    ------
    ValueError
        If ``source`` is neither an ``AG`` nor a ``pd.DataFrame``.

    Notes
    -----
    - The semantic mapping is executed asynchronously via
      ``await (target_ag << ag_source)``.
    """
    ag_source = normalize_input_data(source)

    # Build the target schema: use the supplied model directly, or
    # synthesize a one-string-field model named after `target_type`.
    if isinstance(target_type, str):
        atype = create_pydantic_model(
            [(target_type, "str", instructions, False)], target_type
        )
    else:
        atype = target_type
    target_ag = AG(atype=atype, **kwargs)

    ag_source.prompt_template = instructions

    map_out = await (target_ag << ag_source)

    # BUG FIX: with merge_output=False the original returned the untouched
    # source, contradicting the documented contract; return the
    # transduction output instead.
    output_ag = ag_source.merge_states(map_out) if merge_output else map_out

    # Mirror the input container type: DataFrame in -> DataFrame out.
    if isinstance(source, pd.DataFrame):
        return output_ag.to_dataframe()
    return output_ag
| 103 | + |
| 104 | + |
async def sem_filter(
    source: AG | pd.DataFrame, predicate_template: str, **kwargs
) -> AG | pd.DataFrame:
    """
    Agentics-native semantic filter using a LangChain-style condition template.

    Evaluates a natural-language predicate for each state in ``source`` and
    returns a new collection containing only the states classified as
    satisfying the predicate — an agentic analogue of LOTUS-style semantic
    filtering.

    The ``predicate_template`` is a LangChain-style template (e.g., using
    ``{field}`` placeholders) rendered against each source state's fields.
    The rendered text is passed to an LLM-based logical classifier which
    produces a boolean decision (``condition_true``) for that state.

    Parameters
    ----------
    source : AG | pd.DataFrame
        The input collection to be filtered. A ``DataFrame`` is converted to
        an ``AG`` first, and the result is converted back on return.
    predicate_template : str
        A LangChain-style prompt template over the fields of ``source``
        states, using placeholders like ``{reviewText}`` or ``{title}``.
    **kwargs
        Additional keyword arguments forwarded to ``AG(...)`` when
        constructing the classifier target (e.g., model/connection
        configuration, retries, caching, batching). These override the
        defaults set here.

    Returns
    -------
    AG | pd.DataFrame
        The subset of original ``source`` states for which the classifier
        returned ``condition_true``, in the same container type as ``source``.

    Raises
    ------
    ValueError
        If ``source`` is neither an ``AG`` nor a ``pd.DataFrame``.

    Notes
    -----
    - The input is cloned, so the original object is not mutated.
    - Filtering assumes a 1:1 positional alignment between the source and
      the classifier output. If your transduction can reorder or drop items,
      switch to an ID-based alignment.
    - Default classifier settings include ``amap_batch_size=20`` for batched
      evaluation.
    """
    ag_source = normalize_input_data(source)

    target_ag = AG(
        atype=create_pydantic_model(
            [("condition_true", "bool", """Condition is True""", False)], name="filter"
        ),
        instructions="""You are a Logical Classifier. You have been given an input sentence.
    Read the input text and return true if the predicate is positive, false otherwise""",
        amap_batch_size=20,
        **kwargs,
    )

    ag_source.prompt_template = predicate_template
    map_out = await (target_ag << ag_source)

    # Start from an empty clone so the kept states preserve the source's
    # configuration (atype, settings) without mutating the source itself.
    filtered = ag_source.clone()
    filtered.states = []
    # Positional alignment: verdict i corresponds to source state i.
    for i, verdict in enumerate(map_out.states):
        if verdict.condition_true:
            filtered.append(ag_source[i])

    if isinstance(source, pd.DataFrame):
        return filtered.to_dataframe()
    return filtered
| 172 | + |
| 173 | + |
async def sem_agg(
    source: AG | pd.DataFrame,
    target_type: Type[BaseModel] | str,
    instructions: str | None = None,
    # merge_output: bool = True, ## Target, Merged
    **kwargs,
) -> AG | pd.DataFrame:
    """
    Agentics-native semantic aggregation (LOTUS-style ``sem_agg``).

    Reduces all states of ``source`` into a single aggregated output of type
    ``target_type`` (e.g., a summary across every record), using the
    ``"areduce"`` transduction mode rather than a one-to-one map.

    Parameters
    ----------
    source : AG | pd.DataFrame
        Input collection. A ``DataFrame`` is converted to an ``AG`` first,
        and the result is converted back to a ``DataFrame`` on return.
    target_type : Type[pydantic.BaseModel] | str
        Target schema for the aggregated output.
        - If a ``BaseModel`` subclass, it is used directly as the target
          ``atype``.
        - If a ``str``, a Pydantic model named after ``target_type`` is
          created dynamically with a single string field, using
          ``instructions`` as the field's semantic description.
    instructions : str | None, optional
        Natural-language rubric guiding the aggregation; also installed as
        the source's ``prompt_template``.
    **kwargs
        Additional keyword arguments forwarded to ``AG(...)`` when
        constructing the target agent graph.

    Returns
    -------
    AG | pd.DataFrame
        The aggregated output, in the same container type as ``source``.

    Raises
    ------
    ValueError
        If ``source`` is neither an ``AG`` nor a ``pd.DataFrame``.
    """
    ag_source = normalize_input_data(source)

    # Build the target schema: use the supplied model directly, or
    # synthesize a one-string-field model named after `target_type`.
    if isinstance(target_type, str):
        atype = create_pydantic_model(
            [(target_type, "str", instructions, False)], target_type
        )
    else:
        atype = target_type
    target_ag = AG(atype=atype, **kwargs)

    ag_source.prompt_template = instructions
    # "areduce" folds all source states into a single aggregated result
    # instead of mapping them one-to-one.
    ag_source.transduction_type = "areduce"

    output_ag = await (target_ag << ag_source)
    if isinstance(source, pd.DataFrame):
        return output_ag.to_dataframe()
    return output_ag