Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions lib/services/airbyte/api_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,59 @@ def initialize(code, message)
end

def self.post(path:, access_token:, payload:)
new(path:, access_token:, payload:).post
new(path:, access_token:, payload:, method: :post).call
end

def initialize(path:, access_token:, payload:)
def self.patch(path:, access_token:, payload:)
new(path:, access_token:, payload:, method: :patch).call
end

def initialize(path:, access_token:, payload:, method:)
@path = path
@access_token = access_token
@payload = payload
@method = method
end

def post
url = "#{config.airbyte_server_url}#{@path}"

response = HTTParty.post(
url,
headers: {
'Accept' => 'application/json',
'Content-Type' => 'application/json',
'Authorization' => "Bearer #{@access_token}"
},
body: @payload.to_json
)
def call
# Only :post and :patch are supported. This is internally controlled.
response =
case @method
when :post then HTTParty.post(url, request_options)
when :patch then HTTParty.patch(url, request_options)
end

handle_http_error(response)

response.parsed_response
rescue HttpError
raise
rescue StandardError => e
Rails.logger.error("HTTP post failed to url: #{url}, failed with error: #{e.message}")
Rails.logger.error("HTTP #{@method} failed to url: #{url}, failed with error: #{e.message}")
raise Error, e.message
end

private

def config
DfE::Analytics.config
def url
"#{DfE::Analytics.config.airbyte_server_url}#{@path}"
end

def request_options
{
headers: {
'Accept' => 'application/json',
'Content-Type' => 'application/json',
'Authorization' => "Bearer #{@access_token}"
},
body: @payload.to_json
}
end

def handle_http_error(response)
return if response.success?

error_message = "Error calling Airbyte API (#{@path}): status: #{response.code} body: #{response.body}"
error_message = "Error calling Airbyte API (#{@path}): method: #{@method} status: #{response.code} body: #{response.body}"
Rails.logger.info(error_message)
raise HttpError.new(response.code, response.body)
end
Expand Down
3 changes: 1 addition & 2 deletions lib/services/airbyte/connection_refresh.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ def initialize(access_token:)

def call
@access_token ||= AccessToken.call
discovered_schema = DiscoverSchema.call(access_token:)
allowed_list = DfE::Analytics.allowlist

ConnectionUpdate.call(access_token:, allowed_list:, discovered_schema:)
ConnectionUpdate.call(access_token:, allowed_list:)
rescue StandardError => e
Rails.logger.error("Airbyte connection refresh failed: #{e.message}")
raise Error, "Connection refresh failed: #{e.message}"
Expand Down
65 changes: 24 additions & 41 deletions lib/services/airbyte/connection_update.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,72 +7,55 @@ class ConnectionUpdate
CURSOR_FIELD = %w[_ab_cdc_lsn].freeze
AIRBYTE_FIELDS = %w[_ab_cdc_deleted_at _ab_cdc_updated_at].freeze
DEFAULT_PRIMARY_KEY = 'id'
SYNC_MODE = 'incremental'
SYNC_MODE = 'incremental_append'
DESTINATION_SYNC_MODE = 'append'

class Error < StandardError; end

def self.call(access_token:, allowed_list:, discovered_schema:)
new(access_token, allowed_list, discovered_schema).call
def self.call(access_token:, allowed_list:)
new(access_token, allowed_list).call
end

def initialize(access_token, allowed_list, discovered_schema)
def initialize(access_token, allowed_list)
raise Error, 'allowed_list must be a Hash of table_name => fields' unless allowed_list.is_a?(Hash)

@access_token = access_token
@allowed_list = allowed_list
@discovered_streams = discovered_schema&.dig('catalog', 'streams')
@connection_id = DfE::Analytics.config.airbyte_configuration[:connection_id]
end

def call
Services::Airbyte::ApiServer.post(
path: '/api/v1/connections/update',
Services::Airbyte::ApiServer.patch(
path: "/api/public/v1/connections/#{@connection_id}",
access_token: @access_token,
payload: connection_update_payload
payload: connection_patch_payload
)
end

private

def discovered_stream_for(stream_name)
discovered_stream = @discovered_streams.find { |s| s.dig('stream', 'name') == stream_name.to_s } if @discovered_streams.present?

return discovered_stream if discovered_stream.present?

error_message = "Stream definition not found in discovered_schema for: #{stream_name}"
Rails.logger.error(error_message)
raise Error, error_message
end

def connection_update_payload
def connection_patch_payload
{
connectionId: DfE::Analytics.config.airbyte_configuration[:connection_id],
syncCatalog: {
configurations: {
streams: @allowed_list.map do |stream_name, fields|
discovered_stream = discovered_stream_for(stream_name)
{
stream: {
name: stream_name.to_s,
namespace: discovered_stream.dig('stream', 'namespace'),
jsonSchema: discovered_stream.dig('stream', 'jsonSchema'),
supportedSyncModes: discovered_stream.dig('stream', 'supportedSyncModes'),
defaultCursorField: discovered_stream.dig('stream', 'defaultCursorField'),
sourceDefinedCursor: discovered_stream.dig('stream', 'sourceDefinedCursor'),
sourceDefinedPrimaryKey: discovered_stream.dig('stream', 'sourceDefinedPrimaryKey')
},
config: {
syncMode: SYNC_MODE,
destinationSyncMode: DESTINATION_SYNC_MODE,
cursorField: discovered_stream.dig('config', 'cursorField') || CURSOR_FIELD,
primaryKey: discovered_stream.dig('config', 'primaryKey') || [[DEFAULT_PRIMARY_KEY]],
aliasName: stream_name.to_s,
selected: true,
fieldSelectionEnabled: true,
selectedFields: (CURSOR_FIELD + AIRBYTE_FIELDS + fields).uniq.map { |f| { fieldPath: [f] } }
}
name: stream_name.to_s,
selected: true,
syncMode: SYNC_MODE,
cursorField: CURSOR_FIELD,
primaryKey: [[DEFAULT_PRIMARY_KEY]],
selectedFields: selected_fields(fields)
}
end
}
}
end

def selected_fields(fields)
(CURSOR_FIELD + AIRBYTE_FIELDS + fields).uniq.map do |field|
{ fieldPath: [field] }
end
end
end
end
end
107 changes: 56 additions & 51 deletions spec/services/airbyte/api_server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

RSpec.describe Services::Airbyte::ApiServer do
let(:access_token) { 'test-token' }
let(:path) { '/api/v1/connections/sync' }
let(:payload) { { connectionId: 'abc-123' } }
let(:path) { '/api/v1/connections/sync' }
let(:airbyte_url) { 'https://mock.airbyte.api' }
let(:url) { "#{airbyte_url}#{path}" }

Expand All @@ -15,69 +15,74 @@
allow(DfE::Analytics).to receive(:config).and_return(config_double)
end

describe '.post' do
context 'when the request is successful' do
let(:response_body) { { 'status' => 'ok' } }
let(:http_response) do
instance_double(
HTTParty::Response,
success?: true,
parsed_response: response_body
)
end
shared_examples 'HTTP method behavior' do |http_method|
describe ".#{http_method}" do
context 'when the request is successful' do
let(:response_body) { { 'status' => 'ok' } }
let(:http_response) do
instance_double(
HTTParty::Response,
success?: true,
parsed_response: response_body
)
end

it 'returns the parsed response' do
allow(HTTParty).to receive(:post).and_return(http_response)
it 'returns the parsed response' do
allow(HTTParty).to receive(http_method).and_return(http_response)

result = described_class.post(path: path, access_token: access_token, payload: payload)
expect(result).to eq(response_body)
result = described_class.public_send(http_method, path: path, access_token: access_token, payload: payload)
expect(result).to eq(response_body)
end
end
end

context 'when the response is an HTTP error' do
let(:http_response) do
instance_double(
HTTParty::Response,
success?: false,
code: 403,
body: 'Forbidden'
)
end
context 'when the response is an HTTP error' do
let(:http_response) do
instance_double(
HTTParty::Response,
success?: false,
code: 403,
body: 'Forbidden'
)
end

before do
allow(HTTParty).to receive(:post).and_return(http_response)
allow(Rails.logger).to receive(:info)
end
before do
allow(HTTParty).to receive(http_method).and_return(http_response)
allow(Rails.logger).to receive(:info)
end

it 'logs and raises a HttpError with code and message' do
expect(Rails.logger).to receive(:info).with(/Error calling Airbyte API/)
it 'logs and raises a HttpError with code and message' do
expect(Rails.logger).to receive(:info).with(/Error calling Airbyte API/)

error = nil
expect do
described_class.post(path: path, access_token: access_token, payload: payload)
rescue described_class::HttpError => e
error = e
raise
end.to raise_error(described_class::HttpError)
error = nil
expect do
described_class.public_send(http_method, path: path, access_token: access_token, payload: payload)
rescue described_class::HttpError => e
error = e
raise
end.to raise_error(described_class::HttpError)

expect(error.code).to eq(403)
expect(error.message).to eq('Forbidden')
expect(error.code).to eq(403)
expect(error.message).to eq('Forbidden')
end
end
end

context 'when a low-level network error occurs' do
before do
allow(HTTParty).to receive(:post).and_raise(StandardError.new('Socket hang up'))
allow(Rails.logger).to receive(:error)
end
context 'when a network-level error occurs' do
before do
allow(HTTParty).to receive(http_method).and_raise(StandardError.new('Socket hang up'))
allow(Rails.logger).to receive(:error)
end

it 'logs and raises a generic ApiServer::Error' do
expect(Rails.logger).to receive(:error).with(/HTTP post failed to url/)
it 'logs and raises a generic ApiServer::Error' do
expect(Rails.logger).to receive(:error).with(/HTTP #{http_method} failed to url/)

expect do
described_class.post(path: path, access_token: access_token, payload: payload)
end.to raise_error(described_class::Error, /Socket hang up/)
expect do
described_class.public_send(http_method, path: path, access_token: access_token, payload: payload)
end.to raise_error(described_class::Error, /Socket hang up/)
end
end
end
end

include_examples 'HTTP method behavior', :post
include_examples 'HTTP method behavior', :patch
end
10 changes: 2 additions & 8 deletions spec/services/airbyte/connection_refresh_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

before do
allow(Services::Airbyte::AccessToken).to receive(:call).and_return(token)
allow(Services::Airbyte::DiscoverSchema).to receive(:call).and_return(schema)
allow(Services::Airbyte::ConnectionUpdate).to receive(:call)
allow(DfE::Analytics).to receive(:allowlist).and_return(allowlist)
end
Expand All @@ -25,21 +24,16 @@
it 'uses the given values and refreshes the connection' do
described_class.call(access_token: token)

expect(Services::Airbyte::DiscoverSchema).to have_received(:call).with(
access_token: token
)

expect(Services::Airbyte::ConnectionUpdate).to have_received(:call).with(
access_token: token,
allowed_list: allowlist,
discovered_schema: schema
allowed_list: allowlist
)
end
end

context 'when an error occurs' do
before do
allow(Services::Airbyte::DiscoverSchema).to receive(:call)
allow(Services::Airbyte::ConnectionUpdate).to receive(:call)
.and_raise(StandardError.new('boom'))
allow(Rails.logger).to receive(:error)
end
Expand Down
Loading
Loading