Skip to content

Commit d6e397b

Browse files
nuriaricofrmseijasmrabad-sngsonatype-lift[bot]
authored
343 issue refactor schema registry sampler (#373)
* [WIP] Apicurio integration * 302 Fix some unit test. In progress... * 302 Add wiremock server stubs * [WIP] Apicurio integration * 302 Fix some unit test. In progress... * 302 Add wiremock server stubs * [WIP] Apicurio integration * 302 Fix some unit test. In progress... * 302 Add wiremock server stubs * feat: add files in schemaRegistry adapter impl. Pending SchemaResgistryFactory * 343 Fix integration issues * add ParsedSchemaAdapter * feat: refactor parseSchemaParser * feat: change creatorFactory * feat:error casting protobufSchema * 343 Fix casting error to ProtobufSchema * feat:error casting protobufSchema * feat: type json ok * changed version in poms * changed version in poms, resolve posible null in getParsedSchema * Update src/main/java/com/sngular/kloadgen/sampler/schemaregistry/adapter/impl/ConfluentParsedSchemaMetadata.java Co-authored-by: sonatype-lift[bot] <37194012+sonatype-lift[bot]@users.noreply.github.com> * changed return getType in ConfluentParsedSchemaMetadata * changed remove variable Reference in ConfluentParsedSchemaMetadata * resolve nullPointer getSchema and "UnnecessarilyFullyQualified" * resolve nullPointer getSchema and "UnnecessarilyFullyQualified" * resolve nullPointer getSchema * add final in variables and removed unused imports * delete apicurioParsedSchema duplicated, add license in jmeterhelper and changed size buffer serializer avro * delete avroAdapter --------- Co-authored-by: mseijasm <miguel.seijas@sngular.com> Co-authored-by: Raúl Abad <raul.abad@sngular.com> Co-authored-by: sonatype-lift[bot] <37194012+sonatype-lift[bot]@users.noreply.github.com>
1 parent dd931dd commit d6e397b

19 files changed

+490
-285
lines changed

Example-Test-Plan.jmx

Lines changed: 298 additions & 210 deletions
Large diffs are not rendered by default.

pom-maven-central.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
<artifactId>kloadgen</artifactId>
99

10-
<version>5.5.1</version>
10+
<version>5.6.0</version>
1111

1212
<name>KLoadGen</name>
1313
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
<artifactId>kloadgen</artifactId>
99

10-
<version>5.5.1</version>
10+
<version>5.6.0</version>
1111

1212
<name>KLoadGen</name>
1313
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial

src/main/java/com/sngular/kloadgen/extractor/impl/SchemaExtractorImpl.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
import com.sngular.kloadgen.extractor.extractors.JsonExtractor;
2727
import com.sngular.kloadgen.extractor.extractors.ProtoBufExtractor;
2828
import com.sngular.kloadgen.model.FieldValueMapping;
29-
import com.sngular.kloadgen.sampler.schemaregistry.schema.ApicurioParsedSchema;
29+
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ApicurioParsedSchemaMetadata;
30+
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ParsedSchemaAdapter;
3031
import com.sngular.kloadgen.util.JMeterHelper;
3132
import com.sngular.kloadgen.util.SchemaRegistryKeyHelper;
33+
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
3234
import com.squareup.wire.schema.internal.parser.TypeElement;
3335
import io.confluent.kafka.schemaregistry.ParsedSchema;
3436
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
@@ -62,17 +64,16 @@ public SchemaExtractorImpl(final AvroExtractor avroExtractor, final JsonExtracto
6264
@Override
6365
public final Pair<String, List<FieldValueMapping>> flatPropertiesList(final String subjectName) throws IOException, RestClientException {
6466
String schemaType = null;
65-
Properties properties = JMeterContextService.getContext().getProperties();
67+
final Properties properties = JMeterContextService.getContext().getProperties();
6668

6769
final var schemaParsed = JMeterHelper.getParsedSchema(subjectName, properties);
6870
final List<FieldValueMapping> attributeList = new ArrayList<>();
6971
final HashMap<String, TypeElement> nestedTypes = new HashMap<>();
70-
String registryName = properties.getProperty(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME);
72+
final String registryName = properties.getProperty(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME);
73+
final ParsedSchemaAdapter parsedSchemaAdapter = schemaParsed.getParsedSchemaAdapter();
74+
schemaType = parsedSchemaAdapter.getType();
7175
if (Objects.nonNull(registryName) && registryName.equalsIgnoreCase(SchemaRegistryEnum.APICURIO.toString())) {
72-
ApicurioParsedSchema apicurioParsedSchema = (ApicurioParsedSchema) schemaParsed;
73-
Object schema = apicurioParsedSchema.getSchema();
74-
75-
schemaType = apicurioParsedSchema.getType();
76+
final Object schema = ((ApicurioParsedSchemaMetadata) parsedSchemaAdapter).getSchema();
7677
if (SchemaTypeEnum.AVRO.name().equalsIgnoreCase(schemaType)) {
7778
((Schema) schema).getFields().forEach(field -> avroExtractor.processField(field, attributeList, true, false));
7879
} else if (SchemaTypeEnum.JSON.name().equalsIgnoreCase(schemaType)) {
@@ -81,19 +82,17 @@ public final Pair<String, List<FieldValueMapping>> flatPropertiesList(final Stri
8182
final var protoFileElement = ((io.apicurio.registry.utils.protobuf.schema.ProtobufSchema) schema).getProtoFileElement();
8283
protoFileElement.getTypes().forEach(field -> protoBufExtractor.processField(field, attributeList, protoFileElement.getImports(), false, nestedTypes));
8384
} else {
84-
throw new KLoadGenException(String.format("Schema type not supported %s", apicurioParsedSchema.getType()));
85+
throw new KLoadGenException(String.format("Schema type not supported %s", schemaType));
8586
}
8687

8788
} else if (Objects.nonNull(registryName) && registryName.equalsIgnoreCase(SchemaRegistryEnum.CONFLUENT.toString())) {
88-
ParsedSchema confluentParsedSchema = (ParsedSchema) schemaParsed;
89-
90-
schemaType = confluentParsedSchema.schemaType();
9189
if (SchemaTypeEnum.AVRO.name().equalsIgnoreCase(schemaType)) {
92-
(((AvroSchema) confluentParsedSchema).rawSchema()).getFields().forEach(field -> avroExtractor.processField(field, attributeList, true, false));
90+
((Schema) parsedSchemaAdapter.getRawSchema()).getFields().forEach(field -> avroExtractor.processField(field, attributeList, true, false));
9391
} else if (SchemaTypeEnum.JSON.name().equalsIgnoreCase(schemaType)) {
94-
attributeList.addAll(jsonExtractor.processSchema(((JsonSchema) confluentParsedSchema).toJsonNode()));
92+
final JsonSchema jsonSchema = new JsonSchema(parsedSchemaAdapter.getRawSchema().toString());
93+
attributeList.addAll(jsonExtractor.processSchema(jsonSchema.toJsonNode()));
9594
} else if (SchemaTypeEnum.PROTOBUF.name().equalsIgnoreCase(schemaType)) {
96-
final var protoFileElement = ((ProtobufSchema) confluentParsedSchema).rawSchema();
95+
final var protoFileElement = (ProtoFileElement) parsedSchemaAdapter.getRawSchema();
9796
protoFileElement.getTypes().forEach(field -> protoBufExtractor.processField(field, attributeList, protoFileElement.getImports(), false, nestedTypes));
9897
} else {
9998
throw new KLoadGenException(String.format("Schema type not supported %s", schemaType));

src/main/java/com/sngular/kloadgen/processor/objectcreatorfactory/impl/AvroObjectCreatorFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import com.sngular.kloadgen.processor.objectcreatorfactory.ObjectCreatorFactory;
1919
import com.sngular.kloadgen.processor.util.SchemaProcessorUtils;
2020
import com.sngular.kloadgen.randomtool.generator.AvroGeneratorTool;
21+
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ApicurioParsedSchemaMetadata;
22+
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.BaseParsedSchema;
2123
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.BaseSchemaMetadata;
24+
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ParsedSchemaAdapter;
2225
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.SchemaMetadataAdapter;
2326
import com.sngular.kloadgen.serializer.EnrichedRecord;
2427
import io.confluent.kafka.schemaregistry.ParsedSchema;
@@ -45,6 +48,14 @@ public AvroObjectCreatorFactory(final Object schema, final BaseSchemaMetadata<?
4548
this.schema = (Schema) ((ParsedSchema) schema).rawSchema();
4649
} else if (schema instanceof Schema) {
4750
this.schema = (Schema) schema;
51+
} else if (schema instanceof BaseParsedSchema) {
52+
final BaseParsedSchema schemaParse = (BaseParsedSchema) schema;
53+
final ParsedSchemaAdapter adapterParse = schemaParse.getParsedSchemaAdapter();
54+
if (adapterParse instanceof ApicurioParsedSchemaMetadata) {
55+
this.schema = (Schema) ((ApicurioParsedSchemaMetadata) adapterParse).getSchema();
56+
} else {
57+
this.schema = adapterParse.getRawSchema();
58+
}
4859
} else {
4960
throw new KLoadGenException("Unsupported schema type");
5061
}

src/main/java/com/sngular/kloadgen/processor/objectcreatorfactory/impl/ProtobufObjectCreatorFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import com.sngular.kloadgen.processor.objectcreatorfactory.ObjectCreatorFactory;
2525
import com.sngular.kloadgen.processor.util.SchemaProcessorUtils;
2626
import com.sngular.kloadgen.randomtool.generator.ProtoBufGeneratorTool;
27+
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.BaseParsedSchema;
2728
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.BaseSchemaMetadata;
29+
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ParsedSchemaAdapter;
2830
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.SchemaMetadataAdapter;
2931
import com.sngular.kloadgen.serializer.EnrichedRecord;
3032
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
@@ -46,6 +48,11 @@ public ProtobufObjectCreatorFactory(final Object schema, final BaseSchemaMetadat
4648
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) ((ParsedSchema) schema).rawSchema(), metadata);
4749
} else if (schema instanceof ProtoFileElement) {
4850
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schema, metadata);
51+
} else if (schema instanceof BaseParsedSchema) {
52+
final BaseParsedSchema schemaParse = (BaseParsedSchema) schema;
53+
final ParsedSchemaAdapter adapterParse = schemaParse.getParsedSchemaAdapter();
54+
final Object schemaParsed = adapterParse.getRawSchema();
55+
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schemaParsed, metadata);
4956
} else {
5057
throw new KLoadGenException("Unsupported schema type");
5158
}

src/main/java/com/sngular/kloadgen/processor/util/SchemaProcessorUtils.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.protobuf.Descriptors.DescriptorValidationException;
2727
import com.sngular.kloadgen.model.FieldValueMapping;
2828
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.BaseSchemaMetadata;
29+
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.ParsedSchemaAdapter;
2930
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.SchemaMetadataAdapter;
3031
import com.sngular.kloadgen.util.JMeterHelper;
3132
import com.sngular.kloadgen.util.ProtobufHelper;
@@ -150,6 +151,7 @@ public static String[] splitAndNormalizeFullFieldName(final String fullFieldName
150151
return Arrays.stream(fields).map(field -> field.replaceAll("\\[.*]", "")).toArray(String[]::new);
151152
}
152153

154+
@SuppressWarnings("checkstyle:SingleSpaceSeparator")
153155
public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schema, final BaseSchemaMetadata<? extends SchemaMetadataAdapter> metadata)
154156
throws Descriptors.DescriptorValidationException, IOException {
155157

@@ -166,8 +168,9 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement
166168
schemaBuilder.addSchema(importedSchema);
167169
}
168170
} else {
169-
final var importedProtobufSchema = (ProtobufSchema) JMeterHelper.getParsedSchema(getSubjectName(importedClass, metadata),
170-
JMeterContextService.getContext().getProperties());
171+
final ParsedSchemaAdapter protoFileElement = JMeterHelper.getParsedSchema(getSubjectName(importedClass, metadata),
172+
JMeterContextService.getContext().getProperties()).getParsedSchemaAdapter();
173+
final var importedProtobufSchema = new ProtobufSchema(protoFileElement.getRawSchema(), metadata.getSchemaMetadataAdapter().getReferences(), new HashMap<>());
171174
if (!ProtobufHelper.NOT_ACCEPTED_IMPORTS.contains(importedClass)) {
172175
schemaBuilder.addDependency(importedProtobufSchema.toDescriptor().getFullName());
173176
schemaBuilder.addSchema(convertDynamicSchema(importedProtobufSchema));

src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,8 @@ private static void verifySecurity(final JavaSamplerContext context, final Prope
327327
props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, context.getParameter(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG));
328328

329329
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
330-
propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG), ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
330+
propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG),
331+
ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
331332
""));
332333

333334
props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, context.getParameter(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG));

src/main/java/com/sngular/kloadgen/sampler/schemaregistry/SchemaRegistryAdapter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.Collection;
44
import java.util.Map;
55

6+
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.BaseParsedSchema;
67
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.BaseSchemaMetadata;
78
import com.sngular.kloadgen.sampler.schemaregistry.adapter.impl.SchemaMetadataAdapter;
89

@@ -18,7 +19,7 @@ public interface SchemaRegistryAdapter {
1819

1920
BaseSchemaMetadata getLatestSchemaMetadata(String subjectName);
2021

21-
Object getSchemaBySubject(String subjectName);
22+
BaseParsedSchema getSchemaBySubject(String subjectName);
2223

23-
Object getSchemaBySubjectAndId(String subjectName, BaseSchemaMetadata<? extends SchemaMetadataAdapter> metadata);
24+
BaseParsedSchema getSchemaBySubjectAndId(String subjectName, BaseSchemaMetadata<? extends SchemaMetadataAdapter> metadata);
2425
}

src/main/java/com/sngular/kloadgen/sampler/schemaregistry/schema/ApicurioParsedSchema.java renamed to src/main/java/com/sngular/kloadgen/sampler/schemaregistry/adapter/impl/ApicurioParsedSchemaMetadata.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.sngular.kloadgen.sampler.schemaregistry.schema;
1+
package com.sngular.kloadgen.sampler.schemaregistry.adapter.impl;
22

33
import lombok.Getter;
44
import lombok.NoArgsConstructor;
@@ -7,11 +7,12 @@
77
@Getter
88
@Setter
99
@NoArgsConstructor
10-
public class ApicurioParsedSchema {
11-
12-
private String type;
10+
public class ApicurioParsedSchemaMetadata extends ParsedSchemaAdapter {
1311

1412
private Object schema;
1513

1614
private String rawSchema;
15+
16+
private String type;
17+
1718
}

0 commit comments

Comments
 (0)