Skip to content

Commit 7509fe4

Browse files
author
Oleg Smirnov
authored
fix: event batcher does not set parent event ID (#47)
1 parent c50a978 commit 7509fe4

File tree

8 files changed

+92
-42
lines changed

8 files changed

+92
-42
lines changed

.github/workflows/build-dev-release.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,7 @@ jobs:
1414
sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }}
1515
sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }}
1616
sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }}
17-
nvd-api-key: ${{ secrets.NVD_APIKEY }}
17+
nvd-api-key: ${{ secrets.NVD_APIKEY }}
18+
cisa-domain: ${{ secrets.CISA_DOMAIN }}
19+
cisa-user: ${{ secrets.CISA_USER }}
20+
cisa-password: ${{ secrets.CISA_PWD }}

.github/workflows/build-release.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,7 @@ jobs:
1414
sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }}
1515
sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }}
1616
sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }}
17-
nvd-api-key: ${{ secrets.NVD_APIKEY }}
17+
nvd-api-key: ${{ secrets.NVD_APIKEY }}
18+
cisa-domain: ${{ secrets.CISA_DOMAIN }}
19+
cisa-user: ${{ secrets.CISA_USER }}
20+
cisa-password: ${{ secrets.CISA_PWD }}

.github/workflows/build-sanpshot.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,7 @@ jobs:
1919
sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }}
2020
sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }}
2121
sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }}
22-
nvd-api-key: ${{ secrets.NVD_APIKEY }}
22+
nvd-api-key: ${{ secrets.NVD_APIKEY }}
23+
cisa-domain: ${{ secrets.CISA_DOMAIN }}
24+
cisa-user: ${{ secrets.CISA_USER }}
25+
cisa-password: ${{ secrets.CISA_PWD }}

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# th2 common util library (2.3.0)
1+
# th2 common util library (2.3.1)
22

33
This is th2 java library with useful functions for developers and QA needs.
44

@@ -31,6 +31,11 @@ elapsed or number of events in it has reached `maxBatchSize` or batch size in by
3131

3232
# Changelog
3333

34+
## 2.3.1
35+
36+
* fixed:
37+
* `EventBatcher` does not set parent ID that leads to storing each event as an individual entity in cradle
38+
3439
## 2.3.0
3540

3641
* added `RawMessageBatcher` for th2 transport protocol

build.gradle

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ plugins {
22
id 'maven-publish'
33
id 'java-test-fixtures'
44
id 'org.jetbrains.kotlin.jvm' version '1.8.22'
5-
id 'com.exactpro.th2.gradle.publish' version '0.1.1'
5+
id 'com.exactpro.th2.gradle.publish' version '0.2.3'
66
}
77

88
group = 'com.exactpro.th2'
@@ -29,10 +29,6 @@ repositories {
2929
}
3030
}
3131

32-
kotlin {
33-
jvmToolchain(11)
34-
}
35-
3632
test {
3733
useJUnitPlatform()
3834
}

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
release_version=2.3.0
1+
release_version=2.3.1
22
description='th2 common utils (Java)'
33
vcs_url=https://github.com/th2-net/th2-common-utils-j

src/main/kotlin/com/exactpro/th2/common/utils/event/EventBatcher.kt

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,20 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
2727
import java.util.concurrent.locks.ReentrantLock
2828
import kotlin.concurrent.withLock
2929

30+
private const val DEFAULT_MAX_BATCH_SIZE_BYTES = 1_024L * 1_024L
31+
32+
private const val DEFAULT_MAX_BATCH_SIZE_EVENTS = 100
33+
34+
private const val DEFAULT_MAX_FLUSH_TIME_MILLIS = 1000L
35+
3036
/**
3137
* Collects and groups events by their parent-event-id and calls `onBatch` method when `maxFlushTime`
3238
* for a group has elapsed or number of events in it has reached `maxBatchSize`.
3339
*/
3440
class EventBatcher(
35-
private val maxBatchSizeInBytes: Long = 1_024 * 1_024,
36-
private val maxBatchSizeInItems: Int = 100,
37-
private val maxFlushTime: Long = 1000,
41+
private val maxBatchSizeInBytes: Long = DEFAULT_MAX_BATCH_SIZE_BYTES,
42+
private val maxBatchSizeInItems: Int = DEFAULT_MAX_BATCH_SIZE_EVENTS,
43+
private val maxFlushTime: Long = DEFAULT_MAX_FLUSH_TIME_MILLIS,
3844
private val executor: ScheduledExecutorService,
3945
private val onBatch: (EventBatch) -> Unit,
4046
) : AutoCloseable {
@@ -44,11 +50,13 @@ class EventBatcher(
4450
.build<EventID, Batch>()
4551
.asMap()
4652

47-
fun onEvent(event: Event) = batches.getOrPut(event.parentId, ::Batch).add(event)
53+
fun onEvent(event: Event) = batches.getOrPut(event.parentId) { Batch(event.parentId) }.add(event)
4854

4955
override fun close() = batches.values.forEach(Batch::close)
5056

51-
private inner class Batch : AutoCloseable {
57+
private inner class Batch(
58+
private val parentEventID: EventID,
59+
) : AutoCloseable {
5260
private val lock = ReentrantLock()
5361
private var batch = EventBatch.newBuilder()
5462
private var batchSizeInBytes = BATCH_LEN_CONST
@@ -73,38 +81,47 @@ class EventBatcher(
7381

7482
private fun send() = lock.withLock<Unit> {
7583
if (batch.eventsCount == 0) return
84+
if (propagateParentIdToBatch() && batch.eventsCount > 1) {
85+
// it only makes sense to store events as batch if we have more than one event in the batch
86+
batch.parentEventId = parentEventID
87+
}
7688
batch.build().runCatching(onBatch)
7789
batch.clearEvents()
90+
batch.clearParentEventId()
7891
batchSizeInBytes = BATCH_LEN_CONST
7992
future.cancel(false)
8093
}
94+
95+
private fun propagateParentIdToBatch(): Boolean =
96+
parentEventID != EventID.getDefaultInstance()
8197
}
8298

8399
companion object {
84100
/**
85-
* 4 - magic number
86-
* 1 - protocol version
87-
* 4 - message sizes
88-
* Collapsed constant = 9
89-
*/
101+
* 4 - magic number
102+
* 1 - protocol version
103+
* 4 - message sizes
104+
* Collapsed constant = 9
105+
*/
90106
private const val BATCH_LEN_CONST = 9L
91107

92108
/**
93-
* 2 - magic number
94-
* 4 + 8 = Instant (start timestamp) long (seconds) + int (nanos) - start timestamp ID
95-
* 4 - id length
96-
* 4 - name length
97-
* 4 - type length
98-
* 4 + 8 = Instant (start timestamp) long (seconds) + int (nanos) - start timestamp parent ID
99-
* 4 - parent id length
100-
* 4 + 8 = Instant (end timestamp) long (seconds) + int (nanos)
101-
* 1 = is success
102-
* 4 = body len
103-
* ===
104-
* 59
105-
*/
109+
* 2 - magic number
110+
* 4 + 8 = Instant (start timestamp) long (seconds) + int (nanos) - start timestamp ID
111+
* 4 - id length
112+
* 4 - name length
113+
* 4 - type length
114+
* 4 + 8 = Instant (start timestamp) long (seconds) + int (nanos) - start timestamp parent ID
115+
* 4 - parent id length
116+
* 4 + 8 = Instant (end timestamp) long (seconds) + int (nanos)
117+
* 1 = is success
118+
* 4 = body len
119+
* ===
120+
* 59
121+
*/
106122
private const val EVENT_RECORD_CONST = 59L
107123
private const val BATCH_LENGTH_IN_BATCH = 4L
124+
108125
/**
109126
* This logic is copied the estore project.
110127
*/

src/test/kotlin/com/exactpro/th2/common/utils/event/EventBatcherTest.kt

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,13 @@ package com.exactpro.th2.common.utils.event
1818

1919
import com.exactpro.th2.common.grpc.Event
2020
import com.exactpro.th2.common.grpc.EventBatch
21+
import com.exactpro.th2.common.grpc.EventID
22+
import com.exactpro.th2.common.utils.event.EventBatcher.Companion
2123
import com.exactpro.th2.common.utils.event.EventBatcher.Companion.calculateSizeInBytes
24+
import com.google.protobuf.util.Timestamps
25+
import org.junit.jupiter.api.Assertions
2226
import org.junit.jupiter.api.Assertions.assertEquals
27+
import org.junit.jupiter.api.Assertions.assertFalse
2328
import org.junit.jupiter.api.Test
2429
import org.junit.jupiter.api.assertAll
2530
import org.mockito.kotlin.any
@@ -55,10 +60,12 @@ class EventBatcherTest {
5560
verify(executor, times(1)).schedule(any(), any(), any())
5661
verify(future, times(1)).cancel(any())
5762

63+
val batch = batchCaptor.firstValue
5864
assertAll(
59-
{ assertEquals(2, batchCaptor.firstValue.eventsCount) },
60-
{ assertEquals(EVENT_1, batchCaptor.firstValue.getEvents(0)) },
61-
{ assertEquals(EVENT_2, batchCaptor.firstValue.getEvents(1)) },
65+
{ assertEquals(2, batch.eventsCount) },
66+
{ assertEquals(PARENT_ID, batch.parentEventId, "batch should same parent ID as events") },
67+
{ assertEquals(EVENT_1, batch.getEvents(0)) },
68+
{ assertEquals(EVENT_2, batch.getEvents(1)) },
6269
)
6370
}
6471
}
@@ -89,13 +96,17 @@ class EventBatcherTest {
8996
verify(executor, times(3)).schedule(any(), any(), any())
9097
verify(future, times(2)).cancel(any())
9198

99+
val batch1 = batchCaptor.firstValue
92100
assertAll(
93-
{ assertEquals(1, batchCaptor.firstValue.eventsCount) },
94-
{ assertEquals(EVENT_1, batchCaptor.firstValue.getEvents(0)) },
101+
{ assertEquals(1, batch1.eventsCount) },
102+
{ assertFalse(batch1.hasParentEventId(), "batch with single event should not have parent ID") },
103+
{ assertEquals(EVENT_1, batch1.getEvents(0)) },
95104
)
105+
val batch2 = batchCaptor.secondValue
96106
assertAll(
97-
{ assertEquals(1, batchCaptor.secondValue.eventsCount) },
98-
{ assertEquals(EVENT_2, batchCaptor.secondValue.getEvents(0)) },
107+
{ assertEquals(1, batch2.eventsCount) },
108+
{ assertFalse(batch2.hasParentEventId(), "batch with single event should not have parent ID") },
109+
{ assertEquals(EVENT_2, batch2.getEvents(0)) },
99110
)
100111
}
101112
}
@@ -121,28 +132,40 @@ class EventBatcherTest {
121132
runnableCaptor.firstValue.run()
122133
verify(onBatch, times(1))(any())
123134
verify(future, times(1)).cancel(any())
135+
val batch = batchCaptor.firstValue
124136
assertAll(
125-
{ assertEquals(1, batchCaptor.firstValue.eventsCount) },
126-
{ assertEquals(EVENT_1, batchCaptor.firstValue.getEvents(0)) },
137+
{ assertEquals(1, batch.eventsCount) },
138+
{ assertFalse(batch.hasParentEventId(), "batch with single event should not have parent ID") },
139+
{ assertEquals(EVENT_1, batch.getEvents(0)) },
127140
)
128141
}
129142
}
130143

131144
companion object {
145+
private val PARENT_ID = EventID.newBuilder()
146+
.setId("test")
147+
.setScope("scope")
148+
.setBookName("book")
149+
.setStartTimestamp(Timestamps.now())
150+
.build()
151+
132152
private val EVENT_1 = Event.newBuilder().apply {
133153
idBuilder.apply {
134154
id = "test_1"
135155
}
156+
parentId = PARENT_ID
136157
}.build()
137158
private val EVENT_2 = Event.newBuilder().apply {
138159
idBuilder.apply {
139160
id = "test_2"
140161
}
162+
parentId = PARENT_ID
141163
}.build()
142164
private val EVENT_3 = Event.newBuilder().apply {
143165
idBuilder.apply {
144166
id = "test_3"
145167
}
168+
parentId = PARENT_ID
146169
}.build()
147170

148171
private val EVENT_SIZE_IN_BYTES = maxOf(

0 commit comments

Comments
 (0)