diff --git a/.travis.yml b/.travis.yml index c8071655fc..3c5fb2c70c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -49,10 +49,7 @@ env: METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=ELASTICSEARCH MODELDATA_REP=S3 - PIO_ELASTICSEARCH_VERSION=5.6.9 - - BUILD_TYPE=Integration - METADATA_REP=ELASTICSEARCH EVENTDATA_REP=ELASTICSEARCH MODELDATA_REP=S3 - PIO_ELASTICSEARCH_VERSION=6.4.2 + PIO_ELASTICSEARCH_VERSION=6.8.1 - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS PIO_HBASE_VERSION=1.2.6 @@ -101,7 +98,7 @@ env: - BUILD_TYPE=Integration METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS PIO_SCALA_VERSION=2.11.12 - PIO_SPARK_VERSION=2.4.0 + PIO_SPARK_VERSION=2.4.3 PIO_HADOOP_VERSION=2.7.7 - BUILD_TYPE=LicenseCheck diff --git a/LICENSE.txt b/LICENSE.txt index 6e02f9bccb..3a0e66be21 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1530,7 +1530,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. Binary distribution bundles com.thoughtworks.paranamer # paranamer # 2.3 (https://github.com/paul-hammant/paranamer) - com.thoughtworks.paranamer # paranamer # 2.6 (https://github.com/paul-hammant/paranamer) + com.thoughtworks.paranamer # paranamer # 2.8 (https://github.com/paul-hammant/paranamer) which is available under the BSD license (http://www.opensource.org/licenses/bsd-license.php) @@ -1701,15 +1701,12 @@ EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. Binary distribution bundles org.scala-lang # scala-library # 2.11.12 (http://scala-lang.org/) - org.scala-lang # scala-compiler # 2.11.12 (http://scala-lang.org/) org.scala-lang # scala-reflect # 2.11.12 (http://scala-lang.org/) - org.scala-lang # scalap # 2.11.12 (http://scala-lang.org/) org.scala-lang.modules # scala-java8-compat_2.11 # 0.7.0 (http://scala-lang.org/) org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.6 (http://scala-lang.org/) org.scala-lang.modules # scala-parser-combinators_2.11 # 1.1.0 (http://scala-lang.org/) - org.scala-lang.modules # scala-xml_2.11 # 1.0.5 (http://scala-lang.org/) org.scala-lang.modules # scala-xml_2.11 # 1.0.6 (http://scala-lang.org/) - + which is available under the BSD license (http://www.scala-lang.org/downloads/license.html) Copyright (c) 2002-2017 EPFL diff --git a/build.sbt b/build.sbt index 082521b929..ca76d92470 100644 --- a/build.sbt +++ b/build.sbt @@ -26,7 +26,7 @@ scalaVersion in ThisBuild := sys.props.getOrElse("scala.version", "2.11.12") scalaBinaryVersion in ThisBuild := binaryVersion(scalaVersion.value) -crossScalaVersions in ThisBuild := Seq("2.11.12") +crossScalaVersions in ThisBuild := Seq(scalaVersion.value) scalacOptions in ThisBuild ++= Seq("-deprecation", "-unchecked", "-feature") @@ -36,8 +36,7 @@ fork in (ThisBuild, run) := true javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint:deprecation", "-Xlint:unchecked") -// Ignore differentiation of Spark patch levels -sparkVersion in ThisBuild := sys.props.getOrElse("spark.version", "2.1.3") +sparkVersion in ThisBuild := sys.props.getOrElse("spark.version", "2.4.3") sparkBinaryVersion in ThisBuild := binaryVersion(sparkVersion.value) @@ -45,7 +44,7 @@ hadoopVersion in ThisBuild := sys.props.getOrElse("hadoop.version", "2.7.7") akkaVersion in ThisBuild := sys.props.getOrElse("akka.version", "2.5.17") -elasticsearchVersion in ThisBuild := sys.props.getOrElse("elasticsearch.version", "5.6.9") +elasticsearchVersion in ThisBuild := 
sys.props.getOrElse("elasticsearch.version", "6.8.1") hbaseVersion in ThisBuild := sys.props.getOrElse("hbase.version", "1.2.6") diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template index 5fbad4b426..f209ad70d2 100644 --- a/conf/pio-env.sh.template +++ b/conf/pio-env.sh.template @@ -24,8 +24,7 @@ # you need to change these to fit your site. # SPARK_HOME: Apache Spark is a hard dependency and must be configured. -# SPARK_HOME=$PIO_HOME/vendors/spark-2.0.2-bin-hadoop2.7 -SPARK_HOME=$PIO_HOME/vendors/spark-2.1.1-bin-hadoop2.6 +SPARK_HOME=$PIO_HOME/vendors/spark-2.4.3-bin-hadoop2.7 POSTGRES_JDBC_DRIVER=$PIO_HOME/lib/postgresql-42.0.0.jar MYSQL_JDBC_DRIVER=$PIO_HOME/lib/mysql-connector-java-5.1.41.jar @@ -40,7 +39,7 @@ MYSQL_JDBC_DRIVER=$PIO_HOME/lib/mysql-connector-java-5.1.41.jar # HBASE_CONF_DIR: You must configure this if you intend to run PredictionIO # with HBase on a remote cluster. -# HBASE_CONF_DIR=$PIO_HOME/vendors/hbase-1.0.0/conf +# HBASE_CONF_DIR=$PIO_HOME/vendors/hbase-1.2.6/conf # Filesystem paths where PredictionIO uses as block storage. PIO_FS_BASEDIR=$HOME/.pio_store @@ -89,7 +88,7 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost # PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200 # PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http -# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.6.9 +# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-6.8.1 # Optional basic HTTP auth # PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME=my-name # PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD=my-secret @@ -100,7 +99,7 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio # HBase Example # PIO_STORAGE_SOURCES_HBASE_TYPE=hbase -# PIO_STORAGE_SOURCES_HBASE_HOME=$PIO_HOME/vendors/hbase-1.0.0 +# PIO_STORAGE_SOURCES_HBASE_HOME=$PIO_HOME/vendors/hbase-1.2.6 # AWS S3 Example # PIO_STORAGE_SOURCES_S3_TYPE=s3 diff --git a/conf/pio-vendors.sh b/conf/pio-vendors.sh index 959c395f88..15facce944 100644 --- a/conf/pio-vendors.sh +++ b/conf/pio-vendors.sh @@ -24,7 +24,7 @@ if [ -z "$PIO_SCALA_VERSION" ]; then fi if [ -z "$PIO_SPARK_VERSION" ]; then - PIO_SPARK_VERSION="2.1.3" + PIO_SPARK_VERSION="2.4.3" fi if [ -z "$PIO_HADOOP_VERSION" ]; then @@ -32,7 +32,7 @@ if [ -z "$PIO_HADOOP_VERSION" ]; then fi if [ -z "$PIO_ELASTICSEARCH_VERSION" ]; then - PIO_ELASTICSEARCH_VERSION="5.6.9" + PIO_ELASTICSEARCH_VERSION="6.8.1" fi if [ -z "$PIO_HBASE_VERSION" ]; then diff --git a/docs/manual/data/versions.yml b/docs/manual/data/versions.yml index 54fa46968b..d969ffa665 100644 --- a/docs/manual/data/versions.yml +++ b/docs/manual/data/versions.yml @@ -1,7 +1,7 @@ pio: 0.14.0 -spark: 2.4.0 -spark_download_filename: spark-2.4.0-bin-hadoop2.7 -elasticsearch_download_filename: elasticsearch-5.6.9 +spark: 2.4.3 +spark_download_filename: spark-2.4.3-bin-hadoop2.7 +elasticsearch_download_filename: elasticsearch-6.8.1 hbase_version: 1.2.6 hbase_basename: hbase-1.2.6 hbase_variant: bin diff --git a/docs/manual/source/install/index.html.md.erb b/docs/manual/source/install/index.html.md.erb index ce2023b993..9d5240919e 100644 --- a/docs/manual/source/install/index.html.md.erb +++ b/docs/manual/source/install/index.html.md.erb @@ -21,54 +21,34 @@ limitations under the License. ## Prerequisites -It is **very important** to meet the minimum version of the following +It is **very important** to meet the version of the following technologies that power Apache PredictionIO®. 
-* Apache Hadoop 2.6.5 (optional, required only if YARN and HDFS are needed) -* Apache Spark 2.0.2 for Hadoop 2.6 * Java SE Development Kit 8 +* Apache Spark 2.0+ +* Apache Hadoop 2.6, 2.7 and one of the following sets: -* PostgreSQL 9.1 - -or - -* MySQL 5.1 - -or - -* Apache HBase 0.98.5 -* Elasticsearch 5.6.9 - -WARNING: **Note that support for Scala 2.10 and Spark 1.6 were removed as of PredictionIO 0.14.0.** - -If you are running on a single machine, we recommend a minimum of 2GB memory. - -INFO: If you are using Linux, Apache Spark local mode, which is the default -operation mode without further configuration, may not work. In that case, -configure your Apache Spark to run in [standalone cluster -mode](http://spark.apache.org/docs/latest/spark-standalone.html). +* PostgreSQL 9.6 or MySQL 5.1 +* Apache HBase 1.2 +* Elasticsearch 6.x, 5.6(deprecated) ## Installation -* [Installing Apache PredictionIO](install-sourcecode.html) +Pre-built for the following versions -You may also use Docker to install Apache PredictionIO® +* Scala 2.11 +* Apache Spark 2.4 +* Apache Hadoop 2.7 +* Elasticsearch 6.8 -* [Installing Apache PredictionIO with Docker](install-docker.html) - - -[//]: # (* *(coming soon)* Installing Apache PredictionIO with Homebrew) +* [Downloading Binary Distribution](install-sourcecode.html#downloading-binary-distribution) +Building Apache PredictionIO +* [Downloading Source Code](install-sourcecode.html#downloading-source-code) -WARNING: **0.8.2 contains schema changes from the previous versions, if you have -installed the previous versions, you may need to clear both HBase and -Elasticsearch. See more [here](/resources/upgrade/).** +Docker - -[//]: # (## Production Deployment) - -[//]: # (For production environment setup, please refer to [Production) -[//]: # (Deployment](/production/deploy.html) guide.) +* [Installing Apache PredictionIO with Docker](install-docker.html) diff --git a/docs/manual/source/install/install-sourcecode.html.md.erb b/docs/manual/source/install/install-sourcecode.html.md.erb index 60f8772850..cfb49298f1 100644 --- a/docs/manual/source/install/install-sourcecode.html.md.erb +++ b/docs/manual/source/install/install-sourcecode.html.md.erb @@ -24,14 +24,6 @@ replace `/home/abc` with your own home directory wherever you see it. ## Downloading Binary Distribution -You can use pre-built binary distribution for Apache PredictionIO® if you are -building against - -* Scala 2.11.12 -* Spark 2.1.3 -* Hadoop 2.7.7 -* Elasticsearch 5.6.9 - Download [binary release from an Apache mirror](https://www.apache.org/dyn/closer.lua/predictionio/<%= data.versions.pio %>/apache-predictionio-<%= data.versions.pio %>-bin.tar.gz). @@ -127,17 +119,6 @@ Extract the binary distribution you have just built. $ tar zxvf PredictionIO-<%= data.versions.pio %>.tar.gz ``` -### Building against Different Versions of Dependencies - -Starting from version 0.11.0, PredictionIO can be built against different -versions of dependencies. 
As of writing, one could build PredictionIO against -these different dependencies: - -* Scala 2.11.x -* Spark 2.0.x, 2.1.x, 2.2.x, 2.3.x, 2.4.x -* Hadoop 2.6.x, 2.7.x -* Elasticsearch 5.6.x, 6.x - ## Installing Dependencies Let us install dependencies inside a subdirectory of the Apache PredictionIO diff --git a/docs/manual/source/partials/shared/install/_postgres.html.erb b/docs/manual/source/partials/shared/install/_postgres.html.erb index a2e6e99b77..3de4f3a799 100644 --- a/docs/manual/source/partials/shared/install/_postgres.html.erb +++ b/docs/manual/source/partials/shared/install/_postgres.html.erb @@ -54,6 +54,5 @@ $ psql -c "create user pio with password 'pio'" Starting from 0.11.0, PredictionIO no longer bundles JDBC drivers. Download the PostgreSQL JDBC driver from the [official web site](https://jdbc.postgresql.org/), and put the JAR file in the `lib` -subdirectory. By default, `conf/pio-env.sh` assumes version 42.0.0 JDBC 4.2. If -you use a different version, modify `POSTGRES_JDBC_DRIVER` to point to the -correct JAR. +subdirectory. Afterwards, you need to edit `conf/pio-env.sh` and change the +`POSTGRES_JDBC_DRIVER` variable to point to the correct JAR. diff --git a/project/assembly.sbt b/project/assembly.sbt index d95475f16f..9c014713d3 100644 --- a/project/assembly.sbt +++ b/project/assembly.sbt @@ -1 +1 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.7") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9") diff --git a/project/build.properties b/project/build.properties index 5f528e4747..1fc4b8093e 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.2.3 \ No newline at end of file +sbt.version=1.2.8 \ No newline at end of file diff --git a/project/plugins.sbt b/project/plugins.sbt index fece7e4235..0c9832c959 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -2,9 +2,9 @@ addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2") -addSbtPlugin("com.typesafe.sbt" % "sbt-twirl" % "1.3.15") +addSbtPlugin("com.typesafe.sbt" % "sbt-twirl" % "1.4.1") -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3") +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.5") addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0") @@ -12,6 +12,6 @@ resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositori addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1") -addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.6") +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.22") addSbtPlugin("com.typesafe.sbt" % "sbt-license-report" % "1.2.0") \ No newline at end of file diff --git a/storage/elasticsearch/build.sbt b/storage/elasticsearch/build.sbt index 3dbd3dec96..b7362f68df 100644 --- a/storage/elasticsearch/build.sbt +++ b/storage/elasticsearch/build.sbt @@ -19,16 +19,13 @@ import PIOBuild._ name := "apache-predictionio-data-elasticsearch" -elasticsearchVersion := (if (majorVersion(elasticsearchVersion.value) < 5) "5.6.9" else elasticsearchVersion.value) - libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided", - "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", + "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.elasticsearch.client" % "elasticsearch-rest-client" % elasticsearchVersion.value, - "org.elasticsearch" %% 
"elasticsearch-spark-20" % elasticsearchVersion.value + "org.elasticsearch" %% "elasticsearch-spark-20" % elasticsearchVersion.value exclude("org.apache.spark", "*"), - "org.elasticsearch" % "elasticsearch-hadoop-mr" % elasticsearchVersion.value, - "org.specs2" %% "specs2" % "2.3.13" % "test") + "org.specs2" %% "specs2" % "2.3.13" % "test") parallelExecution in Test := false @@ -36,12 +33,6 @@ pomExtra := childrenPomExtra.value assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false) -assemblyShadeRules in assembly := Seq( - ShadeRule.rename("org.apache.http.**" -> - "org.apache.predictionio.shaded.org.apache.http.@1").inAll, - ShadeRule.rename("org.elasticsearch.client.**" -> - "org.apache.predictionio.shaded.org.elasticsearch.client.@1").inAll) - // skip test in assembly test in assembly := {} diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala index eef83e4f68..6661257fcf 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -36,19 +36,20 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging /** Elasticsearch implementation of AccessKeys. */ -class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: String) +class ESAccessKeys(client: RestClient, config: StorageClientConfig, metadataName: String) extends AccessKeys with Logging { implicit val formats = DefaultFormats.lossless - private val estype = "accesskeys" - private val internalIndex = index + "_" + estype - - ESUtils.createIndex(client, internalIndex) - val mappingJson = - (estype -> - ("properties" -> - ("key" -> ("type" -> "keyword")) ~ - ("events" -> ("type" -> "keyword")))) - ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) + private val metadataKey = "accesskeys" + private val index = metadataName + "_" + metadataKey + private val estype = { + val mappingJson = + ("mappings" -> + ("properties" -> + ("key" -> ("type" -> "keyword")) ~ + ("events" -> ("type" -> "keyword")))) + + ESUtils.createIndex(client, index, compact(render(mappingJson))) + } def insert(accessKey: AccessKey): Option[String] = { val key = if (accessKey.key.isEmpty) generateKey else accessKey.key @@ -63,7 +64,7 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin try { val response = client.performRequest( "GET", - s"/$internalIndex/$estype/$id", + s"/$index/$estype/$id", Map.empty[String, String].asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) (jsonResponse \ "found").extract[Boolean] match { @@ -77,11 +78,11 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin e.getResponse.getStatusLine.getStatusCode match { case 404 => None case _ => - error(s"Failed to access to /$internalIndex/$estype/$id", e) + error(s"Failed to access to /$index/$estype/$id", e) None } case e: IOException => - error(s"Failed to access to /$internalIndex/$estype/$id", e) + error(s"Failed to access to /$index/$estype/$id", e) None } } @@ -91,10 +92,10 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[AccessKey](client, internalIndex, 
estype, compact(render(json))) + ESUtils.getAll[AccessKey](client, index, compact(render(json))) } catch { case e: IOException => - error("Failed to access to /$internalIndex/$estype/_search", e) + error(s"Failed to access to /$index/_search", e) Nil } } @@ -105,10 +106,10 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin ("query" -> ("term" -> ("appid" -> appid))) - ESUtils.getAll[AccessKey](client, internalIndex, estype, compact(render(json))) + ESUtils.getAll[AccessKey](client, index, compact(render(json))) } catch { case e: IOException => - error("Failed to access to /$internalIndex/$estype/_search", e) + error(s"Failed to access to /$index/_search", e) Nil } } @@ -118,8 +119,8 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin try { val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON) val response = client.performRequest( - "POST", - s"/$internalIndex/$estype/$id", + "PUT", + s"/$index/$estype/$id", Map("refresh" -> "true").asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) @@ -128,11 +129,11 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin case "created" => case "updated" => case _ => - error(s"[$result] Failed to update $internalIndex/$estype/$id") + error(s"[$result] Failed to update $index/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $internalIndex/$estype/$id", e) + error(s"Failed to update $index/$estype/$id", e) } } @@ -140,18 +141,18 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin try { val response = client.performRequest( "DELETE", - s"/$internalIndex/$estype/$id", + s"/$index/$estype/$id", Map("refresh" -> "true").asJava) val json = parse(EntityUtils.toString(response.getEntity)) val result = (json \ "result").extract[String] result match { case "deleted" => case _ => - error(s"[$result] Failed to update $internalIndex/$estype/id") + error(s"[$result] Failed to delete $index/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $internalIndex/$estype/id", e) + error(s"Failed to delete $index/$estype/$id", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index 26621cff35..bb7adf2aaf 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -36,27 +36,28 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging /** Elasticsearch implementation of Items. 
*/ -class ESApps(client: RestClient, config: StorageClientConfig, index: String) +class ESApps(client: RestClient, config: StorageClientConfig, metadataName: String) extends Apps with Logging { implicit val formats = DefaultFormats.lossless - private val estype = "apps" - private val seq = new ESSequences(client, config, index) - private val internalIndex = index + "_" + estype + private val seq = new ESSequences(client, config, metadataName) + private val metadataKey = "apps" + private val index = metadataName + "_" + metadataKey + private val estype = { + val mappingJson = + ("mappings" -> + ("properties" -> + ("id" -> ("type" -> "keyword")) ~ + ("name" -> ("type" -> "keyword")))) - ESUtils.createIndex(client, internalIndex) - val mappingJson = - (estype -> - ("properties" -> - ("id" -> ("type" -> "keyword")) ~ - ("name" -> ("type" -> "keyword")))) - ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) + ESUtils.createIndex(client, index, compact(render(mappingJson))) + } def insert(app: App): Option[Int] = { val id = app.id match { case v if v == 0 => @scala.annotation.tailrec def generateId: Int = { - seq.genNext(estype).toInt match { + seq.genNext(metadataKey).toInt match { case x if !get(x).isEmpty => generateId case x => x } @@ -72,7 +73,7 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) try { val response = client.performRequest( "GET", - s"/$internalIndex/$estype/$id", + s"/$index/$estype/$id", Map.empty[String, String].asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) (jsonResponse \ "found").extract[Boolean] match { @@ -86,11 +87,11 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) e.getResponse.getStatusLine.getStatusCode match { case 404 => None case _ => - error(s"Failed to access to /$internalIndex/$estype/$id", e) + error(s"Failed to access to /$index/$estype/$id", e) None } case e: IOException => - error(s"Failed to access to /$internalIndex/$estype/$id", e) + error(s"Failed to access to /$index/$estype/$id", e) None } } @@ -104,20 +105,17 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$internalIndex/$estype/_search", + s"/$index/_search", Map.empty[String, String].asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - (jsonResponse \ "hits" \ "total").extract[Long] match { - case 0 => None - case _ => - val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] - val result = (results.head \ "_source").extract[App] - Some(result) + val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] + results.headOption.map { jv => + (jv \ "_source").extract[App] } } catch { case e: IOException => - error(s"Failed to access to /$internalIndex/$estype/_search", e) + error(s"Failed to access to /$index/_search", e) None } } @@ -127,10 +125,10 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) val json = ("query" -> ("match_all" -> Nil)) - ESUtils.getAll[App](client, internalIndex, estype, compact(render(json))) + ESUtils.getAll[App](client, index, compact(render(json))) } catch { case e: IOException => - error("Failed to access to /$internalIndex/$estype/_search", e) + error(s"Failed to access to /$index/_search", e) Nil } } @@ -138,10 +136,10 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) def 
update(app: App): Unit = { val id = app.id.toString try { - val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON); + val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON) val response = client.performRequest( - "POST", - s"/$internalIndex/$estype/$id", + "PUT", + s"/$index/$estype/$id", Map("refresh" -> "true").asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) @@ -150,11 +148,11 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) case "created" => case "updated" => case _ => - error(s"[$result] Failed to update $internalIndex/$estype/$id") + error(s"[$result] Failed to update $index/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $internalIndex/$estype/$id", e) + error(s"Failed to update $index/$estype/$id", e) } } @@ -162,18 +160,18 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) try { val response = client.performRequest( "DELETE", - s"/$internalIndex/$estype/$id", + s"/$index/$estype/$id", Map("refresh" -> "true").asJava) val json = parse(EntityUtils.toString(response.getEntity)) val result = (json \ "result").extract[String] result match { case "deleted" => case _ => - error(s"[$result] Failed to update $internalIndex/$estype/$id") + error(s"[$result] Failed to delete $index/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $internalIndex/$estype/id", e) + error(s"Failed to delete $index/$estype/$id", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala index ac248debf4..ebba755dc0 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -35,26 +35,27 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging -class ESChannels(client: RestClient, config: StorageClientConfig, index: String) +class ESChannels(client: RestClient, config: StorageClientConfig, metadataName: String) extends Channels with Logging { implicit val formats = DefaultFormats.lossless - private val estype = "channels" - private val seq = new ESSequences(client, config, index) - private val internalIndex = index + "_" + estype + private val seq = new ESSequences(client, config, metadataName) + private val metadataKey = "channels" + private val index = metadataName + "_" + metadataKey + private val estype = { + val mappingJson = + ("mappings" -> + ("properties" -> + ("name" -> ("type" -> "keyword")))) - ESUtils.createIndex(client, internalIndex) - val mappingJson = - (estype -> - ("properties" -> - ("name" -> ("type" -> "keyword")))) - ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) + ESUtils.createIndex(client, index, compact(render(mappingJson))) + } def insert(channel: Channel): Option[Int] = { val id = channel.id match { case v if v == 0 => @scala.annotation.tailrec def generateId: Int = { - seq.genNext(estype).toInt match { + seq.genNext(metadataKey).toInt match { case x if !get(x).isEmpty => generateId case x => x } @@ -70,7 +71,7 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) try { val response = client.performRequest( "GET", - s"/$internalIndex/$estype/$id", + s"/$index/$estype/$id", 
Map.empty[String, String].asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) (jsonResponse \ "found").extract[Boolean] match { @@ -84,11 +85,11 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) e.getResponse.getStatusLine.getStatusCode match { case 404 => None case _ => - error(s"Failed to access to /$internalIndex/$estype/$id", e) + error(s"Failed to access to /$index/$estype/$id", e) None } case e: IOException => - error(s"Failed to access to /$internalIndex/$estype/$id", e) + error(s"Failed to access to /$index/$estype/$id", e) None } } @@ -99,10 +100,10 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) ("query" -> ("term" -> ("appid" -> appid))) - ESUtils.getAll[Channel](client, internalIndex, estype, compact(render(json))) + ESUtils.getAll[Channel](client, index, compact(render(json))) } catch { case e: IOException => - error(s"Failed to access to /$internalIndex/$estype/_search", e) + error(s"Failed to access to /$index/_search", e) Nil } } @@ -112,8 +113,8 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) try { val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON) val response = client.performRequest( - "POST", - s"/$internalIndex/$estype/$id", + "PUT", + s"/$index/$estype/$id", Map("refresh" -> "true").asJava, entity) val json = parse(EntityUtils.toString(response.getEntity)) @@ -122,12 +123,12 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) case "created" => true case "updated" => true case _ => - error(s"[$result] Failed to update $internalIndex/$estype/$id") + error(s"[$result] Failed to update $index/$estype/$id") false } } catch { case e: IOException => - error(s"Failed to update $internalIndex/$estype/$id", e) + error(s"Failed to update $index/$estype/$id", e) false } } @@ -136,18 +137,18 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) try { val response = client.performRequest( "DELETE", - s"/$internalIndex/$estype/$id", + s"/$index/$estype/$id", Map("refresh" -> "true").asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) val result = (jsonResponse \ "result").extract[String] result match { case "deleted" => case _ => - error(s"[$result] Failed to update $internalIndex/$estype/$id") + error(s"[$result] Failed to delete $index/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $internalIndex/$estype/$id", e) + error(s"Failed to delete $index/$estype/$id", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala index 96f8a6720c..850bdb325e 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -36,30 +36,31 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging -class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: String) +class ESEngineInstances(client: RestClient, config: StorageClientConfig, metadataName: String) extends EngineInstances with Logging { implicit val formats = DefaultFormats + new EngineInstanceSerializer - private val estype = "engine_instances" - private val 
internalIndex = index + "_" + estype - - ESUtils.createIndex(client, internalIndex) - val mappingJson = - (estype -> - ("properties" -> - ("status" -> ("type" -> "keyword")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("engineId" -> ("type" -> "keyword")) ~ - ("engineVersion" -> ("type" -> "keyword")) ~ - ("engineVariant" -> ("type" -> "keyword")) ~ - ("engineFactory" -> ("type" -> "keyword")) ~ - ("batch" -> ("type" -> "keyword")) ~ - ("dataSourceParams" -> ("type" -> "keyword")) ~ - ("preparatorParams" -> ("type" -> "keyword")) ~ - ("algorithmsParams" -> ("type" -> "keyword")) ~ - ("servingParams" -> ("type" -> "keyword")) + private val metadataKey = "engine_instances" + private val index = metadataName + "_" + metadataKey + private val estype = { + val mappingJson = + ("mappings" -> + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("engineId" -> ("type" -> "keyword")) ~ + ("engineVersion" -> ("type" -> "keyword")) ~ + ("engineVariant" -> ("type" -> "keyword")) ~ + ("engineFactory" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("dataSourceParams" -> ("type" -> "keyword")) ~ + ("preparatorParams" -> ("type" -> "keyword")) ~ + ("algorithmsParams" -> ("type" -> "keyword")) ~ + ("servingParams" -> ("type" -> "keyword")) )) - ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) + + ESUtils.createIndex(client, index, compact(render(mappingJson))) + } def insert(i: EngineInstance): String = { val id = i.id match { @@ -84,7 +85,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$internalIndex/$estype/", + s"/$index/$estype", Map("refresh" -> "true").asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) @@ -93,12 +94,12 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: case "created" => Some((jsonResponse \ "_id").extract[String]) case _ => - error(s"[$result] Failed to create $internalIndex/$estype") + error(s"[$result] Failed to create $index/$estype") None } } catch { case e: IOException => - error(s"Failed to create $internalIndex/$estype", e) + error(s"Failed to create $index/$estype", e) None } } @@ -107,7 +108,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: try { val response = client.performRequest( "GET", - s"/$internalIndex/$estype/$id", + s"/$index/$estype/$id", Map.empty[String, String].asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) (jsonResponse \ "found").extract[Boolean] match { @@ -121,11 +122,11 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: e.getResponse.getStatusLine.getStatusCode match { case 404 => None case _ => - error(s"Failed to access to /$internalIndex/$estype/$id", e) + error(s"Failed to access to /$index/$estype/$id", e) None } case e: IOException => - error(s"Failed to access to /$internalIndex/$estype/$id", e) + error(s"Failed to access to /$index/$estype/$id", e) None } } @@ -135,10 +136,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json))) + ESUtils.getAll[EngineInstance](client, index, compact(render(json))) 
} catch { case e: IOException => - error(s"Failed to access to /$internalIndex/$estype/_search", e) + error(s"Failed to access to /$index/_search", e) Nil } } @@ -163,10 +164,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: ("sort" -> List( ("startTime" -> ("order" -> "desc")))) - ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json))) + ESUtils.getAll[EngineInstance](client, index, compact(render(json))) } catch { case e: IOException => - error(s"Failed to access to /$internalIndex/$estype/_search", e) + error(s"Failed to access to /$index/_search", e) Nil } } @@ -185,8 +186,8 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: try { val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) val response = client.performRequest( - "POST", - s"/$internalIndex/$estype/$id", + "PUT", + s"/$index/$estype/$id", Map("refresh" -> "true").asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) @@ -195,11 +196,11 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: case "created" => case "updated" => case _ => - error(s"[$result] Failed to update $internalIndex/$estype/$id") + error(s"[$result] Failed to update $index/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $internalIndex/$estype/$id", e) + error(s"Failed to update $index/$estype/$id", e) } } @@ -207,18 +208,18 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: try { val response = client.performRequest( "DELETE", - s"/$internalIndex/$estype/$id", + s"/$index/$estype/$id", Map("refresh" -> "true").asJava) val json = parse(EntityUtils.toString(response.getEntity)) val result = (json \ "result").extract[String] result match { case "deleted" => case _ => - error(s"[$result] Failed to update $internalIndex/$estype/$id") + error(s"[$result] Failed to delete $index/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $internalIndex/$estype/$id", e) + error(s"Failed to delete $index/$estype/$id", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala index 0025950d03..93c3e33c8c 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -36,34 +36,35 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging -class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, index: String) +class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, metadataName: String) extends EvaluationInstances with Logging { implicit val formats = DefaultFormats + new EvaluationInstanceSerializer - private val estype = "evaluation_instances" - private val seq = new ESSequences(client, config, index) - private val internalIndex = index + "_" + estype + private val seq = new ESSequences(client, config, metadataName) + private val metadataKey = "evaluation_instances" + private val index = metadataName + "_" + metadataKey + private val estype = { + val mappingJson = + ("mappings" -> + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + 
("endTime" -> ("type" -> "date")) ~ + ("evaluationClass" -> ("type" -> "keyword")) ~ + ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("evaluatorResults" -> ("type" -> "text")) ~ + ("evaluatorResultsHTML" -> ("enabled" -> false)) ~ + ("evaluatorResultsJSON" -> ("enabled" -> false)))) - ESUtils.createIndex(client, internalIndex) - val mappingJson = - (estype -> - ("properties" -> - ("status" -> ("type" -> "keyword")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("evaluationClass" -> ("type" -> "keyword")) ~ - ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~ - ("batch" -> ("type" -> "keyword")) ~ - ("evaluatorResults" -> ("type" -> "text")) ~ - ("evaluatorResultsHTML" -> ("enabled" -> false)) ~ - ("evaluatorResultsJSON" -> ("enabled" -> false)))) - ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) + ESUtils.createIndex(client, index, compact(render(mappingJson))) + } def insert(i: EvaluationInstance): String = { val id = i.id match { case v if v.isEmpty => @scala.annotation.tailrec def generateId: String = { - seq.genNext(estype).toString match { + seq.genNext(metadataKey).toString match { case x if !get(x).isEmpty => generateId case x => x } @@ -79,7 +80,7 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind try { val response = client.performRequest( "GET", - s"/$internalIndex/$estype/$id", + s"/$index/$estype/$id", Map.empty[String, String].asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) (jsonResponse \ "found").extract[Boolean] match { @@ -93,11 +94,11 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind e.getResponse.getStatusLine.getStatusCode match { case 404 => None case _ => - error(s"Failed to access to /$internalIndex/$estype/$id", e) + error(s"Failed to access to /$index/$estype/$id", e) None } case e: IOException => - error(s"Failed to access to /$internalIndex/$estype/$id", e) + error(s"Failed to access to /$index/$estype/$id", e) None } } @@ -107,10 +108,10 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[EvaluationInstance](client, internalIndex, estype, compact(render(json))) + ESUtils.getAll[EvaluationInstance](client, index, compact(render(json))) } catch { case e: IOException => - error("Failed to access to /$internalIndex/$estype/_search", e) + error(s"Failed to access to /$index/_search", e) Nil } } @@ -124,10 +125,10 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind ("sort" -> ("startTime" -> ("order" -> "desc"))) - ESUtils.getAll[EvaluationInstance](client, internalIndex, estype, compact(render(json))) + ESUtils.getAll[EvaluationInstance](client, index, compact(render(json))) } catch { case e: IOException => - error("Failed to access to /$internalIndex/$estype/_search", e) + error(s"Failed to access to /$index/_search", e) Nil } } @@ -137,8 +138,8 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind try { val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) val response = client.performRequest( - "POST", - s"/$internalIndex/$estype/$id", + "PUT", + s"/$index/$estype/$id", Map("refresh" -> "true").asJava, entity) val json = parse(EntityUtils.toString(response.getEntity)) @@ -147,11 +148,11 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, 
ind case "created" => case "updated" => case _ => - error(s"[$result] Failed to update $internalIndex/$estype/$id") + error(s"[$result] Failed to update $index/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $internalIndex/$estype/$id", e) + error(s"Failed to update $index/$estype/$id", e) } } @@ -159,18 +160,18 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind try { val response = client.performRequest( "DELETE", - s"/$internalIndex/$estype/$id", + s"/$index/$estype/$id", Map("refresh" -> "true").asJava) val json = parse(EntityUtils.toString(response.getEntity)) val result = (json \ "result").extract[String] result match { case "deleted" => case _ => - error(s"[$result] Failed to update $internalIndex/$estype/$id") + error(s"[$result] Failed to delete $index/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $internalIndex/$estype/$id", e) + error(s"Failed to delete $index/$estype/$id", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala index 708d3d33b3..8cd14344f6 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -28,7 +28,7 @@ import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.Event import org.apache.predictionio.data.storage.LEvents import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.client.{ResponseException, RestClient} +import org.elasticsearch.client.RestClient import org.joda.time.DateTime import org.json4s._ import org.json4s.JsonDSL._ @@ -38,11 +38,11 @@ import org.json4s.ext.JodaTimeSerializers import grizzled.slf4j.Logging import org.apache.http.message.BasicHeader -class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseIndex: String) +class ESLEvents(val client: RestClient, config: StorageClientConfig, val eventdataName: String) extends LEvents with Logging { implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all - def getEsType(appId: Int, channelId: Option[Int] = None): String = { + def eventdataKey(appId: Int, channelId: Option[Int] = None): String = { channelId.map { ch => s"${appId}_${ch}" }.getOrElse { @@ -51,11 +51,9 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd } override def init(appId: Int, channelId: Option[Int] = None): Boolean = { - val estype = getEsType(appId, channelId) - val index = baseIndex + "_" + estype - ESUtils.createIndex(client, index) + val index = eventdataName + "_" + eventdataKey(appId, channelId) val json = - (estype -> + ("mappings" -> ("properties" -> ("name" -> ("type" -> "keyword")) ~ ("eventId" -> ("type" -> "keyword")) ~ @@ -69,31 +67,26 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd ("tags" -> ("type" -> "keyword")) ~ ("prId" -> ("type" -> "keyword")) ~ ("creationTime" -> ("type" -> "date")))) - ESUtils.createMapping(client, index, estype, compact(render(json))) + ESUtils.createIndex(client, index, compact(render(json))) true } override def remove(appId: Int, channelId: Option[Int] = None): Boolean = { - val estype = getEsType(appId, channelId) - val index = baseIndex + "_" + estype + val index = eventdataName + "_" 
+ eventdataKey(appId, channelId) try { - val json = - ("query" -> - ("match_all" -> List.empty)) - val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) client.performRequest( - "POST", - s"/$index/$estype/_delete_by_query", - Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, - entity).getStatusLine.getStatusCode match { - case 200 => true - case _ => - error(s"Failed to remove $index/$estype") - false - } + "DELETE", + s"/$index", + Map.empty[String, String].asJava + ).getStatusLine.getStatusCode match { + case 200 => true + case _ => + error(s"Failed to remove $index") + false + } } catch { case e: Exception => - error(s"Failed to remove $index/$estype", e) + error(s"Failed to remove $index", e) false } } @@ -105,8 +98,8 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = { Future { - val estype = getEsType(appId, channelId) - val index = baseIndex + "_" + estype + val index = eventdataName + "_" + eventdataKey(appId, channelId) + val estype = ESUtils.esType(client, index) try { val id = event.eventId.getOrElse { ESEventsUtil.getBase64UUID @@ -125,7 +118,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd ("properties" -> write(event.properties.toJObject)) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) val response = client.performRequest( - "POST", + "PUT", s"/$index/$estype/$id", Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, entity) @@ -133,7 +126,6 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd val result = (jsonResponse \ "result").extract[String] result match { case "created" => id - case "updated" => id case _ => error(s"[$result] Failed to update $index/$estype/$id") "" @@ -151,8 +143,8 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = { Future { - val estype = getEsType(appId, channelId) - val index = baseIndex + "_" + estype + val index = eventdataName + "_" + eventdataKey(appId, channelId) + val estype = ESUtils.esType(client, index) try { val ids = events.map { event => event.eventId.getOrElse(ESEventsUtil.getBase64UUID) @@ -160,7 +152,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd val json = events.zip(ids).map { case (event, id) => val commandJson = - ("index" -> ( + ("create" -> ( ("_index" -> index) ~ ("_type" -> estype) ~ ("_id" -> id) @@ -195,12 +187,11 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd val items = (responseJson \ "items").asInstanceOf[JArray] items.arr.map { case value: JObject => - val result = (value \ "index" \ "result").extract[String] - val id = (value \ "index" \ "_id").extract[String] + val result = (value \ "create" \ "result").extract[String] + val id = (value \ "create" \ "_id").extract[String] result match { case "created" => id - case "updated" => id case _ => error(s"[$result] Failed to update $index/$estype/$id") "" @@ -214,37 +205,12 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd } } - private def exists(client: RestClient, estype: String, id: Int): Boolean = { - val index = baseIndex + "_" + estype - try { - client.performRequest( - "GET", - s"/$index/$estype/$id", - Map.empty[String, String].asJava).getStatusLine.getStatusCode 
match { - case 200 => true - case _ => false - } - } catch { - case e: ResponseException => - e.getResponse.getStatusLine.getStatusCode match { - case 404 => false - case _ => - error(s"Failed to access to /$index/$estype/$id", e) - false - } - case e: IOException => - error(s"Failed to access to $index/$estype/$id", e) - false - } - } - override def futureGet( eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = { Future { - val estype = getEsType(appId, channelId) - val index = baseIndex + "_" + estype + val index = eventdataName + "_" + eventdataKey(appId, channelId) try { val json = ("query" -> @@ -253,20 +219,17 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/_search", + s"/$index/_search", Map.empty[String, String].asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - (jsonResponse \ "hits" \ "total").extract[Long] match { - case 0 => None - case _ => - val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] - val result = (results.head \ "_source").extract[Event] - Some(result) + val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] + results.headOption.map { jv => + (jv \ "_source").extract[Event] } } catch { case e: IOException => - error("Failed to access to /$index/$estype/_search", e) + error(s"Failed to access to /$index/_search", e) None } } @@ -277,8 +240,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = { Future { - val estype = getEsType(appId, channelId) - val index = baseIndex + "_" + estype + val index = eventdataName + "_" + eventdataKey(appId, channelId) try { val json = ("query" -> @@ -287,14 +249,14 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/_delete_by_query", + s"/$index/_delete_by_query", Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) (jsonResponse \ "deleted").extract[Int] > 0 } catch { case e: IOException => - error(s"Failed to delete $index/$estype:$eventId", e) + error(s"Failed to delete $index:$eventId", e) false } } @@ -314,15 +276,14 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd reversed: Option[Boolean] = None) (implicit ec: ExecutionContext): Future[Iterator[Event]] = { Future { - val estype = getEsType(appId, channelId) - val index = baseIndex + "_" + estype + val index = eventdataName + "_" + eventdataKey(appId, channelId) try { val query = ESUtils.createEventQuery( startTime, untilTime, entityType, entityId, eventNames, targetEntityType, targetEntityId, reversed) limit.getOrElse(20) match { - case -1 => ESUtils.getEventAll(client, index, estype, query).toIterator - case size => ESUtils.getEvents(client, index, estype, query, size).toIterator + case -1 => ESUtils.getEventAll(client, index, query).toIterator + case size => ESUtils.getEvents(client, index, query, size).toIterator } } catch { case e: IOException => diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala index a86d378331..f54456f2bf 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala @@ -41,11 +41,11 @@ import org.json4s.native.JsonMethods._ import org.json4s.ext.JodaTimeSerializers -class ESPEvents(client: RestClient, config: StorageClientConfig, baseIndex: String) +class ESPEvents(client: RestClient, config: StorageClientConfig, eventdataName: String) extends PEvents { implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all - def getEsType(appId: Int, channelId: Option[Int] = None): String = { + def eventdataKey(appId: Int, channelId: Option[Int] = None): String = { channelId.map { ch => s"${appId}_${ch}" }.getOrElse { @@ -77,10 +77,9 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, baseIndex: Stri startTime, untilTime, entityType, entityId, eventNames, targetEntityType, targetEntityId, None) - val estype = getEsType(appId, channelId) - val index = baseIndex + "_" + estype + val index = eventdataName + "_" + eventdataKey(appId, channelId) val conf = new Configuration() - conf.set("es.resource", s"$index/$estype") + conf.set("es.resource", index) conf.set("es.query", query) conf.set("es.nodes", getESNodes()) @@ -97,8 +96,8 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, baseIndex: Stri override def write( events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { - val estype = getEsType(appId, channelId) - val index = baseIndex + "_" + estype + val index = eventdataName + "_" + eventdataKey(appId, channelId) + val estype = ESUtils.esType(client, index) val conf = Map("es.resource" -> s"$index/$estype", "es.nodes" -> getESNodes()) events.map { event => ESEventsUtil.eventToPut(event, appId) @@ -108,8 +107,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, baseIndex: Stri override def delete( eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { - val estype = getEsType(appId, channelId) - val index = baseIndex + "_" + estype + val index = eventdataName + "_" + eventdataKey(appId, channelId) eventIds.foreachPartition { iter => iter.foreach { eventId => try { @@ -120,19 +118,17 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, baseIndex: Stri val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/_delete_by_query", + s"/$index/_delete_by_query", Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ "result").extract[String] - result match { - case "deleted" => - case _ => - logger.error(s"[$result] Failed to update $index/$estype:$eventId") + if ((jsonResponse \ "deleted").extract[Int] == 0) { + logger.warn("The number of documents that were successfully deleted is 0. 
" + + s"$index:$eventId") } } catch { case e: IOException => - logger.error(s"Failed to update $index/$estype:$eventId", e) + logger.error(s"Failed to update $index:$eventId", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala index ade0f40ce9..0fb1a73a76 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala @@ -34,24 +34,25 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging -class ESSequences(client: RestClient, config: StorageClientConfig, index: String) extends Logging { +class ESSequences(client: RestClient, config: StorageClientConfig, metadataName: String) extends Logging { implicit val formats = DefaultFormats - private val estype = "sequences" - private val internalIndex = index + "_" + estype + private val metadataKey = "sequences" + private val index = metadataName + "_" + metadataKey + private val estype = { + val mappingJson = + ("mappings" -> + ("properties" -> + ("n" -> ("enabled" -> false)))) - ESUtils.createIndex(client, internalIndex) - val mappingJson = - (estype -> - ("properties" -> - ("n" -> ("enabled" -> false)))) - ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) + ESUtils.createIndex(client, index, compact(render(mappingJson))) + } def genNext(name: String): Long = { try { val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON) val response = client.performRequest( - "POST", - s"/$internalIndex/$estype/$name", + "PUT", + s"/$index/$estype/$name", Map("refresh" -> "false").asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) @@ -62,11 +63,11 @@ class ESSequences(client: RestClient, config: StorageClientConfig, index: String case "updated" => (jsonResponse \ "_version").extract[Long] case _ => - throw new IllegalStateException(s"[$result] Failed to update $internalIndex/$estype/$name") + throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name") } } catch { case e: IOException => - throw new StorageClientException(s"Failed to update $internalIndex/$estype/$name", e) + throw new StorageClientException(s"Failed to update $index/$estype/$name", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala index 93d5d94912..80079e319d 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala @@ -18,7 +18,6 @@ package org.apache.predictionio.data.storage.elasticsearch import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ import org.apache.http.entity.ContentType import org.apache.http.nio.entity.NStringEntity @@ -82,24 +81,22 @@ object ESUtils { def getEvents( client: RestClient, index: String, - estype: String, query: String, size: Int)( implicit formats: Formats): Seq[Event] = { - getDocList(client, index, estype, query, size).map(x => toEvent(x)) + getDocList(client, index, query, size).map(x => toEvent(x)) } def 
getDocList( client: RestClient, index: String, - estype: String, query: String, size: Int)( implicit formats: Formats): Seq[JValue] = { val entity = new NStringEntity(query, ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/_search", + s"/$index/_search", Map("size" -> s"${size}"), entity) val responseJValue = parse(EntityUtils.toString(response.getEntity)) @@ -110,25 +107,22 @@ object ESUtils { def getAll[T: Manifest]( client: RestClient, index: String, - estype: String, query: String)( implicit formats: Formats): Seq[T] = { - getDocAll(client, index, estype, query).map(x => x.extract[T]) + getDocAll(client, index, query).map(x => x.extract[T]) } def getEventAll( client: RestClient, index: String, - estype: String, query: String)( implicit formats: Formats): Seq[Event] = { - getDocAll(client, index, estype, query).map(x => toEvent(x)) + getDocAll(client, index, query).map(x => toEvent(x)) } def getDocAll( client: RestClient, index: String, - estype: String, query: String)( implicit formats: Formats): Seq[JValue] = { @@ -153,7 +147,7 @@ object ESUtils { val entity = new NStringEntity(query, ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/_search", + s"/$index/_search", Map("scroll" -> scrollLife), entity) val responseJValue = parse(EntityUtils.toString(response.getEntity)) @@ -164,42 +158,52 @@ object ESUtils { def createIndex( client: RestClient, - index: String): Unit = { + index: String, + json: String)( + implicit formats: Formats): String = { client.performRequest( "HEAD", s"/$index", - Map.empty[String, String].asJava).getStatusLine.getStatusCode match { + Map("include_type_name" -> "false")).getStatusLine.getStatusCode match { case 404 => + val entity = new NStringEntity(json, ContentType.APPLICATION_JSON) client.performRequest( "PUT", s"/$index", - Map.empty[String, String].asJava) + Map("include_type_name" -> "false"), + entity).getStatusLine.getStatusCode match { + case 200 => + "_doc" + case _ => + throw new IllegalStateException(s"/$index is invalid: $json") + } case 200 => + esType(client, index) case _ => - throw new IllegalStateException(s"/$index is invalid.") + throw new IllegalStateException(s"/$index is invalid: $json") } } - def createMapping( + // We cannot have several types within a single index as of ES 6.0, so + // continue to add or update a document under the current type. This code is + // a step towards ES 7.0 support (removal of mapping types). 
+ def esType( client: RestClient, - index: String, - estype: String, - json: String): Unit = { - client.performRequest( - "HEAD", - s"/$index/_mapping/$estype", - Map.empty[String, String].asJava).getStatusLine.getStatusCode match { - case 404 => - val entity = new NStringEntity(json, ContentType.APPLICATION_JSON) - client.performRequest( - "PUT", - s"/$index/_mapping/$estype", - Map.empty[String, String].asJava, - entity) - case 200 => - case _ => - throw new IllegalStateException(s"/$index/$estype is invalid: $json") - } + index: String)( + implicit formats: Formats): String = { + val response = client.performRequest( + "GET", + s"/$index", + Map("include_type_name" -> "true")) + response.getStatusLine.getStatusCode match { + case 200 => + (parse(EntityUtils.toString(response.getEntity)) \ index \ "mappings") + .extract[JObject].values.collectFirst { + case (name, _) if name != "_doc" && name != "properties" => name + }.getOrElse("_doc") + case _ => + throw new IllegalStateException(s"/$index is invalid.") + } } def formatUTCDateTime(dt: DateTime): String = { diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala deleted file mode 100644 index 795cf7e290..0000000000 --- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.predictionio.data.storage.hbase.upgrade - -import org.apache.predictionio.annotation.Experimental - -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.EventValidation -import org.apache.predictionio.data.storage.DataMap - -import org.apache.hadoop.hbase.client.Scan -import org.apache.hadoop.hbase.client.HConnection -import org.apache.hadoop.hbase.client.Result -import org.apache.hadoop.hbase.TableName -import org.apache.hadoop.hbase.util.Bytes - -import org.joda.time.DateTime -import org.joda.time.DateTimeZone - -import org.json4s.DefaultFormats -import org.json4s.JObject -import org.json4s.native.Serialization.{ read, write } - -import org.apache.commons.codec.binary.Base64 - -import scala.collection.JavaConversions._ - -/** :: Experimental :: */ -@Experimental -object HB_0_8_0 { - - implicit val formats = DefaultFormats - - def getByAppId( - connection: HConnection, - namespace: String, - appId: Int): Iterator[Event] = { - val tableName = TableName.valueOf(namespace, "events") - val table = connection.getTable(tableName) - val start = PartialRowKey(appId) - val stop = PartialRowKey(appId + 1) - val scan = new Scan(start.toBytes, stop.toBytes) - val scanner = table.getScanner(scan) - table.close() - scanner.iterator().map { resultToEvent(_) } - } - - val colNames: Map[String, Array[Byte]] = Map( - "event" -> "e", - "entityType" -> "ety", - "entityId" -> "eid", - "targetEntityType" -> "tety", - "targetEntityId" -> "teid", - "properties" -> "p", - "prId" -> "pk", // columna name is 'pk' in 0.8.0/0.8.1 - "eventTimeZone" -> "etz", - "creationTimeZone" -> "ctz" - ).mapValues(Bytes.toBytes(_)) - - - class RowKey( - val appId: Int, - val millis: Long, - val uuidLow: Long - ) { - lazy val toBytes: Array[Byte] = { - // add UUID least significant bits for multiple actions at the same time - // (UUID's most significant bits are actually timestamp, - // use eventTime instead). - Bytes.toBytes(appId) ++ Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow) - } - override def toString: String = { - Base64.encodeBase64URLSafeString(toBytes) - } - } - - object RowKey { - // get RowKey from string representation - def apply(s: String): RowKey = { - try { - apply(Base64.decodeBase64(s)) - } catch { - case e: Exception => throw new RowKeyException( - s"Failed to convert String ${s} to RowKey because ${e}", e) - } - } - - def apply(b: Array[Byte]): RowKey = { - if (b.size != 20) { - val bString = b.mkString(",") - throw new RowKeyException( - s"Incorrect byte array size. Bytes: ${bString}.") - } - - new RowKey( - appId = Bytes.toInt(b.slice(0, 4)), - millis = Bytes.toLong(b.slice(4, 12)), - uuidLow = Bytes.toLong(b.slice(12, 20)) - ) - } - } - - class RowKeyException(msg: String, cause: Exception) - extends Exception(msg, cause) { - def this(msg: String) = this(msg, null) - } - - case class PartialRowKey(val appId: Int, val millis: Option[Long] = None) { - val toBytes: Array[Byte] = { - Bytes.toBytes(appId) ++ - (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]())) - } - } - - def resultToEvent(result: Result): Event = { - val rowKey = RowKey(result.getRow()) - - val eBytes = Bytes.toBytes("e") - // val e = result.getFamilyMap(eBytes) - - def getStringCol(col: String): String = { - val r = result.getValue(eBytes, colNames(col)) - require(r != null, - s"Failed to get value for column ${col}. 
" + - s"Rowkey: ${rowKey.toString} " + - s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.") - - Bytes.toString(r) - } - - def getOptStringCol(col: String): Option[String] = { - val r = result.getValue(eBytes, colNames(col)) - if (r == null) { - None - } else { - Some(Bytes.toString(r)) - } - } - - def getTimestamp(col: String): Long = { - result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp() - } - - val event = getStringCol("event") - val entityType = getStringCol("entityType") - val entityId = getStringCol("entityId") - val targetEntityType = getOptStringCol("targetEntityType") - val targetEntityId = getOptStringCol("targetEntityId") - val properties: DataMap = getOptStringCol("properties") - .map(s => DataMap(read[JObject](s))).getOrElse(DataMap()) - val prId = getOptStringCol("prId") - val eventTimeZone = getOptStringCol("eventTimeZone") - .map(DateTimeZone.forID(_)) - .getOrElse(EventValidation.defaultTimeZone) - val creationTimeZone = getOptStringCol("creationTimeZone") - .map(DateTimeZone.forID(_)) - .getOrElse(EventValidation.defaultTimeZone) - - val creationTime: DateTime = new DateTime( - getTimestamp("event"), creationTimeZone - ) - - Event( - eventId = Some(RowKey(result.getRow()).toString), - event = event, - entityType = entityType, - entityId = entityId, - targetEntityType = targetEntityType, - targetEntityId = targetEntityId, - properties = properties, - eventTime = new DateTime(rowKey.millis, eventTimeZone), - tags = Nil, - prId = prId, - creationTime = creationTime - ) - } -} diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala deleted file mode 100644 index 1759561207..0000000000 --- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.predictionio.data.storage.hbase.upgrade - -import org.apache.predictionio.annotation.Experimental - -import org.apache.predictionio.data.storage.Storage -import org.apache.predictionio.data.storage.hbase.HBLEvents -import org.apache.predictionio.data.storage.hbase.HBEventsUtil - -import scala.collection.JavaConversions._ - -/** :: Experimental :: */ -@Experimental -object Upgrade { - - def main(args: Array[String]) { - val fromAppId = args(0).toInt - val toAppId = args(1).toInt - val batchSize = args.lift(2).map(_.toInt).getOrElse(100) - val fromNamespace = args.lift(3).getOrElse("predictionio_eventdata") - - upgrade(fromAppId, toAppId, batchSize, fromNamespace) - } - - /* For upgrade from 0.8.0 or 0.8.1 to 0.8.2 only */ - def upgrade( - fromAppId: Int, - toAppId: Int, - batchSize: Int, - fromNamespace: String) { - - val events = Storage.getLEvents().asInstanceOf[HBLEvents] - - // Assume already run "pio app new " (new app already created) - // TODO: check if new table empty and warn user if not - val newTable = events.getTable(toAppId) - - val newTableName = newTable.getName().getNameAsString() - println(s"Copying data from ${fromNamespace}:events for app ID ${fromAppId}" - + s" to new HBase table ${newTableName}...") - - HB_0_8_0.getByAppId( - events.client.connection, - fromNamespace, - fromAppId).grouped(batchSize).foreach { eventGroup => - val puts = eventGroup.map{ e => - val (put, rowkey) = HBEventsUtil.eventToPut(e, toAppId) - put - } - newTable.put(puts.toList) - } - - newTable.flushCommits() - newTable.close() - println("Done.") - } - -} diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala deleted file mode 100644 index de74d46dce..0000000000 --- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.predictionio.data.storage.hbase.upgrade - -import org.apache.predictionio.annotation.Experimental - -import grizzled.slf4j.Logger -import org.apache.predictionio.data.storage.Storage -import org.apache.predictionio.data.storage.DataMap -import org.apache.predictionio.data.storage.hbase.HBLEvents -import org.apache.predictionio.data.storage.hbase.HBEventsUtil - -import scala.collection.JavaConversions._ - -import scala.concurrent._ -import ExecutionContext.Implicits.global -import org.apache.predictionio.data.storage.LEvents -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import java.lang.Thread - -object CheckDistribution { - def entityType(eventClient: LEvents, appId: Int) - : Map[(String, Option[String]), Int] = { - eventClient - .find(appId = appId) - .foldLeft(Map[(String, Option[String]), Int]().withDefaultValue(0)) { - case (m, e) => { - val k = (e.entityType, e.targetEntityType) - m.updated(k, m(k) + 1) - } - } - } - - def runMain(appId: Int) { - val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents] - - entityType(eventClient, appId) - .toSeq - .sortBy(-_._2) - .foreach { println } - - } - - def main(args: Array[String]) { - runMain(args(0).toInt) - } - -} - -/** :: Experimental :: */ -@Experimental -object Upgrade_0_8_3 { - val NameMap = Map( - "pio_user" -> "user", - "pio_item" -> "item") - val RevNameMap = NameMap.toSeq.map(_.swap).toMap - - val logger = Logger[this.type] - - def main(args: Array[String]) { - val fromAppId = args(0).toInt - val toAppId = args(1).toInt - - runMain(fromAppId, toAppId) - } - - def runMain(fromAppId: Int, toAppId: Int): Unit = { - upgrade(fromAppId, toAppId) - } - - - val obsEntityTypes = Set("pio_user", "pio_item") - val obsProperties = Set( - "pio_itypes", "pio_starttime", "pio_endtime", - "pio_inactive", "pio_price", "pio_rating") - - def hasPIOPrefix(eventClient: LEvents, appId: Int): Boolean = { - eventClient.find(appId = appId).filter( e => - (obsEntityTypes.contains(e.entityType) || - e.targetEntityType.map(obsEntityTypes.contains(_)).getOrElse(false) || - (!e.properties.keySet.forall(!obsProperties.contains(_))) - ) - ).hasNext - } - - def isEmpty(eventClient: LEvents, appId: Int): Boolean = - !eventClient.find(appId = appId).hasNext - - - def upgradeCopy(eventClient: LEvents, fromAppId: Int, toAppId: Int) { - val fromDist = CheckDistribution.entityType(eventClient, fromAppId) - - logger.info("FromAppId Distribution") - fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) } - - val events = eventClient - .find(appId = fromAppId) - .zipWithIndex - .foreach { case (fromEvent, index) => { - if (index % 50000 == 0) { - // logger.info(s"Progress: $fromEvent $index") - logger.info(s"Progress: $index") - } - - - val fromEntityType = fromEvent.entityType - val toEntityType = NameMap.getOrElse(fromEntityType, fromEntityType) - - val fromTargetEntityType = fromEvent.targetEntityType - val toTargetEntityType = fromTargetEntityType - .map { et => NameMap.getOrElse(et, et) } - - val toProperties = DataMap(fromEvent.properties.fields.map { - case (k, v) => - val newK = if (obsProperties.contains(k)) { - val nK = k.stripPrefix("pio_") - logger.info(s"property ${k} will be renamed to ${nK}") - nK - } else k - (newK, v) - }) - - val toEvent = fromEvent.copy( - entityType = toEntityType, - targetEntityType = toTargetEntityType, - properties = toProperties) - - eventClient.insert(toEvent, toAppId) - }} - - - val toDist = CheckDistribution.entityType(eventClient, toAppId) - - 
logger.info("Recap fromAppId Distribution") - fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) } - - logger.info("ToAppId Distribution") - toDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) } - - val fromGood = fromDist - .toSeq - .forall { case (k, c) => { - val (et, tet) = k - val net = NameMap.getOrElse(et, et) - val ntet = tet.map(tet => NameMap.getOrElse(tet, tet)) - val nk = (net, ntet) - val nc = toDist.getOrElse(nk, -1) - val checkMatch = (c == nc) - if (!checkMatch) { - logger.info(s"${k} doesn't match: old has ${c}. new has ${nc}.") - } - checkMatch - }} - - val toGood = toDist - .toSeq - .forall { case (k, c) => { - val (et, tet) = k - val oet = RevNameMap.getOrElse(et, et) - val otet = tet.map(tet => RevNameMap.getOrElse(tet, tet)) - val ok = (oet, otet) - val oc = fromDist.getOrElse(ok, -1) - val checkMatch = (c == oc) - if (!checkMatch) { - logger.info(s"${k} doesn't match: new has ${c}. old has ${oc}.") - } - checkMatch - }} - - if (!fromGood || !toGood) { - logger.error("Doesn't match!! There is an import error.") - } else { - logger.info("Count matches. Looks like we are good to go.") - } - } - - /* For upgrade from 0.8.2 to 0.8.3 only */ - def upgrade(fromAppId: Int, toAppId: Int) { - - val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents] - - require(fromAppId != toAppId, - s"FromAppId: $fromAppId must be different from toAppId: $toAppId") - - if (hasPIOPrefix(eventClient, fromAppId)) { - require( - isEmpty(eventClient, toAppId), - s"Target appId: $toAppId is not empty. Please run " + - "`pio app data-delete ` to clean the data before upgrading") - - logger.info(s"$fromAppId isEmpty: " + isEmpty(eventClient, fromAppId)) - - upgradeCopy(eventClient, fromAppId, toAppId) - - } else { - logger.info(s"From appId: ${fromAppId} doesn't contain" - + s" obsolete entityTypes ${obsEntityTypes} or" - + s" obsolete properties ${obsProperties}." - + " No need data migration." - + s" You can continue to use appId ${fromAppId}.") - } - - logger.info("Done.") - } - - -} diff --git a/tests/pio_tests/engines/recommendation-engine/build.sbt b/tests/pio_tests/engines/recommendation-engine/build.sbt index 14454179d5..6f90445dae 100644 --- a/tests/pio_tests/engines/recommendation-engine/build.sbt +++ b/tests/pio_tests/engines/recommendation-engine/build.sbt @@ -15,10 +15,6 @@ * limitations under the License. 
*/ -import AssemblyKeys._ - -assemblySettings - scalaVersion in ThisBuild := sys.env.getOrElse("PIO_SCALA_VERSION", "2.11.12") name := "template-scala-parallel-recommendation" @@ -26,6 +22,6 @@ name := "template-scala-parallel-recommendation" organization := "org.apache.predictionio" libraryDependencies ++= Seq( - "org.apache.predictionio" %% "apache-predictionio-core" % "0.14.0" % "provided", - "org.apache.spark" %% "spark-core" % sys.env.getOrElse("PIO_SPARK_VERSION", "2.1.1") % "provided", - "org.apache.spark" %% "spark-mllib" % sys.env.getOrElse("PIO_SPARK_VERSION", "2.1.1") % "provided") + "org.apache.predictionio" %% "apache-predictionio-core" % "0.15.0-SNAPSHOT" % "provided", + "org.apache.spark" %% "spark-mllib" % sys.env.getOrElse("PIO_SPARK_VERSION", "2.4.3") % "provided") + diff --git a/tests/pio_tests/engines/recommendation-engine/project/assembly.sbt b/tests/pio_tests/engines/recommendation-engine/project/assembly.sbt index 54c32528e9..9c014713d3 100644 --- a/tests/pio_tests/engines/recommendation-engine/project/assembly.sbt +++ b/tests/pio_tests/engines/recommendation-engine/project/assembly.sbt @@ -1 +1 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9") diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala index 1b4c8a86ec..a3691d009a 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala @@ -24,6 +24,7 @@ import grizzled.slf4j.Logging import scala.concurrent.Await import scala.concurrent.duration.Duration +import scala.io.StdIn import scala.language.implicitConversions import scala.sys.process._ @@ -238,7 +239,7 @@ object Pio extends Logging { info(f" ${ch.name}%16s | ${ch.id}%10s") } - val choice = if(force) "YES" else readLine("Enter 'YES' to proceed: ") + val choice = if(force) "YES" else StdIn.readLine("Enter 'YES' to proceed: ") choice match { case "YES" => AppCmd.delete(name) @@ -278,7 +279,7 @@ object Pio extends Logging { info(s" App ID: ${appDesc.app.id}") info(s" Description: ${appDesc.app.description}") - val choice = if(force) "YES" else readLine("Enter 'YES' to proceed: ") + val choice = if(force) "YES" else StdIn.readLine("Enter 'YES' to proceed: ") choice match { case "YES" => AppCmd.dataDelete(name, channel, all) @@ -307,7 +308,7 @@ object Pio extends Logging { info(s" Channel ID: ${chan.id}") info(s" App Name: ${appDesc.app.name}") info(s" App ID: ${appDesc.app.id}") - val choice = if(force) "YES" else readLine("Enter 'YES' to proceed: ") + val choice = if(force) "YES" else StdIn.readLine("Enter 'YES' to proceed: ") choice match { case "YES" => AppCmd.channelDelete(appName, deleteChannel)
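Reviewer note on the single-type/typeless direction taken in ESUtils.scala above: ESUtils.createIndex now takes the mapping JSON and returns the one mapping type to use for writes, ESUtils.esType looks up the type an existing index already carries, and searches and _delete_by_query drop the type segment from the URL. The sketch below is illustrative only and not part of this patch; the object name, index name, document id, and mapping JSON are assumptions, and it simply exercises the helpers added in this diff against an ES 6.8 cluster.

package org.apache.predictionio.data.storage.elasticsearch

import org.apache.http.entity.ContentType
import org.apache.http.nio.entity.NStringEntity
import org.elasticsearch.client.RestClient
import org.json4s.DefaultFormats

import scala.collection.JavaConversions._

// Illustrative sketch (not part of the patch): expected usage of the reworked
// ESUtils helpers after the move to single-type indices.
object TypelessIndexSketch {
  implicit val formats = DefaultFormats

  def roundTrip(client: RestClient, indexName: String): Unit = {
    // Create the index with its mapping in one call (idempotent) and learn the one
    // mapping type it carries; a fresh index created with include_type_name=false on
    // ES 6.8 reports "_doc", while an index migrated from ES 5.x keeps its legacy type.
    val mappingJson = """{"mappings":{"properties":{"n":{"enabled":false}}}}"""
    val estype: String = ESUtils.createIndex(client, indexName, mappingJson)

    // Writes still address /<index>/<type>/<id>, reusing whatever type the index has.
    val doc = new NStringEntity("""{"n":"example"}""", ContentType.APPLICATION_JSON)
    client.performRequest("PUT", s"/$indexName/$estype/example-id",
      Map("refresh" -> "false"), doc)

    // Reads and deletes drop the type segment entirely, matching the hunks above.
    val query = new NStringEntity("""{"query":{"match_all":{}}}""", ContentType.APPLICATION_JSON)
    client.performRequest("POST", s"/$indexName/_search", Map("size" -> "1"), query)
  }
}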