@@ -20,38 +20,39 @@ class Ingestor
2020 LOW_QUEUE_LENGTH = 3
2121 FIELD_REF = /%\{ [^}]+\} /
2222
23- def initialize ( ingest_url , app_id , app_key , app_tenant , managed_identity_id , cli_auth , database , table , json_mapping , proxy_host , proxy_port , proxy_protocol , logger , threadpool = DEFAULT_THREADPOOL )
23+ def initialize ( kusto_logstash_configuration , logger , threadpool = DEFAULT_THREADPOOL )
2424 @workers_pool = threadpool
2525 @logger = logger
26- validate_config ( database , table , json_mapping , proxy_protocol , app_id , app_key , managed_identity_id , cli_auth )
26+ #Validate and assign
27+ kusto_logstash_configuration . validate_config ( )
28+ @kusto_logstash_configuration = kusto_logstash_configuration
29+
2730 @logger . info ( 'Preparing Kusto resources.' )
2831
2932 kusto_java = Java ::com . microsoft . azure . kusto
3033 apache_http = Java ::org . apache . http
31- # kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
32- # If there is managed identity, use it. This means the AppId and AppKey are empty/nil
33- # If there is CLI Auth, use that instead of managed identity
34- is_managed_identity = ( app_id . nil? && app_key . nil? && !cli_auth )
34+
35+ is_managed_identity = @kusto_logstash_configuration . kusto_auth . is_managed_identity
3536 # If it is system managed identity, propagate the system identity
36- is_system_assigned_managed_identity = is_managed_identity && 0 == "system" . casecmp ( managed_identity_id )
37+ is_system_assigned_managed_identity = @kusto_logstash_configuration . kusto_auth . is_system_assigned_managed_identity
3738 # Is it direct connection
38- is_direct_conn = ( proxy_host . nil? || proxy_host . empty? )
39+ is_direct_conn = @kusto_logstash_configuration . kusto_proxy . is_direct_conn
3940 # Create a connection string
4041 kusto_connection_string = if is_managed_identity
4142 if is_system_assigned_managed_identity
4243 @logger . info ( 'Using system managed identity.' )
43- kusto_java . data . auth . ConnectionStringBuilder . createWithAadManagedIdentity ( ingest_url )
44+ kusto_java . data . auth . ConnectionStringBuilder . createWithAadManagedIdentity ( @kusto_logstash_configuration . kusto_ingest . ingest_url )
4445 else
4546 @logger . info ( 'Using user managed identity.' )
46- kusto_java . data . auth . ConnectionStringBuilder . createWithAadManagedIdentity ( ingest_url , managed_identity_id )
47+ kusto_java . data . auth . ConnectionStringBuilder . createWithAadManagedIdentity ( @kusto_logstash_configuration . kusto_ingest . ingest_url , @kusto_logstash_configuration . kusto_ingest . managed_identity_id )
4748 end
4849 else
49- if cli_auth
50+ if @kusto_logstash_configuration . kusto_auth . cli_auth
5051 @logger . warn ( '*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*' )
51- kusto_java . data . auth . ConnectionStringBuilder . createWithAzureCli ( ingest_url )
52+ kusto_java . data . auth . ConnectionStringBuilder . createWithAzureCli ( @kusto_logstash_configuration . kusto_ingest . ingest_url )
5253 else
5354 @logger . info ( 'Using app id and app key.' )
54- kusto_java . data . auth . ConnectionStringBuilder . createWithAadApplicationCredentials ( ingest_url , app_id , app_key . value , app_tenant )
55+ kusto_java . data . auth . ConnectionStringBuilder . createWithAadApplicationCredentials ( @kusto_logstash_configuration . kusto_ingest . ingest_url , @kusto_logstash_configuration . kusto_auth . app_id , @kusto_logstash_configuration . kusto_auth . app_key . value , @kusto_logstash_configuration . kusto_auth . app_tenant )
5556 end
5657 end
5758 #
@@ -63,22 +64,22 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
6364 tuple_utils = Java ::org . apache . commons . lang3 . tuple
6465 # kusto_connection_string.setClientVersionForTracing(name_for_tracing)
6566 version_for_tracing = Gem . loaded_specs [ 'logstash-output-kusto' ] &.version || "unknown"
66- kusto_connection_string . setConnectorDetails ( "Logstash" , version_for_tracing . to_s , "" , "" , false , "" , tuple_utils . Pair . emptyArray ( ) ) ;
67+ kusto_connection_string . setConnectorDetails ( "Logstash" , version_for_tracing . to_s , name_for_tracing . to_s , version_for_tracing . to_s , false , "" , tuple_utils . Pair . emptyArray ( ) ) ;
6768
6869 @kusto_client = begin
6970 if is_direct_conn
7071 kusto_java . ingest . IngestClientFactory . createClient ( kusto_connection_string )
7172 else
72- kusto_http_client_properties = kusto_java . data . HttpClientProperties . builder ( ) . proxy ( apache_http . HttpHost . new ( proxy_host , proxy_port , proxy_protocol ) ) . build ( )
73+ kusto_http_client_properties = kusto_java . data . HttpClientProperties . builder ( ) . proxy ( apache_http . HttpHost . new ( @kusto_logstash_configuration . kusto_proxy . proxy_host , @kusto_logstash_configuration . kusto_proxy . proxy_port , @kusto_logstash_configuration . kusto_proxy . proxy_protocol ) ) . build ( )
7374 kusto_java . ingest . IngestClientFactory . createClient ( kusto_connection_string , kusto_http_client_properties )
7475 end
7576 end
7677
77- @ingestion_properties = kusto_java . ingest . IngestionProperties . new ( database , table )
78- is_mapping_ref_provided = ! ( json_mapping . nil? || json_mapping . empty? )
79- if is_mapping_ref_provided
80- @logger . debug ( 'Using mapping reference.' , json_mapping )
81- @ingestion_properties . setIngestionMapping ( json_mapping , kusto_java . ingest . IngestionMapping ::IngestionMappingKind ::JSON )
78+ @ingestion_properties = kusto_java . ingest . IngestionProperties . new ( @kusto_logstash_configuration . kusto_ingest . database , @kusto_logstash_configuration . kusto_ingest . table )
79+
80+ if @kusto_logstash_configuration . kusto_ingest . is_mapping_ref_provided
81+ @logger . debug ( 'Using mapping reference.' , @kusto_logstash_configuration . kusto_ingest . json_mapping )
82+ @ingestion_properties . setIngestionMapping ( @kusto_logstash_configuration . kusto_ingest . json_mapping , kusto_java . ingest . IngestionMapping ::IngestionMappingKind ::JSON )
8283 @ingestion_properties . setDataFormat ( kusto_java . ingest . IngestionProperties ::DataFormat ::JSON )
8384 else
8485 @logger . debug ( 'No mapping reference provided. Columns will be mapped by names in the logstash output' )
@@ -87,38 +88,6 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
8788 @logger . debug ( 'Kusto resources are ready.' )
8889 end
8990
90- def validate_config ( database , table , json_mapping , proxy_protocol , app_id , app_key , managed_identity_id , cli_auth )
91- # Add an additional validation and fail this upfront
92- if app_id . nil? && app_key . nil? && managed_identity_id . nil?
93- if cli_auth
94- @logger . info ( 'Using CLI Auth, this is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production' )
95- else
96- @logger . error ( 'managed_identity_id is not provided and app_id/app_key is empty.' )
97- raise LogStash ::ConfigurationError . new ( 'managed_identity_id is not provided and app_id/app_key is empty.' )
98- end
99- end
100- if database =~ FIELD_REF
101- @logger . error ( 'database config value should not be dynamic.' , database )
102- raise LogStash ::ConfigurationError . new ( 'database config value should not be dynamic.' )
103- end
104-
105- if table =~ FIELD_REF
106- @logger . error ( 'table config value should not be dynamic.' , table )
107- raise LogStash ::ConfigurationError . new ( 'table config value should not be dynamic.' )
108- end
109-
110- if json_mapping =~ FIELD_REF
111- @logger . error ( 'json_mapping config value should not be dynamic.' , json_mapping )
112- raise LogStash ::ConfigurationError . new ( 'json_mapping config value should not be dynamic.' )
113- end
114-
115- if not( [ "https" , "http" ] . include? proxy_protocol )
116- @logger . error ( 'proxy_protocol has to be http or https.' , proxy_protocol )
117- raise LogStash ::ConfigurationError . new ( 'proxy_protocol has to be http or https.' )
118- end
119-
120- end
121-
12291 def upload_async ( data )
12392 if @workers_pool . remaining_capacity <= LOW_QUEUE_LENGTH
12493 @logger . warn ( "Ingestor queue capacity is running low with #{ @workers_pool . remaining_capacity } free slots." )
0 commit comments