Skip to content

Commit 323cc25

Browse files
authored
fix: correctly terminate pagination for CloudWatch Logs GetLogEvents (#1383)
1 parent f3cf944 commit 323cc25

File tree

6 files changed

+252
-42
lines changed

6 files changed

+252
-42
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"id": "e58cd6d1-a4b8-42a8-b03e-459d67aeb7c1",
3+
"type": "bugfix",
4+
"description": "Correctly terminate pagination for CloudWatch Logs `GetLogEvents` operation",
5+
"issues": [
6+
"awslabs/aws-sdk-kotlin#1326"
7+
]
8+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package aws.sdk.kotlin.codegen.customization
6+
7+
import software.amazon.smithy.kotlin.codegen.KotlinSettings
8+
import software.amazon.smithy.kotlin.codegen.integration.KotlinIntegration
9+
import software.amazon.smithy.kotlin.codegen.model.traits.PaginationEndBehavior
10+
import software.amazon.smithy.kotlin.codegen.model.traits.PaginationEndBehaviorTrait
11+
import software.amazon.smithy.model.Model
12+
import software.amazon.smithy.model.shapes.BooleanShape
13+
import software.amazon.smithy.model.shapes.OperationShape
14+
import software.amazon.smithy.model.shapes.StructureShape
15+
import software.amazon.smithy.model.transform.ModelTransformer
16+
17+
private val TRUNCATION_MEMBER_IDS = mapOf(
18+
"com.amazonaws.s3#ListParts" to "IsTruncated",
19+
)
20+
21+
private val IDENTICAL_TOKEN_OPERATION_IDS = setOf(
22+
"com.amazonaws.cloudwatchlogs#GetLogEvents", // https://github.com/awslabs/aws-sdk-kotlin/issues/1326
23+
)
24+
25+
/**
26+
* Applies [PaginationEndBehaviorTrait] to a manually-curated list of operations/members for which pagination terminates
27+
* in a non-standard manner
28+
*/
29+
class PaginationEndBehaviorIntegration : KotlinIntegration {
30+
override fun preprocessModel(model: Model, settings: KotlinSettings): Model = ModelTransformer
31+
.create()
32+
.mapShapes(model) { shape ->
33+
val shapeId = shape.id.toString()
34+
when {
35+
shape !is OperationShape -> shape // Pagination behavior trait only applied to operations
36+
37+
shapeId in TRUNCATION_MEMBER_IDS -> {
38+
val output = model.expectShape(shape.outputShape)
39+
require(output is StructureShape) { "Operation output must be a structure shape" }
40+
val memberName = TRUNCATION_MEMBER_IDS.getValue(shapeId)
41+
val member = output.allMembers[memberName] ?: error("Cannot find $memberName in ${output.id}")
42+
val target = model.expectShape(member.target)
43+
check(target is BooleanShape) { "Truncation member must be a boolean shape" }
44+
45+
val behavior = PaginationEndBehavior.TruncationMember(memberName)
46+
shape.toBuilder().addTrait(PaginationEndBehaviorTrait(behavior)).build()
47+
}
48+
49+
shapeId in IDENTICAL_TOKEN_OPERATION_IDS ->
50+
shape.toBuilder().addTrait(PaginationEndBehaviorTrait(PaginationEndBehavior.IdenticalToken)).build()
51+
52+
else -> shape
53+
}
54+
}
55+
}

codegen/aws-sdk-codegen/src/main/kotlin/aws/sdk/kotlin/codegen/customization/s3/TruncatablePaginationIntegration.kt

Lines changed: 0 additions & 39 deletions
This file was deleted.

codegen/aws-sdk-codegen/src/main/resources/META-INF/services/software.amazon.smithy.kotlin.codegen.integration.KotlinIntegration

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ aws.sdk.kotlin.codegen.customization.s3.ClientConfigIntegration
2020
aws.sdk.kotlin.codegen.customization.s3.ContinueIntegration
2121
aws.sdk.kotlin.codegen.customization.s3.GetObjectResponseLengthValidationIntegration
2222
aws.sdk.kotlin.codegen.customization.s3.HttpPathFilter
23-
aws.sdk.kotlin.codegen.customization.s3.TruncatablePaginationIntegration
23+
aws.sdk.kotlin.codegen.customization.PaginationEndBehaviorIntegration
2424
aws.sdk.kotlin.codegen.customization.s3.HostPrefixRequestRouteFilter
2525
aws.sdk.kotlin.codegen.customization.s3.UnwrappedXmlOutputIntegration
2626
aws.sdk.kotlin.codegen.customization.s3control.HostPrefixFilter

gradle/libs.versions.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ coroutines-version = "1.8.1"
99
atomicfu-version = "0.24.0"
1010

1111
# smithy-kotlin codegen and runtime are versioned separately
12-
smithy-kotlin-runtime-version = "1.3.3"
13-
smithy-kotlin-codegen-version = "0.33.3"
12+
smithy-kotlin-runtime-version = "1.3.5"
13+
smithy-kotlin-codegen-version = "0.33.5"
1414

1515
# codegen
1616
smithy-version = "1.50.0"
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
import aws.sdk.kotlin.services.cloudwatchlogs.*
7+
import aws.sdk.kotlin.services.cloudwatchlogs.model.GetLogEventsResponse
8+
import aws.sdk.kotlin.services.cloudwatchlogs.model.InputLogEvent
9+
import aws.sdk.kotlin.services.cloudwatchlogs.model.OutputLogEvent
10+
import aws.sdk.kotlin.services.cloudwatchlogs.paginators.getLogEventsPaginated
11+
import aws.smithy.kotlin.runtime.time.Instant
12+
import aws.smithy.kotlin.runtime.time.epochMilliseconds
13+
import aws.smithy.kotlin.runtime.util.Uuid
14+
import kotlinx.coroutines.*
15+
import kotlinx.coroutines.flow.Flow
16+
import kotlin.test.Test
17+
import kotlin.test.assertTrue
18+
import kotlin.time.Duration.Companion.milliseconds
19+
import kotlin.time.Duration.Companion.seconds
20+
21+
private const val MESSAGES_PER_BATCH = 100
22+
private const val BATCHES = 10
23+
private const val TOTAL_MESSAGES = MESSAGES_PER_BATCH * BATCHES
24+
private val TIMESTAMP_DELTA_PER_MESSAGE = (-1).milliseconds
25+
private val POLLING_DELAY = 5.seconds
26+
27+
// Uncertain what a reliable value for this is...technically, _any_ amount of empty pages is possible given how
28+
// token-based pagination works. 🤷
29+
private const val MAX_SEQUENTIAL_EMPTY_PAGES = 5
30+
31+
class GetLogEventsPaginatorTest {
32+
@Test
33+
fun testGetLogEventsPagination() = runBlocking {
34+
CloudWatchLogsClient.fromEnvironment().use { cwl ->
35+
val (group, stream) = cwl.createLogGroupStream()
36+
37+
try {
38+
cwl.publishMessageBatches(group, stream)
39+
40+
val eventFlow = cwl.getLogEventsPaginated {
41+
logGroupName = group
42+
logStreamName = stream
43+
limit = MESSAGES_PER_BATCH
44+
startFromHead = true
45+
}
46+
47+
assertFlowTerminates(eventFlow)
48+
} finally {
49+
cwl.deleteLogGroupStream(group, stream)
50+
}
51+
}
52+
}
53+
}
54+
55+
private suspend fun assertFlowTerminates(eventFlow: Flow<GetLogEventsResponse>) {
56+
var maxSeen = 0
57+
var sequentialEmptyPages = 0
58+
var totalPages = 0
59+
60+
eventFlow.collect { page ->
61+
totalPages++
62+
println("Page $totalPages:")
63+
println(" + token: ${page.nextForwardToken}")
64+
println(" + items: ${page.events.orEmpty().size}")
65+
println()
66+
67+
val events = page.events.orEmpty()
68+
if (events.isEmpty()) {
69+
assertTrue(
70+
"""
71+
Too many sequential empty pages ($MAX_SEQUENTIAL_EMPTY_PAGES). It's likely the log event flow is not
72+
terminating properly.
73+
* Max message index seen: $maxSeen
74+
* Total pages seen: $totalPages
75+
""".trimIndent(),
76+
) { sequentialEmptyPages++ < MAX_SEQUENTIAL_EMPTY_PAGES }
77+
} else {
78+
sequentialEmptyPages = 0
79+
80+
val batchMaxIndex = events.maxOf { it.messageIndex }
81+
assertTrue(
82+
"""
83+
Unexpected repetition of a log event. Current batch contains message index $batchMaxIndex but already
84+
encountered message index $maxSeen on a prior page.
85+
* Total pages seen: $totalPages
86+
""".trimIndent(),
87+
) { batchMaxIndex >= maxSeen }
88+
89+
maxSeen = batchMaxIndex
90+
}
91+
}
92+
93+
assertTrue(
94+
"""
95+
Not enough pages seen. Expected to see at least $BATCHES but only saw up $totalPages.
96+
* Max message index seen: $maxSeen
97+
""".trimIndent(),
98+
) { totalPages >= BATCHES }
99+
100+
assertTrue(
101+
"""
102+
Saw an unexpected maximum message index. Expected to see exactly $TOTAL_MESSAGES but saw $maxSeen instead.
103+
* Total pages seen: $totalPages
104+
""".trimIndent(),
105+
) { maxSeen == TOTAL_MESSAGES }
106+
}
107+
108+
private fun createMessageBatches(anchorTime: Instant) = (0 until BATCHES).map { batchIndex ->
109+
(0 until MESSAGES_PER_BATCH).map { batchMessageIndex ->
110+
val overallMessageIndex = batchIndex * MESSAGES_PER_BATCH + batchMessageIndex
111+
val timestampDelta = TIMESTAMP_DELTA_PER_MESSAGE * (TOTAL_MESSAGES - overallMessageIndex)
112+
113+
InputLogEvent {
114+
message = String.format(
115+
"Message %d/%d (%d/%d in batch %d/%d)",
116+
overallMessageIndex + 1,
117+
TOTAL_MESSAGES,
118+
batchMessageIndex + 1,
119+
MESSAGES_PER_BATCH,
120+
batchIndex + 1,
121+
BATCHES,
122+
)
123+
124+
timestamp = (anchorTime + timestampDelta).epochMilliseconds
125+
}
126+
}
127+
}
128+
129+
private suspend fun CloudWatchLogsClient.createLogGroupStream(): Pair<String, String> {
130+
val group = "paginator-test-group_${Uuid.random()}"
131+
val stream = "paginator-test-stream_${Uuid.random()}"
132+
133+
createLogGroup { logGroupName = group }
134+
println("Created log group $group")
135+
136+
try {
137+
createLogStream {
138+
logGroupName = group
139+
logStreamName = stream
140+
}
141+
} catch (e: Throwable) {
142+
deleteLogGroup { logGroupName = group }
143+
throw e
144+
}
145+
println("Created log stream $stream")
146+
147+
return group to stream
148+
}
149+
150+
private suspend fun CloudWatchLogsClient.deleteLogGroupStream(group: String, stream: String) {
151+
deleteLogStream {
152+
logGroupName = group
153+
logStreamName = stream
154+
}
155+
println("Deleted log stream $stream")
156+
157+
deleteLogGroup { logGroupName = group }
158+
println("Deleted log group $group")
159+
}
160+
161+
private suspend fun CloudWatchLogsClient.publishMessageBatches(group: String, stream: String) = coroutineScope {
162+
val messageBatches = createMessageBatches(Instant.now())
163+
164+
println()
165+
messageBatches.mapIndexed { index, batch ->
166+
async {
167+
putLogEvents {
168+
logGroupName = group
169+
logStreamName = stream
170+
logEvents = batch
171+
}
172+
println("Published message batch ${index + 1} consisting of ${batch.size} events")
173+
}
174+
}.awaitAll()
175+
println()
176+
177+
println("Delaying for $POLLING_DELAY to allow for eventual consistency...")
178+
delay(POLLING_DELAY)
179+
}
180+
181+
private val OutputLogEvent.messageIndex: Int
182+
// Index is first number in message
183+
get() = requireNotNull(message)
184+
.dropWhile { !it.isDigit() }
185+
.takeWhile { it.isDigit() }
186+
.toInt()

0 commit comments

Comments
 (0)