diff --git a/config/schema-registry.properties b/config/schema-registry.properties index 5ed0cfba1e3..e02c975ae04 100644 --- a/config/schema-registry.properties +++ b/config/schema-registry.properties @@ -24,14 +24,21 @@ listeners=http://0.0.0.0:8081 # Use this setting to specify the bootstrap servers for your Kafka cluster and it # will be used both for selecting the leader schema registry instance and for storing the data for # registered schemas. -kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092 +#kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092 +kafkastore.bootstrap.servers=SASL_PLAINTEXT://localhost:9092 +kafkastore.security.protocol=SASL_PLAINTEXT +kafkastore.sasl.mechanism=PLAIN +kafkastore.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ + username="owner" \ + password="owner_password"; # The name of the topic to store schemas in kafkastore.topic=_schemas # If true, API requests that fail will include extra debugging information, including stack traces -debug=false +debug=true metadata.encoder.secret=REPLACE_ME_WITH_HIGH_ENTROPY_STRING -resource.extension.class=io.confluent.dekregistry.DekRegistryResourceExtension,io.confluent.dpregistry.DataProductRegistryResourceExtension +#resource.extension.class=io.confluent.dekregistry.DekRegistryResourceExtension,io.confluent.dpregistry.DataProductRegistryResourceExtension +resource.extension.class=io.confluent.dpregistry.DataProductRegistryResourceExtension diff --git a/dataproduct-registry/src/main/java/io/confluent/dpregistry/web/rest/resources/DataProductRegistryResource.java b/dataproduct-registry/src/main/java/io/confluent/dpregistry/web/rest/resources/DataProductRegistryResource.java index e8ccd4b8f5e..505c1e15c35 100644 --- a/dataproduct-registry/src/main/java/io/confluent/dpregistry/web/rest/resources/DataProductRegistryResource.java +++ b/dataproduct-registry/src/main/java/io/confluent/dpregistry/web/rest/resources/DataProductRegistryResource.java @@ -74,6 
+74,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.kafka.clients.admin.AdminClientConfig.SECURITY_PROTOCOL_CONFIG; + @Path("/dataproduct-registry/v1/environments/{env}/clusters/{cluster}/dataproducts") @Singleton @Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED, @@ -120,6 +122,9 @@ public List getDataProductNames( @DefaultValue("0") @QueryParam("offset") int offset, @Parameter(description = "Pagination size for results. Ignored if negative") @DefaultValue("-1") @QueryParam("limit") int limit) { + + log.info("get data product for cluster {}", cluster); + limit = dataProductRegistry.normalizeNameSearchLimit(limit); List dataProductNames = dataProductRegistry.getDataProductNames( env, cluster, lookupDeleted); @@ -327,6 +332,15 @@ private void createTopicIfNotExists(String env, String cluster, DataProduct prod props.put(propKey, value); } }); + + // Set the security protocol and password. TODO(review): do not hardcode credentials; load from configuration/secrets. + props.put(SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put("sasl.mechanism", "PLAIN"); // Or "SCRAM-SHA-256", etc. + String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"owner\" " + + "password=\"owner_password\";"; + props.put("sasl.jaas.config", jaasConfig); + try (AdminClient admin = AdminClient.create(props)) { String topicName = product.getInfo().getName(); // TODO RAY make configurable @@ -386,7 +400,7 @@ public void deleteDataProduct( dataProductRegistry.deleteDataProductOrForward( cluster, env, name, permanentDelete, headerProperties); - deleteTopic(env, cluster, product); + //deleteTopic(env, cluster, product); asyncResponse.resume(Response.status(204).build()); } catch (SchemaRegistryException e) {