|
19 | 19 |
|
20 | 20 | import org.apache.kafka.common.Configurable; |
21 | 21 | import org.apache.kafka.common.config.AbstractConfig; |
| 22 | +import org.apache.kafka.common.config.ConfigData; |
22 | 23 | import org.apache.kafka.common.config.ConfigDef; |
23 | 24 | import org.apache.kafka.common.config.ConfigException; |
24 | 25 | import org.apache.kafka.common.config.provider.ConfigProvider; |
25 | 26 | import org.apache.kafka.common.internals.Plugin; |
| 27 | +import org.apache.kafka.common.metrics.Metrics; |
| 28 | +import org.apache.kafka.common.metrics.Monitorable; |
| 29 | +import org.apache.kafka.common.metrics.PluginMetrics; |
26 | 30 | import org.apache.kafka.common.utils.LogCaptureAppender; |
27 | 31 | import org.apache.kafka.common.utils.Utils; |
28 | 32 | import org.apache.kafka.connect.components.Versioned; |
|
52 | 56 | import org.junit.jupiter.api.Test; |
53 | 57 |
|
54 | 58 | import java.io.File; |
| 59 | +import java.io.IOException; |
55 | 60 | import java.net.MalformedURLException; |
56 | 61 | import java.net.URL; |
57 | 62 | import java.net.URLClassLoader; |
@@ -398,6 +403,16 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() { |
398 | 403 | assertPluginClassLoaderAlwaysActive(plugin.get()); |
399 | 404 | } |
400 | 405 |
|
| 406 | + @Test |
| 407 | + public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() { |
| 408 | + String providerName = "monitorable"; |
| 409 | + String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; |
| 410 | + props.put(providerPrefix + ".class", MonitorableConfigProvider .class.getName()); |
| 411 | + createConfig(); |
| 412 | + Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics()); |
| 413 | + assertInstanceOf(MonitorableConfigProvider.class, plugin.get()); |
| 414 | + } |
| 415 | + |
401 | 416 | @Test |
402 | 417 | public void newHeaderConverterShouldConfigureWithPluginClassLoader() { |
403 | 418 | props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestPlugin.SAMPLING_HEADER_CONVERTER.className()); |
@@ -793,4 +808,33 @@ public void configure(Map<String, ?> configs) { |
793 | 808 | super.configure(configs); |
794 | 809 | } |
795 | 810 | } |
| 811 | + |
| 812 | + public static class MonitorableConfigProvider implements ConfigProvider, Monitorable { |
| 813 | + private boolean configured = false; |
| 814 | + |
| 815 | + @Override |
| 816 | + public void withPluginMetrics(PluginMetrics metrics) { |
| 817 | + assertTrue(configured); |
| 818 | + } |
| 819 | + |
| 820 | + @Override |
| 821 | + public ConfigData get(String path) { |
| 822 | + return null; |
| 823 | + } |
| 824 | + |
| 825 | + @Override |
| 826 | + public ConfigData get(String path, Set<String> keys) { |
| 827 | + return null; |
| 828 | + } |
| 829 | + |
| 830 | + @Override |
| 831 | + public void close() throws IOException { |
| 832 | + } |
| 833 | + |
| 834 | + @Override |
| 835 | + public void configure(Map<String, ?> configs) { |
| 836 | + configured = true; |
| 837 | + } |
| 838 | + } |
| 839 | + |
796 | 840 | } |
0 commit comments