Skip to content

Commit fe77270

Browse files
authored
Merge pull request apache-spark-on-k8s#220 from palantir/rk/upstream
2 parents 2f69501 + e3a87ea commit fe77270

File tree

285 files changed

+6053
-3397
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

285 files changed

+6053
-3397
lines changed

R/pkg/DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Package: SparkR
22
Type: Package
3-
Version: 2.2.0
3+
Version: 2.3.0
44
Title: R Frontend for Apache Spark
55
Description: The SparkR package provides an R Frontend for Apache Spark.
66
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),

R/pkg/NAMESPACE

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ exportMethods("glm",
7575
# Job group lifecycle management methods
7676
export("setJobGroup",
7777
"clearJobGroup",
78-
"cancelJobGroup")
78+
"cancelJobGroup",
79+
"setJobDescription")
7980

8081
# Export Utility methods
8182
export("setLogLevel")

R/pkg/R/SQLContext.R

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ tableToDF <- function(tableName) {
584584
#'
585585
#' @param path The path of files to load
586586
#' @param source The name of external data source
587-
#' @param schema The data schema defined in structType
587+
#' @param schema The data schema defined in structType or a DDL-formatted string.
588588
#' @param na.strings Default string value for NA when source is "csv"
589589
#' @param ... additional external data source specific named properties.
590590
#' @return SparkDataFrame
@@ -600,6 +600,8 @@ tableToDF <- function(tableName) {
600600
#' structField("info", "map<string,double>"))
601601
#' df2 <- read.df(mapTypeJsonPath, "json", schema, multiLine = TRUE)
602602
#' df3 <- loadDF("data/test_table", "parquet", mergeSchema = "true")
603+
#' stringSchema <- "name STRING, info MAP<STRING, DOUBLE>"
604+
#' df4 <- read.df(mapTypeJsonPath, "json", stringSchema, multiLine = TRUE)
603605
#' }
604606
#' @name read.df
605607
#' @method read.df default
@@ -623,14 +625,19 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
623625
if (source == "csv" && is.null(options[["nullValue"]])) {
624626
options[["nullValue"]] <- na.strings
625627
}
628+
read <- callJMethod(sparkSession, "read")
629+
read <- callJMethod(read, "format", source)
626630
if (!is.null(schema)) {
627-
stopifnot(class(schema) == "structType")
628-
sdf <- handledCallJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession,
629-
source, schema$jobj, options)
630-
} else {
631-
sdf <- handledCallJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession,
632-
source, options)
631+
if (class(schema) == "structType") {
632+
read <- callJMethod(read, "schema", schema$jobj)
633+
} else if (is.character(schema)) {
634+
read <- callJMethod(read, "schema", schema)
635+
} else {
636+
stop("schema should be structType or character.")
637+
}
633638
}
639+
read <- callJMethod(read, "options", options)
640+
sdf <- handledCallJMethod(read, "load")
634641
dataFrame(sdf)
635642
}
636643

@@ -717,8 +724,8 @@ read.jdbc <- function(url, tableName,
717724
#' "spark.sql.sources.default" will be used.
718725
#'
719726
#' @param source The name of external data source
720-
#' @param schema The data schema defined in structType, this is required for file-based streaming
721-
#' data source
727+
#' @param schema The data schema defined in structType or a DDL-formatted string, this is
728+
#' required for file-based streaming data source
722729
#' @param ... additional external data source specific named options, for instance \code{path} for
723730
#' file-based streaming data source
724731
#' @return SparkDataFrame
@@ -733,6 +740,8 @@ read.jdbc <- function(url, tableName,
733740
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
734741
#'
735742
#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
743+
#' stringSchema <- "name STRING, info MAP<STRING, DOUBLE>"
744+
#' df1 <- read.stream("json", path = jsonDir, schema = stringSchema, maxFilesPerTrigger = 1)
736745
#' }
737746
#' @name read.stream
738747
#' @note read.stream since 2.2.0
@@ -750,10 +759,15 @@ read.stream <- function(source = NULL, schema = NULL, ...) {
750759
read <- callJMethod(sparkSession, "readStream")
751760
read <- callJMethod(read, "format", source)
752761
if (!is.null(schema)) {
753-
stopifnot(class(schema) == "structType")
754-
read <- callJMethod(read, "schema", schema$jobj)
762+
if (class(schema) == "structType") {
763+
read <- callJMethod(read, "schema", schema$jobj)
764+
} else if (is.character(schema)) {
765+
read <- callJMethod(read, "schema", schema)
766+
} else {
767+
stop("schema should be structType or character.")
768+
}
755769
}
756770
read <- callJMethod(read, "options", options)
757771
sdf <- handledCallJMethod(read, "load")
758-
dataFrame(callJMethod(sdf, "toDF"))
772+
dataFrame(sdf)
759773
}

0 commit comments

Comments (0)