@@ -23,7 +23,8 @@ def initialize
2323 puts "DEBUG: @lslocalpath = #{ @lslocalpath } "
2424 @table_with_mapping = "RubyE2E#{ Time . now . getutc . to_i } "
2525 @table_without_mapping = "RubyE2ENoMapping#{ Time . now . getutc . to_i } "
26- @table_dynamic = "RubyE2EDynamic#{ Time . now . getutc . to_i } "
26+ @table_dynamic_odd = "RubyE2EDynamicOdd#{ Time . now . getutc . to_i } "
27+ @table_dynamic_even = "RubyE2EDynamicEven#{ Time . now . getutc . to_i } "
2728 @mapping_name = "test_mapping"
2829 @csv_file = "dataset.csv"
2930
@@ -33,13 +34,18 @@ def initialize
3334 }
3435 filter {
3536 csv { columns => [#{ @csv_columns } ]}
36- # Add metadata for dynamic routing test
37- mutate {
38- add_field => {
39- "[@metadata][database]" => "#{ @database } "
40- "[@metadata][table]" => "#{ @table_dynamic } "
41- "[@metadata][mapping]" => "#{ @mapping_name } "
42- }
37+ # Add metadata for dynamic routing test - route odd/even rows to different tables
38+ ruby {
39+ code => "
40+ rownumber = event.get('rownumber').to_i
41+ if rownumber % 2 == 0
42+ event.set('[@metadata][table]', '#{ @table_dynamic_even } ')
43+ else
44+ event.set('[@metadata][table]', '#{ @table_dynamic_odd } ')
45+ end
46+ event.set('[@metadata][database]', '#{ @database } ')
47+ event.set('[@metadata][mapping]', '#{ @mapping_name } ')
48+ "
4349 }
4450 }
4551 output {
@@ -62,7 +68,7 @@ def initialize
6268 database => "#{ @database } "
6369 table => "#{ @table_without_mapping } "
6470 }
65- # Test 3: Dynamic routing with metadata fields
71+ # Test 3: Dynamic routing with metadata fields (odd/even routing)
6672 kusto {
6773 path => "dynamictmp%{+YYYY-MM-dd-HH-mm}"
6874 cli_auth => true
@@ -77,7 +83,7 @@ def initialize
7783
7884 def create_table_and_mapping
7985 puts "\n [#{ Time . now } ] === PHASE 1: Creating tables and mappings ==="
80- Array [ @table_with_mapping , @table_without_mapping , @table_dynamic ] . each { |tableop |
86+ Array [ @table_with_mapping , @table_without_mapping , @table_dynamic_odd , @table_dynamic_even ] . each { |tableop |
8187 puts "[#{ Time . now } ] Creating table #{ tableop } "
8288 puts "[#{ Time . now } ] - Dropping table if exists..."
8389 @query_client . executeMgmt ( @database , ".drop table #{ tableop } ifexists" )
@@ -90,7 +96,7 @@ def create_table_and_mapping
9096 }
9197 # Mapping for tables that need it
9298 puts "[#{ Time . now } ] Creating JSON mappings..."
93- Array [ @table_with_mapping , @table_dynamic ] . each { |tableop |
99+ Array [ @table_with_mapping , @table_dynamic_odd , @table_dynamic_even ] . each { |tableop |
94100 puts "[#{ Time . now } ] - Creating mapping '#{ @mapping_name } ' for table #{ tableop } ..."
95101 @query_client . executeMgmt ( @database , ".create table #{ tableop } ingestion json mapping '#{ @mapping_name } ' '#{ File . read ( "dataset_mapping.json" ) } '" )
96102 puts "[#{ Time . now } ] ✓ Mapping created for #{ tableop } "
@@ -101,7 +107,7 @@ def create_table_and_mapping
101107
102108 def drop_and_cleanup
103109 puts "\n [#{ Time . now } ] === PHASE 4: Cleanup ==="
104- Array [ @table_with_mapping , @table_without_mapping , @table_dynamic ] . each { |tableop |
110+ Array [ @table_with_mapping , @table_without_mapping , @table_dynamic_odd , @table_dynamic_even ] . each { |tableop |
105111 puts "[#{ Time . now } ] Dropping table #{ tableop } ..."
106112 @query_client . executeMgmt ( @database , ".drop table #{ tableop } ifexists" )
107113 puts "[#{ Time . now } ] ✓ Table #{ tableop } dropped"
@@ -142,7 +148,8 @@ def assert_data
142148 csv_data = CSV . read ( @csv_file )
143149 puts "[#{ Time . now } ] Expected data: #{ csv_data . length } rows from CSV\n "
144150
145- Array [ @table_with_mapping , @table_without_mapping , @table_dynamic ] . each_with_index { |tableop , table_idx |
151+ # For static routing tests, validate full dataset
152+ Array [ @table_with_mapping , @table_without_mapping ] . each_with_index { |tableop , table_idx |
146153
147154 ( 0 ...max_timeout ) . each do |attempt |
148155 begin
@@ -196,6 +203,82 @@ def assert_data
196203 break
197204 end
198205 }
206+
207+ # For dynamic routing tests, validate odd/even split
208+ puts "\n [#{ Time . now } ] Validating dynamic routing (odd/even tables)..."
209+
210+ # Separate CSV data into odd and even rows based on rownumber
211+ odd_rows = csv_data . select { |row | row [ 0 ] . to_i % 2 == 1 }
212+ even_rows = csv_data . select { |row | row [ 0 ] . to_i % 2 == 0 }
213+
214+ puts "[#{ Time . now } ] Expected: #{ odd_rows . length } odd rows, #{ even_rows . length } even rows"
215+
216+ # Validate odd table
217+ puts "\n [#{ Time . now } ] Validating table: #{ @table_dynamic_odd } "
218+ validate_dynamic_table ( @table_dynamic_odd , odd_rows , "odd" )
219+
220+ # Validate even table
221+ puts "\n [#{ Time . now } ] Validating table: #{ @table_dynamic_even } "
222+ validate_dynamic_table ( @table_dynamic_even , even_rows , "even" )
223+
224+ puts "[#{ Time . now } ] ✓ Dynamic routing validation completed successfully\n "
225+ end
226+
227+ def validate_dynamic_table ( table_name , expected_data , row_type )
228+ max_timeout = 10
229+
230+ ( 0 ...max_timeout ) . each do |attempt |
231+ begin
232+ puts "[#{ Time . now } ] Attempt #{ attempt + 1 } /#{ max_timeout } : Querying #{ row_type } table..."
233+ sleep ( 5 )
234+
235+ query = @query_client . executeQuery ( @database , "#{ table_name } | sort by rownumber asc" )
236+ result = query . getPrimaryResults ( )
237+ actual_count = result . count ( )
238+
239+ puts "[#{ Time . now } ] Query result: #{ actual_count } rows found (expected #{ expected_data . length } )"
240+
241+ if actual_count != expected_data . length
242+ raise "Wrong count - expected #{ expected_data . length } , got #{ actual_count } in table #{ table_name } "
243+ end
244+
245+ rescue Exception => e
246+ puts "[#{ Time . now } ] ✗ Error on attempt #{ attempt + 1 } : #{ e } "
247+ if attempt == max_timeout - 1
248+ raise "Failed after #{ max_timeout } attempts: #{ e } "
249+ end
250+ next
251+ end
252+
253+ # Validate each row
254+ ( 0 ...expected_data . length ) . each do |i |
255+ result . next ( )
256+
257+ ( 0 ...@column_count ) . each do |j |
258+ csv_item = expected_data [ i ] [ j ]
259+ result_item = result . getObject ( j ) == nil ? "null" : result . getString ( j )
260+ #special cases for data that is different in csv vs kusto
261+ if j == 4 #kusto boolean field
262+ csv_item = csv_item . to_s == "1" ? "true" : "false"
263+ elsif j == 12 # date formatting
264+ csv_item = csv_item . sub ( ".0000000" , "" )
265+ elsif j == 15 # numbers as text
266+ result_item = expected_data [ i ] [ 0 ] . to_s
267+ elsif j == 17 #null
268+ next
269+ end
270+
271+ if csv_item != result_item
272+ puts "[#{ Time . now } ] ✗ Mismatch at #{ row_type } row #{ i } , column #{ j } :"
273+ puts "[#{ Time . now } ] Expected (CSV): #{ csv_item } "
274+ puts "[#{ Time . now } ] Actual (Kusto): #{ result_item } "
275+ raise "Result doesn't match CSV in table #{ table_name } at row #{ i } , column #{ j } "
276+ end
277+ end
278+ end
279+ puts "[#{ Time . now } ] ✓ All #{ expected_data . length } #{ row_type } rows validated successfully"
280+ break
281+ end
199282 end
200283
201284 def start
@@ -208,7 +291,8 @@ def start
208291 puts "[#{ Time . now } ] - Database: #{ @database } "
209292 puts "[#{ Time . now } ] - Table (with mapping): #{ @table_with_mapping } "
210293 puts "[#{ Time . now } ] - Table (without mapping): #{ @table_without_mapping } "
211- puts "[#{ Time . now } ] - Table (dynamic routing): #{ @table_dynamic } "
294+ puts "[#{ Time . now } ] - Table (dynamic odd routing): #{ @table_dynamic_odd } "
295+ puts "[#{ Time . now } ] - Table (dynamic even routing): #{ @table_dynamic_even } "
212296 puts "[#{ Time . now } ] - Mapping name: #{ @mapping_name } "
213297 puts "[#{ Time . now } ] - Logstash path: #{ @lslocalpath } "
214298 puts "=" *80 + "\n "
0 commit comments