Skip to content

Commit 0407910

Browse files
committed
feature #244 [Store] Add support for ClickHouse (lyrixx)
This PR was merged into the main branch. Discussion ---------- [Store] Add support for ClickHouse | Q | A | ------------- | --- | Bug fix? | no | New feature? | yes | Docs? | no | Issues | | License | MIT We already have ClickHouse in our infrastructure, So I'm benchmarkin it. And, let's share my code 🎉 As you can see, I let some extension points (for the metadata). AFAIK, it's not possible to store **structured / indexed** JSON in CH. We have to store it as `String` instead. We could instead use better typed structure, but Symfony cannot know the type, obviously. So with string, if we want to filter the results, we need to use `[JSONExtractString](https://clickhouse.com/docs/sql-reference/functions/json-functions#jsonextractstring)`. And It's slow! So with the current extensions points, I could write the following code: ```php class ClickHouseStore extends SymfonyClickHouseStore { public function initialize(array $options = []): void { $sql = <<<'SQL' CREATE TABLE IF NOT EXISTS {{ table }} ( id UUID, metadata String, embedding Array(Float32), crawlId UUID, ) ENGINE = MergeTree() ORDER BY id SQL; $this->execute('POST', $sql); } protected function formatVectorDocument(VectorDocument $document): array { $formatted = parent::formatVectorDocument($document); $formatted['crawlId'] = $document->metadata['crawlId']; return $formatted; } } ``` And then: ```php $clickHouseStore = new ClickHouseStore( httpClient: HttpClient::createForBaseUri($_SERVER['CLICKHOUSE_URI']), databaseName: 'pocai', tableName: $tableName, ); $documents = $clickHouseStore->query( $vector, [ 'where' => 'crawlId = {crawlId:String} AND id != {currentId:UUID} AND score < 0.1', 'params' => [ 'crawlId' => $crawlPoId, 'currentId' => $row['id'], ], ], ); ``` Commits ------- 48865e2 [Store] Add support for ClickHouse
2 parents 5aca06b + 48865e2 commit 0407910

File tree

6 files changed

+468
-0
lines changed

6 files changed

+468
-0
lines changed

src/ai-bundle/config/options.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,22 @@
171171
->end()
172172
->end()
173173
->end()
174+
->arrayNode('clickhouse')
175+
->normalizeKeys(false)
176+
->useAttributeAsKey('name')
177+
->arrayPrototype()
178+
->children()
179+
->scalarNode('dsn')->cannotBeEmpty()->end()
180+
->scalarNode('http_client')->cannotBeEmpty()->end()
181+
->scalarNode('database')->isRequired()->cannotBeEmpty()->end()
182+
->scalarNode('table')->isRequired()->cannotBeEmpty()->end()
183+
->end()
184+
->validate()
185+
->ifTrue(static fn ($v) => !isset($v['dsn']) && !isset($v['http_client']))
186+
->thenInvalid('Either "dsn" or "http_client" must be configured.')
187+
->end()
188+
->end()
189+
->end()
174190
->arrayNode('meilisearch')
175191
->normalizeKeys(false)
176192
->useAttributeAsKey('name')

src/ai-bundle/src/AiBundle.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
use Symfony\AI\Platform\ResultConverterInterface;
4141
use Symfony\AI\Store\Bridge\Azure\SearchStore as AzureSearchStore;
4242
use Symfony\AI\Store\Bridge\ChromaDb\Store as ChromaDbStore;
43+
use Symfony\AI\Store\Bridge\ClickHouse\Store as ClickHouseStore;
4344
use Symfony\AI\Store\Bridge\Meilisearch\Store as MeilisearchStore;
4445
use Symfony\AI\Store\Bridge\MongoDb\Store as MongoDbStore;
4546
use Symfony\AI\Store\Bridge\Neo4j\Store as Neo4jStore;
@@ -59,8 +60,10 @@
5960
use Symfony\Component\DependencyInjection\Definition;
6061
use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator;
6162
use Symfony\Component\DependencyInjection\Reference;
63+
use Symfony\Component\HttpClient\HttpClient;
6264
use Symfony\Component\HttpKernel\Bundle\AbstractBundle;
6365
use Symfony\Component\Security\Core\Authorization\AuthorizationCheckerInterface;
66+
use Symfony\Contracts\HttpClient\HttpClientInterface;
6467

6568
use function Symfony\Component\String\u;
6669

@@ -482,6 +485,32 @@ private function processStoreConfig(string $type, array $stores, ContainerBuilde
482485
}
483486
}
484487

488+
if ('clickhouse' === $type) {
489+
foreach ($stores as $name => $store) {
490+
if (isset($store['http_client'])) {
491+
$httpClient = new Reference($store['http_client']);
492+
} else {
493+
$httpClient = new Definition(HttpClientInterface::class);
494+
$httpClient
495+
->setFactory([HttpClient::class, 'createForBaseUri'])
496+
->setArguments([$store['dsn']])
497+
;
498+
}
499+
500+
$definition = new Definition(ClickHouseStore::class);
501+
$definition
502+
->setArguments([
503+
$httpClient,
504+
$store['database'],
505+
$store['table'],
506+
])
507+
->addTag('ai.store')
508+
;
509+
510+
$container->setDefinition('ai.store.'.$type.'.'.$name, $definition);
511+
}
512+
}
513+
485514
if ('meilisearch' === $type) {
486515
foreach ($stores as $name => $store) {
487516
$arguments = [

src/ai-bundle/tests/DependencyInjection/AiBundleTest.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,13 @@ private function getFullConfig(): array
206206
'collection' => 'my_collection',
207207
],
208208
],
209+
'clickhouse' => [
210+
'my_clickhouse_store' => [
211+
'dsn' => 'http://foo:[email protected]:9999',
212+
'database' => 'my_db',
213+
'table' => 'my_table',
214+
],
215+
],
209216
'meilisearch' => [
210217
'my_meilisearch_store' => [
211218
'endpoint' => 'http://127.0.0.1:7700',

src/store/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ CHANGELOG
3737
* Add store bridge implementations:
3838
- Azure AI Search
3939
- ChromaDB
40+
- ClickHouse
4041
- MariaDB
4142
- Meilisearch
4243
- MongoDB
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\AI\Store\Bridge\ClickHouse;
13+
14+
use Symfony\AI\Platform\Vector\Vector;
15+
use Symfony\AI\Platform\Vector\VectorInterface;
16+
use Symfony\AI\Store\Document\Metadata;
17+
use Symfony\AI\Store\Document\VectorDocument;
18+
use Symfony\AI\Store\Exception\RuntimeException;
19+
use Symfony\AI\Store\InitializableStoreInterface;
20+
use Symfony\AI\Store\VectorStoreInterface;
21+
use Symfony\Component\Uid\Uuid;
22+
use Symfony\Contracts\HttpClient\HttpClientInterface;
23+
use Symfony\Contracts\HttpClient\ResponseInterface;
24+
25+
/**
26+
* @author Grégoire Pineau <[email protected]>
27+
*/
28+
class Store implements VectorStoreInterface, InitializableStoreInterface
29+
{
30+
public function __construct(
31+
private readonly HttpClientInterface $httpClient,
32+
private readonly string $databaseName = 'default',
33+
private readonly string $tableName = 'embedding',
34+
) {
35+
}
36+
37+
public function initialize(array $options = []): void
38+
{
39+
$sql = <<<'SQL'
40+
CREATE TABLE IF NOT EXISTS {{ table }} (
41+
id UUID,
42+
metadata String,
43+
embedding Array(Float32),
44+
) ENGINE = MergeTree()
45+
ORDER BY id
46+
SQL;
47+
48+
$this->execute('POST', $sql);
49+
}
50+
51+
public function add(VectorDocument ...$documents): void
52+
{
53+
$rows = [];
54+
55+
foreach ($documents as $document) {
56+
$rows[] = $this->formatVectorDocument($document);
57+
}
58+
59+
$this->insertBatch($rows);
60+
}
61+
62+
/**
63+
* @return array<string, mixed>
64+
*/
65+
protected function formatVectorDocument(VectorDocument $document): array
66+
{
67+
return [
68+
'id' => $document->id->toRfc4122(),
69+
'metadata' => json_encode($document->metadata->getArrayCopy(), \JSON_THROW_ON_ERROR),
70+
'embedding' => $document->vector->getData(),
71+
];
72+
}
73+
74+
public function query(Vector $vector, array $options = [], ?float $minScore = null): array
75+
{
76+
$sql = <<<'SQL'
77+
SELECT
78+
id,
79+
embedding,
80+
metadata,
81+
cosineDistance(embedding, {query_vector:Array(Float32)}) as score
82+
FROM {{ table }}
83+
WHERE length(embedding) = length({query_vector:Array(Float32)}) {{ where }}
84+
ORDER BY score ASC
85+
LIMIT {limit:UInt32}
86+
SQL;
87+
88+
if (isset($options['where'])) {
89+
$sql = str_replace('{{ where }}', 'AND '.$options['where'], $sql);
90+
} else {
91+
$sql = str_replace('{{ where }}', '', $sql);
92+
}
93+
94+
$results = $this
95+
->execute('GET', $sql, [
96+
'query_vector' => $this->toClickHouseVector($vector),
97+
'limit' => $options['limit'] ?? 5,
98+
...$options['params'] ?? [],
99+
])
100+
->toArray()['data']
101+
;
102+
103+
$documents = [];
104+
foreach ($results as $result) {
105+
$documents[] = new VectorDocument(
106+
id: Uuid::fromString($result['id']),
107+
vector: new Vector($result['embedding']),
108+
metadata: new Metadata(json_decode($result['metadata'] ?? '{}', true, 512, \JSON_THROW_ON_ERROR)),
109+
score: $result['score'],
110+
);
111+
}
112+
113+
return $documents;
114+
}
115+
116+
/**
117+
* @param array<string, mixed> $params
118+
*/
119+
protected function execute(string $method, string $sql, array $params = []): ResponseInterface
120+
{
121+
$sql = str_replace('{{ table }}', $this->tableName, $sql);
122+
123+
$options = [
124+
'query' => [
125+
'query' => $sql,
126+
'database' => $this->databaseName,
127+
'default_format' => 'JSON',
128+
],
129+
];
130+
131+
foreach ($params as $key => $value) {
132+
$options['query']['param_'.$key] = $value;
133+
}
134+
135+
return $this->httpClient->request($method, '/', $options);
136+
}
137+
138+
/**
139+
* @param array<array<string, mixed>> $rows
140+
*/
141+
private function insertBatch(array $rows): void
142+
{
143+
if (!$rows) {
144+
return;
145+
}
146+
147+
$sql = 'INSERT INTO {{ table }} FORMAT JSONEachRow';
148+
$sql = str_replace('{{ table }}', $this->tableName, $sql);
149+
150+
$jsonData = '';
151+
foreach ($rows as $row) {
152+
$jsonData .= json_encode($row)."\n";
153+
}
154+
155+
$options = [
156+
'query' => [
157+
'query' => $sql,
158+
'database' => $this->databaseName,
159+
],
160+
'body' => $jsonData,
161+
'headers' => [
162+
'Content-Type' => 'application/json',
163+
],
164+
];
165+
166+
$response = $this->httpClient->request('POST', '/', $options);
167+
168+
if (200 !== $response->getStatusCode()) {
169+
$content = $response->getContent(false);
170+
171+
throw new RuntimeException("Could not insert data into ClickHouse. Http status code: {$response->getStatusCode()}. Response: {$content}.");
172+
}
173+
}
174+
175+
private function toClickHouseVector(VectorInterface $vector): string
176+
{
177+
return '['.implode(',', $vector->getData()).']';
178+
}
179+
}

0 commit comments

Comments
 (0)