diff --git a/.travis.yml b/.travis.yml
index 4a5b4267..c441369b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -44,3 +44,4 @@ notifications:
slack:
rooms:
secure: N6FrJ0l0gw2t1wnty/xPDKvLBZlzOAmkIbHjd4ZrtaC4vLU8iZP+YCRvQd7Vwzc97yDhHIZ41eGtjHjBIIeMX/3rfuS+6GA4VJgyNFGsP0f81YYHxiMOEUlVa2O6LBwbF3LokaFi3l7Iau1c5Op9c3LinhTdE0W6KG+2gxEqSKJkBcVJfK9SZGXg3+6qlq065BWx7xWlzoWgMFyipJJLW1BAqEzP5cc6hHozJ5H/8ucMaJbWQF+nOaIYUJdaop4ChSpe6EFyg0XNp+mIjJ2BxZ6hfYrAs69vwBapBepe4zEoR5xdN1dGUAIDqNY9rr4Npp8InFmBmnlL1xG6FeG73C2qfSr3Y2WziOtwececzmuQWI2Do9ioB5k0KtC4UAV42tpADUODrrwxE8S32tqgr0Pojg8FvgsM6O3twvF1UH3LLEyW7zDzuDpYdtvIROWZGerxyWqQIFI7MzragyL+ZiKKUOG1uKsA8DoeECcMHuUsDEsR/mHalmQGGl2ZNGpphAHg/1HtjB32VDCIuZpFHsqFG7BfMY2dg8Mm+ea4j9imOxM+2TFxbPhoIjIJq1o+/YvolrYviWgmQOpeFw0c1xXWSL5aXmBv2nUnlBaXxfzC9FTl0xmovJmKeYhqJ81TMa33gimsMeccZIDmzcMDsW82idsvtHeNDSX1wil5DXA=
+
diff --git a/Dockerfile b/Dockerfile
index 6f29f7f8..757f9d64 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,10 +1,10 @@
FROM openjdk:11
MAINTAINER Cedrick Lunven
-MAINTAINER Davig Gilardi
+MAINTAINER David Gilardi
ENTRYPOINT ["java","-Djava.security.egd=file:/dev/./urandom","-jar","/killrvideo-java.jar"]
EXPOSE 50101
# To create jar file, run `docker run -v ${PWD}:/opt/killrvideo-java -w /opt/killrvideo-java maven mvn install -DskipTests=true -Dmaven.javadoc.skip=true -B`
-COPY ./killrvideo-services/target/killrvideo-java.jar /killrvideo-java.jar
+COPY ./killrvideo-services/target/killrvideo-java.jar /killrvideo-java.jar
\ No newline at end of file
diff --git a/README.md b/README.md
index 9e93b421..aecc7b55 100644
--- a/README.md
+++ b/README.md
@@ -7,18 +7,6 @@
A reference application for Java developers looking to learn more about using [Apache Cassandra][cassandra] and
[DataStax Enterprise][dse] in their applications and services. Learn more at [Killrvideo].
-## The latest stable build of KillrVideo Java is [v2.1.0][v2.1.0]. Master is experimental. Please use v2.1.0 if you attempting to follow the instructions in the links below.
-
-## Building Locally
-
-**Docker Way**
-
-`docker run -v ${PWD}:/opt/killrvideo-java -w /opt/killrvideo-java maven mvn install -DskipTests=true -Dmaven.javadoc.skip=true -B`
-
-**Maven Way**
-
-`mvn install`
-
## Running Locally
Use these guides to get started running KillrVideo locally on your development machine:
@@ -35,7 +23,7 @@ that you don't see in the code currently, send me a message [@SonicDMG][twitter]
[here][issues] on GitHub.
## License
-Copyright 2018 David Gilardi, Cedrick Lunven, derived from original work by Duy Hai Doan
+Copyright 2018 David Gilardi, Cedrick Lunven, Aleksandr Volochnev derived from original work by Duy Hai Doan
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
diff --git a/docker-compose.ci.yml b/docker-compose.ci.yml
index 525407c4..ac9c1d93 100644
--- a/docker-compose.ci.yml
+++ b/docker-compose.ci.yml
@@ -19,6 +19,6 @@ services:
memlock: -1
dse-config:
- image: killrvideo/killrvideo-dse-config:3.0.0
+ image: killrvideo/killrvideo-dse-config:3
depends_on:
- dse
diff --git a/docker-compose.yml b/docker-compose.yml
index b8a7d7ec..4d16d7a8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,20 +1,9 @@
version: '3'
-#
-# docker-compose.yaml
-# Simple compose file for running the Java services in Docker along with other containers comprising the
-# KillrVideo.
-# For more complex configuration options including metrics, external volumes, OpsCenter,
-# etc., see https://github.com/KillrVideo/killrvideo-docker-common
-#
-# Note: the default configuration here uses the LATEST version of the killrvideo-python
-# Docker image, which can be built using scripts/docker_build.sh.
-#
-
services:
web: # Web Interface
- image: killrvideo/killrvideo-web:3.0.1
+ image: killrvideo/killrvideo-web:3
ports:
- "3000:3000" # Exposes port to be available externally
depends_on:
@@ -60,14 +49,14 @@ services:
# One-Time Bootstrap Container, configures DSE to have required keyspaces etc.
dse-config:
- image: killrvideo/killrvideo-dse-config:3.0.0
+ image: killrvideo/killrvideo-dse-config:3
depends_on:
- dse # Needs DSE to be running
# Sample Data Generator, imitates behaviour of users on the killrVideo website.
# Adds comments and rates videos, upload new videos and so on.
generator:
- image: killrvideo/killrvideo-generator:3.0.2
+ image: killrvideo/killrvideo-generator:3
depends_on:
- backend # Needs Backend to be running
environment:
diff --git a/killrvideo-commons/.factorypath b/killrvideo-commons/.factorypath
index 9867b6a5..ef0f298e 100644
--- a/killrvideo-commons/.factorypath
+++ b/killrvideo-commons/.factorypath
@@ -1,136 +1,58 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/killrvideo-commons/pom.xml b/killrvideo-commons/pom.xml
index cd4d7ccb..6f004c07 100644
--- a/killrvideo-commons/pom.xml
+++ b/killrvideo-commons/pom.xml
@@ -11,71 +11,67 @@
com.datastax
killrvideo-parent
- 3.0.0
+ 3.1.0
-
-
- org.springframework
- spring-context
-
-
-
ch.qos.logback
logback-classic
+
+
- ch.qos.logback
- logback-core
-
-
-
-
- com.datastax.dse
- dse-java-driver-core
+ com.datastax.oss
+ java-driver-core
- com.datastax.dse
- dse-java-driver-mapping
+ com.datastax.oss
+ java-driver-query-builder
- com.datastax.dse
- dse-java-driver-extras
+ com.datastax.oss
+ java-driver-mapper-runtime
+
- com.datastax.dse
- dse-java-driver-graph
+ com.evanlennick
+ retry4j
-
-
+
+
- com.xqbase
- etcd4j
+ org.springframework
+ spring-context
-
- com.evanlennick
- retry4j
+ commons-codec
+ commons-codec
+
org.apache.commons
commons-lang3
- javax.annotation
- javax.annotation-api
-
+ javax.annotation
+ javax.annotation-api
+
javax.validation
validation-api
- org.hibernate
+ org.hibernate.validator
hibernate-validator
+
+
+ org.openjfx
+ javafx.base
+
+
org.glassfish
@@ -85,17 +81,17 @@
javax.el
javax.el-api
-
+
com.google.protobuf
protobuf-java
- io.grpc
- grpc-all
-
-
+ io.grpc
+ grpc-all
+
+
org.apache.kafka
@@ -112,84 +108,6 @@
guava
-
-
- org.junit.platform
- junit-platform-launcher
-
-
- org.junit.platform
- junit-platform-runner
-
-
- org.junit.platform
- junit-platform-console-standalone
-
-
- org.junit.jupiter
- junit-jupiter-engine
-
-
- org.junit.jupiter
- junit-jupiter-params
-
-
- org.springframework.boot
- spring-boot-starter-test
- test
-
-
-
-
-
-
-
-
-
-
- org.codehaus.mojo
- build-helper-maven-plugin
- ${build-helper-maven-plugin.version}
-
-
- generate-sources
-
- add-source
-
-
-
- target/generated-sources/protobuf/java
-
-
-
-
-
-
-
-
- org.xolstice.maven.plugins
- protobuf-maven-plugin
- ${protobuf-maven-plugin.version}
-
- ${basedir}/src/main/resources/proto
- com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
- grpc-java
- io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc.version}:exe:${os.detected.classifier}
-
-
-
-
- compile
- compile-custom
-
-
-
-
-
-
-
-
-
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/conf/DriverConfigurationFile.java b/killrvideo-commons/src/main/java/com/killrvideo/conf/DriverConfigurationFile.java
new file mode 100644
index 00000000..ff4b4866
--- /dev/null
+++ b/killrvideo-commons/src/main/java/com/killrvideo/conf/DriverConfigurationFile.java
@@ -0,0 +1,137 @@
+package com.killrvideo.conf;
+
+import java.io.File;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
+import com.evanlennick.retry4j.CallExecutor;
+import com.evanlennick.retry4j.config.RetryConfig;
+import com.evanlennick.retry4j.config.RetryConfigBuilder;
+import com.killrvideo.dse.graph.KillrVideoTraversalSource;
+
+/**
+ * The DSE (DataStax Enterprise) Driver configuration.
+ *
+ * The above properties should be typically declared in an {@code application.conf} file.
+ *
+ * @author DataStax Developer Advocates team.
+ */
+@Configuration
+@Profile("!unit-test & !integration-test")
+public class DriverConfigurationFile {
+
+ /** Initialize dedicated connection to ETCD system. */
+ private static final Logger LOGGER = LoggerFactory.getLogger(DriverConfigurationFile.class);
+
+ /** Execution Profile. */
+ public static final String EXECUTION_PROFILE_SEARCH = "search";
+
+ // --- Retries ---
+
+ @Value("${killrvideo.dse.maxNumberOfTries:50}")
+ protected int maxNumberOfTries;
+
+ @Value("${killrvideo.dse.delayBetweenTries:5}")
+ protected int delayBetweenTries;
+
+ @Value("#{environment.KILLRVIDEO_DSE_CONFIGURATION_FILE}")
+ protected Optional driverConfigurationFileEnvVar;
+
+ @Value("${killrvideo.dse.configFile:application.conf}")
+ protected String driverConfigurationFile;
+
+ @Bean
+ public DriverConfigLoader loadConfigurationFile() {
+ // If the env variable is set we override default value
+ if (!driverConfigurationFileEnvVar.isEmpty()) {
+ driverConfigurationFile = driverConfigurationFileEnvVar.get();
+ }
+ LOGGER.info("Loading configuration from File '{}'", driverConfigurationFile);
+ File configFile = new File(DriverConfigurationFile.class.getResource("/" + driverConfigurationFile).getFile());
+ DriverConfigLoader loader = DriverConfigLoader.fromFile(configFile);
+ DriverExecutionProfile basic = loader.getInitialConfig().getDefaultProfile();
+ LOGGER.info("Configuration file has been parsed:");
+ LOGGER.info("+ keyspace '{}'", basic.getString(DefaultDriverOption.SESSION_KEYSPACE));
+ LOGGER.info("+ ContactPoints '{}'", basic.getStringList(DefaultDriverOption.CONTACT_POINTS));
+ return loader;
+ }
+
+ /**
+ * Returns the keyspace to connect to. The keyspace specified here must exist.
+ *
+ * @return The {@linkplain CqlIdentifier keyspace} bean.
+ */
+ @Bean("killrvideo.keyspace")
+ public CqlIdentifier keyspace(CqlSession cqlSession) {
+ return cqlSession.getKeyspace().orElseThrow();
+ }
+
+ @Bean
+ public CqlSession connect(DriverConfigLoader loader) {
+ LOGGER.info("Initializing connection...");
+ CqlSessionBuilder cqlSessionBuilder = CqlSession.builder().withConfigLoader(loader);
+
+ // Connection Lambda
+ final AtomicInteger atomicCount = new AtomicInteger(1);
+ Callable connect = () -> {
+ return cqlSessionBuilder.build();
+ };
+
+ // Retry mechanism policy
+ RetryConfig config = new RetryConfigBuilder()
+ .retryOnAnyException()
+ .withMaxNumberOfTries(maxNumberOfTries)
+ .withDelayBetweenTries(delayBetweenTries, ChronoUnit.SECONDS)
+ .withFixedBackoff()
+ .build();
+
+ long top = System.currentTimeMillis();
+
+ // Let's go
+ return new CallExecutor(config)
+ .afterFailedTry(s -> {
+ LOGGER.info("Attempt #{}/{} [KO] -> waiting {} seconds for Cluster to start", atomicCount.getAndIncrement(),
+ maxNumberOfTries, delayBetweenTries); })
+ .onFailure(s -> {
+ LOGGER.error("Cannot connection to Cluster after {} attempts, exiting", maxNumberOfTries);
+ System.err.println("Can not conenction to Cluster after " + maxNumberOfTries + " attempts, exiting");
+ System.exit(500);
+ })
+ .onSuccess(s -> {
+ long timeElapsed = System.currentTimeMillis() - top;
+ LOGGER.info("[OK] Connection etablished to Cluster in {} millis.", timeElapsed);})
+ .execute(connect).getResult();
+ }
+
+ /**
+ * Graph Traversal for suggested videos.
+ *
+ * @param session
+ * current dse session.
+ * @return
+ * traversal
+ */
+ @Bean
+ public KillrVideoTraversalSource initializeGraphTraversalSource(CqlSession dseSession) {
+ //System.out.println(dseSession.getMetadata().getNodes().values().iterator().next().getExtras().get("DSE_WORKLOADS"));
+ //return new KillrVideoTraversalSource(DseGraph.g.getGraph());
+ return EmptyGraph.instance().traversal(KillrVideoTraversalSource.class);
+ }
+
+}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/conf/GrpcConfiguration.java b/killrvideo-commons/src/main/java/com/killrvideo/conf/GrpcConfiguration.java
new file mode 100644
index 00000000..4cf1f630
--- /dev/null
+++ b/killrvideo-commons/src/main/java/com/killrvideo/conf/GrpcConfiguration.java
@@ -0,0 +1,25 @@
+package com.killrvideo.conf;
+
+import java.util.Optional;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class GrpcConfiguration {
+
+ @Value("#{environment.KILLRVIDEO_GRPC_PORT}")
+ private Optional grpcPortEnvironmentVar;
+
+ /** Listening Port for GRPC. */
+ @Value("${killrvideo.grpc-server.port:50101}")
+ private int grpcPort;
+
+ public int getGrpcPort() {
+ if (!grpcPortEnvironmentVar.isEmpty()) {
+ grpcPort = grpcPortEnvironmentVar.get();
+ }
+ return grpcPort;
+ }
+
+}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/messaging/conf/KafkaConfiguration.java b/killrvideo-commons/src/main/java/com/killrvideo/conf/KafkaConfiguration.java
similarity index 74%
rename from killrvideo-commons/src/main/java/com/killrvideo/messaging/conf/KafkaConfiguration.java
rename to killrvideo-commons/src/main/java/com/killrvideo/conf/KafkaConfiguration.java
index 5fbc6bf1..1c9bb4f9 100644
--- a/killrvideo-commons/src/main/java/com/killrvideo/messaging/conf/KafkaConfiguration.java
+++ b/killrvideo-commons/src/main/java/com/killrvideo/conf/KafkaConfiguration.java
@@ -1,4 +1,4 @@
-package com.killrvideo.messaging.conf;
+package com.killrvideo.conf;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
@@ -8,7 +8,10 @@
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import java.util.Arrays;
+import java.util.Optional;
import java.util.Properties;
+import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -16,53 +19,59 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
-import com.killrvideo.conf.KillrVideoConfiguration;
-import com.killrvideo.discovery.ServiceDiscoveryDao;
-
/**
* Use Kafka to exchange messages between services.
*
- * @author Cedrick LUNVEN (@clunven) *
+ * @author Cedrick LUNVEN (@clunven)
*/
@Configuration
-@Profile(KillrVideoConfiguration.PROFILE_MESSAGING_KAFKA)
+@Profile("messaging_kafka")
public class KafkaConfiguration {
- /** Name of service in ETCD. */
- public static final String SERVICE_KAFKA = "kafka";
-
- /** Default CQL listening port. */
- public static final int DEFAULT_PORT = 8082;
+ /** Initialize dedicated connection to ETCD system. */
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfiguration.class);
+
+ @Value("${killrvideo.kafka.port: 8082}")
+ private int kafkaPort;
- /** Kafka Server to be used. */
- private String kafkaServer;
+ @Value("${killrvideo.kafka.brokers:kafka}")
+ private String kafkaBrokers;
- @Value("${kafka.ack: 1 }")
- private String producerAck;
+ @Value("#{environment.KILLRVIDEO_KAFKA_BROKERS}")
+ private Optional kafkaBrokersEnvironmentVar;
- @Value("${kafka.consumerGroup: killrvideo }")
+ @Value("${killrvideo.kafka.consumerGroup: killrvideo }")
private String consumerGroup;
- @Autowired
- private ServiceDiscoveryDao discoveryDao;
+ @Value("${killrvideo.kafka.ack: 1 }")
+ private String producerAck;
+
+ private String connectionURL;
/**
- * Should we init connection with ETCD or direct.
+ * Should we init connection with Env VAR or values in `application.yaml`
*
* @return
* target kafka adress
*/
private String getKafkaServerConnectionUrl() {
- if (null == kafkaServer) {
- kafkaServer = String.join(",", discoveryDao.lookup(SERVICE_KAFKA));
- }
- return kafkaServer;
+ if (null == connectionURL ) {
+ if (!kafkaBrokersEnvironmentVar.isEmpty() && !kafkaBrokersEnvironmentVar.get().isBlank()) {
+ kafkaBrokers = kafkaBrokersEnvironmentVar.get();
+ LOGGER.info(" + Reading broker from KILLRVIDEO_KAFKA_BROKERS");
+ }
+ connectionURL = String.join(",", Arrays.asList(kafkaBrokers.split(","))
+ .stream().map(ip -> ip + ":" + kafkaPort)
+ .collect(Collectors.toList()));
+ }
+ return connectionURL;
}
@Bean("kafka.producer")
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/conf/KillrVideoConfiguration.java b/killrvideo-commons/src/main/java/com/killrvideo/conf/KillrVideoConfiguration.java
deleted file mode 100644
index 6e1c5b93..00000000
--- a/killrvideo-commons/src/main/java/com/killrvideo/conf/KillrVideoConfiguration.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package com.killrvideo.conf;
-
-import javax.validation.Validation;
-import javax.validation.Validator;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * Configuration for KillrVideo application leveraging on DSE, ETCD and any external source.
- *
- * @author DataStax Developer Advocates team.
- */
-@Configuration
-public class KillrVideoConfiguration {
-
- @Value("#{environment.KILLRVIDEO_HOST_IP ?: '10.0.75.1'}")
- private String applicationHost;
-
- @Value("${application.name: KillrVideo}")
- private String applicationName;
-
- /** Use Spring profile to adapt behaviours. */
- public static final String PROFILE_MESSAGING_KAFKA = "messaging_kafka";
- public static final String PROFILE_MESSAGING_MEMORY = "messaging_memory";
- public static final String PROFILE_DISCOVERY_ETCD = "discovery_etcd";
- public static final String PROFILE_DISCOVERY_STATIC = "discovery_static";
-
- @Bean
- public Validator getBeanValidator() {
- return Validation.buildDefaultValidatorFactory().getValidator();
- }
-
- /**
- * Getter for attribute 'applicationName'.
- *
- * @return
- * current value of 'applicationName'
- */
- public String getApplicationName() {
- return applicationName;
- }
-
- /**
- * Getter for attribute 'applicationHost'.
- *
- * @return
- * current value of 'applicationHost'
- */
- public String getApplicationHost() {
- return applicationHost;
- }
-
-}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/conf/KillrvideoDriverOption.java b/killrvideo-commons/src/main/java/com/killrvideo/conf/KillrvideoDriverOption.java
new file mode 100644
index 00000000..e2b407c4
--- /dev/null
+++ b/killrvideo-commons/src/main/java/com/killrvideo/conf/KillrvideoDriverOption.java
@@ -0,0 +1,30 @@
+package com.killrvideo.conf;
+
+import com.datastax.oss.driver.api.core.config.DriverOption;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Implementing custom properties and profile.
+ *
+ * @author Cedrick LUNVEN (@clunven)
+ */
+public enum KillrvideoDriverOption implements DriverOption {
+
+ GRAPH_NAME("basic.graph.name"),
+ GRAPH_TIMEOUT("basic.graph.timeout"),
+
+ ;
+
+ private final String path;
+
+ KillrvideoDriverOption(String path) {
+ this.path = path;
+ }
+
+ @NonNull
+ @Override
+ public String getPath() {
+ return path;
+ }
+ }
\ No newline at end of file
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/messaging/conf/MessagingConfiguration.java b/killrvideo-commons/src/main/java/com/killrvideo/conf/MessagingGuavaConfiguration.java
similarity index 87%
rename from killrvideo-commons/src/main/java/com/killrvideo/messaging/conf/MessagingConfiguration.java
rename to killrvideo-commons/src/main/java/com/killrvideo/conf/MessagingGuavaConfiguration.java
index dae62374..e33408a0 100644
--- a/killrvideo-commons/src/main/java/com/killrvideo/messaging/conf/MessagingConfiguration.java
+++ b/killrvideo-commons/src/main/java/com/killrvideo/conf/MessagingGuavaConfiguration.java
@@ -1,4 +1,4 @@
-package com.killrvideo.messaging.conf;
+package com.killrvideo.conf;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -11,7 +11,7 @@
import org.springframework.context.annotation.Profile;
import com.google.common.eventbus.EventBus;
-import com.killrvideo.conf.KillrVideoConfiguration;
+import com.killrvideo.utils.KillrVideoThreadFactory;
/**
* Store all configuration related to Messagging.
@@ -19,16 +19,16 @@
* @author DataStax Developer Advocates team.
*/
@Configuration
-@Profile(KillrVideoConfiguration.PROFILE_MESSAGING_MEMORY)
-public class MessagingConfiguration {
+@Profile("messaging_memory")
+public class MessagingGuavaConfiguration {
/** Event Bus. */
private static final String EVENT_BUS_KILLRVIODEO = "killrvideo_event_bus";
- @Value("${killrvideo.messaging.inmemory.threadpool.min.threads:5}")
+ @Value("${killrvideo.messaging.inmemory.minThreads:5}")
private int minThreads;
- @Value("${killrvideo.messaging.inmemory.max.threads:10}")
+ @Value("${killrvideo.messaging.inmemory.maxThreads:10}")
private int maxThreads;
@Value("${killrvideo.messaging.inmemory.ttlThreads:60}")
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/discovery/ServiceDiscoveryDao.java b/killrvideo-commons/src/main/java/com/killrvideo/discovery/ServiceDiscoveryDao.java
deleted file mode 100644
index 5c4f7428..00000000
--- a/killrvideo-commons/src/main/java/com/killrvideo/discovery/ServiceDiscoveryDao.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package com.killrvideo.discovery;
-
-import java.util.List;
-
-/**
- * Work with service registry (ETCD, Consul..)
- *
- * @author Cedrick LUNVEN (@clunven)
- */
-public interface ServiceDiscoveryDao {
-
- /**
- * Register new endpoint for a service.
- * @param serviceName
- * unique service identifier
- * @param hostName
- * current hostname
- * @param portNumber
- * current port number
- * @return
- * service key (service name + namespace)
- */
- String register(String serviceName, String hostName, int portNumber);
-
- /**
- * List endpoints available for a service.
- *
- * @param serviceName
- * service identifier
- * @return
- * list of endpoints like hostname1:port1, hostname2:port2
- */
- List < String > lookup(String serviceName);
-
- /**
- * Unregister all endpoints for a service.
- *
- * @param serviceName
- * service unique identifier
- */
- void unregister(String serviceName);
-
- /**
- * Unregister one endpoint for a service.
- *
- * @param serviceName
- * service unique identifier
- * @param hostName
- * current hostname
- * @param portNumber
- * current port number
- */
- void unregisterEndpoint(String serviceName, String hostName, int portNumber);
-
-}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/discovery/ServiceDiscoveryDaoEtcd.java b/killrvideo-commons/src/main/java/com/killrvideo/discovery/ServiceDiscoveryDaoEtcd.java
deleted file mode 100644
index 6ec0958d..00000000
--- a/killrvideo-commons/src/main/java/com/killrvideo/discovery/ServiceDiscoveryDaoEtcd.java
+++ /dev/null
@@ -1,230 +0,0 @@
-package com.killrvideo.discovery;
-
-import java.net.URI;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import javax.annotation.PostConstruct;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Profile;
-import org.springframework.stereotype.Component;
-
-import com.evanlennick.retry4j.CallExecutor;
-import com.evanlennick.retry4j.config.RetryConfig;
-import com.evanlennick.retry4j.config.RetryConfigBuilder;
-import com.killrvideo.conf.KillrVideoConfiguration;
-import com.xqbase.etcd4j.EtcdClient;
-import com.xqbase.etcd4j.EtcdClientException;
-import com.xqbase.etcd4j.EtcdNode;
-
-/**
- * Hanle operation arount ETCD (connection, read, write).
- *
- * @author DataStax Developer Advocates Team
- */
-@Component("killrvideo.discovery.etcd")
-@Profile(KillrVideoConfiguration.PROFILE_DISCOVERY_ETCD)
-public class ServiceDiscoveryDaoEtcd implements ServiceDiscoveryDao {
-
- /** Initialize dedicated connection to ETCD system. */
- private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscoveryDaoEtcd.class);
-
- /** Namespace. */
- public static String KILLRVIDEO_SERVICE_NAMESPACE = "/killrvideo/services/";
-
- @Value("${killrvideo.discovery.etcd.host: 10.0.75.1}")
- private String etcdServerHost;
-
- @Value("${killrvideo.discovery.etcd.port: 2379}")
- private int etcdServerPort;
-
- @Value("${killrvideo.discovery.etcd.maxNumberOfTries: 10}")
- private int maxNumberOfTriesEtcd;
-
- @Value("${killrvideo.discovery.etcd.delayBetweenTries: 2}")
- private int delayBetweenTriesEtcd;
-
- /** Native client. */
- private EtcdClient etcdClient;
-
- @PostConstruct
- public void connect() {
- final String etcdUrl = String.format("http://%s:%d", etcdServerHost, etcdServerPort);
- LOGGER.info("Initialize connection to ETCD Server:");
- LOGGER.info(" + Connecting to '{}'", etcdUrl);
- etcdClient = new EtcdClient(URI.create(etcdUrl));
- waitForEtcd();
- LOGGER.info(" + [OK] Connection established.");
- }
-
- /**
- * Read from ETCD using a retry mecanism.
- *
- * @param key
- * current key to look in ETCD.
- * @param required
- * key is required if not returning empty list
- * @return
- */
- private void waitForEtcd() {
- final AtomicInteger atomicCount = new AtomicInteger(1);
- Callable> getKeyFromEtcd = () -> {
- try {
- List nodes = etcdClient.listDir("/");
- if ((nodes == null || nodes.isEmpty())) {
- throw new IllegalStateException("/ is required in ETCD but not yet present");
- }
- return nodes;
- } catch (EtcdClientException e) {
- throw new IllegalStateException("Cannot Access ETCD Server : " + e.getMessage());
- }
- };
- RetryConfig etcdRetryConfig = new RetryConfigBuilder()
- .retryOnAnyException()
- .withMaxNumberOfTries(maxNumberOfTriesEtcd)
- .withDelayBetweenTries(delayBetweenTriesEtcd, ChronoUnit.SECONDS)
- .withFixedBackoff()
- .build();
- new CallExecutor>(etcdRetryConfig)
- .afterFailedTry(s -> {
- LOGGER.info("Attempt #{}/{} : ETCD is not ready (retry in {}s)",
- atomicCount.getAndIncrement(), maxNumberOfTriesEtcd, delayBetweenTriesEtcd); })
- .onFailure(s -> {
- LOGGER.error("ETCD is not ready after {} attempts, exiting", maxNumberOfTriesEtcd);
- System.err.println("ETCD is not ready after " + maxNumberOfTriesEtcd + " attempts, exiting now.");
- System.exit(500);
- })
- .execute(getKeyFromEtcd).getResult()
- .stream().map(node -> node.value)
- .collect(Collectors.toList());
- }
-
- /**
- * Give a service name like 'CommentServices' look for Directory at namespace killrvideo
- * and list value (keys are generated)
- *
- * @param serviceName
- * unique service name
- * @return
- * list of values
- */
- public List < String > lookup(String serviceName) {
- List< String > endPointList = new ArrayList<>();
- String serviceDirectoryKey = KILLRVIDEO_SERVICE_NAMESPACE + serviceName + "/";
- LOGGER.info(" List endpoints for key '{}':", serviceDirectoryKey);
- try {
- List< EtcdNode > existingNodes = etcdClient.listDir(serviceDirectoryKey);
- if (existingNodes != null) {
- endPointList = existingNodes
- .stream()
- .map(node -> node.value)
- .collect(Collectors.toList());
- }
- } catch (EtcdClientException e) {}
- LOGGER.info(" + [OK] Endpoints retrieved '{}':", endPointList);
- return endPointList;
- }
-
- /**
- * Given a servicename and a host give the latest port if exist. This will be used for
- *
- * @param serviceName
- * target service
- * @return
- */
- public synchronized Optional lookupServicePorts(String serviceName, String hostName) {
- int targetPort = -1;
- LOGGER.info("Accessing last port for endpoint with same host");
- for (String endpoint : lookup(serviceName)) {
- String[] endpointChunks = endpoint.split(":");
- int endPointPort = Integer.valueOf(endpointChunks[1]);
- String endPointHost = endpointChunks[0];
- if (hostName.equalsIgnoreCase(endPointHost)) {
- if (endPointPort > targetPort) {
- targetPort = endPointPort;
- LOGGER.info(" + Found {}", targetPort);
- }
- }
- } ;
- return (targetPort == -1) ? Optional.empty() : Optional.of(targetPort);
- }
-
- /** {@inheritDoc} */
- public String register(String serviceName, String hostName, int portNumber) {
- String serviceDirectoryKey = KILLRVIDEO_SERVICE_NAMESPACE + serviceName.trim() + "/";
- String endPoint = hostName + ":" + portNumber;
- try {
- try {
- LOGGER.info("Register endpoint '{}' for key '{}':", endPoint, serviceDirectoryKey);
- etcdClient.createDir(serviceDirectoryKey, null, false);
- LOGGER.info(" + Dir '{}' has been created", serviceDirectoryKey);
- } catch (EtcdClientException e) {
- LOGGER.info(" + Dir '{}' already exist", serviceDirectoryKey);
- }
- List< EtcdNode > existingNodes = etcdClient.listDir(serviceDirectoryKey);
- if (existingNodes != null) {
- Optional existingEndpoint = existingNodes
- .stream().filter(p -> p.value.equalsIgnoreCase(endPoint))
- .findFirst();
- // Return existing key
- if (existingEndpoint.isPresent()) {
- LOGGER.info(" + [OK] Endpoint '{}' already exist", endPoint);
- return existingEndpoint.get().key;
- }
- }
- // Create new Key
- String serviceKey = serviceDirectoryKey + UUID.randomUUID().toString();
- etcdClient.set(serviceKey, endPoint);
- LOGGER.info(" + [OK] Endpoint registered with key '{}'", serviceKey);
- return serviceKey;
- } catch (EtcdClientException e) {
- throw new IllegalStateException("Cannot register services into ETCD", e);
- }
- }
-
- /** {@inheritDoc} */
- public void unregisterEndpoint(String serviceName, String hostName, int portNumber) {
- String serviceDirectoryKey = KILLRVIDEO_SERVICE_NAMESPACE + serviceName + "/";
- String endPoint = hostName + ":" + portNumber;
- try {
- LOGGER.info("Unregister endpoint '{}' for key '{}':", endPoint, serviceDirectoryKey);
- List< EtcdNode > existingNodes = etcdClient.listDir(serviceDirectoryKey);
- Optional existingEndpoint = Optional.empty();
- if (existingNodes != null) {
- existingEndpoint = existingNodes
- .stream().filter(p -> p.value.equalsIgnoreCase(endPoint))
- .findFirst();
- }
- if (existingEndpoint.isPresent()) {
- etcdClient.delete(existingEndpoint.get().key);
- LOGGER.info(" + [OK] Endpoint has been deleted (key={})", existingEndpoint.get().key);
- } else {
- LOGGER.info(" + [OK] This endpoint does not exist");
- }
- } catch (EtcdClientException e) {
- throw new IllegalStateException("Cannot register services into ETCD", e);
- }
- }
-
- /** {@inheritDoc} */
- public void unregister(String serviceName) {
- String serviceDirectoryKey = KILLRVIDEO_SERVICE_NAMESPACE + serviceName + "/";
- try {
- LOGGER.info("Delete dir '{}'", serviceDirectoryKey);
- etcdClient.deleteDir("/killrvideo/services/" + serviceName, true);
- LOGGER.info(" + [OK] Directory has been deleted");
- } catch (EtcdClientException e) {
- LOGGER.info(" + [OK] Directory did not exist");
- }
- }
-
-}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/discovery/ServiceDiscoveryDaoStatic.java b/killrvideo-commons/src/main/java/com/killrvideo/discovery/ServiceDiscoveryDaoStatic.java
deleted file mode 100644
index 25168331..00000000
--- a/killrvideo-commons/src/main/java/com/killrvideo/discovery/ServiceDiscoveryDaoStatic.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package com.killrvideo.discovery;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Profile;
-import org.springframework.stereotype.Component;
-
-import com.killrvideo.conf.KillrVideoConfiguration;
-
-/**
- * There is no explicit access to
- *
- * @author Cedrick LUNVEN (@clunven)
- */
-@Component("killrvideo.discovery.network")
-@Profile(KillrVideoConfiguration.PROFILE_DISCOVERY_STATIC)
-public class ServiceDiscoveryDaoStatic implements ServiceDiscoveryDao {
-
- /** Initialize dedicated connection to ETCD system. */
- private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscoveryDaoStatic.class);
-
- @Value("${killrvideo.discovery.service.kafka: kafka}")
- private String kafkaServiceName;
-
- @Value("${killrvideo.discovery.static.kafka.port: 8082}")
- private int kafkaPort;
-
- @Value("${killrvideo.discovery.static.kafka.brokers}")
- private String kafkaBrokers;
- @Value("#{environment.KILLRVIDEO_KAFKA_BROKERS}")
- private Optional kafkaBrokersEnvVar;
-
- @Value("${killrvideo.discovery.service.cassandra: cassandra}")
- private String cassandraServiceName;
-
- @Value("${killrvideo.discovery.static.cassandra.port: 9042}")
- private int cassandraPort;
-
- @Value("${killrvideo.discovery.static.cassandra.contactPoints}")
- private String cassandraContactPoints;
- @Value("#{environment.KILLRVIDEO_DSE_CONTACT_POINTS}")
- private Optional cassandraContactPointsEnvVar;
-
- /** {@inheritDoc} */
- @Override
- public List lookup(String serviceName) {
- List< String > endPointList = new ArrayList<>();
- LOGGER.info(" + Lookup for key '{}':", serviceName);
- if (kafkaServiceName.equalsIgnoreCase(serviceName)) {
- if (!kafkaBrokersEnvVar.isEmpty() && !kafkaBrokersEnvVar.get().isBlank()) {
- cassandraContactPoints = kafkaBrokersEnvVar.get();
- LOGGER.info(" + Reading broker from KILLRVIDEO_KAFKA_BROKERS");
- }
- Arrays.asList(kafkaBrokers.split(",")).stream()
- .forEach(ip -> endPointList.add(ip + ":" + kafkaPort));
-
- } else if (cassandraServiceName.equalsIgnoreCase(serviceName)) {
- // Explicit overwriting of contact points from env var
- // Better than default spring : simpler
- if (!cassandraContactPointsEnvVar.isEmpty() && !cassandraContactPointsEnvVar.get().isBlank()) {
- cassandraContactPoints = cassandraContactPointsEnvVar.get();
- LOGGER.info(" + Reading contactPoints from KILLRVIDEO_DSE_CONTACT_POINTS");
- }
- Arrays.asList(cassandraContactPoints.split(","))
- .stream()
- .forEach(ip -> endPointList.add(ip + ":" + cassandraPort));
- }
- LOGGER.info(" + Endpoints retrieved '{}':", endPointList);
- return endPointList;
- }
-
- /** {@inheritDoc} */
- @Override
- public String register(String serviceName, String hostName, int portNumber) {
- // Do nothing in k8s service are registered through DNS
- return serviceName;
- }
-
- /** {@inheritDoc} */
- @Override
- public void unregister(String serviceName) {}
-
- /** {@inheritDoc} */
- @Override
- public void unregisterEndpoint(String serviceName, String hostName, int portNumber) {}
-
-}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/conf/DseConfiguration.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/conf/DseConfiguration.java
deleted file mode 100644
index b4ebcf16..00000000
--- a/killrvideo-commons/src/main/java/com/killrvideo/dse/conf/DseConfiguration.java
+++ /dev/null
@@ -1,374 +0,0 @@
-package com.killrvideo.dse.conf;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.security.KeyStore;
-import java.security.cert.CertificateException;
-import java.security.cert.CertificateFactory;
-import java.security.cert.X509Certificate;
-import java.time.temporal.ChronoUnit;
-import java.util.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.net.ssl.TrustManagerFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import com.datastax.driver.core.AuthProvider;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.RemoteEndpointAwareNettySSLOptions;
-import com.datastax.driver.core.policies.AddressTranslator;
-import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
-import com.datastax.driver.dse.DseCluster.Builder;
-import com.datastax.driver.dse.DseSession;
-import com.datastax.driver.dse.auth.DsePlainTextAuthProvider;
-import com.datastax.driver.dse.graph.GraphOptions;
-import com.datastax.driver.dse.graph.GraphProtocol;
-import com.datastax.driver.mapping.DefaultPropertyMapper;
-import com.datastax.driver.mapping.MappingConfiguration;
-import com.datastax.driver.mapping.MappingManager;
-import com.datastax.driver.mapping.PropertyMapper;
-import com.datastax.driver.mapping.PropertyTransienceStrategy;
-import com.datastax.dse.graph.api.DseGraph;
-import com.evanlennick.retry4j.CallExecutor;
-import com.evanlennick.retry4j.config.RetryConfig;
-import com.evanlennick.retry4j.config.RetryConfigBuilder;
-import com.killrvideo.discovery.ServiceDiscoveryDao;
-import com.killrvideo.dse.graph.KillrVideoTraversalSource;
-import com.killrvideo.dse.utils.BlobToStringCodec;
-import com.killrvideo.model.CommonConstants;
-
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-
-/**
- * Connectivity to DSE (cassandra, graph, search, analytics).
- *
- * @author DataStax Developer Advocates team.
- */
-@Configuration
-public class DseConfiguration {
-
- /** Internal logger. */
- private static final Logger LOGGER = LoggerFactory.getLogger(DseConfiguration.class);
-
- @Value("${killrvideo.discovery.service.cassandra: cassandra}")
- private String cassandraServiceName;
-
- @Value("${killrvideo.cassandra.clustername: 'killrvideo'}")
- public String dseClustOerName;
-
- @Value("${killrvideo.graph.timeout: 30000}")
- public Integer graphTimeout;
-
- @Value("${killrvideo.graph.recommendation.name: 'killrvideo_video_recommendations'}")
- public String graphRecommendationName;
-
- @Value("#{environment.KILLRVIDEO_DSE_USERNAME}")
- public Optional < String > dseUsername;
-
- @Value("#{environment.KILLRVIDEO_DSE_PASSWORD}")
- public Optional < String > dsePassword;
-
- @Value("${killrvideo.cassandra.maxNumberOfTries: 50}")
- private Integer maxNumberOfTries;
-
- @Value("#{environment.KILLRVIDEO_MAX_NUMBER_RETRY}")
- private Optional < Integer > maxNumberOfTriesFromEnvVar;
-
- @Value("${killrvideo.cassandra.delayBetweenTries: 3}")
- private Integer delayBetweenTries;
-
- @Value("#{environment.KILLRVIDEO_DELAY_BETWEEN_RETRY}")
- private Optional < Integer > delayBetweenTriesFromEnvVar;
-
- @Value("${killrvideo.ssl.CACertFileLocation: cassandra.cert}")
- private String sslCACertFileLocation;
-
- @Value("#{environment.KILLRVIDEO_SSL_CERTIFICATE}")
- public Optional < String > sslCACertFileLocationEnvVar;
-
- @Value("${killrvideo.ssl.enable: false}")
- public boolean dseEnableSSL = false;
-
- @Value("#{environment.KILLRVIDEO_ENABLE_SSL}")
- public Optional < Boolean > dseEnableSSLEnvVar;
-
- @Value("${killrvideo.etcd.enabled : true}")
- private boolean etcdLookup = false;
-
- @Autowired
- private ServiceDiscoveryDao discoveryDao;
-
- @Bean
- public DseSession initializeDSE() {
- long top = System.currentTimeMillis();
- LOGGER.info("Initializing connection to DSE Cluster...");
- Builder clusterConfig = new Builder();
- populateContactPoints(clusterConfig);
- populateAuthentication(clusterConfig);
- populateGraphOptions(clusterConfig);
- populateSSL(clusterConfig);
-
- // More options available with flags through QueryOptions
- QueryOptions options = new QueryOptions();
- options.setConsistencyLevel(ConsistencyLevel.QUORUM);
- options.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);
- clusterConfig.withQueryOptions(options);
- clusterConfig.withReconnectionPolicy(new ConstantReconnectionPolicy(2000));
-
- // Required
- clusterConfig.withoutJMXReporting();
- clusterConfig.withoutMetrics();
- clusterConfig.getConfiguration().getSocketOptions().setReadTimeoutMillis(1000);
- clusterConfig.getConfiguration().getCodecRegistry().register(new BlobToStringCodec());
-
- final AtomicInteger atomicCount = new AtomicInteger(1);
- Callable connectionToDse = () -> {
- return clusterConfig.build().connect(CommonConstants.KILLRVIDEO_KEYSPACE);
- };
-
- if (!maxNumberOfTriesFromEnvVar.isEmpty()) {
- maxNumberOfTries = maxNumberOfTriesFromEnvVar.get();
- }
-
- if (!delayBetweenTriesFromEnvVar.isEmpty()) {
- delayBetweenTries = delayBetweenTriesFromEnvVar.get();
- }
-
- // Connecting to DSE with a retry mechanism :
- /* In docker deployments we may have to wait until all components are up and running. */
- RetryConfig config = new RetryConfigBuilder()
- .retryOnAnyException()
- .withMaxNumberOfTries(maxNumberOfTries)
- .withDelayBetweenTries(delayBetweenTries, ChronoUnit.SECONDS)
- .withFixedBackoff()
- .build();
-
- return new CallExecutor(config)
- .afterFailedTry(s -> {
- LOGGER.info("Attempt #{}/{} failed.. trying in {} seconds, waiting for DSE to start...", atomicCount.getAndIncrement(),
- maxNumberOfTries, delayBetweenTries); })
- .onFailure(s -> {
- LOGGER.error("Cannot connection to DSE after {} attempts, exiting", maxNumberOfTries);
- System.err.println("Can not conenction to DSE after " + maxNumberOfTries + " attempts, exiting");
- System.exit(500);
- })
- .onSuccess(s -> {
- long timeElapsed = System.currentTimeMillis() - top;
- LOGGER.info("[OK] Connection etablished to DSE Cluster in {} millis.", timeElapsed);})
- .execute(connectionToDse).getResult();
- }
-
- /**
- * Use to create mapper and perform ORM on top of Cassandra tables.
- *
- * @param session
- * current dse session.
- * @return
- * mapper
- */
- @Bean
- public MappingManager initializeMappingManager(DseSession session) {
- // Do not map all fields, only the annotated ones with @Column or @Fields
- PropertyMapper propertyMapper = new DefaultPropertyMapper()
- .setPropertyTransienceStrategy(PropertyTransienceStrategy.OPT_IN);
- // Build configuration from mapping
- MappingConfiguration configuration = MappingConfiguration.builder()
- .withPropertyMapper(propertyMapper)
- .build();
- // Sample Manager with advance configuration
- return new MappingManager(session, configuration);
- }
-
- /**
- * Graph Traversal for suggested videos.
- *
- * @param session
- * current dse session.
- * @return
- * traversal
- */
- @Bean
- public KillrVideoTraversalSource initializeGraphTraversalSource(DseSession session) {
- return DseGraph.traversal(session, KillrVideoTraversalSource.class);
- }
-
- /**
- * Retrieve cluster nodes adresses (eg:node1:9042) from ETCD and initialize the `contact points`,
- * endpoints of Cassandra cluster nodes.
- *
- * @note
- * [Initializing Contact Points with Java Driver]
- *
- * (1) The default port is 9042. If you keep using the default port you
- * do not need to use or `withPort()` or `addContactPointsWithPorts()`, only `addContactPoint()`.
- *
- * (2) Best practice is to use the SAME PORT for each node and to setup the port through `withPort()`.
- *
- * (3) Never, ever use `addContactPointsWithPorts` in clusters : it will ony set port FOR THE FIRST NODE.
- * DON'T USE, EVEN IF ALL NODE USE SAME PORT. It purpose is only for tests and standalone servers.
- *
- * (4) What if I have a cluster and nodes do not use the same port (eg: node1:9043, node2:9044, node3:9045) ?
- * You need to use {@link AddressTranslator} as defined below and reference with `withAddressTranslator(translator);`
- *
- *
- * public class MyClusterAddressTranslator implements AddressTranslator {
- * @Override
- * public void init(Cluster cluster) {}
- * @Override
- * public void close() {}
- *
- * @Override
- * public InetSocketAddress translate(InetSocketAddress incoming) {
- * // Given the configuration
- * Map clusterNodes = new HashMap() { {
- * put("node1", 9043);put("node2", 9044);put("node3", 9045);
- * }};
- * String targetHostName = incoming.getHostName();
- * if (clusterNodes.containsKey(targetHostName)) {
- * return new InetSocketAddress(targetHostName, clusterNodes.get(targetHostName));
- * }
- * throw new IllegalArgumentException("Cannot translate URL " + incoming + " hostName not found");
- * }
- * }
- *
- *
- * @param clusterConfig
- * current configuration
- */
- private void populateContactPoints(Builder clusterConfig) {
- discoveryDao.lookup(cassandraServiceName).stream()
- .map(this::asSocketInetAdress)
- .filter(node -> node.isPresent())
- .map(node -> node.get())
- // Use one node port to setup - they are all the same
- .peek(node -> clusterConfig.withPort(node.getPort()))
- .map(adress -> adress.getHostName())
- .forEach(clusterConfig::addContactPoint);
- }
-
- /**
- * Check to see if we have username and password from the environment
- * This is here because we have a dual use scenario. One for developers and others
- * who download KillrVideo and run within a local Docker container and the other
- * who might need (like us for example) to connect KillrVideo up to an external
- * cluster that requires authentication.
- */
- private void populateAuthentication(Builder clusterConfig) {
- if (dseUsername.isPresent() && dsePassword.isPresent()
- && dseUsername.get().length() > 0) {
- AuthProvider cassandraAuthProvider = new DsePlainTextAuthProvider(dseUsername.get(), dsePassword.get());
- clusterConfig.withAuthProvider(cassandraAuthProvider);
- String obfuscatedPassword = new String(new char[dsePassword.get().length()]).replace("\0", "*");
- LOGGER.info(" + Using supplied DSE username: '{}' and password: '{}' from environment variables",
- dseUsername.get(), obfuscatedPassword);
- } else {
- LOGGER.info(" + Connection is not authenticated (no username/password)");
- }
- }
-
- private void populateGraphOptions(Builder clusterConfig) {
- GraphOptions go = new GraphOptions();
- go.setGraphName(graphRecommendationName);
- go.setReadTimeoutMillis(graphTimeout);
- go.setGraphSubProtocol(GraphProtocol.GRAPHSON_2_0);
- clusterConfig.withGraphOptions(go);
- }
-
- /**
- * Convert information in ETCD as real adress {@link InetSocketAddress} if possible.
- *
- * @param contactPoint
- * network node adress information like hostname:port
- * @return
- * java formatted inet adress
- */
- private Optional asSocketInetAdress(String contactPoint) {
- Optional target = Optional.empty();
- try {
- if (contactPoint != null && contactPoint.length() > 0) {
- String[] chunks = contactPoint.split(":");
- if (chunks.length == 2) {
- LOGGER.info(" + Adding node '{}' to the Cassandra cluster definition", contactPoint);
- return Optional.of(new InetSocketAddress(InetAddress.getByName(chunks[0]), Integer.parseInt(chunks[1])));
- }
- }
- } catch (NumberFormatException e) {
- LOGGER.warn(" + Cannot read contactPoint - "
- + "Invalid Port Numer, entry '" + contactPoint + "' will be ignored", e);
- } catch (UnknownHostException e) {
- LOGGER.warn(" + Cannot read contactPoint - "
- + "Invalid Hostname, entry '" + contactPoint + "' will be ignored", e);
- }
- return target;
- }
-
- /**
- * If SSL is enabled use the supplied CA cert file to create
- * an SSL context and use to configure our cluster.
- *
- * @param clusterConfig
- * current configuration
- */
- private void populateSSL(Builder clusterConfig) {
-
- // Reading Environment Variables to eventually override default config
- if (!dseEnableSSLEnvVar.isEmpty()) {
- dseEnableSSL = dseEnableSSLEnvVar.get();
- if (!sslCACertFileLocationEnvVar.isEmpty()) {
- sslCACertFileLocation = sslCACertFileLocationEnvVar.get();
- }
- }
-
- if (dseEnableSSL) {
- LOGGER.info(" + SSL is enabled, using supplied SSL certificate: '{}'", sslCACertFileLocation);
- try {
- // X509 Certificate
- FileInputStream fis = new FileInputStream(sslCACertFileLocation);
- X509Certificate caCert = (X509Certificate) CertificateFactory.getInstance("X.509")
- .generateCertificate(new BufferedInputStream(fis));
-
- // KeyStore
- KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
- ks.load(null, null);
- ks.setCertificateEntry(Integer.toString(1), caCert);
-
- // TrustStore
- TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- tmf.init(ks);
- SslContext sslContext = SslContextBuilder.forClient().trustManager(tmf).build();
- clusterConfig.withSSL(new RemoteEndpointAwareNettySSLOptions(sslContext));
-
- } catch (FileNotFoundException fne) {
- String errMsg = "SSL cert file not found. You must provide a valid certification file when using SSL encryption option.";
- LOGGER.error(errMsg, fne);
- throw new IllegalArgumentException(errMsg, fne);
-
- } catch (CertificateException ce) {
- String errCert = "Your CA certificate looks invalid. You must provide a valid certification file when using SSL encryption option.";
- LOGGER.error(errCert, ce);
-
- throw new IllegalArgumentException(errCert, ce);
- } catch (Exception e) {
- String errSsl = "Exception in SSL configuration: ";
- LOGGER.error(errSsl, e);
- throw new IllegalArgumentException(errSsl, e);
- }
- } else {
- LOGGER.info(" + SSL encryption is not enabled)");
- }
- }
-
-}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/dao/AbstractDseDao.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/dao/AbstractDseDao.java
new file mode 100644
index 00000000..66eb9080
--- /dev/null
+++ b/killrvideo-commons/src/main/java/com/killrvideo/dse/dao/AbstractDseDao.java
@@ -0,0 +1,51 @@
+package com.killrvideo.dse.dao;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverConfig;
+
+/**
+ * Mutualization of code for DAO.
+ *
+ * @author DataStax Developer Advocates team.
+ */
+public abstract class AbstractDseDao implements DseSchema {
+
+ @Autowired
+ protected CqlSession cqlSession;
+
+ @Autowired
+ @Qualifier("killrvideo.keyspace")
+ protected CqlIdentifier keyspaceName;
+
+ protected DriverConfig dseDriverConfig;
+
+ /**
+ * Default constructor.
+ */
+ public AbstractDseDao() {}
+
+ /**
+ * Allow explicit intialization for test purpose.
+ */
+ public AbstractDseDao(CqlSession dseSession) {
+ this.cqlSession = dseSession;
+ this.dseDriverConfig = dseSession.getContext().getConfig();
+ if (!dseSession.getKeyspace().isEmpty()) {
+ keyspaceName = dseSession.getKeyspace().get();
+ }
+ }
+
+ /**
+ * Utility for validations.
+ */
+ protected void assertNotNull(String mName, String pName, Object obj) {
+ if (obj == null) {
+ throw new IllegalArgumentException("Assertion failed: param " + pName + " is required for method " + mName);
+ }
+ }
+
+}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/dao/DseDaoSupport.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/dao/DseDaoSupport.java
deleted file mode 100644
index 01541759..00000000
--- a/killrvideo-commons/src/main/java/com/killrvideo/dse/dao/DseDaoSupport.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.killrvideo.dse.dao;
-
-import static com.datastax.driver.mapping.Mapper.Option.timestamp;
-
-import javax.annotation.PostConstruct;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import com.datastax.driver.dse.DseSession;
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import com.killrvideo.model.CommonConstants;
-
-/**
- * Mutualization of code for DAO.
- *
- * @author DataStax Developer Advocates team.
- */
-public abstract class DseDaoSupport implements CommonConstants {
-
- /** Loger for that class. */
- private Logger LOGGER = LoggerFactory.getLogger(getClass());
-
- /** Code for Solr QUERY. */
- public static final String SOLR_QUERY = "solr_query";
-
- /** Hold Connectivity to DSE. */
- @Autowired
- protected DseSession dseSession;
-
- /** Hold Driver Mapper to implement ORM with Cassandra. */
- @Autowired
- protected MappingManager mappingManager;
-
- /**
- * Preparation of statements before firing queries allow signifiant performance improvements.
- * This can only be done it the statement is 'static', mean the number of parameter
- * to bind() is fixed. If not the case you can find sample in method buildStatement*() in this class.
- */
- @PostConstruct
- protected abstract void initialize ();
-
- /**
- * Default constructor.
- */
- public DseDaoSupport() {}
-
- /**
- * Allow explicit intialization for test purpose.
- */
- public DseDaoSupport(DseSession dseSession) {
- this.dseSession = dseSession;
- this.mappingManager = new MappingManager(dseSession);
- initialize();
- }
-
- /**
- * Allows to save with custom mapper if relevant.
- *
- * @param entity
- * current entity
- * @param overridingMapper
- * current mapper
- */
- protected void save(T entity, Mapper mapper) {
- long start = System.currentTimeMillis();
- mapper.save(entity, timestamp(System.currentTimeMillis()));
- LOGGER.debug("Saving entity {} in {} milli(s).", entity, System.currentTimeMillis() - start);
- }
-
- protected void assertNotNull(String mName, String pName, Object obj) {
- if (obj == null) {
- throw new IllegalArgumentException("Assertion failed: param " + pName + " is required for method " + mName);
- }
- }
-
-}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/dao/DseSchema.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/dao/DseSchema.java
new file mode 100644
index 00000000..e25d6632
--- /dev/null
+++ b/killrvideo-commons/src/main/java/com/killrvideo/dse/dao/DseSchema.java
@@ -0,0 +1,122 @@
+package com.killrvideo.dse.dao;
+
+import java.text.SimpleDateFormat;
+import java.time.format.DateTimeFormatter;
+
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+
+/**
+ * Information related to SCHEMA : use to 'decorate' POJO in Mapper, then prepareStatements.
+ *
+ * @author DataStax Developer Advocates team.
+ */
+public interface DseSchema {
+
+ SimpleDateFormat FORMATTER_DAY = new SimpleDateFormat("yyyyMMdd");
+ DateTimeFormatter DATEFORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
+
+ CqlIdentifier SOLR_QUERY = CqlIdentifier.fromCql("solr_query");
+
+ // user_credentials
+ String TABLENAME_USER_CREDENTIALS = "user_credentials";
+ String USERCREDENTIAL_COLUMN_USERID = "userid" ;
+ String USERCREDENTIAL_COLUMN_PASSWORD = "\"password\"";
+ String USERCREDENTIAL_COLUMN_EMAIL = "email";
+ CqlIdentifier TABLENAME_USER_CREDENTIALS_ = CqlIdentifier.fromCql(TABLENAME_USER_CREDENTIALS);
+ CqlIdentifier USERCREDENTIAL_COLUMN_USERID_ = CqlIdentifier.fromCql(USERCREDENTIAL_COLUMN_USERID);
+ CqlIdentifier USERCREDENTIAL_COLUMN_PASSWORD_ = CqlIdentifier.fromCql(USERCREDENTIAL_COLUMN_PASSWORD);
+ CqlIdentifier USERCREDENTIAL_COLUMN_EMAIL_ = CqlIdentifier.fromCql(USERCREDENTIAL_COLUMN_EMAIL);
+
+ // users
+ String TABLENAME_USERS = "users";
+ String USER_COLUMN_USERID = "userid";
+ String USER_COLUMN_FIRSTNAME = "firstname";
+ String USER_COLUMN_LASTNAME = "lastname";
+ String USER_COLUMN_EMAIL = "email";
+ String USER_COLUMN_CREATE = "created_date";
+ CqlIdentifier TABLENAME_USERS_ = CqlIdentifier.fromCql(TABLENAME_USERS);
+ CqlIdentifier USER_COLUMN_USERID_ = CqlIdentifier.fromCql(USER_COLUMN_USERID);
+ CqlIdentifier USER_COLUMN_FIRSTNAME_ = CqlIdentifier.fromCql(USER_COLUMN_FIRSTNAME);
+ CqlIdentifier USER_COLUMN_LASTNAME_ = CqlIdentifier.fromCql(USER_COLUMN_LASTNAME);
+ CqlIdentifier USER_COLUMN_EMAIL_ = CqlIdentifier.fromCql(USER_COLUMN_EMAIL);
+ CqlIdentifier USER_COLUMN_CREATE_ = CqlIdentifier.fromCql(USER_COLUMN_CREATE);
+
+ // videos
+ String TABLENAME_VIDEOS = "videos";
+ String VIDEOS_COLUMN_VIDEOID = "videoid";
+ String VIDEOS_COLUMN_USERID = "userid";
+ String VIDEOS_COLUMN_NAME = "name";
+ String VIDEOS_COLUMN_DESCRIPTION = "description";
+ String VIDEOS_COLUMN_LOCATION = "location";
+ String VIDEOS_COLUMN_LOCATIONTYPE = "location_type";
+ String VIDEOS_COLUMN_PREVIEW = "preview_image_location";
+ String VIDEOS_COLUMN_TAGS = "tags";
+ String VIDEOS_COLUMN_ADDED_DATE = "added_date";
+ CqlIdentifier TABLENAME_VIDEOS_ = CqlIdentifier.fromCql(TABLENAME_VIDEOS);
+ CqlIdentifier VIDEOS_COLUMN_VIDEOID_ = CqlIdentifier.fromCql(VIDEOS_COLUMN_VIDEOID);
+ CqlIdentifier VIDEOS_COLUMN_USERID_ = CqlIdentifier.fromCql(VIDEOS_COLUMN_USERID);
+ CqlIdentifier VIDEOS_COLUMN_NAME_ = CqlIdentifier.fromCql(VIDEOS_COLUMN_NAME);
+ CqlIdentifier VIDEOS_COLUMN_DESCRIPTION_ = CqlIdentifier.fromCql(VIDEOS_COLUMN_DESCRIPTION);
+ CqlIdentifier VIDEOS_COLUMN_LOCATION_ = CqlIdentifier.fromCql(VIDEOS_COLUMN_LOCATION);
+ CqlIdentifier VIDEOS_COLUMN_LOCATIONTYPE_ = CqlIdentifier.fromCql(VIDEOS_COLUMN_LOCATIONTYPE);
+ CqlIdentifier VIDEOS_COLUMN_PREVIEW_ = CqlIdentifier.fromCql(VIDEOS_COLUMN_PREVIEW);
+ CqlIdentifier VIDEOS_COLUMN_TAGS_ = CqlIdentifier.fromCql(VIDEOS_COLUMN_TAGS);
+ CqlIdentifier VIDEOS_COLUMN_ADDED_DATE_ = CqlIdentifier.fromCql(VIDEOS_COLUMN_ADDED_DATE);
+
+ // user_videos
+ String TABLENAME_USERS_VIDEO = "user_videos";
+ String USERVIDEOS_COLUMN_USERID = "userid";
+ CqlIdentifier TABLENAME_USERS_VIDEO_ = CqlIdentifier.fromCql(TABLENAME_USERS_VIDEO);
+ CqlIdentifier USERVIDEOS_COLUMN_USERID_ = CqlIdentifier.fromCql(USERVIDEOS_COLUMN_USERID);
+
+ // latest_videos
+ String TABLENAME_LATEST_VIDEO = "latest_videos";
+ String LATESTVIDEOS_COLUMN_YYYYMMDD = "yyyymmdd";
+ String LATESTVIDEOS_COLUMN_VIDEOID = "videoid";
+ String LATESTVIDEOS_COLUMN_USERID = "userid";
+ CqlIdentifier TABLENAME_LATEST_VIDEO_ = CqlIdentifier.fromCql(TABLENAME_LATEST_VIDEO);
+ CqlIdentifier LATESTVIDEOS_COLUMN_YYYYMMDD_ = CqlIdentifier.fromCql(LATESTVIDEOS_COLUMN_YYYYMMDD);
+ CqlIdentifier LATESTVIDEOS_COLUMN_VIDEOID_ = CqlIdentifier.fromCql(LATESTVIDEOS_COLUMN_VIDEOID);
+ CqlIdentifier LATESTVIDEOS_COLUMN_USERID_ = CqlIdentifier.fromCql(LATESTVIDEOS_COLUMN_USERID);
+
+
+ // comments_by_video + comments_by_user
+ String TABLENAME_COMMENTS_BY_USER = "comments_by_user";
+ String TABLENAME_COMMENTS_BY_VIDEO = "comments_by_video";
+ String COMMENTS_COLUMN_VIDEOID = "videoid";
+ String COMMENTS_COLUMN_USERID = "userid";
+ String COMMENTS_COLUMN_COMMENTID = "commentid";
+ String COMMENTS_COLUMN_COMMENT = "comment";
+ CqlIdentifier TABLENAME_COMMENTS_BY_USER_ = CqlIdentifier.fromCql(TABLENAME_COMMENTS_BY_USER);
+ CqlIdentifier TABLENAME_COMMENTS_BY_VIDEO_ = CqlIdentifier.fromCql(TABLENAME_COMMENTS_BY_VIDEO);
+ CqlIdentifier COMMENTS_COLUMN_VIDEOID_ = CqlIdentifier.fromCql(COMMENTS_COLUMN_VIDEOID);
+ CqlIdentifier COMMENTS_COLUMN_USERID_ = CqlIdentifier.fromCql(COMMENTS_COLUMN_USERID);
+ CqlIdentifier COMMENTS_COLUMN_COMMENTID_ = CqlIdentifier.fromCql(COMMENTS_COLUMN_COMMENTID);
+ CqlIdentifier COMMENTS_COLUMN_COMMENT_ = CqlIdentifier.fromCql(COMMENTS_COLUMN_COMMENT);
+
+ // video_ratings + video_ratings_by_user
+ String TABLENAME_VIDEO_RATINGS = "video_ratings";
+ String TABLENAME_VIDEO_RATINGS_BYUSER = "video_ratings_by_user";
+ String RATING_COLUMN_RATING = "rating";
+ String RATING_COLUMN_RATING_COUNTER = "rating_counter";
+ String RATING_COLUMN_RATING_TOTAL = "rating_total";
+ String RATING_COLUMN_VIDEOID = "videoid";
+ String RATING_COLUMN_USERID = "userid";
+ CqlIdentifier TABLENAME_VIDEO_RATINGS_ = CqlIdentifier.fromCql(TABLENAME_VIDEO_RATINGS);
+ CqlIdentifier TABLENAME_VIDEO_RATINGS_BYUSER_ = CqlIdentifier.fromCql(TABLENAME_VIDEO_RATINGS_BYUSER);
+ CqlIdentifier RATING_COLUMN_RATING_COUNTER_ = CqlIdentifier.fromCql(RATING_COLUMN_RATING_COUNTER);
+ CqlIdentifier RATING_COLUMN_RATING_TOTAL_ = CqlIdentifier.fromCql(RATING_COLUMN_RATING_TOTAL);
+ CqlIdentifier RATING_COLUMN_VIDEOID_ = CqlIdentifier.fromCql(RATING_COLUMN_VIDEOID);
+ CqlIdentifier RATING_COLUMN_USERID_ = CqlIdentifier.fromCql(RATING_COLUMN_USERID);
+ CqlIdentifier RATING_COLUMN_RATING_ = CqlIdentifier.fromCql(RATING_COLUMN_RATING);
+
+ // video_playback_stats
+ String TABLENAME_PLAYBACK_STATS = "video_playback_stats";
+ String COLUMN_PLAYBACK_VIDEOID = "videoid";
+ String COLUMN_PLAYBACK_VIEWS = "views";
+ CqlIdentifier TABLENAME_PLAYBACK_STATS_ = CqlIdentifier.fromCql(TABLENAME_PLAYBACK_STATS);
+ CqlIdentifier COLUMN_PLAYBACK_VIDEOID_ = CqlIdentifier.fromCql(COLUMN_PLAYBACK_VIDEOID);
+ CqlIdentifier COLUMN_PLAYBACK_VIEWS_ = CqlIdentifier.fromCql(COLUMN_PLAYBACK_VIEWS);
+
+
+}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/AbstractEntity.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/AbstractEntity.java
deleted file mode 100644
index dfd00717..00000000
--- a/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/AbstractEntity.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package com.killrvideo.dse.dto;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.killrvideo.model.CommonConstants;
-
-
-
-/**
- * Entities to be used in the application.
- *
- * @author DataStax Developer Advocates team.
- */
-public abstract class AbstractEntity implements Serializable, CommonConstants {
-
- /** Serial. */
- private static final long serialVersionUID = 7239223683486549695L;
-
- /** Used as converter. */
- public static final SimpleDateFormat FORMATTER_DAY = new SimpleDateFormat("yyyyMMdd");
-
- /** Helping Loging. */
- private static ObjectMapper om = new ObjectMapper();
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- try {
- return getClass().getSimpleName() + " : " + om.writeValueAsString(this);
- } catch (IOException e) {
- return super.toString();
- }
- }
-
-}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/AbstractVideo.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/AbstractVideo.java
index 2657cbfc..f6053a0c 100644
--- a/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/AbstractVideo.java
+++ b/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/AbstractVideo.java
@@ -1,29 +1,27 @@
package com.killrvideo.dse.dto;
+import java.io.Serializable;
+
import org.hibernate.validator.constraints.Length;
-import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.oss.driver.api.mapper.annotations.CqlName;
+import com.killrvideo.dse.dao.DseSchema;
/**
* Bean representing shared attributes in videos.
*
* @author DataStax Developer Advocates team
*/
-public abstract class AbstractVideo extends AbstractEntity {
+public abstract class AbstractVideo implements Serializable, DseSchema {
/** Serial. */
private static final long serialVersionUID = -4366554197274003003L;
- /** Column names. */
- public static final String COLUMN_NAME = "name";
- public static final String COLUMN_TAGS = "tags";
- public static final String COLUMN_PREVIEW = "preview_image_location";
-
- @Column
+ @CqlName(VIDEOS_COLUMN_NAME)
@Length(min = 1, message = "The video name must not be empty")
protected String name;
- @Column(name = COLUMN_PREVIEW)
+ @CqlName(VIDEOS_COLUMN_PREVIEW)
protected String previewImageLocation;
/**
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/ResultListPage.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/ResultListPage.java
index b590e1df..ff0bd5ab 100644
--- a/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/ResultListPage.java
+++ b/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/ResultListPage.java
@@ -1,15 +1,15 @@
package com.killrvideo.dse.dto;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;
-import com.datastax.driver.core.PagingState;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.Result;
+import com.datastax.oss.driver.api.core.MappedAsyncPagingIterable;
+import com.datastax.oss.driver.api.core.PagingIterable;
+import com.datastax.oss.protocol.internal.util.Bytes;
/**
* Ease usage of the paginState.
@@ -37,29 +37,30 @@ public ResultListPage() {}
* @param mapper
* mapper
*/
- public ResultListPage(Result rs) {
+ public ResultListPage(PagingIterable rs) {
if (null != rs) {
Iterator iterResults = rs.iterator();
- // rs.getAvailableWithoutFetching() all to parse only current page without fecthing all
IntStream.range(0, rs.getAvailableWithoutFetching())
.forEach(item -> listOfResults.add(iterResults.next()));
- nextPage = Optional.ofNullable(rs.getExecutionInfo().getPagingState())
- .map(PagingState::toString);
+ if (null != rs.getExecutionInfo().getPagingState()) {
+ ByteBuffer pagingState = rs.getExecutionInfo().getPagingState();
+ if (pagingState != null && pagingState.hasArray()) {
+ nextPage = Optional.ofNullable(Bytes.toHexString(pagingState));
+ }
+ }
+ }
+ }
+
+ public ResultListPage(MappedAsyncPagingIterable rs) {
+ if (null != rs) {
+ rs.currentPage().forEach(listOfResults::add);
+ ByteBuffer pagingState = rs.getExecutionInfo().getPagingState();
+ if (pagingState != null && pagingState.hasArray()) {
+ nextPage = Optional.ofNullable(Bytes.toHexString(pagingState));
+ }
}
}
- /**
- * Constructor with mapper.
- *
- * @param rs
- * result set
- * @param mapper
- * mapper
- */
- public ResultListPage(ResultSet rs, Mapper mapper) {
- this(mapper.map(rs));
- }
-
/** {@inheritDoc} */
@Override
public String toString() {
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/Video.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/Video.java
index b035f7bd..2c066d0c 100644
--- a/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/Video.java
+++ b/killrvideo-commons/src/main/java/com/killrvideo/dse/dto/Video.java
@@ -1,6 +1,7 @@
package com.killrvideo.dse.dto;
-import java.util.Date;
+import java.time.Instant;
+import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
@@ -8,58 +9,48 @@
import org.hibernate.validator.constraints.Length;
-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.PartitionKey;
-import com.datastax.driver.mapping.annotations.Table;
-import com.killrvideo.dse.utils.EmptyCollectionIfNull;
-import com.killrvideo.model.CommonConstants;
+import com.datastax.oss.driver.api.mapper.annotations.CqlName;
+import com.datastax.oss.driver.api.mapper.annotations.Entity;
+import com.datastax.oss.driver.api.mapper.annotations.PartitionKey;
+import com.killrvideo.dse.dao.DseSchema;
/**
* Pojo representing DTO for table 'videos'.
*
* @author DataStax Developer Advocates team.
*/
-@Table(keyspace = CommonConstants.KILLRVIDEO_KEYSPACE, name = Video.TABLENAME_VIDEOS)
+@Entity
+@CqlName(DseSchema.TABLENAME_VIDEOS)
public class Video extends AbstractVideo {
/** Serial. */
private static final long serialVersionUID = 7035802926837646137L;
- public static final String TABLENAME_VIDEOS = "videos";
-
- /** Column names in the DB. */
- public static final String COLUMN_USERID = "userid";
- public static final String COLUMN_VIDEOID = "videoid";
- public static final String COLUMN_DESCRIPTION = "description";
- public static final String COLUMN_LOCATION = "location";
- public static final String COLUMN_LOCATIONTYPE = "location_type";
- public static final String COLUMN_ADDED_DATE = "added_date";
-
@PartitionKey
+ @CqlName(VIDEOS_COLUMN_VIDEOID)
private UUID videoid;
@NotNull
- @Column
+ @CqlName(VIDEOS_COLUMN_USERID)
private UUID userid;
@Length(min = 1, message = "description must not be empty")
- @Column
+ @CqlName(VIDEOS_COLUMN_DESCRIPTION)
private String description;
@Length(min = 1, message = "location must not be empty")
- @Column
+ @CqlName(VIDEOS_COLUMN_LOCATION)
private String location;
- @Column(name = COLUMN_LOCATIONTYPE)
+ @CqlName(VIDEOS_COLUMN_LOCATIONTYPE)
private int locationType;
- @Column
- @EmptyCollectionIfNull
- private Set tags;
+ @CqlName(VIDEOS_COLUMN_TAGS)
+ private Set tags = new HashSet<>();
@NotNull
- @Column(name = COLUMN_ADDED_DATE)
- private Date addedDate;
+ @CqlName(VIDEOS_COLUMN_ADDED_DATE)
+ private Instant addedDate;
/**
* Default Constructor allowing reflection.
@@ -76,14 +67,16 @@ public Video(String title) {
/**
* Constructor wihout location nor preview.
*/
- public Video(UUID videoid, UUID userid, String name, String description, int locationType, Set tags, Date addedDate) {
+ public Video(UUID videoid, UUID userid, String name, String description, int locationType,
+ Set tags, Instant addedDate) {
this(videoid, userid, name, description, null, locationType, null, tags, addedDate);
}
/**
* All attributes constructor.
*/
- public Video(UUID videoid, UUID userid, String name, String description, String location, int locationType, String previewImageLocation, Set tags, Date addedDate) {
+ public Video(UUID videoid, UUID userid, String name, String description, String location,
+ int locationType, String previewImageLocation, Set tags, Instant addedDate) {
super(name, previewImageLocation);
this.videoid = videoid;
this.userid = userid;
@@ -214,7 +207,7 @@ public void setTags(Set tags) {
* @return
* current value of 'addedDate'
*/
- public Date getAddedDate() {
+ public Instant getAddedDate() {
return addedDate;
}
@@ -223,9 +216,31 @@ public Date getAddedDate() {
* @param addedDate
* new value for 'addedDate '
*/
- public void setAddedDate(Date addedDate) {
+ public void setAddedDate(Instant addedDate) {
this.addedDate = addedDate;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Video [videoid=");
+ builder.append(videoid);
+ builder.append(", userid=");
+ builder.append(userid);
+ builder.append(", description=");
+ builder.append(description);
+ builder.append(", location=");
+ builder.append(location);
+ builder.append(", locationType=");
+ builder.append(locationType);
+ builder.append(", tags=");
+ builder.append(tags);
+ builder.append(", addedDate=");
+ builder.append(addedDate);
+ builder.append("]");
+ return builder.toString();
+ }
}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/graph/KillrVideoTraversalSourceDsl.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/graph/KillrVideoTraversalSourceDsl.java
index d0d9a745..b9e7f50b 100644
--- a/killrvideo-commons/src/main/java/com/killrvideo/dse/graph/KillrVideoTraversalSourceDsl.java
+++ b/killrvideo-commons/src/main/java/com/killrvideo/dse/graph/KillrVideoTraversalSourceDsl.java
@@ -1,5 +1,6 @@
package com.killrvideo.dse.graph;
+import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
@@ -29,6 +30,10 @@ public KillrVideoTraversalSourceDsl(final Graph graph, final TraversalStrategies
public KillrVideoTraversalSourceDsl(final Graph graph) {
super(graph);
}
+
+ public KillrVideoTraversalSourceDsl(final RemoteConnection connection) {
+ super(connection);
+ }
/**
* Applied filtering on VIDEO/VIDEOID vertices.
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/utils/BlobToStringCodec.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/utils/BlobToStringCodec.java
deleted file mode 100644
index 1212b926..00000000
--- a/killrvideo-commons/src/main/java/com/killrvideo/dse/utils/BlobToStringCodec.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.killrvideo.dse.utils;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
-import com.datastax.driver.core.TypeCodec;
-import com.datastax.driver.extras.codecs.MappingCodec;
-
-/**
- * Column expect a blob, attribute is a String, we need a codec here for conversion.
- *
- * In CQL you would be able to use textAsBlob().
- *
- * @author DataStax Developer Advocates team.
- */
-public class BlobToStringCodec extends MappingCodec {
-
- /** Working woth Charset UTF */
- private Charset plarformCharset;
-
- /** Default charset will be UTF-8. */
- public BlobToStringCodec() {
- this(StandardCharsets.UTF_8);
- }
-
- /**
- * Default construcot.
- */
- public BlobToStringCodec(Charset charset) {
- super(TypeCodec.blob(), String.class);
- this.plarformCharset = charset;
- }
-
- /** {@inheritDoc} */
- @Override
- protected ByteBuffer serialize(String str) {
- return ByteBuffer.wrap(str.getBytes(plarformCharset));
- }
-
- /** {@inheritDoc} */
- @Override
- protected String deserialize(ByteBuffer byteBuffer) {
- return plarformCharset.decode(byteBuffer).toString();
- }
-}
\ No newline at end of file
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/utils/DseUtils.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/utils/DseUtils.java
index 1d464741..15ec6ddb 100644
--- a/killrvideo-commons/src/main/java/com/killrvideo/dse/utils/DseUtils.java
+++ b/killrvideo-commons/src/main/java/com/killrvideo/dse/utils/DseUtils.java
@@ -3,22 +3,22 @@
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Scanner;
-import java.util.concurrent.CompletableFuture;
+import java.util.UUID;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.datastax.driver.core.schemabuilder.SchemaBuilder;
-import com.datastax.driver.dse.DseSession;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException;
+import com.datastax.oss.driver.api.mapper.entity.EntityHelper;
+import com.datastax.oss.driver.api.mapper.entity.saving.NullSavingStrategy;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
/**
* Utility class for DSE.
@@ -29,14 +29,13 @@ public class DseUtils {
/** Internal logger. */
private static final Logger LOGGER = LoggerFactory.getLogger(DseUtils.class);
-
- /** Replication Strategies. */
- public static enum ReplicationStrategy { SimpleStrategy, NetworkTopologyStrategy };
-
- private static final String KEY_CLASS = "class";
- private static final String KEY_REPLICATIONFACTOR = "replication_factor";
private static final String UTF8_ENCODING = "UTF-8";
private static final String NEW_LINE = System.getProperty("line.separator");
+ private static final long INT_SINCE_UUID_EPOCH = 0x01b21dd213814000L;
+
+ public static long getTimeFromUUID(UUID uuid) {
+ return (uuid.timestamp() - INT_SINCE_UUID_EPOCH) / 10000;
+ }
/**
* Helper to create a KeySpace.
@@ -44,36 +43,28 @@ public static enum ReplicationStrategy { SimpleStrategy, NetworkTopologyStrategy
* @param keyspacename
* target keyspaceName
*/
- public static void createKeySpaceSimpleStrategy(DseSession dseSession, String keyspacename, int replicationFactor) {
- final Map replication = new HashMap<>();
- replication.put(KEY_CLASS, ReplicationStrategy.SimpleStrategy.name());
- replication.put(KEY_REPLICATIONFACTOR, replicationFactor);
- dseSession.execute(SchemaBuilder.createKeyspace(keyspacename).ifNotExists().with().replication(replication));
- useKeySpace(dseSession, keyspacename);
+ public static void createKeySpaceSimpleStrategy(CqlSession cqlSession, String keyspacename, int replicationFactor) {
+ cqlSession.execute(SchemaBuilder.createKeyspace(keyspacename)
+ .ifNotExists()
+ .withSimpleStrategy(replicationFactor)
+ .build());
+ useKeySpace(cqlSession, keyspacename);
}
- /**
- * Setup connection to use keyspace.
- *
- * @param dseSession
- * current session
- * @param keyspacename
- * target keyspace
- */
- public static void useKeySpace(DseSession dseSession, String keyspacename) {
- dseSession.execute("USE " + keyspacename);
+ public static boolean isTableEmpty(CqlSession cqlSession, CqlIdentifier keyspace, CqlIdentifier tablename) {
+ return 0 == cqlSession.execute(QueryBuilder.selectFrom(keyspace, tablename).all().build()).getAvailableWithoutFetching();
}
- /**
- * Empty table.
- *
- * @param dseSession
- * current session
- * @param tableName
- * table name
- */
- public static void truncate(DseSession dseSession, String tableName) {
- dseSession.execute(QueryBuilder.truncate(tableName));
+ public static void useKeySpace(CqlSession cqlSession, String keyspacename) {
+ cqlSession.execute("USE " + keyspacename);
+ }
+
+ public static void dropKeyspace(CqlSession cqlSession, String keyspacename) {
+ cqlSession.executeAsync(SchemaBuilder.dropKeyspace(keyspacename).ifExists().build());
+ }
+
+ public static void truncateTable(CqlSession cqlSession, CqlIdentifier keyspace, CqlIdentifier tableName) {
+ cqlSession.execute(QueryBuilder.truncate(keyspace, tableName).build());
}
/**
@@ -86,7 +77,7 @@ public static void truncate(DseSession dseSession, String tableName) {
* @throws FileNotFoundException
* cql file has not been found.
*/
- public static void executeCQLFile(DseSession dseSession, String fileName)
+ public static void executeCQLFile(CqlSession cqlSession, String fileName)
throws FileNotFoundException {
long top = System.currentTimeMillis();
LOGGER.info("Processing file: " + fileName);
@@ -94,7 +85,7 @@ public static void executeCQLFile(DseSession dseSession, String fileName)
String query = statement.replaceAll(NEW_LINE, "").trim();
try {
if (query.length() > 0) {
- dseSession.execute(query);
+ cqlSession.execute(query);
LOGGER.info(" + Executed. " + query);
}
} catch (InvalidQueryException e) {
@@ -142,23 +133,9 @@ private static String loadFileAsString(String fileName)
return strBuilder.toString();
}
- /**
- * From Future to completableFuture, also useful for
- *
- * @param listenableFuture
- * @return
- */
- public static CompletableFuture buildCompletableFuture(final ListenableFuture listenableFuture) {
- CompletableFuture completable = new CompletableFuture();
- Futures.addCallback(listenableFuture, new FutureCallback() {
- public void onSuccess(T result) { completable.complete(result); }
- public void onFailure(Throwable t) { completable.completeExceptionally(t);}
- });
- return completable;
- }
-
- @SuppressWarnings("rawtypes")
- public static String displayGraphTranserval(T graphTraversal) {
- return org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyTranslator.of("g").translate(graphTraversal.getBytecode());
+ public static BoundStatement bind(PreparedStatement preparedStatement, T entity, EntityHelper entityHelper) {
+ BoundStatementBuilder boundStatement = preparedStatement.boundStatementBuilder();
+ entityHelper.set(entity, boundStatement, NullSavingStrategy.DO_NOT_SET);
+ return boundStatement.build();
}
}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/dse/utils/EmptyCollectionIfNull.java b/killrvideo-commons/src/main/java/com/killrvideo/dse/utils/EmptyCollectionIfNull.java
deleted file mode 100644
index 94281ab6..00000000
--- a/killrvideo-commons/src/main/java/com/killrvideo/dse/utils/EmptyCollectionIfNull.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (C) 2012-2016 DuyHai DOAN
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.killrvideo.dse.utils;
-
-import java.lang.annotation.*;
-
-/**
-
- * In Cassandra there is no difference between an empty collection/map
- * and a null value for collection/map
- *
- *
- * In Java we do make the difference. This annotations allows mapping null values from
- * Cassandra into empty collection & map.
- *
- *
- * Empty list will default to ArrayList.
- * Empty set will default to HashSet.
- * Empty map will default to HashMap.
-
- *
- * {@literal @}Column
- * {@literal @}EmptyCollectionIfNull
- * private List<String> friends
- *
- *
-
- * This annotation can be used for nested collections too:
- *
- *
- * {@literal @}Column
- * private Tuple2<Integer,{@literal @}EmptyCollectionIfNull List<String>> friends
- *
-
- *
- *
- * @see @EmptyCollectionIfNull
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.FIELD, ElementType.TYPE_USE})
-@Documented
-public @interface EmptyCollectionIfNull {
-
-}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/grpc/AbstractSingleServiceGrpcServer.java b/killrvideo-commons/src/main/java/com/killrvideo/grpc/AbstractSingleServiceGrpcServer.java
index ce2f25fd..ddf1418f 100644
--- a/killrvideo-commons/src/main/java/com/killrvideo/grpc/AbstractSingleServiceGrpcServer.java
+++ b/killrvideo-commons/src/main/java/com/killrvideo/grpc/AbstractSingleServiceGrpcServer.java
@@ -1,7 +1,5 @@
package com.killrvideo.grpc;
-import java.util.Optional;
-
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -9,8 +7,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import com.killrvideo.conf.KillrVideoConfiguration;
-import com.killrvideo.discovery.ServiceDiscoveryDaoEtcd;
+import com.killrvideo.conf.GrpcConfiguration;
import io.grpc.Server;
import io.grpc.ServerBuilder;
@@ -26,20 +23,12 @@ public abstract class AbstractSingleServiceGrpcServer {
/** Some logger. */
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSingleServiceGrpcServer.class);
- /** Global Configuration. s*/
- @Autowired
- protected KillrVideoConfiguration killrVideoConfig;
-
- /** Connectivity to ETCD Service discovery. */
@Autowired
- protected ServiceDiscoveryDaoEtcd serviceDiscoveryDao;
+ protected GrpcConfiguration grpcConfig;
/** GRPC Server to start. */
protected Server grpcServer;
- /** Port to be allocated dynamically based on ETCD. */
- protected static int grpcServerPort = 0;
-
/** Service Name. */
protected abstract String getServiceName();
@@ -55,13 +44,7 @@ public abstract class AbstractSingleServiceGrpcServer {
@PostConstruct
public void startGrpcServer() throws Exception {
LOGGER.info("Initializing Comment Service");
- grpcServerPort = getDefaultPort();
- Optional maxUsedPort = serviceDiscoveryDao.lookupServicePorts(getServiceName(),
- killrVideoConfig.getApplicationHost());
- if (maxUsedPort.isPresent()) {
- grpcServerPort = maxUsedPort.get() + 1;
- }
- grpcServer = ServerBuilder.forPort(grpcServerPort)
+ grpcServer = ServerBuilder.forPort(grpcConfig.getGrpcPort())
.addService(getService())
.build();
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -70,17 +53,14 @@ public void run() {
}
});
grpcServer.start();
- LOGGER.info("[OK] Grpc Server started on port: '{}'", grpcServerPort);
- serviceDiscoveryDao.register(getServiceName(),
- killrVideoConfig.getApplicationHost(), grpcServerPort);
+ LOGGER.info("[OK] Grpc Server started on port: '{}'", grpcConfig.getGrpcPort());
}
@PreDestroy
public void stopGrpcServer() {
- LOGGER.info("Calling shutdown for GrpcServer");
- serviceDiscoveryDao.register(getServiceName(),
- killrVideoConfig.getApplicationHost(), grpcServerPort);
+ LOGGER.info("Stopping GrpcServer...");
grpcServer.shutdown();
+ LOGGER.info("[OK] Grpc Server stopped");
}
}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/utils/GrpcMappingUtils.java b/killrvideo-commons/src/main/java/com/killrvideo/grpc/GrpcMappingUtils.java
similarity index 82%
rename from killrvideo-commons/src/main/java/com/killrvideo/utils/GrpcMappingUtils.java
rename to killrvideo-commons/src/main/java/com/killrvideo/grpc/GrpcMappingUtils.java
index a626c01d..9a4bafca 100644
--- a/killrvideo-commons/src/main/java/com/killrvideo/utils/GrpcMappingUtils.java
+++ b/killrvideo-commons/src/main/java/com/killrvideo/grpc/GrpcMappingUtils.java
@@ -1,4 +1,4 @@
-package com.killrvideo.utils;
+package com.killrvideo.grpc;
import java.time.Instant;
import java.util.Date;
@@ -22,30 +22,35 @@ private GrpcMappingUtils() {}
/**
* Conversions.
*/
-
public static TimeUuid uuidToTimeUuid(UUID uuid) {
+ if (null == uuid) return null;
return TimeUuid.newBuilder().setValue(uuid.toString()).build();
}
public static Uuid uuidToUuid(UUID uuid) {
+ if (null == uuid) return null;
return Uuid.newBuilder().setValue(uuid.toString()).build();
}
public static Instant timestampToInstant(Timestamp protoTimeStamp) {
+ if (null == protoTimeStamp) return null;
return Instant.ofEpochSecond(
protoTimeStamp.getSeconds(),
protoTimeStamp.getNanos() ) ;
}
public static Date timestampToDate(Timestamp protoTimestamp) {
+ if (null == protoTimestamp) return null;
return Date.from(timestampToInstant(protoTimestamp));
}
public static Timestamp dateToTimestamp(Date date) {
+ if (null == date) return null;
return instantToTimeStamp(date.toInstant());
}
public static Timestamp instantToTimeStamp(Instant instant) {
+ if (null == instant) return null;
return Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).setNanos(instant.getNano()).build();
}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/messaging/dao/ErrorProcessor.java b/killrvideo-commons/src/main/java/com/killrvideo/messaging/dao/ErrorProcessor.java
index 1edba0ca..255d8275 100644
--- a/killrvideo-commons/src/main/java/com/killrvideo/messaging/dao/ErrorProcessor.java
+++ b/killrvideo-commons/src/main/java/com/killrvideo/messaging/dao/ErrorProcessor.java
@@ -1,6 +1,7 @@
package com.killrvideo.messaging.dao;
-import java.io.FileNotFoundException;
+import java.io.File;
+import java.io.IOException;
import java.io.PrintWriter;
import javax.annotation.PostConstruct;
@@ -26,13 +27,17 @@ public class ErrorProcessor {
/** LOGGER for the class. */
private static Logger LOGGER = LoggerFactory.getLogger(ErrorProcessor.class);
- @Value("${killrvideo.cassandra.mutation-error-log: /tmp/killrvideo-mutation-errors.log}")
+ @Value("${killrvideo.dse.mutation-error-log:/tmp/killrvideo-mutation-errors.log}")
private String mutationErrorLog;
private PrintWriter errorLogFile;
@PostConstruct
- public void openErrorLogFile() throws FileNotFoundException {
+ public void openErrorLogFile() throws IOException {
+ File logFile = new File(getMutationErrorLog());
+ if (!logFile.exists()) {
+ logFile.createNewFile();
+ }
this.errorLogFile = new PrintWriter(getMutationErrorLog());
}
diff --git a/killrvideo-commons/src/main/java/com/killrvideo/messaging/dao/MessagingDao.java b/killrvideo-commons/src/main/java/com/killrvideo/messaging/dao/MessagingDao.java
index c1dedfef..9616a533 100644
--- a/killrvideo-commons/src/main/java/com/killrvideo/messaging/dao/MessagingDao.java
+++ b/killrvideo-commons/src/main/java/com/killrvideo/messaging/dao/MessagingDao.java
@@ -1,11 +1,10 @@
package com.killrvideo.messaging.dao;
-import static com.killrvideo.messaging.utils.MessagingUtils.mapCustomError;
-import static com.killrvideo.messaging.utils.MessagingUtils.mapError;
-
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
+import com.killrvideo.messaging.utils.MessagingUtils;
+
/**
* Interface to work with Events.
*
@@ -36,14 +35,14 @@ public interface MessagingDao {
* @param t
*/
default CompletableFuture