Skip to content

Commit c753a0a

Browse files
committed
Use hard-coded AWS user-agent (#146)
We don't want to make AWS user-agent configurable anymore. It will be hard-coded value.
1 parent bee1518 commit c753a0a

File tree

10 files changed

+34
-152
lines changed

10 files changed

+34
-152
lines changed

modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSinkSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain
2020
import software.amazon.awssdk.regions.Region
2121

2222
import com.snowplowanalytics.snowplow.streams.{ListOfList, Sink, Sinkable}
23-
import com.snowplowanalytics.snowplow.streams.kinesis.{KinesisFactory, KinesisFactoryConfig}
23+
import com.snowplowanalytics.snowplow.streams.kinesis.KinesisFactory
2424

2525
import Utils._
2626
import org.specs2.specification.BeforeAll
@@ -39,7 +39,7 @@ class KinesisSinkSpec extends CatsResource[IO, (Region, LocalStackContainer, Sin
3939
for {
4040
region <- Resource.eval(IO.blocking((new DefaultAwsRegionProviderChain).getRegion))
4141
localstack <- Localstack.resource(region, KINESIS_INITIALIZE_STREAMS, KinesisSinkSpec.getClass.getSimpleName)
42-
testFactory <- KinesisFactory.resource[IO](KinesisFactoryConfig(awsUserAgent = None))
42+
testFactory <- KinesisFactory.resource[IO]
4343
testSink <- testFactory.sink(getKinesisSinkConfig(localstack.getEndpoint)(testStream1Name))
4444
} yield (region, localstack, testSink)
4545

modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain
2222

2323
import com.snowplowanalytics.snowplow.streams.EventProcessingConfig
2424
import com.snowplowanalytics.snowplow.streams.EventProcessingConfig.NoWindowing
25-
import com.snowplowanalytics.snowplow.streams.kinesis.{KinesisFactory, KinesisFactoryConfig, KinesisSourceConfig}
25+
import com.snowplowanalytics.snowplow.streams.kinesis.{KinesisFactory, KinesisSourceConfig}
2626

2727
import Utils._
2828

@@ -46,7 +46,7 @@ class KinesisSourceSpec
4646
region <- Resource.eval(IO.blocking((new DefaultAwsRegionProviderChain).getRegion))
4747
localstack <- Localstack.resource(region, KINESIS_INITIALIZE_STREAMS, KinesisSourceSpec.getClass.getSimpleName)
4848
kinesisClient <- Resource.eval(getKinesisClient(localstack.getEndpoint, region))
49-
kinesisFactory <- KinesisFactory.resource[IO](KinesisFactoryConfig(awsUserAgent = None))
49+
kinesisFactory <- KinesisFactory.resource[IO]
5050
} yield (localstack, kinesisClient, getKinesisSourceConfig(localstack.getEndpoint)(_), kinesisFactory)
5151

5252
override def is = s2"""

modules/kinesis/src/main/resources/reference.conf

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,4 @@
11
snowplow.defaults: {
2-
factories: {
3-
kinesis: {
4-
# Optional AWS user-agent string for tracking cloud spend attribution
5-
# No default value for open-source
6-
# awsUserAgent: "MyApp/1.0"
7-
}
8-
}
9-
102
sources: {
113
kinesis: {
124
workerIdentifier: ${?HOSTNAME}

modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/streams/kinesis/KinesisFactory.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,20 @@ import com.snowplowanalytics.snowplow.streams.kinesis.sink.KinesisSink
1616
import com.snowplowanalytics.snowplow.streams.kinesis.source.KinesisSource
1717

1818
class KinesisFactory[F[_]: Async] private (
19-
client: SdkAsyncHttpClient,
20-
awsUserAgent: Option[String]
19+
client: SdkAsyncHttpClient
2120
) extends Factory[F, KinesisSourceConfig, KinesisSinkConfig] {
2221

2322
def sink(config: KinesisSinkConfig): Resource[F, Sink[F]] =
24-
KinesisSink.resource(config, client, awsUserAgent)
23+
KinesisSink.resource(config, client)
2524

2625
def source(config: KinesisSourceConfig): Resource[F, SourceAndAck[F]] =
27-
Resource.eval(KinesisSource.build(config, client, awsUserAgent))
26+
Resource.eval(KinesisSource.build(config, client))
2827
}
2928

3029
object KinesisFactory {
3130

32-
def resource[F[_]: Async](config: KinesisFactoryConfig): Resource[F, KinesisFactory[F]] =
33-
makeClient[F].map(new KinesisFactory(_, config.awsUserAgent))
31+
def resource[F[_]: Async]: Resource[F, KinesisFactory[F]] =
32+
makeClient[F].map(new KinesisFactory(_))
3433

3534
def makeClient[F[_]: Sync]: Resource[F, SdkAsyncHttpClient] =
3635
Resource.fromAutoCloseable {

modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/streams/kinesis/KinesisFactoryConfig.scala

Lines changed: 0 additions & 25 deletions
This file was deleted.

modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/streams/kinesis/package.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,7 @@ package com.snowplowanalytics.snowplow.streams
1010
import cats.Id
1111

1212
package object kinesis {
13+
val AWS_USER_AGENT = "APN/1.1 (ak035lu2m8ge2f9qx90duo3ww)"
14+
1315
type KinesisSinkConfig = KinesisSinkConfigM[Id]
1416
}

modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/streams/kinesis/sink/KinesisSink.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import software.amazon.awssdk.services.kinesis.model.{
3030
}
3131
import software.amazon.awssdk.awscore.retry.AwsRetryStrategy
3232

33-
import com.snowplowanalytics.snowplow.streams.kinesis.{BackoffPolicy, KinesisSinkConfig, Retries}
33+
import com.snowplowanalytics.snowplow.streams.kinesis.{AWS_USER_AGENT, BackoffPolicy, KinesisSinkConfig, Retries}
3434

3535
import java.util.UUID
3636
import java.nio.charset.StandardCharsets.UTF_8
@@ -43,10 +43,9 @@ private[kinesis] object KinesisSink {
4343

4444
def resource[F[_]: Async](
4545
config: KinesisSinkConfig,
46-
client: SdkAsyncHttpClient,
47-
awsUserAgent: Option[String]
46+
client: SdkAsyncHttpClient
4847
): Resource[F, Sink[F]] =
49-
mkProducer[F](config, client, awsUserAgent).map { p =>
48+
mkProducer[F](config, client).map { p =>
5049
new Sink[F] {
5150
def sink(batch: ListOfList[Sinkable]): F[Unit] =
5251
writeToKinesis[F](
@@ -66,8 +65,7 @@ private[kinesis] object KinesisSink {
6665

6766
private def mkProducer[F[_]: Sync](
6867
config: KinesisSinkConfig,
69-
client: SdkAsyncHttpClient,
70-
awsUserAgent: Option[String]
68+
client: SdkAsyncHttpClient
7169
): Resource[F, KinesisAsyncClient] =
7270
Resource.fromAutoCloseable {
7371
Sync[F].delay {
@@ -81,7 +79,7 @@ private[kinesis] object KinesisSink {
8179
.defaultsMode(DefaultsMode.AUTO)
8280
.overrideConfiguration { c =>
8381
c.retryStrategy(retryStrategy)
84-
awsUserAgent.foreach(ua => c.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, ua))
82+
c.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWS_USER_AGENT)
8583
()
8684
}
8785
config.customEndpoint.foreach(uri => builder.endpointOverride(uri))

modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/streams/kinesis/source/KCLScheduler.scala

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,19 @@ import java.util.Date
2828
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
2929
import java.util.concurrent.atomic.AtomicReference
3030

31-
import com.snowplowanalytics.snowplow.streams.kinesis.KinesisSourceConfig
31+
import com.snowplowanalytics.snowplow.streams.kinesis.{AWS_USER_AGENT, KinesisSourceConfig}
3232

3333
private[source] object KCLScheduler {
3434

3535
def populateQueue[F[_]: Async](
3636
config: KinesisSourceConfig,
3737
queue: LinkedBlockingQueue[KCLAction],
38-
client: SdkAsyncHttpClient,
39-
awsUserAgent: Option[String]
38+
client: SdkAsyncHttpClient
4039
): Resource[F, Unit] =
4140
for {
42-
kinesis <- mkKinesisClient[F](config.customEndpoint, client, awsUserAgent)
43-
dynamo <- mkDynamoDbClient[F](config.dynamodbCustomEndpoint, client, awsUserAgent)
44-
cloudWatch <- mkCloudWatchClient[F](config.cloudwatchCustomEndpoint, client, awsUserAgent)
41+
kinesis <- mkKinesisClient[F](config.customEndpoint, client)
42+
dynamo <- mkDynamoDbClient[F](config.dynamodbCustomEndpoint, client)
43+
cloudWatch <- mkCloudWatchClient[F](config.cloudwatchCustomEndpoint, client)
4544
scheduler <- Resource.eval(mkScheduler(kinesis, dynamo, cloudWatch, config, queue))
4645
_ <- runInBackground(scheduler)
4746
} yield ()
@@ -125,65 +124,56 @@ private[source] object KCLScheduler {
125124

126125
private def mkKinesisClient[F[_]: Sync](
127126
customEndpoint: Option[URI],
128-
client: SdkAsyncHttpClient,
129-
awsUserAgent: Option[String]
127+
client: SdkAsyncHttpClient
130128
): Resource[F, KinesisAsyncClient] =
131129
Resource.fromAutoCloseable {
132130
Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint
133131
val builder = KinesisAsyncClient
134132
.builder()
135133
.defaultsMode(DefaultsMode.AUTO)
136134
.httpClient(client)
137-
awsUserAgent.foreach { ua =>
138-
builder.overrideConfiguration { c =>
139-
c.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, ua)
135+
.overrideConfiguration { c =>
136+
c.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWS_USER_AGENT)
140137
()
141138
}
142-
}
143139
customEndpoint.foreach(uri => builder.endpointOverride(uri))
144140
builder.build()
145141
}
146142
}
147143

148144
private def mkDynamoDbClient[F[_]: Sync](
149145
customEndpoint: Option[URI],
150-
client: SdkAsyncHttpClient,
151-
awsUserAgent: Option[String]
146+
client: SdkAsyncHttpClient
152147
): Resource[F, DynamoDbAsyncClient] =
153148
Resource.fromAutoCloseable {
154149
Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint
155150
val builder = DynamoDbAsyncClient
156151
.builder()
157152
.defaultsMode(DefaultsMode.AUTO)
158153
.httpClient(client)
159-
awsUserAgent.foreach { ua =>
160-
builder.overrideConfiguration { c =>
161-
c.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, ua)
154+
.overrideConfiguration { c =>
155+
c.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWS_USER_AGENT)
162156
()
163157
}
164-
}
165158
customEndpoint.foreach(uri => builder.endpointOverride(uri))
166159
builder.build
167160
}
168161
}
169162

170163
private def mkCloudWatchClient[F[_]: Sync](
171164
customEndpoint: Option[URI],
172-
client: SdkAsyncHttpClient,
173-
awsUserAgent: Option[String]
165+
client: SdkAsyncHttpClient
174166
): Resource[F, CloudWatchAsyncClient] =
175167
Resource.fromAutoCloseable {
176168
Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint
177169
val builder = CloudWatchAsyncClient
178170
.builder()
179171
.defaultsMode(DefaultsMode.AUTO)
180172
.httpClient(client)
181-
awsUserAgent.foreach { ua =>
182-
builder.overrideConfiguration { c =>
183-
c.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, ua)
173+
.overrideConfiguration { c =>
174+
c.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWS_USER_AGENT)
184175
()
185176
}
186-
}
187177
customEndpoint.foreach(uri => builder.endpointOverride(uri))
188178
builder.build
189179
}

modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/streams/kinesis/source/KinesisSource.scala

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,11 @@ private[kinesis] object KinesisSource {
2929

3030
private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F]
3131

32-
def build[F[_]: Async](
33-
config: KinesisSourceConfig,
34-
client: SdkAsyncHttpClient,
35-
awsUserAgent: Option[String]
36-
): F[SourceAndAck[F]] =
32+
def build[F[_]: Async](config: KinesisSourceConfig, client: SdkAsyncHttpClient): F[SourceAndAck[F]] =
3733
LowLevelSource.toSourceAndAck {
3834
new LowLevelSource[F, Map[String, Checkpointable]] {
3935
def stream: Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] =
40-
kinesisStream(config, client, awsUserAgent)
36+
kinesisStream(config, client)
4137

4238
def checkpointer: KinesisCheckpointer[F] =
4339
new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)
@@ -48,12 +44,11 @@ private[kinesis] object KinesisSource {
4844

4945
private def kinesisStream[F[_]: Async](
5046
config: KinesisSourceConfig,
51-
client: SdkAsyncHttpClient,
52-
awsUserAgent: Option[String]
47+
client: SdkAsyncHttpClient
5348
): Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] = {
5449
val actionQueue = new LinkedBlockingQueue[KCLAction]()
5550
for {
56-
_ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue, client, awsUserAgent))
51+
_ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue, client))
5752
events <- Stream.emit(pullFromQueueAndEmit(actionQueue).stream).repeat
5853
} yield events
5954
}

modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/streams/kinesis/KinesisFactoryConfigSpec.scala

Lines changed: 0 additions & 69 deletions
This file was deleted.

0 commit comments

Comments
 (0)