Skip to content

Commit e50938d

Browse files
committed
Remove stream parallel setting and optimize imports
1 parent bcb43cb commit e50938d

File tree

14 files changed

+31
-25
lines changed

14 files changed

+31
-25
lines changed

src/main/java/org/radarbase/output/Application.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ package org.radarbase.output
1818

1919
import com.beust.jcommander.JCommander
2020
import com.beust.jcommander.ParameterException
21-
import kotlinx.coroutines.*
21+
import kotlinx.coroutines.launch
22+
import kotlinx.coroutines.runBlocking
2223
import kotlinx.coroutines.sync.Mutex
2324
import kotlinx.coroutines.sync.Semaphore
24-
import kotlinx.coroutines.sync.withLock
2525
import org.radarbase.output.accounting.*
2626
import org.radarbase.output.cleaner.SourceDataCleaner
2727
import org.radarbase.output.compression.Compression
@@ -33,7 +33,6 @@ import org.radarbase.output.source.SourceStorage
3333
import org.radarbase.output.source.SourceStorageFactory
3434
import org.radarbase.output.target.TargetStorage
3535
import org.radarbase.output.target.TargetStorageFactory
36-
import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
3736
import org.radarbase.output.util.Timer
3837
import org.radarbase.output.worker.FileCacheStore
3938
import org.radarbase.output.worker.Job
@@ -89,8 +88,6 @@ class Application(
8988
override fun newFileCacheStore(accountant: Accountant) = FileCacheStore(this, accountant)
9089

9190
fun start() {
92-
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",
93-
(config.worker.numThreads - 1).toString())
9491
System.setProperty("kotlinx.coroutines.scheduler.max.pool.size",
9592
config.worker.numThreads.toString())
9693
System.setProperty("kotlinx.coroutines.scheduler.core.pool.size",

src/main/java/org/radarbase/output/accounting/OffsetPersistenceFactory.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ package org.radarbase.output.accounting
1818

1919
import kotlinx.coroutines.CoroutineScope
2020
import org.radarbase.output.util.SuspendedCloseable
21-
import java.io.Closeable
22-
import java.io.Flushable
2321
import java.nio.file.Path
2422

2523
/**

src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package org.radarbase.output.cleaner
22

3-
import kotlinx.coroutines.*
3+
import kotlinx.coroutines.SupervisorJob
4+
import kotlinx.coroutines.coroutineScope
5+
import kotlinx.coroutines.launch
46
import kotlinx.coroutines.sync.Mutex
57
import kotlinx.coroutines.sync.withPermit
8+
import kotlinx.coroutines.withContext
69
import org.radarbase.output.Application.Companion.format
710
import org.radarbase.output.FileStoreFactory
811
import org.radarbase.output.accounting.Accountant

src/main/java/org/radarbase/output/format/CsvAvroConverterFactory.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ import org.radarbase.output.util.TimeUtil.parseDateTime
1111
import org.radarbase.output.util.TimeUtil.parseTime
1212
import org.radarbase.output.util.TimeUtil.toDouble
1313
import org.slf4j.LoggerFactory
14-
import java.io.*
14+
import java.io.IOException
15+
import java.io.InputStream
16+
import java.io.Reader
17+
import java.io.Writer
1518
import java.nio.file.Path
1619
import kotlin.io.path.inputStream
1720
import kotlin.io.path.outputStream

src/main/java/org/radarbase/output/format/RecordConverterFactory.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import org.radarbase.output.compression.Compression
2323
import org.radarbase.output.util.ResourceContext.Companion.resourceContext
2424
import java.io.*
2525
import java.nio.file.Path
26-
import java.util.*
2726
import java.util.regex.Pattern
2827
import kotlin.collections.component1
2928
import kotlin.collections.component2

src/main/java/org/radarbase/output/source/S3SourceStorage.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,13 @@ package org.radarbase.output.source
33
import io.minio.*
44
import io.minio.errors.ErrorResponseException
55
import io.minio.messages.Tags
6-
import kotlinx.coroutines.*
7-
import kotlinx.coroutines.flow.*
6+
import kotlinx.coroutines.Dispatchers
7+
import kotlinx.coroutines.delay
8+
import kotlinx.coroutines.flow.first
9+
import kotlinx.coroutines.flow.flow
10+
import kotlinx.coroutines.flow.flowOn
11+
import kotlinx.coroutines.flow.retryWhen
12+
import kotlinx.coroutines.withContext
813
import org.apache.avro.file.SeekableFileInput
914
import org.apache.avro.file.SeekableInput
1015
import org.radarbase.output.config.S3Config

src/main/java/org/radarbase/output/source/SourceStorage.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package org.radarbase.output.source
22

33
import org.apache.avro.file.SeekableInput
4-
import org.radarbase.output.accounting.TopicPartitionOffsetRange
5-
import org.radarbase.output.util.*
64
import org.radarbase.output.util.AvroFileLister.Companion.avroFileTreeLister
75
import org.radarbase.output.util.AvroTopicLister.Companion.avroTopicTreeLister
6+
import org.radarbase.output.util.SuspendedCloseable
7+
import org.radarbase.output.util.TopicPath
88
import java.nio.file.Path
99
import java.time.Instant
1010

src/main/java/org/radarbase/output/target/S3TargetStorage.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.radarbase.output.target
1818

1919
import io.minio.*
20-
import io.minio.errors.ErrorResponseException
2120
import org.radarbase.output.config.S3Config
2221
import org.radarbase.output.source.S3SourceStorage.Companion.faultTolerant
2322
import org.radarbase.output.util.bucketBuild

src/main/java/org/radarbase/output/util/PostponedWriter.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@ package org.radarbase.output.util
1919
import kotlinx.coroutines.*
2020
import kotlinx.coroutines.sync.Mutex
2121
import kotlinx.coroutines.sync.withLock
22-
import org.slf4j.LoggerFactory
2322
import java.io.IOException
24-
import java.util.concurrent.*
25-
import java.util.concurrent.CancellationException
23+
import java.util.concurrent.TimeUnit
2624

2725
/**
2826
* File writer where data is written in a separate thread with a timeout.

src/main/java/org/radarbase/output/util/Timer.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package org.radarbase.output.util
1818

1919
import org.radarbase.output.util.ProgressBar.Companion.appendTime
2020
import java.time.Duration
21-
import java.util.*
2221
import java.util.concurrent.ConcurrentHashMap
2322
import java.util.concurrent.ConcurrentMap
2423
import java.util.concurrent.atomic.LongAdder

0 commit comments

Comments
 (0)