Skip to content

Latest commit

 

History

History
717 lines (545 loc) · 20.2 KB

File metadata and controls

717 lines (545 loc) · 20.2 KB
name distributed-state-sync-skill
description Implements CRDT (Conflict-Free Replicated Data Types) for distributed state management with automatic conflict resolution
version 1.0.0
tags
orchestration
crdt
state-management
distributed-systems
conflict-resolution

Distributed State Sync Skill

Purpose

This skill provides Conflict-Free Replicated Data Types (CRDTs) for managing distributed state across multiple agents with automatic conflict resolution. It enables agents to update shared state concurrently without coordination while guaranteeing eventual consistency.

When to Use This Skill

Use this skill when:

  • ✅ Multiple agents need to share and update state concurrently
  • ✅ Network partitions or delays are possible
  • ✅ Need automatic conflict resolution without locking
  • ✅ Want eventually consistent distributed data structures
  • ✅ Implementing collaborative multi-agent workflows

Don't use this skill for:

  • ❌ Single-agent workflows (use local state)
  • ❌ Scenarios requiring immediate consistency (use locking)
  • ❌ Simple read-only state sharing
  • ❌ States that don't conflict (use simple replication)

Core Data Structures

1. OR-Set (Observed-Remove Set)

Purpose: Distributed set where adds and removes can happen concurrently

Properties:

  • Concurrent adds are preserved
  • Removes only affect observed elements
  • Eventually consistent across all replicas

Implementation:

from dataclasses import dataclass, field
from typing import Any, Dict, Set, Tuple
import uuid

@dataclass
class ORSet:
    """
    Observed-Remove Set (OR-Set) CRDT.

    Maintains both added and removed elements with unique IDs.
    Elements can be added and removed concurrently.
    """
    added: Dict[Any, Set[str]] = field(default_factory=dict)  # element -> set of unique IDs
    removed: Set[Tuple[Any, str]] = field(default_factory=set)  # set of (element, unique_id) pairs

    def add(self, element: Any) -> str:
        """
        Add an element to the set.

        Returns:
            Unique ID for this add operation
        """
        unique_id = str(uuid.uuid4())

        if element not in self.added:
            self.added[element] = set()

        self.added[element].add(unique_id)

        return unique_id

    def remove(self, element: Any) -> None:
        """
        Remove an element from the set.

        Removes all currently observed instances of the element.
        """
        if element in self.added:
            for unique_id in self.added[element]:
                self.removed.add((element, unique_id))

    def contains(self, element: Any) -> bool:
        """Check if element is in the set."""
        if element not in self.added:
            return False

        # Element exists if it has any non-removed instances
        for unique_id in self.added[element]:
            if (element, unique_id) not in self.removed:
                return True

        return False

    def get_elements(self) -> Set[Any]:
        """Get all elements currently in the set."""
        elements = set()

        for element, unique_ids in self.added.items():
            # Check if element has any non-removed instances
            if any((element, uid) not in self.removed for uid in unique_ids):
                elements.add(element)

        return elements

    def merge(self, other: 'ORSet') -> 'ORSet':
        """
        Merge with another OR-Set replica.

        Returns:
            New OR-Set with merged state
        """
        merged = ORSet()

        # Merge added elements
        all_elements = set(self.added.keys()) | set(other.added.keys())
        for element in all_elements:
            merged.added[element] = (
                self.added.get(element, set()) |
                other.added.get(element, set())
            )

        # Merge removed elements
        merged.removed = self.removed | other.removed

        return merged

Example:

# Agent A and Agent B working concurrently
agent_a_set = ORSet()
agent_b_set = ORSet()

# Agent A adds "task-1"
agent_a_set.add("task-1")

# Agent B adds "task-2" (concurrent)
agent_b_set.add("task-2")

# Merge sets
final_set = agent_a_set.merge(agent_b_set)

# Result: Both tasks present
assert final_set.contains("task-1")
assert final_set.contains("task-2")

2. G-Counter (Grow-Only Counter)

Purpose: Distributed counter that only increments

Properties:

  • Each replica has its own counter
  • Total = sum of all replica counters
  • Merge is max operation per replica

Implementation:

@dataclass
class GCounter:
    """
    Grow-Only Counter CRDT.

    Each replica maintains its own counter.
    Total value is the sum of all replica counters.
    """
    counters: Dict[str, int] = field(default_factory=dict)  # replica_id -> count
    replica_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def increment(self, amount: int = 1) -> None:
        """Increment this replica's counter."""
        if self.replica_id not in self.counters:
            self.counters[self.replica_id] = 0

        self.counters[self.replica_id] += amount

    def value(self) -> int:
        """Get total value across all replicas."""
        return sum(self.counters.values())

    def merge(self, other: 'GCounter') -> 'GCounter':
        """
        Merge with another G-Counter replica.

        Returns:
            New G-Counter with merged state
        """
        merged = GCounter(replica_id=self.replica_id)

        # Take max of each replica's counter
        all_replicas = set(self.counters.keys()) | set(other.counters.keys())
        for replica in all_replicas:
            merged.counters[replica] = max(
                self.counters.get(replica, 0),
                other.counters.get(replica, 0)
            )

        return merged

Example:

# Three agents counting tasks completed
agent_a_counter = GCounter(replica_id="agent-a")
agent_b_counter = GCounter(replica_id="agent-b")
agent_c_counter = GCounter(replica_id="agent-c")

# Each agent increments locally
agent_a_counter.increment(5)  # Completed 5 tasks
agent_b_counter.increment(3)  # Completed 3 tasks
agent_c_counter.increment(7)  # Completed 7 tasks

# Merge all counters
merged = agent_a_counter.merge(agent_b_counter).merge(agent_c_counter)

# Total tasks completed
assert merged.value() == 15  # 5 + 3 + 7

3. LWW-Register (Last-Write-Wins Register)

Purpose: Distributed register where last write wins based on timestamp

Properties:

  • Each update has a timestamp
  • Latest timestamp wins on conflict
  • Simple but may lose data

Implementation:

@dataclass
class LWWRegister:
    """
    Last-Write-Wins Register CRDT.

    Stores a value with a timestamp.
    On merge, value with latest timestamp wins.
    """
    value: Any = None
    timestamp: float = 0.0
    replica_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def set(self, value: Any) -> None:
        """Set value with current timestamp."""
        import time
        self.value = value
        self.timestamp = time.time()

    def get(self) -> Any:
        """Get current value."""
        return self.value

    def merge(self, other: 'LWWRegister') -> 'LWWRegister':
        """
        Merge with another LWW-Register.

        Returns:
            New register with value from latest write
        """
        merged = LWWRegister()

        if self.timestamp > other.timestamp:
            merged.value = self.value
            merged.timestamp = self.timestamp
            merged.replica_id = self.replica_id
        elif other.timestamp > self.timestamp:
            merged.value = other.value
            merged.timestamp = other.timestamp
            merged.replica_id = other.replica_id
        else:
            # Timestamps equal - use replica_id as tiebreaker
            if self.replica_id > other.replica_id:
                merged.value = self.value
                merged.timestamp = self.timestamp
                merged.replica_id = self.replica_id
            else:
                merged.value = other.value
                merged.timestamp = other.timestamp
                merged.replica_id = other.replica_id

        return merged

Example:

# Two agents updating same configuration
agent_a_config = LWWRegister(replica_id="agent-a")
agent_b_config = LWWRegister(replica_id="agent-b")

# Agent A updates at T=1000
agent_a_config.timestamp = 1000.0
agent_a_config.value = {"mode": "production"}

# Agent B updates at T=1001 (1 second later)
agent_b_config.timestamp = 1001.0
agent_b_config.value = {"mode": "staging"}

# Merge: B's value wins (latest timestamp)
merged = agent_a_config.merge(agent_b_config)
assert merged.value == {"mode": "staging"}

4. PN-Counter (Positive-Negative Counter)

Purpose: Distributed counter supporting both increment and decrement

Properties:

  • Combines two G-Counters (positive and negative)
  • Value = positive.value() - negative.value()
  • Fully decentralized

Implementation:

@dataclass
class PNCounter:
    """
    Positive-Negative Counter CRDT.

    Supports both increment and decrement operations.
    Internally uses two G-Counters.
    """
    positive: GCounter = field(default_factory=GCounter)
    negative: GCounter = field(default_factory=GCounter)

    def increment(self, amount: int = 1) -> None:
        """Increment counter."""
        self.positive.increment(amount)

    def decrement(self, amount: int = 1) -> None:
        """Decrement counter."""
        self.negative.increment(amount)

    def value(self) -> int:
        """Get current value (positive - negative)."""
        return self.positive.value() - self.negative.value()

    def merge(self, other: 'PNCounter') -> 'PNCounter':
        """Merge with another PN-Counter."""
        merged = PNCounter()
        merged.positive = self.positive.merge(other.positive)
        merged.negative = self.negative.merge(other.negative)
        return merged

Example:

# Agent tracking resource pool (add/remove resources)
agent_a_pool = PNCounter()
agent_b_pool = PNCounter()

# Agent A adds 10 resources
agent_a_pool.increment(10)

# Agent B removes 3 resources (concurrent)
agent_b_pool.decrement(3)

# Merge
merged = agent_a_pool.merge(agent_b_pool)
assert merged.value() == 7  # 10 - 3

Workflow

Step 1: Initialize Distributed State

from skills.orchestration.distributed_state_sync import ORSet, GCounter

# Initialize state manager
class DistributedStateManager:
    def __init__(self, replica_id: str):
        self.replica_id = replica_id
        self.pending_tasks = ORSet()       # Shared task list
        self.completed_count = GCounter(replica_id=replica_id)  # Task counter
        self.agent_status = {}             # Per-agent state

    def add_task(self, task_id: str) -> None:
        """Add task to pending list."""
        self.pending_tasks.add(task_id)

    def complete_task(self, task_id: str) -> None:
        """Mark task as completed."""
        self.pending_tasks.remove(task_id)
        self.completed_count.increment()

    def get_pending_tasks(self) -> Set[str]:
        """Get all pending tasks."""
        return self.pending_tasks.get_elements()

    def get_completed_count(self) -> int:
        """Get total completed tasks across all agents."""
        return self.completed_count.value()

Step 2: Local Operations

# Agent A performs local operations
agent_a_state = DistributedStateManager(replica_id="agent-a")

agent_a_state.add_task("task-1")
agent_a_state.add_task("task-2")
agent_a_state.complete_task("task-1")

# Agent B performs concurrent operations
agent_b_state = DistributedStateManager(replica_id="agent-b")

agent_b_state.add_task("task-3")
agent_b_state.complete_task("task-3")

Step 3: Periodic Synchronization

def synchronize_state(local_state: DistributedStateManager,
                      remote_state: DistributedStateManager) -> DistributedStateManager:
    """
    Synchronize state between two replicas.

    Returns:
        New state with merged updates
    """
    merged_state = DistributedStateManager(replica_id=local_state.replica_id)

    # Merge pending tasks (OR-Set)
    merged_state.pending_tasks = local_state.pending_tasks.merge(remote_state.pending_tasks)

    # Merge completed count (G-Counter)
    merged_state.completed_count = local_state.completed_count.merge(remote_state.completed_count)

    return merged_state

# Synchronize every 30 seconds
import time

while True:
    # Get state from other agents
    remote_states = fetch_remote_states()

    # Merge with local state
    for remote_state in remote_states:
        agent_a_state = synchronize_state(agent_a_state, remote_state)

    # Broadcast local state to others
    broadcast_state(agent_a_state)

    time.sleep(30)

Step 4: Conflict Resolution

# Conflicts are automatically resolved by CRDT semantics

# Example: Two agents add different tasks concurrently
agent_a_state.add_task("task-1")
agent_b_state.add_task("task-2")

# After merge: Both tasks present (OR-Set preserves both adds)
merged = synchronize_state(agent_a_state, agent_b_state)
assert "task-1" in merged.get_pending_tasks()
assert "task-2" in merged.get_pending_tasks()

# Example: Two agents increment counter concurrently
agent_a_state.completed_count.increment(5)
agent_b_state.completed_count.increment(3)

# After merge: Counts are added (G-Counter sums all increments)
merged = synchronize_state(agent_a_state, agent_b_state)
assert merged.get_completed_count() == 8  # 5 + 3

Advanced Patterns

1. Multi-Value Register (MVRegister)

Purpose: Keep all concurrent values until resolved

@dataclass
class MVRegister:
    """
    Multi-Value Register CRDT.

    Maintains all concurrent values with vector clocks.
    Application can choose resolution strategy.
    """
    values: Dict[Tuple, Any] = field(default_factory=dict)  # vector_clock -> value

    def set(self, value: Any, vector_clock: Tuple) -> None:
        """Set value with vector clock."""
        # Remove values dominated by this clock
        self.values = {
            vc: v for vc, v in self.values.items()
            if not self._dominates(vector_clock, vc)
        }

        self.values[vector_clock] = value

    def get(self) -> Set[Any]:
        """Get all concurrent values."""
        return set(self.values.values())

    def _dominates(self, vc1: Tuple, vc2: Tuple) -> bool:
        """Check if vc1 dominates vc2 (happens-before)."""
        return all(a >= b for a, b in zip(vc1, vc2)) and vc1 != vc2

    def merge(self, other: 'MVRegister') -> 'MVRegister':
        """Merge two MV-Registers."""
        merged = MVRegister()

        all_clocks = set(self.values.keys()) | set(other.values.keys())

        for vc in all_clocks:
            # Keep if not dominated by any other clock
            if not any(self._dominates(other_vc, vc) for other_vc in all_clocks if other_vc != vc):
                value = self.values.get(vc) or other.values.get(vc)
                merged.values[vc] = value

        return merged

2. LWW-Map (Last-Write-Wins Map)

Purpose: Distributed key-value map with LWW resolution

class LWWMap:
    """
    Last-Write-Wins Map CRDT.

    Each key is an LWW-Register.
    """
    def __init__(self):
        self.map: Dict[str, LWWRegister] = {}

    def set(self, key: str, value: Any) -> None:
        """Set key-value pair."""
        if key not in self.map:
            self.map[key] = LWWRegister()

        self.map[key].set(value)

    def get(self, key: str) -> Any:
        """Get value for key."""
        if key not in self.map:
            return None

        return self.map[key].get()

    def merge(self, other: 'LWWMap') -> 'LWWMap':
        """Merge two LWW-Maps."""
        merged = LWWMap()

        all_keys = set(self.map.keys()) | set(other.map.keys())

        for key in all_keys:
            self_reg = self.map.get(key)
            other_reg = other.map.get(key)

            if self_reg and other_reg:
                merged.map[key] = self_reg.merge(other_reg)
            elif self_reg:
                merged.map[key] = self_reg
            else:
                merged.map[key] = other_reg

        return merged

Integration Patterns

With State Manager Agent

# State Manager Agent uses this skill for CRDT operations
from skills.orchestration.distributed_state_sync import ORSet, GCounter, LWWMap

class StateManagerAgent:
    def __init__(self, replica_id: str):
        self.replica_id = replica_id

        # Use CRDTs for conflict-free state
        self.tasks = ORSet()           # Pending tasks
        self.metrics = GCounter(replica_id)  # Task counts
        self.config = LWWMap()         # Configuration values

    def handle_state_update(self, operation: str, **kwargs):
        """Handle state update operation."""
        if operation == "add_task":
            self.tasks.add(kwargs["task_id"])

        elif operation == "complete_task":
            self.tasks.remove(kwargs["task_id"])
            self.metrics.increment()

        elif operation == "update_config":
            self.config.set(kwargs["key"], kwargs["value"])

    def sync_with_peer(self, peer_state):
        """Synchronize state with peer agent."""
        self.tasks = self.tasks.merge(peer_state.tasks)
        self.metrics = self.metrics.merge(peer_state.metrics)
        self.config = self.config.merge(peer_state.config)

Examples

Example 1: Collaborative Task List

# Three agents managing shared task list
agent_a = ORSet()
agent_b = ORSet()
agent_c = ORSet()

# Agent A adds tasks
agent_a.add("implement-auth")
agent_a.add("write-tests")

# Agent B adds task concurrently
agent_b.add("update-docs")

# Agent C removes task (also concurrent)
agent_c.add("implement-auth")  # Observed this task
agent_c.remove("implement-auth")  # Then removed it

# Merge all replicas
merged = agent_a.merge(agent_b).merge(agent_c)

# Result: OR-Set semantics preserve correct state
assert not merged.contains("implement-auth")  # Correctly removed
assert merged.contains("write-tests")         # Preserved from A
assert merged.contains("update-docs")         # Preserved from B

Example 2: Distributed Metrics

# Agents tracking workflow progress
orchestrator = GCounter(replica_id="orchestrator")
agent_1 = GCounter(replica_id="agent-1")
agent_2 = GCounter(replica_id="agent-2")

# Each agent completes tasks
agent_1.increment(5)  # Completed 5 tasks
agent_2.increment(7)  # Completed 7 tasks

# Orchestrator tracks overall progress
orchestrator.increment(2)  # Completed 2 coordination tasks

# Merge for total count
total = orchestrator.merge(agent_1).merge(agent_2)
assert total.value() == 14  # 2 + 5 + 7

Best Practices

  1. Choose Right CRDT for Use Case

    # For sets: Use OR-Set (preserves concurrent adds)
    pending_tasks = ORSet()
    
    # For counters: Use G-Counter (increment only) or PN-Counter (inc/dec)
    task_count = GCounter()
    
    # For single values: Use LWW-Register (simple) or MV-Register (complex)
    current_config = LWWRegister()
  2. Synchronize Periodically

    # Every 30-60 seconds is usually sufficient
    sync_interval = 30  # seconds
    
    # More frequent for high-concurrency scenarios
    if high_concurrency:
        sync_interval = 10
  3. Handle Network Partitions

    try:
        remote_state = fetch_state_from_peer()
        merged = local_state.merge(remote_state)
    except NetworkError:
        # Continue with local operations
        # State will eventually sync when partition heals
        log("Network partition - continuing with local state")
  4. Monitor State Size

    # CRDTs can grow unbounded (especially OR-Set tombstones)
    if len(or_set.removed) > 1000:
        # Garbage collect old tombstones
        or_set.gc_tombstones(older_than=7*24*3600)  # 7 days

Related Skills

  • state-manager-skill: Uses CRDTs for distributed state
  • observability-tracker-skill: Tracks CRDT synchronization metrics

References

  • CRDT Wikipedia
  • Document 15, Section 4: State Management Patterns
  • Command: /state-coordinator

Version: 1.0.0 Status: Production Ready Complexity: High (advanced distributed systems concepts) Token Cost: Low (local operations, periodic sync)