Skip to content

Commit 3c08cb2

Browse files
format with new version
1 parent 42f63d5 commit 3c08cb2

24 files changed

+113
-139
lines changed

.scalafmt.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
style = defaultWithAlign
22
maxColumn = 120
3-
version = 3.8.3
3+
version = 3.8.4
44
assumeStandardLibraryStripMargin = true
55
align.stripMargin = true
66
runner.dialect = scala3

modules/examples/src/main/scala/org/example/SocketTextStreamWordCount.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,11 @@ import org.apache.flink.configuration.ConfigConstants
2525
import org.apache.flink.configuration.RestOptions.BIND_PORT
2626
import scala.jdk.CollectionConverters.*
2727

28-
/** This example shows an implementation of WordCount with data from a text
29-
* socket. To run the example make sure that the service providing the text
30-
* data is already up and running.
28+
/** This example shows an implementation of WordCount with data from a text socket. To run the example make sure that
29+
* the service providing the text data is already up and running.
3130
*
32-
* To start an example socket text stream on your local machine run netcat from
33-
* a command line, where the parameter specifies the port number:
31+
* To start an example socket text stream on your local machine run netcat from a command line, where the parameter
32+
* specifies the port number:
3433
*
3534
* {{{
3635
* nc -lk 9999

modules/examples/src/main/scala/org/example/TransactonIOs.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ object TransactionsSource:
7676
def iterator: FromIteratorFunction[Transaction] =
7777
FromIteratorFunction[Transaction](
7878
(new Iterator[Transaction] with Serializable:
79-
var rows = data.iterator
80-
var timestamp = Timestamp.valueOf("2019-01-01 00:00:00").getTime
79+
var rows = data.iterator
80+
var timestamp = Timestamp.valueOf("2019-01-01 00:00:00").getTime
8181
val sixMinutes = 6.minutes.toMillis
8282

8383
override def hasNext: Boolean = rows.hasNext

modules/examples/src/main/scala/org/example/connectedStreams.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,15 @@ import org.apache.flinkx.api.serializers.*
1919
val streamOfWords = env
2020
.addSource(TransactionsSource.iterator)
2121
.keyBy(_.accountId)
22-
22+
2323
control
2424
.connect(streamOfWords)
2525
.flatMap(ControlFunction())
2626
.print()
2727

2828
env.execute()
2929

30-
class ControlFunction
31-
extends RichCoFlatMapFunction[Transaction, Transaction, Transaction]:
30+
class ControlFunction extends RichCoFlatMapFunction[Transaction, Transaction, Transaction]:
3231

3332
@transient lazy val state: ValueState[Double] = getRuntimeContext.getState(
3433
new ValueStateDescriptor(

modules/examples/src/main/scala/org/example/fileFilter.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class MyDefaultFileFilter extends Predicate[Path]:
2828

2929
@main def filterFiles =
3030
val currentDirectory = File(".").getCanonicalPath
31-
val inputBasePath = Path(s"$currentDirectory/input-table")
31+
val inputBasePath = Path(s"$currentDirectory/input-table")
3232
val fileSourceBuilder =
3333
FileSource.forRecordStreamFormat(
3434
TextLineInputFormat(),
@@ -37,9 +37,7 @@ class MyDefaultFileFilter extends Predicate[Path]:
3737

3838
val fileSource = fileSourceBuilder
3939
.monitorContinuously(Duration.ofSeconds(2))
40-
.setFileEnumerator(() =>
41-
NonSplittingRecursiveEnumerator(MyDefaultFileFilter())
42-
)
40+
.setFileEnumerator(() => NonSplittingRecursiveEnumerator(MyDefaultFileFilter()))
4341
.build()
4442
val env =
4543
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(Configuration())

modules/examples/src/main/scala/org/example/fraud/FraudDetectionJob.scala

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package org.example.fraud
1818
* limitations under the License.
1919
*/
2020

21-
2221
import java.io.File
2322

2423
import org.apache.flinkx.api.*
@@ -73,7 +72,7 @@ import Givens.given
7372
conf.setString("execution.checkpointing.min-pause", "3s")
7473
conf.setString("state.backend", "filesystem")
7574

76-
val env = StreamExecutionEnvironment.getExecutionEnvironment //.createLocalEnvironmentWithWebUI(conf)
75+
val env = StreamExecutionEnvironment.getExecutionEnvironment // .createLocalEnvironmentWithWebUI(conf)
7776

7877
val transactions = env
7978
.addSource(TransactionsSource.iterator)
@@ -94,21 +93,20 @@ import Givens.given
9493

9594
@main def fraudDetectionState() =
9695
val env = StreamExecutionEnvironment.getExecutionEnvironment
97-
val savepoint = SavepointReader.read(env.getJavaEnv, "///tmp/savepoints/savepoint-827976-a94a8feb6c07",
98-
HashMapStateBackend())
99-
val keyedState = savepoint.readKeyedState("fraud-state", ReaderFunction(), TypeInformation.of(classOf[Long]), keyedStateInfo)
96+
val savepoint =
97+
SavepointReader.read(env.getJavaEnv, "///tmp/savepoints/savepoint-827976-a94a8feb6c07", HashMapStateBackend())
98+
val keyedState =
99+
savepoint.readKeyedState("fraud-state", ReaderFunction(), TypeInformation.of(classOf[Long]), keyedStateInfo)
100100
keyedState.print()
101101
env.execute()
102102

103103
case class MaxTransaction(amount: Double, timestamp: Long)
104104

105-
class MaxAggregate
106-
extends AggregateFunction[Transaction, MaxTransaction, MaxTransaction]:
105+
class MaxAggregate extends AggregateFunction[Transaction, MaxTransaction, MaxTransaction]:
107106
override def createAccumulator(): MaxTransaction = MaxTransaction(0d, 0L)
108107

109108
override def add(value: Transaction, accumulator: MaxTransaction): MaxTransaction =
110-
if value.amount > accumulator._1 then
111-
MaxTransaction(value.amount, value.timestamp)
109+
if value.amount > accumulator._1 then MaxTransaction(value.amount, value.timestamp)
112110
else accumulator
113111

114112
override def getResult(accumulator: MaxTransaction): MaxTransaction =

modules/examples/src/main/scala/org/example/fraud/FraudDetector.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ case class FraudStateVars(
5555
timerState.clear()
5656

5757
object FraudDetector:
58-
val SmallAmount = 1.00
59-
val LargeAmount = 500.00
58+
val SmallAmount = 1.00
59+
val LargeAmount = 500.00
6060
val OneMinute: Long = 1.minute.toMillis
6161

6262
def readState(context: RuntimeContext): FraudStateVars =

modules/examples/src/main/scala/org/example/fraud/RunningAverage.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
99
import org.apache.flink.configuration.Configuration
1010
import org.example.Transaction
1111

12-
class RunningAverage
13-
extends RichMapFunction[Transaction, (Transaction, Double)]:
12+
class RunningAverage extends RichMapFunction[Transaction, (Transaction, Double)]:
1413

1514
given tranTypeInfo: TypeInformation[Transaction] =
1615
TypeInformation.of(classOf[Transaction])

modules/examples/src/main/scala/org/example/runningSum.scala

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ final case class TestEvent(
3434
bag: List[Int] = Nil
3535
)
3636

37-
class CustomEventTimeTrigger[T, W <: TimeWindow](trigger: EventTimeTrigger)
38-
extends Trigger[T, W]:
37+
class CustomEventTimeTrigger[T, W <: TimeWindow](trigger: EventTimeTrigger) extends Trigger[T, W]:
3938

4039
override def onElement(
4140
element: T,
@@ -87,8 +86,7 @@ def windowAction(
8786
val output =
8887
reduced.copy(
8988
windowStart = window.getStart,
90-
runningCount =
91-
if reduced.runningCount > 0 then reduced.runningCount else 1
89+
runningCount = if reduced.runningCount > 0 then reduced.runningCount else 1
9290
)
9391
println(
9492
s"\n{start: ${window.getStart} .. end: ${window.getEnd}, count: ${output.runningCount} \ninput: "
@@ -111,13 +109,11 @@ def reduceEvents(a: TestEvent, b: TestEvent) =
111109
@main def runningWindowedSum =
112110
val env = StreamExecutionEnvironment.getExecutionEnvironment
113111

114-
val windowSize = Time.of(10, TimeUnit.SECONDS)
112+
val windowSize = Time.of(10, TimeUnit.SECONDS)
115113
val windowSlide = Time.of(2, TimeUnit.SECONDS)
116114
val watermarkStrategy = WatermarkStrategy
117115
.forBoundedOutOfOrderness[TestEvent](Duration.ofSeconds(1000))
118-
.withTimestampAssigner((event: TestEvent, streamRecordTimestamp: Long) =>
119-
event.timestamp
120-
)
116+
.withTimestampAssigner((event: TestEvent, streamRecordTimestamp: Long) => event.timestamp)
121117

122118
env
123119
.fromElements(
@@ -133,11 +129,10 @@ def reduceEvents(a: TestEvent, b: TestEvent) =
133129

134130
env.execute()
135131

136-
class RunningCountFunc(windowSize: Duration)
137-
extends KeyedProcessFunction[Long, TestEvent, TestEvent]:
132+
class RunningCountFunc(windowSize: Duration) extends KeyedProcessFunction[Long, TestEvent, TestEvent]:
138133

139-
val oldEntriesCleanupInterval = 1000L
140-
var minTimestamp: ValueState[Long] = _
134+
val oldEntriesCleanupInterval = 1000L
135+
var minTimestamp: ValueState[Long] = _
141136
var timeToCount: MapState[Long, Long] = _
142137
override def open(parameters: Configuration): Unit =
143138
timeToCount = getRuntimeContext.getMapState(
@@ -158,8 +153,7 @@ class RunningCountFunc(windowSize: Duration)
158153
out: Collector[TestEvent]
159154
): Unit =
160155
val currentCount =
161-
if timeToCount.contains(event.timestamp) then
162-
timeToCount.get(event.timestamp)
156+
if timeToCount.contains(event.timestamp) then timeToCount.get(event.timestamp)
163157
else 0
164158
timeToCount.put(event.timestamp, currentCount + 1)
165159

modules/examples/src/main/scala/org/example/troubleshooting/fakeKafkaSource.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ class FakeKafkaSource(
3737
(0 to FakeKafkaSource.NO_OF_PARTITIONS).filter(
3838
_ % numberOfParallelSubtasks == indexOfThisSubtask
3939
)
40-
40+
4141
val rand = Random(seed)
4242

4343
@transient @volatile var cancelled = false
4444

4545
override def open(parameters: Configuration): Unit =
4646
println(s"Now reading from partitions: $assignedPartitions")
47-
47+
4848
override def run(ctx: SourceContext[FakeKafkaRecord]): Unit =
4949
if assignedPartitions.nonEmpty then
5050
while !cancelled do {
@@ -61,8 +61,7 @@ class FakeKafkaSource(
6161
var serializedMeasurement =
6262
serializedMeasurements(rand.nextInt(serializedMeasurements.length))
6363

64-
if rand.nextFloat() > 1 - poisonPillRate then
65-
serializedMeasurement = Arrays.copyOf(serializedMeasurement, 10)
64+
if rand.nextFloat() > 1 - poisonPillRate then serializedMeasurement = Arrays.copyOf(serializedMeasurement, 10)
6665

6766
(ctx.getCheckpointLock()).synchronized {
6867
ctx.collect(

0 commit comments

Comments
 (0)