3535import org .apache .kafka .streams .processor .internals .metrics .StreamsMetricsImpl ;
3636import org .apache .kafka .streams .query .Position ;
3737import org .apache .kafka .streams .state .KeyValueIterator ;
38- import org .apache .kafka .streams .state .KeyValueStore ;
3938import org .apache .kafka .test .InternalMockProcessorContext ;
4039import org .apache .kafka .test .MockRecordCollector ;
4140import org .apache .kafka .test .TestUtils ;
4443import org .junit .jupiter .api .BeforeEach ;
4544import org .junit .jupiter .api .Test ;
4645import org .junit .jupiter .api .extension .ExtendWith ;
46+ import org .mockito .Mock ;
4747import org .mockito .junit .jupiter .MockitoExtension ;
4848import org .mockito .junit .jupiter .MockitoSettings ;
4949import org .mockito .quality .Strictness ;
5353import java .util .Arrays ;
5454import java .util .Collections ;
5555import java .util .HashMap ;
56+ import java .util .Iterator ;
5657import java .util .List ;
5758import java .util .Map ;
5859
6263import static org .hamcrest .CoreMatchers .nullValue ;
6364import static org .hamcrest .MatcherAssert .assertThat ;
6465import static org .hamcrest .Matchers .hasEntry ;
66+ import static org .mockito .ArgumentMatchers .any ;
67+ import static org .mockito .ArgumentMatchers .anyString ;
68+ import static org .mockito .ArgumentMatchers .anyList ;
69+ import static org .mockito .Mockito .doAnswer ;
6570import static org .mockito .Mockito .mock ;
6671import static org .mockito .Mockito .verify ;
6772import static org .mockito .Mockito .when ;
7277class ChangeLoggingKeyValueBytesStoreTest {
7378
7479 private final MockRecordCollector collector = new MockRecordCollector ();
75- private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore ("kv" );
76- private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore (inner );
80+ @ Mock
81+ private InMemoryKeyValueStore innerMock ;
82+ private ChangeLoggingKeyValueBytesStore store ;
7783 private InternalMockProcessorContext <?, ?> context ;
7884 private final StreamsConfig streamsConfig = streamsConfigMock ();
7985 private final Bytes hi = Bytes .wrap ("hi" .getBytes ());
@@ -89,8 +95,81 @@ class ChangeLoggingKeyValueBytesStoreTest {
8995 public void before () {
9096 context = mockContext ();
9197 context .setTime (0 );
98+ store = new ChangeLoggingKeyValueBytesStore (innerMock );
9299 store .init (context , store );
93100 }
101+ private void mockPosition () {
102+ when (innerMock .getPosition ()).thenReturn (Position .emptyPosition ());
103+ }
104+ private void mockGet (final Map <Bytes , byte []> mockMap ) {
105+ when (innerMock .get (any (Bytes .class ))).thenAnswer (invocation -> mockMap .get (invocation .getArgument (0 )));
106+ }
107+ private void mockPut (final Map <Bytes , byte []> mockMap ) {
108+ doAnswer (invocation -> {
109+ mockMap .put (invocation .getArgument (0 ), invocation .getArgument (1 ));
110+ StoreQueryUtils .updatePosition (innerMock .getPosition (), context );
111+ return null ;
112+ }).when (innerMock ).put (any (Bytes .class ), any (byte [].class ));
113+ }
114+ private void mockPutAll (final Map <Bytes , byte []> mockMap ) {
115+ doAnswer (invocation -> {
116+ final List <KeyValue <Bytes , byte []>> entries = invocation .getArgument (0 );
117+ for (final KeyValue <Bytes , byte []> entry : entries ) {
118+ mockMap .put (entry .key , entry .value );
119+ }
120+ return null ;
121+ }).when (innerMock ).putAll (anyList ());
122+ }
123+ private void mockDelete (final Map <Bytes , byte []> mockMap ) {
124+ doAnswer (invocation -> {
125+ final Bytes key = invocation .getArgument (0 );
126+ final byte [] oldValue = mockMap .get (key );
127+ mockMap .remove (key );
128+ return oldValue ;
129+ }).when (innerMock ).delete (any (Bytes .class ));
130+ }
131+ private void mockPutIfAbsent (final Map <Bytes , byte []> mockMap ) {
132+ doAnswer (invocation -> {
133+ final Bytes key = invocation .getArgument (0 );
134+ final byte [] value = invocation .getArgument (1 );
135+ return mockMap .putIfAbsent (key , value );
136+ }).when (innerMock ).putIfAbsent (any (Bytes .class ), any (byte [].class ));
137+ }
138+ private void mockPrefixScan (final Map <Bytes , byte []> mockMap ) {
139+ when (innerMock .prefixScan (anyString (), any ())).thenAnswer (invocation -> {
140+ final String prefix = invocation .getArgument (0 );
141+ final List <KeyValue <Bytes , byte []>> matchingRecords = new ArrayList <>();
142+ for (final Map .Entry <Bytes , byte []> entry : mockMap .entrySet ()) {
143+ if (entry .getKey ().toString ().startsWith (prefix )) {
144+ matchingRecords .add (KeyValue .pair (entry .getKey (), entry .getValue ()));
145+ }
146+ }
147+ return new KeyValueIterator <Bytes , byte []>() {
148+ private final Iterator <KeyValue <Bytes , byte []>> iterator = matchingRecords .iterator ();
149+
150+ @ Override
151+ public boolean hasNext () {
152+ return iterator .hasNext ();
153+ }
154+
155+ @ Override
156+ public KeyValue <Bytes , byte []> next () {
157+ return iterator .next ();
158+ }
159+
160+ @ Override
161+ public void close () {
162+ // No resources to clean up in this mock
163+ }
164+
165+ @ Override
166+ public Bytes peekNextKey () {
167+ return null ;
168+ }
169+ };
170+ });
171+ }
172+
94173
95174 private InternalMockProcessorContext mockContext () {
96175 return new InternalMockProcessorContext <>(
@@ -113,27 +192,36 @@ public void after() {
113192 @ Test
114193 void shouldDelegateInit () {
115194 final InternalMockProcessorContext mockContext = mockContext ();
116- final KeyValueStore <Bytes , byte []> innerMock = mock (InMemoryKeyValueStore .class );
117195 final StateStore outer = new ChangeLoggingKeyValueBytesStore (innerMock );
118196 outer .init (mockContext , outer );
119197 verify (innerMock ).init (mockContext , outer );
120198 }
121199
122200 @ Test
123201 void shouldWriteKeyValueBytesToInnerStoreOnPut () {
202+ final Map <Bytes , byte []> mockMap = new HashMap <>();
203+ mockPut (mockMap );
204+ mockGet (mockMap );
205+ mockPosition ();
206+
124207 store .put (hi , there );
125- assertThat (inner .get (hi ), equalTo (there ));
208+ assertThat (innerMock .get (hi ), equalTo (there ));
126209 assertThat (collector .collected ().size (), equalTo (1 ));
127210 assertThat (collector .collected ().get (0 ).key (), equalTo (hi ));
128211 assertThat (collector .collected ().get (0 ).value (), equalTo (there ));
129212 }
130213
131214 @ Test
132215 void shouldWriteAllKeyValueToInnerStoreOnPutAll () {
216+ final Map <Bytes , byte []> mockMap = new HashMap <>();
217+ mockPutAll (mockMap );
218+ mockGet (mockMap );
219+ mockPosition ();
220+
133221 store .putAll (Arrays .asList (KeyValue .pair (hi , there ),
134222 KeyValue .pair (hello , world )));
135- assertThat (inner .get (hi ), equalTo (there ));
136- assertThat (inner .get (hello ), equalTo (world ));
223+ assertThat (innerMock .get (hi ), equalTo (there ));
224+ assertThat (innerMock .get (hello ), equalTo (world ));
137225
138226 assertThat (collector .collected ().size (), equalTo (2 ));
139227 assertThat (collector .collected ().get (0 ).key (), equalTo (hi ));
@@ -144,20 +232,37 @@ void shouldWriteAllKeyValueToInnerStoreOnPutAll() {
144232
145233 @ Test
146234 void shouldPropagateDelete () {
235+ final Map <Bytes , byte []> mockMap = new HashMap <>();
236+ mockPut (mockMap );
237+ mockGet (mockMap );
238+ mockDelete (mockMap );
239+ mockPosition ();
240+
147241 store .put (hi , there );
148242 store .delete (hi );
149- assertThat (inner .approximateNumEntries (), equalTo (0L ));
150- assertThat (inner .get (hi ), nullValue ());
243+
244+ assertThat (innerMock .approximateNumEntries (), equalTo (0L ));
245+ assertThat (innerMock .get (hi ), nullValue ());
151246 }
152247
153248 @ Test
154249 void shouldReturnOldValueOnDelete () {
250+ final Map <Bytes , byte []> mockMap = new HashMap <>();
251+ mockPut (mockMap );
252+ mockDelete (mockMap );
253+ mockPosition ();
254+
155255 store .put (hi , there );
156256 assertThat (store .delete (hi ), equalTo (there ));
157257 }
158258
159259 @ Test
160260 void shouldLogKeyNullOnDelete () {
261+ final Map <Bytes , byte []> mockMap = new HashMap <>();
262+ mockPut (mockMap );
263+ mockDelete (mockMap );
264+ mockPosition ();
265+
161266 store .put (hi , there );
162267 assertThat (store .delete (hi ), equalTo (there ));
163268
@@ -170,19 +275,34 @@ void shouldLogKeyNullOnDelete() {
170275
171276 @ Test
172277 void shouldWriteToInnerOnPutIfAbsentNoPreviousValue () {
278+ final Map <Bytes , byte []> mockMap = new HashMap <>();
279+ mockPutIfAbsent (mockMap );
280+ mockGet (mockMap );
281+ mockPosition ();
282+
173283 store .putIfAbsent (hi , there );
174- assertThat (inner .get (hi ), equalTo (there ));
284+ assertThat (innerMock .get (hi ), equalTo (there ));
175285 }
176286
177287 @ Test
178288 void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists () {
289+ final Map <Bytes , byte []> mockMap = new HashMap <>();
290+ mockPut (mockMap );
291+ mockPutIfAbsent (mockMap );
292+ mockGet (mockMap );
293+ mockPosition ();
294+
179295 store .put (hi , there );
180296 store .putIfAbsent (hi , world );
181- assertThat (inner .get (hi ), equalTo (there ));
297+ assertThat (innerMock .get (hi ), equalTo (there ));
182298 }
183299
184300 @ Test
185301 void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue () {
302+ final Map <Bytes , byte []> mockMap = new HashMap <>();
303+ mockPutIfAbsent (mockMap );
304+ mockPosition ();
305+
186306 store .putIfAbsent (hi , there );
187307
188308 assertThat (collector .collected ().size (), equalTo (1 ));
@@ -192,6 +312,11 @@ void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() {
192312
193313 @ Test
194314 void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists () {
315+ final Map <Bytes , byte []> mockMap = new HashMap <>();
316+ mockPut (mockMap );
317+ mockPutIfAbsent (mockMap );
318+ mockPosition ();
319+
195320 store .put (hi , there );
196321 store .putIfAbsent (hi , world );
197322
@@ -202,23 +327,42 @@ void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() {
202327
203328 @ Test
204329 void shouldReturnCurrentValueOnPutIfAbsent () {
330+ final Map <Bytes , byte []> mockMap = new HashMap <>();
331+ mockPut (mockMap );
332+ mockPutIfAbsent (mockMap );
333+ mockPosition ();
334+
205335 store .put (hi , there );
206336 assertThat (store .putIfAbsent (hi , world ), equalTo (there ));
207337 }
208338
209339 @ Test
210340 void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue () {
341+ final Map <Bytes , byte []> mockMap = new HashMap <>();
342+ mockPutIfAbsent (mockMap );
343+ mockPosition ();
344+
211345 assertThat (store .putIfAbsent (hi , there ), is (nullValue ()));
212346 }
213347
214348 @ Test
215349 void shouldReturnValueOnGetWhenExists () {
350+ final Map <Bytes , byte []> mockMap = new HashMap <>();
351+ mockPut (mockMap );
352+ mockGet (mockMap );
353+ mockPosition ();
354+
216355 store .put (hello , world );
217356 assertThat (store .get (hello ), equalTo (world ));
218357 }
219358
220359 @ Test
221360 void shouldGetRecordsWithPrefixKey () {
361+ final Map <Bytes , byte []> mockMap = new HashMap <>();
362+ mockPut (mockMap );
363+ mockPrefixScan (mockMap );
364+ mockPosition ();
365+
222366 store .put (hi , there );
223367 store .put (Bytes .increment (hi ), world );
224368
@@ -242,11 +386,18 @@ void shouldGetRecordsWithPrefixKey() {
242386
243387 @ Test
244388 void shouldReturnNullOnGetWhenDoesntExist () {
389+ final Map <Bytes , byte []> mockMap = new HashMap <>();
390+ mockGet (mockMap );
391+
245392 assertThat (store .get (hello ), is (nullValue ()));
246393 }
247394
248395 @ Test
249396 void shouldLogPositionOnPut () {
397+ final Map <Bytes , byte []> mockMap = new HashMap <>();
398+ mockPut (mockMap );
399+ mockPosition ();
400+
250401 context .setRecordContext (new ProcessorRecordContext (-1 , INPUT_OFFSET , INPUT_PARTITION , INPUT_TOPIC_NAME , new RecordHeaders ()));
251402 context .setTime (1L );
252403 store .put (hi , there );
0 commit comments