Skip to content

Commit 512c8b3

Browse files
authored
feat(CE): added new destination weaviate connector (#989)
* feat(CE): added new destination weaviate connector * chore(CE): fix gemfile
1 parent 334c0ca commit 512c8b3

File tree

10 files changed

+733
-2
lines changed

10 files changed

+733
-2
lines changed

integrations/Gemfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,5 @@ gem "googleauth", "~> 1.11"
105105
gem "google-apis-drive_v3", "~> 0.67.0"
106106

107107
gem "aws-sdk-textract", "~> 1.58"
108+
109+
gem "weaviate-ruby", "~> 0.9.2"

integrations/Gemfile.lock

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ GIT
77
PATH
88
remote: .
99
specs:
10-
multiwoven-integrations (0.34.20)
10+
multiwoven-integrations (0.35.0)
1111
MailchimpMarketing
1212
activesupport
1313
async-websocket
@@ -40,6 +40,7 @@ PATH
4040
slack-ruby-client
4141
stripe
4242
tiny_tds
43+
weaviate-ruby
4344
zendesk_api
4445

4546
GEM
@@ -269,6 +270,16 @@ GEM
269270
multi_json (~> 1.11)
270271
os (>= 0.9, < 2.0)
271272
signet (>= 0.16, < 2.a)
273+
graphlient (0.8.0)
274+
faraday (~> 2.0)
275+
graphql-client
276+
graphql (2.5.22)
277+
base64
278+
fiber-storage
279+
logger
280+
graphql-client (0.26.0)
281+
activesupport (>= 3.0)
282+
graphql (>= 1.13.0)
272283
grpc (1.66.0-arm64-darwin)
273284
google-protobuf (>= 3.25, < 5.0)
274285
googleapis-common-protos-types (~> 1.0)
@@ -315,6 +326,7 @@ GEM
315326
jwt (2.8.1)
316327
base64
317328
language_server-protocol (3.17.0.3)
329+
logger (1.7.0)
318330
mini_mime (1.1.5)
319331
minitest (5.23.0)
320332
multi_json (1.15.0)
@@ -438,6 +450,9 @@ GEM
438450
concurrent-ruby (~> 1.0)
439451
uber (0.1.0)
440452
unicode-display_width (2.5.0)
453+
weaviate-ruby (0.9.2)
454+
faraday (>= 2.0.1, < 3.0)
455+
graphlient (>= 0.7.0, < 0.9.0)
441456
webmock (3.23.0)
442457
addressable (>= 2.8.0)
443458
crack (>= 0.3.2)
@@ -515,6 +530,7 @@ DEPENDENCIES
515530
slack-ruby-client
516531
stripe
517532
tiny_tds
533+
weaviate-ruby (~> 0.9.2)
518534
webmock (~> 3.0)
519535
xmlrpc (~> 0.3.3)
520536

integrations/lib/multiwoven/integrations.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@
127127
require_relative "integrations/destination/qdrant/client"
128128
require_relative "integrations/destination/pinecone_db/client"
129129
require_relative "integrations/destination/odoo/client"
130+
require_relative "integrations/destination/weaviate/client"
130131

131132
module Multiwoven
132133
module Integrations
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# frozen_string_literal: true
2+
3+
require "weaviate"
4+
require "digest"
5+
module Multiwoven::Integrations::Destination
6+
module Weaviate
7+
include Multiwoven::Integrations::Core
8+
class Client < DestinationConnector
9+
WEAVIATE_TYPE_MAP = {
10+
"text" => "string",
11+
"int" => "integer",
12+
"number" => "number",
13+
"boolean" => "boolean",
14+
"date" => "string"
15+
}.freeze
16+
def check_connection(connection_config)
17+
client = build_client(connection_config)
18+
client.schema.list
19+
success_status
20+
rescue StandardError => e
21+
handle_exception(e, {
22+
context: "WEAVIATE:CHECK_CONNECTION:EXCEPTION",
23+
type: "error"
24+
})
25+
failure_status(e)
26+
end
27+
28+
def discover(connection_config)
29+
client = build_client(connection_config)
30+
schema = client.schema.list
31+
classes = schema["classes"] || []
32+
streams = classes.map { |class_data| build_stream(class_data) }
33+
catalog = build_catalog(streams)
34+
catalog.to_multiwoven_message
35+
rescue StandardError => e
36+
handle_exception(e, {
37+
context: "WEAVIATE:DISCOVER:EXCEPTION",
38+
type: "error"
39+
})
40+
end
41+
42+
def write(sync_config, records, _action = "destination_insert")
43+
write_success = 0
44+
write_failure = 0
45+
log_message_array = []
46+
# Passing in the id handles upsert
47+
objects = records.map do |record|
48+
{
49+
class: sync_config.stream.name,
50+
vector: record["vector"],
51+
id: record["id"].present? ? generate_uuid(record["id"]) : SecureRandom.uuid,
52+
properties: coerce_properties(record["properties"], sync_config.stream)
53+
}
54+
end
55+
56+
client = build_client(sync_config.destination.connection_specification)
57+
responses = client.objects.batch_create(objects: objects)
58+
responses.each do |response|
59+
if response["result"]["status"] == "SUCCESS"
60+
write_success += 1
61+
log_message_array << log_request_response("info", { object: response }, response)
62+
else
63+
write_failure += 1
64+
log_message_array << log_request_response("error", { object: response }, response)
65+
end
66+
end
67+
tracking_message(write_success, write_failure, log_message_array)
68+
rescue StandardError => e
69+
handle_exception(e, {
70+
context: "WEAVIATE:RECORD:WRITE:EXCEPTION",
71+
type: "error",
72+
sync_id: sync_config.sync_id,
73+
sync_run_id: sync_config.sync_run_id
74+
})
75+
end
76+
77+
private
78+
79+
def normalize_url(url)
80+
url = url.to_s.strip.downcase
81+
url = url.delete_prefix("www.")
82+
url = "https://#{url}" unless url.start_with?("http://", "https://")
83+
url.chomp("/")
84+
end
85+
86+
def build_client(connection_config)
87+
connection_config = connection_config.with_indifferent_access
88+
::Weaviate::Client.new(
89+
url: normalize_url(connection_config[:api_url]),
90+
api_key: connection_config[:api_key],
91+
logger: Logger.new($stdout, level: Logger::ERROR)
92+
)
93+
end
94+
95+
def build_stream(class_data)
96+
properties = {}
97+
class_data["properties"].each do |property|
98+
properties[property["name"]] = { "type" => WEAVIATE_TYPE_MAP[property["dataType"][0]] } unless property["name"] == "vector"
99+
end
100+
101+
Multiwoven::Integrations::Protocol::Stream.new(
102+
name: class_data["class"], # Weaviate class name
103+
action: "update", # or "update"
104+
supported_sync_modes: %w[incremental],
105+
json_schema: {
106+
"type" => "object",
107+
"required" => %w[id vector properties],
108+
"properties" => {
109+
"id" => { "type" => "string", "required" => true },
110+
"vector" => { "type" => "vector", "required" => true },
111+
"properties" => {
112+
"type" => "object",
113+
"required" => %w[properties],
114+
"properties" => properties
115+
116+
}
117+
}
118+
}
119+
)
120+
end
121+
122+
def build_catalog(streams)
123+
Multiwoven::Integrations::Protocol::Catalog.new(
124+
streams: streams,
125+
request_rate_limit: 60,
126+
request_rate_limit_unit: "minute",
127+
request_rate_concurrency: 10
128+
)
129+
end
130+
131+
def generate_uuid(str)
132+
hash = Digest::SHA1.hexdigest(str)
133+
"#{hash[0, 8]}-#{hash[8, 4]}-#{hash[12, 4]}-#{hash[16, 4]}-#{hash[20, 12]}"
134+
end
135+
136+
def coerce_properties(properties, stream)
137+
schema_props = stream.json_schema.with_indifferent_access.dig("properties", "properties", "properties") || {}
138+
properties.each_with_object({}) do |(key, value), result|
139+
result[key] = case schema_props.dig(key, "type")
140+
when "integer" then value.to_i
141+
when "number" then value.to_f
142+
when "boolean" then value.to_s.downcase == "true"
143+
else value
144+
end
145+
end
146+
end
147+
end
148+
end
149+
end
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"data": {
3+
"name": "Weaviate",
4+
"title": "Weaviate",
5+
"connector_type": "destination",
6+
"category": "Database",
7+
"documentation_url": "https://docs.squared.ai/guides/destinations/retl-destinations/database/weaviate",
8+
"github_issue_label": "destination-weaviate",
9+
"icon": "icon.svg",
10+
"license": "MIT",
11+
"release_stage": "alpha",
12+
"support_level": "community",
13+
"tags": ["language:ruby", "multiwoven"]
14+
}
15+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"documentation_url": "https://docs.squared.ai/guides/destinations/retl-destinations/database/weaviate",
3+
"stream_type": "dynamic",
4+
"connection_specification": {
5+
"$schema": "http://json-schema.org/draft-07/schema#",
6+
"title": "Weaviate",
7+
"type": "object",
8+
"required": ["api_url", "api_key"],
9+
"properties": {
10+
"api_url": {
11+
"type": "string",
12+
"title": "API Url",
13+
"order": 0
14+
},
15+
"api_key": {
16+
"type": "string",
17+
"multiwoven_secret": true,
18+
"title": "API Key",
19+
"order": 1
20+
}
21+
}
22+
}
23+
}

0 commit comments

Comments
 (0)