1313import com .google .common .annotations .VisibleForTesting ;
1414
1515import org .opensearch .dataprepper .aws .api .AwsCredentialsSupplier ;
16- import org .opensearch .dataprepper .aws .api .AwsConfig ;
17- import org .opensearch .dataprepper .aws .api .AwsCredentialsOptions ;
18- import org .opensearch .dataprepper .plugins .sink .prometheus .configuration .PrometheusSinkThresholdConfig ;
19- import software .amazon .awssdk .auth .credentials .AwsCredentialsProvider ;
2016import org .opensearch .dataprepper .model .annotations .Experimental ;
2117import org .opensearch .dataprepper .model .annotations .DataPrepperPlugin ;
2218import org .opensearch .dataprepper .model .annotations .DataPrepperPluginConstructor ;
3228import org .opensearch .dataprepper .common .sink .DefaultSinkMetrics ;
3329import org .opensearch .dataprepper .plugins .sink .prometheus .configuration .PrometheusSinkConfiguration ;
3430import org .opensearch .dataprepper .plugins .sink .prometheus .service .PrometheusSinkService ;
35- import software .amazon .awssdk .regions .Region ;
3631
3732import java .util .Collection ;
3833
@@ -42,8 +37,8 @@ public class PrometheusSink extends AbstractSink<Record<Event>> {
4237
4338 private volatile boolean sinkInitialized ;
4439 private final PrometheusSinkService prometheusSinkService ;
45- private PrometheusHttpSender httpSender ;
46- private SinkMetrics sinkMetrics ;
40+ private final PrometheusHttpSender httpSender ;
41+ private final SinkMetrics sinkMetrics ;
4742
4843 @ DataPrepperPluginConstructor
4944 public PrometheusSink (final PluginSetting pluginSetting ,
@@ -52,45 +47,31 @@ public PrometheusSink(final PluginSetting pluginSetting,
5247 final PrometheusSinkConfiguration prometheusSinkConfiguration ,
5348 final AwsCredentialsSupplier awsCredentialsSupplier ) {
5449 super (pluginSetting );
55- this .sinkInitialized = Boolean .FALSE ;
56- AwsConfig awsConfig = prometheusSinkConfiguration .getAwsConfig ();
57- final AwsCredentialsProvider awsCredentialsProvider = (awsConfig != null ) ? awsCredentialsSupplier .getProvider (convertToCredentialOptions (awsConfig )) : awsCredentialsSupplier .getProvider (AwsCredentialsOptions .builder ().build ());
58- Region region = (awsConfig != null ) ? awsConfig .getAwsRegion () : awsCredentialsSupplier .getDefaultRegion ().get ();
59-
50+ this .sinkInitialized = false ;
51+
6052 sinkMetrics = new DefaultSinkMetrics (pluginMetrics , "Metric" );
6153 httpSender = new PrometheusHttpSender (awsCredentialsSupplier , prometheusSinkConfiguration , sinkMetrics );
6254
63- PrometheusSinkThresholdConfig thresholdConfig = prometheusSinkConfiguration .getThresholdConfig ();
64-
6555 this .prometheusSinkService = new PrometheusSinkService (
6656 prometheusSinkConfiguration ,
6757 sinkMetrics ,
6858 httpSender ,
6959 pipelineDescription );
7060 }
7161
72- private static AwsCredentialsOptions convertToCredentialOptions (final AwsConfig awsConfig ) {
73- return AwsCredentialsOptions .builder ()
74- .withRegion (awsConfig .getAwsRegion ())
75- .withStsRoleArn (awsConfig .getAwsStsRoleArn ())
76- .withStsExternalId (awsConfig .getAwsStsExternalId ())
77- .withStsHeaderOverrides (awsConfig .getAwsStsHeaderOverrides ())
78- .build ();
79- }
80-
8162 @ Override
8263 public boolean isReady () {
8364 return sinkInitialized ;
8465 }
8566
8667 @ Override
8768 public void doInitialize () {
88- sinkInitialized = Boolean . TRUE ;
69+ sinkInitialized = true ;
8970 prometheusSinkService .setDlqPipeline (getFailurePipeline ());
9071 }
9172
9273 @ VisibleForTesting
93- void setDlqPipeline (HeadlessPipeline dlqPipeline ) {
74+ void setDlqPipeline (final HeadlessPipeline dlqPipeline ) {
9475 prometheusSinkService .setDlqPipeline (dlqPipeline );
9576 }
9677
0 commit comments