Skip to content

Commit d6c395c

Browse files
committed
Get Airbyte source id and connection id from config rather then APIs
1 parent 91427ab commit d6c395c

21 files changed

+115
-137
lines changed

config/locales/en.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ en:
136136
description: |
137137
Airbyte server base URL
138138
default: ENV['AIRBYTE_SERVER_URL']
139-
airbyte_workspace_id:
139+
airbyte_configuration:
140140
description: |
141-
Airbyte Workspace Id for this App contains source, destination and connection
142-
default: ENV['AIRBYTE_WORKSPACE_ID']
141+
Airbyte configuration in JSON containing workspace, source, destination and connection identifiers
142+
default: ENV['airbyte_configuration']

lib/dfe/analytics/config.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ module Config
3939
airbyte_client_id
4040
airbyte_client_secret
4141
airbyte_server_url
42-
airbyte_workspace_id
42+
airbyte_configuration
4343
].freeze
4444

4545
def self.params
@@ -74,7 +74,8 @@ def self.configure(config)
7474
config.airbyte_client_id ||= ENV.fetch('AIRBYTE_CLIENT_ID', nil)
7575
config.airbyte_client_secret ||= ENV.fetch('AIRBYTE_CLIENT_SECRET', nil)
7676
config.airbyte_server_url ||= ENV.fetch('AIRBYTE_SERVER_URL', nil)
77-
config.airbyte_workspace_id ||= ENV.fetch('AIRBYTE_WORKSPACE_ID', nil)
77+
config.airbyte_configuration ||=
78+
JSON.parse(ENV.fetch('airbyte_configuration', '{}')).transform_keys(&:underscore).symbolize_keys
7879

7980
config.async = true if config.async.nil?
8081

lib/dfe/analytics/jobs/airbyte_deploy_job.rb

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,23 @@ def perform
1616
Rails.logger.info('Finished WaitForMigrations')
1717

1818
access_token = ::Services::Airbyte::AccessToken.call
19-
connection_id, source_id = ::Services::Airbyte::ConnectionList.call(access_token:)
2019

2120
# Refresh schema
22-
::Services::Airbyte::ConnectionRefresh.call(access_token:, connection_id:, source_id:)
21+
::Services::Airbyte::ConnectionRefresh.call(access_token:)
2322

2423
Rails.logger.info('Finished ConnectionRefresh')
2524

2625
# Check if a sync job is already running
27-
last_job = ::Services::Airbyte::JobLast.call(access_token:, connection_id:)
26+
last_job = ::Services::Airbyte::JobLast.call(access_token:)
2827
status = last_job&.dig('job', 'status')
2928
job_id = last_job&.dig('job', 'id')
3029

3130
Rails.logger.info("JobLast status: #{status} id: #{job_id}")
3231

33-
job_id = ::Services::Airbyte::StartSync.call(access_token:, connection_id:) if status != 'running'
32+
job_id = ::Services::Airbyte::StartSync.call(access_token:) if status != 'running'
3433

3534
# Wait for the job (existing or new) to finish
36-
::Services::Airbyte::WaitForSync.call(access_token:, connection_id:, job_id:)
35+
::Services::Airbyte::WaitForSync.call(access_token:, job_id:)
3736

3837
Rails.logger.info('Finished WaitForSync')
3938

lib/services/airbyte/connection_list.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def initialize(access_token:)
1616

1717
def call
1818
payload = {
19-
workspaceId: config.airbyte_workspace_id
19+
workspaceId: config.airbyte_configuration[:workspace_id]
2020
}
2121

2222
response = Services::Airbyte::ApiServer.post(

lib/services/airbyte/connection_refresh.rb

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,28 @@ module Airbyte
66
class ConnectionRefresh
77
class Error < StandardError; end
88

9-
def self.call(access_token: nil, connection_id: nil, source_id: nil)
10-
new(access_token:, connection_id:, source_id:).call
9+
def self.call(access_token: nil)
10+
new(access_token:).call
1111
end
1212

13-
def initialize(access_token:, connection_id:, source_id:)
13+
def initialize(access_token:)
1414
@access_token = access_token
15-
@connection_id = connection_id
16-
@source_id = source_id
1715
end
1816

1917
def call
2018
@access_token ||= AccessToken.call
21-
@connection_id, @source_id = ConnectionList.call(access_token:) if connection_id.blank? || source_id.blank?
22-
discovered_schema = DiscoverSchema.call(access_token:, source_id:)
19+
discovered_schema = DiscoverSchema.call(access_token:)
2320
allowed_list = DfE::Analytics.allowlist
2421

25-
ConnectionUpdate.call(access_token:, connection_id:, allowed_list:, discovered_schema:)
22+
ConnectionUpdate.call(access_token:, allowed_list:, discovered_schema:)
2623
rescue StandardError => e
2724
Rails.logger.error("Airbyte connection refresh failed: #{e.message}")
2825
raise Error, "Connection refresh failed: #{e.message}"
2926
end
3027

3128
private
3229

33-
attr_reader :access_token, :connection_id, :source_id
30+
attr_reader :access_token
3431
end
3532
end
3633
end

lib/services/airbyte/connection_update.rb

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,12 @@ class ConnectionUpdate
1212

1313
class Error < StandardError; end
1414

15-
def self.call(access_token:, connection_id:, allowed_list:, discovered_schema:)
16-
new(access_token, connection_id, allowed_list, discovered_schema).call
15+
def self.call(access_token:, allowed_list:, discovered_schema:)
16+
new(access_token, allowed_list, discovered_schema).call
1717
end
1818

19-
def initialize(access_token, connection_id, allowed_list, discovered_schema)
19+
def initialize(access_token, allowed_list, discovered_schema)
2020
@access_token = access_token
21-
@connection_id = connection_id
2221
@allowed_list = allowed_list
2322
@discovered_streams = discovered_schema&.dig('catalog', 'streams')
2423
end
@@ -45,7 +44,7 @@ def discovered_stream_for(stream_name)
4544

4645
def connection_update_payload
4746
{
48-
connectionId: @connection_id,
47+
connectionId: DfE::Analytics.config.airbyte_configuration[:connection_id],
4948
syncCatalog: {
5049
streams: @allowed_list.map do |stream_name, fields|
5150
discovered_stream = discovered_stream_for(stream_name)

lib/services/airbyte/discover_schema.rb

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,17 @@ module Airbyte
66
class DiscoverSchema
77
class Error < StandardError; end
88

9-
def self.call(access_token:, source_id:)
10-
new(access_token, source_id).call
9+
def self.call(access_token:)
10+
new(access_token).call
1111
end
1212

13-
def initialize(access_token, source_id)
13+
def initialize(access_token)
1414
@access_token = access_token
15-
@source_id = source_id
1615
end
1716

1817
def call
1918
payload = {
20-
sourceId: @source_id
19+
sourceId: DfE::Analytics.config.airbyte_configuration[:source_id]
2120
}
2221

2322
Services::Airbyte::ApiServer.post(

lib/services/airbyte/job_last.rb

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,18 @@ class Error < StandardError; end
88

99
PAGE_SIZE = 5
1010

11-
def self.call(access_token:, connection_id:)
12-
new(access_token:, connection_id:).call
11+
def self.call(access_token:)
12+
new(access_token:).call
1313
end
1414

15-
def initialize(access_token:, connection_id:)
15+
def initialize(access_token:)
1616
@access_token = access_token
17-
@connection_id = connection_id
1817
end
1918

2019
def call
2120
payload = {
2221
configTypes: ['sync'],
23-
configId: @connection_id,
22+
configId: DfE::Analytics.config.airbyte_configuration[:connection_id],
2423
pagination: { pageSize: PAGE_SIZE, rowOffset: 0 }
2524
}
2625

lib/services/airbyte/job_status.rb

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,19 @@ class Error < StandardError; end
1313

1414
PAGE_SIZE = 10 # Number of recent jobs to fetch when searching for a job_id
1515

16-
def self.call(access_token:, connection_id:, job_id:)
17-
new(access_token:, connection_id:, job_id:).call
16+
def self.call(access_token:, job_id:)
17+
new(access_token:, job_id:).call
1818
end
1919

20-
def initialize(access_token:, connection_id:, job_id:)
20+
def initialize(access_token:, job_id:)
2121
@access_token = access_token
22-
@connection_id = connection_id
2322
@job_id = job_id
2423
end
2524

2625
def call
2726
payload = {
2827
configTypes: ['sync'],
29-
configId: @connection_id,
28+
configId: DfE::Analytics.config.airbyte_configuration[:connection_id],
3029
pagination: { pageSize: PAGE_SIZE, rowOffset: 0 }
3130
}
3231

lib/services/airbyte/start_sync.rb

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,16 @@ module Airbyte
66
class StartSync
77
class Error < StandardError; end
88

9-
def self.call(access_token:, connection_id:)
10-
new(access_token:, connection_id:).call
9+
def self.call(access_token:)
10+
new(access_token:).call
1111
end
1212

13-
def initialize(access_token:, connection_id:)
13+
def initialize(access_token:)
1414
@access_token = access_token
15-
@connection_id = connection_id
1615
end
1716

1817
def call
19-
payload = { connectionId: connection_id }
18+
payload = { connectionId: DfE::Analytics.config.airbyte_configuration[:connection_id] }
2019

2120
response = Services::Airbyte::ApiServer.post(
2221
path: '/api/v1/connections/sync',
@@ -31,7 +30,7 @@ def call
3130
# HTTP Status code: 409 indicates a job is already running - Get last job id
3231
Rails.logger.info('Sync already in progress, retrieving last job instead.')
3332

34-
last_job = JobLast.call(access_token:, connection_id:)
33+
last_job = JobLast.call(access_token:)
3534

3635
job_id_for!(last_job)
3736
rescue StandardError => e
@@ -49,7 +48,7 @@ def job_id_for!(job)
4948
job_id
5049
end
5150

52-
attr_reader :access_token, :connection_id
51+
attr_reader :access_token
5352
end
5453
end
5554
end

0 commit comments

Comments
 (0)