Skip to content

Commit 30720d1

Browse files
committed
Refactor DfE::Analytics::AirbyteStreamConfig and dry code
1 parent f906a16 commit 30720d1

File tree

7 files changed

+170
-113
lines changed

7 files changed

+170
-113
lines changed

lib/dfe/analytics/airbyte_stream_config.rb

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,38 +13,34 @@ class AirbyteStreamConfig
1313
AIRBYTE_HEARTBEAT_ATTRIBUTES = %w[id last_heartbeat].freeze
1414
AIRBYTE_HEARTBEAT_ENTITY_ATTRIBUTES = { AIRBYTE_HEARTBEAT_ENTITY.to_sym => AIRBYTE_HEARTBEAT_ATTRIBUTES }.freeze
1515

16-
def self.config
17-
JSON.parse(File.read(DfE::Analytics.config.airbyte_stream_config_path)).deep_symbolize_keys
18-
rescue RuntimeError
19-
{}
16+
def self.generate_pretty_json_for(table_attributes)
17+
JSON.pretty_generate(generate_for(table_attributes))
2018
end
2119

22-
def self.generate_for(entity_attributes)
23-
JSON.pretty_generate(
24-
{ configurations: { streams: streams_for(entity_attributes) } }
25-
)
20+
def self.generate_for(table_attributes)
21+
{ configurations: { streams: streams_for(table_attributes) } }
2622
end
2723

2824
def self.entity_attributes
29-
return {} if config.empty?
25+
return {} if DfE::Analytics.airbyte_stream_config.empty?
3026

3127
# Transform the data
32-
config[:configurations][:streams].each_with_object({}) do |stream, memo|
28+
DfE::Analytics.airbyte_stream_config[:configurations][:streams].each_with_object({}) do |stream, memo|
3329
stream_name = stream[:name]
3430
fields = stream[:selectedFields].map { |field| field[:fieldPath].first }
3531
memo[stream_name] = fields - CURSOR_FIELD - AIRBYTE_FIELDS
3632
end.deep_symbolize_keys
3733
end
3834

39-
private_class_method def self.streams_for(entity_attributes)
40-
entity_attributes.each_with_object([]) do |(entity, attributes), streams|
35+
private_class_method def self.streams_for(table_attributes)
36+
table_attributes.each_with_object([]) do |(entity, attributes), streams|
4137
streams << table_for(entity, attributes)
4238
end << heartbeat_table
4339
end
4440

4541
private_class_method def self.table_for(entity, attributes)
4642
{
47-
name: entity,
43+
name: entity.to_s,
4844
syncMode: INCREMENTAL_APPEND_SYNC_MODE,
4945
cursorField: CURSOR_FIELD,
5046
primaryKey: [[primary_key_for(attributes)]],

lib/dfe/analytics/tasks/fields.rake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace :dfe do
1717
task regenerate_airbyte_stream_config: :environment do
1818
File.write(
1919
DfE::Analytics.config.airbyte_stream_config_path,
20-
DfE::Analytics::AirbyteStreamConfig.generate_for(DfE::Analytics.allowlist)
20+
DfE::Analytics::AirbyteStreamConfig.generate_pretty_json_for(DfE::Analytics.allowlist)
2121
)
2222
end
2323
end

lib/generators/dfe/analytics/install_generator.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def install
1515
create_file 'config/analytics_blocklist.yml', { 'shared' => {} }.to_yaml
1616
create_file(
1717
DfE::Analytics.config.airbyte_stream_config_path,
18-
DfE::Analytics::AirbyteStreamConfig.generate_for(table1: %w[id field1 field2])
18+
DfE::Analytics::AirbyteStreamConfig.generate_pretty_json_for(table1: %w[id field1 field2])
1919
)
2020
end
2121

lib/services/airbyte/connection_update.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ module Services
44
module Airbyte
55
# Fetches the current schema for given source - Also reloads cache
66
class ConnectionUpdate
7-
class Error < StandardError; end
8-
97
def self.call(access_token:)
108
new(access_token).call
119
end
@@ -19,9 +17,15 @@ def call
1917
Services::Airbyte::ApiServer.patch(
2018
path: "/api/public/v1/connections/#{@connection_id}",
2119
access_token: @access_token,
22-
payload: DfE::Analytics.airbyte_stream_config
20+
payload: airbyte_stream_config
2321
)
2422
end
23+
24+
private
25+
26+
def airbyte_stream_config
27+
DfE::Analytics::AirbyteStreamConfig.generate_for(DfE::Analytics.allowlist)
28+
end
2529
end
2630
end
2731
end

spec/dfe/analytics/airbyte_stream_config_spec.rb

Lines changed: 39 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,123 +1,73 @@
11
# frozen_string_literal: true
22

33
RSpec.describe DfE::Analytics::AirbyteStreamConfig do
4-
let(:mock_path) { '/fake/path/airbyte_config.json' }
5-
6-
before do
7-
allow(DfE::Analytics.config).to receive(:airbyte_stream_config_path).and_return(mock_path)
8-
end
9-
10-
describe '.config' do
11-
context 'when file contains valid JSON' do
12-
let(:json_data) do
13-
{
14-
configurations: {
15-
streams: [
16-
{
17-
name: 'users',
18-
syncMode: 'incremental_append',
19-
cursorField: ['_ab_cdc_lsn'],
20-
primaryKey: [['id']],
21-
selectedFields: [
22-
{ fieldPath: ['_ab_cdc_lsn'] },
23-
{ fieldPath: ['_ab_cdc_updated_at'] },
24-
{ fieldPath: ['_ab_cdc_deleted_at'] },
25-
{ fieldPath: ['email'] },
26-
{ fieldPath: ['name'] }
27-
]
28-
}
29-
]
30-
}
31-
}.to_json
32-
end
33-
34-
before do
35-
allow(File).to receive(:read).with(mock_path).and_return(json_data)
36-
end
37-
38-
it 'returns the deep symbolized hash' do
39-
expect(described_class.config).to include(
40-
configurations: {
41-
streams: include(
42-
a_hash_including(
43-
name: 'users',
44-
selectedFields: include({ fieldPath: ['email'] })
45-
)
46-
)
47-
}
48-
)
49-
end
50-
end
51-
52-
context 'when File.read raises a RuntimeError' do
53-
before do
54-
allow(File).to receive(:read).with(mock_path).and_raise(RuntimeError)
55-
end
56-
57-
it 'returns an empty hash' do
58-
expect(described_class.config).to eq({})
59-
end
60-
end
61-
end
62-
634
describe '.generate_for' do
64-
subject(:parsed_config) { JSON.parse(described_class.generate_for(entity_attributes)) }
5+
subject(:airbyte_stream_config) { described_class.generate_for(entity_attributes) }
656

667
context 'when attributes include "id"' do
67-
let(:entity_attributes) { { 'users' => %w[id name email] } }
8+
let(:entity_attributes) { { users: %w[id name email] } }
689

6910
it 'uses "id" as the primary key and includes all fields' do
70-
streams = parsed_config['configurations']['streams']
71-
users_stream = streams.find { |stream| stream['name'] == 'users' }
72-
73-
expect(users_stream['name']).to eq('users')
74-
expect(users_stream['syncMode']).to eq('incremental_append')
75-
expect(users_stream['cursorField']).to eq(['_ab_cdc_lsn'])
76-
expect(users_stream['primaryKey']).to eq([['id']])
77-
expect(users_stream['selectedFields'])
11+
streams = airbyte_stream_config[:configurations][:streams]
12+
users_stream = streams.find { |stream| stream[:name] == 'users' }
13+
14+
expect(users_stream[:name]).to eq('users')
15+
expect(users_stream[:syncMode]).to eq('incremental_append')
16+
expect(users_stream[:cursorField]).to eq(['_ab_cdc_lsn'])
17+
expect(users_stream[:primaryKey]).to eq([['id']])
18+
expect(users_stream[:selectedFields])
7819
.to match_array([
79-
{ 'fieldPath' => ['_ab_cdc_lsn'] },
80-
{ 'fieldPath' => ['_ab_cdc_updated_at'] },
81-
{ 'fieldPath' => ['_ab_cdc_deleted_at'] },
82-
{ 'fieldPath' => ['id'] },
83-
{ 'fieldPath' => ['name'] },
84-
{ 'fieldPath' => ['email'] }
20+
{ fieldPath: ['_ab_cdc_lsn'] },
21+
{ fieldPath: ['_ab_cdc_updated_at'] },
22+
{ fieldPath: ['_ab_cdc_deleted_at'] },
23+
{ fieldPath: ['id'] },
24+
{ fieldPath: ['name'] },
25+
{ fieldPath: ['email'] }
8526
])
8627
end
8728

8829
it 'adds the airbyte heartbeat stream' do
89-
streams = parsed_config['configurations']['streams']
90-
heartbeat_stream = streams.find { |stream| stream['name'] == 'airbyte_heartbeat' }
30+
streams = airbyte_stream_config[:configurations][:streams]
31+
heartbeat_stream = streams.find { |stream| stream[:name] == 'airbyte_heartbeat' }
9132

9233
expect(heartbeat_stream).to include(
93-
'name' => 'airbyte_heartbeat',
94-
'syncMode' => 'full_refresh_overwrite',
95-
'primaryKey' => [['id']]
34+
name: 'airbyte_heartbeat',
35+
syncMode: 'full_refresh_overwrite',
36+
primaryKey: [['id']]
9637
)
9738

98-
expect(heartbeat_stream['selectedFields'])
39+
expect(heartbeat_stream[:selectedFields])
9940
.to match_array([
100-
{ 'fieldPath' => ['id'] },
101-
{ 'fieldPath' => ['last_heartbeat'] }
41+
{ fieldPath: ['id'] },
42+
{ fieldPath: ['last_heartbeat'] }
10243
])
10344
end
10445
end
10546

10647
context 'when attributes do not include "id"' do
107-
let(:entity_attributes) { { 'users' => %w[email name] } }
48+
let(:entity_attributes) { { users: %w[email name] } }
10849

10950
it 'uses the first attribute as the primary key' do
110-
streams = parsed_config['configurations']['streams']
111-
users_stream = streams.find { |stream| stream['name'] == 'users' }
51+
streams = airbyte_stream_config[:configurations][:streams]
52+
users_stream = streams.find { |stream| stream[:name] == 'users' }
11253

113-
expect(users_stream['primaryKey']).to eq([['email']])
54+
expect(users_stream[:primaryKey]).to eq([['email']])
11455
end
11556
end
11657
end
11758

59+
describe '.generate_pretty_json_for' do
60+
let(:entity_attributes) { { users: %w[id name] } }
61+
62+
it 'returns a pretty JSON version of generate_for' do
63+
expect(described_class.generate_pretty_json_for(entity_attributes))
64+
.to eq(JSON.pretty_generate(described_class.generate_for(entity_attributes)))
65+
end
66+
end
67+
11868
describe '.entity_attributes' do
11969
context 'when config is empty' do
120-
before { allow(described_class).to receive(:config).and_return({}) }
70+
before { allow(DfE::Analytics).to receive(:airbyte_stream_config).and_return({}) }
12171

12272
it 'returns an empty hash' do
12373
expect(described_class.entity_attributes).to eq({})
@@ -176,7 +126,7 @@
176126
end
177127

178128
before do
179-
allow(described_class).to receive(:config).and_return(config_hash.deep_symbolize_keys)
129+
allow(DfE::Analytics).to receive(:airbyte_stream_config).and_return(config_hash.deep_symbolize_keys)
180130
end
181131

182132
it 'removes the cursor and airbyte fields and returns attributes for all streams' do

spec/dfe/analytics_spec.rb

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,4 +395,102 @@
395395
expect(DfE::Analytics::EntityTableCheckJob).to eq(DfE::Analytics::Jobs::EntityTableCheckJob)
396396
end
397397
end
398+
399+
describe '.airbyte_enabled?' do
400+
context 'when the airbyte enabled config has been set to true' do
401+
before do
402+
described_class.configure { |config| config.airbyte_enabled = true }
403+
end
404+
405+
it 'returns true' do
406+
expect(described_class.airbyte_enabled?).to be true
407+
end
408+
end
409+
410+
context 'when the airbyte enabled config has been set to false' do
411+
before do
412+
described_class.configure { |config| config.airbyte_enabled = false }
413+
end
414+
415+
it 'returns false' do
416+
expect(described_class.airbyte_enabled?).to be false
417+
end
418+
end
419+
420+
context 'when the airbyte_enabled config has not been set' do
421+
before do
422+
described_class.configure { |config| config.airbyte_enabled = nil }
423+
end
424+
425+
it 'defaults to false' do
426+
expect(described_class.airbyte_enabled?).to be false
427+
end
428+
end
429+
end
430+
431+
describe '.airbyte_stream_config' do
432+
let(:mock_path) { '/fake/path/airbyte_stream_config.json' }
433+
434+
let(:config_double) do
435+
instance_double(
436+
'DfE::Analytics.config',
437+
airbyte_stream_config_path: mock_path
438+
)
439+
end
440+
441+
before do
442+
allow(DfE::Analytics).to receive(:config).and_return(config_double)
443+
end
444+
445+
context 'when the JSON file is valid' do
446+
let(:json_data) do
447+
{
448+
configurations: {
449+
streams: [
450+
{
451+
name: 'teachers',
452+
syncMode: 'incremental_append',
453+
selectedFields: [{ fieldPath: ['id'] }]
454+
}
455+
]
456+
}
457+
}.to_json
458+
end
459+
460+
before do
461+
allow(File).to receive(:read).with(mock_path).and_return(json_data)
462+
end
463+
464+
it 'returns the parsed JSON with symbolized keys' do
465+
result = described_class.airbyte_stream_config
466+
467+
expect(result).to eq(
468+
configurations: {
469+
streams: [
470+
{
471+
name: 'teachers',
472+
syncMode: 'incremental_append',
473+
selectedFields: [{ fieldPath: ['id'] }]
474+
}
475+
]
476+
}
477+
)
478+
479+
# verify keys are deep symbolized
480+
expect(result.keys).to all(be_a(Symbol))
481+
expect(result[:configurations][:streams].first[:name]).to eq('teachers')
482+
expect(result[:configurations][:streams].first[:selectedFields].first[:fieldPath]).to eq(['id'])
483+
end
484+
end
485+
486+
context 'when File.read raises a RuntimeError' do
487+
before do
488+
allow(File).to receive(:read).with(mock_path).and_raise(RuntimeError)
489+
end
490+
491+
it 'returns an empty hash' do
492+
expect(described_class.airbyte_stream_config).to eq({})
493+
end
494+
end
495+
end
398496
end

0 commit comments

Comments
 (0)