2828import static com .marklogic .kafka .connect .sink .MarkLogicSinkConfig .*;
2929
3030/**
31- * Base class for any test that wishes to connect to the "kafka-test-test-content" app server on port 8019.
32- * AbstractSpringMarkLogicTest assumes it can find mlHost/mlTestRestPort/mlUsername/mlPassword properties in
33- * gradle.properties and gradle-local.properties. It uses those to construct a DatabaseClient which can be fetched
31+ * Base class for any test that wishes to connect to the
32+ * "kafka-test-test-content" app server on port 8019.
33+ * AbstractSpringMarkLogicTest assumes it can find
34+ * mlHost/mlTestRestPort/mlUsername/mlPassword properties in
35+ * gradle.properties and gradle-local.properties. It uses those to construct a
36+ * DatabaseClient which can be fetched
3437 * via getDatabaseClient().
3538 */
3639public abstract class AbstractIntegrationSinkTest extends AbstractIntegrationTest {
@@ -39,31 +42,34 @@ public abstract class AbstractIntegrationSinkTest extends AbstractIntegrationTes
3942 @ Autowired
4043 SimpleTestConfig testConfig ;
4144
42- private final static long DEFAULT_RETRY_SLEEP_TIME = 250 ;
43- private final static int DEFAULT_RETRY_ATTEMPTS = 10 ;
4445 private Map <String , Object > taskConfig = new HashMap <>();
4546
4647 /**
47- * @param configParamNamesAndValues - Configuration values that need to be set for the test.
48- * @return a MarkLogicSinkTask based on the default connection config and any optional config params provided by
49- * the caller
48+ * @param configParamNamesAndValues - Configuration values that need to be set
49+ * for the test.
50+ * @return a MarkLogicSinkTask based on the default connection config and any
51+ * optional config params provided by
52+ * the caller
5053 */
5154 protected AbstractSinkTask startSinkTask (String ... configParamNamesAndValues ) {
5255 return startSinkTask (null , configParamNamesAndValues );
5356 }
5457
55- protected AbstractSinkTask startSinkTask (BiConsumer <SinkRecord , Throwable > errorReporterMethod , String ... configParamNamesAndValues ) {
58+ protected AbstractSinkTask startSinkTask (BiConsumer <SinkRecord , Throwable > errorReporterMethod ,
59+ String ... configParamNamesAndValues ) {
5660 Map <String , String > config = newMarkLogicConfig (testConfig );
5761 config .put (MarkLogicSinkConfig .DOCUMENT_PERMISSIONS , "rest-reader,read,rest-writer,update" );
5862 for (int i = 0 ; i < configParamNamesAndValues .length ; i += 2 ) {
5963 config .put (configParamNamesAndValues [i ], configParamNamesAndValues [i + 1 ]);
6064 }
6165 taskConfig .putAll (config );
6266 if (taskConfig .containsKey (DMSDK_INCLUDE_KAFKA_METADATA )) {
63- taskConfig .put (DMSDK_INCLUDE_KAFKA_METADATA , Boolean .valueOf ((String ) taskConfig .get (DMSDK_INCLUDE_KAFKA_METADATA )));
67+ taskConfig .put (DMSDK_INCLUDE_KAFKA_METADATA ,
68+ Boolean .valueOf ((String ) taskConfig .get (DMSDK_INCLUDE_KAFKA_METADATA )));
6469 }
6570 if (taskConfig .containsKey (DOCUMENT_COLLECTIONS_ADD_TOPIC )) {
66- taskConfig .put (DOCUMENT_COLLECTIONS_ADD_TOPIC , Boolean .valueOf ((String ) taskConfig .get (DOCUMENT_COLLECTIONS_ADD_TOPIC )));
71+ taskConfig .put (DOCUMENT_COLLECTIONS_ADD_TOPIC ,
72+ Boolean .valueOf ((String ) taskConfig .get (DOCUMENT_COLLECTIONS_ADD_TOPIC )));
6773 }
6874
6975 MarkLogicSinkConnector connector = new MarkLogicSinkConnector ();
@@ -92,31 +98,6 @@ protected void putAndFlushRecords(AbstractSinkTask task, SinkRecord... records)
9298 task .flush (new HashMap <>());
9399 }
94100
95- protected final void retryIfNotSuccessful (Runnable r ) {
96- retryIfNotSuccessful (r , DEFAULT_RETRY_SLEEP_TIME , DEFAULT_RETRY_ATTEMPTS );
97- }
98-
99- @ SuppressWarnings ("java:S2925" ) // We're fine with the sleep call here, due to the nature of testing with kafka-junit
100- protected final void retryIfNotSuccessful (Runnable r , long sleepTime , int attempts ) {
101- for (int i = 1 ; i <= attempts ; i ++) {
102- logger .info ("Trying assertion, attempt " + i + " out of " + attempts );
103- try {
104- r .run ();
105- return ;
106- } catch (Throwable ex ) {
107- if (i == attempts ) {
108- throw ex ;
109- }
110- logger .info ("Assertion failed: " + ex .getMessage () + "; will sleep for " + sleepTime + " ms and try again" );
111- try {
112- Thread .sleep (sleepTime );
113- } catch (InterruptedException e ) {
114- // Ignore, not expected during a test
115- }
116- }
117- }
118- }
119-
120101 protected Map <String , Object > getTaskConfig () {
121102 return taskConfig ;
122103 }
0 commit comments