|
| 1 | +# 🌐 DPSN Plugin for Virtuals Protocol (Python) |
| 2 | + |
| 3 | +> Decentralized Publish-Subscribe Network (DPSN) plugin for Virtuals Protocol agents, implemented in Python. |
| 4 | +
|
| 5 | +[](https://virtuals.io/) |
| 6 | +[](https://github.com/virtuals-protocol/virtuals-game-python) |
| 7 | +[](../../LICENSE) |
| 8 | + |
| 9 | +## 📋 Overview |
| 10 | + |
| 11 | +This plugin enables Virtuals Protocol agents (written in Python) to connect to, subscribe to, and interact with data streams available on the [DPSN Data Streams Store](https://streams.dpsn.org/). |
| 12 | + |
| 13 | +Agents can leverage this plugin to consume real-time data for decision-making, reacting to events, or integrating external information feeds. |
| 14 | + |
| 15 | +To provide personalized data streams for your agents, you can create and publish data into your own DPSN topics using the [dpsn-client for Python](https://github.com/DPSN-org/dpsn-python-client). |
| 16 | + |
| 17 | +For more information, visit: |
| 18 | +- [DPSN Official Website](https://dpsn.org) |
| 19 | + |
| 20 | +## ✨ Features |
| 21 | + |
| 22 | +- **Seamless Integration**: Connects Virtuals Protocol agents (Python) to the DPSN decentralized pub/sub network. |
| 23 | +- **Real-time Data Handling**: Subscribe to topics and process incoming messages via a configurable callback. |
| 24 | +- **Topic Management**: Provides agent-executable functions to `subscribe` and `unsubscribe` from DPSN topics. |
| 25 | +- **Error Handling**: Includes basic error handling and logging for connection and subscription issues. |
| 26 | +- **Graceful Shutdown**: Allows the agent to explicitly shut down the DPSN connection. |
| 27 | + |
| 28 | +## ⚙️ Configuration |
| 29 | + |
| 30 | +Ensure the following environment variables are set, typically in a `.env` file in your project root: |
| 31 | + |
| 32 | +> **Note**: The EVM private key (`PVT_KEY`) is used solely for signing authentication messages with the DPSN network. This process does not execute any on-chain transactions or incur gas fees. |
| 33 | +
|
| 34 | +```dotenv |
| 35 | +# Your EVM-compatible wallet private key (e.g., Metamask) |
| 36 | +PVT_KEY=your_evm_wallet_private_key_here |
| 37 | +
|
| 38 | +# The URL of the DPSN node to connect to (e.g., betanet.dpsn.org) |
| 39 | +DPSN_URL=betanet.dpsn.org |
| 40 | +
|
| 41 | +# Optional: Add VIRTUALS_API_KEY if required by your GameAgent setup |
| 42 | +# VIRTUALS_API_KEY=your_virtuals_api_key_here |
| 43 | +``` |
| 44 | + |
| 45 | +## 📚 Usage |
| 46 | + |
| 47 | +### Basic Setup |
| 48 | + |
| 49 | +The `DpsnPlugin` is designed to be used within the Virtuals Protocol Game SDK framework. You would typically instantiate it and potentially pass it to your `GameAgent` or similar construct. |
| 50 | + |
| 51 | +```python |
| 52 | +# Import the pre-instantiated plugin (recommended) |
| 53 | +from plugins.dpsn.dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin |
| 54 | +load_dotenv() |
| 55 | + |
| 56 | +dpsn_plugin=DpsnPlugin( |
| 57 | + dpsn_url=os.getenv("DPSN_URL"), |
| 58 | + pvt_key=os.getenv("PVT_KEY") |
| 59 | + ) |
| 60 | + |
| 61 | +# Define a simple message handler |
| 62 | +def handle_message(message_data): |
| 63 | + topic = message_data.get('topic', 'unknown') |
| 64 | + payload = message_data.get('payload', {}) |
| 65 | + print(f"Message on {topic}: {payload}") |
| 66 | + |
| 67 | +# Register the message handler |
| 68 | +dpsn_plugin.set_message_callback(handle_message) |
| 69 | + |
| 70 | +# Subscribe to a topic |
| 71 | +status, message, details = dpsn_plugin.subscribe( |
| 72 | + topic="0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker" |
| 73 | +) |
| 74 | +print(f"Subscription status: {status}, Message: {message}") |
| 75 | + |
| 76 | +# Later when done: |
| 77 | +dpsn_plugin.unsubscribe(topic="0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker") |
| 78 | +dpsn_plugin.shutdown() |
| 79 | + |
| 80 | +``` |
| 81 | + |
| 82 | +### Interacting via Agent Tasks |
| 83 | + |
| 84 | +The Game Agent interacts with the plugin by executing tasks that map to the plugin's `Function` objects. The exact syntax depends on your Game SDK's `runTask` or equivalent method. |
| 85 | + |
| 86 | +```python |
| 87 | +from game_sdk.game.agent import Agent, WorkerConfig |
| 88 | +from game_sdk.game.custom_types import FunctionResult |
| 89 | +from dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin |
| 90 | + |
| 91 | +load_dotenv() |
| 92 | + |
| 93 | +dpsn_plugin=DpsnPlugin( |
| 94 | + dpsn_url=os.getenv("DPSN_URL"), |
| 95 | + pvt_key=os.getenv("PVT_KEY") |
| 96 | + ) |
| 97 | +# --- Add Message Handler --- |
| 98 | +def handle_incoming_message(message_data: dict): |
| 99 | + """Callback function to process messages received via the plugin.""" |
| 100 | + try: |
| 101 | + topic = message_data.get('topic', 'N/A') |
| 102 | + payload = message_data.get('payload', '{}') |
| 103 | + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') |
| 104 | + print(f"\n--- Message Received ({timestamp}) ---") |
| 105 | + print(f"Topic: {topic}") |
| 106 | + # Pretty print payload if it's likely JSON/dict |
| 107 | + if isinstance(payload, (dict, list)): |
| 108 | + print(f"Payload:\n{json.dumps(payload, indent=2)}") |
| 109 | + return payload |
| 110 | + else: |
| 111 | + print(f"Payload: {payload}") |
| 112 | + return payload |
| 113 | + print("-----------------------------------") |
| 114 | + except Exception as e: |
| 115 | + print(f"Error in message handler: {e}") |
| 116 | + |
| 117 | +# Set the callback in the plugin instance *before* running the agent |
| 118 | +dpsn_plugin.set_message_callback(handle_incoming_message) |
| 119 | + |
| 120 | +def get_agent_state_fn(function_result: FunctionResult, current_state: dict) -> dict: |
| 121 | + """Update state based on the function results""" |
| 122 | + init_state = {} |
| 123 | + |
| 124 | + if current_state is None: |
| 125 | + return init_state |
| 126 | + |
| 127 | + if function_result.info is not None: |
| 128 | + current_state.update(function_result.info) |
| 129 | + |
| 130 | + return current_state |
| 131 | + |
| 132 | +def get_worker_state(function_result: FunctionResult, current_state: dict) -> dict: |
| 133 | + """Update state based on the function results""" |
| 134 | + init_state = {} |
| 135 | + |
| 136 | + if current_state is None: |
| 137 | + return init_state |
| 138 | + |
| 139 | + if function_result.info is not None: |
| 140 | + current_state.update(function_result.info) |
| 141 | + |
| 142 | + return current_state |
| 143 | + |
| 144 | + |
| 145 | +subscription_worker = WorkerConfig( |
| 146 | + id="subscription_worker", |
| 147 | + worker_description="Worker specialized in managing DPSN topic subscriptions, unsubscriptions, message handling, and shutdown.", |
| 148 | + get_state_fn=get_worker_state, |
| 149 | + action_space=[ |
| 150 | + dpsn_plugin.get_function("subscribe"), |
| 151 | + dpsn_plugin.get_function("unsubscribe"), |
| 152 | + dpsn_plugin.get_function("shutdown") |
| 153 | + ], |
| 154 | +) |
| 155 | + |
| 156 | +# Initialize the agent |
| 157 | +agent = Agent( |
| 158 | + api_key=os.environ.get("GAME_API_KEY"), |
| 159 | + name="DPSN Market Data Agent", |
| 160 | + agent_goal="Monitor SOLUSDT market data from DPSN and process real-time updates.", |
| 161 | + agent_description=( |
| 162 | + "You are an AI agent specialized in DPSN market data processing" |
| 163 | + "You can subscribe dpsn topic" |
| 164 | + "after 5 minutes unsubscribe the topic" |
| 165 | + "next 5 minutes close the connection" |
| 166 | + "\n\nAvailable topics:" |
| 167 | + "\n- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" |
| 168 | + ), |
| 169 | + get_agent_state_fn=get_agent_state_fn, |
| 170 | + workers=[ |
| 171 | + subscription_worker |
| 172 | + ] |
| 173 | +) |
| 174 | +``` |
| 175 | + |
| 176 | +## 📖 API Reference (`DpsnPlugin`) |
| 177 | + |
| 178 | +Key components of the `DpsnPlugin` class: |
| 179 | + |
| 180 | +- `__init__(dpsn_url: Optional[str] = ..., pvt_key: Optional[str] = ...)`: Constructor. Reads credentials from env vars by default. Raises `ValueError` if credentials are missing. |
| 181 | +- `get_function(fn_name: str) -> Function`: Retrieves the `Function` object (`subscribe`, `unsubscribe`, `shutdown`) for the Game SDK. |
| 182 | +- `set_message_callback(callback: Callable[[Dict[str, Any]], None])`: Registers the function to call when a message is received on a subscribed topic. The callback receives a dictionary (structure depends on the underlying `dpsn-client`). |
| 183 | +- `subscribe(topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]`: (Executable Function) Subscribes to a topic. Handles initialization if needed. Returns status, message, and details. |
| 184 | +- `unsubscribe(topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]`: (Executable Function) Unsubscribes from a topic. Handles initialization if needed. Returns status, message, and details. |
| 185 | +- `shutdown() -> Tuple[FunctionResultStatus, str, Dict[str, Any]]`: (Executable Function) Disconnects the DPSN client gracefully. Returns status, message, and details. |
| 186 | +- `_ensure_initialized()`: (Internal) Manages the lazy initialization of the DPSN client connection. Raises `DpsnInitializationError` on failure. |
| 187 | + |
| 188 | +### Agent-Executable Functions |
| 189 | + |
| 190 | +The plugin exposes the following functions intended to be called via the Game Agent's task execution mechanism: |
| 191 | + |
| 192 | +- **`subscribe`**: |
| 193 | + - Description: Subscribe to a DPSN topic. |
| 194 | + - Args: `topic` (string, required) - The topic to subscribe to. |
| 195 | +- **`unsubscribe`**: |
| 196 | + - Description: Unsubscribe from a DPSN topic. |
| 197 | + - Args: `topic` (string, required) - The topic to unsubscribe from. |
| 198 | +- **`shutdown`**: |
| 199 | + - Description: Shutdown the DPSN client connection. |
| 200 | + - Args: None. |
| 201 | + |
| 202 | + |
| 203 | + |
| 204 | +> In case of any queries regarding DPSN, please reach out to the team on [Telegram](https://t.me/dpsn_dev) 📥. |
0 commit comments