Skip to content

Commit 6a591ce

Browse files
committed
Add support for updating the Airbyte connection / source schema, invoked via rake
1 parent 134e0d6 commit 6a591ce

17 files changed

+728
-1
lines changed

config/locales/en.yml

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,20 @@ en:
112112
airbyte_stream_config_path:
113113
description: |
114114
Path of airbyte stream config file relative to the App root (Rails.root)
115-
default: /terraform/aks/workspace-variables/airbyte_streams.json
115+
default: /terraform/aks/workspace-variables/airbyte_streams.json
116+
airbyte_client_id:
117+
description: |
118+
Airbyte Client Id for retrieving access token
119+
default: ENV['AIRBYTE_CLIENT_ID']
120+
airbyte_client_secret:
121+
description: |
122+
Airbyte Client secret for retrieving access token
123+
default: ENV['AIRBYTE_CLIENT_SECRET']
124+
airbyte_server_url:
125+
description: |
126+
Airbyte server base URL
127+
default: ENV['AIRBYTE_SERVER_URL']
128+
airbyte_workspace_id:
129+
description: |
130+
Airbyte Workspace Id for this App; the workspace contains the source, destination and connection
131+
default: ENV['AIRBYTE_WORKSPACE_ID']

lib/dfe/analytics.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
require 'dfe/analytics/azure_federated_auth'
3030
require 'dfe/analytics/api_requests'
3131
require 'dfe/analytics/airbyte_stream_config'
32+
require 'services/airbyte'
3233

3334
module DfE
3435
module Analytics

lib/dfe/analytics/config.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ module Config
3131
database_events_enabled
3232
airbyte_enabled
3333
airbyte_stream_config_path
34+
airbyte_client_id
35+
airbyte_client_secret
36+
airbyte_server_url
37+
airbyte_workspace_id
3438
].freeze
3539

3640
def self.params
@@ -58,6 +62,10 @@ def self.configure(config)
5862
config.excluded_models_proc ||= proc { |_model| false }
5963
config.database_events_enabled ||= true
6064
config.airbyte_enabled ||= false
65+
config.airbyte_client_id ||= ENV.fetch('AIRBYTE_CLIENT_ID', nil)
66+
config.airbyte_client_secret ||= ENV.fetch('AIRBYTE_CLIENT_SECRET', nil)
67+
config.airbyte_server_url ||= ENV.fetch('AIRBYTE_SERVER_URL', nil)
68+
config.airbyte_workspace_id ||= ENV.fetch('AIRBYTE_WORKSPACE_ID', nil)
6169

6270
config.airbyte_stream_config_path = File.join(Rails.root, config.airbyte_stream_config_path) if config.airbyte_stream_config_path.present?
6371

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace :dfe do
  namespace :analytics do
    desc 'Refresh the airbyte connection and schema'
    # Depends on :environment so the host application (Rails.logger, the
    # DfE::Analytics configuration) is loaded before the service runs.
    task airbyte_connection_refresh: :environment do
      # Raises on failure, so the success message below is only printed when
      # the refresh completed.
      Services::Airbyte::ConnectionRefresh.call

      puts 'Airbyte connection and schema refreshed OK'
    end
  end
end

lib/services/airbyte.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
require 'services/airbyte/access_token'
2+
require 'services/airbyte/connection_list'
3+
require 'services/airbyte/discover_schema'
4+
require 'services/airbyte/connection_update'
5+
require 'services/airbyte/connection_refresh'
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# frozen_string_literal: true
2+
3+
module Services
  module Airbyte
    # Fetches an access token for Airbyte API calls.
    #
    # POSTs the configured client id / secret to the Airbyte
    # `/api/v1/applications/token` endpoint and returns the `access_token`
    # value found in the parsed response.
    class AccessToken
      # Raised when the token request fails or the response carries no token.
      class Error < StandardError; end

      # Convenience entry point; see #call.
      #
      # @return [Object] the `access_token` value from the response
      # @raise [Error] if the request fails or no token is returned
      def self.call
        new.call
      end

      # Requests a token using the client-credentials grant.
      #
      # @return [Object] the `access_token` value from the response
      # @raise [Error] if the response is unsuccessful or has no `access_token`
      def call
        response = HTTParty.post(
          "#{config.airbyte_server_url}/api/v1/applications/token",
          headers: {
            'Accept' => 'application/json',
            'Content-Type' => 'application/json'
          },
          body: {
            client_id: config.airbyte_client_id,
            client_secret: config.airbyte_client_secret,
            'grant-type': 'client_credentials'
          }.to_json
        )

        unless response.success?
          log_and_raise("Error calling Airbyte token API: status: #{response.code} body: #{response.body}")
        end

        payload = response.parsed_response
        access_token = payload['access_token'] if payload.is_a?(Hash)

        # Fail here rather than letting callers send "Bearer " with no token.
        log_and_raise('Airbyte token API response did not include an access_token') if access_token.to_s.empty?

        access_token
      end

      private

      def config
        DfE::Analytics.config
      end

      # Logs the message at error level, then raises it as an Error.
      def log_and_raise(message)
        Rails.logger.error(message)
        raise Error, message
      end
    end
  end
end
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# frozen_string_literal: true
2+
3+
module Services
  module Airbyte
    # Fetches the connection info for the configured workspace.
    #
    # Assumes a single connection: only the first entry of the `connections`
    # array in the response is used.
    class ConnectionList
      # Raised when the API call fails or no connection is returned.
      class Error < StandardError; end

      # @param access_token [String] bearer token sent in the Authorization header
      # @return [Array] two-element array `[connectionId, sourceId]`
      # @raise [Error] if the request fails or no connection is returned
      def self.call(access_token:)
        new(access_token).call
      end

      def initialize(access_token)
        @access_token = access_token
      end

      # @return [Array] two-element array `[connectionId, sourceId]`
      # @raise [Error] if the request fails or no connection is returned
      def call
        response = HTTParty.post(
          "#{config.airbyte_server_url}/api/v1/connections/list",
          headers: {
            'Authorization' => "Bearer #{@access_token}",
            'Content-Type' => 'application/json'
          },
          body: {
            workspaceId: config.airbyte_workspace_id
          }.to_json
        )

        unless response.success?
          error_message = "Error calling Airbyte connections/list API: status: #{response.code} body: #{response.body}"
          Rails.logger.error(error_message)
          raise Error, error_message
        end

        connection = first_connection(response.parsed_response)

        raise Error, 'No connections returned in response.' unless connection

        [connection['connectionId'], connection['sourceId']]
      end

      private

      def config
        DfE::Analytics.config
      end

      # Returns the first connection of the parsed response, or nil when the
      # payload is not a Hash or holds no `connections` array, so an empty or
      # unexpected body surfaces as Error instead of NoMethodError/TypeError.
      def first_connection(payload)
        return unless payload.is_a?(Hash)

        connections = payload['connections']
        connections.first if connections.is_a?(Array)
      end
    end
  end
end
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# frozen_string_literal: true
2+
3+
module Services
  module Airbyte
    # Refreshes an Airbyte connection: gets a token, looks up the connection
    # and source ids, discovers the source schema, then updates the connection
    # using DfE::Analytics.allowlist.
    class ConnectionRefresh
      # Raised for any failure in the refresh sequence; the message embeds the
      # message of the underlying error.
      class Error < StandardError; end

      # Convenience entry point; see #call.
      def self.call
        new.call
      end

      # Runs the refresh steps in order.
      #
      # @return [Object] whatever ConnectionUpdate.call returns
      # @raise [Error] wrapping any StandardError raised by a step
      def call
        access_token = AccessToken.call
        connection_id, source_id = ConnectionList.call(access_token:)
        discovered_schema = DiscoverSchema.call(access_token:, source_id:)
        allowed_list = DfE::Analytics.allowlist

        ConnectionUpdate.call(access_token:, connection_id:, allowed_list:, discovered_schema:)
      rescue StandardError => e
        # Re-raised as this class's Error so callers only rescue one type.
        Rails.logger.error("Airbyte connection refresh failed: #{e.message}")
        raise Error, "Connection refresh failed: #{e.message}"
      end

      # Kept public, as in the original, so any existing caller still works.
      def config
        DfE::Analytics.config
      end
    end
  end
end
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# frozen_string_literal: true
2+
3+
module Services
  module Airbyte
    # Updates an Airbyte connection's sync catalog so that it contains one
    # stream per entry in the allow list, built from the matching stream
    # definition in the discovered schema.
    class ConnectionUpdate
      CURSOR_FIELD = '_ab_cdc_lsn'
      DEFAULT_PRIMARY_KEY = 'id'
      SYNC_MODE = 'incremental'
      DESTINATION_SYNC_MODE = 'append_dedup'

      # Raised when the API call fails or an allow-listed stream is missing
      # from the discovered schema.
      class Error < StandardError; end

      # @param access_token [String] bearer token sent in the Authorization header
      # @param connection_id [String] id of the connection to update
      # @param allowed_list [Hash] stream name => list of field names
      # @param discovered_schema [Hash, nil] streams are read from `catalog.streams`
      # @return [Object] the parsed API response
      # @raise [Error] if a stream is not in the discovered schema or the request fails
      def self.call(access_token:, connection_id:, allowed_list:, discovered_schema:)
        new(access_token, connection_id, allowed_list, discovered_schema).call
      end

      def initialize(access_token, connection_id, allowed_list, discovered_schema)
        @access_token = access_token
        @connection_id = connection_id
        @allowed_list = allowed_list
        @discovered_streams = discovered_schema&.dig('catalog', 'streams')
      end

      # @return [Object] the parsed API response
      # @raise [Error] if a stream is not in the discovered schema or the request fails
      def call
        response = HTTParty.post(
          "#{config.airbyte_server_url}/api/v1/connections/update",
          headers: {
            'Authorization' => "Bearer #{@access_token}",
            'Content-Type' => 'application/json'
          },
          body: connection_update_payload.to_json
        )

        unless response.success?
          # Names the endpoint actually called (previously said discover_schema).
          error_message = "Error calling Airbyte connections/update API: status: #{response.code} body: #{response.body}"
          Rails.logger.error(error_message)
          raise Error, error_message
        end

        response.parsed_response
      end

      private

      def config
        DfE::Analytics.config
      end

      # Finds the discovered stream whose `stream.name` equals stream_name.
      # Raises Error when there are no discovered streams or none matches.
      def discovered_stream_for(stream_name)
        discovered_stream = (@discovered_streams || []).find { |s| s.dig('stream', 'name') == stream_name.to_s }

        return discovered_stream if discovered_stream

        error_message = "Stream definition not found in discovered_schema for: #{stream_name}"
        Rails.logger.error(error_message)
        raise Error, error_message
      end

      # Builds the request body: the connection id plus one stream entry per
      # allow-listed stream.
      def connection_update_payload
        {
          connectionId: @connection_id,
          syncCatalog: {
            streams: @allowed_list.map { |stream_name, fields| stream_payload(stream_name, fields) }
          }
        }
      end

      # Builds the catalog entry (definition + config) for a single stream.
      def stream_payload(stream_name, fields)
        discovered_stream = discovered_stream_for(stream_name)

        {
          stream: stream_definition(stream_name, discovered_stream),
          config: stream_config(stream_name, fields, discovered_stream)
        }
      end

      # Copies the stream definition attributes from the discovered stream.
      def stream_definition(stream_name, discovered_stream)
        {
          name: stream_name.to_s,
          namespace: discovered_stream.dig('stream', 'namespace'),
          jsonSchema: discovered_stream.dig('stream', 'jsonSchema'),
          supportedSyncModes: discovered_stream.dig('stream', 'supportedSyncModes'),
          defaultCursorField: discovered_stream.dig('stream', 'defaultCursorField'),
          sourceDefinedCursor: discovered_stream.dig('stream', 'sourceDefinedCursor'),
          sourceDefinedPrimaryKey: discovered_stream.dig('stream', 'sourceDefinedPrimaryKey')
        }
      end

      # Uses the discovered config values where present, otherwise the class
      # defaults, and selects only the cursor field plus the allow-listed fields.
      def stream_config(stream_name, fields, discovered_stream)
        {
          syncMode: discovered_stream.dig('config', 'syncMode') || SYNC_MODE,
          destinationSyncMode: discovered_stream.dig('config', 'destinationSyncMode') || DESTINATION_SYNC_MODE,
          cursorField: discovered_stream.dig('config', 'cursorField') || [CURSOR_FIELD],
          primaryKey: discovered_stream.dig('config', 'primaryKey') || [[DEFAULT_PRIMARY_KEY]],
          aliasName: stream_name.to_s,
          selected: true,
          fieldSelectionEnabled: true,
          selectedFields: selected_fields(fields)
        }
      end

      # Array() tolerates a nil field list; to_s before uniq stops a symbol and
      # a string spelling of the same field from both being selected.
      def selected_fields(fields)
        ([CURSOR_FIELD] + Array(fields)).map(&:to_s).uniq.map { |f| { fieldPath: [f] } }
      end
    end
  end
end
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# frozen_string_literal: true
2+
3+
module Services
  module Airbyte
    # Fetches the current schema for the given source via the Airbyte
    # `sources/discover_schema` endpoint.
    # NOTE(review): the original comment says this call also reloads a cache;
    # that happens (if at all) on the Airbyte side and is not visible here.
    class DiscoverSchema
      # Raised when the discover_schema request is unsuccessful.
      class Error < StandardError; end

      # @param access_token [String] bearer token sent in the Authorization header
      # @param source_id [String] id of the source whose schema is discovered
      # @return [Object] the parsed API response
      # @raise [Error] if the request is unsuccessful
      def self.call(access_token:, source_id:)
        new(access_token, source_id).call
      end

      def initialize(access_token, source_id)
        @access_token = access_token
        @source_id = source_id
      end

      # @return [Object] the parsed API response
      # @raise [Error] if the request is unsuccessful
      def call
        response = HTTParty.post(
          "#{config.airbyte_server_url}/api/v1/sources/discover_schema",
          headers: {
            'Authorization' => "Bearer #{@access_token}",
            'Content-Type' => 'application/json'
          },
          body: {
            sourceId: @source_id
          }.to_json
        )

        unless response.success?
          error_message = "Error calling Airbyte discover_schema API: status: #{response.code} body: #{response.body}"
          Rails.logger.error(error_message)
          raise Error, error_message
        end

        response.parsed_response
      end

      private

      def config
        DfE::Analytics.config
      end
    end
  end
end

0 commit comments

Comments
 (0)