diff --git a/.github/workflows/ci-test-build.yaml b/.github/workflows/ci-test-build.yaml index 70f3f7d8..1a22a2f9 100644 --- a/.github/workflows/ci-test-build.yaml +++ b/.github/workflows/ci-test-build.yaml @@ -26,16 +26,17 @@ jobs: workload_identity_provider: ${{ secrets.GCP_STREAMX_RELEASES_WORKLOAD_IDENTITY_PROVIDER }} service_account: ${{ secrets.GCP_STREAMX_RELEASES_READ_SA }} - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v3 with: - java-version: '17' - distribution: 'adopt' + java-version: '21' + distribution: 'temurin' cache: 'maven' - name: Build project run: | ./mvnw clean verify -P all-tests + windows-test-build: runs-on: windows-latest steps: @@ -51,11 +52,11 @@ jobs: workload_identity_provider: ${{ secrets.GCP_STREAMX_RELEASES_WORKLOAD_IDENTITY_PROVIDER }} service_account: ${{ secrets.GCP_STREAMX_RELEASES_READ_SA }} - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v3 with: - java-version: '17' - distribution: 'adopt' + java-version: '21' + distribution: 'temurin' cache: 'maven' - name: Build project diff --git a/.github/workflows/mvn-build-and-check-streamx-cli-commands-linux.yaml b/.github/workflows/mvn-build-and-check-streamx-cli-commands-linux.yaml index 8bbd4b51..ab61944f 100644 --- a/.github/workflows/mvn-build-and-check-streamx-cli-commands-linux.yaml +++ b/.github/workflows/mvn-build-and-check-streamx-cli-commands-linux.yaml @@ -35,8 +35,8 @@ jobs: - name: Set up JDK 17s uses: actions/setup-java@v4 with: - java-version: '17' - distribution: 'adopt' + java-version: '21' + distribution: 'temurin' - name: Set up homebrew if: inputs.streamx_source == 'installed' diff --git a/.github/workflows/release-preview.yaml b/.github/workflows/release-preview.yaml index a70d7d89..a383f8c6 100644 --- a/.github/workflows/release-preview.yaml +++ b/.github/workflows/release-preview.yaml @@ -35,11 +35,11 @@ jobs: printf '%s\n' "${{ secrets.SSH_SECRET_KEY }}" > ~/.ssh/id_rsa chmod 0600 ~/.ssh/id_rsa - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v3 with: - java-version: '17' - distribution: 'adopt' + java-version: '21' + distribution: 'temurin' - name: Preview release prepare run: ./mvnw -B clean install -DskipTests -P preview diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index a584a3b8..b1c23342 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -31,11 +31,11 @@ jobs: printf '%s\n' "${{ secrets.SSH_SECRET_KEY }}" > ~/.ssh/id_rsa chmod 0600 ~/.ssh/id_rsa - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v3 with: - java-version: '17' - distribution: 'adopt' + java-version: '21' + distribution: 'temurin' - name: Release prepare id: prepare-release diff --git a/.github/workflows/streamx-homebrew-instalation-check.yaml b/.github/workflows/streamx-homebrew-instalation-check.yaml index 66f1bfe2..376e6076 100644 --- a/.github/workflows/streamx-homebrew-instalation-check.yaml +++ b/.github/workflows/streamx-homebrew-instalation-check.yaml @@ -14,11 +14,11 @@ jobs: runs-on: ${{ matrix.os }} steps: - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v4 with: - java-version: '17' - distribution: 'adopt' + java-version: '21' + distribution: 'temurin' - name: Set up homebrew uses: 'Homebrew/actions/setup-homebrew@master' diff --git a/.github/workflows/streamx-scoop-instalation-check.yaml.yaml b/.github/workflows/streamx-scoop-instalation-check.yaml.yaml index ffeedcf0..e0f0a013 100644 --- a/.github/workflows/streamx-scoop-instalation-check.yaml.yaml +++ b/.github/workflows/streamx-scoop-instalation-check.yaml.yaml @@ -9,11 +9,11 @@ jobs: runs-on: windows-latest steps: - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v4 with: - java-version: '17' - distribution: 'adopt' + java-version: '21' + distribution: 'temurin' - name: Set up Scoop uses: MinoruSekine/setup-scoop@v4 diff --git a/core/pom.xml b/core/pom.xml index a39e505e..d3c4a941 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -12,7 +12,7 @@ StreamX Cli : Core - 17 + 21 true uber-jar @@ -44,13 +44,9 @@ bcpkix-jdk18on - dev.streamx + com.streamx streamx-runner - - dev.streamx - streamx-operator-mesh-api - com.fasterxml.jackson.datatype jackson-datatype-jdk8 @@ -60,7 +56,7 @@ jackson-datatype-jsr310 - dev.streamx + com.streamx ingestion-client diff --git a/core/src/main/java/dev/streamx/cli/BannerPrinter.java b/core/src/main/java/dev/streamx/cli/BannerPrinter.java index ee98e3c7..b474e0d9 100644 --- a/core/src/main/java/dev/streamx/cli/BannerPrinter.java +++ b/core/src/main/java/dev/streamx/cli/BannerPrinter.java @@ -2,7 +2,6 @@ import static dev.streamx.cli.util.Output.print; -import dev.streamx.cli.command.dev.DevCommand; import dev.streamx.cli.command.run.RunCommand; import jakarta.enterprise.context.ApplicationScoped; import java.util.Set; @@ -13,7 +12,7 @@ public class BannerPrinter { private static final Set COMMANDS_REQUIRING_PRINTING_BANNER = - Set.of(DevCommand.COMMAND_NAME, RunCommand.COMMAND_NAME); + Set.of(RunCommand.COMMAND_NAME); private static final String BANNER = """ ____ _ __ __ diff --git a/core/src/main/java/dev/streamx/cli/SchemaProvider.java b/core/src/main/java/dev/streamx/cli/SchemaProvider.java deleted file mode 100644 index 42bd7630..00000000 --- a/core/src/main/java/dev/streamx/cli/SchemaProvider.java +++ /dev/null @@ -1,96 +0,0 @@ -package dev.streamx.cli; - -import static dev.streamx.cli.util.ExceptionUtils.sneakyThrow; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import dev.streamx.cli.command.ingestion.IngestionClientConfig; -import dev.streamx.cli.exception.IngestionClientException; -import dev.streamx.cli.exception.UnableToConnectIngestionServiceException; -import dev.streamx.cli.exception.UnknownChannelException; -import dev.streamx.clients.ingestion.exceptions.StreamxClientException; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Map; -import javax.net.ssl.SSLHandshakeException; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpEntity; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.util.EntityUtils; - -@ApplicationScoped -public class SchemaProvider { - - private static final ObjectMapper objectMapper = new ObjectMapper(); - - @Inject - CloseableHttpClient httpClient; - - @Inject - IngestionClientConfig ingestionClientConfig; - - public JsonNode getSchema(String channel) throws UnknownChannelException { - Map schemas = fetchSchemas(ingestionClientConfig.url()); - if (schemas.containsKey(channel)) { - return schemas.get(channel); - } - throw new UnknownChannelException(channel, schemas.keySet().toString()); - } - - private Map fetchSchemas(String ingestionUrl) { - try { - URI channelsEndpointUri = buildUri(ingestionUrl); - HttpGet httpRequest = new HttpGet(channelsEndpointUri); - - ingestionClientConfig.authToken() - .ifPresent(authToken -> addAuthorizationHeader(httpRequest, authToken)); - - HttpResponse execute = httpClient.execute(httpRequest); - verifyStatusCode(execute); - - HttpEntity entity = execute.getEntity(); - - String body = EntityUtils.toString(entity, "UTF-8"); - - return objectMapper.readValue(body, new TypeReference<>() { - }); - } catch (SSLHandshakeException e) { - throw IngestionClientException.sslException(ingestionClientConfig.url()); - } catch (IOException e) { - throw new UnableToConnectIngestionServiceException(ingestionClientConfig.url(), e); - } catch (StreamxClientException e) { - throw sneakyThrow(e); - } - } - - private static void verifyStatusCode(HttpResponse execute) throws StreamxClientException { - if (execute.getStatusLine().getStatusCode() == HttpStatus.SC_UNAUTHORIZED) { - throw new StreamxClientException( - // this message is copy-pasted from StreamxIngestionClient - "Authentication failed. Make sure that the given token is valid."); - } - } - - private URI buildUri(String publicationsEndpointUri) { - String uriString = String.format("%s/ingestion/v1/channels", publicationsEndpointUri); - try { - return new URI(uriString); - } catch (URISyntaxException e) { - throw sneakyThrow(e); - } - } - - private void addAuthorizationHeader(HttpGet httpRequest, String authToken) { - if (StringUtils.isNotBlank(authToken)) { - httpRequest.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + authToken); - } - } -} diff --git a/core/src/main/java/dev/streamx/cli/StreamxCommand.java b/core/src/main/java/dev/streamx/cli/StreamxCommand.java index c16b95aa..d196fb73 100644 --- a/core/src/main/java/dev/streamx/cli/StreamxCommand.java +++ b/core/src/main/java/dev/streamx/cli/StreamxCommand.java @@ -1,13 +1,7 @@ package dev.streamx.cli; -import dev.streamx.cli.command.cloud.deploy.DeployCommand; -import dev.streamx.cli.command.cloud.undeploy.UndeployCommand; -import dev.streamx.cli.command.dev.DevCommand; import dev.streamx.cli.command.ingestion.batch.BatchCommand; -import dev.streamx.cli.command.ingestion.publish.PublishCommand; import dev.streamx.cli.command.ingestion.stream.StreamCommand; -import dev.streamx.cli.command.ingestion.unpublish.UnpublishCommand; -import dev.streamx.cli.command.init.InitCommand; import dev.streamx.cli.command.run.RunCommand; import dev.streamx.cli.config.ArgumentConfigSource; import dev.streamx.cli.config.validation.ConfigSourcesValidator; @@ -30,11 +24,8 @@ @Command(mixinStandardHelpOptions = true, name = "streamx", subcommands = { - InitCommand.class, - RunCommand.class, DevCommand.class, - PublishCommand.class, UnpublishCommand.class, + RunCommand.class, BatchCommand.class, StreamCommand.class, - DeployCommand.class, UndeployCommand.class, HelpCommand.class }, versionProvider = VersionProvider.class) @@ -82,7 +73,7 @@ private static void initializeArgumentConfigSource(String[] args) { } @Override - public int run(String... args) throws Exception { + public int run(String... args) { this.args = args; commandLine = new CommandLine(this, factory) diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/KubernetesArguments.java b/core/src/main/java/dev/streamx/cli/command/cloud/KubernetesArguments.java deleted file mode 100644 index f5e98465..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/KubernetesArguments.java +++ /dev/null @@ -1,30 +0,0 @@ -package dev.streamx.cli.command.cloud; - -import dev.streamx.cli.config.ArgumentConfigSource; -import picocli.CommandLine.Option; - -public class KubernetesArguments { - - @Option(names = {"-n", "--namespace"}, paramLabel = "", - description = "Forces provided namespace.") - void namespace(String namespace) { - ArgumentConfigSource.registerValue(KubernetesConfig.STREAMX_KUBERNETES_NAMESPACE, namespace); - } - - @Option(names = {"-d", "--resources-directories"}, paramLabel = "", - description = "Specifies one or more comma-separated relative directory paths (from the " - + "mesh.yaml directory) where managed Kubernetes resource definitions are located.") - void resourceDirectories(String resourceDirectories) { - ArgumentConfigSource.registerValue(KubernetesConfig.STREAMX_KUBERNETES_RESOURCE_DIRECTORIES, - resourceDirectories); - } - - @Option(names = {"-r", - "--controlled-resource-definitions"}, paramLabel = "", - description = "Specifies one or more comma-separated definitions in the form of" - + " [cluster:]group/version/kind to be managed by CLI during undeploy.") - void controlledResourceDefinitions(String resourceDefinitions) { - ArgumentConfigSource.registerValue( - KubernetesConfig.STREAMX_KUBERNETES_CONTROLLED_RESOURCE_DEFINITIONS, resourceDefinitions); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/KubernetesConfig.java b/core/src/main/java/dev/streamx/cli/command/cloud/KubernetesConfig.java deleted file mode 100644 index 5ca85dc8..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/KubernetesConfig.java +++ /dev/null @@ -1,24 +0,0 @@ -package dev.streamx.cli.command.cloud; - -import io.smallrye.config.ConfigMapping; -import io.smallrye.config.WithName; -import java.util.Optional; - -@ConfigMapping -public interface KubernetesConfig { - - String STREAMX_KUBERNETES_NAMESPACE = "streamx.kubernetes.namespace"; - String STREAMX_KUBERNETES_RESOURCE_DIRECTORIES = "streamx.kubernetes.resource-directories"; - String STREAMX_KUBERNETES_CONTROLLED_RESOURCE_DEFINITIONS - = "streamx.kubernetes.controlled-resource-definitions"; - - @WithName(STREAMX_KUBERNETES_NAMESPACE) - Optional namespace(); - - @WithName(STREAMX_KUBERNETES_RESOURCE_DIRECTORIES) - Optional resourceDirectories(); - - @WithName(STREAMX_KUBERNETES_CONTROLLED_RESOURCE_DEFINITIONS) - Optional controlledResourceDefinitions(); - -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/KubernetesService.java b/core/src/main/java/dev/streamx/cli/command/cloud/KubernetesService.java deleted file mode 100644 index 3d5f7289..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/KubernetesService.java +++ /dev/null @@ -1,147 +0,0 @@ -package dev.streamx.cli.command.cloud; - -import static dev.streamx.cli.command.cloud.MetadataUtils.CONFIG_TYPE_LABEL; -import static dev.streamx.cli.command.cloud.MetadataUtils.DEFAULT_K8S_NAMESPACE; -import static dev.streamx.cli.command.cloud.MetadataUtils.SERVICEMESH_CRD_NAME; -import static dev.streamx.cli.command.cloud.MetadataUtils.setLabel; -import static dev.streamx.cli.command.cloud.MetadataUtils.setMetadata; - -import dev.streamx.cli.command.cloud.collector.ClusterResourcesCollector; -import dev.streamx.cli.command.cloud.collector.TypedClusterResourceCollector; -import dev.streamx.cli.command.cloud.deploy.Config; -import dev.streamx.cli.exception.KubernetesException; -import dev.streamx.operator.Component; -import dev.streamx.operator.crd.ServiceMesh; -import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.api.model.ConfigMapBuilder; -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.Secret; -import io.fabric8.kubernetes.api.model.SecretBuilder; -import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.dsl.NonDeletingOperation; -import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import org.jetbrains.annotations.NotNull; - -@ApplicationScoped -public class KubernetesService { - - @Inject - KubernetesClient kubernetesClient; - @Inject - KubernetesConfig kubernetesConfig; - - public void deploy(List resources) { - resources.forEach(this::deploy); - } - - private void deploy(T resource) { - try { - kubernetesClient.resource(resource).inNamespace(getNamespace()) - .createOr(NonDeletingOperation::update); - } catch (KubernetesClientException e) { - throw KubernetesException.kubernetesClientException(e); - } - } - - public void undeploy(String meshName) { - undeploy(collectManagedResources(meshName)); - } - - public void undeploy(List resources) { - try { - resources.forEach(r -> kubernetesClient.resource(r).delete()); - } catch (KubernetesClientException e) { - throw KubernetesException.kubernetesClientException(e); - } - } - - public List collectManagedResources(String meshName) { - List result = new ArrayList<>(); - // Collect mesh - ServiceMesh mesh = kubernetesClient.resources(ServiceMesh.class).inNamespace(getNamespace()) - .withName(meshName).get(); - if (mesh != null) { - result.add(mesh); - } - - // Collect configs and secrets - result.addAll( - new TypedClusterResourceCollector(kubernetesClient, List.of(ConfigMap.class, Secret.class), - getNamespace()).collect(meshName)); - - // Collect other resources controlled by the CLI - result.addAll(new ClusterResourcesCollector(kubernetesClient, - getControlledResourceDefinitions(), getNamespace()).collect(meshName)); - - return result; - } - - public void validateCrdInstallation() { - try { - CustomResourceDefinition crd = kubernetesClient.apiextensions().v1() - .customResourceDefinitions() - .withName(SERVICEMESH_CRD_NAME) - .get(); - if (crd == null) { - throw KubernetesException.serviceMeshCrdNotFound(); - } - } catch (KubernetesClientException e) { - throw KubernetesException.kubernetesClientException(e); - } - } - - @NotNull - public ConfigMap buildConfigMap(String meshName, Config config) { - ConfigMap configMap = new ConfigMapBuilder() - .withNewMetadata() - .endMetadata() - .addToData(config.data()) - .build(); - String sanitizedName = KubernetesResourceUtil.sanitizeName(config.name()); - setMetadata(meshName, Component.EXTERNAL_CONFIG, sanitizedName, configMap); - setLabel(configMap, CONFIG_TYPE_LABEL, config.configType().getLabelValue()); - return configMap; - } - - @NotNull - public Secret buildSecret(String meshName, Config config) { - Secret secret = new SecretBuilder() - .withNewMetadata() - .endMetadata() - .withStringData(config.data()) - .build(); - String sanitizedName = KubernetesResourceUtil.sanitizeName(config.name()); - setMetadata(meshName, Component.EXTERNAL_SECRET, sanitizedName, secret); - setLabel(secret, CONFIG_TYPE_LABEL, config.configType().getLabelValue()); - return secret; - } - - public String getNamespace() { - return kubernetesConfig.namespace() - .orElse(Optional.ofNullable(kubernetesClient.getNamespace()).orElse(DEFAULT_K8S_NAMESPACE)); - } - - public List getResourcePaths() { - return kubernetesConfig.resourceDirectories().map(paths -> Arrays.stream(paths.split(",")) - .map(String::trim) - .filter(s -> !s.isEmpty()) - .collect(Collectors.toList())).orElse(List.of()); - } - - public List getControlledResourceDefinitions() { - return kubernetesConfig.controlledResourceDefinitions() - .map(paths -> Arrays.stream(paths.split(",")) - .map(String::trim) - .filter(s -> !s.isEmpty()) - .collect(Collectors.toList())).orElse(List.of()); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/MetadataUtils.java b/core/src/main/java/dev/streamx/cli/command/cloud/MetadataUtils.java deleted file mode 100644 index d2e3205d..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/MetadataUtils.java +++ /dev/null @@ -1,57 +0,0 @@ -package dev.streamx.cli.command.cloud; - -import dev.streamx.operator.Component; -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil; -import java.util.HashMap; -import java.util.Map; - -public class MetadataUtils { - - public static final String NAME_LABEL = "app.kubernetes.io/name"; - public static final String INSTANCE_LABEL = "app.kubernetes.io/instance"; - public static final String COMPONENT_LABEL = "app.kubernetes.io/component"; - public static final String MANAGED_BY_LABEL = "app.kubernetes.io/managed-by"; - public static final String MANAGED_BY_LABEL_VALUE = "streamx-cli"; - public static final String CONFIG_TYPE_LABEL = "mesh.streamx.dev/config-type"; - public static final String PART_OF_LABEL = "app.kubernetes.io/part-of"; - public static final String SERVICEMESH_CRD_NAME = "servicemeshes.streamx.dev"; - public static final String DEFAULT_K8S_NAMESPACE = "default"; - - private MetadataUtils() { - // No instances - } - - public static Map createPartOfAndManagedByLabels(String meshName) { - return Map.of( - PART_OF_LABEL, meshName, - MANAGED_BY_LABEL, MANAGED_BY_LABEL_VALUE - ); - } - - public static void setMetadata(String meshName, Component component, String name, - HasMetadata resource) { - String instanceName = getResourceName(meshName, component.getShortName(), name); - resource.getMetadata().setName(instanceName); - setLabel(resource, INSTANCE_LABEL, instanceName); - setLabel(resource, COMPONENT_LABEL, component.getName()); - setLabel(resource, NAME_LABEL, name); - setManagedByAndPartOfLabels(resource, meshName); - } - - public static void setManagedByAndPartOfLabels(HasMetadata resource, String meshName) { - setLabel(resource, PART_OF_LABEL, meshName); - setLabel(resource, MANAGED_BY_LABEL, MANAGED_BY_LABEL_VALUE); - } - - public static String getResourceName(String meshName, String componentName, String name) { - return KubernetesResourceUtil.sanitizeName(meshName + "-" + componentName + "-" + name); - } - - public static void setLabel(HasMetadata resource, String key, String value) { - if (resource.getMetadata().getLabels() == null) { - resource.getMetadata().setLabels(new HashMap<>()); - } - resource.getMetadata().getLabels().put(key, value); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/ProjectPathsResolver.java b/core/src/main/java/dev/streamx/cli/command/cloud/ProjectPathsResolver.java deleted file mode 100644 index 14d9b397..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/ProjectPathsResolver.java +++ /dev/null @@ -1,38 +0,0 @@ -package dev.streamx.cli.command.cloud; - -import dev.streamx.cli.command.meshprocessing.MeshResolver; -import jakarta.enterprise.context.ApplicationScoped; -import java.nio.file.Path; -import org.jetbrains.annotations.NotNull; - -@ApplicationScoped -public class ProjectPathsResolver { - - public static final String CONFIGS_DIRECTORY = "configs"; - public static final String SECRETS_DIRECTORY = "secrets"; - protected static final String YAML_EXT = ".yaml"; - static final String DEPLOYMENT = "deployment"; - static final String DEPLOYMENT_FILE_NAME = DEPLOYMENT + YAML_EXT; - - @NotNull - public Path resolveDeploymentPath(Path meshPath) { - String meshFileName = meshPath.getFileName().toString(); - String deploymentFileName = DEPLOYMENT_FILE_NAME; - if (!MeshResolver.MESH_YAML.equals(meshFileName)) { - deploymentFileName = DEPLOYMENT + "." + meshFileName; - } - return meshPath.getParent().resolve(deploymentFileName); - } - - public Path resolveSecretPath(Path projectPath, String sourcePath) { - return resolveSourcePath(projectPath, SECRETS_DIRECTORY, sourcePath); - } - - public Path resolveConfigPath(Path projectPath, String sourcePath) { - return resolveSourcePath(projectPath, CONFIGS_DIRECTORY, sourcePath); - } - - private Path resolveSourcePath(Path projectPath, String sourceDirectory, String sourcePath) { - return projectPath.resolve(sourceDirectory).resolve(sourcePath); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/ServiceMeshResolver.java b/core/src/main/java/dev/streamx/cli/command/cloud/ServiceMeshResolver.java deleted file mode 100644 index 0b963aa0..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/ServiceMeshResolver.java +++ /dev/null @@ -1,182 +0,0 @@ -package dev.streamx.cli.command.cloud; - -import com.fasterxml.jackson.databind.ObjectMapper; -import dev.streamx.cli.interpolation.Interpolating; -import dev.streamx.cli.util.ExceptionUtils; -import dev.streamx.mesh.model.AbstractContainer; -import dev.streamx.mesh.model.AbstractFromSource; -import dev.streamx.mesh.model.DeliveryService; -import dev.streamx.mesh.model.EnvironmentFrom; -import dev.streamx.mesh.model.VolumesFrom; -import dev.streamx.operator.crd.ServiceMesh; -import dev.streamx.operator.crd.ServiceMeshSpec; -import dev.streamx.operator.crd.deployment.ServiceMeshDeploymentConfig; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -@ApplicationScoped -public class ServiceMeshResolver { - - public static final String SERVICE_MESH_NAME = "sx"; - @Inject - @Interpolating - ObjectMapper objectMapper; - - @Inject - ProjectPathsResolver projectPathsResolver; - - @NotNull - public ServiceMesh resolveMesh(Path meshPath) { - File meshPathFile = meshPath.toFile(); - if (!meshPathFile.exists()) { - throw new RuntimeException("Mesh file with provided path '" + meshPath + "' does not exist."); - } - if (meshPathFile.length() < 1) { - throw new RuntimeException("Mesh file with provided path '" + meshPath + "' is empty."); - } - ServiceMesh serviceMesh = new ServiceMesh(); - try { - ServiceMeshSpec spec = objectMapper.readValue(meshPathFile, - ServiceMeshSpec.class); - ServiceMeshDeploymentConfig serviceMeshDeploymentConfig = readDeploymentConfig(meshPath); - spec.setDeploymentConfig(serviceMeshDeploymentConfig); - serviceMesh.setSpec(spec); - serviceMesh.getMetadata().setName(SERVICE_MESH_NAME); - } catch (IOException e) { - throw new RuntimeException( - ExceptionUtils.appendLogSuggestion( - "Unable to read mesh definition from '" + meshPath + "'.\n" - + "\n" - + "Details:\n" - + e.getMessage()), e); - } - return serviceMesh; - } - - @NotNull - public ConfigSourcesPaths extractConfigSourcesPaths(ServiceMesh serviceMesh) { - Set configEnvPaths = new HashSet<>(); - Set secretEnvPaths = new HashSet<>(); - Set configVolumePaths = new HashSet<>(); - Set secretVolumePaths = new HashSet<>(); - processGlobalEnvSources(serviceMesh, configEnvPaths, secretEnvPaths); - List containers = extractContainers(serviceMesh); - containers.forEach(container -> { - EnvironmentFrom environmentFrom = container.getEnvironmentFrom(); - configEnvPaths.addAll( - extractConfigSourcesPaths(environmentFrom, AbstractFromSource::getConfigs, null)); - secretEnvPaths.addAll( - extractConfigSourcesPaths(environmentFrom, AbstractFromSource::getSecrets, null)); - VolumesFrom volumesFrom = container.getVolumesFrom(); - configVolumePaths.addAll( - extractConfigSourcesPaths(volumesFrom, AbstractFromSource::getConfigs, - this::mapToHostPath)); - secretVolumePaths.addAll( - extractConfigSourcesPaths(volumesFrom, AbstractFromSource::getSecrets, - this::mapToHostPath)); - }); - - return new ConfigSourcesPaths(configEnvPaths, secretEnvPaths, configVolumePaths, - secretVolumePaths); - } - - @NotNull - private List extractConfigSourcesPaths(AbstractFromSource fromSource, - Function> pathsExtractor, Function mapper) { - List configsPaths = Collections.emptyList(); - if (fromSource != null) { - List extractedPaths = pathsExtractor.apply(fromSource); - if (extractedPaths != null) { - configsPaths = extractedPaths.stream().filter(Objects::nonNull).toList(); - if (mapper != null) { - configsPaths = configsPaths.stream().map(mapper).collect(Collectors.toList()); - } - } - } - return configsPaths; - } - - @Nullable - private ServiceMeshDeploymentConfig readDeploymentConfig(Path meshPath) { - Path deploymentPath = projectPathsResolver.resolveDeploymentPath(meshPath); - ServiceMeshDeploymentConfig serviceMeshDeploymentConfig = null; - File deploymentFile = deploymentPath.toFile(); - if (deploymentFile.exists() && deploymentFile.length() > 0) { - try { - serviceMeshDeploymentConfig = objectMapper.readValue(deploymentFile, - ServiceMeshDeploymentConfig.class); - } catch (IOException e) { - throw new RuntimeException( - ExceptionUtils.appendLogSuggestion( - "Unable to read deployment from '" + deploymentPath + "'.\n" - + "\n" - + "Details:\n" - + e.getMessage()), e); - } - } - return serviceMeshDeploymentConfig; - } - - @NotNull - List extractContainers(ServiceMesh serviceMesh) { - ServiceMeshSpec serviceMeshSpec = serviceMesh.getSpec(); - List containers = Stream.of( - serviceMeshSpec.getIngestion(), - serviceMeshSpec.getProcessing(), - serviceMeshSpec.getDelivery() - ).filter(Objects::nonNull).map(Map::values) - .flatMap(Collection::stream).collect(Collectors.toList()); - containers.addAll( - Optional.ofNullable(serviceMeshSpec.getDelivery()) - .orElse(Collections.emptyMap()) - .values() - .stream() - .map(DeliveryService::getComponents) - .filter(Objects::nonNull) - .map(Map::values) - .flatMap(Collection::stream) - .toList() - ); - return containers; - } - - private void processGlobalEnvSources(ServiceMesh serviceMesh, Set envConfigsPaths, - Set envSecretsPaths) { - EnvironmentFrom globalEnvironmentFrom = serviceMesh.getSpec().getEnvironmentFrom(); - if (globalEnvironmentFrom != null) { - List globalEnvironmentFromConfigs = globalEnvironmentFrom.getConfigs(); - if (globalEnvironmentFromConfigs != null) { - envConfigsPaths.addAll(globalEnvironmentFromConfigs); - } - List globalEnvironmentFromSecrets = globalEnvironmentFrom.getSecrets(); - if (globalEnvironmentFromSecrets != null) { - envSecretsPaths.addAll(globalEnvironmentFromSecrets); - } - } - } - - private String mapToHostPath(String volumeConf) { - return volumeConf.split(":")[0]; - } - - public record ConfigSourcesPaths(Set configEnvPaths, Set secretEnvPaths, - Set configVolumePaths, Set secretVolumePaths) { - - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/collector/ClusterResourcesCollector.java b/core/src/main/java/dev/streamx/cli/command/cloud/collector/ClusterResourcesCollector.java deleted file mode 100644 index 6d0f38a4..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/collector/ClusterResourcesCollector.java +++ /dev/null @@ -1,84 +0,0 @@ -package dev.streamx.cli.command.cloud.collector; - -import static dev.streamx.cli.command.cloud.MetadataUtils.createPartOfAndManagedByLabels; -import static org.apache.commons.lang3.StringUtils.removeStart; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * Collects controlled resources deployed to the Kubernetes cluster. Identifies the resources by the - * assigned labels. The scope of the class is the mesh namespace and the list of resources - * definitions, i.e: - *
- *   - v1/Pod
- *   - apps/v1/Deployment
- *   - v1/Service
- *   - batch/v1/Job
- *   - cluster:rbac.authorization.k8s.io/v1/ClusterRole
- * 
- */ -public class ClusterResourcesCollector implements KubernetesResourcesCollector { - - private static final String CLUSTER_RESOURCE_TYPE_MARKER = "cluster:"; - - private final KubernetesClient kubernetesClient; - private final List resourceDefinitions; - private final String namespace; - - public ClusterResourcesCollector(KubernetesClient kubernetesClient, - List resourceDefinitions, - String namespace) { - this.kubernetesClient = kubernetesClient; - this.resourceDefinitions = resourceDefinitions; - this.namespace = namespace; - } - - public List collect(String meshName) { - - List result = new ArrayList<>(); - // Split the property string by comma - resourceDefinitions.forEach(entry -> { - boolean clusterScoped = entry.startsWith(CLUSTER_RESOURCE_TYPE_MARKER); - String[] parts = (clusterScoped ? removeStart(entry, CLUSTER_RESOURCE_TYPE_MARKER) - : entry).split("/"); - - if (parts.length != 3 && parts.length != 2) { - throw new IllegalArgumentException("Invalid resource specification: " + entry - + ". Expected format: group/version/kind or version/kind"); - } - // For resources like Pod or ConfigMap, the group is skipped - boolean isCoreResource = parts.length == 2; - String group = isCoreResource ? "" : parts[0].trim(); - String version = isCoreResource ? parts[0].trim() : parts[1].trim(); - String kind = isCoreResource ? parts[1].trim() : parts[2].trim(); - - result.addAll(collectResourcesByType(meshName, group, version, kind, clusterScoped)); - }); - - return result; - } - - /** - * Collects resources of the given API group, version, and kind, using the global label selector. - */ - private List collectResourcesByType(String meshName, String group, String version, - String kind, boolean clusterScoped) { - Map selector = createPartOfAndManagedByLabels(meshName); - - ResourceDefinitionContext context = new ResourceDefinitionContext.Builder() - .withGroup(group) - .withVersion(version) - .withKind(kind) - .withNamespaced(!clusterScoped) - .build(); - - return new ArrayList<>(kubernetesClient.genericKubernetesResources(context) - .inNamespace(namespace) - .withLabels(selector).list().getItems()); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/collector/DirectoryResourcesCollector.java b/core/src/main/java/dev/streamx/cli/command/cloud/collector/DirectoryResourcesCollector.java deleted file mode 100644 index 6b8a511e..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/collector/DirectoryResourcesCollector.java +++ /dev/null @@ -1,71 +0,0 @@ -package dev.streamx.cli.command.cloud.collector; - -import static dev.streamx.cli.command.cloud.MetadataUtils.setManagedByAndPartOfLabels; - -import com.fasterxml.jackson.databind.MappingIterator; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.fabric8.kubernetes.api.model.HasMetadata; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Stream; - -/** - * Collects controlled resources defined in the resources directory. - */ -public class DirectoryResourcesCollector implements KubernetesResourcesCollector { - - private final ObjectMapper objectMapper; - private final Path projectPath; - private final List resourcesDirectories; - - - public DirectoryResourcesCollector(ObjectMapper objectMapper, Path projectPath, - List resourcesDirectories) { - this.objectMapper = objectMapper; - this.projectPath = projectPath; - this.resourcesDirectories = resourcesDirectories; - } - - @Override - public List collect(String meshName) { - List resources = new ArrayList<>(); - - return resourcesDirectories.stream().flatMap(resourcesDirectory -> { - Path dirPath = projectPath.resolve(resourcesDirectory); - - if (!Files.exists(dirPath) || !Files.isDirectory(dirPath)) { - throw new IllegalArgumentException( - "Kubernetes resources directory: %s does not exist".formatted(dirPath)); - } - - try (Stream files = Files.list(dirPath)) { - files.forEach(file -> processResourceFile(file, resources)); - } catch (IOException e) { - throw new RuntimeException("Error reading directory: " + dirPath, e); - } - - resources.forEach(r -> setManagedByAndPartOfLabels(r, meshName)); - return resources.stream(); - }).toList(); - } - - private void processResourceFile(Path file, List resources) { - String fileName = file.getFileName().toString(); - if (!fileName.endsWith(".yaml") && !fileName.endsWith(".yml")) { - return; - } - try (MappingIterator iterator = - this.objectMapper.readerFor(HasMetadata.class).readValues(file.toFile())) { - - while (iterator.hasNext()) { - resources.add(iterator.next()); - } - - } catch (IOException e) { - throw new RuntimeException("Failed to load resource from file: " + file, e); - } - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/collector/KubernetesResourcesCollector.java b/core/src/main/java/dev/streamx/cli/command/cloud/collector/KubernetesResourcesCollector.java deleted file mode 100644 index d14f9864..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/collector/KubernetesResourcesCollector.java +++ /dev/null @@ -1,10 +0,0 @@ -package dev.streamx.cli.command.cloud.collector; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import java.util.List; - -public interface KubernetesResourcesCollector { - - List collect(String meshName); - -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/collector/TypedClusterResourceCollector.java b/core/src/main/java/dev/streamx/cli/command/cloud/collector/TypedClusterResourceCollector.java deleted file mode 100644 index 5738cc9b..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/collector/TypedClusterResourceCollector.java +++ /dev/null @@ -1,39 +0,0 @@ -package dev.streamx.cli.command.cloud.collector; - -import static dev.streamx.cli.command.cloud.MetadataUtils.createPartOfAndManagedByLabels; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.KubernetesClient; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * Collects controlled resources deployed to the Kubernetes cluster. Identifies the resources by the - * assigned labels. The scope of the class is the mesh namespace and the resource classes. - */ -public class TypedClusterResourceCollector implements KubernetesResourcesCollector { - - private final KubernetesClient kubernetesClient; - private final List> resourceDefinitions; - private final String namespace; - - public TypedClusterResourceCollector(KubernetesClient kubernetesClient, - List> resourceDefinitions, - String namespace) { - this.kubernetesClient = kubernetesClient; - this.resourceDefinitions = resourceDefinitions; - this.namespace = namespace; - } - - @Override - public List collect(String meshName) { - Map selector = createPartOfAndManagedByLabels(meshName); - List result = new ArrayList<>(); - resourceDefinitions.forEach(r -> - result.addAll(kubernetesClient.resources(r).inNamespace(namespace) - .withLabels(selector).list().getItems()) - ); - return result; - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/Config.java b/core/src/main/java/dev/streamx/cli/command/cloud/deploy/Config.java deleted file mode 100644 index 3b51421e..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/Config.java +++ /dev/null @@ -1,14 +0,0 @@ -package dev.streamx.cli.command.cloud.deploy; - -import java.util.Map; - -public record Config(String name, Map data, ConfigType configType) { - - public enum ConfigType { - DIR, FILE; - - public String getLabelValue() { - return this.toString().toLowerCase(); - } - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/ConfigService.java b/core/src/main/java/dev/streamx/cli/command/cloud/deploy/ConfigService.java deleted file mode 100644 index 44d31829..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/ConfigService.java +++ /dev/null @@ -1,77 +0,0 @@ -package dev.streamx.cli.command.cloud.deploy; - -import dev.streamx.cli.command.cloud.ProjectPathsResolver; -import dev.streamx.cli.command.cloud.deploy.Config.ConfigType; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import java.io.File; -import java.nio.file.Path; -import java.util.Map; -import java.util.function.Function; -import org.jetbrains.annotations.NotNull; - -@ApplicationScoped -public class ConfigService { - - @Inject - DataService dataService; - - @Inject - ProjectPathsResolver projectPathsResolver; - - @NotNull - public Config getSecretVolume(Path projectPath, String configPath) { - return getConfig(configPath, getSecretConfigPathMapper(projectPath), - dataService::loadDataFromFiles, this::getConfigType); - } - - @NotNull - public Config getConfigVolume(Path projectPath, String configPath) { - return getConfig(configPath, getConfigPathMapper(projectPath), - dataService::loadDataFromFiles, this::getConfigType); - } - - @NotNull - public Config getSecretEnv(Path projectPath, String configPath) { - return getConfig(configPath, getSecretConfigPathMapper(projectPath), - dataService::loadDataFromProperties, path -> ConfigType.FILE); - } - - @NotNull - public Config getConfigEnv(Path projectPath, String configPath) { - return getConfig(configPath, getConfigPathMapper(projectPath), - dataService::loadDataFromProperties, path -> ConfigType.FILE); - } - - @NotNull - private Function getSecretConfigPathMapper(Path projectPath) { - return (path) -> projectPathsResolver.resolveSecretPath(projectPath, path); - } - - @NotNull - private Function getConfigPathMapper(Path projectPath) { - return (path) -> projectPathsResolver.resolveConfigPath(projectPath, path); - } - - @NotNull - private Config getConfig(String configPath, Function pathMapper, - Function> dataMapper, Function configTypeMapper) { - Path mappedPath = pathMapper.apply(configPath); - Map data = dataMapper.apply(mappedPath); - ConfigType configType = configTypeMapper.apply(mappedPath); - return new Config(configPath, data, configType); - } - - @NotNull - ConfigType getConfigType(Path dataSourcePath) { - File dataSource = dataSourcePath.toFile(); - if (dataSource.isFile()) { - return ConfigType.FILE; - } - if (dataSource.isDirectory()) { - return ConfigType.DIR; - } - throw new IllegalStateException( - "Config source " + dataSource + " provided in Mesh should be file or directory."); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/DataService.java b/core/src/main/java/dev/streamx/cli/command/cloud/deploy/DataService.java deleted file mode 100644 index 8dce4263..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/DataService.java +++ /dev/null @@ -1,99 +0,0 @@ -package dev.streamx.cli.command.cloud.deploy; - -import jakarta.enterprise.context.ApplicationScoped; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Stream; - -@ApplicationScoped -public class DataService { - - private static final Pattern validKeyPattern = Pattern.compile("[-._a-zA-Z0-9]+"); - - public Map loadDataFromProperties(Path propertiesFilePath) { - File propertiesFile = propertiesFilePath.toFile(); - if (!propertiesFile.exists() || !propertiesFile.isFile()) { - throw new IllegalStateException("Path " + propertiesFilePath.normalize() - + " provided in Mesh must be a valid properties file."); - } - Properties properties = new Properties(); - try (FileInputStream fis = new FileInputStream(propertiesFile)) { - properties.load(fis); - Map data = new HashMap<>(); - for (Map.Entry entry : properties.entrySet()) { - String propertyKey = entry.getKey().toString(); - validatePropertyKey(propertiesFilePath.toString(), propertyKey); - data.put(propertyKey, entry.getValue().toString()); - } - return data; - } catch (IOException e) { - throw new IllegalStateException("Error reading properties file: " + propertiesFile, e); - } - } - - public Map loadDataFromFiles(Path path) { - Map data = new HashMap<>(); - try { - if (Files.isRegularFile(path)) { - loadDataFromFile(path.toString(), path, data); - } else if (Files.isDirectory(path)) { - try (Stream walk = Files.walk(path, 1)) { - walk - .filter(Files::isRegularFile) - .forEach(file -> { - try { - loadDataFromFile(path.toString(), file, data); - } catch (IOException e) { - throw new RuntimeException("Failed to read data file: " + file.toAbsolutePath(), - e); - } - }); - } - } else { - throw new IllegalArgumentException( - "Path " + path.normalize() + " provided in Mesh must be a file or a directory."); - } - } catch (IOException e) { - throw new IllegalStateException("Failed to convert " + path.normalize() + " to data", e); - } - - return data; - } - - private void loadDataFromFile(String configPath, Path filePath, Map data) - throws IOException { - String content = Files.readString(filePath); - String fileName = filePath.getFileName().toString(); - validateFileName(configPath, fileName); - data.put(fileName, content); - } - - private void validateFileName(String configPath, String fileName) { - if (isConfigDataKeyInvalid(fileName)) { - throw new IllegalArgumentException( - "Invalid file name: " + fileName + " in volumesFrom: " + configPath - + ". Valid file name must consist of alphanumeric characters, '-', '_' or '.'."); - } - } - - private void validatePropertyKey(String configPath, String key) { - if (isConfigDataKeyInvalid(key)) { - throw new IllegalArgumentException( - "Invalid properties key: " + key + " in environmentFrom: " + configPath - + ". Valid property key must consist of alphanumeric characters, '-', '_' or '.'."); - } - } - - private boolean isConfigDataKeyInvalid(String key) { - Matcher matcher = validKeyPattern.matcher(key); - return !matcher.matches(); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/DeployCommand.java b/core/src/main/java/dev/streamx/cli/command/cloud/deploy/DeployCommand.java deleted file mode 100644 index 4ec4112f..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/DeployCommand.java +++ /dev/null @@ -1,123 +0,0 @@ -package dev.streamx.cli.command.cloud.deploy; - -import static dev.streamx.cli.util.Output.printf; - -import com.fasterxml.jackson.databind.ObjectMapper; -import dev.streamx.cli.VersionProvider; -import dev.streamx.cli.command.cloud.KubernetesArguments; -import dev.streamx.cli.command.cloud.KubernetesService; -import dev.streamx.cli.command.cloud.ServiceMeshResolver; -import dev.streamx.cli.command.cloud.ServiceMeshResolver.ConfigSourcesPaths; -import dev.streamx.cli.command.cloud.collector.DirectoryResourcesCollector; -import dev.streamx.cli.command.cloud.collector.KubernetesResourcesCollector; -import dev.streamx.cli.command.meshprocessing.MeshConfig; -import dev.streamx.cli.command.meshprocessing.MeshResolver; -import dev.streamx.cli.command.meshprocessing.MeshSource; -import dev.streamx.cli.interpolation.Interpolating; -import dev.streamx.operator.crd.ServiceMesh; -import io.fabric8.kubernetes.api.model.HasMetadata; -import jakarta.inject.Inject; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import picocli.CommandLine.ArgGroup; -import picocli.CommandLine.Command; - -@Command( - name = DeployCommand.COMMAND_NAME, - mixinStandardHelpOptions = true, - versionProvider = VersionProvider.class, - description = "Deploy the StreamX project to the cloud.", - footer = DeployCommand.CLOUD_COMMAND_FOOTER -) -public class DeployCommand implements Runnable { - - public static final String COMMAND_NAME = "deploy"; - - public static final String CLOUD_COMMAND_FOOTER = """ - - The command automatically uses the cluster connection and namespace settings from the \ - current context in your @|italic kubeconfig|@ file. Ensure that your @|italic kubeconfig|@ \ - is configured correctly and pointing to the desired cluster and namespace. You can verify \ - your current context and namespace by running: - - @|yellow kubectl config current-context|@ - @|yellow kubectl config view --minify | grep namespace|@ - - If necessary, switch to the correct context using: - - @|yellow kubectl config use-context |@ - - This command assumes the StreamX Operator is installed and the required CRDs are available \ - on the target cluster. If not, please install the operator and ensure the cluster meets \ - the prerequisites before running this command."""; - - @ArgGroup - MeshSource meshSource; - - @ArgGroup(exclusive = false) - KubernetesArguments kubernetesArguments; - - @Inject - MeshConfig meshConfig; - - @Inject - MeshResolver meshResolver; - - @Inject - ServiceMeshResolver serviceMeshResolver; - - @Inject - KubernetesService kubernetesService; - - @Inject - ProjectResourcesExtractor projectResourcesExtractor; - - @Inject - @Interpolating - ObjectMapper objectMapper; - - @Override - public void run() { - Path meshPath = meshResolver.resolveMeshPath(meshConfig); - meshPath = meshPath.toAbsolutePath(); - ServiceMesh serviceMesh = serviceMeshResolver.resolveMesh(meshPath); - Path projectPath = meshPath.getParent(); - deploy(serviceMesh, projectPath); - } - - private void deploy(ServiceMesh serviceMesh, Path projectPath) { - kubernetesService.validateCrdInstallation(); - ConfigSourcesPaths configPaths = serviceMeshResolver.extractConfigSourcesPaths(serviceMesh); - String serviceMeshName = serviceMesh.getMetadata().getName(); - - List resourcesToDeploy = new ArrayList<>(); - List managedResources = kubernetesService.collectManagedResources(serviceMeshName); - - // Collect all resources to deploy - resourcesToDeploy.addAll(collectKubernetesResources(projectPath, serviceMeshName)); - resourcesToDeploy.addAll( - projectResourcesExtractor.getSecrets(projectPath, configPaths, serviceMeshName)); - resourcesToDeploy.addAll( - projectResourcesExtractor.getConfigMaps(projectPath, configPaths, serviceMeshName)); - resourcesToDeploy.add(serviceMesh); - - // Collect all resources to delete - ResourceCleaner cleaner = new ResourceCleaner(resourcesToDeploy, managedResources); - kubernetesService.deploy(resourcesToDeploy); - printf("Project %s successfully deployed to '%s' namespace.%n", - projectPath.toAbsolutePath().normalize(), kubernetesService.getNamespace()); - List orphanedResources = cleaner.getOrphanedResources(); - printf("Deleting %d orphaned resources.\n", orphanedResources.size()); - kubernetesService.undeploy(orphanedResources); - - } - - private List collectKubernetesResources(Path projectPath, String serviceMeshName) { - List resourcesDirectories = kubernetesService.getResourcePaths(); - - KubernetesResourcesCollector collector = new DirectoryResourcesCollector(objectMapper, - projectPath, resourcesDirectories); - return collector.collect(serviceMeshName); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/ProjectResourcesExtractor.java b/core/src/main/java/dev/streamx/cli/command/cloud/deploy/ProjectResourcesExtractor.java deleted file mode 100644 index 178ed241..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/ProjectResourcesExtractor.java +++ /dev/null @@ -1,80 +0,0 @@ -package dev.streamx.cli.command.cloud.deploy; - -import dev.streamx.cli.command.cloud.KubernetesService; -import dev.streamx.cli.command.cloud.ServiceMeshResolver.ConfigSourcesPaths; -import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.api.model.Secret; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import java.nio.file.Path; -import java.util.List; -import java.util.Set; -import java.util.stream.Stream; -import org.jetbrains.annotations.NotNull; - -@ApplicationScoped -public class ProjectResourcesExtractor { - - @Inject - ConfigService configService; - - @Inject - KubernetesService kubernetesService; - - @NotNull - public List getSecrets(Path projectPath, ConfigSourcesPaths configSourcesPaths, - String serviceMeshName) { - List<@NotNull Secret> envSecrets = getEnvSecrets(projectPath, - configSourcesPaths.secretEnvPaths(), - serviceMeshName); - List<@NotNull Secret> volumeSecrets = getVolumeSecrets(projectPath, - configSourcesPaths.secretVolumePaths(), serviceMeshName); - return Stream.concat(envSecrets.stream(), volumeSecrets.stream()).toList(); - } - - @NotNull - public List getConfigMaps(Path projectPath, ConfigSourcesPaths configSourcesPaths, - String serviceMeshName) { - List<@NotNull ConfigMap> envConfigMaps = getEnvConfigMaps(projectPath, - configSourcesPaths.configEnvPaths(), serviceMeshName); - List<@NotNull ConfigMap> volumeConfigMaps = getVolumeConfigMaps(projectPath, - configSourcesPaths.configVolumePaths(), serviceMeshName); - return Stream.concat(envConfigMaps.stream(), volumeConfigMaps.stream()).toList(); - } - - @NotNull - private List<@NotNull Secret> getVolumeSecrets(Path projectPath, Set volumePaths, - String serviceMeshName) { - return volumePaths.stream() - .map(path -> configService.getSecretVolume(projectPath, path)) - .map(config -> kubernetesService.buildSecret(serviceMeshName, config)) - .toList(); - } - - @NotNull - private List<@NotNull Secret> getEnvSecrets(Path projectPath, Set envPaths, - String serviceMeshName) { - return envPaths.stream() - .map(path -> configService.getSecretEnv(projectPath, path)) - .map(config -> kubernetesService.buildSecret(serviceMeshName, config)) - .toList(); - } - - @NotNull - private List<@NotNull ConfigMap> getEnvConfigMaps(Path projectPath, Set envPaths, - String serviceMeshName) { - return envPaths.stream() - .map(path -> configService.getConfigEnv(projectPath, path)) - .map(config -> kubernetesService.buildConfigMap(serviceMeshName, config)) - .toList(); - } - - @NotNull - private List<@NotNull ConfigMap> getVolumeConfigMaps(Path projectPath, Set volumePaths, - String serviceMeshName) { - return volumePaths.stream() - .map(path -> configService.getConfigVolume(projectPath, path)) - .map(config -> kubernetesService.buildConfigMap(serviceMeshName, config)) - .toList(); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/ResourceCleaner.java b/core/src/main/java/dev/streamx/cli/command/cloud/deploy/ResourceCleaner.java deleted file mode 100644 index 5047c859..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/deploy/ResourceCleaner.java +++ /dev/null @@ -1,47 +0,0 @@ -package dev.streamx.cli.command.cloud.deploy; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -public class ResourceCleaner { - - private final List resourcesToDeploy; - private final List managedResources; - - /** - * Constructor takes two lists: - * - * @param resourcesToDeploy list of resources planned for deployment - * @param managedResources list of currently managed resources - */ - public ResourceCleaner(List resourcesToDeploy, List managedResources) { - this.resourcesToDeploy = resourcesToDeploy; - this.managedResources = managedResources; - } - - /** - * Returns the list of orphaned managed resources, i.e. those managed resources that are not - * present in the deployment list, based on namespace and name. - * - * @return List of resources to remove. - */ - public List getOrphanedResources() { - // Build a set of keys (namespace/name) for resources to deploy. - Set deployKeys = resourcesToDeploy.stream() - .map(this::keyFor) - .collect(Collectors.toSet()); - - // Filter managedResources to keep only those not present in deployKeys. - return managedResources.stream() - .filter(resource -> !deployKeys.contains(keyFor(resource))) - .collect(Collectors.toList()); - } - - private String keyFor(HasMetadata resource) { - String api = resource.getApiVersion() + "/" + resource.getKind(); - String name = resource.getMetadata().getName(); - return api + "/" + name; - } -} \ No newline at end of file diff --git a/core/src/main/java/dev/streamx/cli/command/cloud/undeploy/UndeployCommand.java b/core/src/main/java/dev/streamx/cli/command/cloud/undeploy/UndeployCommand.java deleted file mode 100644 index eb439bac..00000000 --- a/core/src/main/java/dev/streamx/cli/command/cloud/undeploy/UndeployCommand.java +++ /dev/null @@ -1,38 +0,0 @@ -package dev.streamx.cli.command.cloud.undeploy; - -import static dev.streamx.cli.command.cloud.ServiceMeshResolver.SERVICE_MESH_NAME; -import static dev.streamx.cli.util.Output.printf; - -import dev.streamx.cli.VersionProvider; -import dev.streamx.cli.command.cloud.KubernetesArguments; -import dev.streamx.cli.command.cloud.KubernetesService; -import dev.streamx.cli.command.cloud.deploy.DeployCommand; -import jakarta.inject.Inject; -import picocli.CommandLine.ArgGroup; -import picocli.CommandLine.Command; - -@Command( - name = UndeployCommand.COMMAND_NAME, - mixinStandardHelpOptions = true, - versionProvider = VersionProvider.class, - description = "Undeploy the StreamX Project from the cloud.", - footer = DeployCommand.CLOUD_COMMAND_FOOTER -) -public class UndeployCommand implements Runnable { - - public static final String COMMAND_NAME = "undeploy"; - - @ArgGroup(exclusive = false) - KubernetesArguments kubernetesArguments; - @Inject - KubernetesService kubernetesService; - - @Override - public void run() { - kubernetesService.validateCrdInstallation(); - kubernetesService.undeploy(SERVICE_MESH_NAME); - - printf("StreamX project successfully undeployed from '%s' namespace.%n", - kubernetesService.getNamespace()); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/dev/BrowserOpener.java b/core/src/main/java/dev/streamx/cli/command/dev/BrowserOpener.java deleted file mode 100644 index 6448571a..00000000 --- a/core/src/main/java/dev/streamx/cli/command/dev/BrowserOpener.java +++ /dev/null @@ -1,36 +0,0 @@ -package dev.streamx.cli.command.dev; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import java.awt.Desktop; -import java.net.URI; -import org.jboss.logging.Logger; - -@ApplicationScoped -public class BrowserOpener { - - @Inject - Logger logger; - - @Inject - DevConfig devConfig; - - public void tryOpenBrowser() { - if (!devConfig.openOnStart()) { - return; - } - - try { - if (Desktop.isDesktopSupported()) { - Desktop desktop = Desktop.getDesktop(); - String entryUrl = "http://localhost:%d/overview".formatted(devConfig.dashboardPort()); - URI meshManagerUri = new URI(entryUrl); - desktop.browse(meshManagerUri); - } else { - logger.warn("Opening browser is not supported"); - } - } catch (Exception e) { - logger.error("Opening browser failed", e); - } - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/dev/DashboardContainer.java b/core/src/main/java/dev/streamx/cli/command/dev/DashboardContainer.java deleted file mode 100644 index 7c032368..00000000 --- a/core/src/main/java/dev/streamx/cli/command/dev/DashboardContainer.java +++ /dev/null @@ -1,31 +0,0 @@ -package dev.streamx.cli.command.dev; - -import java.util.List; -import org.testcontainers.containers.BindMode; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.utility.DockerImageName; - -public class DashboardContainer extends GenericContainer { - - public static final String CONTAINER_NAME = "streamx-dashboard"; - - private static final int DASHBOARDS_CONTAINER_PORT = 8080; - - public DashboardContainer(String fullImageName, int exposedPort, String meshPath, - String meshDirectory, String projectDirectory) { - super(DockerImageName.parse(fullImageName)); - setExposedPorts(List.of(DASHBOARDS_CONTAINER_PORT)); - addFixedExposedPort(exposedPort, DASHBOARDS_CONTAINER_PORT); - - withFileSystemBind(meshPath, "/data/mesh.yaml", BindMode.READ_WRITE); - withFileSystemBind(meshDirectory, "/data/mesh", BindMode.READ_WRITE); - - if (projectDirectory != null) { - withFileSystemBind(projectDirectory, "/data/project", BindMode.READ_WRITE); - } - - withCreateContainerCmdModifier(cmd -> cmd.withName(CONTAINER_NAME)); - withEnv("streamx.platform.mesh.services-metadata-registry-roots", - "/data/project/services,/data/mesh/services"); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/dev/DashboardRunner.java b/core/src/main/java/dev/streamx/cli/command/dev/DashboardRunner.java deleted file mode 100644 index f2c2a171..00000000 --- a/core/src/main/java/dev/streamx/cli/command/dev/DashboardRunner.java +++ /dev/null @@ -1,56 +0,0 @@ -package dev.streamx.cli.command.dev; - -import static dev.streamx.cli.util.Output.print; - -import dev.streamx.cli.command.dev.event.DashboardStarted; -import dev.streamx.cli.util.StreamxMavenPropertiesUtils; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Event; -import jakarta.inject.Inject; -import java.time.Duration; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.wait.strategy.Wait; - -@ApplicationScoped -public class DashboardRunner { - private static final long CONTAINER_TIMEOUT_IN_SECS = 60_000L; - - @Inject - DevConfig devConfig; - - @Inject - BrowserOpener browserOpener; - - @Inject - Event dashboardStartedEvent; - - private DashboardContainer dashboardContainer; - - public void startStreamxDashboard(String meshPath, String meshDirectory, - String projectDirectory, Network network) { - dashboardContainer = new DashboardContainer( - StreamxMavenPropertiesUtils.getDashboardImage(), - devConfig.dashboardPort(), - meshPath, - meshDirectory, - projectDirectory - ) - .withNetwork(network) - .waitingFor(Wait.forHttp("/q/health") - .forPort(8080) - ) - .withStartupTimeout(Duration.ofSeconds(CONTAINER_TIMEOUT_IN_SECS)); - - dashboardContainer.start(); - - print("StreamX Dashboard started on http://localhost:" + devConfig.dashboardPort()); - - browserOpener.tryOpenBrowser(); - dashboardStartedEvent.fire(new DashboardStarted()); - } - - public void stopStreamxDashboard() { - dashboardContainer.stop(); - print("StreamX Dashboard stopped"); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/dev/DevCommand.java b/core/src/main/java/dev/streamx/cli/command/dev/DevCommand.java deleted file mode 100644 index 0a0e8b69..00000000 --- a/core/src/main/java/dev/streamx/cli/command/dev/DevCommand.java +++ /dev/null @@ -1,135 +0,0 @@ -package dev.streamx.cli.command.dev; - -import static dev.streamx.cli.util.Output.print; - -import dev.streamx.cli.BannerPrinter; -import dev.streamx.cli.VersionProvider; -import dev.streamx.cli.command.dev.event.DevReady; -import dev.streamx.cli.command.meshprocessing.MeshConfig; -import dev.streamx.cli.command.meshprocessing.MeshManager; -import dev.streamx.cli.command.meshprocessing.MeshResolver; -import dev.streamx.cli.command.meshprocessing.MeshSource; -import dev.streamx.cli.command.meshprocessing.MeshWatcher; -import dev.streamx.cli.exception.DockerException; -import dev.streamx.runner.StreamxRunner; -import dev.streamx.runner.container.PulsarContainer; -import dev.streamx.runner.exception.ContainerStartupTimeoutException; -import io.quarkus.runtime.Quarkus; -import jakarta.enterprise.event.Event; -import jakarta.inject.Inject; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Set; -import org.jboss.logging.Logger; -import picocli.CommandLine; -import picocli.CommandLine.ArgGroup; -import picocli.CommandLine.Command; -import picocli.CommandLine.Spec; - -@Command(name = DevCommand.COMMAND_NAME, - mixinStandardHelpOptions = true, - versionProvider = VersionProvider.class, - description = "Develop a StreamX Mesh locally.") -public class DevCommand implements Runnable { - - public static final String COMMAND_NAME = "dev"; - - @Inject - Logger logger; - - @ArgGroup - MeshSource meshSource; - - @Spec - CommandLine.Model.CommandSpec spec; - - @Inject - MeshConfig meshConfig; - - @Inject - MeshResolver meshResolver; - - @Inject - DockerValidator dockerValidator; - - @Inject - StreamxRunner runner; - - @Inject - MeshWatcher meshWatcher; - - @Inject - BannerPrinter bannerPrinter; - - @Inject - MeshManager meshManager; - - @Inject - DashboardRunner dashboardRunner; - - @Inject - Event devReadyEvent; - - @Override - public void run() { - try { - Path meshPath = meshResolver.resolveMeshPath(meshConfig, false); - - dockerValidator.validateDockerEnvironment(Set.of( - DashboardContainer.CONTAINER_NAME, - PulsarContainer.NAME - )); - - bannerPrinter.printBanner(); - boolean meshFileExists = meshPath.toFile().exists(); - if (!meshFileExists) { - Files.createFile(meshPath); - } - - this.runner.initializeBase(); - startDashboard(meshPath); - - meshManager.initializeDevMode(meshPath, spec.commandLine()); - meshWatcher.watchMeshChanges(meshPath); - - if (meshFileExists) { - meshManager.start(); - } - - devReadyEvent.fire(new DevReady()); - - Quarkus.waitForExit(); - } catch (ContainerStartupTimeoutException e) { - throw DockerException.containerStartupFailed( - e.getContainerName(), - runner.getContext().getStreamxBaseConfig().getContainerStartupTimeout()); - } catch (IOException e) { - // handle creat file - throw new RuntimeException(e); - } - } - - private void startDashboard(Path meshPath) { - print("Setting up StreamX Dashboard..."); - var meshPathAsString = meshPath.toAbsolutePath().normalize().toString(); - Path meshDirectory = meshPath.resolve(".."); - Path projectDirectory = null; - if (Files.exists(meshDirectory.resolve("..").normalize())) { - projectDirectory = meshDirectory.resolve("..").normalize(); - } - - var meshDirectoryAsString = meshDirectory.toAbsolutePath().normalize().toString(); - var projectDirectoryAsString = projectDirectory != null - ? projectDirectory.toAbsolutePath().normalize().toString() - : null; - - logger.infov("Resolved mesh {0}, mesh directory {1} and project directory {2}", - meshPathAsString, meshDirectoryAsString, projectDirectoryAsString); - dashboardRunner.startStreamxDashboard( - meshPathAsString, - meshDirectoryAsString, - projectDirectoryAsString, - runner.getContext().getNetwork()); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/dev/DevConfig.java b/core/src/main/java/dev/streamx/cli/command/dev/DevConfig.java deleted file mode 100644 index 616bc97d..00000000 --- a/core/src/main/java/dev/streamx/cli/command/dev/DevConfig.java +++ /dev/null @@ -1,20 +0,0 @@ -package dev.streamx.cli.command.dev; - -import io.smallrye.config.ConfigMapping; -import io.smallrye.config.WithDefault; -import io.smallrye.config.WithName; - -@ConfigMapping -public interface DevConfig { - - String STREAMX_DEV_DASHBOARD_PORT = "streamx.dev.dashboard.port"; - String STREAMX_DEV_DASHBOARD_OPEN_ON_STARTUP = "streamx.dev.dashboard.open-on-startup"; - - @WithName(STREAMX_DEV_DASHBOARD_PORT) - @WithDefault("9088") - int dashboardPort(); - - @WithName(STREAMX_DEV_DASHBOARD_OPEN_ON_STARTUP) - @WithDefault("true") - boolean openOnStart(); -} diff --git a/core/src/main/java/dev/streamx/cli/command/dev/DockerValidator.java b/core/src/main/java/dev/streamx/cli/command/dev/DockerValidator.java deleted file mode 100644 index 7dfb0c47..00000000 --- a/core/src/main/java/dev/streamx/cli/command/dev/DockerValidator.java +++ /dev/null @@ -1,28 +0,0 @@ -package dev.streamx.cli.command.dev; - -import com.github.dockerjava.api.DockerClient; -import dev.streamx.runner.validation.DockerContainerValidator; -import dev.streamx.runner.validation.DockerEnvironmentValidator; -import jakarta.enterprise.context.Dependent; -import jakarta.inject.Inject; -import java.util.Set; -import org.jboss.logging.Logger; - -@Dependent -public class DockerValidator { - - private static final Logger LOG = Logger.getLogger(DockerValidator.class); - - @Inject - DockerEnvironmentValidator dockerEnvironmentValidator; - - @Inject - DockerContainerValidator dockerContainerValidator; - - public void validateDockerEnvironment(Set validatedContainerNames) { - LOG.info("Validating environment..."); - - DockerClient client = dockerEnvironmentValidator.validateDockerClient(); - dockerContainerValidator.verifyExistingContainers(client, validatedContainerNames); - } -} diff --git a/core/src/main/java/dev/streamx/cli/command/dev/event/DashboardStarted.java b/core/src/main/java/dev/streamx/cli/command/dev/event/DashboardStarted.java deleted file mode 100644 index d23e7b49..00000000 --- a/core/src/main/java/dev/streamx/cli/command/dev/event/DashboardStarted.java +++ /dev/null @@ -1,5 +0,0 @@ -package dev.streamx.cli.command.dev.event; - -public class DashboardStarted { - -} diff --git a/core/src/main/java/dev/streamx/cli/command/dev/event/DevReady.java b/core/src/main/java/dev/streamx/cli/command/dev/event/DevReady.java deleted file mode 100644 index 0a754e0e..00000000 --- a/core/src/main/java/dev/streamx/cli/command/dev/event/DevReady.java +++ /dev/null @@ -1,5 +0,0 @@ -package dev.streamx.cli.command.dev.event; - -public class DevReady { - -} diff --git a/core/src/main/java/dev/streamx/cli/command/ingestion/BaseIngestionCommand.java b/core/src/main/java/dev/streamx/cli/command/ingestion/BaseIngestionCommand.java index d0e88f12..24bdbe48 100644 --- a/core/src/main/java/dev/streamx/cli/command/ingestion/BaseIngestionCommand.java +++ b/core/src/main/java/dev/streamx/cli/command/ingestion/BaseIngestionCommand.java @@ -1,25 +1,14 @@ package dev.streamx.cli.command.ingestion; -import com.fasterxml.jackson.databind.JsonNode; -import dev.streamx.cli.SchemaProvider; +import com.streamx.clients.ingestion.StreamxClient; +import com.streamx.clients.ingestion.exceptions.StreamxClientException; +import com.streamx.clients.ingestion.publisher.Publisher; import dev.streamx.cli.exception.IngestionClientException; -import dev.streamx.cli.exception.UnableToConnectIngestionServiceException; -import dev.streamx.cli.exception.UnknownChannelException; import dev.streamx.cli.util.ExceptionUtils; -import dev.streamx.clients.ingestion.StreamxClient; -import dev.streamx.clients.ingestion.exceptions.StreamxClientConnectionException; -import dev.streamx.clients.ingestion.exceptions.StreamxClientException; -import dev.streamx.clients.ingestion.exceptions.UnsupportedChannelException; -import dev.streamx.clients.ingestion.publisher.Publisher; import jakarta.inject.Inject; -import java.util.List; import javax.net.ssl.SSLHandshakeException; -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Model.CommandSpec; -import picocli.CommandLine.ParameterException; import picocli.CommandLine.Spec; public abstract class BaseIngestionCommand implements Runnable { @@ -33,24 +22,16 @@ public abstract class BaseIngestionCommand implements Runnable { @Inject StreamxClientProvider streamxClientProvider; - @Inject - SchemaProvider schemaProvider; - @Inject IngestionClientConfig ingestionClientConfig; - protected abstract String getChannel(); - - protected abstract void perform(Publisher publisher) throws StreamxClientException; + protected abstract void perform(Publisher publisher) throws StreamxClientException; @Override public final void run() { try (StreamxClient client = streamxClientProvider.createStreamxClient(ingestionClientConfig)) { - doRun(client); - } catch (UnsupportedChannelException e) { - throw new ParameterException(spec.commandLine(), e.getMessage()); - } catch (StreamxClientConnectionException e) { - throw new UnableToConnectIngestionServiceException(ingestionClientConfig.url(), e); + Publisher publisher = client.newPublisher(); + perform(publisher); } catch (StreamxClientException e) { if (e.getCause() instanceof SSLHandshakeException) { throw IngestionClientException.sslException(ingestionClientConfig.url()); @@ -58,40 +39,4 @@ public final void run() { throw ExceptionUtils.sneakyThrow(e); } } - - protected void doRun(StreamxClient client) throws StreamxClientException { - Publisher publisher = client.newPublisher(getChannel(), JsonNode.class); - perform(publisher); - } - - protected String getPayloadPropertyName() { - JsonNode schemaJson = getSchemaForChannel(); - return getPayloadPropertyName(schemaJson); - } - - private String getPayloadPropertyName(JsonNode schemaJson) { - Schema.Parser parser = new Schema.Parser(); - Schema channelSchema = parser.parse(schemaJson.toString()); - Field payload = channelSchema.getField("payload"); - Schema payloadSchema = payload.schema(); - if (payloadSchema.getType() == Type.UNION) { - List unionSchemas = payloadSchema.getTypes(); - for (Schema schema : unionSchemas) { - if (schema.getType() == Type.RECORD) { - return schema.getFullName(); - } - } - } - return payloadSchema.getFullName(); - } - - private JsonNode getSchemaForChannel() { - try { - return schemaProvider.getSchema(getChannel()); - } catch (UnknownChannelException e) { - throw new ParameterException(spec.commandLine(), - "Channel '" + e.getChannel() + "' not found. " - + "Available channels: " + e.getAvailableChannels()); - } - } } diff --git a/core/src/main/java/dev/streamx/cli/command/ingestion/IngestionArgumentConfig.java b/core/src/main/java/dev/streamx/cli/command/ingestion/IngestionArgumentConfig.java deleted file mode 100644 index bfe4b687..00000000 --- a/core/src/main/java/dev/streamx/cli/command/ingestion/IngestionArgumentConfig.java +++ /dev/null @@ -1,45 +0,0 @@ -package dev.streamx.cli.command.ingestion; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.jayway.jsonpath.Configuration; -import com.jayway.jsonpath.Option; -import com.jayway.jsonpath.spi.json.JsonProvider; -import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; -import com.jayway.jsonpath.spi.mapper.MappingProvider; -import dev.streamx.cli.command.ingestion.publish.payload.PropertyCreatingJacksonJsonNodeJsonProvider; -import io.quarkus.runtime.Startup; -import jakarta.enterprise.context.ApplicationScoped; -import java.util.Set; - -@ApplicationScoped -@Startup -public class IngestionArgumentConfig { - - private final JsonProvider jsonProvider; - private final MappingProvider mappingProvider; - - public IngestionArgumentConfig(@PayloadProcessing ObjectMapper objectMapper) { - jsonProvider = new PropertyCreatingJacksonJsonNodeJsonProvider(objectMapper); - mappingProvider = new JacksonMappingProvider(objectMapper); - configureDefaults(); - } - - private void configureDefaults() { - Configuration.setDefaults(new Configuration.Defaults() { - @Override - public JsonProvider jsonProvider() { - return jsonProvider; - } - - @Override - public Set
+ + org.assertj + assertj-core + compile + org.jboss.logging commons-logging-jboss-logging @@ -44,7 +49,7 @@ test - dev.streamx + com.streamx streamx-runner test @@ -64,7 +69,6 @@ src/test/resources/filtered mesh.yaml - overridden-nginx-conf diff --git a/e2e-tests/src/main/java/dev/streamx/cli/StreamxTerminalCommandProducer.java b/e2e-tests/src/main/java/dev/streamx/cli/StreamxTerminalCommandProducer.java index 082e099c..c24370ce 100644 --- a/e2e-tests/src/main/java/dev/streamx/cli/StreamxTerminalCommandProducer.java +++ b/e2e-tests/src/main/java/dev/streamx/cli/StreamxTerminalCommandProducer.java @@ -1,6 +1,7 @@ package dev.streamx.cli; import java.io.File; +import java.nio.file.Path; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -39,7 +40,10 @@ private String findStreamxJar() { .filter(File::exists) .map(File::listFiles) .flatMap(Stream::of) - .map(File::getAbsolutePath) + .map(File::toPath) + .map(Path::toAbsolutePath) + .map(Path::normalize) + .map(Path::toString) .filter(p -> pattern.matcher(p.replace("\\", "/")).matches()) .findFirst() .orElseThrow(() -> new RuntimeException("Could not find streamx Jar")); diff --git a/e2e-tests/src/main/java/dev/streamx/cli/test/tools/terminal/OsTerminalCommandRunner.java b/e2e-tests/src/main/java/dev/streamx/cli/test/tools/terminal/OsTerminalCommandRunner.java index 30a2a335..390434e1 100644 --- a/e2e-tests/src/main/java/dev/streamx/cli/test/tools/terminal/OsTerminalCommandRunner.java +++ b/e2e-tests/src/main/java/dev/streamx/cli/test/tools/terminal/OsTerminalCommandRunner.java @@ -25,7 +25,6 @@ public OsTerminalCommandRunner() { } public ShellProcess run(String command) { - logger.info("Running terminal command: " + command); ProcessBuilder processBuilder = osCommand.create(command); try { ShellProcess shellProcess = ShellProcess.run(processBuilder); @@ -33,6 +32,7 @@ public ShellProcess run(String command) { processes.add(shellProcess); return shellProcess; } catch (IOException e) { + logger.info("Error running terminal command: " + command); throw new RuntimeException(e); } } diff --git a/e2e-tests/src/main/java/dev/streamx/cli/test/tools/validators/HttpValidator.java b/e2e-tests/src/main/java/dev/streamx/cli/test/tools/validators/HttpValidator.java index 8829f8c4..385e8e14 100644 --- a/e2e-tests/src/main/java/dev/streamx/cli/test/tools/validators/HttpValidator.java +++ b/e2e-tests/src/main/java/dev/streamx/cli/test/tools/validators/HttpValidator.java @@ -1,17 +1,21 @@ package dev.streamx.cli.test.tools.validators; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.awaitility.Awaitility.await; import jakarta.annotation.PreDestroy; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import java.io.IOException; +import java.time.Duration; +import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; +import org.assertj.core.api.ThrowingConsumer; import org.jboss.logging.Logger; @ApplicationScoped @@ -22,32 +26,36 @@ public class HttpValidator { @Inject CloseableHttpClient httpClient; - public void validate(String url, int expectedStatusCode, String expectedBody, int timeout) { - await() - .atMost(timeout, SECONDS) - .alias("Assertion of response from url: " + url - + " with expecting status:" + expectedStatusCode - + " body:" + expectedBody - + " timeout:" + timeout) - .pollInterval(100, MILLISECONDS) - .until(() -> - validate(url, expectedStatusCode, expectedBody) - ); + public void validate(String url, int expectedStatusCode, String expectedBody, Duration timeout) { + validate(url, expectedStatusCode, timeout, httpEntity -> { + String responseBody = EntityUtils.toString(httpEntity); + assertThat(responseBody).describedAs(url).contains(expectedBody); + }); } - private boolean validate(String url, int expectedStatusCode, String expectedBody) { - HttpGet request = new HttpGet(url); - try (CloseableHttpResponse response = httpClient.execute(request)) { - int actualStatusCode = response.getStatusLine().getStatusCode(); - String responseBody = EntityUtils.toString(response.getEntity()); - logger.info("Request to " + url - + " return statusCode " + actualStatusCode - + " and body " + responseBody); - return actualStatusCode == expectedStatusCode && responseBody.contains(expectedBody); - } catch (IOException e) { - logger.error("Request to " + url + "failed: " + e.getMessage(), e); - throw new RuntimeException("Can not make request:" + url, e); - } + public void validate(String url, int expectedStatusCode, byte[] expectedBody, Duration timeout) { + validate(url, expectedStatusCode, timeout, httpEntity -> { + byte[] responseBody = EntityUtils.toByteArray(httpEntity); + assertThat(responseBody).describedAs(url).containsExactly(expectedBody); + }); + } + + public void validate(String url, int expectedStatusCode, Duration timeout, + ThrowingConsumer responseEntityAssertion) { + await() + .atMost(timeout) + .pollInterval(100, MILLISECONDS) + .untilAsserted(() -> { + HttpGet request = new HttpGet(url); + try (CloseableHttpResponse response = httpClient.execute(request)) { + int statusCode = response.getStatusLine().getStatusCode(); + assertThat(statusCode).describedAs(url).isEqualTo(expectedStatusCode); + HttpEntity responseEntity = response.getEntity(); + responseEntityAssertion.accept(responseEntity); + } catch (IOException e) { + fail("Request to " + url + "failed", e); + } + }); } @PreDestroy diff --git a/e2e-tests/src/main/java/dev/streamx/cli/test/tools/validators/ProcessOutputValidator.java b/e2e-tests/src/main/java/dev/streamx/cli/test/tools/validators/ProcessOutputValidator.java index dd39218b..d1c58a69 100644 --- a/e2e-tests/src/main/java/dev/streamx/cli/test/tools/validators/ProcessOutputValidator.java +++ b/e2e-tests/src/main/java/dev/streamx/cli/test/tools/validators/ProcessOutputValidator.java @@ -1,10 +1,12 @@ package dev.streamx.cli.test.tools.validators; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import dev.streamx.cli.test.tools.terminal.process.ShellProcess; import jakarta.enterprise.context.ApplicationScoped; +import java.time.Duration; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -21,15 +23,19 @@ public class ProcessOutputValidator { + "[\\p{Alnum}.,%_=?&#\\-+()\\[\\]\\*$~@!:/{};']*)", Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL); - public void validate(List output, String expectedContent, long timeout) { + public void validate(ShellProcess process, List output, String expectedContent, + Duration timeout) { try { await() - .atMost(timeout, SECONDS) + .atMost(timeout) .pollInterval(100, MILLISECONDS) - .alias("Finding expectedContent:" + expectedContent) - .until(() -> - output - .stream() + .alias("Finding expectedContent: " + expectedContent) + .untilAsserted(() -> + assertThat(output) + .describedAs(() -> "Full output is:\n" + + String.join("\n", process.getCurrentOutputLines()) + + "\n" + + String.join("\n", process.getCurrentErrorLines())) .anyMatch(line -> line.contains(expectedContent)) ); } catch (ConditionTimeoutException e) { @@ -38,15 +44,13 @@ public void validate(List output, String expectedContent, long timeout) } } - public String validateContainsUrl(List output, long timeout) { + public String validateContainsUrl(List output, Duration timeout) { await() - .atMost(timeout, SECONDS) + .atMost(timeout) .pollInterval(100, MILLISECONDS) .alias("Finding any url") - .until(() -> - output - .stream() - .anyMatch(line -> urlPattern.matcher(line).find()) + .untilAsserted(() -> + assertThat(output).anyMatch(line -> urlPattern.matcher(line).find()) ); return output.stream() diff --git a/e2e-tests/src/test/java/dev/streamx/cli/OsUtils.java b/e2e-tests/src/test/java/dev/streamx/cli/OsUtils.java index f6b7a575..0a531834 100644 --- a/e2e-tests/src/test/java/dev/streamx/cli/OsUtils.java +++ b/e2e-tests/src/test/java/dev/streamx/cli/OsUtils.java @@ -1,6 +1,6 @@ package dev.streamx.cli; -import dev.streamx.runner.validation.DockerEnvironmentValidator; +import com.streamx.runner.validation.DockerEnvironmentValidator; public class OsUtils { public static boolean isDockerAvailable() { diff --git a/e2e-tests/src/test/java/dev/streamx/cli/StreamxCliLicenseIT.java b/e2e-tests/src/test/java/dev/streamx/cli/StreamxCliLicenseIT.java index c2141d0f..e5055942 100644 --- a/e2e-tests/src/test/java/dev/streamx/cli/StreamxCliLicenseIT.java +++ b/e2e-tests/src/test/java/dev/streamx/cli/StreamxCliLicenseIT.java @@ -12,6 +12,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -20,7 +21,7 @@ @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class StreamxCliLicenseIT { - private static final int CLI_SHORT_TIMEOUT_IN_SEC = 5; + private static final Duration CLI_TIMEOUT = Duration.ofSeconds(10); @Inject @Named("StreamxCommandRunner") @@ -85,9 +86,9 @@ public void shouldAcceptingLicenseAllowUsingWithoutMoreQuestions() { private void assertIfLicenseIsAccessible(ShellProcess process) { String url = processOutputValidator.validateContainsUrl( process.getCurrentOutputLines(), - CLI_SHORT_TIMEOUT_IN_SEC); + CLI_TIMEOUT); httpValidator.validate(url, 200, "License", - CLI_SHORT_TIMEOUT_IN_SEC); + CLI_TIMEOUT); } private static void declineLicense(ShellProcess p) { @@ -99,15 +100,15 @@ private static void acceptLicense(ShellProcess p) { } private void assertIfAcceptanceLicenseErrorPresent(ShellProcess p) { - processOutputValidator.validate( + processOutputValidator.validate(p, p.getCurrentErrorLines(), "License acceptance is required for using StreamX", - CLI_SHORT_TIMEOUT_IN_SEC); + CLI_TIMEOUT); } private void assertIfStreamxCommandWork(ShellProcess p) { - processOutputValidator.validate(p.getCurrentOutputLines(), "streamx-cli version", - CLI_SHORT_TIMEOUT_IN_SEC); + processOutputValidator.validate(p, p.getCurrentOutputLines(), "streamx-cli version", + CLI_TIMEOUT); } private ShellProcess runStreamxCommand() { @@ -115,10 +116,10 @@ private ShellProcess runStreamxCommand() { } private void assertIfAcceptLicenseQuestionPresent(ShellProcess p) { - processOutputValidator.validate( + processOutputValidator.validate(p, p.getCurrentOutputLines(), "Do you accept the license agreement?", - CLI_SHORT_TIMEOUT_IN_SEC); + CLI_TIMEOUT); } private static void resetLicenseAcceptance() { diff --git a/e2e-tests/src/test/java/dev/streamx/cli/StreamxCliPublicationIT.java b/e2e-tests/src/test/java/dev/streamx/cli/StreamxCliPublicationIT.java index 9aa8b477..1d398963 100644 --- a/e2e-tests/src/test/java/dev/streamx/cli/StreamxCliPublicationIT.java +++ b/e2e-tests/src/test/java/dev/streamx/cli/StreamxCliPublicationIT.java @@ -1,8 +1,6 @@ package dev.streamx.cli; - import static dev.streamx.cli.test.tools.ResourcePathResolver.absolutePath; -import static org.junit.jupiter.params.provider.Arguments.arguments; import dev.streamx.cli.test.tools.terminal.TerminalCommandRunner; import dev.streamx.cli.test.tools.terminal.process.ShellProcess; @@ -11,26 +9,24 @@ import io.quarkus.test.junit.QuarkusTest; import jakarta.inject.Inject; import jakarta.inject.Named; -import java.util.List; -import java.util.stream.Stream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.condition.EnabledIf; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; @QuarkusTest @EnabledIf("dev.streamx.cli.OsUtils#isDockerAvailable") @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class StreamxCliPublicationIT { - private static final int CLI_SHORT_TIMEOUT_IN_SEC = 5; + private static final Duration CLI_TIMEOUT = Duration.ofSeconds(10); @ConfigProperty(name = "streamx.cli.e2e.web.delivery.url", defaultValue = "http://localhost:8087/") - String webDeliveryPortUrl; - @ConfigProperty(name = "streamx.cli.e2e.nginx.url", defaultValue = "http://localhost:8089/overridden/") - String nginxPortUrl; + String webDeliveryUrl; @ConfigProperty(name = "streamx.cli.e2e.setup.timeoutInSec", defaultValue = "60") int setupTimeoutInSec; @@ -48,85 +44,92 @@ public class StreamxCliPublicationIT { @BeforeAll public void setup() { runStreamxCommand( - "--accept-license run -f " + absolutePath("mesh.yaml"), + "--accept-license run_v2 -f " + absolutePath("mesh.yaml"), "STREAMX IS READY!", - setupTimeoutInSec); + Duration.ofSeconds(setupTimeoutInSec)); } - private void runStreamxCommand(String command, String expectedOutput, long timeoutInS) { + private void runStreamxCommand(String command, String expectedOutput, Duration timeout) { ShellProcess p = terminalCommandRunner.run(command); - processOutputValidator.validate(p.getCurrentOutputLines(), expectedOutput, timeoutInS); + processOutputValidator.validate(p, p.getCurrentOutputLines(), expectedOutput, timeout); } + private void runStreamxIngestionCommand(String commandName, String path, String expectedOutput) { + String command = "--accept-license %s %s".formatted(commandName, path); + runStreamxCommand(command, expectedOutput, CLI_TIMEOUT); + } - @ParameterizedTest - @MethodSource("testCases") - public void shouldTestPublishAndUnpublishPageOnStreamx( - String pageName, - String commandContentPart, - String expectedPageContent - ) { - - runStreamxCommand( - "--accept-license publish pages " + pageName + " " + commandContentPart, - "Sent data publication message to", - CLI_SHORT_TIMEOUT_IN_SEC); + @Test + public void shouldPublishAndUnpublishPageUsingStreamOperation() { + runStreamxIngestionCommand( + "stream_v2", + "src/test/resources/stream/page-publish-event.json", + "Sent com.streamx.blueprints.page.published.v1 event using stream with key 'hello.html'" + ); - validateStreamxPage(pageName, 200, expectedPageContent); + validateStreamxPage("hello.html", "Hello World!"); - runStreamxCommand( - "--accept-license unpublish pages " + pageName, - "Sent data unpublication message to", - CLI_SHORT_TIMEOUT_IN_SEC); + runStreamxIngestionCommand( + "stream_v2", + "src/test/resources/stream/page-unpublish-event.json", + "Sent com.streamx.blueprints.page.unpublished.v1 event using stream with key 'hello.html'" + ); - validateStreamxPage(pageName, 404, ""); + validateStreamxPageNotAvailable("hello.html"); } - static Stream testCases() { - return Stream.of( - arguments( - "third_param_page.html", - absolutePath("payload.json"), - "third_param_page" - ), - arguments( - "exact_param_page.html", - "-j '{\"content\":{\"bytes\":\"exact_param_page\"}}'", - "exact_param_page" - ), - arguments( - "json_path_exact_param_page.html", - "-s content.bytes='Json exact page'", - "Json exact page" - ), - arguments( - "json_path_exact_param_page.html", - "-b content.bytes='Json exact page'", - "Json exact page" - ), - arguments( - "file_param_page.html", - "-j file://" + absolutePath("file_param_page.json"), - "file_param_page" - ), - arguments( - "json_path_file_param_page.html", - "-s content.bytes=file://" + absolutePath("json_path_file_param_page.txt"), - "json_path_file_param_page" - ), - arguments( - "json_path_file_param_page.html", - "-b content.bytes=file://" + absolutePath("json_path_file_param_page.txt"), - "json_path_file_param_page" - ) + @Test + public void shouldPublishAndUnpublishPageUsingBatchOperation() { + runStreamxIngestionCommand( + "batch_v2", + "src/test/resources/batch/publish/page", + "Sent com.streamx.blueprints.page.published.v1 event using batch with key 'index.html'" + ); + + validateStreamxPage("index.html", "

Hello World!

"); + + runStreamxIngestionCommand( + "batch_v2", + "src/test/resources/batch/unpublish/page", + "Sent com.streamx.blueprints.page.unpublished.v1 event using batch with key 'index.html'" ); + + validateStreamxPageNotAvailable("index.html"); } - private void validateStreamxPage(String resourcePath, int expectedStatusCode, - String expectedBody) { - List.of(webDeliveryPortUrl, nginxPortUrl).forEach( - url -> httpValidator.validate(url + resourcePath, expectedStatusCode, expectedBody, - CLI_SHORT_TIMEOUT_IN_SEC) + @Test + public void shouldPublishAndUnpublishAssetUsingBatchOperation() throws IOException { + runStreamxIngestionCommand( + "batch_v2", + "src/test/resources/batch/publish/asset", + "Sent com.streamx.blueprints.asset.published.v1 event using batch with key 'ds.png'" ); + + validateStreamxPage("ds.png", + Files.readAllBytes(Path.of("src/test/resources/batch/publish/asset/ds.png"))); + + runStreamxIngestionCommand( + "batch_v2", + "src/test/resources/batch/unpublish/asset", + "Sent com.streamx.blueprints.asset.unpublished.v1 event using batch with key 'ds.png'" + ); + + validateStreamxPageNotAvailable("ds.png"); + } + + private void validateStreamxPage(String resourcePath, String expectedBody) { + httpValidator.validate(url(resourcePath), 200, expectedBody, CLI_TIMEOUT); + } + + private void validateStreamxPage(String resourcePath, byte[] expectedBody) { + httpValidator.validate(url(resourcePath), 200, expectedBody, CLI_TIMEOUT); + } + + private void validateStreamxPageNotAvailable(String resourcePath) { + httpValidator.validate(url(resourcePath), 404, "", CLI_TIMEOUT); + } + + private String url(String resourcePath) { + return webDeliveryUrl + resourcePath; } } diff --git a/e2e-tests/src/test/resources/batch/publish/asset/.eventsource.yaml b/e2e-tests/src/test/resources/batch/publish/asset/.eventsource.yaml new file mode 100644 index 00000000..0d4ee277 --- /dev/null +++ b/e2e-tests/src/test/resources/batch/publish/asset/.eventsource.yaml @@ -0,0 +1,9 @@ +eventType: com.streamx.blueprints.asset.published.v1 +eventSource: source + +relativePathLevel: 0 +key: ${relativePath} + +payload: + content: file://${payloadPath} + type: assets/image diff --git a/e2e-tests/src/test/resources/batch/publish/asset/ds.png b/e2e-tests/src/test/resources/batch/publish/asset/ds.png new file mode 100644 index 00000000..4b85fd55 Binary files /dev/null and b/e2e-tests/src/test/resources/batch/publish/asset/ds.png differ diff --git a/e2e-tests/src/test/resources/batch/publish/page/.eventsource.yaml b/e2e-tests/src/test/resources/batch/publish/page/.eventsource.yaml new file mode 100644 index 00000000..eca8625a --- /dev/null +++ b/e2e-tests/src/test/resources/batch/publish/page/.eventsource.yaml @@ -0,0 +1,9 @@ +eventType: com.streamx.blueprints.page.published.v1 +eventSource: source + +relativePathLevel: 0 +key: ${relativePath} + +payload: + content: file://${payloadPath} + type: page/index-page diff --git a/e2e-tests/src/test/resources/batch/publish/page/index.html b/e2e-tests/src/test/resources/batch/publish/page/index.html new file mode 100644 index 00000000..ba7c290e --- /dev/null +++ b/e2e-tests/src/test/resources/batch/publish/page/index.html @@ -0,0 +1 @@ +

Hello World!

\ No newline at end of file diff --git a/e2e-tests/src/test/resources/batch/unpublish/asset/.eventsource.yaml b/e2e-tests/src/test/resources/batch/unpublish/asset/.eventsource.yaml new file mode 100644 index 00000000..e7f18c2f --- /dev/null +++ b/e2e-tests/src/test/resources/batch/unpublish/asset/.eventsource.yaml @@ -0,0 +1,9 @@ +eventType: com.streamx.blueprints.asset.unpublished.v1 +eventSource: source + +relativePathLevel: 0 +key: ${relativePath} + +payload: + content: file://${payloadPath} + type: assets/image diff --git a/e2e-tests/src/test/resources/batch/unpublish/asset/ds.png b/e2e-tests/src/test/resources/batch/unpublish/asset/ds.png new file mode 100644 index 00000000..4b85fd55 Binary files /dev/null and b/e2e-tests/src/test/resources/batch/unpublish/asset/ds.png differ diff --git a/e2e-tests/src/test/resources/batch/unpublish/page/.eventsource.yaml b/e2e-tests/src/test/resources/batch/unpublish/page/.eventsource.yaml new file mode 100644 index 00000000..b6db9ffc --- /dev/null +++ b/e2e-tests/src/test/resources/batch/unpublish/page/.eventsource.yaml @@ -0,0 +1,8 @@ +eventType: com.streamx.blueprints.page.unpublished.v1 +eventSource: source + +relativePathLevel: 0 +key: ${relativePath} + +payload: + type: page/index-page diff --git a/e2e-tests/src/test/resources/batch/unpublish/page/index.html b/e2e-tests/src/test/resources/batch/unpublish/page/index.html new file mode 100644 index 00000000..ba7c290e --- /dev/null +++ b/e2e-tests/src/test/resources/batch/unpublish/page/index.html @@ -0,0 +1 @@ +

Hello World!

\ No newline at end of file diff --git a/e2e-tests/src/test/resources/configs/overridden-nginx-conf b/e2e-tests/src/test/resources/configs/overridden-nginx-conf deleted file mode 100644 index 329f89b7..00000000 --- a/e2e-tests/src/test/resources/configs/overridden-nginx-conf +++ /dev/null @@ -1,10 +0,0 @@ -server { - listen 80; - listen [::]:80; - server_name localhost; - - location /overridden { - alias /usr/share/nginx/html; - } -} - diff --git a/e2e-tests/src/test/resources/file_param_page.json b/e2e-tests/src/test/resources/file_param_page.json deleted file mode 100644 index 6cb37e37..00000000 --- a/e2e-tests/src/test/resources/file_param_page.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "content": { - "bytes": "file_param_page" - } -} \ No newline at end of file diff --git a/e2e-tests/src/test/resources/filtered/mesh.yaml b/e2e-tests/src/test/resources/filtered/mesh.yaml index 71e1ac93..b163ea17 100644 --- a/e2e-tests/src/test/resources/filtered/mesh.yaml +++ b/e2e-tests/src/test/resources/filtered/mesh.yaml @@ -1,44 +1,70 @@ -defaultRegistry: ghcr.io/streamx-dev/streamx -defaultImageTag: ${streamx.version}-jvm +descriptors: + relay: + type: "processing" + containers: + service: + incoming: + events: {} + outgoing: + relayed-events: {} + + web-server-sink: + type: "edge" + containers: + sink: + image: "ghcr.io/streamx-com/streamx-blueprints/web-server-sink:3.0.7-jvm" + incoming: + resources: {} + servicePorts: + - "8087:8080" sources: cli: outgoing: - - "pages" + - ref: "inbox.pages" + produces: + - "com.streamx.blueprints.page.published.v1" + - "com.streamx.blueprints.page.unpublished.v1" + - ref: "inbox.assets" + produces: + - "com.streamx.blueprints.asset.published.v1" + - "com.streamx.blueprints.asset.unpublished.v1" + +ingestion: + rest-ingestion: + containers: + proxy: + environment: + QUARKUS_HTTP_AUTH_PERMISSION_BEARER_POLICY: permit processing: - relay: - image: sample-relay-processing-service - incoming: - incoming-pages: - topic: inboxes/pages - outgoing: - outgoing-pages: - topic: outboxes/pages + pages-relay: + descriptor: relay + containers: + service: + incoming: + events: + ref: inbox.pages + outgoing: + relayed-events: + ref: outbox.pages -delivery: - web-delivery: - image: sample-web-delivery-service - incoming: - pages: - topic: outboxes/pages - port: 8087 - composite-web-delivery: - image: sample-web-delivery-service - incoming: - pages: - topic: outboxes/pages - port: 8088 - environment: - REPOSITORY_RESOURCE_ROOT_DIRECTORY: "/home/runner/work/streamx-cli/streamx-cli/srv/www" - repositoryVolume: "/home/runner/work/streamx-cli/streamx-cli/srv/www" - components: - webserver: - image: "docker.io/library/nginx:1.26.0" - ports: - - ${streamx.cli.e2e.port}:80 - repositoryVolume: "/usr/share/nginx/html" - volumesFrom: - configs: - - "overridden-nginx-conf:/etc/nginx/conf.d/default.conf" + assets-relay: + descriptor: relay + containers: + service: + incoming: + events: + ref: inbox.assets + outgoing: + relayed-events: + ref: outbox.assets +edge: + web-server-sink: + descriptor: web-server-sink + containers: + sink: + incoming: + resources: + ref: outbox.pages,outbox.assets diff --git a/e2e-tests/src/test/resources/json_path_file_param_page.txt b/e2e-tests/src/test/resources/json_path_file_param_page.txt deleted file mode 100644 index bb24ea82..00000000 --- a/e2e-tests/src/test/resources/json_path_file_param_page.txt +++ /dev/null @@ -1 +0,0 @@ -json_path_file_param_page \ No newline at end of file diff --git a/e2e-tests/src/test/resources/payload.json b/e2e-tests/src/test/resources/payload.json deleted file mode 100644 index d685f897..00000000 --- a/e2e-tests/src/test/resources/payload.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "content": { - "bytes": "third_param_page" - } -} \ No newline at end of file diff --git a/e2e-tests/src/test/resources/stream/.stream.properties b/e2e-tests/src/test/resources/stream/.stream.properties new file mode 100644 index 00000000..15b64ec4 --- /dev/null +++ b/e2e-tests/src/test/resources/stream/.stream.properties @@ -0,0 +1 @@ +json.fields.as-base64=/data/content \ No newline at end of file diff --git a/e2e-tests/src/test/resources/stream/page-publish-event.json b/e2e-tests/src/test/resources/stream/page-publish-event.json new file mode 100644 index 00000000..c8b1f8f3 --- /dev/null +++ b/e2e-tests/src/test/resources/stream/page-publish-event.json @@ -0,0 +1,13 @@ +{ + "specversion" : "1.0", + "id" : "11111111-327e-4bee-96a1-3e4224a1e71d", + "source" : "source", + "type" : "com.streamx.blueprints.page.published.v1", + "datacontenttype" : "application/json", + "subject" : "hello.html", + "time" : "2025-12-23T10:28:23.435253Z", + "data" : { + "content" : "Hello World!", + "type" : "page/test" + } +} \ No newline at end of file diff --git a/e2e-tests/src/test/resources/stream/page-unpublish-event.json b/e2e-tests/src/test/resources/stream/page-unpublish-event.json new file mode 100644 index 00000000..ec767e45 --- /dev/null +++ b/e2e-tests/src/test/resources/stream/page-unpublish-event.json @@ -0,0 +1,8 @@ +{ + "specversion" : "1.0", + "id" : "11111111-327e-4bee-96a1-3e4224a1e71d", + "source" : "source", + "type" : "com.streamx.blueprints.page.unpublished.v1", + "subject" : "hello.html", + "time" : "2025-12-23T10:28:23.435253Z" +} \ No newline at end of file diff --git a/entrypoint/src/main/java/dev/streamx/cli/EntrypointMain.java b/entrypoint/src/main/java/dev/streamx/cli/EntrypointMain.java index 81edfde6..cc3f86f1 100644 --- a/entrypoint/src/main/java/dev/streamx/cli/EntrypointMain.java +++ b/entrypoint/src/main/java/dev/streamx/cli/EntrypointMain.java @@ -14,8 +14,8 @@ public class EntrypointMain { public static void main(String[] args) { int javaVersion = getJavaVersion(); - if (javaVersion < 17) { - System.out.println("Java 17 or higher is required!"); + if (javaVersion < 21) { + System.out.println("Java 21 or higher is required!"); return; } diff --git a/entrypoint/src/test/java/dev/streamx/cli/EntrypointMainTest.java b/entrypoint/src/test/java/dev/streamx/cli/EntrypointMainTest.java index a790c2e5..e6daad54 100644 --- a/entrypoint/src/test/java/dev/streamx/cli/EntrypointMainTest.java +++ b/entrypoint/src/test/java/dev/streamx/cli/EntrypointMainTest.java @@ -53,7 +53,7 @@ void shouldFailTooLowJavaVersions(String javaVersion) { // then Assertions.assertFalse(StreamxCommand.isLaunched()); Assertions.assertTrue( - byteArrayOutputStream.toString().contains("Java 17 or higher is required!") + byteArrayOutputStream.toString().contains("Java 21 or higher is required!") ); } diff --git a/pom.xml b/pom.xml index e0ec4b0a..060e1317 100644 --- a/pom.xml +++ b/pom.xml @@ -41,9 +41,7 @@ io.quarkus.platform 3.18.3 - 1.0.16 - 1.1.1 - 0.0.16 + 2.0.8 2.9.0 3.27.3 3.12.1 @@ -76,20 +74,14 @@ - dev.streamx + com.streamx streamx-runner ${streamx.version} - dev.streamx + com.streamx ingestion-client - ${ingestion-client.version} - - - - dev.streamx - streamx-operator-mesh-api - ${streamx-operator.version} + ${streamx.version} com.jayway.jsonpath