diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Schema.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Schema.java index 8edbd431ab9..f6f8f815e84 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Schema.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Schema.java @@ -25,6 +25,7 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.FindSchemaIdRequest; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; import java.nio.charset.StandardCharsets; @@ -164,6 +165,12 @@ public Schema(String subject, RegisterSchemaRequest request) { this.schema = request.getSchema(); } + public Schema(FindSchemaIdRequest request) { + this.schemaType = request.getSchemaType() != null + ? request.getSchemaType() : AvroSchema.TYPE; + this.schema = request.getSchema(); + } + public Schema(String subject, RegisterSchemaResponse response) { this.subject = subject; this.version = response.getVersion() != null ? response.getVersion() : 0; diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/FindSchemaIdRequest.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/FindSchemaIdRequest.java new file mode 100644 index 00000000000..28f89aab2dc --- /dev/null +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/FindSchemaIdRequest.java @@ -0,0 +1,98 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client.rest.entities.requests; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaTypeConverter; +import io.confluent.kafka.schemaregistry.utils.JacksonMapper; +import java.io.IOException; +import java.util.Objects; + +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@JsonIgnoreProperties(ignoreUnknown = true) +@io.swagger.v3.oas.annotations.media.Schema(description = "Find schema id request") +public class FindSchemaIdRequest { + + private String schemaType; + private String schema; + + public FindSchemaIdRequest() { + } + + public static FindSchemaIdRequest fromJson(String json) throws IOException { + return JacksonMapper.INSTANCE.readValue(json, FindSchemaIdRequest.class); + } + + @io.swagger.v3.oas.annotations.media.Schema(description = Schema.TYPE_DESC) + @JsonProperty("schemaType") + @JsonSerialize(converter = SchemaTypeConverter.class) + public String getSchemaType() { + return this.schemaType; + } + + @JsonProperty("schemaType") + public void setSchemaType(String schemaType) { + this.schemaType = schemaType; + } + + @io.swagger.v3.oas.annotations.media.Schema(description = Schema.SCHEMA_DESC) + @JsonProperty("schema") + public String getSchema() { + return this.schema; + } + + @JsonProperty("schema") + public void setSchema(String schema) { + this.schema = schema; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FindSchemaIdRequest that = (FindSchemaIdRequest) o; + return Objects.equals(schemaType, that.schemaType) + && Objects.equals(schema, that.schema); + } + + @Override + public int hashCode() { + return Objects.hash(schemaType, schema); + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("{"); + buf.append("schemaType=").append(this.schemaType).append(", "); + buf.append("schema=").append(schema).append("}"); + return buf.toString(); + } + + public String toJson() throws IOException { + return JacksonMapper.INSTANCE.writeValueAsString(this); + } + +} diff --git a/core/generated/swagger-ui/schema-registry-api-spec.yaml b/core/generated/swagger-ui/schema-registry-api-spec.yaml index d40aa604306..2b997bd1445 100644 --- a/core/generated/swagger-ui/schema-registry-api-spec.yaml +++ b/core/generated/swagger-ui/schema-registry-api-spec.yaml @@ -2131,6 +2131,70 @@ paths: Error code 40403 -- Schema not found "500": description: Internal server error + /schemas/ids: + post: + tags: + - Schemas (v1) + summary: Find global schema id + description: Get the schemas id matching the specified schema. + operationId: findSchemaId + requestBody: + description: Schema + content: + application/vnd.schemaregistry.v1+json: + schema: + $ref: '#/components/schemas/FindSchemaIdRequest' + application/vnd.schemaregistry+json: + schema: + $ref: '#/components/schemas/FindSchemaIdRequest' + application/json: + schema: + $ref: '#/components/schemas/FindSchemaIdRequest' + application/octet-stream: + schema: + $ref: '#/components/schemas/FindSchemaIdRequest' + required: true + responses: + "200": + description: Returns the global schema id. + content: + application/vnd.schemaregistry.v1+json: + schema: + type: integer + format: int32 + application/vnd.schemaregistry+json; qs=0.9: + schema: + type: integer + format: int32 + application/json; qs=0.5: + schema: + type: integer + format: int32 + "404": + description: Not Found. Error code 40403 indicates schema not found. + content: + application/vnd.schemaregistry.v1+json: + schema: + $ref: '#/components/schemas/ErrorMessage' + application/vnd.schemaregistry+json; qs=0.9: + schema: + $ref: '#/components/schemas/ErrorMessage' + application/json; qs=0.5: + schema: + $ref: '#/components/schemas/ErrorMessage' + "500": + description: Internal Server Error. Error code 50001 indicates a failure + in the backend data store. + content: + application/vnd.schemaregistry.v1+json: + schema: + $ref: '#/components/schemas/ErrorMessage' + application/vnd.schemaregistry+json; qs=0.9: + schema: + $ref: '#/components/schemas/ErrorMessage' + application/json; qs=0.5: + schema: + $ref: '#/components/schemas/ErrorMessage' components: schemas: CompatibilityCheckResponse: @@ -2495,3 +2559,23 @@ components: items: $ref: '#/components/schemas/Rule' description: Schema rule set + FindSchemaIdRequest: + type: object + properties: + schemaType: + type: string + description: Schema type + schema: + type: string + description: Schema definition string + description: Find schema id request + SchemaEntity: + type: object + properties: + entityPath: + type: string + entityType: + type: string + enum: + - sr_record + - sr_field \ No newline at end of file diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java index 385d67a1220..3a9726d8c8c 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java @@ -20,6 +20,7 @@ import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString; import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.FindSchemaIdRequest; import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException; import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException; import io.confluent.kafka.schemaregistry.rest.exceptions.Errors; @@ -33,21 +34,23 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tags; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.function.Predicate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import javax.validation.constraints.NotNull; import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; -import javax.ws.rs.DefaultValue; import javax.ws.rs.QueryParam; -import javax.ws.rs.PathParam; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Path("/schemas") @Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED, @@ -66,6 +69,48 @@ public SchemasResource(KafkaSchemaRegistry schemaRegistry) { this.schemaRegistry = schemaRegistry; } + @POST + @Path("/ids") + @DocumentedName("findSchemaId") + @Operation(summary = "Find global schema id", + description = "Get the schemas id matching the specified schema.", + responses = { + @ApiResponse(responseCode = "200", + description = "Returns the global schema id.", content = @Content(schema = @io.swagger.v3.oas.annotations.media.Schema( + implementation = Integer.class))), + @ApiResponse(responseCode = "404", + description = "Not Found. Error code 40403 indicates schema not found.", + content = @Content(schema = @io.swagger.v3.oas.annotations.media.Schema(implementation = + ErrorMessage.class))), + @ApiResponse(responseCode = "500", + description = "Internal Server Error. " + + "Error code 50001 indicates a failure in the backend data store.", + content = @Content(schema = @io.swagger.v3.oas.annotations.media.Schema(implementation = + ErrorMessage.class)))}) + @Tags(@Tag(name = apiTag)) + @PerformanceMetric("schemas.find-schema-id") + public Integer findSchemaId( + @Parameter(description = "Schema", required = true) + @NotNull FindSchemaIdRequest request + ) { + Optional id; + String errorMessage = "Error while finding schema"; + Schema schema = new Schema(request); + + try { + id = schemaRegistry.findSchemaId(schema); + } catch (SchemaRegistryStoreException e) { + log.debug(errorMessage, e); + throw Errors.storeException(errorMessage, e); + } + + if (!id.isPresent()) { + throw Errors.schemaNotFoundException(); + } + + return id.get(); + } + @GET @DocumentedName("getSchemas") @Operation(summary = "List schemas", diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java index 28382b09fa4..373cdea83fb 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java @@ -93,6 +93,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -1669,6 +1670,20 @@ public Set listSubjectsWithPrefix(String prefix, LookupFilter filter) } } + public Optional findSchemaId(Schema schema) + throws SchemaRegistryStoreException { + + SchemaIdAndSubjects schemaIdAndSubjects; + + try { + schemaIdAndSubjects = this.lookupCache.schemaIdAndSubjects(schema); + } catch (StoreException e) { + throw new SchemaRegistryStoreException("Error while retrieving schema", e); + } + + return schemaIdAndSubjects!= null ? Optional.of(schemaIdAndSubjects.getSchemaId()) : Optional.empty(); + } + public Set listSubjectsForId(int id, String subject) throws SchemaRegistryException { return listSubjectsForId(id, subject, false); }