Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nodestream_plugin_semantic/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .argument_resolvers import FieldDeclaration

__all__ = ("FieldDeclaration",)
3 changes: 3 additions & 0 deletions nodestream_plugin_semantic/argument_resolvers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .field_declaration import FieldDeclaration

__all__ = ("FieldDeclaration",)
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from nodestream.pipeline.argument_resolvers import ArgumentResolver
from typing import Any
from nodestream.file_io import LazyLoadedTagSafeLoader, LazyLoadedArgument
from enum import Enum


TYPE_DECLARATION_ERROR = (
"Type conversion was unsuccesful. Attempted to convert {data} to type: {type}"
)


class TypeDeclaration(str, Enum):
STRING = "string"
FLOAT = "float"
INTEGER = "integer"
BOOLEAN = "boolean"

_PYTHON_TRANSLATOR = {
"string": str,
"float": float,
"integer": int,
"boolean": bool,
}

def convert(self, data: Any) -> str | int | bool | list:
try:
return self._PYTHON_TRANSLATOR[self.value](data)
except Exception:
raise ValueError(TYPE_DECLARATION_ERROR.format(data=data, type=self.value))


def wrap_declared_tag(self, node):
    """YAML constructor for the ``!declare`` tag.

    ``self`` is the YAML loader instance; ``node`` is the tagged mapping
    node. The mapping is wrapped in a ``LazyLoadedArgument`` so resolution
    is deferred until the argument is actually used.
    """
    # Strip the leading "!" so the stored tag matches the resolver alias.
    tag_name = node.tag[1:]
    mapping = self.construct_mapping(node)
    return LazyLoadedArgument(tag_name, mapping)


# Register the constructor so "!declare" mappings are loaded lazily.
LazyLoadedTagSafeLoader.add_constructor("!declare", wrap_declared_tag)


class FieldDeclaration(ArgumentResolver, alias="declare"):
    """A declared field in a JSON schema: its type, description, examples and
    whether it is required. Resolved from a ``!declare`` YAML mapping."""

    @staticmethod
    def resolve_argument(value: dict):
        """Build a ``FieldDeclaration`` from the raw ``!declare`` mapping.

        The ``type`` entry arrives as a plain string (e.g. ``"integer"``);
        coerce it to a ``TypeDeclaration`` so that ``validate`` can actually
        call ``convert`` on it (the original stored the bare string, which
        has no ``convert`` method).
        """
        raw_type = value.get("type", None)
        return FieldDeclaration(
            type=TypeDeclaration(raw_type) if raw_type is not None else None,
            description=value.get("description", None),
            examples=value.get("examples", []),
            required=value.get("required", False),
        )

    def __init__(
        self,
        type: TypeDeclaration | None = None,
        description: str | None = None,
        examples: list[str] | None = None,
        required: bool = False,
    ) -> None:
        self.type = type
        self.description = description
        # Default to a fresh list; a mutable ``[]`` default would be shared
        # across all instances.
        self.examples = examples if examples is not None else []
        self.required = required

    @staticmethod
    def field(**kwargs) -> list[str]:
        """Render each keyword argument as a ``key=value`` fragment."""
        return [f"{key}={value}" for key, value in kwargs.items()]

    def __str__(self) -> str:
        # Render examples as "[a, b, c]" to match the prompt template's
        # examples. The original built a list containing a *set* of the
        # joined string, e.g. "[{'a,b'}]", and crashed on non-str examples.
        rendered_examples = "[" + ", ".join(str(e) for e in self.examples) + "]"
        return "; ".join(
            self.field(
                type=self.type,
                description=self.description,
                examples=rendered_examples,
                required=self.required,
            )
        )

    def __repr__(self) -> str:
        return self.__str__()

    def validate(self, data: Any) -> bool:
        """Return True when ``data`` satisfies this declaration.

        A missing value (``None``) is valid unless the field is required;
        otherwise the value must be convertible to the declared type.
        """
        if data is None:
            return not self.required
        if not self.type:
            return True
        try:
            self.type.convert(data)
            return True
        except ValueError:
            # Conversion failure means the value does not match the type;
            # report it as invalid rather than leaking the exception.
            return False
32 changes: 32 additions & 0 deletions nodestream_plugin_semantic/inference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from abc import ABC, abstractmethod, abstractproperty

from nodestream.pluggable import Pluggable
from nodestream.subclass_registry import SubclassRegistry

INFERENCE_SUBCLASS_REGISTRY = SubclassRegistry()


@INFERENCE_SUBCLASS_REGISTRY.connect_baseclass
class InferenceRequestor(ABC, Pluggable):
    """Requests prompt completions from a pluggable model backend."""

    entrypoint_name = "inferencers"

    @classmethod
    def from_file_data(cls, type, **inference_kwargs) -> "InferenceRequestor":
        """Look up the registered inferencer named ``type`` and construct it
        with the remaining keyword arguments."""
        cls.import_all()  # Import all inferencers to register them.
        return INFERENCE_SUBCLASS_REGISTRY.get(type)(**inference_kwargs)

    # ``abstractproperty`` is deprecated since Python 3.3; the supported
    # spelling is ``@property`` stacked over ``@abstractmethod``.
    @property
    @abstractmethod
    def context_window(self) -> int:
        """The context window of the model (presumably in tokens — callers
        multiply it by a chars-per-token factor; confirm with implementers)."""
        ...

    @abstractmethod
    async def execute_prompt(self, prompt: str) -> str:
        """Execute ``prompt`` and return the model's raw text response."""
        ...
53 changes: 51 additions & 2 deletions nodestream_plugin_semantic/model.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import hashlib
from dataclasses import dataclass
from typing import Iterable, List, Optional

from typing import Iterable, List, Optional, Any
from .argument_resolvers.field_declaration import FieldDeclaration
from nodestream.model import DesiredIngestion, Node, Relationship
import json


Embedding = List[float | int]
CONTENT_NODE_TYPE_ID_PROPERTY = "id"
Expand Down Expand Up @@ -71,3 +73,50 @@ def make_ingestible(
)

return ingest


class DeclarativeJsonSchema:
    """A declarative JSON schema whose leaves are ``FieldDeclaration``s.

    The internal ``schema`` is a plain nested ``dict``/``list`` structure
    with ``FieldDeclaration`` objects at the leaves, so that validation can
    traverse it and each leaf can validate its value.
    """

    @classmethod
    def from_file_data(cls, declaration: dict[str, Any]) -> "DeclarativeJsonSchema":
        """Build a schema from raw file data.

        Nested dicts/lists are stored as plain ``dict``/``list`` — not as
        nested ``DeclarativeJsonSchema`` wrappers — and ``FieldDeclaration``
        leaves are kept as objects. (The original stored wrapper objects and
        stringified declarations, so none of ``recursive_search``'s
        ``isinstance`` branches ever matched and nested validation was
        vacuously True.)
        """
        schema: dict[str, Any] = {}
        for key, value in declaration.items():
            if isinstance(value, dict):
                schema[key] = cls.from_file_data(value).schema
            elif isinstance(value, list):
                schema[key] = [
                    cls.from_file_data(item).schema if isinstance(item, dict) else item
                    for item in value
                ]
            else:
                # FieldDeclaration (or literal) leaf: keep as-is.
                schema[key] = value
        return cls(schema)

    def __init__(self, schema: dict):
        self.schema = schema

    @staticmethod
    def recursive_search(expected_schema: Any, data: Any) -> bool:
        """Return True when ``data`` structurally matches ``expected_schema``."""
        if isinstance(expected_schema, dict):
            # Every declared key must be present and its value must match.
            if not isinstance(data, dict):
                return False
            return all(
                key in data
                and DeclarativeJsonSchema.recursive_search(template, data[key])
                for key, template in expected_schema.items()
            )
        if isinstance(expected_schema, list):
            # Each element of the *data* list must match the declared item
            # template(s). The original compared templates against the whole
            # list instead of per element.
            if not isinstance(data, list):
                return False
            return all(
                DeclarativeJsonSchema.recursive_search(template, item)
                for template in expected_schema
                for item in data
            )
        if isinstance(expected_schema, FieldDeclaration):
            # NOTE(review): FieldDeclaration.validate may return a converted
            # value rather than a strict bool; only truthiness matters here.
            return bool(expected_schema.validate(data))
        # Unrecognised schema nodes place no constraint on the data.
        return True

    def validate(self, data: dict) -> bool:
        """Return True when ``data`` contains only declared keys and every
        value matches its declaration.

        NOTE(review): schema keys absent from ``data`` are not flagged at
        this top level — confirm whether required top-level fields should
        also be enforced here.
        """
        if not isinstance(data, dict):
            return False
        for key, value in data.items():
            if key not in self.schema:
                return False
            if not DeclarativeJsonSchema.recursive_search(self.schema[key], value):
                return False
        return True

    @property
    def prompt_representation(self) -> str:
        # ``default=str`` serialises FieldDeclaration leaves through their
        # __str__ ("type=...; description=...; ..."), matching the format
        # shown in the prompt template's examples. The original's
        # ``default=lambda o: o.__dict__`` leaked a spurious {"schema": ...}
        # layer for nested wrapper objects.
        return json.dumps(self.schema, indent=4, default=str)
95 changes: 94 additions & 1 deletion nodestream_plugin_semantic/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from glob import glob
from pathlib import Path
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

from nodestream.model import DesiredIngestion
from nodestream.pipeline import Extractor, Transformer
Expand All @@ -21,6 +21,13 @@
from .embed import Embedder
from .model import Content

from .model import DeclarativeJsonSchema
from .inference import InferenceRequestor
from logging import getLogger
from math import ceil
import re


DEFAULT_ID = JmespathValueProvider.from_string_expression("id")
DEFAULT_CONTENT = JmespathValueProvider.from_string_expression("content")
DEFAULT_NODE_TYPE = "Content"
Expand Down Expand Up @@ -173,3 +180,89 @@ def expand_schema(self, coordinator: SchemaExpansionCoordinator):
Cardinality.MANY,
Cardinality.SINGLE,
)


CHARS_PER_TOKEN = 2
DEFAULT_PROMPT_LOCATION = "nodestream_plugin_semantic/prompts/text2json.txt"
PROMPT_FILE_ERROR = (
    "The prompt must contain formatting fields for the {text} and the {schema}."
)


class TextToJson(Transformer):
    """Transformer that prompts a model to extract schema-conforming JSON
    records from free text, chunking the text to fit the context window."""

    # Format fields the prompt template must contain — no more, no less.
    _required_parameters = {"schema", "text"}

    def __init__(
        self,
        schema: dict,
        inference_requestor_kwargs: dict,
        discard_invalid: bool = False,
        prompt_location: str = DEFAULT_PROMPT_LOCATION,
    ):
        self.schema = DeclarativeJsonSchema.from_file_data(schema)
        self.inference_requestor = InferenceRequestor.from_file_data(
            **inference_requestor_kwargs
        )
        self.discard_invalid = discard_invalid
        self.prompt = self.read_prompt_from_file(prompt_location)
        self.logger = getLogger(name=self.__class__.__name__)

    def read_prompt_from_file(self, path: str) -> str:
        """Read the prompt template and verify it has exactly the required
        ``{schema}`` and ``{text}`` format fields.

        Raises:
            ValueError: if the template's fields differ from the required set.
        """
        with open(path, "r") as prompt_file:
            prompt_string = prompt_file.read()
        variables = re.findall(r"{(\w+)}", prompt_string)
        if set(variables) != self._required_parameters:
            raise ValueError(PROMPT_FILE_ERROR)
        # BUG FIX: the original returned the (closed) file object instead of
        # the template text, so ``self.prompt.format(...)`` could never work.
        return prompt_string

    def _deliver(self, piece: dict, extra: dict):
        """Yield ``piece`` merged with ``extra`` unless it fails schema
        validation and ``discard_invalid`` is set; invalid-but-kept pieces
        are logged before being passed through."""
        if not self.schema.validate(piece):
            if self.discard_invalid:
                return
            self.logger.info(f"Invalid item passed: {piece}.")
        piece.update(extra)
        yield piece

    async def transform_record(self, data: dict) -> Any:
        """Extract JSON records from ``data["content"]``; all other keys of
        ``data`` are merged into every yielded record."""
        import json  # local import: this module does not import json at file level

        text = data.pop("content")
        additional_args = data

        # Chunk the text so each prompt fits the model's context window.
        # Assumes CHARS_PER_TOKEN characters per token — a rough bound;
        # TODO: determine token length with a real tokenizer.
        chunk_size = self.inference_requestor.context_window * CHARS_PER_TOKEN
        chunk_count = ceil(len(text) / chunk_size)
        for index in range(chunk_count):
            substring = text[index * chunk_size : (index + 1) * chunk_size]

            # Create the prompt from the schema and the text chunk.
            prompt = self.prompt.format(
                schema=self.schema.prompt_representation, text=substring
            )

            result = await self.inference_requestor.execute_prompt(prompt)

            # ``execute_prompt`` is declared to return ``str`` (inference.py);
            # parse the JSON text before validating. A malformed response
            # raises json.JSONDecodeError, a ValueError subclass — consistent
            # with the original's ValueError for unusable results.
            if isinstance(result, str):
                result = json.loads(result)

            if isinstance(result, list):
                for piece in result:
                    for record in self._deliver(piece, additional_args):
                        yield record
            elif isinstance(result, dict):
                for record in self._deliver(result, additional_args):
                    yield record
            else:
                raise ValueError(f"Invalid result format: {result}.")
61 changes: 61 additions & 0 deletions nodestream_plugin_semantic/prompts/text2json.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
You are a JSON schema generator.
You will be given a JSON object and you will generate a JSON schema for it.
The JSON schema should be in the format of a JSON object.
This object will end with text representing the description of the object, along with the datatype we want the result to be contained as.
The examples I want you to use as reference are within the EXAMPLES section.
The schema that I want you to format the JSON as will be located within the SCHEMA field.
The text I want you to parse and attempt to retrieve the relevant information from is within the TEXT field.
DO NOT PROVIDE ANYTHING OTHER THAN THE RESULTING JSON.
IF YOU DO NOT UNDERSTAND THE TEXT OR CANNOT FIND THE RELEVANT INFORMATION FILL THE JSON WITH NULLS.
INCLUDE ALL FIELDS IN THE JSON AS PROVIDED IN THE SCHEMA.
EXAMPLES ARE PROVIDED TO HELP YOU UNDERSTAND THE SCHEMA AND THE TEXT.
EXAMPLES IN THE SCHEMA ARE PROVIDED TO HELP UNDERSTAND THE FORMATTING OF THE OBJECT BEING REQUESTED.
PROVIDE THE ENTIRE JSON. MAKE SURE THAT IT IS VALID JSON.
DO NOT INCLUDE ANY ```json ``` OR ANY OTHER MARKUP AROUND THE JSON. ONLY USE VALID JSON.

---EXAMPLES----
Input:
---EXAMPLE SCHEMA---
{{
"subject_name": "type=string; description=Name of the subject extracted from the text document.; examples=[Amy, Isabella, Bob]; required=True;",
"subject_age": "type=integer; description=Age of the subject extracted from the text document.; examples=[10, 12, 14, 25]; required=False;",
"friends": [
{{
"name": "type=string; description=Name of any other subject extracted from the text document.; examples=[Amy, Isabella, Bob]; required=True;",
"age": "type=integer; description=Age of any other subject extracted from the text document.; examples=[10, 12, 14, 25]; required=False;",
"activity": "type=string; description=Activity parties were participating in.; examples=[running, dancing, playing, fishing]; required=False;"
}}
]
}}
---EXAMPLE END SCHEMA---
---EXAMPLE TEXT---
John lived in a farm in Wyoming when he was 30 years old. He had two friends, Jane and Jim, who were 25 and 28 years old respectively.
They used to go fishing together every weekend. John loved fishing and he was very good at it. He had a big boat and a lot of fishing gear.
---EXAMPLE END TEXT---
Output:
{{
"subject_name": "John",
"subject_age": 30,
"friends": [
{{
"name": "Jane",
"age": 25,
"activity": "fishing"
}},
{{
"name": "Jim",
"age": 28,
"activity": "fishing"
}}
]
}}
---END EXAMPLES----
---SCHEMA---
{schema}
---END SCHEMA---

---TEXT---
{text}
---END TEXT---
"""
Loading
Loading