diff --git a/lib/services/airbyte/api_server.rb b/lib/services/airbyte/api_server.rb index 80dd924..f2d1ebf 100644 --- a/lib/services/airbyte/api_server.rb +++ b/lib/services/airbyte/api_server.rb @@ -17,27 +17,27 @@ def initialize(code, message) end def self.post(path:, access_token:, payload:) - new(path:, access_token:, payload:).post + new(path:, access_token:, payload:, method: :post).call end - def initialize(path:, access_token:, payload:) + def self.patch(path:, access_token:, payload:) + new(path:, access_token:, payload:, method: :patch).call + end + + def initialize(path:, access_token:, payload:, method:) @path = path @access_token = access_token @payload = payload + @method = method end - def post - url = "#{config.airbyte_server_url}#{@path}" - - response = HTTParty.post( - url, - headers: { - 'Accept' => 'application/json', - 'Content-Type' => 'application/json', - 'Authorization' => "Bearer #{@access_token}" - }, - body: @payload.to_json - ) + def call + # Only :post and :patch are supported. This is internally controlled. + response = + case @method + when :post then HTTParty.post(url, request_options) + when :patch then HTTParty.patch(url, request_options) + end handle_http_error(response) @@ -45,20 +45,31 @@ def post rescue HttpError raise rescue StandardError => e - Rails.logger.error("HTTP post failed to url: #{url}, failed with error: #{e.message}") + Rails.logger.error("HTTP #{@method} failed to url: #{url}, failed with error: #{e.message}") raise Error, e.message end private - def config - DfE::Analytics.config + def url + "#{DfE::Analytics.config.airbyte_server_url}#{@path}" + end + + def request_options + { + headers: { + 'Accept' => 'application/json', + 'Content-Type' => 'application/json', + 'Authorization' => "Bearer #{@access_token}" + }, + body: @payload.to_json + } end def handle_http_error(response) return if response.success? - error_message = "Error calling Airbyte API (#{@path}): status: #{response.code} body: #{response.body}" + error_message = "Error calling Airbyte API (#{@path}): method: #{@method} status: #{response.code} body: #{response.body}" Rails.logger.info(error_message) raise HttpError.new(response.code, response.body) end diff --git a/lib/services/airbyte/connection_refresh.rb b/lib/services/airbyte/connection_refresh.rb index 3ffc4cc..2e38a97 100644 --- a/lib/services/airbyte/connection_refresh.rb +++ b/lib/services/airbyte/connection_refresh.rb @@ -16,10 +16,9 @@ def initialize(access_token:) def call @access_token ||= AccessToken.call - discovered_schema = DiscoverSchema.call(access_token:) allowed_list = DfE::Analytics.allowlist - ConnectionUpdate.call(access_token:, allowed_list:, discovered_schema:) + ConnectionUpdate.call(access_token:, allowed_list:) rescue StandardError => e Rails.logger.error("Airbyte connection refresh failed: #{e.message}") raise Error, "Connection refresh failed: #{e.message}" diff --git a/lib/services/airbyte/connection_update.rb b/lib/services/airbyte/connection_update.rb index a12bbcd..0666745 100644 --- a/lib/services/airbyte/connection_update.rb +++ b/lib/services/airbyte/connection_update.rb @@ -7,72 +7,55 @@ class ConnectionUpdate CURSOR_FIELD = %w[_ab_cdc_lsn].freeze AIRBYTE_FIELDS = %w[_ab_cdc_deleted_at _ab_cdc_updated_at].freeze DEFAULT_PRIMARY_KEY = 'id' - SYNC_MODE = 'incremental' + SYNC_MODE = 'incremental_append' DESTINATION_SYNC_MODE = 'append' class Error < StandardError; end - def self.call(access_token:, allowed_list:, discovered_schema:) - new(access_token, allowed_list, discovered_schema).call + def self.call(access_token:, allowed_list:) + new(access_token, allowed_list).call end - def initialize(access_token, allowed_list, discovered_schema) + def initialize(access_token, allowed_list) + raise Error, 'allowed_list must be a Hash of table_name => fields' unless allowed_list.is_a?(Hash) + @access_token = access_token @allowed_list = allowed_list - @discovered_streams = discovered_schema&.dig('catalog', 'streams') + @connection_id = DfE::Analytics.config.airbyte_configuration[:connection_id] end def call - Services::Airbyte::ApiServer.post( - path: '/api/v1/connections/update', + Services::Airbyte::ApiServer.patch( + path: "/api/public/v1/connections/#{@connection_id}", access_token: @access_token, - payload: connection_update_payload + payload: connection_patch_payload ) end private - def discovered_stream_for(stream_name) - discovered_stream = @discovered_streams.find { |s| s.dig('stream', 'name') == stream_name.to_s } if @discovered_streams.present? - - return discovered_stream if discovered_stream.present? - - error_message = "Stream definition not found in discovered_schema for: #{stream_name}" - Rails.logger.error(error_message) - raise Error, error_message - end - - def connection_update_payload + def connection_patch_payload { - connectionId: DfE::Analytics.config.airbyte_configuration[:connection_id], - syncCatalog: { + configurations: { streams: @allowed_list.map do |stream_name, fields| - discovered_stream = discovered_stream_for(stream_name) { - stream: { - name: stream_name.to_s, - namespace: discovered_stream.dig('stream', 'namespace'), - jsonSchema: discovered_stream.dig('stream', 'jsonSchema'), - supportedSyncModes: discovered_stream.dig('stream', 'supportedSyncModes'), - defaultCursorField: discovered_stream.dig('stream', 'defaultCursorField'), - sourceDefinedCursor: discovered_stream.dig('stream', 'sourceDefinedCursor'), - sourceDefinedPrimaryKey: discovered_stream.dig('stream', 'sourceDefinedPrimaryKey') - }, - config: { - syncMode: SYNC_MODE, - destinationSyncMode: DESTINATION_SYNC_MODE, - cursorField: discovered_stream.dig('config', 'cursorField') || CURSOR_FIELD, - primaryKey: discovered_stream.dig('config', 'primaryKey') || [[DEFAULT_PRIMARY_KEY]], - aliasName: stream_name.to_s, - selected: true, - fieldSelectionEnabled: true, - selectedFields: (CURSOR_FIELD + AIRBYTE_FIELDS + fields).uniq.map { |f| { fieldPath: [f] } } - } + name: stream_name.to_s, + selected: true, + syncMode: SYNC_MODE, + cursorField: CURSOR_FIELD, + primaryKey: [[DEFAULT_PRIMARY_KEY]], + selectedFields: selected_fields(fields) } end } } end + + def selected_fields(fields) + (CURSOR_FIELD + AIRBYTE_FIELDS + fields).uniq.map do |field| + { fieldPath: [field] } + end + end end end end diff --git a/spec/services/airbyte/api_server_spec.rb b/spec/services/airbyte/api_server_spec.rb index 3546ef6..08b35b9 100644 --- a/spec/services/airbyte/api_server_spec.rb +++ b/spec/services/airbyte/api_server_spec.rb @@ -2,8 +2,8 @@ RSpec.describe Services::Airbyte::ApiServer do let(:access_token) { 'test-token' } - let(:path) { '/api/v1/connections/sync' } let(:payload) { { connectionId: 'abc-123' } } + let(:path) { '/api/v1/connections/sync' } let(:airbyte_url) { 'https://mock.airbyte.api' } let(:url) { "#{airbyte_url}#{path}" } @@ -15,69 +15,74 @@ allow(DfE::Analytics).to receive(:config).and_return(config_double) end - describe '.post' do - context 'when the request is successful' do - let(:response_body) { { 'status' => 'ok' } } - let(:http_response) do - instance_double( - HTTParty::Response, - success?: true, - parsed_response: response_body - ) - end + shared_examples 'HTTP method behavior' do |http_method| + describe ".#{http_method}" do + context 'when the request is successful' do + let(:response_body) { { 'status' => 'ok' } } + let(:http_response) do + instance_double( + HTTParty::Response, + success?: true, + parsed_response: response_body + ) + end - it 'returns the parsed response' do - allow(HTTParty).to receive(:post).and_return(http_response) + it 'returns the parsed response' do + allow(HTTParty).to receive(http_method).and_return(http_response) - result = described_class.post(path: path, access_token: access_token, payload: payload) - expect(result).to eq(response_body) + result = described_class.public_send(http_method, path: path, access_token: access_token, payload: payload) + expect(result).to eq(response_body) + end end - end - context 'when the response is an HTTP error' do - let(:http_response) do - instance_double( - HTTParty::Response, - success?: false, - code: 403, - body: 'Forbidden' - ) - end + context 'when the response is an HTTP error' do + let(:http_response) do + instance_double( + HTTParty::Response, + success?: false, + code: 403, + body: 'Forbidden' + ) + end - before do - allow(HTTParty).to receive(:post).and_return(http_response) - allow(Rails.logger).to receive(:info) - end + before do + allow(HTTParty).to receive(http_method).and_return(http_response) + allow(Rails.logger).to receive(:info) + end - it 'logs and raises a HttpError with code and message' do - expect(Rails.logger).to receive(:info).with(/Error calling Airbyte API/) + it 'logs and raises a HttpError with code and message' do + expect(Rails.logger).to receive(:info).with(/Error calling Airbyte API/) - error = nil - expect do - described_class.post(path: path, access_token: access_token, payload: payload) - rescue described_class::HttpError => e - error = e - raise - end.to raise_error(described_class::HttpError) + error = nil + expect do + described_class.public_send(http_method, path: path, access_token: access_token, payload: payload) + rescue described_class::HttpError => e + error = e + raise + end.to raise_error(described_class::HttpError) - expect(error.code).to eq(403) - expect(error.message).to eq('Forbidden') + expect(error.code).to eq(403) + expect(error.message).to eq('Forbidden') + end end - end - context 'when a low-level network error occurs' do - before do - allow(HTTParty).to receive(:post).and_raise(StandardError.new('Socket hang up')) - allow(Rails.logger).to receive(:error) - end + context 'when a network-level error occurs' do + before do + allow(HTTParty).to receive(http_method).and_raise(StandardError.new('Socket hang up')) + allow(Rails.logger).to receive(:error) + end - it 'logs and raises a generic ApiServer::Error' do - expect(Rails.logger).to receive(:error).with(/HTTP post failed to url/) + it 'logs and raises a generic ApiServer::Error' do + expect(Rails.logger).to receive(:error).with(/HTTP #{http_method} failed to url/) - expect do - described_class.post(path: path, access_token: access_token, payload: payload) - end.to raise_error(described_class::Error, /Socket hang up/) + expect do + described_class.public_send(http_method, path: path, access_token: access_token, payload: payload) + end.to raise_error(described_class::Error, /Socket hang up/) + end end end end + + include_examples 'HTTP method behavior', :post + include_examples 'HTTP method behavior', :patch end diff --git a/spec/services/airbyte/connection_refresh_spec.rb b/spec/services/airbyte/connection_refresh_spec.rb index cbffb37..63c65e0 100644 --- a/spec/services/airbyte/connection_refresh_spec.rb +++ b/spec/services/airbyte/connection_refresh_spec.rb @@ -16,7 +16,6 @@ before do allow(Services::Airbyte::AccessToken).to receive(:call).and_return(token) - allow(Services::Airbyte::DiscoverSchema).to receive(:call).and_return(schema) allow(Services::Airbyte::ConnectionUpdate).to receive(:call) allow(DfE::Analytics).to receive(:allowlist).and_return(allowlist) end @@ -25,21 +24,16 @@ it 'uses the given values and refreshes the connection' do described_class.call(access_token: token) - expect(Services::Airbyte::DiscoverSchema).to have_received(:call).with( - access_token: token - ) - expect(Services::Airbyte::ConnectionUpdate).to have_received(:call).with( access_token: token, - allowed_list: allowlist, - discovered_schema: schema + allowed_list: allowlist ) end end context 'when an error occurs' do before do - allow(Services::Airbyte::DiscoverSchema).to receive(:call) + allow(Services::Airbyte::ConnectionUpdate).to receive(:call) .and_raise(StandardError.new('boom')) allow(Rails.logger).to receive(:error) end diff --git a/spec/services/airbyte/connection_update_spec.rb b/spec/services/airbyte/connection_update_spec.rb index 71befd6..bf91293 100644 --- a/spec/services/airbyte/connection_update_spec.rb +++ b/spec/services/airbyte/connection_update_spec.rb @@ -9,32 +9,13 @@ } end - let(:discovered_schema) do - { - 'catalog' => { - 'streams' => [ - { - 'stream' => { - 'name' => 'academic_cycles', - 'jsonSchema' => { 'type' => 'object', 'properties' => {} }, - 'namespace' => 'public', - 'supportedSyncModes' => %w[full_refresh incremental] - }, - 'config' => { - 'syncMode' => 'incremental', - 'destinationSyncMode' => 'append', - 'cursorField' => ['_ab_cdc_lsn'], - 'primaryKey' => [['id']] - } - } - ] - } - } - end - let(:connection_id) { 'abc-123' } + let(:config_double) do - instance_double('DfE::Analytics.config', airbyte_configuration: { connection_id: connection_id }) + instance_double( + 'DfE::Analytics.config', + airbyte_configuration: { connection_id: connection_id } + ) end before do @@ -45,86 +26,48 @@ let(:api_result) { { 'status' => 'ok' } } before do - allow(Services::Airbyte::ApiServer).to receive(:post).and_return(api_result) + allow(Services::Airbyte::ApiServer).to receive(:patch).and_return(api_result) end - it 'delegates to ApiServer and returns parsed response' do + it 'delegates to ApiServer.patch and returns the response' do result = described_class.call( - access_token:, - allowed_list:, - discovered_schema: + access_token: access_token, + allowed_list: allowed_list ) expect(result).to eq(api_result) - expect(Services::Airbyte::ApiServer).to have_received(:post).with( - path: '/api/v1/connections/update', + expect(Services::Airbyte::ApiServer).to have_received(:patch).with( + path: "/api/public/v1/connections/#{connection_id}", access_token: access_token, payload: kind_of(Hash) ) end - context 'when stream is missing from discovered schema' do - let(:discovered_schema) { { 'catalog' => { 'streams' => [] } } } - - before { allow(Rails.logger).to receive(:error) } - - it 'logs and raises ConnectionUpdate::Error' do - expect(Rails.logger).to receive(:error).with(/Stream definition not found/) - - expect do - described_class.call( - access_token:, - allowed_list:, - discovered_schema: - ) - end.to raise_error(described_class::Error) - end - end - - context 'when ApiServer.post raises an error' do - before do - allow(Services::Airbyte::ApiServer).to receive(:post) - .and_raise(Services::Airbyte::ApiServer::Error.new('Boom')) - end - - it 'does not wrap or swallow ApiServer errors' do - expect do - described_class.call( - access_token:, - allowed_list:, - discovered_schema: - ) - end.to raise_error(Services::Airbyte::ApiServer::Error, /Boom/) - end - end - - it 'builds and sends the correct connection update payload' do + it 'builds the correct connection patch payload' do described_class.call( - access_token:, - allowed_list:, - discovered_schema: + access_token: access_token, + allowed_list: allowed_list ) - expect(Services::Airbyte::ApiServer).to have_received(:post) do |args| + expect(Services::Airbyte::ApiServer).to have_received(:patch) do |args| payload = args[:payload] - expect(payload[:connectionId]).to eq(connection_id) - expect(payload[:syncCatalog]).to be_a(Hash) - expect(payload[:syncCatalog][:streams].size).to eq(1) + expect(payload).to have_key(:configurations) + expect(payload[:configurations]).to have_key(:streams) - stream_payload = payload[:syncCatalog][:streams].first + streams = payload[:configurations][:streams] + expect(streams.size).to eq(1) - expect(stream_payload[:stream][:name]).to eq('academic_cycles') - expect(stream_payload[:stream][:namespace]).to eq('public') + stream = streams.first - config = stream_payload[:config] - expect(config[:syncMode]).to eq('incremental') - expect(config[:destinationSyncMode]).to eq('append') - expect(config[:cursorField]).to eq(['_ab_cdc_lsn']) - expect(config[:primaryKey]).to eq([['id']]) + expect(stream[:name]).to eq('academic_cycles') + expect(stream[:selected]).to eq(true) + expect(stream[:syncMode]).to eq('incremental') + expect(stream[:destinationSyncMode]).to eq('append') + expect(stream[:cursorField]).to eq(['_ab_cdc_lsn']) + expect(stream[:primaryKey]).to eq([['id']]) - # Selected fields include standard ones + allowed list expected_fields = %w[ _ab_cdc_lsn _ab_cdc_deleted_at @@ -136,7 +79,34 @@ updated_at ].map { |f| { fieldPath: [f] } } - expect(config[:selectedFields]).to match_array(expected_fields) + expect(stream[:selectedFields]).to match_array(expected_fields) + end + end + + context 'when ApiServer.patch raises an error' do + before do + allow(Services::Airbyte::ApiServer).to receive(:patch) + .and_raise(Services::Airbyte::ApiServer::Error, 'Boom') + end + + it 'propagates the error' do + expect do + described_class.call( + access_token: access_token, + allowed_list: allowed_list + ) + end.to raise_error(Services::Airbyte::ApiServer::Error, /Boom/) + end + end + + context 'when allowed_list is not a hash' do + it 'raises ConnectionUpdate::Error' do + expect do + described_class.call( + access_token: access_token, + allowed_list: 'invalid' + ) + end.to raise_error(Services::Airbyte::ConnectionUpdate::Error) end end end