1+ import jwt
2+ import requests
3+ import nilql
4+ import os
5+ import time
6+ import uuid
7+
8+ from jsonschema import Draft7Validator , validators
9+ from ecdsa import SECP256k1 , SigningKey
10+ from langchain_openai import ChatOpenAI
11+ from typing import Any , Generator
12+
13+ # from collections import deque, defaultdict
14+ # import json
15+ # import re
16+
17+ class NillionPlugin :
18+ """
19+ Nillion Plugin for interacting with Nillion subnets via Nillion API
20+ """
21+
22+ def __init__ (self ) -> None :
23+ """Initialize the Nillion plugin"""
24+ self .id : str = "nillion_plugin"
25+ self .name : str = "Nillion Plugin"
26+ self .secret_key = os .getenv ("NILLION_SECRET_KEY" )
27+ self .org_did = os .getenv ("NILLION_ORG_ID" )
28+ self .key = None
29+ self .nodes = None
30+
31+ def initialize (self ):
32+ """Initialize the plugin"""
33+ if not self .secret_key :
34+ raise ValueError ("NILLION_SECRET_KEY is not configured." )
35+ if not self .org_did :
36+ raise ValueError ("NILLION_ORG_ID is not configured." )
37+ if not os .getenv ("OPENAI_API_KEY" ):
38+ raise ValueError ("OPENAI_API_KEY is not configured." )
39+
40+ """Initialize config with JWTs signed with ES256K for multiple node_ids; Add cluster key."""
41+ response = requests .post (
42+ "https://sv-sda-registration.replit.app/api/config" ,
43+ headers = {
44+ "Content-Type" : "application/json" ,
45+ },
46+ json = {"org_did" : self .org_did },
47+ )
48+ self .nodes = response .json ()["nodes" ]
49+
50+ # Convert the secret key from hex to bytes
51+ private_key = bytes .fromhex (self .secret_key )
52+ signer = SigningKey .from_string (private_key , curve = SECP256k1 )
53+
54+ for node in self .nodes :
55+ # Create payload for each node_id
56+ payload = {
57+ "iss" : self .org_did ,
58+ "aud" : node ["did" ],
59+ "exp" : int (time .time ()) + 3600 ,
60+ }
61+
62+ # Create and sign the JWT
63+ node ["bearer" ] = jwt .encode (payload , signer .to_pem (), algorithm = "ES256K" )
64+
65+ self .key = nilql .ClusterKey .generate (
66+ {"nodes" : [{}] * len (self .nodes )}, {"store" : True }
67+ )
68+
69+ def _post (
70+ self , nodes : list , endpoint : str , payload : dict
71+ ) -> Generator [requests .Response , Any , Any ]:
72+ """Post payload to nildb nodes."""
73+ for node in nodes :
74+ headers = {
75+ "Authorization" : f'Bearer { node ["bearer" ]} ' ,
76+ "Content-Type" : "application/json" ,
77+ }
78+
79+ response = requests .post (
80+ f"{ node ['url' ]} /api/v1/{ endpoint } " ,
81+ headers = headers ,
82+ json = payload ,
83+ )
84+
85+ assert (
86+ response .status_code == 200 and response .json ().get ("errors" , []) == []
87+ ), response .content .decode ("utf8" )
88+
89+ yield response
90+
91+ def fetch_schemas (self ) -> list :
92+ """Get all my schemas from the first server."""
93+ headers = {
94+ "Authorization" : f'Bearer { self .nodes [0 ]["bearer" ]} ' ,
95+ "Content-Type" : "application/json" ,
96+ }
97+
98+ response = requests .get (
99+ f"{ self .nodes [0 ]['url' ]} /api/v1/schemas" , headers = headers
100+ )
101+
102+ assert (
103+ response .status_code == 200 and response .json ().get ("errors" , []) == []
104+ ), response .content .decode ("utf8" )
105+
106+ schema_list = response .json ()["data" ]
107+ assert len (schema_list ) > 0 , "failed to fetch schemas from nildb"
108+ return schema_list
109+
110+ def find_schema (self , schema_uuid : str , schema_list : list | None = None ) -> dict :
111+ """Filter a list of schemas by single desired schema id."""
112+ if not schema_list :
113+ schema_list = self .fetch_schemas ()
114+
115+ my_schema = None
116+ for this_schema in schema_list :
117+ if this_schema ["_id" ] == schema_uuid :
118+ my_schema = this_schema ["schema" ]
119+ break
120+ assert my_schema is not None , "failed to lookup schema"
121+ return my_schema
122+
123+ def _mutate_secret_attributes (self , entry : dict ) -> None :
124+ """Apply encryption or secret sharing to all fields in schema that are indicated w/ $share keyname."""
125+ keys = list (entry .keys ())
126+ for key in keys :
127+ value = entry [key ]
128+ if key == "_id" :
129+ entry [key ] = str (uuid .uuid4 ())
130+ elif key == "$share" :
131+ del entry ["$share" ]
132+ entry ["$allot" ] = nilql .encrypt (self .key , value )
133+ elif isinstance (value , dict ):
134+ self ._mutate_secret_attributes (value )
135+
136+ def _validator_builder (self ):
137+ """Build a validator to validate the candidate document against loaded schema."""
138+ return validators .extend (Draft7Validator )
139+
140+ def lookup_schema (self , args : dict [str , Any ]) -> tuple :
141+ """Lookup a JSON schema based on input description and return it's UUID.
142+ Args:
143+ args (dict[str, Any]): Arguments containing schema_description
144+ Returns:
145+ tuple[str, dict]: The schema_uuid and the corresponding schema definition
146+ """
147+ try :
148+ validated_args = NillionLookupSchemaInput (** args )
149+ schema_list = self .fetch_schemas ()
150+
151+ schema_prompt = f"""
152+ 1. I'll provide you with a description of the schema I want to use
153+ 2. I'll provide you with a list of available schemas
154+ 3. You will select the best match and return the associated UUID from the outermost `_id` field
155+ 4. Do not include explanation or comments. Only a valid UUID string
156+ 5. Based on the provided description, select a schema from the provided schemas.
157+ DESIRED SCHEMA DESCRIPTION:
158+ { validated_args .schema_description }
159+ AVAILABLE SCHEMAS:
160+ { json .dumps (schema_list )}
161+ """
162+
163+ llm = ChatOpenAI (model = "gpt-4o-mini" )
164+ response = llm .invoke (schema_prompt )
165+
166+ my_uuid = str (response .content )
167+ my_uuid = re .sub (r"[^0-9a-fA-F-]" , "" , my_uuid )
168+
169+ my_schema = self .find_schema (my_uuid , schema_list )
170+ return my_uuid , my_schema
171+
172+ except Exception as e :
173+ print (f"Error looking up schema: { e !r} " )
174+ return None , None
175+
176+ def create_schema (self , args : dict [str , Any ]) -> tuple :
177+ """Create a schema in your privacy preserving database, called the Nillion SecretVault
178+ (or nildb), based on a natural language description. Do not use this tool for any other
179+ purpose.
180+ Args:
181+ args (dict[str, Any]): Arguments containing a complete description of the desired schema
182+ Returns:
183+ tuple[str, dict]: The schema_uuid and the corresponding schema definition
184+ """
185+ try :
186+
187+ validated_args = NillionCreateSchemaInput (** args )
188+ print (f"fn:create_schema [{ validated_args .schema_description } ]" )
189+
190+ # ruff: noqa
191+ schema_prompt = f"""
192+ 1. I'll provide you with a description of the schema I want to implement
193+ 3. For any fields that could be considered financial, secret, currency, value holding, political, family values, sexual, criminal, risky, personal, private or personally
194+ identifying (PII), I want you to replace that type and value, instead, with an object that has a key named `$share` and the value of string as shown in this example:
195+ ORIGINAL ATTRIBUTE:
196+ "password": {{
197+ "type": "string"
198+ }}
199+ REPLACED WITH UPDATED ATTRIBUTE PRESERVING NAME:
200+ "password": {{
201+ "type": "object",
202+ "properties": {{
203+ "$share": {{
204+ "type": "string",
205+ }}
206+ }}
207+ }}
208+ 4. The JSON document should follow the patterns shown in these examples contained herein where the final result is ready to be included in the POST JSON payload
209+ 5. Do not include explanation or comments. Only a valid JSON payload document.
210+ START OF JSON SCHEMA DESECRIPTION
211+ a JSON Schema following these requirements:
212+ - Use JSON Schema draft-07, type "array"
213+ - Each record needs a unique _id (UUID format, coerce: true)
214+ - Use "date-time" format for dates (coerce: true)
215+ - Mark required fields (_id is always required)
216+ - Set additionalProperties to false
217+ - Avoid "$" prefix in field names to prevent query conflicts
218+ - The schema to create is embedded in the "schema" attribute
219+ - "_id" should be the only "keys"
220+ - Note: System adds _created and _updated fields automatically
221+ Example `POST /schema` Payload
222+ {{
223+ "name": "My services",
224+ "keys": ["_id"],
225+ "schema": {{
226+ "$schema": "http://json-schema.org/draft-07/schema#",
227+ "type": "array",
228+ "items": {{
229+ "type": "object",
230+ "properties": {{
231+ "_id": {{
232+ "type": "string",
233+ "format": "uuid",
234+ "coerce": true
235+ }},
236+ "username": {{
237+ "type": "string"
238+ }},
239+ "password": {{
240+ "type": "string"
241+ }},
242+ }},
243+ "required": ["_id", "username", "password"],
244+ "additionalProperties": false
245+ }}
246+ }}
247+ }}
248+ Based on this description, create a JSON schema:
249+ { validated_args .schema_description }
250+ """
251+
252+ llm = ChatOpenAI (model = "gpt-4o-mini" )
253+ response = llm .invoke (schema_prompt )
254+
255+ schema = json .loads (str (response .content ))
256+
257+ schema ["_id" ] = str (uuid .uuid4 ())
258+ schema ["owner" ] = self .org_did
259+
260+ deque (
261+ self ._post (self .nodes , "schemas" , schema ), maxlen = 0
262+ ) # discard results since we throw on err
263+ print (f'fn:create_schema [{ schema ["_id" ]} ]' )
264+ return schema ["_id" ], schema
265+ except Exception as e :
266+ print (f"Error creating schema: { str (e )} " )
267+ return None , None
268+
269+ def data_upload (self , args : dict [str , Any ]) -> list [str ]:
270+ """Create a schema in your privacy preserving database, called the Nillion SecretVault
271+ (or nildb), based on a natural language description. Do not use this tool for any other
272+ purpose.
273+ Args:
274+ args (dict[str, Any]): Arguments containing a UUID and the data to upload.
275+ Returns:
276+ list[str]: A list of the uploaded record's UUIDs
277+ """
278+ try :
279+ validated_args = NillionDataUploadInput (** args )
280+ print (
281+ f"fn:data_upload [{ validated_args .schema_uuid } ] [{ validated_args .data_to_store } ]"
282+ )
283+
284+ schema_definition = self .find_schema (validated_args .schema_uuid )
285+
286+ builder = self ._validator_builder ()
287+ validator = builder (schema_definition )
288+
289+ for entry in validated_args .data_to_store :
290+ self ._mutate_secret_attributes (entry )
291+
292+ record_uuids = [x ["_id" ] for x in validated_args .data_to_store ]
293+ payloads = nilql .allot (validated_args .data_to_store )
294+
295+ for idx , shard in enumerate (payloads ):
296+
297+ validator .validate (shard )
298+
299+ node = self .nodes [idx ]
300+ headers = {
301+ "Authorization" : f'Bearer { node ["bearer" ]} ' ,
302+ "Content-Type" : "application/json" ,
303+ }
304+
305+ body = {"schema" : validated_args .schema_uuid , "data" : shard }
306+
307+ response = requests .post (
308+ f"{ node ['url' ]} /api/v1/data/create" ,
309+ headers = headers ,
310+ json = body ,
311+ )
312+
313+ assert (
314+ response .status_code == 200
315+ and response .json ().get ("errors" , []) == []
316+ ), f"upload (host-{ idx } ) failed: " + response .content .decode ("utf8" )
317+ print (f"fn:data_upload COMPLETED: { record_uuids } " )
318+ return record_uuids
319+
320+ except Exception as e :
321+ print (f"Error creating records in node: { str (e )} " )
322+ return []
323+
324+ def data_download (self , args : dict [str , Any ]) -> list [dict ]:
325+ """Create a schema in your privacy preserving database, called the Nillion SecretVault
326+ (or nildb), based on a natural language description. Do not use this tool for any other
327+ purpose.
328+ Args:
329+ args (dict[str, Any]): Arguments containing a target schema UUID
330+ Returns:
331+ list[dict]: A list of the downloaded records
332+ """
333+ try :
334+ validated_args = NillionDataDownloadInput (** args )
335+ print (f"fn:data_download [{ validated_args .schema_uuid } ]" )
336+
337+ shares = defaultdict (list )
338+ for node in self .nodes :
339+ headers = {
340+ "Authorization" : f'Bearer { node ["bearer" ]} ' ,
341+ "Content-Type" : "application/json" ,
342+ }
343+
344+ body = {
345+ "schema" : validated_args .schema_uuid ,
346+ "filter" : {},
347+ }
348+
349+ response = requests .post (
350+ f"{ node ['url' ]} /api/v1/data/read" ,
351+ headers = headers ,
352+ json = body ,
353+ )
354+ assert (
355+ response .status_code == 200
356+ ), "upload failed: " + response .content .decode ("utf8" )
357+ data = response .json ().get ("data" )
358+ for d in data :
359+ shares [d ["_id" ]].append (d )
360+ decrypted = []
361+ for k in shares :
362+ decrypted .append (nilql .unify (self .key , shares [k ]))
363+ return decrypted
364+ except Exception as e :
365+ print (f"Error retrieving records in node: { e !r} " )
366+ return []
0 commit comments