Skip to content

Commit c3dd1fc

Browse files
committed
Change airbyte update comnnection API to public patch method
1 parent 503dfbe commit c3dd1fc

File tree

6 files changed

+166
-203
lines changed

6 files changed

+166
-203
lines changed

lib/services/airbyte/api_server.rb

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,48 +17,59 @@ def initialize(code, message)
1717
end
1818

1919
def self.post(path:, access_token:, payload:)
20-
new(path:, access_token:, payload:).post
20+
new(path:, access_token:, payload:, method: :post).call
2121
end
2222

23-
def initialize(path:, access_token:, payload:)
23+
def self.patch(path:, access_token:, payload:)
24+
new(path:, access_token:, payload:, method: :patch).call
25+
end
26+
27+
def initialize(path:, access_token:, payload:, method:)
2428
@path = path
2529
@access_token = access_token
2630
@payload = payload
31+
@method = method
2732
end
2833

29-
def post
30-
url = "#{config.airbyte_server_url}#{@path}"
31-
32-
response = HTTParty.post(
33-
url,
34-
headers: {
35-
'Accept' => 'application/json',
36-
'Content-Type' => 'application/json',
37-
'Authorization' => "Bearer #{@access_token}"
38-
},
39-
body: @payload.to_json
40-
)
34+
def call
35+
# Only :post and :patch are supported. This is internally controlled.
36+
response =
37+
case @method
38+
when :post then HTTParty.post(url, request_options)
39+
when :patch then HTTParty.patch(url, request_options)
40+
end
4141

4242
handle_http_error(response)
4343

4444
response.parsed_response
4545
rescue HttpError
4646
raise
4747
rescue StandardError => e
48-
Rails.logger.error("HTTP post failed to url: #{url}, failed with error: #{e.message}")
48+
Rails.logger.error("HTTP #{@method} failed to url: #{url}, failed with error: #{e.message}")
4949
raise Error, e.message
5050
end
5151

5252
private
5353

54-
def config
55-
DfE::Analytics.config
54+
def url
55+
"#{DfE::Analytics.config.airbyte_server_url}#{@path}"
56+
end
57+
58+
def request_options
59+
{
60+
headers: {
61+
'Accept' => 'application/json',
62+
'Content-Type' => 'application/json',
63+
'Authorization' => "Bearer #{@access_token}"
64+
},
65+
body: @payload.to_json
66+
}
5667
end
5768

5869
def handle_http_error(response)
5970
return if response.success?
6071

61-
error_message = "Error calling Airbyte API (#{@path}): status: #{response.code} body: #{response.body}"
72+
error_message = "Error calling Airbyte API (#{@path}): method: #{@method} status: #{response.code} body: #{response.body}"
6273
Rails.logger.info(error_message)
6374
raise HttpError.new(response.code, response.body)
6475
end

lib/services/airbyte/connection_refresh.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@ def initialize(access_token:)
1616

1717
def call
1818
@access_token ||= AccessToken.call
19-
discovered_schema = DiscoverSchema.call(access_token:)
2019
allowed_list = DfE::Analytics.allowlist
2120

22-
ConnectionUpdate.call(access_token:, allowed_list:, discovered_schema:)
21+
ConnectionUpdate.call(access_token:, allowed_list:)
2322
rescue StandardError => e
2423
Rails.logger.error("Airbyte connection refresh failed: #{e.message}")
2524
raise Error, "Connection refresh failed: #{e.message}"

lib/services/airbyte/connection_update.rb

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,67 +12,51 @@ class ConnectionUpdate
1212

1313
class Error < StandardError; end
1414

15-
def self.call(access_token:, allowed_list:, discovered_schema:)
16-
new(access_token, allowed_list, discovered_schema).call
15+
def self.call(access_token:, allowed_list:)
16+
new(access_token, allowed_list).call
1717
end
1818

19-
def initialize(access_token, allowed_list, discovered_schema)
19+
def initialize(access_token, allowed_list)
20+
raise Error, 'allowed_list must be a Hash of table_name => fields' unless allowed_list.is_a?(Hash)
21+
2022
@access_token = access_token
2123
@allowed_list = allowed_list
22-
@discovered_streams = discovered_schema&.dig('catalog', 'streams')
24+
@connection_id = DfE::Analytics.config.airbyte_configuration[:connection_id]
2325
end
2426

2527
def call
26-
Services::Airbyte::ApiServer.post(
27-
path: '/api/v1/connections/update',
28+
Services::Airbyte::ApiServer.patch(
29+
path: "/api/public/v1/connections/#{@connection_id}",
2830
access_token: @access_token,
29-
payload: connection_update_payload
31+
payload: connection_patch_payload
3032
)
3133
end
3234

3335
private
3436

35-
def discovered_stream_for(stream_name)
36-
discovered_stream = @discovered_streams.find { |s| s.dig('stream', 'name') == stream_name.to_s } if @discovered_streams.present?
37-
38-
return discovered_stream if discovered_stream.present?
39-
40-
error_message = "Stream definition not found in discovered_schema for: #{stream_name}"
41-
Rails.logger.error(error_message)
42-
raise Error, error_message
43-
end
44-
45-
def connection_update_payload
37+
def connection_patch_payload
4638
{
47-
connectionId: DfE::Analytics.config.airbyte_configuration[:connection_id],
48-
syncCatalog: {
39+
configurations: {
4940
streams: @allowed_list.map do |stream_name, fields|
50-
discovered_stream = discovered_stream_for(stream_name)
5141
{
52-
stream: {
53-
name: stream_name.to_s,
54-
namespace: discovered_stream.dig('stream', 'namespace'),
55-
jsonSchema: discovered_stream.dig('stream', 'jsonSchema'),
56-
supportedSyncModes: discovered_stream.dig('stream', 'supportedSyncModes'),
57-
defaultCursorField: discovered_stream.dig('stream', 'defaultCursorField'),
58-
sourceDefinedCursor: discovered_stream.dig('stream', 'sourceDefinedCursor'),
59-
sourceDefinedPrimaryKey: discovered_stream.dig('stream', 'sourceDefinedPrimaryKey')
60-
},
61-
config: {
62-
syncMode: SYNC_MODE,
63-
destinationSyncMode: DESTINATION_SYNC_MODE,
64-
cursorField: discovered_stream.dig('config', 'cursorField') || CURSOR_FIELD,
65-
primaryKey: discovered_stream.dig('config', 'primaryKey') || [[DEFAULT_PRIMARY_KEY]],
66-
aliasName: stream_name.to_s,
67-
selected: true,
68-
fieldSelectionEnabled: true,
69-
selectedFields: (CURSOR_FIELD + AIRBYTE_FIELDS + fields).uniq.map { |f| { fieldPath: [f] } }
70-
}
42+
name: stream_name.to_s,
43+
selected: true,
44+
syncMode: SYNC_MODE,
45+
destinationSyncMode: DESTINATION_SYNC_MODE,
46+
cursorField: CURSOR_FIELD,
47+
primaryKey: [[DEFAULT_PRIMARY_KEY]],
48+
selectedFields: selected_fields(fields)
7149
}
7250
end
7351
}
7452
}
7553
end
54+
55+
def selected_fields(fields)
56+
(CURSOR_FIELD + AIRBYTE_FIELDS + fields).uniq.map do |field|
57+
{ fieldPath: [field] }
58+
end
59+
end
7660
end
7761
end
7862
end

spec/services/airbyte/api_server_spec.rb

Lines changed: 56 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
RSpec.describe Services::Airbyte::ApiServer do
44
let(:access_token) { 'test-token' }
5-
let(:path) { '/api/v1/connections/sync' }
65
let(:payload) { { connectionId: 'abc-123' } }
6+
let(:path) { '/api/v1/connections/sync' }
77
let(:airbyte_url) { 'https://mock.airbyte.api' }
88
let(:url) { "#{airbyte_url}#{path}" }
99

@@ -15,69 +15,74 @@
1515
allow(DfE::Analytics).to receive(:config).and_return(config_double)
1616
end
1717

18-
describe '.post' do
19-
context 'when the request is successful' do
20-
let(:response_body) { { 'status' => 'ok' } }
21-
let(:http_response) do
22-
instance_double(
23-
HTTParty::Response,
24-
success?: true,
25-
parsed_response: response_body
26-
)
27-
end
18+
shared_examples 'HTTP method behavior' do |http_method|
19+
describe ".#{http_method}" do
20+
context 'when the request is successful' do
21+
let(:response_body) { { 'status' => 'ok' } }
22+
let(:http_response) do
23+
instance_double(
24+
HTTParty::Response,
25+
success?: true,
26+
parsed_response: response_body
27+
)
28+
end
2829

29-
it 'returns the parsed response' do
30-
allow(HTTParty).to receive(:post).and_return(http_response)
30+
it 'returns the parsed response' do
31+
allow(HTTParty).to receive(http_method).and_return(http_response)
3132

32-
result = described_class.post(path: path, access_token: access_token, payload: payload)
33-
expect(result).to eq(response_body)
33+
result = described_class.public_send(http_method, path: path, access_token: access_token, payload: payload)
34+
expect(result).to eq(response_body)
35+
end
3436
end
35-
end
3637

37-
context 'when the response is an HTTP error' do
38-
let(:http_response) do
39-
instance_double(
40-
HTTParty::Response,
41-
success?: false,
42-
code: 403,
43-
body: 'Forbidden'
44-
)
45-
end
38+
context 'when the response is an HTTP error' do
39+
let(:http_response) do
40+
instance_double(
41+
HTTParty::Response,
42+
success?: false,
43+
code: 403,
44+
body: 'Forbidden'
45+
)
46+
end
4647

47-
before do
48-
allow(HTTParty).to receive(:post).and_return(http_response)
49-
allow(Rails.logger).to receive(:info)
50-
end
48+
before do
49+
allow(HTTParty).to receive(http_method).and_return(http_response)
50+
allow(Rails.logger).to receive(:info)
51+
end
5152

52-
it 'logs and raises a HttpError with code and message' do
53-
expect(Rails.logger).to receive(:info).with(/Error calling Airbyte API/)
53+
it 'logs and raises a HttpError with code and message' do
54+
expect(Rails.logger).to receive(:info).with(/Error calling Airbyte API/)
5455

55-
error = nil
56-
expect do
57-
described_class.post(path: path, access_token: access_token, payload: payload)
58-
rescue described_class::HttpError => e
59-
error = e
60-
raise
61-
end.to raise_error(described_class::HttpError)
56+
error = nil
57+
expect do
58+
described_class.public_send(http_method, path: path, access_token: access_token, payload: payload)
59+
rescue described_class::HttpError => e
60+
error = e
61+
raise
62+
end.to raise_error(described_class::HttpError)
6263

63-
expect(error.code).to eq(403)
64-
expect(error.message).to eq('Forbidden')
64+
expect(error.code).to eq(403)
65+
expect(error.message).to eq('Forbidden')
66+
end
6567
end
66-
end
6768

68-
context 'when a low-level network error occurs' do
69-
before do
70-
allow(HTTParty).to receive(:post).and_raise(StandardError.new('Socket hang up'))
71-
allow(Rails.logger).to receive(:error)
72-
end
69+
context 'when a network-level error occurs' do
70+
before do
71+
allow(HTTParty).to receive(http_method).and_raise(StandardError.new('Socket hang up'))
72+
allow(Rails.logger).to receive(:error)
73+
end
7374

74-
it 'logs and raises a generic ApiServer::Error' do
75-
expect(Rails.logger).to receive(:error).with(/HTTP post failed to url/)
75+
it 'logs and raises a generic ApiServer::Error' do
76+
expect(Rails.logger).to receive(:error).with(/HTTP #{http_method} failed to url/)
7677

77-
expect do
78-
described_class.post(path: path, access_token: access_token, payload: payload)
79-
end.to raise_error(described_class::Error, /Socket hang up/)
78+
expect do
79+
described_class.public_send(http_method, path: path, access_token: access_token, payload: payload)
80+
end.to raise_error(described_class::Error, /Socket hang up/)
81+
end
8082
end
8183
end
8284
end
85+
86+
include_examples 'HTTP method behavior', :post
87+
include_examples 'HTTP method behavior', :patch
8388
end

spec/services/airbyte/connection_refresh_spec.rb

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
before do
1818
allow(Services::Airbyte::AccessToken).to receive(:call).and_return(token)
19-
allow(Services::Airbyte::DiscoverSchema).to receive(:call).and_return(schema)
2019
allow(Services::Airbyte::ConnectionUpdate).to receive(:call)
2120
allow(DfE::Analytics).to receive(:allowlist).and_return(allowlist)
2221
end
@@ -25,21 +24,16 @@
2524
it 'uses the given values and refreshes the connection' do
2625
described_class.call(access_token: token)
2726

28-
expect(Services::Airbyte::DiscoverSchema).to have_received(:call).with(
29-
access_token: token
30-
)
31-
3227
expect(Services::Airbyte::ConnectionUpdate).to have_received(:call).with(
3328
access_token: token,
34-
allowed_list: allowlist,
35-
discovered_schema: schema
29+
allowed_list: allowlist
3630
)
3731
end
3832
end
3933

4034
context 'when an error occurs' do
4135
before do
42-
allow(Services::Airbyte::DiscoverSchema).to receive(:call)
36+
allow(Services::Airbyte::ConnectionUpdate).to receive(:call)
4337
.and_raise(StandardError.new('boom'))
4438
allow(Rails.logger).to receive(:error)
4539
end

0 commit comments

Comments
 (0)