Skip to content

Commit 0609cc5

Browse files
*Add dynamic targets for data
1 parent acc9005 commit 0609cc5

File tree

7 files changed

+323
-159
lines changed

7 files changed

+323
-159
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,4 @@ local-run.sh
6565
logs2.txt
6666
**/.vscode/*.*
6767
**/settings.json
68+
run.sh

e2e/e2e.rb

Lines changed: 128 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@ def initialize
1616
@ingest_url = ENV["INGEST_URL"]
1717
@database = ENV['TEST_DATABASE']
1818
@lslocalpath = ENV['LS_LOCAL_PATH']
19+
puts "DEBUG: ENV['LS_LOCAL_PATH'] = #{ENV['LS_LOCAL_PATH'].inspect}"
1920
if @lslocalpath.nil?
2021
@lslocalpath = "/usr/share/logstash/bin/logstash"
2122
end
23+
puts "DEBUG: @lslocalpath = #{@lslocalpath}"
2224
@table_with_mapping = "RubyE2E#{Time.now.getutc.to_i}"
23-
@table_without_mapping = "RubyE2ENoMapping#{Time.now.getutc.to_i}"
25+
@table_without_mapping = "RubyE2ENoMapping#{Time.now.getutc.to_i}"
26+
@table_dynamic = "RubyE2EDynamic#{Time.now.getutc.to_i}"
2427
@mapping_name = "test_mapping"
2528
@csv_file = "dataset.csv"
2629

@@ -30,84 +33,144 @@ def initialize
3033
}
3134
filter {
3235
csv { columns => [#{@csv_columns}]}
36+
# Add metadata for dynamic routing test
37+
mutate {
38+
add_field => {
39+
"[@metadata][database]" => "#{@database}"
40+
"[@metadata][table]" => "#{@table_dynamic}"
41+
"[@metadata][mapping]" => "#{@mapping_name}"
42+
}
43+
}
3344
}
3445
output {
3546
file { path => "#{@output_file}"}
3647
stdout { codec => rubydebug }
48+
# Test 1: Static routing with mapping
3749
kusto {
38-
path => "tmp%{+YYYY-MM-dd-HH-mm}.txt"
50+
path => "tmp%{+YYYY-MM-dd-HH-mm}"
3951
ingest_url => "#{@ingest_url}"
4052
cli_auth => true
4153
database => "#{@database}"
4254
table => "#{@table_with_mapping}"
4355
json_mapping => "#{@mapping_name}"
4456
}
57+
# Test 2: Static routing without mapping
4558
kusto {
46-
path => "nomaptmp%{+YYYY-MM-dd-HH-mm}.txt"
59+
path => "nomaptmp%{+YYYY-MM-dd-HH-mm}"
4760
cli_auth => true
4861
ingest_url => "#{@ingest_url}"
4962
database => "#{@database}"
5063
table => "#{@table_without_mapping}"
5164
}
65+
# Test 3: Dynamic routing with metadata fields
66+
kusto {
67+
path => "dynamictmp%{+YYYY-MM-dd-HH-mm}"
68+
cli_auth => true
69+
ingest_url => "#{@ingest_url}"
70+
database => "placeholder"
71+
table => "placeholder"
72+
dynamic_event_routing => true
73+
}
5274
}
5375
}
5476
end
5577

5678
def create_table_and_mapping
57-
Array[@table_with_mapping, @table_without_mapping].each { |tableop|
58-
puts "Creating table #{tableop}"
79+
puts "\n[#{Time.now}] === PHASE 1: Creating tables and mappings ==="
80+
Array[@table_with_mapping, @table_without_mapping, @table_dynamic].each { |tableop|
81+
puts "[#{Time.now}] Creating table #{tableop}"
82+
puts "[#{Time.now}] - Dropping table if exists..."
5983
@query_client.executeMgmt(@database, ".drop table #{tableop} ifexists")
6084
sleep(1)
85+
puts "[#{Time.now}] - Creating table with schema..."
6186
@query_client.executeMgmt(@database, ".create table #{tableop} #{@columns}")
87+
puts "[#{Time.now}] - Setting ingestion batching policy..."
6288
@query_client.executeMgmt(@database, ".alter table #{tableop} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'")
89+
puts "[#{Time.now}] ✓ Table #{tableop} created successfully"
6390
}
64-
# Mapping only for one table
65-
@query_client.executeMgmt(@database, ".create table #{@table_with_mapping} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'")
91+
# Mapping for tables that need it
92+
puts "[#{Time.now}] Creating JSON mappings..."
93+
Array[@table_with_mapping, @table_dynamic].each { |tableop|
94+
puts "[#{Time.now}] - Creating mapping '#{@mapping_name}' for table #{tableop}..."
95+
@query_client.executeMgmt(@database, ".create table #{tableop} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'")
96+
puts "[#{Time.now}] ✓ Mapping created for #{tableop}"
97+
}
98+
puts "[#{Time.now}] ✓ All tables and mappings created successfully\n"
6699
end
67100

68101

69102
def drop_and_cleanup
70-
Array[@table_with_mapping, @table_without_mapping].each { |tableop|
71-
puts "Dropping table #{tableop}"
103+
puts "\n[#{Time.now}] === PHASE 4: Cleanup ==="
104+
Array[@table_with_mapping, @table_without_mapping, @table_dynamic].each { |tableop|
105+
puts "[#{Time.now}] Dropping table #{tableop}..."
72106
@query_client.executeMgmt(@database, ".drop table #{tableop} ifexists")
107+
puts "[#{Time.now}] ✓ Table #{tableop} dropped"
73108
sleep(1)
74109
}
110+
puts "[#{Time.now}] ✓ Cleanup completed successfully\n"
75111
end
76112

77113
def run_logstash
114+
puts "\n[#{Time.now}] === PHASE 2: Running Logstash ==="
115+
puts "[#{Time.now}] Writing logstash configuration..."
78116
File.write("logstash.conf", @logstash_config)
79117
logstashpath = File.absolute_path("logstash.conf")
118+
puts "[#{Time.now}] - Config file: #{logstashpath}"
119+
120+
puts "[#{Time.now}] Preparing input/output files..."
80121
File.write(@output_file, "")
81122
File.write(@input_file, "")
123+
puts "[#{Time.now}] - Input file: #{@input_file}"
124+
puts "[#{Time.now}] - Output file: #{@output_file}"
125+
82126
lscommand = "#{@lslocalpath} -f #{logstashpath}"
83-
puts "Running logstash from config path #{logstashpath} and final command #{lscommand}"
84-
spawn(lscommand)
127+
pid = spawn(lscommand)
128+
puts "[#{Time.now}] - Process ID: #{pid}"
85129
sleep(60)
86130
data = File.read(@csv_file)
131+
csv_lines = data.lines.count
87132
f = File.open(@input_file, "a")
88133
f.write(data)
89134
f.close
90135
sleep(60)
91-
puts File.read(@output_file)
136+
puts "[#{Time.now}] ✓ Logstash processing phase completed\n"
92137
end
93138

94139
def assert_data
140+
puts "\n[#{Time.now}] === PHASE 3: Data Validation ==="
95141
max_timeout = 10
96142
csv_data = CSV.read(@csv_file)
97-
Array[@table_with_mapping, @table_without_mapping].each { |tableop|
98-
puts "Validating results for table #{tableop}"
99-
(0...max_timeout).each do |_|
143+
puts "[#{Time.now}] Expected data: #{csv_data.length} rows from CSV\n"
144+
145+
Array[@table_with_mapping, @table_without_mapping, @table_dynamic].each_with_index { |tableop, table_idx|
146+
147+
(0...max_timeout).each do |attempt|
100148
begin
149+
puts "[#{Time.now}] Attempt #{attempt + 1}/#{max_timeout}: Querying table..."
101150
sleep(5)
151+
102152
query = @query_client.executeQuery(@database, "#{tableop} | sort by rownumber asc")
103153
result = query.getPrimaryResults()
104-
raise "Wrong count - expected #{csv_data.length}, got #{result.count()} in table #{tableop}" unless result.count() == csv_data.length
154+
actual_count = result.count()
155+
156+
puts "[#{Time.now}] Query result: #{actual_count} rows found"
157+
158+
if actual_count != csv_data.length
159+
raise "Wrong count - expected #{csv_data.length}, got #{actual_count} in table #{tableop}"
160+
end
161+
105162
rescue Exception => e
106-
puts "Error: #{e}"
163+
puts "[#{Time.now}] ✗ Error on attempt #{attempt + 1}: #{e}"
164+
if attempt == max_timeout - 1
165+
raise "Failed after #{max_timeout} attempts: #{e}"
166+
end
167+
next
107168
end
169+
170+
# Validate each row
108171
(0...csv_data.length).each do |i|
109172
result.next()
110-
puts "Item #{i}"
173+
111174
(0...@column_count).each do |j|
112175
csv_item = csv_data[i][j]
113176
result_item = result.getObject(j) == nil ? "null" : result.getString(j)
@@ -121,24 +184,60 @@ def assert_data
121184
elsif j == 17 #null
122185
next
123186
end
124-
puts " csv[#{j}] = #{csv_item}"
125-
puts " result[#{j}] = #{result_item}"
126-
raise "Result Doesn't match csv in table #{tableop}" unless csv_item == result_item
187+
188+
if csv_item != result_item
189+
puts "[#{Time.now}] ✗ Mismatch at row #{i}, column #{j}:"
190+
puts "[#{Time.now}] Expected (CSV): #{csv_item}"
191+
puts "[#{Time.now}] Actual (Kusto): #{result_item}"
192+
raise "Result doesn't match CSV in table #{tableop} at row #{i}, column #{j}"
193+
end
127194
end
128-
puts ""
129195
end
130-
return
196+
break
131197
end
132-
raise "Failed after timeouts"
133198
}
134199
end
135200

136201
def start
137-
@query_client = $kusto_java.data.ClientFactory.createClient($kusto_java.data.auth.ConnectionStringBuilder::createWithAzureCli(@engine_url))
138-
create_table_and_mapping
139-
run_logstash
140-
assert_data
141-
drop_and_cleanup
202+
puts "\n" + "="*80
203+
puts "[#{Time.now}] E2E TEST STARTED"
204+
puts "="*80
205+
puts "[#{Time.now}] Configuration:"
206+
puts "[#{Time.now}] - Engine URL: #{@engine_url}"
207+
puts "[#{Time.now}] - Ingest URL: #{@ingest_url}"
208+
puts "[#{Time.now}] - Database: #{@database}"
209+
puts "[#{Time.now}] - Table (with mapping): #{@table_with_mapping}"
210+
puts "[#{Time.now}] - Table (without mapping): #{@table_without_mapping}"
211+
puts "[#{Time.now}] - Table (dynamic routing): #{@table_dynamic}"
212+
puts "[#{Time.now}] - Mapping name: #{@mapping_name}"
213+
puts "[#{Time.now}] - Logstash path: #{@lslocalpath}"
214+
puts "="*80 + "\n"
215+
216+
begin
217+
puts "[#{Time.now}] Initializing Kusto client..."
218+
@query_client = $kusto_java.data.ClientFactory.createClient($kusto_java.data.auth.ConnectionStringBuilder::createWithAzureCli(@engine_url))
219+
puts "[#{Time.now}] ✓ Kusto client initialized\n"
220+
221+
create_table_and_mapping
222+
run_logstash
223+
assert_data
224+
225+
puts "\n" + "="*80
226+
puts "[#{Time.now}] ✓✓✓ E2E TEST COMPLETED SUCCESSFULLY ✓✓✓"
227+
puts "="*80 + "\n"
228+
rescue Exception => e
229+
puts "\n" + "="*80
230+
puts "[#{Time.now}] ✗✗✗ E2E TEST FAILED ✗✗✗"
231+
puts "[#{Time.now}] Error: #{e.class}"
232+
puts "[#{Time.now}] Message: #{e.message}"
233+
puts "[#{Time.now}] Backtrace:"
234+
e.backtrace.each { |line| puts "[#{Time.now}] #{line}" }
235+
puts "="*80 + "\n"
236+
raise
237+
ensure
238+
# Always cleanup tables, whether test passes or fails
239+
drop_and_cleanup if @query_client
240+
end
142241
end
143242
end
144243

0 commit comments

Comments
 (0)