@@ -24,22 +24,16 @@ import java.time.Instant
2424import kotlin.reflect.jvm.jvmName
2525
2626open class FormattedPathFactory : RecordPathFactory () {
27- private lateinit var format: String
28- private lateinit var plugins: List <PathFormatterPlugin >
2927 private lateinit var formatter: PathFormatter
3028 private lateinit var properties: Map <String , String >
3129 private var topicFormatters: Map <String , PathFormatter > = emptyMap()
3230
3331 override fun init (properties : Map <String , String >) {
3432 super .init (properties)
3533
36- format = properties[" format" ] ? : DEFAULT_FORMAT
37-
38- plugins = instantiatePlugins(properties[" plugins" ] ? : DEFAULT_FORMAT_PLUGINS , properties)
39-
40- logger.info(" Path formatter uses format '{}' with plugins '{}'" , format, plugins.map { it.name })
41-
42- formatter = PathFormatter (format, plugins)
34+ this .properties = DEFAULTS + properties
35+ formatter = createFormatter(this .properties)
36+ logger.info(" Formatting path with {}" , formatter)
4337 }
4438
4539 private fun instantiatePlugins (
@@ -54,23 +48,20 @@ open class FormattedPathFactory : RecordPathFactory() {
5448 override fun addTopicConfiguration (topicConfig : Map <String , TopicConfig >) {
5549 topicFormatters = topicConfig
5650 .filter { (_, config) -> config.pathProperties.isNotEmpty() }
57- .mapValues { (topic , config) ->
58- val topicFormat = config.pathProperties.getOrDefault( " format " , format )
59- val pluginClassNames = config.pathProperties[ " plugins " ]
60-
61- val topicPlugins = if (pluginClassNames != null ) {
62- instantiatePlugins(pluginClassNames, properties + config.pathProperties)
63- } else plugins
51+ .mapValues { (_ , config) ->
52+ createFormatter(properties + config.pathProperties)
53+ }
54+ .onEach { (topic, formatter) ->
55+ logger.info( " Formatting path of topic {} with {} " , topic, formatter)
56+ }
57+ }
6458
65- logger.info(
66- " Path formatter of topic {} uses format {} with plugins {}" ,
67- topic,
68- topicFormat,
69- topicPlugins.map { it.name }
70- )
59+ private fun createFormatter (properties : Map <String , String >): PathFormatter {
60+ val format = checkNotNull(properties[" format" ])
61+ val pluginClassNames = checkNotNull(properties[" plugins" ])
62+ val plugins = instantiatePlugins(pluginClassNames, properties)
7163
72- PathFormatter (topicFormat, topicPlugins)
73- }
64+ return PathFormatter (format, plugins)
7465 }
7566
7667 override fun getRelativePath (
@@ -88,8 +79,10 @@ open class FormattedPathFactory : RecordPathFactory() {
8879 ): String = sanitizeId(key.get(" sourceId" ), " unknown-source" )
8980
9081 companion object {
91- internal const val DEFAULT_FORMAT = " \$ {projectId}/\$ {userId}/\$ {topic}/\$ {filename}"
92- internal const val DEFAULT_FORMAT_PLUGINS = " fixed time key value"
82+ internal val DEFAULTS = mapOf (
83+ " format" to " \$ {projectId}/\$ {userId}/\$ {topic}/\$ {filename}" ,
84+ " plugins" to " fixed time key value" ,
85+ )
9386 private val logger = LoggerFactory .getLogger(FormattedPathFactory ::class .java)
9487
9588 internal fun String.toPathFormatterPlugin (): PathFormatterPlugin ? = when (this ) {
0 commit comments