Skip to content

Commit 8f904f7

Browse files
committed
opensearch vector store
1 parent 9f7b1cf commit 8f904f7

File tree

9 files changed

+358
-10
lines changed

9 files changed

+358
-10
lines changed

.github/workflows/tests.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,16 @@ jobs:
3131
ports:
3232
- 3306:3306
3333
options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
34+
opensearch:
35+
image: opensearchproject/opensearch:latest
36+
ports:
37+
- 9201:9200 # Avoid port conflict with Elasticsearch
38+
- 9600:9600
39+
env:
40+
discovery.type: single-node
41+
plugins.security.disabled: "true"
42+
DISABLE_INSTALL_DEMO_CONFIG: true
43+
options: --health-cmd="curl -s http://localhost:9200/_cluster/health | grep '\"status\":\"green\"'" --health-interval=10s --health-timeout=5s --health-retries=5
3444
neo4j:
3545
image: neo4j:5.26.19-ubi9
3646
ports:

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
"html2text/html2text": "^4.3",
2727
"illuminate/database": "^10.0|^11.0|^12.0",
2828
"laudis/neo4j-php-client": "^3.4",
29+
"opensearch-project/opensearch-php": "^2.5",
2930
"phpstan/phpstan": "^2.1",
3031
"phpunit/phpunit": "^9.0",
3132
"rector/rector": "^2.0",

src/MCP/SseHttpTransport.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public function connect(): void
148148

149149
} catch (GuzzleException $e) {
150150
$this->cleanup();
151-
throw new McpException('HTTP connection failed: ' . $e->getMessage());
151+
throw new McpException('HTTP connection failed: ' . $e->getMessage(), $e->getCode(), $e);
152152
}
153153
}
154154

@@ -273,9 +273,9 @@ public function send(array $data): void
273273
}
274274

275275
} catch (GuzzleException $e) {
276-
throw new McpException('HTTP POST failed: ' . $e->getMessage());
276+
throw new McpException('HTTP POST failed: ' . $e->getMessage(), $e->getCode(), $e);
277277
} catch (JsonException $e) {
278-
throw new McpException('Failed to encode JSON: ' . $e->getMessage());
278+
throw new McpException('Failed to encode JSON: ' . $e->getMessage(), $e->getCode(), $e);
279279
}
280280
}
281281

src/MCP/StreamableHttpTransport.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ public function send(array $data): void
109109
$this->lastResponse = $response;
110110

111111
} catch (GuzzleException $e) {
112-
throw new McpException('HTTP request failed: ' . $e->getMessage());
112+
throw new McpException('HTTP request failed: ' . $e->getMessage(), $e->getCode(), $e);
113113
} catch (JsonException $e) {
114-
throw new McpException('Failed to encode JSON: ' . $e->getMessage());
114+
throw new McpException('Failed to encode JSON: ' . $e->getMessage(), $e->getCode(), $e);
115115
}
116116
}
117117

@@ -145,7 +145,7 @@ public function receive(): array
145145
}
146146

147147
} catch (JsonException $e) {
148-
throw new McpException('Invalid JSON response: ' . $e->getMessage());
148+
throw new McpException('Invalid JSON response: ' . $e->getMessage(), $e->getCode(), $e);
149149
}
150150
}
151151

src/Providers/OpenAI/Responses/HandleStream.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ protected function parseNextDataLine(StreamInterface $stream): ?array
170170
try {
171171
$event = json_decode($line, true, flags: JSON_THROW_ON_ERROR);
172172
} catch (Throwable $exception) {
173-
throw new ProviderException('OpenAI streaming JSON decode error: ' . $exception->getMessage());
173+
throw new ProviderException('OpenAI streaming JSON decode error: ' . $exception->getMessage(), $exception->getCode(), $exception);
174174
}
175175

176176
if (!isset($event['type'])) {

src/Providers/SSEParser.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public static function parseNextSSEEvent(StreamInterface $stream): ?array
3939
try {
4040
return json_decode($line, true, flags: JSON_THROW_ON_ERROR);
4141
} catch (Throwable $exception) {
42-
throw new ProviderException('Streaming error - '.$exception->getMessage());
42+
throw new ProviderException('Streaming error - '.$exception->getMessage(), $exception->getCode(), $exception);
4343
}
4444
}
4545
}
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace NeuronAI\RAG\VectorStore;
6+
7+
use NeuronAI\RAG\Document;
8+
use OpenSearch\Client;
9+
use Exception;
10+
11+
use function array_key_exists;
12+
use function array_keys;
13+
use function array_map;
14+
use function count;
15+
use function in_array;
16+
use function max;
17+
18+
class OpenSearchVectorStore implements VectorStoreInterface
19+
{
20+
protected bool $vectorDimSet = false;
21+
22+
protected array $filters = [];
23+
24+
public function __construct(
25+
protected Client $client,
26+
protected string $index,
27+
protected int $topK = 4,
28+
) {
29+
}
30+
31+
protected function checkIndexStatus(Document $document): void
32+
{
33+
$indexExists = $this->client->indices()->exists(['index' => $this->index]);
34+
35+
if ($indexExists) {
36+
$this->mapVectorDimension(count($document->getEmbedding()));
37+
38+
return;
39+
}
40+
41+
$properties = [
42+
'content' => [
43+
'type' => 'text',
44+
],
45+
'sourceType' => [
46+
'type' => 'keyword',
47+
],
48+
'sourceName' => [
49+
'type' => 'keyword',
50+
],
51+
'embedding' => [
52+
'type' => 'knn_vector',
53+
'dimension' => count($document->getEmbedding()),
54+
'index' => true,
55+
'method' => [
56+
'name' => 'hnsw',
57+
'engine' => 'lucene',
58+
'space_type' => 'cosinesimil',
59+
'parameters' => [
60+
'encoder' => [
61+
'name' => 'sq'
62+
]
63+
]
64+
],
65+
]
66+
];
67+
68+
// Map metadata
69+
foreach (array_keys($document->metadata) as $name) {
70+
$properties[$name] = [
71+
'type' => 'keyword',
72+
];
73+
}
74+
75+
$this->client->indices()->create([
76+
'index' => $this->index,
77+
'body' => [
78+
'settings' => [
79+
'index' => [
80+
'knn' => true,
81+
'number_of_replicas' => 0,
82+
],
83+
],
84+
'mappings' => [
85+
'properties' => $properties
86+
]
87+
]
88+
]);
89+
}
90+
91+
/**
92+
* @throws Exception
93+
*/
94+
public function addDocument(Document $document): VectorStoreInterface
95+
{
96+
if ($document->embedding === []) {
97+
throw new Exception('Document embedding must be set before adding a document');
98+
}
99+
100+
$this->checkIndexStatus($document);
101+
102+
$this->client->index([
103+
'index' => $this->index,
104+
'body' => [
105+
'embedding' => $document->getEmbedding(),
106+
'content' => $document->getContent(),
107+
'sourceType' => $document->getSourceType(),
108+
'sourceName' => $document->getSourceName(),
109+
...$document->metadata,
110+
],
111+
]);
112+
113+
$this->client->indices()->refresh(['index' => $this->index]);
114+
115+
return $this;
116+
}
117+
118+
public function addDocuments(array $documents): VectorStoreInterface
119+
{
120+
if ($documents === []) {
121+
return $this;
122+
}
123+
124+
if (empty($documents[0]->getEmbedding())) {
125+
throw new Exception('Document embedding must be set before adding a document');
126+
}
127+
128+
$this->checkIndexStatus($documents[0]);
129+
130+
/*
131+
* Generate a bulk payload
132+
*/
133+
$params = ['body' => []];
134+
foreach ($documents as $document) {
135+
$params['body'][] = [
136+
'index' => [
137+
'_index' => $this->index,
138+
],
139+
];
140+
$params['body'][] = [
141+
'embedding' => $document->getEmbedding(),
142+
'content' => $document->getContent(),
143+
'sourceType' => $document->getSourceType(),
144+
'sourceName' => $document->getSourceName(),
145+
...$document->metadata,
146+
];
147+
}
148+
$this->client->bulk($params);
149+
$this->client->indices()->refresh(['index' => $this->index]);
150+
return $this;
151+
}
152+
153+
public function deleteBySource(string $sourceType, string $sourceName): VectorStoreInterface
154+
{
155+
$this->client->deleteByQuery([
156+
'index' => $this->index,
157+
'q' => "sourceType:{$sourceType} AND sourceName:{$sourceName}",
158+
'body' => []
159+
]);
160+
$this->client->indices()->refresh(['index' => $this->index]);
161+
return $this;
162+
}
163+
164+
/**
165+
* @return Document[]
166+
*/
167+
public function similaritySearch(array $embedding): iterable
168+
{
169+
$searchParams = [
170+
'index' => $this->index,
171+
'body' => [
172+
'query' => [
173+
'knn' => [
174+
'embedding' => [
175+
'vector' => $embedding,
176+
'k' => max(50, $this->topK * 4),
177+
],
178+
],
179+
],
180+
'sort' => [
181+
'_score' => [
182+
'order' => 'desc',
183+
],
184+
],
185+
],
186+
];
187+
188+
// Hybrid search
189+
if ($this->filters !== []) {
190+
$searchParams['body']['query']['knn']['filter'] = $this->filters;
191+
}
192+
193+
$response = $this->client->search($searchParams);
194+
195+
return array_map(function (array $item): Document {
196+
$document = new Document($item['_source']['content']);
197+
//$document->embedding = $item['_source']['embedding']; // avoid carrying large data
198+
$document->sourceType = $item['_source']['sourceType'];
199+
$document->sourceName = $item['_source']['sourceName'];
200+
$document->score = $item['_score'];
201+
202+
foreach ($item['_source'] as $name => $value) {
203+
if (!in_array($name, ['content', 'sourceType', 'sourceName', 'score', 'embedding', 'id'])) {
204+
$document->addMetadata($name, $value);
205+
}
206+
}
207+
208+
return $document;
209+
}, $response['hits']['hits']);
210+
}
211+
212+
/**
213+
* Map vector embeddings dimension on the fly.
214+
*/
215+
private function mapVectorDimension(int $dimension): void
216+
{
217+
if ($this->vectorDimSet) {
218+
return;
219+
}
220+
221+
$response = $this->client->indices()->getFieldMapping([
222+
'index' => $this->index,
223+
'fields' => 'embedding',
224+
]);
225+
226+
$mappings = $response[$this->index]['mappings'];
227+
228+
if (
229+
array_key_exists('embedding', $mappings)
230+
&& $mappings['embedding']['mapping']['embedding']['dimension'] === $dimension
231+
) {
232+
return;
233+
}
234+
235+
$this->client->indices()->putMapping([
236+
'index' => $this->index,
237+
'body' => [
238+
'properties' => [
239+
'embedding' => [
240+
'type' => 'knn_vector',
241+
'dimension' => $dimension,
242+
'index' => true,
243+
'method' => [
244+
'name' => 'hnsw',
245+
'engine' => 'lucene',
246+
'space_type' => 'cosinesimil',
247+
'parameters' => [
248+
'encoder' => [
249+
'name' => 'sq'
250+
]
251+
]
252+
253+
],
254+
],
255+
],
256+
],
257+
]);
258+
259+
$this->vectorDimSet = true;
260+
}
261+
262+
public function withFilters(array $filters): self
263+
{
264+
$this->filters = $filters;
265+
return $this;
266+
}
267+
}

src/Workflow/Workflow.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ protected function loadEventNodeMap(): void
403403
$this->eventNodeMap[$eventClass] = $node;
404404
}
405405
} catch (ReflectionException $e) {
406-
throw new WorkflowException('Failed to load event-node map for '.$node::class.': ' . $e->getMessage());
406+
throw new WorkflowException('Failed to load event-node map for '.$node::class.': ' . $e->getMessage(), $e->getCode(), $e);
407407
}
408408
}
409409
}
@@ -493,7 +493,7 @@ protected function validateInvokeMethodSignature(NodeInterface $node): void
493493
}
494494

495495
} catch (ReflectionException $e) {
496-
throw new WorkflowException('Failed to validate '.$node::class.': ' . $e->getMessage());
496+
throw new WorkflowException('Failed to validate '.$node::class.': ' . $e->getMessage(), $e->getCode(), $e);
497497
}
498498
}
499499

0 commit comments

Comments
 (0)