From dbe1be8225d7be6101432cd7595bec554521ef23 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Sat, 27 Dec 2025 15:27:15 +0000 Subject: [PATCH 01/25] Refactor SQL module to use PreparedStatement in SQLSpout and IndexerBolt for improved readability and performance --- .../apache/stormcrawler/sql/IndexerBolt.java | 95 +++++---- .../org/apache/stormcrawler/sql/SQLSpout.java | 187 ++++++++++-------- .../stormcrawler/sql/StatusUpdaterBolt.java | 91 +++++---- 3 files changed, 199 insertions(+), 174 deletions(-) diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java index 89b711f32..25bd39d14 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java @@ -21,7 +21,10 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; @@ -74,7 +77,7 @@ public void execute(Tuple tuple) { String normalisedurl = valueForURL(tuple); Metadata metadata = (Metadata) tuple.getValueByField("metadata"); - String text = tuple.getStringByField("text"); + tuple.getStringByField("text"); boolean keep = filterDocument(metadata); if (!keep) { @@ -90,35 +93,9 @@ public void execute(Tuple tuple) { // which metadata to display? Map keyVals = filterMetadata(metadata); + List keys = new ArrayList<>(keyVals.keySet()); - StringBuilder query = - new StringBuilder(" insert into ") - .append(tableName) - .append(" (") - .append(fieldNameForURL()); - - Object[] keys = keyVals.keySet().toArray(); - - for (Object o : keys) { - query.append(", ").append((String) o); - } - - query.append(") values(?"); - - for (int i = 0; i < keys.length; i++) { - query.append(", ?"); - } - - query.append(")"); - - query.append(" ON DUPLICATE KEY UPDATE "); - for (int i = 0; i < keys.length; i++) { - String key = (String) keys[i]; - if (i > 0) { - query.append(", "); - } - query.append(key).append("=VALUES(").append(key).append(")"); - } + String query = getQuery(keys); if (connection == null) { try { @@ -131,30 +108,30 @@ public void execute(Tuple tuple) { LOG.debug("PreparedStatement => {}", query); - // create the mysql insert preparedstatement - PreparedStatement preparedStmt = connection.prepareStatement(query.toString()); - - // TODO store the text of the document? - if (StringUtils.isNotBlank(fieldNameForText())) { - // builder.field(fieldNameForText(), trimText(text)); - } + // create the mysql insert PreparedStatement - // send URL as field? - if (fieldNameForURL() != null) { - preparedStmt.setString(1, normalisedurl); - } + try (PreparedStatement preparedStmt = connection.prepareStatement(query)) { + // TODO store the text of the document? + if (StringUtils.isNotBlank(fieldNameForText())) { + // builder.field(fieldNameForText(), trimText(text)); + } - for (int i = 0; i < keys.length; i++) { - insert(preparedStmt, i + 2, (String) keys[i], keyVals); - } + // send URL as field? 
+ if (fieldNameForURL() != null) { + preparedStmt.setString(1, normalisedurl); + } - preparedStmt.executeUpdate(); + for (int i = 0; i < keys.size(); i++) { + insert(preparedStmt, i + 2, keys.get(i), keyVals); + preparedStmt.addBatch(); + } + preparedStmt.executeBatch(); - eventCounter.scope("Indexed").incrBy(1); - - _collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); - _collector.ack(tuple); + eventCounter.scope("Indexed").incrBy(1); + _collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); + _collector.ack(tuple); + } } catch (Exception e) { // do not send to status stream so that it gets replayed LOG.error("Error inserting into SQL", e); @@ -188,4 +165,26 @@ private void insert( } preparedStmt.setString(position, value); } + + private String getQuery(final List keys) { + final String columns = String.join(", ", keys); + final String placeholders = keys.stream().map(k -> "?").collect(Collectors.joining(", ")); + + final String updates = + keys.stream() + .map(k -> String.format("%s=VALUES(%s)", k, k)) + .collect(Collectors.joining(", ")); + + return String.format( + """ + INSERT INTO %s (%s%s) + VALUES (?%s) + ON DUPLICATE KEY UPDATE %s + """, + tableName, + fieldNameForURL(), + columns.isEmpty() ? "" : ", " + columns, + placeholders.isEmpty() ? "" : ", " + placeholders, + updates); + } } diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java index 084c5de9c..4d000fe91 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java @@ -19,9 +19,9 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.sql.Timestamp; import java.time.Instant; import java.util.List; @@ -43,7 +43,36 @@ public class SQLSpout extends AbstractQueryingSpout { private static final Scheme SCHEME = new StringTabScheme(); - private String tableName; + private static final String BASE_SQL = + """ + SELECT * + FROM ( + SELECT + rank() OVER (PARTITION BY host ORDER BY nextfetchdate DESC, url) AS ranking, + url, + metadata, + nextfetchdate + FROM %s + WHERE nextfetchdate <= ? %s + ) AS urls_ranks + WHERE urls_ranks.ranking <= ? + ORDER BY ranking %s + """; + + private static final String BUCKET_CLAUSE = + """ + AND bucket = ? + """; + + private static final String LIMIT_CLAUSE = + """ + LIMIT ? + """; + + private static final String URL_COLUMN = "url"; + private static final String METADATA_COLUMN = "metadata"; + + private static String preparedSql; private Connection connection; @@ -70,7 +99,8 @@ public void open( maxDocsPerBucket = ConfUtils.getInt(conf, Constants.SQL_MAX_DOCS_BUCKET_PARAM_NAME, 5); - tableName = ConfUtils.getString(conf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls"); + final String tableName = + ConfUtils.getString(conf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls"); maxNumResults = ConfUtils.getInt(conf, Constants.SQL_MAXRESULTS_PARAM_NAME, 100); @@ -88,6 +118,11 @@ public void open( "[" + context.getThisComponentId() + " #" + context.getThisTaskIndex() + "] "; bucketNum = context.getThisTaskIndex(); } + + final String bucketClause = (bucketNum >= 0) ? BUCKET_CLAUSE : ""; + final String limitClause = (maxNumResults != -1) ? 
LIMIT_CLAUSE : ""; + + preparedSql = String.format(BASE_SQL, tableName, bucketClause, limitClause); } @Override @@ -117,97 +152,41 @@ protected void populateBuffer() { // https://mariadb.com/kb/en/library/window-functions-overview/ // http://www.mysqltutorial.org/mysql-window-functions/mysql-rank-function/ - String query = - "SELECT * from (select rank() over (partition by host order by nextfetchdate desc, url) as ranking, url, metadata, nextfetchdate from " - + tableName; - - query += - " WHERE nextfetchdate <= '" + new Timestamp(lastNextFetchDate.toEpochMilli()) + "'"; - - // constraint on bucket num - if (bucketNum >= 0) { - query += " AND bucket = '" + bucketNum + "'"; - } - - query += - ") as urls_ranks where (urls_ranks.ranking <= " - + maxDocsPerBucket - + ") order by ranking"; - - if (maxNumResults != -1) { - query += " LIMIT " + this.maxNumResults; - } - int alreadyprocessed = 0; int numhits = 0; long timeStartQuery = System.currentTimeMillis(); - // create the java statement - Statement st = null; - ResultSet rs = null; - try { - st = this.connection.createStatement(); + try (PreparedStatement ps = connection.prepareStatement(preparedSql)) { + int i = 1; + ps.setTimestamp(i++, new Timestamp(lastNextFetchDate.toEpochMilli())); - // dump query to log - LOG.debug("{} SQL query {}", logIdprefix, query); - - // execute the query, and get a java resultset - rs = st.executeQuery(query); - - long timeTaken = System.currentTimeMillis() - timeStartQuery; - queryTimes.addMeasurement(timeTaken); - - // iterate through the java resultset - while (rs.next()) { - String url = rs.getString("url"); - numhits++; - // already processed? skip - if (beingProcessed.containsKey(url)) { - alreadyprocessed++; - continue; - } - String metadata = rs.getString("metadata"); - if (metadata == null) { - metadata = ""; - } else if (!metadata.startsWith("\t")) { - metadata = "\t" + metadata; - } - String URLMD = url + metadata; - List v = - SCHEME.deserialize(ByteBuffer.wrap(URLMD.getBytes(StandardCharsets.UTF_8))); - buffer.add(url, (Metadata) v.get(1)); + if (bucketNum >= 0) { + ps.setInt(i++, bucketNum); } - // no results? 
reset the date - if (numhits == 0) { - lastNextFetchDate = null; + ps.setInt(i++, maxDocsPerBucket); + + if (maxNumResults != -1) { + ps.setInt(i++, maxNumResults); } - eventCounter.scope("already_being_processed").incrBy(alreadyprocessed); - eventCounter.scope("queries").incrBy(1); - eventCounter.scope("docs").incrBy(numhits); + // dump query to log + LOG.debug("{} SQL query {}", logIdprefix, preparedSql); + + try (ResultSet rs = ps.executeQuery()) { + final long timeTaken = recordQueryTiming(timeStartQuery); - LOG.info( - "{} SQL query returned {} hits in {} msec with {} already being processed", - logIdprefix, - numhits, - timeTaken, - alreadyprocessed); + // iterate through the java resultset + while (rs.next()) { + numhits++; + alreadyprocessed += processRow(rs); + } + postProcessResults(numhits, alreadyprocessed, timeTaken); + } } catch (SQLException e) { LOG.error("Exception while querying table", e); - } finally { - try { - if (rs != null) rs.close(); - } catch (SQLException e) { - LOG.error("Exception closing resultset", e); - } - try { - if (st != null) st.close(); - } catch (SQLException e) { - LOG.error("Exception closing statement", e); - } } } @@ -232,4 +211,52 @@ public void close() { LOG.error("Exception caught while closing SQL connection", e); } } + + private long recordQueryTiming(long timeStartQuery) { + long timeTaken = System.currentTimeMillis() - timeStartQuery; + queryTimes.addMeasurement(timeTaken); + return timeTaken; + } + + private int processRow(final ResultSet rs) throws SQLException { + + final String url = rs.getString(URL_COLUMN); + final String metadata = rs.getString(METADATA_COLUMN); + + // already processed? skip + if (beingProcessed.containsKey(url)) { + return 1; + } + + final String normalisedMetadata = + (metadata == null || metadata.startsWith("\t")) ? metadata : "\t" + metadata; + + final String urlWithMetadata = String.format("%s%s", url, normalisedMetadata); + final List v = + SCHEME.deserialize( + ByteBuffer.wrap(urlWithMetadata.getBytes(StandardCharsets.UTF_8))); + buffer.add(url, (Metadata) v.get(1)); + + return 0; + } + + private void postProcessResults( + final int numHits, final int alreadyProcessed, final long timeTaken) { + + // no results? 
reset the date + if (numHits == 0) { + lastNextFetchDate = null; + } + + eventCounter.scope("already_being_processed").incrBy(alreadyProcessed); + eventCounter.scope("queries").incrBy(1); + eventCounter.scope("docs").incrBy(numHits); + + LOG.info( + "{} SQL query returned {} hits in {} msec with {} already being processed", + logIdprefix, + numHits, + timeTaken, + alreadyProcessed); + } } diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java index 20ca051e5..15f4b8864 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java @@ -52,18 +52,14 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt { private MultiCountMetric eventCounter; private Connection connection; - private String tableName; private URLPartitioner partitioner; private int maxNumBuckets = -1; private int batchMaxSize = 1000; - private float batchMaxIdleMsec = 2000; private int currentBatchSize = 0; - private PreparedStatement insertPreparedStmt = null; - private long lastInsertBatchTime = -1; private String updateQuery; @@ -88,7 +84,8 @@ public void prepare( this.eventCounter = context.registerMetric("counter", new MultiCountMetric(), 10); - tableName = ConfUtils.getString(stormConf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls"); + final String tableName = + ConfUtils.getString(stormConf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls"); batchMaxSize = ConfUtils.getInt(stormConf, Constants.SQL_UPDATE_BATCH_SIZE_PARAM_NAME, 1000); @@ -100,19 +97,25 @@ public void prepare( throw new RuntimeException(ex); } - String query = - tableName - + " (url, status, nextfetchdate, metadata, bucket, host)" - + " values (?, ?, ?, ?, ?, ?)"; - - updateQuery = "REPLACE INTO " + query; - insertQuery = "INSERT IGNORE INTO " + query; - - try { - insertPreparedStmt = connection.prepareStatement(insertQuery); - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } + final String baseColumns = + """ + (url, status, nextfetchdate, metadata, bucket, host) + VALUES (?, ?, ?, ?, ?, ?) 
+ """; + + updateQuery = + String.format( + """ + REPLACE INTO %s %s + """, + tableName, baseColumns); + + insertQuery = + String.format( + """ + INSERT IGNORE INTO %s %s + """, + tableName, baseColumns); ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleAtFixedRate( @@ -162,38 +165,31 @@ public synchronized void store( partition = Math.abs(partitionKey.hashCode() % maxNumBuckets); } - PreparedStatement preparedStmt = this.insertPreparedStmt; - // create in table if does not already exist if (isUpdate) { - preparedStmt = connection.prepareStatement(updateQuery); - } - preparedStmt.setString(1, url); - preparedStmt.setString(2, status.toString()); - if (nextFetch.isPresent()) { - final Timestamp tsp = Timestamp.from(nextFetch.get().toInstant()); - preparedStmt.setObject(3, tsp); - } else { - // a value so large it means it will never be refetched - preparedStmt.setObject(3, NEVER); - } - preparedStmt.setString(4, mdAsString.toString()); - preparedStmt.setInt(5, partition); - preparedStmt.setString(6, partitionKey); - - // updates are not batched - if (isUpdate) { - preparedStmt.executeUpdate(); - preparedStmt.close(); + try (PreparedStatement ps = connection.prepareStatement(updateQuery)) { + ps.setString(1, url); + ps.setString(2, status.toString()); + if (nextFetch.isPresent()) { + final Timestamp tsp = Timestamp.from(nextFetch.get().toInstant()); + ps.setObject(3, tsp); + } else { + // a value so large it means it will never be refetched + ps.setObject(3, NEVER); + } + ps.setString(4, mdAsString.toString()); + ps.setInt(5, partition); + ps.setString(6, partitionKey); + + // updates are not batched + ps.executeUpdate(); + } eventCounter.scope("sql_updates_number").incrBy(1); super.ack(t, url); return; } - // code below is for inserts i.e. DISCOVERED URLs - preparedStmt.addBatch(); - if (lastInsertBatchTime == -1) { lastInsertBatchTime = System.currentTimeMillis(); } @@ -216,6 +212,7 @@ private synchronized void checkExecuteBatch() throws SQLException { } long now = System.currentTimeMillis(); // check whether the insert batches need executing + final float batchMaxIdleMsec = 2000; if ((currentBatchSize == batchMaxSize)) { LOG.info("About to execute batch - triggered by size"); } else if (lastInsertBatchTime + (long) batchMaxIdleMsec < System.currentTimeMillis()) { @@ -229,7 +226,12 @@ private synchronized void checkExecuteBatch() throws SQLException { try { long start = System.currentTimeMillis(); - insertPreparedStmt.executeBatch(); + try (PreparedStatement ps = connection.prepareStatement(insertQuery)) { + // code below is for inserts i.e. 
DISCOVERED URLs
+                ps.addBatch();
+                ps.executeBatch();
+            }
+
             long end = System.currentTimeMillis();
 
             LOG.info("Batched {} inserts executed in {} msec", currentBatchSize, end - start);
@@ -253,9 +255,6 @@ private synchronized void checkExecuteBatch() throws SQLException {
         lastInsertBatchTime = System.currentTimeMillis();
         currentBatchSize = 0;
         waitingAck.clear();
-
-        insertPreparedStmt.close();
-        insertPreparedStmt = connection.prepareStatement(insertQuery);
     }
 
     @Override

From 5ff026e693da7dc7f40fc36b1e91f304e83639d2 Mon Sep 17 00:00:00 2001
From: "raju.gupta"
Date: Sat, 27 Dec 2025 16:10:25 +0000
Subject: [PATCH 02/25] Refactor SQL module to use PreparedStatement in
 SQLSpout and IndexerBolt for improved readability and performance

---
 .../org/apache/stormcrawler/sql/SQLUtil.java  |  9 ++----
 .../sql/metrics/MetricsConsumer.java          | 28 +++++++++----------
 2 files changed, 16 insertions(+), 21 deletions(-)

diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
index ad75f2432..d580236a4 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
@@ -20,7 +20,6 @@
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 
 public class SQLUtil {
@@ -29,14 +28,14 @@ private SQLUtil() {}
 
     public static Connection getConnection(Map<String, Object> stormConf) throws SQLException {
 
         // SQL connection details
-        Map<String, Object> sqlConf = (Map<String, Object>) stormConf.get("sql.connection");
+        Map<String, String> sqlConf = (Map<String, String>) stormConf.get("sql.connection");
 
         if (sqlConf == null) {
             throw new RuntimeException(
                     "Missing SQL connection config, add a section 'sql.connection' to the configuration");
         }
 
-        String url = (String) sqlConf.get("url");
+        String url = sqlConf.get("url");
         if (url == null) {
             throw new RuntimeException(
                     "Missing SQL url, add an entry 'url' to the section 'sql.connection' of the configuration");
         }
 
         Properties props = new Properties();
 
-        for (Entry<String, Object> entry : sqlConf.entrySet()) {
-            props.setProperty(entry.getKey(), (String) entry.getValue());
-        }
+        props.putAll(sqlConf);
 
         return DriverManager.getConnection(url, props);
     }
diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java
index 0062f154b..2641d5ea4 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java
@@ -23,7 +23,6 @@
 import java.time.Instant;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Map.Entry;
 import org.apache.storm.metric.api.IMetricsConsumer;
 import org.apache.storm.task.IErrorReporter;
 import org.apache.storm.task.TopologyContext;
@@ -35,7 +34,7 @@ public class MetricsConsumer implements IMetricsConsumer {
 
-    private final Logger LOG = LoggerFactory.getLogger(getClass());
+    private static final Logger LOG = LoggerFactory.getLogger(MetricsConsumer.class);
 
     private Connection connection;
     private String query;
@@ -64,13 +63,11 @@ public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
         final Timestamp now = Timestamp.from(Instant.now());
 
-        try {
-            PreparedStatement preparedStmt = connection.prepareStatement(query);
+        try
(PreparedStatement preparedStmt = connection.prepareStatement(query)) { for (DataPoint dataPoint : dataPoints) { handleDataPoints(preparedStmt, taskInfo, dataPoint.name, dataPoint.value, now); } preparedStmt.executeBatch(); - preparedStmt.close(); } catch (SQLException ex) { LOG.error(ex.getMessage(), ex); throw new RuntimeException(ex); @@ -83,24 +80,25 @@ private void handleDataPoints( final String nameprefix, final Object value, final Timestamp now) { - if (value instanceof Number) { + if (value instanceof final Number number) { try { - indexDataPoint( - preparedStmt, taskInfo, now, nameprefix, ((Number) value).doubleValue()); + indexDataPoint(preparedStmt, taskInfo, now, nameprefix, (number).doubleValue()); } catch (SQLException e) { LOG.error("Exception while indexing datapoint", e); } - } else if (value instanceof Map) { - for (Entry entry : ((Map) value).entrySet()) { - String newnameprefix = nameprefix + "." + entry.getKey(); - handleDataPoints(preparedStmt, taskInfo, newnameprefix, entry.getValue(), now); + } else if (value instanceof Map map) { + for (Map.Entry entry : map.entrySet()) { + if (entry.getKey() instanceof String key) { + String newNamePrefix = nameprefix + "." + key; + handleDataPoints(preparedStmt, taskInfo, newNamePrefix, entry.getValue(), now); + } } - } else if (value instanceof Collection) { - for (Object collectionObj : (Collection) value) { + } else if (value instanceof Collection collection) { + for (Object collectionObj : collection) { handleDataPoints(preparedStmt, taskInfo, nameprefix, collectionObj, now); } } else { - LOG.warn("Found data point value {} of {}", nameprefix, value.getClass().toString()); + LOG.warn("Found data point value {} of {}", nameprefix, value.getClass()); } } From 03916ef74a970c56c58538681e0f231b0eeac469 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Sat, 27 Dec 2025 20:43:22 +0000 Subject: [PATCH 03/25] Fix forbidden violations --- .../java/org/apache/stormcrawler/sql/IndexerBolt.java | 4 +++- .../main/java/org/apache/stormcrawler/sql/SQLSpout.java | 5 +++-- .../org/apache/stormcrawler/sql/StatusUpdaterBolt.java | 9 +++++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java index 25bd39d14..895151124 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java @@ -23,6 +23,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -172,10 +173,11 @@ private String getQuery(final List keys) { final String updates = keys.stream() - .map(k -> String.format("%s=VALUES(%s)", k, k)) + .map(k -> String.format(Locale.ROOT, "%s=VALUES(%s)", k, k)) .collect(Collectors.joining(", ")); return String.format( + Locale.ROOT, """ INSERT INTO %s (%s%s) VALUES (?%s) diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java index 4d000fe91..575050c07 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java @@ -25,6 +25,7 @@ import java.sql.Timestamp; import java.time.Instant; import java.util.List; +import java.util.Locale; import 
java.util.Map; import org.apache.storm.spout.Scheme; import org.apache.storm.spout.SpoutOutputCollector; @@ -122,7 +123,7 @@ public void open( final String bucketClause = (bucketNum >= 0) ? BUCKET_CLAUSE : ""; final String limitClause = (maxNumResults != -1) ? LIMIT_CLAUSE : ""; - preparedSql = String.format(BASE_SQL, tableName, bucketClause, limitClause); + preparedSql = String.format(Locale.ROOT, BASE_SQL, tableName, bucketClause, limitClause); } @Override @@ -231,7 +232,7 @@ private int processRow(final ResultSet rs) throws SQLException { final String normalisedMetadata = (metadata == null || metadata.startsWith("\t")) ? metadata : "\t" + metadata; - final String urlWithMetadata = String.format("%s%s", url, normalisedMetadata); + final String urlWithMetadata = String.format(Locale.ROOT, "%s%s", url, normalisedMetadata); final List v = SCHEME.deserialize( ByteBuffer.wrap(urlWithMetadata.getBytes(StandardCharsets.UTF_8))); diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java index 15f4b8864..e04f1b7cf 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java @@ -23,6 +23,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.concurrent.Executors; @@ -105,17 +106,21 @@ public void prepare( updateQuery = String.format( + Locale.ROOT, """ REPLACE INTO %s %s """, - tableName, baseColumns); + tableName, + baseColumns); insertQuery = String.format( + Locale.ROOT, """ INSERT IGNORE INTO %s %s """, - tableName, baseColumns); + tableName, + baseColumns); ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleAtFixedRate( From 4e37c555fd267f4f9068b748643bac32b33e3b2f Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 16:45:18 +0000 Subject: [PATCH 04/25] changes following PR review. Added tests for SQLSpout --- .../org/apache/stormcrawler/sql/SQLSpout.java | 17 +- .../org/apache/stormcrawler/sql/SQLUtil.java | 12 + .../apache/stormcrawler/sql/SQLSpoutTest.java | 308 ++++++++++++++++++ 3 files changed, 331 insertions(+), 6 deletions(-) create mode 100644 external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java index 575050c07..9f2f06388 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java @@ -76,6 +76,7 @@ public class SQLSpout extends AbstractQueryingSpout { private static String preparedSql; private Connection connection; + private PreparedStatement ps; /** * if more than one instance of the spout exist, each one is in charge of a separate bucket @@ -124,6 +125,13 @@ public void open( final String limitClause = (maxNumResults != -1) ? 
LIMIT_CLAUSE : ""; preparedSql = String.format(Locale.ROOT, BASE_SQL, tableName, bucketClause, limitClause); + + try { + ps = connection.prepareStatement(preparedSql); + } catch (SQLException e) { + LOG.error("Failed to prepare statement", e); + throw new RuntimeException(e); + } } @Override @@ -158,7 +166,7 @@ protected void populateBuffer() { long timeStartQuery = System.currentTimeMillis(); - try (PreparedStatement ps = connection.prepareStatement(preparedSql)) { + try { int i = 1; ps.setTimestamp(i++, new Timestamp(lastNextFetchDate.toEpochMilli())); @@ -206,11 +214,8 @@ public void fail(Object msgId) { @Override public void close() { super.close(); - try { - connection.close(); - } catch (SQLException e) { - LOG.error("Exception caught while closing SQL connection", e); - } + SQLUtil.closeResource(ps, "prepared statement"); + SQLUtil.closeResource(connection, "connection"); } private long recordQueryTiming(long timeStartQuery) { diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java index d580236a4..c6c197a64 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java @@ -24,6 +24,8 @@ public class SQLUtil { + private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(SQLUtil.class); + private SQLUtil() {} public static Connection getConnection(Map stormConf) throws SQLException { @@ -47,4 +49,14 @@ public static Connection getConnection(Map stormConf) throws SQL return DriverManager.getConnection(url, props); } + + public static void closeResource(final AutoCloseable resource, final String resourceName) { + if (resource != null) { + try { + resource.close(); + } catch (Exception e) { + LOG.error("Error closing {}", resourceName, e); + } + } + } } diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java new file mode 100644 index 000000000..7b61cc234 --- /dev/null +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.stormcrawler.sql; + +import static org.apache.stormcrawler.TestUtil.getMockedTopologyContextWithBucket; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Field; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.Statement; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.stormcrawler.TestOutputCollector; +import org.apache.stormcrawler.TestUtil; +import org.apache.stormcrawler.persistence.urlbuffer.URLBuffer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers(disabledWithoutDocker = true) +class SQLSpoutTest { + + @Container + private static final MySQLContainer mysqlContainer = + new MySQLContainer<>("mysql:8.4.0") + .withDatabaseName("crawl") + .withUsername("crawler") + .withPassword("crawler"); + + private static Connection testConnection; + + @BeforeAll + static void beforeAll() throws Exception { + // Create table + testConnection = + DriverManager.getConnection( + mysqlContainer.getJdbcUrl(), + mysqlContainer.getUsername(), + mysqlContainer.getPassword()); + + try (Statement stmt = testConnection.createStatement()) { + stmt.execute( + """ + CREATE TABLE urls ( + url VARCHAR(255), + status VARCHAR(16) DEFAULT 'DISCOVERED', + nextfetchdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + metadata TEXT, + bucket SMALLINT DEFAULT 0, + host VARCHAR(128), + PRIMARY KEY(url) + ) + """); + + // Add indexes + stmt.execute("ALTER TABLE urls ADD INDEX b (`bucket`)"); + stmt.execute("ALTER TABLE urls ADD INDEX t (`nextfetchdate`)"); + stmt.execute("ALTER TABLE urls ADD INDEX h (`host`)"); + } + } + + @BeforeEach + void setup() throws Exception { + // Clear any existing test data + clearAllURLs(); + } + + @AfterAll + static void afterAll() throws Exception { + if (testConnection != null) { + testConnection.close(); + } + } + + @Test + void bufferIsPopulatedAndEmitted() throws Exception { + // Insert base test data with past nextfetchdate to ensure they're eligible for fetching + Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS); + insertTestURL("http://example.com/page1", 0, "example.com", pastTime); + insertTestURL("http://example.com/page2", 0, "example.com", pastTime); + insertTestURL("http://test.com/page1", 0, "test.com", pastTime); + + TestOutputCollector testCollector = new TestOutputCollector(); + SQLSpout spout = createSpout(testCollector, TestUtil.getMockedTopologyContext()); + + URLBuffer buffer = getBufferFromSpout(spout); + assertEquals(0, buffer.size(), "Buffer should be empty initially"); + + // First call to nextTuple() populates the buffer + spout.nextTuple(); + assertEquals(3, buffer.size(), "Buffer should contain 3 URLs after population"); + + final List expectedURLs = + Arrays.asList( + "http://example.com/page1", + "http://example.com/page2", + "http://test.com/page1"); + assertURLsEmitted(spout, 
testCollector, 3, expectedURLs); + spout.close(); + } + + @Test + void testMaxDocsPerBucket() throws Exception { + // Add more URLs from example.com to test the maxDocsPerBucket limit + // Insert base test data with past nextfetchdate to ensure they're eligible for fetching + Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS); + insertTestURL("http://example.com/page1", 0, "example.com", pastTime); + insertTestURL("http://example.com/page2", 0, "example.com", pastTime); + insertTestURL("http://test.com/page1", 0, "test.com", pastTime); + + TestOutputCollector testCollector = new TestOutputCollector(); + SQLSpout spout = createSpout(testCollector, TestUtil.getMockedTopologyContext()); + + pastTime = Instant.now().minus(1, ChronoUnit.HOURS); + for (int i = 4; i <= 10; i++) { + insertTestURL("http://example.com/page" + i, 0, "example.com", pastTime); + } + + spout.nextTuple(); + + URLBuffer buffer = getBufferFromSpout(spout); + + // With maxDocsPerBucket=5, we should get at most 5 URLs from example.com + // plus 1 from test.com = 6 total + assertEquals(6, buffer.size()); + spout.close(); + } + + @Test + void testSingleInstanceNoBucketFiltering() throws Exception { + + Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS); + + // Insert URLs into different buckets + insertTestURL("http://site1.com/page1", 0, "site1.com", pastTime); + insertTestURL("http://site2.com/page1", 1, "site2.com", pastTime); + insertTestURL("http://site3.com/page1", 2, "site3.com", pastTime); + insertTestURL("http://site4.com/page1", 3, "site4.com", pastTime); + + // Create a single spout instance (totalTasks = 1) + TestOutputCollector collector = new TestOutputCollector(); + TopologyContext context = getMockedTopologyContextWithBucket(0, 1, "sqlSpout"); + SQLSpout singleSpout = createSpout(collector, context); + + // Populate buffer + singleSpout.nextTuple(); + + URLBuffer buffer = getBufferFromSpout(singleSpout); + + // Should fetch all URLs regardless of bucket + assertEquals(4, buffer.size(), "Single spout instance should fetch URLs from all buckets"); + + final List expectedURLs = + Arrays.asList( + "http://site1.com/page1", + "http://site2.com/page1", + "http://site3.com/page1", + "http://site4.com/page1"); + assertURLsEmitted(singleSpout, collector, 4, expectedURLs); + singleSpout.close(); + } + + @Test + void testBucketPartitioningTwoInstances() throws Exception { + + // Insert URLs into different buckets + Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS); + + // Bucket 0 URLs + insertTestURL("http://bucket0-site1.com/page1", 0, "bucket0-site1.com", pastTime); + insertTestURL("http://bucket0-site2.com/page1", 0, "bucket0-site2.com", pastTime); + insertTestURL("http://bucket0-site3.com/page1", 0, "bucket0-site3.com", pastTime); + + // Bucket 1 URLs + insertTestURL("http://bucket1-site1.com/page1", 1, "bucket1-site1.com", pastTime); + insertTestURL("http://bucket1-site2.com/page1", 1, "bucket1-site2.com", pastTime); + insertTestURL("http://bucket1-site3.com/page1", 1, "bucket1-site3.com", pastTime); + + // Create two spout instances with different bucket assignments + SQLSpout[] spouts = new SQLSpout[2]; + for (int i = 0; i < 2; i++) { + TestOutputCollector collector = new TestOutputCollector(); + TopologyContext context = getMockedTopologyContextWithBucket(i, 2, "sqlSpout"); + spouts[i] = createSpout(collector, context); + spouts[i].nextTuple(); + URLBuffer buffer = getBufferFromSpout(spouts[i]); + assertEquals(3, buffer.size()); + assertURLsEmitted( + spouts[i], + collector, + 3, + 
Arrays.asList( + "http://bucket" + i + "-site1.com/page1", + "http://bucket" + i + "-site2.com/page1", + "http://bucket" + i + "-site3.com/page1")); + spouts[i].close(); + } + } + + private static void insertTestURL(String url, int bucket, String host, Instant time) + throws Exception { + String sql = + """ + INSERT INTO urls (url, status, nextfetchdate, metadata, bucket, host) + VALUES (?, ?, ?, ?, ?, ?) + """; + + try (PreparedStatement ps = testConnection.prepareStatement(sql)) { + ps.setString(1, url); + ps.setString(2, "DISCOVERED"); + ps.setTimestamp(3, Timestamp.from(time)); + ps.setString(4, "\tkey=value\tdepth=0"); + ps.setInt(5, bucket); + ps.setString(6, host); + ps.executeUpdate(); + } + } + + /** Helper method to clear all URLs from the database between tests. */ + private static void clearAllURLs() throws Exception { + try (Statement stmt = testConnection.createStatement()) { + stmt.execute("DELETE FROM urls"); + } + } + + private Map createTestConfig() { + Map conf = new HashMap<>(); + + Map sqlConnection = new HashMap<>(); + sqlConnection.put("url", mysqlContainer.getJdbcUrl()); + sqlConnection.put("user", mysqlContainer.getUsername()); + sqlConnection.put("password", mysqlContainer.getPassword()); + conf.put("sql.connection", sqlConnection); + + conf.put("sql.status.table", "urls"); + conf.put("sql.max.urls.per.bucket", 5); + conf.put("sql.spout.max.results", 100); + conf.put( + "urlbuffer.class", "org.apache.stormcrawler.persistence.urlbuffer.SimpleURLBuffer"); + + return conf; + } + + private URLBuffer getBufferFromSpout(SQLSpout spoutInstance) throws Exception { + Field bufferField = spoutInstance.getClass().getSuperclass().getDeclaredField("buffer"); + bufferField.setAccessible(true); + return (URLBuffer) bufferField.get(spoutInstance); + } + + private void assertURLsEmitted( + SQLSpout spout, + TestOutputCollector collector, + int numTuples, + List expectedURLs) { + assertEquals(0, collector.getEmitted().size()); + + // Emit all URLs + Set urls = new HashSet<>(); + for (int i = 0; i < numTuples; i++) { + spout.nextTuple(); + } + for (List tuple : collector.getEmitted()) { + urls.add((String) tuple.get(0)); + } + + for (String url : expectedURLs) { + assertTrue(urls.contains(url)); + } + } + + private SQLSpout createSpout(TestOutputCollector collector, TopologyContext context) { + SQLSpout singleSpout = new SQLSpout(); + Map conf = createTestConfig(); + singleSpout.open(conf, context, new SpoutOutputCollector(collector)); + singleSpout.activate(); + return singleSpout; + } +} From 42dd9f3cccfb6d9877e3f5a5e3a2a743560d9dad Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 16:46:23 +0000 Subject: [PATCH 05/25] changes following PR review. 
Added tests for SQLSpout

---
 .../org/apache/stormcrawler/TestUtil.java     | 38 +++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/core/src/test/java/org/apache/stormcrawler/TestUtil.java b/core/src/test/java/org/apache/stormcrawler/TestUtil.java
index 033255c40..d8358489c 100644
--- a/core/src/test/java/org/apache/stormcrawler/TestUtil.java
+++ b/core/src/test/java/org/apache/stormcrawler/TestUtil.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.when;
 
 import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.task.TopologyContext;
@@ -48,6 +50,42 @@ public IMetric answer(InvocationOnMock invocation) throws Throwable {
         return context;
     }
 
+    /**
+     * Creates a mocked TopologyContext for testing bucket partitioning.
+     *
+     * @param taskIndex The task index for this spout instance (determines bucket number)
+     * @param totalTasks Total number of tasks in the topology
+     * @param componentId The component ID
+     * @return Mocked TopologyContext
+     */
+    public static TopologyContext getMockedTopologyContextWithBucket(
+            int taskIndex, int totalTasks, String componentId) {
+        TopologyContext context = mock(TopologyContext.class);
+
+        // Mock metric registration
+        when(context.registerMetric(anyString(), any(IMetric.class), anyInt()))
+                .thenAnswer(
+                        new Answer<IMetric>() {
+                            @Override
+                            public IMetric answer(InvocationOnMock invocation) throws Throwable {
+                                return invocation.getArgument(1, IMetric.class);
+                            }
+                        });
+
+        // Mock task information for bucket assignment
+        when(context.getThisTaskIndex()).thenReturn(taskIndex);
+        when(context.getThisComponentId()).thenReturn(componentId);
+
+        // Create list of task IDs (0 to totalTasks-1)
+        List<Integer> taskIds = new ArrayList<>();
+        for (int i = 0; i < totalTasks; i++) {
+            taskIds.add(i);
+        }
+        when(context.getComponentTasks(componentId)).thenReturn(taskIds);
+
+        return context;
+    }
+
     public static Tuple getMockedTestTuple(String url, String content, Metadata metadata) {
         Tuple tuple = mock(Tuple.class);
         when(tuple.getStringByField("url")).thenReturn(url);

From 37d595f76c1741eaa0760cec279616b3f0fbec1c Mon Sep 17 00:00:00 2001
From: "raju.gupta"
Date: Thu, 1 Jan 2026 17:02:27 +0000
Subject: [PATCH 06/25] changes following PR review.
Added tests for SQLSpout --- .../org/apache/stormcrawler/sql/SQLSpoutTest.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java index 7b61cc234..a9c725823 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java @@ -113,12 +113,8 @@ void bufferIsPopulatedAndEmitted() throws Exception { TestOutputCollector testCollector = new TestOutputCollector(); SQLSpout spout = createSpout(testCollector, TestUtil.getMockedTopologyContext()); - URLBuffer buffer = getBufferFromSpout(spout); - assertEquals(0, buffer.size(), "Buffer should be empty initially"); - // First call to nextTuple() populates the buffer spout.nextTuple(); - assertEquals(3, buffer.size(), "Buffer should contain 3 URLs after population"); final List expectedURLs = Arrays.asList( @@ -175,11 +171,6 @@ void testSingleInstanceNoBucketFiltering() throws Exception { // Populate buffer singleSpout.nextTuple(); - URLBuffer buffer = getBufferFromSpout(singleSpout); - - // Should fetch all URLs regardless of bucket - assertEquals(4, buffer.size(), "Single spout instance should fetch URLs from all buckets"); - final List expectedURLs = Arrays.asList( "http://site1.com/page1", @@ -213,8 +204,6 @@ void testBucketPartitioningTwoInstances() throws Exception { TopologyContext context = getMockedTopologyContextWithBucket(i, 2, "sqlSpout"); spouts[i] = createSpout(collector, context); spouts[i].nextTuple(); - URLBuffer buffer = getBufferFromSpout(spouts[i]); - assertEquals(3, buffer.size()); assertURLsEmitted( spouts[i], collector, From 747aa9b98e9b2c64fa566eaa6bb31aaf82e3b9b6 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 18:04:18 +0000 Subject: [PATCH 07/25] changes following PR review. Added tests for StatusUpdaterBolt --- .../stormcrawler/sql/StatusUpdaterBolt.java | 89 +++++---- .../sql/StatusUpdaterBoltTest.java | 172 ++++++++++++++++++ 2 files changed, 228 insertions(+), 33 deletions(-) create mode 100644 external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java index e04f1b7cf..2c997c8c6 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java @@ -16,6 +16,8 @@ */ package org.apache.stormcrawler.sql; +import static org.apache.stormcrawler.sql.SQLUtil.closeResource; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -63,8 +65,8 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt { private long lastInsertBatchTime = -1; - private String updateQuery; - private String insertQuery; + private PreparedStatement updatePreparedStmt; + private PreparedStatement insertPreparedStmt; private final Map> waitingAck = new HashMap<>(); @@ -104,7 +106,7 @@ public void prepare( VALUES (?, ?, ?, ?, ?, ?) 
"""; - updateQuery = + final String updateQuery = String.format( Locale.ROOT, """ @@ -113,7 +115,7 @@ public void prepare( tableName, baseColumns); - insertQuery = + final String insertQuery = String.format( Locale.ROOT, """ @@ -122,6 +124,14 @@ public void prepare( tableName, baseColumns); + try { + updatePreparedStmt = connection.prepareStatement(updateQuery); + insertPreparedStmt = connection.prepareStatement(insertQuery); + } catch (SQLException e) { + LOG.error("Failed to prepare statements", e); + throw new RuntimeException(e); + } + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleAtFixedRate( () -> { @@ -172,29 +182,26 @@ public synchronized void store( // create in table if does not already exist if (isUpdate) { - - try (PreparedStatement ps = connection.prepareStatement(updateQuery)) { - ps.setString(1, url); - ps.setString(2, status.toString()); - if (nextFetch.isPresent()) { - final Timestamp tsp = Timestamp.from(nextFetch.get().toInstant()); - ps.setObject(3, tsp); - } else { - // a value so large it means it will never be refetched - ps.setObject(3, NEVER); - } - ps.setString(4, mdAsString.toString()); - ps.setInt(5, partition); - ps.setString(6, partitionKey); - - // updates are not batched - ps.executeUpdate(); - } + populate( + url, + status, + nextFetch, + mdAsString, + partition, + partitionKey, + updatePreparedStmt); + + // updates are not batched + updatePreparedStmt.executeUpdate(); eventCounter.scope("sql_updates_number").incrBy(1); super.ack(t, url); return; } + // code below is for inserts i.e. DISCOVERED URLs + populate(url, status, nextFetch, mdAsString, partition, partitionKey, insertPreparedStmt); + insertPreparedStmt.addBatch(); + if (lastInsertBatchTime == -1) { lastInsertBatchTime = System.currentTimeMillis(); } @@ -211,6 +218,29 @@ public synchronized void store( eventCounter.scope("sql_inserts_number").incrBy(1); } + private void populate( + final String url, + final Status status, + final Optional nextFetch, + final StringBuilder mdAsString, + final int partition, + final String partitionKey, + final PreparedStatement preparedStmt) + throws SQLException { + preparedStmt.setString(1, url); + preparedStmt.setString(2, status.toString()); + if (nextFetch.isPresent()) { + final Timestamp tsp = Timestamp.from(nextFetch.get().toInstant()); + preparedStmt.setObject(3, tsp); + } else { + // a value so large it means it will never be refetched + preparedStmt.setObject(3, NEVER); + } + preparedStmt.setString(4, mdAsString.toString()); + preparedStmt.setInt(5, partition); + preparedStmt.setString(6, partitionKey); + } + private synchronized void checkExecuteBatch() throws SQLException { if (currentBatchSize == 0) { return; @@ -231,12 +261,7 @@ private synchronized void checkExecuteBatch() throws SQLException { try { long start = System.currentTimeMillis(); - try (PreparedStatement ps = connection.prepareStatement(insertQuery)) { - // code below is for inserts i.e. 
DISCOVERED URLs - ps.addBatch(); - ps.executeBatch(); - } - + insertPreparedStmt.executeBatch(); long end = System.currentTimeMillis(); LOG.info("Batched {} inserts executed in {} msec", currentBatchSize, end - start); @@ -264,10 +289,8 @@ private synchronized void checkExecuteBatch() throws SQLException { @Override public void cleanup() { - if (connection != null) - try { - connection.close(); - } catch (SQLException e) { - } + closeResource(updatePreparedStmt, "update prepared statement"); + closeResource(insertPreparedStmt, "insert prepared statement"); + closeResource(connection, "connection"); } } diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java new file mode 100644 index 000000000..75f1fe29d --- /dev/null +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.stormcrawler.sql; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.TestOutputCollector; +import org.apache.stormcrawler.TestUtil; +import org.apache.stormcrawler.persistence.Status; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers(disabledWithoutDocker = true) +class StatusUpdaterBoltTest { + + @Container + private static final MySQLContainer mysqlContainer = + new MySQLContainer<>("mysql:8.0") + .withDatabaseName("crawl") + .withUsername("crawler") + .withPassword("crawler"); + + private Connection testConnection; + private TestOutputCollector output; + + @BeforeEach + void setup() throws Exception { + output = new TestOutputCollector(); + // Create table + testConnection = + DriverManager.getConnection( + mysqlContainer.getJdbcUrl(), + mysqlContainer.getUsername(), + mysqlContainer.getPassword()); + + try (Statement stmt = testConnection.createStatement()) { + + stmt.execute( + """ + CREATE TABLE IF NOT EXISTS urls ( + url VARCHAR(255), + status 
VARCHAR(16) DEFAULT 'DISCOVERED', + nextfetchdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + metadata TEXT, + bucket SMALLINT DEFAULT 0, + host VARCHAR(128), + PRIMARY KEY(url) + ) + """); + } + } + + @AfterEach + void cleanup() throws Exception { + if (testConnection != null) { + testConnection.close(); + } + } + + @Test + void testStoreDiscoveredURL() throws Exception { + StatusUpdaterBolt bolt = createBolt(); + String url = "http://example.com/page1"; + Metadata metadata = new Metadata(); + metadata.addValue("key1", "value1"); + + Tuple tuple = createTuple(url, Status.DISCOVERED, metadata); + bolt.execute(tuple); + + // DISCOVERED URLs are batched and the batch executes after 2 seconds (batchMaxIdleMsec) + // Wait long enough for the batch to be executed + Thread.sleep(3000); + + // Verify URL was stored + try (Statement stmt = testConnection.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM urls WHERE url = '" + url + "'")) { + assertTrue(rs.next(), "URL should be stored in database after batch execution"); + assertEquals("DISCOVERED", rs.getString("status")); + assertNotNull(rs.getString("metadata")); + } + bolt.cleanup(); + } + + @Test + void testUpdateURL() throws Exception { + StatusUpdaterBolt bolt = createBolt(); + String url = "http://example.com/page2"; + Metadata metadata = new Metadata(); + metadata.addValue("key1", "value1"); + + // First store as DISCOVERED + Tuple tuple1 = createTuple(url, Status.DISCOVERED, metadata); + bolt.execute(tuple1); + + // Now update to FETCHED + Tuple tuple2 = createTuple(url, Status.FETCHED, metadata); + bolt.execute(tuple2); + + // Verify URL was updated + try (Statement stmt = testConnection.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM urls WHERE url = '" + url + "'")) { + assertTrue(rs.next()); + assertEquals("FETCHED", rs.getString("status")); + assertNotNull(rs.getString("metadata")); + } + bolt.cleanup(); + } + + private Tuple createTuple(String url, Status status, Metadata metadata) { + Tuple tuple = mock(Tuple.class); + when(tuple.getStringByField("url")).thenReturn(url); + when(tuple.getValueByField("status")).thenReturn(status); + when(tuple.getValueByField("metadata")).thenReturn(metadata); + return tuple; + } + + private Map createTestConfig() { + Map conf = new HashMap<>(); + + Map sqlConnection = new HashMap<>(); + sqlConnection.put("url", mysqlContainer.getJdbcUrl()); + sqlConnection.put("user", mysqlContainer.getUsername()); + sqlConnection.put("password", mysqlContainer.getPassword()); + conf.put("sql.connection", sqlConnection); + + conf.put("sql.status.table", "urls"); + conf.put("sql.status.max.urls.per.bucket", 10); + conf.put("scheduler.class", "org.apache.stormcrawler.persistence.DefaultScheduler"); + // Add cache configuration to prevent NullPointerException + conf.put("status.updater.cache.spec", "maximumSize=10000,expireAfterAccess=1h"); + + return conf; + } + + private StatusUpdaterBolt createBolt() { + StatusUpdaterBolt bolt = new StatusUpdaterBolt(); + Map conf = createTestConfig(); + bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output)); + return bolt; + } +} From a69212d3b40fceaa2773535dae5ce0aecebf26ec Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 18:32:40 +0000 Subject: [PATCH 08/25] Add test dependencies and update MySQL container version in tests --- external/sql/pom.xml | 30 ++++++++++++++----- .../apache/stormcrawler/sql/SQLSpoutTest.java | 5 +++- .../sql/StatusUpdaterBoltTest.java | 5 +++- pom.xml | 8 +++++ 4 files 
changed, 39 insertions(+), 9 deletions(-)

diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 0eece4e64..b4ef395a5 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -37,13 +37,29 @@ under the License.
     <description>SQL-based resources for StormCrawler</description>
 
-
+    <dependencies>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.stormcrawler</groupId>
+            <artifactId>stormcrawler-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>

diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java
index a9c725823..cb260a2ab 100644
--- a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java
+++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java
@@ -46,13 +46,16 @@
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
 
 @Testcontainers(disabledWithoutDocker = true)
 class SQLSpoutTest {
 
+    private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.4.0");
+
     @Container
     private static final MySQLContainer<?> mysqlContainer =
-            new MySQLContainer<>("mysql:8.4.0")
+            new MySQLContainer<>(MYSQL_IMAGE)
                     .withDatabaseName("crawl")
                     .withUsername("crawler")
                     .withPassword("crawler");

diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java
index 75f1fe29d..09aa484f3 100644
--- a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java
+++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java
@@ -40,13 +40,16 @@
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
 
 @Testcontainers(disabledWithoutDocker = true)
 class StatusUpdaterBoltTest {
 
+    private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.4.0");
+
     @Container
     private static final MySQLContainer<?> mysqlContainer =
-            new MySQLContainer<>("mysql:8.0")
+            new MySQLContainer<>(MYSQL_IMAGE)
                     .withDatabaseName("crawl")
                     .withUsername("crawler")
                     .withPassword("crawler");

diff --git a/pom.xml b/pom.xml
index 183a1830d..5d27560ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@ under the License.
         1.00
         1.00
         1.00
+        <mysql-connector-j>9.3.0</mysql-connector-j>
         true
 
@@ -673,6 +674,13 @@
${commons.codec.version} + + com.mysql + mysql-connector-j + ${mysql-connector-j} + test + + From 3e3d6bd4c399173019b9add0a2337107e048aa5c Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 20:45:13 +0000 Subject: [PATCH 09/25] Fix flakiness in testStoreDiscoveredURL --- .../apache/stormcrawler/sql/StatusUpdaterBoltTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java index 09aa484f3..bf3bc6a5b 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java @@ -101,9 +101,10 @@ void testStoreDiscoveredURL() throws Exception { Tuple tuple = createTuple(url, Status.DISCOVERED, metadata); bolt.execute(tuple); - // DISCOVERED URLs are batched and the batch executes after 2 seconds (batchMaxIdleMsec) - // Wait long enough for the batch to be executed - Thread.sleep(3000); + // Trigger batch execution by sending another tuple (which will also check the batch) + String url2 = "http://example.com/page1-trigger"; + Tuple triggerTuple = createTuple(url2, Status.DISCOVERED, metadata); + bolt.execute(triggerTuple); // Verify URL was stored try (Statement stmt = testConnection.createStatement(); ResultSet rs = stmt.executeQuery("SELECT * FROM urls WHERE url = '" + url + "'")) { assertTrue(rs.next(), "URL should be stored in database after batch execution"); assertEquals("DISCOVERED", rs.getString("status")); assertNotNull(rs.getString("metadata")); } bolt.cleanup(); } @@ -160,8 +161,8 @@ private Map createTestConfig() { conf.put("sql.status.table", "urls"); conf.put("sql.status.max.urls.per.bucket", 10); conf.put("scheduler.class", "org.apache.stormcrawler.persistence.DefaultScheduler"); - // Add cache configuration to prevent NullPointerException conf.put("status.updater.cache.spec", "maximumSize=10000,expireAfterAccess=1h"); conf.put("sql.update.batch.size", 1); return conf; } From 92454995a79dee0211bf32943ee50f9390683f19 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 22:19:28 +0000 Subject: [PATCH 10/25] ScheduledExecutorService should be shut down when the bolt is cleaned up.
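
Without an explicit shutdown, the single-threaded scheduler created in
prepare() keeps its non-daemon thread alive after cleanup(), and its
periodic task can keep firing against an already-closed connection. The
change below applies the standard two-phase ExecutorService shutdown
idiom; as a rough sketch (the 30-second grace period is this bolt's
choice, not a JDK default):

    executor.shutdown();                       // stop accepting new tasks
    try {
        // give any in-flight batch a bounded window to finish
        if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
            executor.shutdownNow();            // cancel whatever is still running
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();    // preserve the interrupt status
    }
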
--- .../stormcrawler/sql/StatusUpdaterBolt.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java index 2c997c8c6..27dc27194 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java @@ -67,6 +67,7 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt { private PreparedStatement updatePreparedStmt; private PreparedStatement insertPreparedStmt; + private ScheduledExecutorService executor; private final Map> waitingAck = new HashMap<>(); @@ -132,7 +133,7 @@ public void prepare( throw new RuntimeException(e); } - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleAtFixedRate( () -> { try { @@ -292,5 +293,20 @@ public void cleanup() { closeResource(updatePreparedStmt, "update prepared statement"); closeResource(insertPreparedStmt, "insert prepared statement"); closeResource(connection, "connection"); + closeExecutor(); + } + + private void closeExecutor() { + if (executor != null) { + executor.shutdown(); + try { + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } } } From fb7febe4c5ad2eb52ed4368e217a99845518fa62 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 23:00:32 +0000 Subject: [PATCH 11/25] Refactor IndexerBolt to improve code clarity and add unit tests for indexing functionality --- .../apache/stormcrawler/sql/IndexerBolt.java | 65 ++-- .../stormcrawler/sql/IndexerBoltTest.java | 365 ++++++++++++++++++ 2 files changed, 405 insertions(+), 25 deletions(-) create mode 100644 external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java index 895151124..dc3b570f7 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java @@ -46,12 +46,14 @@ public class IndexerBolt extends AbstractIndexerBolt { public static final String SQL_INDEX_TABLE_PARAM_NAME = "sql.index.table"; - private OutputCollector _collector; + private OutputCollector collector; private MultiCountMetric eventCounter; private Connection connection; + private PreparedStatement preparedStmt; + private String tableName; private Map conf; @@ -60,7 +62,7 @@ public class IndexerBolt extends AbstractIndexerBolt { public void prepare( Map conf, TopologyContext context, OutputCollector collector) { super.prepare(conf, context, collector); - _collector = collector; + this.collector = collector; this.eventCounter = context.registerMetric("SQLIndexer", new MultiCountMetric(), 10); @@ -85,8 +87,8 @@ public void execute(Tuple tuple) { eventCounter.scope("Filtered").incrBy(1); // treat it as successfully processed even if // we do not index it - _collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); - _collector.ack(tuple); + collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); + collector.ack(tuple); return; } @@ -110,33 
+112,32 @@ public void execute(Tuple tuple) { LOG.debug("PreparedStatement => {}", query); // create the mysql insert PreparedStatement + preparedStmt = connection.prepareStatement(query); + // TODO store the text of the document? + if (StringUtils.isNotBlank(fieldNameForText())) { + // builder.field(fieldNameForText(), trimText(text)); + } - try (PreparedStatement preparedStmt = connection.prepareStatement(query)) { - // TODO store the text of the document? - if (StringUtils.isNotBlank(fieldNameForText())) { - // builder.field(fieldNameForText(), trimText(text)); - } - - // send URL as field? - if (fieldNameForURL() != null) { - preparedStmt.setString(1, normalisedurl); - } + // send URL as field? + if (fieldNameForURL() != null) { + preparedStmt.setString(1, normalisedurl); + } - for (int i = 0; i < keys.size(); i++) { - insert(preparedStmt, i + 2, keys.get(i), keyVals); - preparedStmt.addBatch(); - } - preparedStmt.executeBatch(); + // Set all metadata parameters + for (int i = 0; i < keys.size(); i++) { + insert(preparedStmt, i + 2, keys.get(i), keyVals); + } + // Execute the statement (single row insert) + preparedStmt.executeUpdate(); - eventCounter.scope("Indexed").incrBy(1); + eventCounter.scope("Indexed").incrBy(1); - _collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); - _collector.ack(tuple); - } + collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); + collector.ack(tuple); } catch (Exception e) { // do not send to status stream so that it gets replayed LOG.error("Error inserting into SQL", e); - _collector.fail(tuple); + collector.fail(tuple); if (connection != null) { // reset the connection try { @@ -176,6 +177,14 @@ private String getQuery(final List keys) { .map(k -> String.format(Locale.ROOT, "%s=VALUES(%s)", k, k)) .collect(Collectors.joining(", ")); + // Build the ON DUPLICATE KEY UPDATE clause + // If there are metadata keys, update them; otherwise, update the URL field to itself + final String updateClause = + updates.isEmpty() + ? String.format( + Locale.ROOT, "%s=VALUES(%s)", fieldNameForURL(), fieldNameForURL()) + : updates; + return String.format( Locale.ROOT, """ @@ -187,6 +196,12 @@ private String getQuery(final List keys) { fieldNameForURL(), columns.isEmpty() ? "" : ", " + columns, placeholders.isEmpty() ? "" : ", " + placeholders, - updates); + updateClause); + } + + @Override + public void cleanup() { + SQLUtil.closeResource(preparedStmt, "prepared statement"); + SQLUtil.closeResource(connection, "connection"); } } diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java new file mode 100644 index 000000000..6039098fc --- /dev/null +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.stormcrawler.sql; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.Constants; +import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.TestOutputCollector; +import org.apache.stormcrawler.TestUtil; +import org.apache.stormcrawler.indexing.AbstractIndexerBolt; +import org.apache.stormcrawler.persistence.Status; +import org.apache.stormcrawler.util.RobotsTags; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers(disabledWithoutDocker = true) +class IndexerBoltTest { + + private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.4.0"); + + @Container + private static final MySQLContainer mysqlContainer = + new MySQLContainer<>(MYSQL_IMAGE) + .withDatabaseName("crawl") + .withUsername("crawler") + .withPassword("crawler"); + + private Connection testConnection; + private TestOutputCollector output; + private final String tableName = "content"; + + @BeforeEach + void setup() throws Exception { + output = new TestOutputCollector(); + testConnection = + DriverManager.getConnection( + mysqlContainer.getJdbcUrl(), + mysqlContainer.getUsername(), + mysqlContainer.getPassword()); + + // Create content table for indexing using the configured table name + try (Statement stmt = testConnection.createStatement()) { + stmt.execute( + """ + CREATE TABLE IF NOT EXISTS content ( + url VARCHAR(255) PRIMARY KEY, + title VARCHAR(255), + description TEXT, + keywords VARCHAR(255) + ) + """); + // Clear table before each test + stmt.execute("TRUNCATE TABLE content"); + } + } + + @AfterEach + void cleanup() throws Exception { + if (testConnection != null) { + testConnection.close(); + } + } + + @Test + void testBasicIndexing() throws Exception { + IndexerBolt bolt = createBolt(createBasicConfig()); + + String url = "http://example.com/page1"; + String text = "This is the page content"; + Metadata metadata = new Metadata(); + metadata.addValue("title", "Test Page Title"); + metadata.addValue("description", "Test page description"); + metadata.addValue("keywords", "test, page, keywords"); + + Tuple tuple = createTuple(url, text, metadata); + bolt.execute(tuple); + + // Verify URL was stored in database + try (Statement stmt = 
testConnection.createStatement(); + ResultSet rs = + stmt.executeQuery( + "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) { + assertTrue(rs.next(), "URL should be stored in database"); + assertEquals(url, rs.getString("url")); + assertEquals("Test Page Title", rs.getString("title")); + assertEquals("Test page description", rs.getString("description")); + assertEquals("test, page, keywords", rs.getString("keywords")); + } + + // Verify tuple was acked and status emitted + assertEquals(1, output.getAckedTuples().size()); + assertEquals(1, output.getEmitted(Constants.StatusStreamName).size()); + + // Verify emitted status is FETCHED + List emitted = output.getEmitted(Constants.StatusStreamName).get(0); + assertEquals(url, emitted.get(0)); + assertEquals(Status.FETCHED, emitted.get(2)); + bolt.cleanup(); + } + + @Test + void testDuplicateHandling() throws Exception { + IndexerBolt bolt = createBolt(createBasicConfig()); + + String url = "http://example.com/page2"; + + // First indexing + Metadata metadata1 = new Metadata(); + metadata1.addValue("title", "Original Title"); + metadata1.addValue("description", "Original description"); + + Tuple tuple1 = createTuple(url, "Original content", metadata1); + bolt.execute(tuple1); + + // Verify first insert + try (Statement stmt = testConnection.createStatement(); + ResultSet rs = + stmt.executeQuery( + "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) { + assertTrue(rs.next()); + assertEquals("Original Title", rs.getString("title")); + } + + // Second indexing with updated content (same URL) + Metadata metadata2 = new Metadata(); + metadata2.addValue("title", "Updated Title"); + metadata2.addValue("description", "Updated description"); + + Tuple tuple2 = createTuple(url, "Updated content", metadata2); + bolt.execute(tuple2); + + // Verify ON DUPLICATE KEY UPDATE worked - should have updated values + try (Statement stmt = testConnection.createStatement(); + ResultSet rs = + stmt.executeQuery( + "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) { + assertTrue(rs.next()); + assertEquals("Updated Title", rs.getString("title")); + assertEquals("Updated description", rs.getString("description")); + // Should only be one row + assertFalse(rs.next(), "Should only have one row for the URL"); + } + + // Verify both tuples were acked + assertEquals(2, output.getAckedTuples().size()); + bolt.cleanup(); + } + + @Test + void testFilteringByRobotsNoIndex() throws Exception { + IndexerBolt bolt = createBolt(createBasicConfig()); + + String url = "http://example.com/noindex-page"; + Metadata metadata = new Metadata(); + metadata.addValue("title", "Should Not Be Indexed"); + metadata.addValue(RobotsTags.ROBOTS_NO_INDEX, "true"); + + Tuple tuple = createTuple(url, "Content", metadata); + bolt.execute(tuple); + + // Verify URL was NOT stored in database + try (Statement stmt = testConnection.createStatement(); + ResultSet rs = + stmt.executeQuery( + "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) { + assertFalse(rs.next(), "URL with noindex should not be stored in database"); + } + + // But tuple should still be acked and FETCHED status emitted + assertEquals(1, output.getAckedTuples().size()); + assertEquals(1, output.getEmitted(Constants.StatusStreamName).size()); + + List emitted = output.getEmitted(Constants.StatusStreamName).get(0); + assertEquals(Status.FETCHED, emitted.get(2)); + bolt.cleanup(); + } + + @Test + void testFilteringByMetadataFilter() throws Exception { + // Configure filter to only index documents 
with indexable=yes + Map conf = createBasicConfig(); + conf.put(AbstractIndexerBolt.metadataFilterParamName, "indexable=yes"); + + IndexerBolt bolt = createBolt(conf); + + // Document that should be filtered out (no indexable metadata) + String url1 = "http://example.com/filtered-page"; + Metadata metadata1 = new Metadata(); + metadata1.addValue("title", "Filtered Page"); + + Tuple tuple1 = createTuple(url1, "Content", metadata1); + bolt.execute(tuple1); + + // Verify filtered document was NOT stored + try (Statement stmt = testConnection.createStatement(); + ResultSet rs = + stmt.executeQuery( + "SELECT * FROM " + tableName + " WHERE url = '" + url1 + "'")) { + assertFalse(rs.next(), "Filtered document should not be stored"); + } + + // Document that should be indexed (has indexable=yes) + String url2 = "http://example.com/indexed-page"; + Metadata metadata2 = new Metadata(); + metadata2.addValue("title", "Indexed Page"); + metadata2.addValue("indexable", "yes"); + + Tuple tuple2 = createTuple(url2, "Content", metadata2); + bolt.execute(tuple2); + + // Verify indexed document WAS stored + try (Statement stmt = testConnection.createStatement(); + ResultSet rs = + stmt.executeQuery( + "SELECT * FROM " + tableName + " WHERE url = '" + url2 + "'")) { + assertTrue(rs.next(), "Document with indexable=yes should be stored"); + assertEquals("Indexed Page", rs.getString("title")); + } + + // Both tuples should be acked with FETCHED status + assertEquals(2, output.getAckedTuples().size()); + assertEquals(2, output.getEmitted(Constants.StatusStreamName).size()); + bolt.cleanup(); + } + + @Test + void testMetadataExtraction() throws Exception { + // Configure to only extract specific metadata fields + Map conf = createBasicConfig(); + // Only map title and description, not keywords + List mdMapping = new ArrayList<>(); + mdMapping.add("title"); + mdMapping.add("description"); + conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping); + + IndexerBolt bolt = createBolt(conf); + + String url = "http://example.com/metadata-test"; + Metadata metadata = new Metadata(); + metadata.addValue("title", "Extracted Title"); + metadata.addValue("description", "Extracted Description"); + metadata.addValue("keywords", "these,should,not,be,stored"); + metadata.addValue("author", "Should Not Be Stored"); + + Tuple tuple = createTuple(url, "Content", metadata); + bolt.execute(tuple); + + // Verify only configured metadata was stored + try (Statement stmt = testConnection.createStatement(); + ResultSet rs = + stmt.executeQuery( + "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) { + assertTrue(rs.next()); + assertEquals("Extracted Title", rs.getString("title")); + assertEquals("Extracted Description", rs.getString("description")); + // keywords column should be null since it wasn't in the mapping + assertNull(rs.getString("keywords")); + } + + assertEquals(1, output.getAckedTuples().size()); + bolt.cleanup(); + } + + @Test + void testMetadataAliasMapping() throws Exception { + // Configure metadata mapping with aliases + Map conf = createBasicConfig(); + List mdMapping = new ArrayList<>(); + mdMapping.add("parse.title=title"); // map parse.title to title column + mdMapping.add("parse.description=description"); + conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping); + + IndexerBolt bolt = createBolt(conf); + + String url = "http://example.com/alias-test"; + Metadata metadata = new Metadata(); + metadata.addValue("parse.title", "Title from Parser"); + metadata.addValue("parse.description", 
"Description from Parser"); + + Tuple tuple = createTuple(url, "Content", metadata); + bolt.execute(tuple); + + // Verify aliased metadata was stored correctly + try (Statement stmt = testConnection.createStatement(); + ResultSet rs = + stmt.executeQuery( + "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) { + assertTrue(rs.next()); + assertEquals("Title from Parser", rs.getString("title")); + assertEquals("Description from Parser", rs.getString("description")); + } + bolt.cleanup(); + } + + private Tuple createTuple(String url, String text, Metadata metadata) { + Tuple tuple = mock(Tuple.class); + when(tuple.getStringByField("url")).thenReturn(url); + when(tuple.getStringByField("text")).thenReturn(text); + when(tuple.getValueByField("metadata")).thenReturn(metadata); + return tuple; + } + + private Map createBasicConfig() { + Map conf = new HashMap<>(); + + Map sqlConnection = new HashMap<>(); + sqlConnection.put("url", mysqlContainer.getJdbcUrl()); + sqlConnection.put("user", mysqlContainer.getUsername()); + sqlConnection.put("password", mysqlContainer.getPassword()); + conf.put("sql.connection", sqlConnection); + + conf.put(IndexerBolt.SQL_INDEX_TABLE_PARAM_NAME, tableName); + conf.put(AbstractIndexerBolt.urlFieldParamName, "url"); + + // Default metadata mapping + List mdMapping = new ArrayList<>(); + mdMapping.add("title"); + mdMapping.add("description"); + mdMapping.add("keywords"); + conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping); + + return conf; + } + + private IndexerBolt createBolt(Map conf) { + IndexerBolt bolt = new IndexerBolt(); + bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output)); + return bolt; + } +} From c14419bc591a96c4cbdbd3a772d1420915b433b5 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 23:18:47 +0000 Subject: [PATCH 12/25] Introduce AbstractSQLTest for shared MySQL container setup in SQL tests --- .../stormcrawler/sql/AbstractSQLTest.java | 82 +++++++++++++++++++ .../stormcrawler/sql/IndexerBoltTest.java | 46 ++--------- .../apache/stormcrawler/sql/SQLSpoutTest.java | 74 +++-------------- .../sql/StatusUpdaterBoltTest.java | 50 +++-------- 4 files changed, 109 insertions(+), 143 deletions(-) create mode 100644 external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java new file mode 100644 index 000000000..8e6c4bb02 --- /dev/null +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.stormcrawler.sql; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Abstract base class for SQL module tests that provides a shared MySQL container. Uses the + * Testcontainers singleton pattern to ensure the container is created once and reused across all + * test classes, improving test performance. + */ +public abstract class AbstractSQLTest { + + private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.4.0"); + + private static final MySQLContainer MYSQL_CONTAINER; + + static { + MYSQL_CONTAINER = + new MySQLContainer<>(MYSQL_IMAGE) + .withDatabaseName("crawl") + .withUsername("crawler") + .withPassword("crawler") + .withReuse(true); + MYSQL_CONTAINER.start(); + } + + protected Connection testConnection; + + protected static Connection createConnection() throws SQLException { + return DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + } + + protected static Map createSqlConnectionConfig() { + Map sqlConnection = new HashMap<>(); + sqlConnection.put("url", MYSQL_CONTAINER.getJdbcUrl()); + sqlConnection.put("user", MYSQL_CONTAINER.getUsername()); + sqlConnection.put("password", MYSQL_CONTAINER.getPassword()); + return sqlConnection; + } + + @BeforeEach + void setup() throws Exception { + testConnection = createConnection(); + setupTestTables(); + } + + protected abstract void setupTestTables() throws Exception; + + @AfterEach + void baseCleanup() throws Exception { + if (testConnection != null) { + testConnection.close(); + } + MYSQL_CONTAINER.close(); + } +} diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java index 6039098fc..9ac9de784 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java @@ -23,8 +23,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; @@ -40,40 +38,16 @@ import org.apache.stormcrawler.indexing.AbstractIndexerBolt; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.RobotsTags; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.MySQLContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -@Testcontainers(disabledWithoutDocker = true) -class IndexerBoltTest { +class IndexerBoltTest extends AbstractSQLTest { - private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.4.0"); - - @Container - private static final MySQLContainer mysqlContainer = - new MySQLContainer<>(MYSQL_IMAGE) - .withDatabaseName("crawl") - .withUsername("crawler") - .withPassword("crawler"); - - private Connection testConnection; private TestOutputCollector output; private final String tableName = "content"; - 
@BeforeEach - void setup() throws Exception { - output = new TestOutputCollector(); - testConnection = - DriverManager.getConnection( - mysqlContainer.getJdbcUrl(), - mysqlContainer.getUsername(), - mysqlContainer.getPassword()); - - // Create content table for indexing using the configured table name + @Override + public void setupTestTables() throws Exception { try (Statement stmt = testConnection.createStatement()) { stmt.execute( """ @@ -89,11 +63,9 @@ keywords VARCHAR(255) } } - @AfterEach - void cleanup() throws Exception { - if (testConnection != null) { - testConnection.close(); - } + @BeforeEach + void setup() { + output = new TestOutputCollector(); } @Test @@ -338,11 +310,7 @@ private Tuple createTuple(String url, String text, Metadata metadata) { private Map createBasicConfig() { Map conf = new HashMap<>(); - Map sqlConnection = new HashMap<>(); - sqlConnection.put("url", mysqlContainer.getJdbcUrl()); - sqlConnection.put("user", mysqlContainer.getUsername()); - sqlConnection.put("password", mysqlContainer.getPassword()); - conf.put("sql.connection", sqlConnection); + conf.put("sql.connection", createSqlConnectionConfig()); conf.put(IndexerBolt.SQL_INDEX_TABLE_PARAM_NAME, tableName); conf.put(AbstractIndexerBolt.urlFieldParamName, "url"); diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java index cb260a2ab..11eddd14a 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java @@ -21,8 +21,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.lang.reflect.Field; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.Statement; import java.sql.Timestamp; @@ -39,42 +37,17 @@ import org.apache.stormcrawler.TestOutputCollector; import org.apache.stormcrawler.TestUtil; import org.apache.stormcrawler.persistence.urlbuffer.URLBuffer; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.MySQLContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; - -@Testcontainers(disabledWithoutDocker = true) -class SQLSpoutTest { - - private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.4.0"); - - @Container - private static final MySQLContainer mysqlContainer = - new MySQLContainer<>(MYSQL_IMAGE) - .withDatabaseName("crawl") - .withUsername("crawler") - .withPassword("crawler"); - - private static Connection testConnection; - - @BeforeAll - static void beforeAll() throws Exception { - // Create table - testConnection = - DriverManager.getConnection( - mysqlContainer.getJdbcUrl(), - mysqlContainer.getUsername(), - mysqlContainer.getPassword()); +class SQLSpoutTest extends AbstractSQLTest { + + @Override + protected void setupTestTables() throws Exception { try (Statement stmt = testConnection.createStatement()) { + stmt.executeQuery("DROP TABLE IF EXISTS urls"); stmt.execute( """ - CREATE TABLE urls ( + CREATE TABLE IF NOT EXISTS urls ( url VARCHAR(255), status VARCHAR(16) DEFAULT 'DISCOVERED', nextfetchdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP, @@ -85,23 +58,8 @@ PRIMARY KEY(url) ) """); - // Add indexes - stmt.execute("ALTER TABLE 
urls ADD INDEX b (`bucket`)"); - stmt.execute("ALTER TABLE urls ADD INDEX t (`nextfetchdate`)"); - stmt.execute("ALTER TABLE urls ADD INDEX h (`host`)"); - } - } - - @BeforeEach - void setup() throws Exception { - // Clear any existing test data - clearAllURLs(); - } - - @AfterAll - static void afterAll() throws Exception { - if (testConnection != null) { - testConnection.close(); + // Clear any existing test data + stmt.execute("DELETE FROM urls"); } } @@ -219,8 +177,7 @@ void testBucketPartitioningTwoInstances() throws Exception { } } - private static void insertTestURL(String url, int bucket, String host, Instant time) - throws Exception { + private void insertTestURL(String url, int bucket, String host, Instant time) throws Exception { String sql = """ INSERT INTO urls (url, status, nextfetchdate, metadata, bucket, host) @@ -238,21 +195,10 @@ INSERT INTO urls (url, status, nextfetchdate, metadata, bucket, host) } } - /** Helper method to clear all URLs from the database between tests. */ - private static void clearAllURLs() throws Exception { - try (Statement stmt = testConnection.createStatement()) { - stmt.execute("DELETE FROM urls"); - } - } - private Map createTestConfig() { Map conf = new HashMap<>(); - Map sqlConnection = new HashMap<>(); - sqlConnection.put("url", mysqlContainer.getJdbcUrl()); - sqlConnection.put("user", mysqlContainer.getUsername()); - sqlConnection.put("password", mysqlContainer.getPassword()); - conf.put("sql.connection", sqlConnection); + conf.put("sql.connection", createSqlConnectionConfig()); conf.put("sql.status.table", "urls"); conf.put("sql.max.urls.per.bucket", 5); diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java index bf3bc6a5b..d559c2add 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java @@ -22,8 +22,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.util.HashMap; @@ -34,41 +32,17 @@ import org.apache.stormcrawler.TestOutputCollector; import org.apache.stormcrawler.TestUtil; import org.apache.stormcrawler.persistence.Status; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.MySQLContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; -@Testcontainers(disabledWithoutDocker = true) -class StatusUpdaterBoltTest { +class StatusUpdaterBoltTest extends AbstractSQLTest { - private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.4.0"); - - @Container - private static final MySQLContainer mysqlContainer = - new MySQLContainer<>(MYSQL_IMAGE) - .withDatabaseName("crawl") - .withUsername("crawler") - .withPassword("crawler"); - - private Connection testConnection; private TestOutputCollector output; - @BeforeEach - void setup() throws Exception { - output = new TestOutputCollector(); - // Create table - testConnection = - DriverManager.getConnection( - mysqlContainer.getJdbcUrl(), - mysqlContainer.getUsername(), - mysqlContainer.getPassword()); - + @Override + protected void setupTestTables() throws 
Exception { try (Statement stmt = testConnection.createStatement()) { - + stmt.executeQuery("DROP TABLE IF EXISTS urls"); stmt.execute( """ CREATE TABLE IF NOT EXISTS urls ( @@ -81,14 +55,14 @@ host VARCHAR(128), PRIMARY KEY(url) ) """); + // Clear table before each test + stmt.execute("TRUNCATE TABLE urls"); } } - @AfterEach - void cleanup() throws Exception { - if (testConnection != null) { - testConnection.close(); - } + @BeforeEach + void setup() { + output = new TestOutputCollector(); } @Test @@ -152,11 +126,7 @@ private Tuple createTuple(String url, Status status, Metadata metadata) { private Map createTestConfig() { Map conf = new HashMap<>(); - Map sqlConnection = new HashMap<>(); - sqlConnection.put("url", mysqlContainer.getJdbcUrl()); - sqlConnection.put("user", mysqlContainer.getUsername()); - sqlConnection.put("password", mysqlContainer.getPassword()); - conf.put("sql.connection", sqlConnection); + conf.put("sql.connection", createSqlConnectionConfig()); conf.put("sql.status.table", "urls"); conf.put("sql.status.max.urls.per.bucket", 10); From 051f0d90ad86c770820263c52c2357465c0d53f9 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 23:22:28 +0000 Subject: [PATCH 13/25] Introduce AbstractSQLTest for shared MySQL container setup in SQL tests --- .../test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java | 3 +-- .../test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java | 2 +- .../test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java | 2 +- .../org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java index 8e6c4bb02..b1394ad76 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java @@ -65,7 +65,7 @@ protected static Map createSqlConnectionConfig() { } @BeforeEach - void setup() throws Exception { + void baseSetup() throws Exception { testConnection = createConnection(); setupTestTables(); } @@ -77,6 +77,5 @@ void baseCleanup() throws Exception { if (testConnection != null) { testConnection.close(); } - MYSQL_CONTAINER.close(); } } diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java index 9ac9de784..630461e08 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java @@ -47,7 +47,7 @@ class IndexerBoltTest extends AbstractSQLTest { private final String tableName = "content"; @Override - public void setupTestTables() throws Exception { + protected void setupTestTables() throws Exception { try (Statement stmt = testConnection.createStatement()) { stmt.execute( """ diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java index 11eddd14a..eb028a677 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java @@ -44,7 +44,7 @@ class SQLSpoutTest extends AbstractSQLTest { @Override protected void setupTestTables() throws Exception { try (Statement stmt = testConnection.createStatement()) { - 
stmt.executeQuery("DROP TABLE IF EXISTS urls"); + stmt.execute("DROP TABLE IF EXISTS urls"); stmt.execute( """ CREATE TABLE IF NOT EXISTS urls ( diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java index d559c2add..03332c6e3 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java @@ -42,7 +42,7 @@ class StatusUpdaterBoltTest extends AbstractSQLTest { @Override protected void setupTestTables() throws Exception { try (Statement stmt = testConnection.createStatement()) { - stmt.executeQuery("DROP TABLE IF EXISTS urls"); + stmt.execute("DROP TABLE IF EXISTS urls"); stmt.execute( """ CREATE TABLE IF NOT EXISTS urls ( From 28a859674fda6960926485177dc02d5e0cb87a78 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 23:24:36 +0000 Subject: [PATCH 14/25] Introduce AbstractSQLTest for shared MySQL container setup in SQL tests --- .../stormcrawler/sql/AbstractSQLTest.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java index b1394ad76..1c537527a 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java @@ -21,9 +21,13 @@ import java.sql.SQLException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Timeout; import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; /** @@ -31,21 +35,19 @@ * Testcontainers singleton pattern to ensure the container is created once and reused across all * test classes, improving test performance. 
*/ +@Testcontainers(disabledWithoutDocker = true) +@Timeout(value = 120, unit = TimeUnit.SECONDS) public abstract class AbstractSQLTest { private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.4.0"); - private static final MySQLContainer MYSQL_CONTAINER; - - static { - MYSQL_CONTAINER = - new MySQLContainer<>(MYSQL_IMAGE) - .withDatabaseName("crawl") - .withUsername("crawler") - .withPassword("crawler") - .withReuse(true); - MYSQL_CONTAINER.start(); - } + @Container + private static final MySQLContainer MYSQL_CONTAINER = + new MySQLContainer<>(MYSQL_IMAGE) + .withDatabaseName("crawl") + .withUsername("crawler") + .withPassword("crawler") + .withReuse(true); protected Connection testConnection; From c43e08f32fa45dbaff21565a5814cca1900197f3 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 23:27:44 +0000 Subject: [PATCH 15/25] Introduce AbstractSQLTest for shared MySQL container setup in SQL tests --- .../apache/stormcrawler/sql/AbstractSQLTest.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java index 1c537527a..5259973c4 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java @@ -22,7 +22,8 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Timeout; import org.testcontainers.containers.MySQLContainer; @@ -49,7 +50,7 @@ public abstract class AbstractSQLTest { .withPassword("crawler") .withReuse(true); - protected Connection testConnection; + protected static Connection testConnection; protected static Connection createConnection() throws SQLException { return DriverManager.getConnection( @@ -66,16 +67,20 @@ protected static Map createSqlConnectionConfig() { return sqlConnection; } + @BeforeAll + static void init() throws SQLException { + testConnection = createConnection(); + } + @BeforeEach void baseSetup() throws Exception { - testConnection = createConnection(); setupTestTables(); } protected abstract void setupTestTables() throws Exception; - @AfterEach - void baseCleanup() throws Exception { + @AfterAll + static void cleanup() throws Exception { if (testConnection != null) { testConnection.close(); } From bf445375503b729a1731778aa5dac56dec6a8631 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 23:35:31 +0000 Subject: [PATCH 16/25] Introduce AbstractSQLTest for shared MySQL container setup in SQL tests --- .../java/org/apache/stormcrawler/sql/AbstractSQLTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java index 5259973c4..61cea28fc 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java @@ -38,7 +38,7 @@ */ @Testcontainers(disabledWithoutDocker = true) @Timeout(value = 120, unit = TimeUnit.SECONDS) -public abstract class AbstractSQLTest { +abstract class AbstractSQLTest { private static final DockerImageName 
MYSQL_IMAGE = DockerImageName.parse("mysql:8.4.0"); @@ -50,16 +50,16 @@ public abstract class AbstractSQLTest { .withPassword("crawler") .withReuse(true); - protected static Connection testConnection; + static Connection testConnection; - protected static Connection createConnection() throws SQLException { + static Connection createConnection() throws SQLException { return DriverManager.getConnection( MYSQL_CONTAINER.getJdbcUrl(), MYSQL_CONTAINER.getUsername(), MYSQL_CONTAINER.getPassword()); } - protected static Map createSqlConnectionConfig() { + static Map createSqlConnectionConfig() { Map sqlConnection = new HashMap<>(); sqlConnection.put("url", MYSQL_CONTAINER.getJdbcUrl()); sqlConnection.put("user", MYSQL_CONTAINER.getUsername()); From 5e47e81b5dfe682164834cb80969adf89085d744 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 23:37:03 +0000 Subject: [PATCH 17/25] Introduce AbstractSQLTest for shared MySQL container setup in SQL tests --- .../java/org/apache/stormcrawler/sql/IndexerBoltTest.java | 4 ---- .../test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java | 3 --- .../org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java | 3 --- 3 files changed, 10 deletions(-) diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java index 630461e08..b5fa9bd84 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java @@ -309,19 +309,15 @@ private Tuple createTuple(String url, String text, Metadata metadata) { private Map createBasicConfig() { Map conf = new HashMap<>(); - conf.put("sql.connection", createSqlConnectionConfig()); - conf.put(IndexerBolt.SQL_INDEX_TABLE_PARAM_NAME, tableName); conf.put(AbstractIndexerBolt.urlFieldParamName, "url"); - // Default metadata mapping List mdMapping = new ArrayList<>(); mdMapping.add("title"); mdMapping.add("description"); mdMapping.add("keywords"); conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping); - return conf; } diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java index eb028a677..9b9bfd615 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java @@ -197,15 +197,12 @@ INSERT INTO urls (url, status, nextfetchdate, metadata, bucket, host) private Map createTestConfig() { Map conf = new HashMap<>(); - conf.put("sql.connection", createSqlConnectionConfig()); - conf.put("sql.status.table", "urls"); conf.put("sql.max.urls.per.bucket", 5); conf.put("sql.spout.max.results", 100); conf.put( "urlbuffer.class", "org.apache.stormcrawler.persistence.urlbuffer.SimpleURLBuffer"); - return conf; } diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java index 03332c6e3..bc292b451 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java @@ -125,15 +125,12 @@ private Tuple createTuple(String url, Status status, Metadata metadata) { private Map createTestConfig() { Map conf = new HashMap<>(); - conf.put("sql.connection", createSqlConnectionConfig()); - 
conf.put("sql.status.table", "urls"); conf.put("sql.status.max.urls.per.bucket", 10); conf.put("scheduler.class", "org.apache.stormcrawler.persistence.DefaultScheduler"); conf.put("status.updater.cache.spec", "maximumSize=10000,expireAfterAccess=1h"); conf.put("sql.update.batch.size", 1); - return conf; } From a1f3691de50a540f22c19f9b590564e34bd4a7ad Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 23:45:52 +0000 Subject: [PATCH 18/25] Introduce AbstractSQLTest for shared MySQL container setup in SQL tests --- .../stormcrawler/sql/AbstractSQLTest.java | 7 +++++ .../stormcrawler/sql/IndexerBoltTest.java | 26 ++++++++-------- .../apache/stormcrawler/sql/SQLSpoutTest.java | 12 ++------ .../sql/StatusUpdaterBoltTest.java | 30 ++++++++----------- 4 files changed, 36 insertions(+), 39 deletions(-) diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java index 61cea28fc..a68b7eab5 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java @@ -19,6 +19,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.sql.Statement; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -67,6 +68,12 @@ static Map createSqlConnectionConfig() { return sqlConnection; } + void execute(String sql) throws SQLException { + try (Statement stmt = testConnection.createStatement()) { + stmt.execute(sql); + } + } + @BeforeAll static void init() throws SQLException { testConnection = createConnection(); diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java index b5fa9bd84..bcbcb43db 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java @@ -48,19 +48,19 @@ class IndexerBoltTest extends AbstractSQLTest { @Override protected void setupTestTables() throws Exception { - try (Statement stmt = testConnection.createStatement()) { - stmt.execute( - """ - CREATE TABLE IF NOT EXISTS content ( - url VARCHAR(255) PRIMARY KEY, - title VARCHAR(255), - description TEXT, - keywords VARCHAR(255) - ) - """); - // Clear table before each test - stmt.execute("TRUNCATE TABLE content"); - } + execute( + """ + DROP TABLE IF EXISTS content + """); + execute( + """ + CREATE TABLE IF NOT EXISTS content ( + url VARCHAR(255) PRIMARY KEY, + title VARCHAR(255), + description TEXT, + keywords VARCHAR(255) + ) + """); } @BeforeEach diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java index 9b9bfd615..9b65f2e25 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java @@ -22,7 +22,6 @@ import java.lang.reflect.Field; import java.sql.PreparedStatement; -import java.sql.Statement; import java.sql.Timestamp; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -43,10 +42,9 @@ class SQLSpoutTest extends AbstractSQLTest { @Override protected void setupTestTables() throws Exception { - try (Statement stmt = testConnection.createStatement()) { - stmt.execute("DROP TABLE IF EXISTS urls"); - 
stmt.execute( - """ + execute("DROP TABLE IF EXISTS urls"); + execute( + """ CREATE TABLE IF NOT EXISTS urls ( url VARCHAR(255), status VARCHAR(16) DEFAULT 'DISCOVERED', @@ -57,10 +55,6 @@ host VARCHAR(128), PRIMARY KEY(url) ) """); - - // Clear any existing test data - stmt.execute("DELETE FROM urls"); - } } @Test diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java index bc292b451..3fd2e42e6 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java @@ -41,23 +41,19 @@ class StatusUpdaterBoltTest extends AbstractSQLTest { @Override protected void setupTestTables() throws Exception { - try (Statement stmt = testConnection.createStatement()) { - stmt.execute("DROP TABLE IF EXISTS urls"); - stmt.execute( - """ - CREATE TABLE IF NOT EXISTS urls ( - url VARCHAR(255), - status VARCHAR(16) DEFAULT 'DISCOVERED', - nextfetchdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - metadata TEXT, - bucket SMALLINT DEFAULT 0, - host VARCHAR(128), - PRIMARY KEY(url) - ) - """); - // Clear table before each test - stmt.execute("TRUNCATE TABLE urls"); - } + execute("DROP TABLE IF EXISTS urls"); + execute( + """ + CREATE TABLE IF NOT EXISTS urls ( + url VARCHAR(255), + status VARCHAR(16) DEFAULT 'DISCOVERED', + nextfetchdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + metadata TEXT, + bucket SMALLINT DEFAULT 0, + host VARCHAR(128), + PRIMARY KEY(url) + ) + """); } @BeforeEach From 0a6ec5c9ef1c78f752a8e822d64680cdbebed399 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Thu, 1 Jan 2026 23:47:08 +0000 Subject: [PATCH 19/25] Introduce AbstractSQLTest for shared MySQL container setup in SQL tests --- .../apache/stormcrawler/sql/AbstractSQLTest.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java index a68b7eab5..bf454cf15 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java @@ -53,13 +53,6 @@ abstract class AbstractSQLTest { static Connection testConnection; - static Connection createConnection() throws SQLException { - return DriverManager.getConnection( - MYSQL_CONTAINER.getJdbcUrl(), - MYSQL_CONTAINER.getUsername(), - MYSQL_CONTAINER.getPassword()); - } - static Map createSqlConnectionConfig() { Map sqlConnection = new HashMap<>(); sqlConnection.put("url", MYSQL_CONTAINER.getJdbcUrl()); @@ -76,7 +69,11 @@ void execute(String sql) throws SQLException { @BeforeAll static void init() throws SQLException { - testConnection = createConnection(); + testConnection = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); } @BeforeEach From 43658c2cc3b58422bc5e91cc56324d46b9da0cff Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Fri, 2 Jan 2026 00:02:48 +0000 Subject: [PATCH 20/25] Refactor StatusUpdaterBoltTest to improve setup and cleanup methods --- .../stormcrawler/sql/StatusUpdaterBoltTest.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java 
b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java index 3fd2e42e6..4108150ff 100644 --- a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java @@ -32,12 +32,14 @@ import org.apache.stormcrawler.TestOutputCollector; import org.apache.stormcrawler.TestUtil; import org.apache.stormcrawler.persistence.Status; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class StatusUpdaterBoltTest extends AbstractSQLTest { private TestOutputCollector output; + private StatusUpdaterBolt bolt; @Override protected void setupTestTables() throws Exception { @@ -59,11 +61,16 @@ PRIMARY KEY(url) @BeforeEach void setup() { output = new TestOutputCollector(); + bolt = createBolt(); + } + + @AfterEach + void close() { + bolt.cleanup(); } @Test void testStoreDiscoveredURL() throws Exception { - StatusUpdaterBolt bolt = createBolt(); String url = "http://example.com/page1"; Metadata metadata = new Metadata(); metadata.addValue("key1", "value1"); @@ -88,7 +95,6 @@ void testStoreDiscoveredURL() throws Exception { @Test void testUpdateURL() throws Exception { - StatusUpdaterBolt bolt = createBolt(); String url = "http://example.com/page2"; Metadata metadata = new Metadata(); metadata.addValue("key1", "value1"); @@ -131,9 +137,10 @@ private Map createTestConfig() { } private StatusUpdaterBolt createBolt() { - StatusUpdaterBolt bolt = new StatusUpdaterBolt(); + StatusUpdaterBolt statusUpdaterBolt = new StatusUpdaterBolt(); Map conf = createTestConfig(); - bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output)); - return bolt; + statusUpdaterBolt.prepare( + conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output)); + return statusUpdaterBolt; } } From 86e9f3a05c570efd2a22b0d5ca633e6303d9d849 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Fri, 2 Jan 2026 00:07:44 +0000 Subject: [PATCH 21/25] Undo changes in MetricsConsumer --- .../sql/metrics/MetricsConsumer.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java index 2641d5ea4..0062f154b 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java @@ -23,6 +23,7 @@ import java.time.Instant; import java.util.Collection; import java.util.Map; +import java.util.Map.Entry; import org.apache.storm.metric.api.IMetricsConsumer; import org.apache.storm.task.IErrorReporter; import org.apache.storm.task.TopologyContext; @@ -34,7 +35,7 @@ public class MetricsConsumer implements IMetricsConsumer { - private static final Logger LOG = LoggerFactory.getLogger(MetricsConsumer.class); + private final Logger LOG = LoggerFactory.getLogger(getClass()); private Connection connection; private String query; @@ -63,11 +64,13 @@ public void prepare( public void handleDataPoints(TaskInfo taskInfo, Collection dataPoints) { final Timestamp now = Timestamp.from(Instant.now()); - try (PreparedStatement preparedStmt = connection.prepareStatement(query)) { + try { + PreparedStatement preparedStmt = connection.prepareStatement(query); for (DataPoint dataPoint : dataPoints) { handleDataPoints(preparedStmt, taskInfo, 
From 86e9f3a05c570efd2a22b0d5ca633e6303d9d849 Mon Sep 17 00:00:00 2001
From: "raju.gupta"
Date: Fri, 2 Jan 2026 00:07:44 +0000
Subject: [PATCH 21/25] Undo changes in MetricsConsumer

---
 .../sql/metrics/MetricsConsumer.java | 28 ++++++++++---------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java
index 2641d5ea4..0062f154b 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsConsumer.java
@@ -23,6 +23,7 @@
 import java.time.Instant;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.storm.metric.api.IMetricsConsumer;
 import org.apache.storm.task.IErrorReporter;
 import org.apache.storm.task.TopologyContext;
@@ -34,7 +35,7 @@
 
 public class MetricsConsumer implements IMetricsConsumer {
 
-    private static final Logger LOG = LoggerFactory.getLogger(MetricsConsumer.class);
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
 
     private Connection connection;
     private String query;
@@ -63,11 +64,13 @@ public void prepare(
     public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
         final Timestamp now = Timestamp.from(Instant.now());
 
-        try (PreparedStatement preparedStmt = connection.prepareStatement(query)) {
+        try {
+            PreparedStatement preparedStmt = connection.prepareStatement(query);
             for (DataPoint dataPoint : dataPoints) {
                 handleDataPoints(preparedStmt, taskInfo, dataPoint.name, dataPoint.value, now);
             }
             preparedStmt.executeBatch();
+            preparedStmt.close();
         } catch (SQLException ex) {
             LOG.error(ex.getMessage(), ex);
             throw new RuntimeException(ex);
@@ -80,25 +83,24 @@ private void handleDataPoints(
             final String nameprefix,
             final Object value,
             final Timestamp now) {
-        if (value instanceof final Number number) {
+        if (value instanceof Number) {
             try {
-                indexDataPoint(preparedStmt, taskInfo, now, nameprefix, (number).doubleValue());
+                indexDataPoint(
+                        preparedStmt, taskInfo, now, nameprefix, ((Number) value).doubleValue());
             } catch (SQLException e) {
                 LOG.error("Exception while indexing datapoint", e);
             }
-        } else if (value instanceof Map map) {
-            for (Map.Entry entry : map.entrySet()) {
-                if (entry.getKey() instanceof String key) {
-                    String newNamePrefix = nameprefix + "." + key;
-                    handleDataPoints(preparedStmt, taskInfo, newNamePrefix, entry.getValue(), now);
-                }
+        } else if (value instanceof Map) {
+            for (Entry entry : ((Map) value).entrySet()) {
+                String newnameprefix = nameprefix + "." + entry.getKey();
+                handleDataPoints(preparedStmt, taskInfo, newnameprefix, entry.getValue(), now);
             }
-        } else if (value instanceof Collection collection) {
-            for (Object collectionObj : collection) {
+        } else if (value instanceof Collection) {
+            for (Object collectionObj : (Collection) value) {
                 handleDataPoints(preparedStmt, taskInfo, nameprefix, collectionObj, now);
             }
         } else {
-            LOG.warn("Found data point value {} of {}", nameprefix, value.getClass());
+            LOG.warn("Found data point value {} of {}", nameprefix, value.getClass().toString());
        }
    }
From f7859eed5848863d72ce15b22c0802ff09c3066c Mon Sep 17 00:00:00 2001
From: "raju.gupta"
Date: Fri, 2 Jan 2026 00:51:10 +0000
Subject: [PATCH 22/25] minor changes

---
 .../test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
index bcbcb43db..f4711e37d 100644
--- a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
+++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
@@ -126,6 +126,7 @@ void testDuplicateHandling() throws Exception {
                                 "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) {
             assertTrue(rs.next());
             assertEquals("Original Title", rs.getString("title"));
+            assertEquals("Original description", rs.getString("description"));
         }
 
         // Second indexing with updated content (same URL)
@@ -144,8 +145,6 @@
             assertTrue(rs.next());
             assertEquals("Updated Title", rs.getString("title"));
             assertEquals("Updated description", rs.getString("description"));
-            // Should only be one row
-            assertFalse(rs.next(), "Should only have one row for the URL");
         }
 
         // Verify both tuples were acked
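Worth flagging on PATCH 21: the reverted handleDataPoints() only closes the PreparedStatement on the success path — if executeBatch() throws, the close() call is never reached and the statement lingers until the connection goes away. The try-with-resources form the patch backs out, reassembled below from the removed lines, closes the statement on both paths while keeping the same error handling:

    try (PreparedStatement preparedStmt = connection.prepareStatement(query)) {
        for (DataPoint dataPoint : dataPoints) {
            handleDataPoints(preparedStmt, taskInfo, dataPoint.name, dataPoint.value, now);
        }
        // the statement is closed even when this throws
        preparedStmt.executeBatch();
    } catch (SQLException ex) {
        LOG.error(ex.getMessage(), ex);
        throw new RuntimeException(ex);
    }

If the undo is kept for stylistic consistency, a finally block around close() would restore the same safety.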
From 190efb28b906645f21f0cc52a5388c0a74047647 Mon Sep 17 00:00:00 2001
From: "raju.gupta"
Date: Fri, 2 Jan 2026 01:11:52 +0000
Subject: [PATCH 23/25] minor changes

---
 .../stormcrawler/sql/IndexerBoltTest.java | 65 +++++++------------
 1 file changed, 24 insertions(+), 41 deletions(-)

diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
index f4711e37d..5a34a6b79 100644
--- a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
+++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
@@ -71,16 +71,9 @@ void setup() {
 
     @Test
     void testBasicIndexing() throws Exception {
         IndexerBolt bolt = createBolt(createBasicConfig());
-
         String url = "http://example.com/page1";
-        String text = "This is the page content";
-        Metadata metadata = new Metadata();
-        metadata.addValue("title", "Test Page Title");
-        metadata.addValue("description", "Test page description");
-        metadata.addValue("keywords", "test, page, keywords");
-        Tuple tuple = createTuple(url, text, metadata);
-        bolt.execute(tuple);
+        executeTuple(bolt, url, "This is the page content", getMetadata());
 
         // Verify URL was stored in database
         try (Statement stmt = testConnection.createStatement();
@@ -110,34 +103,15 @@
     void testDuplicateHandling() throws Exception {
         IndexerBolt bolt = createBolt(createBasicConfig());
         String url = "http://example.com/page2";
-
-        // First indexing
-        Metadata metadata1 = new Metadata();
-        metadata1.addValue("title", "Original Title");
-        metadata1.addValue("description", "Original description");
-
-        Tuple tuple1 = createTuple(url, "Original content", metadata1);
-        bolt.execute(tuple1);
-
-        // Verify first insert
-        try (Statement stmt = testConnection.createStatement();
-                ResultSet rs =
-                        stmt.executeQuery(
-                                "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) {
-            assertTrue(rs.next());
-            assertEquals("Original Title", rs.getString("title"));
-            assertEquals("Original description", rs.getString("description"));
-        }
+        executeTuple(bolt, url, "Original content", getMetadata());
 
         // Second indexing with updated content (same URL)
-        Metadata metadata2 = new Metadata();
-        metadata2.addValue("title", "Updated Title");
-        metadata2.addValue("description", "Updated description");
+        Metadata metadata = new Metadata();
+        metadata.addValue("title", "Updated Title");
+        metadata.addValue("description", "Updated description");
 
-        Tuple tuple2 = createTuple(url, "Updated content", metadata2);
-        bolt.execute(tuple2);
+        executeTuple(bolt, url, "Updated content", metadata);
 
-        // Verify ON DUPLICATE KEY UPDATE worked - should have updated values
         try (Statement stmt = testConnection.createStatement();
                 ResultSet rs =
                         stmt.executeQuery(
                                 "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) {
             assertTrue(rs.next());
             assertEquals("Updated Title", rs.getString("title"));
             assertEquals("Updated description", rs.getString("description"));
+            assertEquals("test, page, keywords", rs.getString("keywords"));
         }
 
         assertEquals(2, output.getAckedTuples().size());
         bolt.cleanup();
     }
@@ -161,8 +135,7 @@ void testFilteringByRobotsNoIndex() throws Exception {
         metadata.addValue("title", "Should Not Be Indexed");
         metadata.addValue(RobotsTags.ROBOTS_NO_INDEX, "true");
 
-        Tuple tuple = createTuple(url, "Content", metadata);
-        bolt.execute(tuple);
+        executeTuple(bolt, url, "Content", metadata);
 
         // Verify URL was NOT stored in database
         try (Statement stmt = testConnection.createStatement();
@@ -194,8 +167,7 @@ void testFilteringByMetadataFilter() throws Exception {
         Metadata metadata1 = new Metadata();
         metadata1.addValue("title", "Filtered Page");
 
-        Tuple tuple1 = createTuple(url1, "Content", metadata1);
-        bolt.execute(tuple1);
+        executeTuple(bolt, url1, "Content", metadata1);
 
         // Verify filtered document was NOT stored
         try (Statement stmt = testConnection.createStatement();
@@ -248,8 +220,7 @@ void testMetadataExtraction() throws Exception {
         metadata.addValue("keywords", "these,should,not,be,stored");
         metadata.addValue("author", "Should Not Be Stored");
 
-        Tuple tuple = createTuple(url, "Content", metadata);
-        bolt.execute(tuple);
+        executeTuple(bolt, url, "Content", metadata);
 
         // Verify only configured metadata was stored
         try (Statement stmt = testConnection.createStatement();
@@ -283,8 +254,7 @@ void testMetadataAliasMapping() throws Exception {
         metadata.addValue("parse.title", "Title from Parser");
         metadata.addValue("parse.description", "Description from Parser");
 
-        Tuple tuple = createTuple(url, "Content", metadata);
-        bolt.execute(tuple);
+        executeTuple(bolt, url, "Content", metadata);
 
         // Verify aliased metadata was stored correctly
         try (Statement stmt = testConnection.createStatement();
@@ -306,6 +276,11 @@ private Tuple createTuple(String url, String text, Metadata metadata) {
         return tuple;
     }
 
+    private void executeTuple(IndexerBolt bolt, String url, String text, Metadata metadata) {
+        Tuple tuple = createTuple(url, text, metadata);
+        bolt.execute(tuple);
+    }
+
     private Map<String, Object> createBasicConfig() {
         Map<String, Object> conf = new HashMap<>();
         conf.put("sql.connection", createSqlConnectionConfig());
@@ -325,4 +300,12 @@ private IndexerBolt createBolt(Map<String, Object> conf) {
         bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output));
         return bolt;
     }
+
+    private Metadata getMetadata() {
+        Metadata metadata = new Metadata();
+        metadata.addValue("title", "Test Page Title");
+        metadata.addValue("description", "Test page description");
+        metadata.addValue("keywords", "test, page, keywords");
+        return metadata;
+    }
 }
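The keywords assertion added to testDuplicateHandling in PATCH 23 leans on a MySQL upsert detail: ON DUPLICATE KEY UPDATE only rewrites the columns listed in the statement, so a column absent from the second insert keeps the value from the first. A plain-JDBC sketch of the behaviour the test verifies, reusing testConnection from AbstractSQLTest and assuming a urls table with a url primary key plus title and keywords columns (which the test's SELECTs imply):

    try (Statement stmt = testConnection.createStatement()) {
        stmt.execute(
                "INSERT INTO urls (url, title, keywords)"
                        + " VALUES ('http://example.com/x', 'Original Title', 'test, page, keywords')"
                        + " ON DUPLICATE KEY UPDATE title=VALUES(title), keywords=VALUES(keywords)");
        // second statement omits the keywords column entirely
        stmt.execute(
                "INSERT INTO urls (url, title)"
                        + " VALUES ('http://example.com/x', 'Updated Title')"
                        + " ON DUPLICATE KEY UPDATE title=VALUES(title)");
        try (ResultSet rs =
                stmt.executeQuery(
                        "SELECT title, keywords FROM urls WHERE url = 'http://example.com/x'")) {
            rs.next();
            // title is now 'Updated Title'; keywords is still 'test, page, keywords'
        }
    }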
metadata.addValue("author", "Should Not Be Stored"); - Tuple tuple = createTuple(url, "Content", metadata); - bolt.execute(tuple); + executeTuple(bolt, url, "Content", metadata); // Verify only configured metadata was stored try (Statement stmt = testConnection.createStatement(); @@ -283,8 +254,7 @@ void testMetadataAliasMapping() throws Exception { metadata.addValue("parse.title", "Title from Parser"); metadata.addValue("parse.description", "Description from Parser"); - Tuple tuple = createTuple(url, "Content", metadata); - bolt.execute(tuple); + executeTuple(bolt, url, "Content", metadata); // Verify aliased metadata was stored correctly try (Statement stmt = testConnection.createStatement(); @@ -306,6 +276,11 @@ private Tuple createTuple(String url, String text, Metadata metadata) { return tuple; } + private void executeTuple(IndexerBolt bolt, String url, String text, Metadata metadata) { + Tuple tuple = createTuple(url, text, metadata); + bolt.execute(tuple); + } + private Map createBasicConfig() { Map conf = new HashMap<>(); conf.put("sql.connection", createSqlConnectionConfig()); @@ -325,4 +300,12 @@ private IndexerBolt createBolt(Map conf) { bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output)); return bolt; } + + private Metadata getMetadata() { + Metadata metadata = new Metadata(); + metadata.addValue("title", "Test Page Title"); + metadata.addValue("description", "Test page description"); + metadata.addValue("keywords", "test, page, keywords"); + return metadata; + } } From b8f17576205e35b56db50e7eb16f6ba6f89e333f Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Wed, 7 Jan 2026 18:50:35 +0000 Subject: [PATCH 24/25] Changes following PR review. --- external/sql/pom.xml | 1 - .../main/java/org/apache/stormcrawler/sql/IndexerBolt.java | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/external/sql/pom.xml b/external/sql/pom.xml index b4ef395a5..2b0ece448 100644 --- a/external/sql/pom.xml +++ b/external/sql/pom.xml @@ -51,7 +51,6 @@ under the License. com.mysql mysql-connector-j - test org.apache.stormcrawler diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java index dc3b570f7..d9b0db0c0 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java @@ -80,7 +80,6 @@ public void execute(Tuple tuple) { String normalisedurl = valueForURL(tuple); Metadata metadata = (Metadata) tuple.getValueByField("metadata"); - tuple.getStringByField("text"); boolean keep = filterDocument(metadata); if (!keep) { @@ -98,7 +97,7 @@ public void execute(Tuple tuple) { Map keyVals = filterMetadata(metadata); List keys = new ArrayList<>(keyVals.keySet()); - String query = getQuery(keys); + String query = buildQuery(keys); if (connection == null) { try { @@ -168,7 +167,7 @@ private void insert( preparedStmt.setString(position, value); } - private String getQuery(final List keys) { + private String buildQuery(final List keys) { final String columns = String.join(", ", keys); final String placeholders = keys.stream().map(k -> "?").collect(Collectors.joining(", ")); From 5ef9728edcaa5bae6532c92567b44bd9b7f9f76b Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Wed, 7 Jan 2026 18:51:45 +0000 Subject: [PATCH 25/25] Changes following PR review. 
From 5ef9728edcaa5bae6532c92567b44bd9b7f9f76b Mon Sep 17 00:00:00 2001
From: "raju.gupta"
Date: Wed, 7 Jan 2026 18:51:45 +0000
Subject: [PATCH 25/25] Changes following PR review.

---
 pom.xml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 4c14149a1..2ad34c1de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -678,7 +678,6 @@ under the License.
             <dependency>
                 <groupId>com.mysql</groupId>
                 <artifactId>mysql-connector-j</artifactId>
                 <version>${mysql-connector-j}</version>
-                <scope>test</scope>
             </dependency>
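Both scope removals (PATCH 24 for the module pom, PATCH 25 for the parent) have the same effect: a test-scoped mysql-connector-j never reaches the runtime classpath of anything that depends on the module, so the first DriverManager.getConnection() against a jdbc:mysql: URL in a deployed topology would fail with "No suitable driver". A quick standalone check that the driver is actually visible at runtime — the URL and credentials below are placeholders, not values from the patches:

    import java.sql.DriverManager;
    import java.sql.SQLException;

    public class DriverCheck {
        public static void main(String[] args) {
            try {
                // succeeds only if mysql-connector-j is on the runtime classpath;
                // DriverManager locates registered drivers via ServiceLoader
                DriverManager.getConnection(
                                "jdbc:mysql://localhost:3306/crawl", "user", "pass")
                        .close();
            } catch (SQLException e) {
                // "No suitable driver found for jdbc:mysql://..." means the
                // driver jar is missing, e.g. because it was test-scoped
                System.err.println(e.getMessage());
            }
        }
    }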