Skip to content

Commit 19cc0ab

Browse files
authored
Simplify defaults type mappings (#377)
* Remove redundant SchemaInfo.messageType as the key of the type mappings tells us that info
1 parent 0f84199 commit 19cc0ab

File tree

6 files changed

+26
-30
lines changed

6 files changed

+26
-30
lines changed

spring-pulsar-docs/src/main/asciidoc/schema-info/custom-schema-mapping.adoc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,9 @@ spring:
1010
- message-type: com.acme.User
1111
schema-info:
1212
schema-type: AVRO
13-
message-type: com.acme.User
1413
- message-type: com.acme.Address
1514
schema-info:
1615
schema-type: JSON
17-
message-type: com.acme.Address
1816
----
1917

2018
NOTE: The `message-type` is the fully-qualified name of the message class.

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ public DefaultSchemaResolver schemaResolver(PulsarProperties pulsarProperties,
117117
if (pulsarProperties.getDefaults().getTypeMappings() != null) {
118118
pulsarProperties.getDefaults().getTypeMappings().stream().filter((tm) -> tm.schemaInfo() != null)
119119
.forEach((tm) -> {
120-
var schema = schemaResolver.resolveSchema(tm.schemaInfo().schemaType(),
121-
tm.schemaInfo().messageType(), tm.schemaInfo().messageKeyType()).orElseThrow();
120+
var schema = schemaResolver.resolveSchema(tm.schemaInfo().schemaType(), tm.messageType(),
121+
tm.schemaInfo().messageKeyType()).orElseThrow();
122122
schemaResolver.addCustomSchemaMapping(tm.messageType(), schema);
123123
});
124124
}

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,16 +1365,17 @@ public record TypeMapping(Class<?> messageType, @Nullable String topicName, @Nul
13651365
* Represents a schema - holds enough information to construct an actual schema
13661366
* instance.
13671367
* @param schemaType schema type
1368-
* @param messageType message type (not required for primitive schema types or key
1369-
* value type)
13701368
* @param messageKeyType message key type (required for key value type)
13711369
*/
1372-
public record SchemaInfo(SchemaType schemaType, @Nullable Class<?> messageType, @Nullable Class<?> messageKeyType) {
1370+
public record SchemaInfo(SchemaType schemaType, @Nullable Class<?> messageKeyType) {
13731371
public SchemaInfo {
13741372
Objects.requireNonNull(schemaType, "schemaType must not be null");
13751373
if (schemaType == SchemaType.NONE) {
13761374
throw new IllegalArgumentException("schemaType NONE not supported");
13771375
}
1376+
if (schemaType != SchemaType.KEY_VALUE && messageKeyType != null) {
1377+
throw new IllegalArgumentException("messageKeyType can only be set when schemaType is KEY_VALUE");
1378+
}
13781379
}
13791380
}
13801381

spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -321,9 +321,7 @@ void schemaMappingForStructIsAddedToSchemaResolver() {
321321
contextRunner
322322
.withPropertyValues(
323323
"spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(Foo.class.getName()),
324-
"spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=JSON",
325-
"spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s"
326-
.formatted(Foo.class.getName()))
324+
"spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=JSON")
327325
.run((context -> assertThat(context).hasNotFailed().getBean(SchemaResolver.class)
328326
.asInstanceOf(InstanceOfAssertFactories.type(DefaultSchemaResolver.class))
329327
.extracting(DefaultSchemaResolver::getCustomSchemaMappings,
@@ -339,8 +337,6 @@ void schemaMappingForKeyValueIsAddedToSchemaResolver() {
339337
"spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(Foo.class.getName()),
340338
"spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=%s"
341339
.formatted(SchemaType.KEY_VALUE.name()),
342-
"spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s"
343-
.formatted(Foo.class.getName()),
344340
"spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type=%s"
345341
.formatted(String.class.getName()))
346342
.run((context -> assertThat(context).hasNotFailed().getBean(SchemaResolver.class)

spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -329,10 +329,9 @@ void withSchemaOnly() {
329329
Map<String, String> props = new HashMap<>();
330330
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
331331
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON");
332-
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName());
333332
bind(props);
334-
assertThat(properties.getDefaults().getTypeMappings()).containsExactly(
335-
new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.JSON, Foo.class, null)));
333+
assertThat(properties.getDefaults().getTypeMappings())
334+
.containsExactly(new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.JSON, null)));
336335
}
337336

338337
@Test
@@ -341,29 +340,27 @@ void withTopicAndSchema() {
341340
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
342341
props.put("spring.pulsar.defaults.type-mappings[0].topic-name", "foo-topic");
343342
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON");
344-
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName());
345343
bind(props);
346-
assertThat(properties.getDefaults().getTypeMappings()).containsExactly(
347-
new TypeMapping(Foo.class, "foo-topic", new SchemaInfo(SchemaType.JSON, Foo.class, null)));
344+
assertThat(properties.getDefaults().getTypeMappings())
345+
.containsExactly(new TypeMapping(Foo.class, "foo-topic", new SchemaInfo(SchemaType.JSON, null)));
348346
}
349347

350348
@Test
351349
void withKeyValueSchema() {
352350
Map<String, String> props = new HashMap<>();
353351
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
354352
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "KEY_VALUE");
355-
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName());
356353
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName());
357354
bind(props);
358355
assertThat(properties.getDefaults().getTypeMappings()).containsExactly(
359-
new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.KEY_VALUE, Foo.class, String.class)));
356+
new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.KEY_VALUE, String.class)));
360357
}
361358

362359
@Test
363360
void schemaTypeRequired() {
364361
Map<String, String> props = new HashMap<>();
365362
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
366-
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName());
363+
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName());
367364
assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause()
368365
.withMessageContaining("schemaType must not be null");
369366
}
@@ -377,6 +374,16 @@ void schemaTypeNoneNotAllowed() {
377374
.withMessageContaining("schemaType NONE not supported");
378375
}
379376

377+
@Test
378+
void messageKeyTypeOnlyAllowedForKeyValueSchemaType() {
379+
Map<String, String> props = new HashMap<>();
380+
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
381+
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON");
382+
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName());
383+
assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause()
384+
.withMessageContaining("messageKeyType can only be set when schemaType is KEY_VALUE");
385+
}
386+
380387
record Foo(String value) {
381388
}
382389

spring-pulsar-spring-cloud-stream-binder/src/test/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarBinderIntegrationTests.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -258,9 +258,7 @@ void avroTypeUserWithoutSchemaTypeWithCustomMappingsViaProps(CapturedOutput outp
258258
+ User.class.getName(),
259259
"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-user-sub2",
260260
"--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()),
261-
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO",
262-
"--spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s"
263-
.formatted(User.class.getName()))) {
261+
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) {
264262
Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
265263
.until(() -> output.toString().contains("Hello binder: User{name='user21', age=21}"));
266264
}
@@ -312,9 +310,7 @@ void keyValueAvroTypeWithSchemaTypeAndCustomTypeMappingsViaProps(CapturedOutput
312310
+ String.class.getName(),
313311
"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-kv-sub1",
314312
"--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()),
315-
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO",
316-
"--spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s"
317-
.formatted(User.class.getName()))) {
313+
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) {
318314
Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
319315
.until(() -> output.toString().contains("Hello binder: 21->User{name='user21', age=21}"));
320316
}
@@ -342,9 +338,7 @@ void keyValueAvroTypeWithoutSchemaTypeAndCustomTypeMappingsViaProps(CapturedOutp
342338
+ String.class.getName(),
343339
"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-kv-sub2",
344340
"--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()),
345-
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO",
346-
"--spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s"
347-
.formatted(User.class.getName()))) {
341+
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) {
348342
Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
349343
.until(() -> output.toString().contains("Hello binder: 21->User{name='user21', age=21}"));
350344
}

0 commit comments

Comments
 (0)