Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 55 additions & 13 deletions python/cocoindex/op.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,13 +458,22 @@ def _build_args(
return [v for _, v in zip(signature.parameters, kwargs.values())]


class TargetStateCompatibility(Enum):
"""The compatibility of the target state."""

COMPATIBLE = "Compatible"
PARTIALLY_COMPATIBLE = "PartialCompatible"
NOT_COMPATIBLE = "NotCompatible"


class _TargetConnector:
"""
The connector class passed to the engine.
"""

_spec_cls: type[Any]
_state_cls: type[Any]
_setup_key_type: Any
_setup_state_cls: type[Any]
_connector_cls: type[Any]

_get_persistent_key_fn: Callable[[_TargetConnectorContext, str], Any]
Expand All @@ -475,10 +484,15 @@ class _TargetConnector:
_mutatation_type: AnalyzedDictType | None

def __init__(
self, spec_cls: type[Any], state_cls: type[Any], connector_cls: type[Any]
self,
spec_cls: type[Any],
setup_key_type: Any,
setup_state_cls: type[Any],
connector_cls: type[Any],
):
self._spec_cls = spec_cls
self._state_cls = state_cls
self._setup_key_type = setup_key_type
self._setup_state_cls = setup_state_cls
self._connector_cls = connector_cls

self._get_persistent_key_fn = _get_required_method(
Expand Down Expand Up @@ -591,9 +605,9 @@ def get_setup_state(self, export_context: _TargetConnectorContext) -> Any:
get_persistent_state_fn = getattr(self._connector_cls, "get_setup_state", None)
if get_persistent_state_fn is None:
state = export_context.spec
if not isinstance(state, self._state_cls):
if not isinstance(state, self._setup_state_cls):
raise ValueError(
f"Expect a get_setup_state() method for {self._connector_cls} that returns an instance of {self._state_cls}"
f"Expect a get_setup_state() method for {self._connector_cls} that returns an instance of {self._setup_state_cls}"
)
else:
args = _build_args(
Expand All @@ -605,12 +619,31 @@ def get_setup_state(self, export_context: _TargetConnectorContext) -> Any:
index_options=export_context.index_options,
)
state = get_persistent_state_fn(*args)
if not isinstance(state, self._state_cls):
if not isinstance(state, self._setup_state_cls):
raise ValueError(
f"Method {get_persistent_state_fn.__name__} must return an instance of {self._state_cls}, got {type(state)}"
f"Method {get_persistent_state_fn.__name__} must return an instance of {self._setup_state_cls}, got {type(state)}"
)
return dump_engine_object(state)

def check_state_compatibility(
self, raw_desired_state: Any, raw_existing_state: Any
) -> Any:
check_state_compatibility_fn = getattr(
self._connector_cls, "check_state_compatibility", None
)
if check_state_compatibility_fn is not None:
compatibility = check_state_compatibility_fn(
load_engine_object(self._setup_state_cls, raw_desired_state),
load_engine_object(self._setup_state_cls, raw_existing_state),
)
else:
compatibility = (
TargetStateCompatibility.COMPATIBLE
if raw_desired_state == raw_existing_state
else TargetStateCompatibility.PARTIALLY_COMPATIBLE
)
return dump_engine_object(compatibility)

async def prepare_async(self, export_context: _TargetConnectorContext) -> None:
prepare_fn = getattr(self._connector_cls, "prepare", None)
if prepare_fn is None:
Expand All @@ -626,7 +659,8 @@ async def prepare_async(self, export_context: _TargetConnectorContext) -> None:
async_prepare_fn = to_async_call(prepare_fn)
export_context.prepared_spec = await async_prepare_fn(*args)

def describe_resource(self, key: Any) -> str:
def describe_resource(self, raw_key: Any) -> str:
key = load_engine_object(self._setup_key_type, raw_key)
describe_fn = getattr(self._connector_cls, "describe", None)
if describe_fn is None:
return str(key)
Expand All @@ -636,13 +670,16 @@ async def apply_setup_changes_async(
self,
changes: list[tuple[Any, list[dict[str, Any] | None], dict[str, Any] | None]],
) -> None:
for key, previous, current in changes:
for raw_key, previous, current in changes:
key = load_engine_object(self._setup_key_type, raw_key)
prev_specs = [
load_engine_object(self._state_cls, spec) if spec is not None else None
load_engine_object(self._setup_state_cls, spec)
if spec is not None
else None
for spec in previous
]
curr_spec = (
load_engine_object(self._state_cls, current)
load_engine_object(self._setup_state_cls, current)
if current is not None
else None
)
Expand Down Expand Up @@ -676,7 +713,10 @@ async def mutate_async(


def target_connector(
spec_cls: type[Any], state_cls: type[Any] | None = None
*,
spec_cls: type[Any],
setup_key_type: Any = Any,
setup_state_cls: type[Any] | None = None,
) -> Callable[[type], type]:
"""
Decorate a class to provide a target connector for an op.
Expand All @@ -688,7 +728,9 @@ def target_connector(

# Register the target connector.
def _inner(connector_cls: type) -> type:
connector = _TargetConnector(spec_cls, state_cls or spec_cls, connector_cls)
connector = _TargetConnector(
spec_cls, setup_key_type, setup_state_cls or spec_cls, connector_cls
)
_engine.register_target_connector(spec_cls.__name__, connector)
return connector_cls

Expand Down
2 changes: 1 addition & 1 deletion src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ pub struct ResourceSetupChangeItem<'a> {
pub setup_change: &'a dyn setup::ResourceSetupChange,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
pub enum SetupStateCompatibility {
/// The resource is fully compatible with the desired state.
/// This means the resource can be updated to the desired state without any loss of data.
Expand Down
22 changes: 17 additions & 5 deletions src/ops/py_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,23 @@ impl interface::TargetFactory for PyExportTargetFactory {
desired_state: &serde_json::Value,
existing_state: &serde_json::Value,
) -> Result<SetupStateCompatibility> {
Ok(if desired_state == existing_state {
SetupStateCompatibility::Compatible
} else {
SetupStateCompatibility::PartialCompatible
})
let compatibility = Python::with_gil(|py| -> Result<_> {
let result = self
.py_target_connector
.call_method(
py,
"check_state_compatibility",
(
pythonize(py, desired_state)?,
pythonize(py, existing_state)?,
),
None,
)
.to_result_with_py_trace(py)?;
let compatibility: SetupStateCompatibility = depythonize(&result.into_bound(py))?;
Ok(compatibility)
})?;
Ok(compatibility)
}

fn describe_resource(&self, key: &serde_json::Value) -> Result<String> {
Expand Down
Loading