Skip to content

Commit 69261e5

Browse files
extend Scala API
1 parent 63b1934 commit 69261e5

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,19 @@ class DataStream[T](stream: JavaStream[T]) {
609609
}
610610
filter(filterFun)
611611
}
612+
613+
/** Creates a new DataStream that contains only the elements not satisfying the given filter predicate.
614+
*/
615+
def filterNot(fun: T => Boolean): DataStream[T] = {
616+
if (fun == null) {
617+
throw new NullPointerException("FilteNot function must not be null.")
618+
}
619+
val cleanFun = clean(fun)
620+
val filterFun = new FilterFunction[T] {
621+
def filter(in: T): Boolean = !cleanFun(in)
622+
}
623+
filter(filterFun)
624+
}
612625

613626
/** Windows this [[DataStream]] into sliding count windows.
614627
*

modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,15 @@ object StreamExecutionEnvironment {
825825
new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
826826
}
827827

828+
/** Creates an execution environment that represents the context in which the program is currently executed.
829+
*
830+
* @param configuration
831+
* Pass a custom configuration into the cluster.
832+
*/
833+
def getExecutionEnvironment(configuration: Configuration): StreamExecutionEnvironment = {
834+
new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment(configuration))
835+
}
836+
828837
// --------------------------------------------------------------------------
829838
// local environment
830839
// --------------------------------------------------------------------------

0 commit comments

Comments
 (0)