diff --git a/core/src/test/java/org/apache/stormcrawler/TestUtil.java b/core/src/test/java/org/apache/stormcrawler/TestUtil.java
index 033255c40..d8358489c 100644
--- a/core/src/test/java/org/apache/stormcrawler/TestUtil.java
+++ b/core/src/test/java/org/apache/stormcrawler/TestUtil.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.when;
 
 import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.task.TopologyContext;
@@ -48,6 +50,42 @@ public IMetric answer(InvocationOnMock invocation) throws Throwable {
         return context;
     }
 
+    /**
+     * Creates a mocked TopologyContext for testing bucket partitioning.
+     *
+     * @param taskIndex The task index for this spout instance (determines bucket number)
+     * @param totalTasks Total number of tasks in the topology
+     * @param componentId The component ID
+     * @return Mocked TopologyContext
+     */
+    public static TopologyContext getMockedTopologyContextWithBucket(
+            int taskIndex, int totalTasks, String componentId) {
+        TopologyContext context = mock(TopologyContext.class);
+
+        // Mock metric registration
+        when(context.registerMetric(anyString(), any(IMetric.class), anyInt()))
+                .thenAnswer(
+                        new Answer<IMetric>() {
+                            @Override
+                            public IMetric answer(InvocationOnMock invocation) throws Throwable {
+                                return invocation.getArgument(1, IMetric.class);
+                            }
+                        });
+
+        // Mock task information for bucket assignment
+        when(context.getThisTaskIndex()).thenReturn(taskIndex);
+        when(context.getThisComponentId()).thenReturn(componentId);
+
+        // Create list of task IDs (0 to totalTasks-1)
+        List<Integer> taskIds = new ArrayList<>();
+        for (int i = 0; i < totalTasks; i++) {
+            taskIds.add(i);
+        }
+        when(context.getComponentTasks(componentId)).thenReturn(taskIds);
+
+        return context;
+    }
+
     public static Tuple getMockedTestTuple(String url, String content, Metadata metadata) {
         Tuple tuple = mock(Tuple.class);
         when(tuple.getStringByField("url")).thenReturn(url);
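The helper above backs the bucket-partitioning tests added later in this patch. A minimal usage sketch (hypothetical component id, assuming two spout tasks):

    // each mocked context reports its own task index plus the full task list
    TopologyContext ctx0 = TestUtil.getMockedTopologyContextWithBucket(0, 2, "spout");
    TopologyContext ctx1 = TestUtil.getMockedTopologyContextWithBucket(1, 2, "spout");
    assert ctx0.getThisTaskIndex() == 0;
    assert ctx1.getComponentTasks("spout").size() == 2;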
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 0eece4e64..2b0ece448 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -37,13 +37,28 @@ under the License.
     <description>SQL-based resources for StormCrawler</description>
 
-    <dependencies/>
+    <dependencies>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.stormcrawler</groupId>
+            <artifactId>stormcrawler-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java
index 89b711f32..d9b0db0c0 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java
@@ -21,7 +21,11 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.storm.metric.api.MultiCountMetric;
 import org.apache.storm.task.OutputCollector;
@@ -42,12 +46,14 @@ public class IndexerBolt extends AbstractIndexerBolt {
 
     public static final String SQL_INDEX_TABLE_PARAM_NAME = "sql.index.table";
 
-    private OutputCollector _collector;
+    private OutputCollector collector;
 
     private MultiCountMetric eventCounter;
 
     private Connection connection;
+    private PreparedStatement preparedStmt;
+
     private String tableName;
 
     private Map<String, Object> conf;
@@ -56,7 +62,7 @@ public class IndexerBolt extends AbstractIndexerBolt {
     public void prepare(
             Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
         super.prepare(conf, context, collector);
-        _collector = collector;
+        this.collector = collector;
 
         this.eventCounter = context.registerMetric("SQLIndexer", new MultiCountMetric(), 10);
 
@@ -74,15 +80,14 @@ public void execute(Tuple tuple) {
         String normalisedurl = valueForURL(tuple);
 
         Metadata metadata = (Metadata) tuple.getValueByField("metadata");
-        String text = tuple.getStringByField("text");
 
         boolean keep = filterDocument(metadata);
         if (!keep) {
             eventCounter.scope("Filtered").incrBy(1);
             // treat it as successfully processed even if
             // we do not index it
-            _collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
-            _collector.ack(tuple);
+            collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
+            collector.ack(tuple);
             return;
         }
 
@@ -90,35 +95,9 @@ public void execute(Tuple tuple) {
 
         // which metadata to display?
         Map<String, String[]> keyVals = filterMetadata(metadata);
+        List<String> keys = new ArrayList<>(keyVals.keySet());
 
-        StringBuilder query =
-                new StringBuilder(" insert into ")
-                        .append(tableName)
-                        .append(" (")
-                        .append(fieldNameForURL());
-
-        Object[] keys = keyVals.keySet().toArray();
-
-        for (Object o : keys) {
-            query.append(", ").append((String) o);
-        }
-
-        query.append(") values(?");
-
-        for (int i = 0; i < keys.length; i++) {
-            query.append(", ?");
-        }
-
-        query.append(")");
-
-        query.append(" ON DUPLICATE KEY UPDATE ");
-        for (int i = 0; i < keys.length; i++) {
-            String key = (String) keys[i];
-            if (i > 0) {
-                query.append(", ");
-            }
-            query.append(key).append("=VALUES(").append(key).append(")");
-        }
+        String query = buildQuery(keys);
 
         if (connection == null) {
             try {
@@ -131,9 +110,8 @@ public void execute(Tuple tuple) {
 
             LOG.debug("PreparedStatement => {}", query);
 
-            // create the mysql insert preparedstatement
-            PreparedStatement preparedStmt = connection.prepareStatement(query.toString());
-
+            // create the mysql insert PreparedStatement
+            preparedStmt = connection.prepareStatement(query);
 
             // TODO store the text of the document?
             if (StringUtils.isNotBlank(fieldNameForText())) {
                 // builder.field(fieldNameForText(), trimText(text));
@@ -144,21 +122,21 @@ public void execute(Tuple tuple) {
                 preparedStmt.setString(1, normalisedurl);
             }
 
-            for (int i = 0; i < keys.length; i++) {
-                insert(preparedStmt, i + 2, (String) keys[i], keyVals);
+            // Set all metadata parameters
+            for (int i = 0; i < keys.size(); i++) {
+                insert(preparedStmt, i + 2, keys.get(i), keyVals);
             }
-
+            // Execute the statement (single row insert)
             preparedStmt.executeUpdate();
 
             eventCounter.scope("Indexed").incrBy(1);
-            _collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
-            _collector.ack(tuple);
-
+            collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
+            collector.ack(tuple);
         } catch (Exception e) {
             // do not send to status stream so that it gets replayed
             LOG.error("Error inserting into SQL", e);
-            _collector.fail(tuple);
+            collector.fail(tuple);
             if (connection != null) {
                 // reset the connection
                 try {
@@ -188,4 +166,41 @@ private void insert(
         }
         preparedStmt.setString(position, value);
     }
+
+    private String buildQuery(final List<String> keys) {
+        final String columns = String.join(", ", keys);
+        final String placeholders = keys.stream().map(k -> "?").collect(Collectors.joining(", "));
+
+        final String updates =
+                keys.stream()
+                        .map(k -> String.format(Locale.ROOT, "%s=VALUES(%s)", k, k))
+                        .collect(Collectors.joining(", "));
+
+        // Build the ON DUPLICATE KEY UPDATE clause
+        // If there are metadata keys, update them; otherwise, update the URL field to itself
+        final String updateClause =
+                updates.isEmpty()
+                        ? String.format(
+                                Locale.ROOT, "%s=VALUES(%s)", fieldNameForURL(), fieldNameForURL())
+                        : updates;
+
+        return String.format(
+                Locale.ROOT,
+                """
+                INSERT INTO %s (%s%s)
+                VALUES (?%s)
+                ON DUPLICATE KEY UPDATE %s
+                """,
+                tableName,
+                fieldNameForURL(),
+                columns.isEmpty() ? "" : ", " + columns,
+                placeholders.isEmpty() ? "" : ", " + placeholders,
+                updateClause);
+    }
+
+    @Override
+    public void cleanup() {
+        SQLUtil.closeResource(preparedStmt, "prepared statement");
+        SQLUtil.closeResource(connection, "connection");
+    }
 }
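For illustration, assuming tableName is "content" and fieldNameForURL() returns "url" (the values used by the new IndexerBoltTest below), buildQuery conceptually yields, modulo whitespace:

    String query = buildQuery(List.of("title", "description"));
    // INSERT INTO content (url, title, description)
    // VALUES (?, ?, ?)
    // ON DUPLICATE KEY UPDATE title=VALUES(title), description=VALUES(description)

With no metadata keys at all, the update clause degenerates to url=VALUES(url) so that the statement stays syntactically valid.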
+ """; + + private static final String URL_COLUMN = "url"; + private static final String METADATA_COLUMN = "metadata"; + + private static String preparedSql; private Connection connection; + private PreparedStatement ps; /** * if more than one instance of the spout exist, each one is in charge of a separate bucket @@ -70,7 +101,8 @@ public void open( maxDocsPerBucket = ConfUtils.getInt(conf, Constants.SQL_MAX_DOCS_BUCKET_PARAM_NAME, 5); - tableName = ConfUtils.getString(conf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls"); + final String tableName = + ConfUtils.getString(conf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls"); maxNumResults = ConfUtils.getInt(conf, Constants.SQL_MAXRESULTS_PARAM_NAME, 100); @@ -88,6 +120,18 @@ public void open( "[" + context.getThisComponentId() + " #" + context.getThisTaskIndex() + "] "; bucketNum = context.getThisTaskIndex(); } + + final String bucketClause = (bucketNum >= 0) ? BUCKET_CLAUSE : ""; + final String limitClause = (maxNumResults != -1) ? LIMIT_CLAUSE : ""; + + preparedSql = String.format(Locale.ROOT, BASE_SQL, tableName, bucketClause, limitClause); + + try { + ps = connection.prepareStatement(preparedSql); + } catch (SQLException e) { + LOG.error("Failed to prepare statement", e); + throw new RuntimeException(e); + } } @Override @@ -117,97 +161,41 @@ protected void populateBuffer() { // https://mariadb.com/kb/en/library/window-functions-overview/ // http://www.mysqltutorial.org/mysql-window-functions/mysql-rank-function/ - String query = - "SELECT * from (select rank() over (partition by host order by nextfetchdate desc, url) as ranking, url, metadata, nextfetchdate from " - + tableName; - - query += - " WHERE nextfetchdate <= '" + new Timestamp(lastNextFetchDate.toEpochMilli()) + "'"; - - // constraint on bucket num - if (bucketNum >= 0) { - query += " AND bucket = '" + bucketNum + "'"; - } - - query += - ") as urls_ranks where (urls_ranks.ranking <= " - + maxDocsPerBucket - + ") order by ranking"; - - if (maxNumResults != -1) { - query += " LIMIT " + this.maxNumResults; - } - int alreadyprocessed = 0; int numhits = 0; long timeStartQuery = System.currentTimeMillis(); - // create the java statement - Statement st = null; - ResultSet rs = null; try { - st = this.connection.createStatement(); + int i = 1; + ps.setTimestamp(i++, new Timestamp(lastNextFetchDate.toEpochMilli())); - // dump query to log - LOG.debug("{} SQL query {}", logIdprefix, query); - - // execute the query, and get a java resultset - rs = st.executeQuery(query); - - long timeTaken = System.currentTimeMillis() - timeStartQuery; - queryTimes.addMeasurement(timeTaken); - - // iterate through the java resultset - while (rs.next()) { - String url = rs.getString("url"); - numhits++; - // already processed? skip - if (beingProcessed.containsKey(url)) { - alreadyprocessed++; - continue; - } - String metadata = rs.getString("metadata"); - if (metadata == null) { - metadata = ""; - } else if (!metadata.startsWith("\t")) { - metadata = "\t" + metadata; - } - String URLMD = url + metadata; - List v = - SCHEME.deserialize(ByteBuffer.wrap(URLMD.getBytes(StandardCharsets.UTF_8))); - buffer.add(url, (Metadata) v.get(1)); + if (bucketNum >= 0) { + ps.setInt(i++, bucketNum); } - // no results? 
@@ -117,97 +161,41 @@ protected void populateBuffer() {
 
         // https://mariadb.com/kb/en/library/window-functions-overview/
         // http://www.mysqltutorial.org/mysql-window-functions/mysql-rank-function/
-        String query =
-                "SELECT * from (select rank() over (partition by host order by nextfetchdate desc, url) as ranking, url, metadata, nextfetchdate from "
-                        + tableName;
-
-        query +=
-                " WHERE nextfetchdate <= '" + new Timestamp(lastNextFetchDate.toEpochMilli()) + "'";
-
-        // constraint on bucket num
-        if (bucketNum >= 0) {
-            query += " AND bucket = '" + bucketNum + "'";
-        }
-
-        query +=
-                ") as urls_ranks where (urls_ranks.ranking <= "
-                        + maxDocsPerBucket
-                        + ") order by ranking";
-
-        if (maxNumResults != -1) {
-            query += " LIMIT " + this.maxNumResults;
-        }
-
         int alreadyprocessed = 0;
         int numhits = 0;
 
         long timeStartQuery = System.currentTimeMillis();
 
-        // create the java statement
-        Statement st = null;
-        ResultSet rs = null;
         try {
-            st = this.connection.createStatement();
+            int i = 1;
+            ps.setTimestamp(i++, new Timestamp(lastNextFetchDate.toEpochMilli()));
 
-            // dump query to log
-            LOG.debug("{} SQL query {}", logIdprefix, query);
-
-            // execute the query, and get a java resultset
-            rs = st.executeQuery(query);
-
-            long timeTaken = System.currentTimeMillis() - timeStartQuery;
-            queryTimes.addMeasurement(timeTaken);
-
-            // iterate through the java resultset
-            while (rs.next()) {
-                String url = rs.getString("url");
-                numhits++;
-                // already processed? skip
-                if (beingProcessed.containsKey(url)) {
-                    alreadyprocessed++;
-                    continue;
-                }
-                String metadata = rs.getString("metadata");
-                if (metadata == null) {
-                    metadata = "";
-                } else if (!metadata.startsWith("\t")) {
-                    metadata = "\t" + metadata;
-                }
-                String URLMD = url + metadata;
-                List<Object> v =
-                        SCHEME.deserialize(ByteBuffer.wrap(URLMD.getBytes(StandardCharsets.UTF_8)));
-                buffer.add(url, (Metadata) v.get(1));
+            if (bucketNum >= 0) {
+                ps.setInt(i++, bucketNum);
             }
 
-            // no results? reset the date
-            if (numhits == 0) {
-                lastNextFetchDate = null;
+            ps.setInt(i++, maxDocsPerBucket);
+
+            if (maxNumResults != -1) {
+                ps.setInt(i++, maxNumResults);
             }
 
-            eventCounter.scope("already_being_processed").incrBy(alreadyprocessed);
-            eventCounter.scope("queries").incrBy(1);
-            eventCounter.scope("docs").incrBy(numhits);
+            // dump query to log
+            LOG.debug("{} SQL query {}", logIdprefix, preparedSql);
 
-            LOG.info(
-                    "{} SQL query returned {} hits in {} msec with {} already being processed",
-                    logIdprefix,
-                    numhits,
-                    timeTaken,
-                    alreadyprocessed);
+            try (ResultSet rs = ps.executeQuery()) {
+                final long timeTaken = recordQueryTiming(timeStartQuery);
+
+                // iterate through the java resultset
+                while (rs.next()) {
+                    numhits++;
+                    alreadyprocessed += processRow(rs);
+                }
+                postProcessResults(numhits, alreadyprocessed, timeTaken);
+            }
         } catch (SQLException e) {
             LOG.error("Exception while querying table", e);
-        } finally {
-            try {
-                if (rs != null) rs.close();
-            } catch (SQLException e) {
-                LOG.error("Exception closing resultset", e);
-            }
-            try {
-                if (st != null) st.close();
-            } catch (SQLException e) {
-                LOG.error("Exception closing statement", e);
-            }
         }
     }
 
@@ -226,10 +214,55 @@ public void fail(Object msgId) {
 
     @Override
     public void close() {
         super.close();
-        try {
-            connection.close();
-        } catch (SQLException e) {
-            LOG.error("Exception caught while closing SQL connection", e);
+        SQLUtil.closeResource(ps, "prepared statement");
+        SQLUtil.closeResource(connection, "connection");
+    }
+
+    private long recordQueryTiming(long timeStartQuery) {
+        long timeTaken = System.currentTimeMillis() - timeStartQuery;
+        queryTimes.addMeasurement(timeTaken);
+        return timeTaken;
+    }
+
+    private int processRow(final ResultSet rs) throws SQLException {
+
+        final String url = rs.getString(URL_COLUMN);
+        final String metadata = rs.getString(METADATA_COLUMN);
+
+        // already processed? skip
+        if (beingProcessed.containsKey(url)) {
+            return 1;
+        }
+
+        // as before, a null metadata column becomes an empty string and any
+        // non-empty value is prefixed with a tab for the StringTabScheme
+        final String normalisedMetadata =
+                (metadata == null) ? "" : (metadata.startsWith("\t") ? metadata : "\t" + metadata);
+
+        final String urlWithMetadata = String.format(Locale.ROOT, "%s%s", url, normalisedMetadata);
+        final List<Object> v =
+                SCHEME.deserialize(
+                        ByteBuffer.wrap(urlWithMetadata.getBytes(StandardCharsets.UTF_8)));
+        buffer.add(url, (Metadata) v.get(1));
+
+        return 0;
+    }
+
+    private void postProcessResults(
+            final int numHits, final int alreadyProcessed, final long timeTaken) {
+
+        // no results? reset the date
+        if (numHits == 0) {
+            lastNextFetchDate = null;
         }
+
+        eventCounter.scope("already_being_processed").incrBy(alreadyProcessed);
+        eventCounter.scope("queries").incrBy(1);
+        eventCounter.scope("docs").incrBy(numHits);
+
+        LOG.info(
+                "{} SQL query returned {} hits in {} msec with {} already being processed",
+                logIdprefix,
+                numHits,
+                timeTaken,
+                alreadyProcessed);
     }
 }
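With both optional clauses enabled (a bucket-partitioned spout and a positive maxNumResults) and the default table urls, the statement prepared in open() resolves, modulo whitespace, to the sketch below; populateBuffer() then binds the parameters in the order nextfetchdate, bucket, maxDocsPerBucket, maxNumResults:

    String resolved =
            """
            SELECT *
            FROM (
                SELECT
                    rank() OVER (PARTITION BY host ORDER BY nextfetchdate DESC, url) AS ranking,
                    url,
                    metadata,
                    nextfetchdate
                FROM urls
                WHERE nextfetchdate <= ? AND bucket = ?
            ) AS urls_ranks
            WHERE urls_ranks.ranking <= ?
            ORDER BY ranking
            LIMIT ?
            """;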
diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
index ad75f2432..c6c197a64 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLUtil.java
@@ -20,23 +20,24 @@
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 
 public class SQLUtil {
 
+    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(SQLUtil.class);
+
     private SQLUtil() {}
 
     public static Connection getConnection(Map<String, Object> stormConf) throws SQLException {
 
         // SQL connection details
-        Map sqlConf = (Map) stormConf.get("sql.connection");
+        Map<String, String> sqlConf = (Map<String, String>) stormConf.get("sql.connection");
 
         if (sqlConf == null) {
             throw new RuntimeException(
                     "Missing SQL connection config, add a section 'sql.connection' to the configuration");
         }
 
-        String url = (String) sqlConf.get("url");
+        String url = sqlConf.get("url");
 
         if (url == null) {
             throw new RuntimeException(
                     "Missing SQL url, add an entry 'url' to the section 'sql.connection' of the configuration");
@@ -44,10 +45,18 @@ public static Connection getConnection(Map<String, Object> stormConf) throws SQLException {
 
         Properties props = new Properties();
 
-        for (Entry<String, String> entry : sqlConf.entrySet()) {
-            props.setProperty(entry.getKey(), (String) entry.getValue());
-        }
+        props.putAll(sqlConf);
 
         return DriverManager.getConnection(url, props);
     }
+
+    public static void closeResource(final AutoCloseable resource, final String resourceName) {
+        if (resource != null) {
+            try {
+                resource.close();
+            } catch (Exception e) {
+                LOG.error("Error closing {}", resourceName, e);
+            }
+        }
+    }
 }
diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java
index 20ca051e5..27dc27194 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java
@@ -16,6 +16,8 @@
  */
 package org.apache.stormcrawler.sql;
 
+import static org.apache.stormcrawler.sql.SQLUtil.closeResource;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -23,6 +25,7 @@
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executors;
@@ -52,22 +55,19 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt {
 
     private MultiCountMetric eventCounter;
 
     private Connection connection;
-    private String tableName;
 
     private URLPartitioner partitioner;
 
     private int maxNumBuckets = -1;
 
     private int batchMaxSize = 1000;
-    private float batchMaxIdleMsec = 2000;
 
     private int currentBatchSize = 0;
 
-    private PreparedStatement insertPreparedStmt = null;
-
     private long lastInsertBatchTime = -1;
 
-    private String updateQuery;
-    private String insertQuery;
+    private PreparedStatement updatePreparedStmt;
+    private PreparedStatement insertPreparedStmt;
+
+    private ScheduledExecutorService executor;
 
     private final Map<String, List<Tuple>> waitingAck = new HashMap<>();
 
@@ -88,7 +88,8 @@ public void prepare(
 
         this.eventCounter = context.registerMetric("counter", new MultiCountMetric(), 10);
 
-        tableName = ConfUtils.getString(stormConf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls");
+        final String tableName =
+                ConfUtils.getString(stormConf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls");
 
         batchMaxSize =
                 ConfUtils.getInt(stormConf, Constants.SQL_UPDATE_BATCH_SIZE_PARAM_NAME, 1000);
@@ -100,21 +101,39 @@ public void prepare(
             throw new RuntimeException(ex);
         }
 
-        String query =
-                tableName
-                        + " (url, status, nextfetchdate, metadata, bucket, host)"
-                        + " values (?, ?, ?, ?, ?, ?)";
-
-        updateQuery = "REPLACE INTO " + query;
-        insertQuery = "INSERT IGNORE INTO " + query;
+        final String baseColumns =
+                """
+                (url, status, nextfetchdate, metadata, bucket, host)
+                VALUES (?, ?, ?, ?, ?, ?)
+                """;
+
+        final String updateQuery =
+                String.format(
+                        Locale.ROOT,
+                        """
+                        REPLACE INTO %s %s
+                        """,
+                        tableName,
+                        baseColumns);
+
+        final String insertQuery =
+                String.format(
+                        Locale.ROOT,
+                        """
+                        INSERT IGNORE INTO %s %s
+                        """,
+                        tableName,
+                        baseColumns);
 
         try {
+            updatePreparedStmt = connection.prepareStatement(updateQuery);
             insertPreparedStmt = connection.prepareStatement(insertQuery);
         } catch (SQLException e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("Failed to prepare statements", e);
+            throw new RuntimeException(e);
         }
 
-        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        executor = Executors.newSingleThreadScheduledExecutor();
         executor.scheduleAtFixedRate(
                 () -> {
                     try {
@@ -162,37 +181,27 @@ public synchronized void store(
             partition = Math.abs(partitionKey.hashCode() % maxNumBuckets);
         }
 
-        PreparedStatement preparedStmt = this.insertPreparedStmt;
-
         // create in table if does not already exist
         if (isUpdate) {
-            preparedStmt = connection.prepareStatement(updateQuery);
-        }
-
-        preparedStmt.setString(1, url);
-        preparedStmt.setString(2, status.toString());
-        if (nextFetch.isPresent()) {
-            final Timestamp tsp = Timestamp.from(nextFetch.get().toInstant());
-            preparedStmt.setObject(3, tsp);
-        } else {
-            // a value so large it means it will never be refetched
-            preparedStmt.setObject(3, NEVER);
-        }
-        preparedStmt.setString(4, mdAsString.toString());
-        preparedStmt.setInt(5, partition);
-        preparedStmt.setString(6, partitionKey);
-
-        // updates are not batched
-        if (isUpdate) {
-            preparedStmt.executeUpdate();
-            preparedStmt.close();
+            populate(
+                    url,
+                    status,
+                    nextFetch,
+                    mdAsString,
+                    partition,
+                    partitionKey,
+                    updatePreparedStmt);
+
+            // updates are not batched
+            updatePreparedStmt.executeUpdate();
             eventCounter.scope("sql_updates_number").incrBy(1);
             super.ack(t, url);
             return;
         }
 
         // code below is for inserts i.e. DISCOVERED URLs
-        preparedStmt.addBatch();
+        populate(url, status, nextFetch, mdAsString, partition, partitionKey, insertPreparedStmt);
+        insertPreparedStmt.addBatch();
 
         if (lastInsertBatchTime == -1) {
             lastInsertBatchTime = System.currentTimeMillis();
@@ -210,12 +219,36 @@ public synchronized void store(
         eventCounter.scope("sql_inserts_number").incrBy(1);
     }
 
+    private void populate(
+            final String url,
+            final Status status,
+            final Optional<Date> nextFetch,
+            final StringBuilder mdAsString,
+            final int partition,
+            final String partitionKey,
+            final PreparedStatement preparedStmt)
+            throws SQLException {
+        preparedStmt.setString(1, url);
+        preparedStmt.setString(2, status.toString());
+        if (nextFetch.isPresent()) {
+            final Timestamp tsp = Timestamp.from(nextFetch.get().toInstant());
+            preparedStmt.setObject(3, tsp);
+        } else {
+            // a value so large it means it will never be refetched
+            preparedStmt.setObject(3, NEVER);
+        }
+        preparedStmt.setString(4, mdAsString.toString());
+        preparedStmt.setInt(5, partition);
+        preparedStmt.setString(6, partitionKey);
+    }
+
     private synchronized void checkExecuteBatch() throws SQLException {
         if (currentBatchSize == 0) {
             return;
         }
         long now = System.currentTimeMillis();
         // check whether the insert batches need executing
+        final float batchMaxIdleMsec = 2000;
         if ((currentBatchSize == batchMaxSize)) {
             LOG.info("About to execute batch - triggered by size");
         } else if (lastInsertBatchTime + (long) batchMaxIdleMsec < System.currentTimeMillis()) {
@@ -253,17 +286,27 @@ private synchronized void checkExecuteBatch() throws SQLException {
         lastInsertBatchTime = System.currentTimeMillis();
         currentBatchSize = 0;
         waitingAck.clear();
-
-        insertPreparedStmt.close();
-        insertPreparedStmt = connection.prepareStatement(insertQuery);
     }
 
     @Override
     public void cleanup() {
-        if (connection != null)
+        closeResource(updatePreparedStmt, "update prepared statement");
+        closeResource(insertPreparedStmt, "insert prepared statement");
+        closeResource(connection, "connection");
+        closeExecutor();
+    }
+
+    private void closeExecutor() {
+        if (executor != null) {
+            executor.shutdown();
             try {
-                connection.close();
-            } catch (SQLException e) {
+                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                executor.shutdownNow();
+                Thread.currentThread().interrupt();
             }
+        }
     }
 }
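Routing updates through the long-lived updatePreparedStmt and batching only the INSERT IGNORE path keeps store() free of per-tuple statement creation. For illustration, the bucket assignment above reduces to the following (a maxNumBuckets of 10 is an assumed value; it normally comes from configuration):

    int maxNumBuckets = 10; // assumed for illustration
    String partitionKey = "example.com"; // e.g. the host chosen by the URLPartitioner
    int partition = Math.abs(partitionKey.hashCode() % maxNumBuckets);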
diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java
new file mode 100644
index 000000000..bf454cf15
--- /dev/null
+++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/AbstractSQLTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Timeout;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Abstract base class for SQL module tests that provides a shared MySQL container. Uses the
+ * Testcontainers singleton pattern to ensure the container is created once and reused across all
+ * test classes, improving test performance.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+@Timeout(value = 120, unit = TimeUnit.SECONDS)
+abstract class AbstractSQLTest {
+
+    private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.4.0");
+
+    @Container
+    private static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>(MYSQL_IMAGE)
+                    .withDatabaseName("crawl")
+                    .withUsername("crawler")
+                    .withPassword("crawler")
+                    .withReuse(true);
+
+    static Connection testConnection;
+
+    static Map<String, String> createSqlConnectionConfig() {
+        Map<String, String> sqlConnection = new HashMap<>();
+        sqlConnection.put("url", MYSQL_CONTAINER.getJdbcUrl());
+        sqlConnection.put("user", MYSQL_CONTAINER.getUsername());
+        sqlConnection.put("password", MYSQL_CONTAINER.getPassword());
+        return sqlConnection;
+    }
+
+    void execute(String sql) throws SQLException {
+        try (Statement stmt = testConnection.createStatement()) {
+            stmt.execute(sql);
+        }
+    }
+
+    @BeforeAll
+    static void init() throws SQLException {
+        testConnection =
+                DriverManager.getConnection(
+                        MYSQL_CONTAINER.getJdbcUrl(),
+                        MYSQL_CONTAINER.getUsername(),
+                        MYSQL_CONTAINER.getPassword());
+    }
+
+    @BeforeEach
+    void baseSetup() throws Exception {
+        setupTestTables();
+    }
+
+    protected abstract void setupTestTables() throws Exception;
+
+    @AfterAll
+    static void cleanup() throws Exception {
+        if (testConnection != null) {
+            testConnection.close();
+        }
+    }
+}
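A concrete test class plugs in by implementing setupTestTables(); a minimal sketch with a hypothetical table:

    class MyTableTest extends AbstractSQLTest {
        @Override
        protected void setupTestTables() throws Exception {
            // recreate the fixture before each test; execute() runs against the shared container
            execute("DROP TABLE IF EXISTS my_table");
            execute("CREATE TABLE my_table (id INT PRIMARY KEY)");
        }
    }

Note that withReuse(true) only takes effect on machines where ~/.testcontainers.properties opts in with testcontainers.reuse.enable=true; otherwise the container is simply recreated per JVM.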
diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
new file mode 100644
index 000000000..5a34a6b79
--- /dev/null
+++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/IndexerBoltTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.Constants;
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.apache.stormcrawler.indexing.AbstractIndexerBolt;
+import org.apache.stormcrawler.persistence.Status;
+import org.apache.stormcrawler.util.RobotsTags;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class IndexerBoltTest extends AbstractSQLTest {
+
+    private TestOutputCollector output;
+    private final String tableName = "content";
+
+    @Override
+    protected void setupTestTables() throws Exception {
+        execute(
+                """
+                DROP TABLE IF EXISTS content
+                """);
+        execute(
+                """
+                CREATE TABLE IF NOT EXISTS content (
+                    url VARCHAR(255) PRIMARY KEY,
+                    title VARCHAR(255),
+                    description TEXT,
+                    keywords VARCHAR(255)
+                )
+                """);
+    }
+
+    @BeforeEach
+    void setup() {
+        output = new TestOutputCollector();
+    }
+
+    @Test
+    void testBasicIndexing() throws Exception {
+        IndexerBolt bolt = createBolt(createBasicConfig());
+        String url = "http://example.com/page1";
+
+        executeTuple(bolt, url, "This is the page content", getMetadata());
+
+        // Verify URL was stored in database
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) {
+            assertTrue(rs.next(), "URL should be stored in database");
+            assertEquals(url, rs.getString("url"));
+            assertEquals("Test Page Title", rs.getString("title"));
+            assertEquals("Test page description", rs.getString("description"));
+            assertEquals("test, page, keywords", rs.getString("keywords"));
+        }
+
+        // Verify tuple was acked and status emitted
+        assertEquals(1, output.getAckedTuples().size());
+        assertEquals(1, output.getEmitted(Constants.StatusStreamName).size());
+
+        // Verify emitted status is FETCHED
+        List<Object> emitted = output.getEmitted(Constants.StatusStreamName).get(0);
+        assertEquals(url, emitted.get(0));
+        assertEquals(Status.FETCHED, emitted.get(2));
+        bolt.cleanup();
+    }
+
+    @Test
+    void testDuplicateHandling() throws Exception {
+        IndexerBolt bolt = createBolt(createBasicConfig());
+
+        String url = "http://example.com/page2";
+        executeTuple(bolt, url, "Original content", getMetadata());
+
+        // Second indexing with updated content (same URL)
+        Metadata metadata = new Metadata();
+        metadata.addValue("title", "Updated Title");
+        metadata.addValue("description", "Updated description");
+
+        executeTuple(bolt, url, "Updated content", metadata);
+
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) {
+            assertTrue(rs.next());
+            assertEquals("Updated Title", rs.getString("title"));
+            assertEquals("Updated description", rs.getString("description"));
+            assertEquals("test, page, keywords", rs.getString("keywords"));
+        }
+
+        assertEquals(2, output.getAckedTuples().size());
+        bolt.cleanup();
+    }
+
+    @Test
+    void testFilteringByRobotsNoIndex() throws Exception {
+        IndexerBolt bolt = createBolt(createBasicConfig());
+
+        String url = "http://example.com/noindex-page";
+        Metadata metadata = new Metadata();
+        metadata.addValue("title", "Should Not Be Indexed");
+        metadata.addValue(RobotsTags.ROBOTS_NO_INDEX, "true");
+
+        executeTuple(bolt, url, "Content", metadata);
+
+        // Verify URL was NOT stored in database
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) {
+            assertFalse(rs.next(), "URL with noindex should not be stored in database");
+        }
+
+        // But tuple should still be acked and FETCHED status emitted
+        assertEquals(1, output.getAckedTuples().size());
+        assertEquals(1, output.getEmitted(Constants.StatusStreamName).size());
+
+        List<Object> emitted = output.getEmitted(Constants.StatusStreamName).get(0);
+        assertEquals(Status.FETCHED, emitted.get(2));
+        bolt.cleanup();
+    }
+
+    @Test
+    void testFilteringByMetadataFilter() throws Exception {
+        // Configure filter to only index documents with indexable=yes
+        Map<String, Object> conf = createBasicConfig();
+        conf.put(AbstractIndexerBolt.metadataFilterParamName, "indexable=yes");
+
+        IndexerBolt bolt = createBolt(conf);
+
+        // Document that should be filtered out (no indexable metadata)
+        String url1 = "http://example.com/filtered-page";
+        Metadata metadata1 = new Metadata();
+        metadata1.addValue("title", "Filtered Page");
+
+        executeTuple(bolt, url1, "Content", metadata1);
+
+        // Verify filtered document was NOT stored
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = '" + url1 + "'")) {
+            assertFalse(rs.next(), "Filtered document should not be stored");
+        }
+
+        // Document that should be indexed (has indexable=yes)
+        String url2 = "http://example.com/indexed-page";
+        Metadata metadata2 = new Metadata();
+        metadata2.addValue("title", "Indexed Page");
+        metadata2.addValue("indexable", "yes");
+
+        Tuple tuple2 = createTuple(url2, "Content", metadata2);
+        bolt.execute(tuple2);
+
+        // Verify indexed document WAS stored
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = '" + url2 + "'")) {
+            assertTrue(rs.next(), "Document with indexable=yes should be stored");
+            assertEquals("Indexed Page", rs.getString("title"));
+        }
+
+        // Both tuples should be acked with FETCHED status
+        assertEquals(2, output.getAckedTuples().size());
+        assertEquals(2, output.getEmitted(Constants.StatusStreamName).size());
+        bolt.cleanup();
+    }
+
+    @Test
+    void testMetadataExtraction() throws Exception {
+        // Configure to only extract specific metadata fields
+        Map<String, Object> conf = createBasicConfig();
+        // Only map title and description, not keywords
+        List<String> mdMapping = new ArrayList<>();
+        mdMapping.add("title");
+        mdMapping.add("description");
+        conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping);
+
+        IndexerBolt bolt = createBolt(conf);
+
+        String url = "http://example.com/metadata-test";
+        Metadata metadata = new Metadata();
+        metadata.addValue("title", "Extracted Title");
+        metadata.addValue("description", "Extracted Description");
+        metadata.addValue("keywords", "these,should,not,be,stored");
+        metadata.addValue("author", "Should Not Be Stored");
+
+        executeTuple(bolt, url, "Content", metadata);
+
+        // Verify only configured metadata was stored
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) {
+            assertTrue(rs.next());
+            assertEquals("Extracted Title", rs.getString("title"));
+            assertEquals("Extracted Description", rs.getString("description"));
+            // keywords column should be null since it wasn't in the mapping
+            assertNull(rs.getString("keywords"));
+        }
+
+        assertEquals(1, output.getAckedTuples().size());
+        bolt.cleanup();
+    }
+
+    @Test
+    void testMetadataAliasMapping() throws Exception {
+        // Configure metadata mapping with aliases
+        Map<String, Object> conf = createBasicConfig();
+        List<String> mdMapping = new ArrayList<>();
+        mdMapping.add("parse.title=title"); // map parse.title to title column
+        mdMapping.add("parse.description=description");
+        conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping);
+
+        IndexerBolt bolt = createBolt(conf);
+
+        String url = "http://example.com/alias-test";
+        Metadata metadata = new Metadata();
+        metadata.addValue("parse.title", "Title from Parser");
+        metadata.addValue("parse.description", "Description from Parser");
+
+        executeTuple(bolt, url, "Content", metadata);
+
+        // Verify aliased metadata was stored correctly
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT * FROM " + tableName + " WHERE url = '" + url + "'")) {
+            assertTrue(rs.next());
+            assertEquals("Title from Parser", rs.getString("title"));
+            assertEquals("Description from Parser", rs.getString("description"));
+        }
+        bolt.cleanup();
+    }
+
+    private Tuple createTuple(String url, String text, Metadata metadata) {
+        Tuple tuple = mock(Tuple.class);
+        when(tuple.getStringByField("url")).thenReturn(url);
+        when(tuple.getStringByField("text")).thenReturn(text);
+        when(tuple.getValueByField("metadata")).thenReturn(metadata);
+        return tuple;
+    }
+
+    private void executeTuple(IndexerBolt bolt, String url, String text, Metadata metadata) {
+        Tuple tuple = createTuple(url, text, metadata);
+        bolt.execute(tuple);
+    }
+
+    private Map<String, Object> createBasicConfig() {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("sql.connection", createSqlConnectionConfig());
+        conf.put(IndexerBolt.SQL_INDEX_TABLE_PARAM_NAME, tableName);
+        conf.put(AbstractIndexerBolt.urlFieldParamName, "url");
+        // Default metadata mapping
+        List<String> mdMapping = new ArrayList<>();
+        mdMapping.add("title");
+        mdMapping.add("description");
+        mdMapping.add("keywords");
+        conf.put(AbstractIndexerBolt.metadata2fieldParamName, mdMapping);
+        return conf;
+    }
+
+    private IndexerBolt createBolt(Map<String, Object> conf) {
+        IndexerBolt bolt = new IndexerBolt();
+        bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output));
+        return bolt;
+    }
+
+    private Metadata getMetadata() {
+        Metadata metadata = new Metadata();
+        metadata.addValue("title", "Test Page Title");
+        metadata.addValue("description", "Test page description");
+        metadata.addValue("keywords", "test, page, keywords");
+        return metadata;
+    }
+}
diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java
new file mode 100644
index 000000000..9b65f2e25
--- /dev/null
+++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/SQLSpoutTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import static org.apache.stormcrawler.TestUtil.getMockedTopologyContextWithBucket;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.Field;
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.apache.stormcrawler.persistence.urlbuffer.URLBuffer;
+import org.junit.jupiter.api.Test;
+
+class SQLSpoutTest extends AbstractSQLTest {
+
+    @Override
+    protected void setupTestTables() throws Exception {
+        execute("DROP TABLE IF EXISTS urls");
+        execute(
+                """
+                CREATE TABLE IF NOT EXISTS urls (
+                    url VARCHAR(255),
+                    status VARCHAR(16) DEFAULT 'DISCOVERED',
+                    nextfetchdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                    metadata TEXT,
+                    bucket SMALLINT DEFAULT 0,
+                    host VARCHAR(128),
+                    PRIMARY KEY(url)
+                )
+                """);
+    }
+
+    @Test
+    void bufferIsPopulatedAndEmitted() throws Exception {
+        // Insert base test data with past nextfetchdate to ensure they're eligible for fetching
+        Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+        insertTestURL("http://example.com/page1", 0, "example.com", pastTime);
+        insertTestURL("http://example.com/page2", 0, "example.com", pastTime);
+        insertTestURL("http://test.com/page1", 0, "test.com", pastTime);
+
+        TestOutputCollector testCollector = new TestOutputCollector();
+        SQLSpout spout = createSpout(testCollector, TestUtil.getMockedTopologyContext());
+
+        // First call to nextTuple() populates the buffer
+        spout.nextTuple();
+
+        final List<String> expectedURLs =
+                Arrays.asList(
+                        "http://example.com/page1",
+                        "http://example.com/page2",
+                        "http://test.com/page1");
+        assertURLsEmitted(spout, testCollector, 3, expectedURLs);
+        spout.close();
+    }
+
+    @Test
+    void testMaxDocsPerBucket() throws Exception {
+        // Add more URLs from example.com to test the maxDocsPerBucket limit
+        // Insert base test data with past nextfetchdate to ensure they're eligible for fetching
+        Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+        insertTestURL("http://example.com/page1", 0, "example.com", pastTime);
+        insertTestURL("http://example.com/page2", 0, "example.com", pastTime);
+        insertTestURL("http://test.com/page1", 0, "test.com", pastTime);
+
+        TestOutputCollector testCollector = new TestOutputCollector();
+        SQLSpout spout = createSpout(testCollector, TestUtil.getMockedTopologyContext());
+
+        pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+        for (int i = 4; i <= 10; i++) {
+            insertTestURL("http://example.com/page" + i, 0, "example.com", pastTime);
+        }
+
+        spout.nextTuple();
+
+        URLBuffer buffer = getBufferFromSpout(spout);
+
+        // With maxDocsPerBucket=5, we should get at most 5 URLs from example.com
+        // plus 1 from test.com = 6 total
+        assertEquals(6, buffer.size());
+        spout.close();
+    }
+
+    @Test
+    void testSingleInstanceNoBucketFiltering() throws Exception {
+
+        Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+
+        // Insert URLs into different buckets
+        insertTestURL("http://site1.com/page1", 0, "site1.com", pastTime);
+        insertTestURL("http://site2.com/page1", 1, "site2.com", pastTime);
+        insertTestURL("http://site3.com/page1", 2, "site3.com", pastTime);
+        insertTestURL("http://site4.com/page1", 3, "site4.com", pastTime);
+
+        // Create a single spout instance (totalTasks = 1)
+        TestOutputCollector collector = new TestOutputCollector();
+        TopologyContext context = getMockedTopologyContextWithBucket(0, 1, "sqlSpout");
+        SQLSpout singleSpout = createSpout(collector, context);
+
+        // Populate buffer
+        singleSpout.nextTuple();
+
+        final List<String> expectedURLs =
+                Arrays.asList(
+                        "http://site1.com/page1",
+                        "http://site2.com/page1",
+                        "http://site3.com/page1",
+                        "http://site4.com/page1");
+        assertURLsEmitted(singleSpout, collector, 4, expectedURLs);
+        singleSpout.close();
+    }
+
+    @Test
+    void testBucketPartitioningTwoInstances() throws Exception {
+
+        // Insert URLs into different buckets
+        Instant pastTime = Instant.now().minus(1, ChronoUnit.HOURS);
+
+        // Bucket 0 URLs
+        insertTestURL("http://bucket0-site1.com/page1", 0, "bucket0-site1.com", pastTime);
+        insertTestURL("http://bucket0-site2.com/page1", 0, "bucket0-site2.com", pastTime);
+        insertTestURL("http://bucket0-site3.com/page1", 0, "bucket0-site3.com", pastTime);
+
+        // Bucket 1 URLs
+        insertTestURL("http://bucket1-site1.com/page1", 1, "bucket1-site1.com", pastTime);
+        insertTestURL("http://bucket1-site2.com/page1", 1, "bucket1-site2.com", pastTime);
+        insertTestURL("http://bucket1-site3.com/page1", 1, "bucket1-site3.com", pastTime);
+
+        // Create two spout instances with different bucket assignments
+        SQLSpout[] spouts = new SQLSpout[2];
+        for (int i = 0; i < 2; i++) {
+            TestOutputCollector collector = new TestOutputCollector();
+            TopologyContext context = getMockedTopologyContextWithBucket(i, 2, "sqlSpout");
+            spouts[i] = createSpout(collector, context);
+            spouts[i].nextTuple();
+            assertURLsEmitted(
+                    spouts[i],
+                    collector,
+                    3,
+                    Arrays.asList(
+                            "http://bucket" + i + "-site1.com/page1",
+                            "http://bucket" + i + "-site2.com/page1",
+                            "http://bucket" + i + "-site3.com/page1"));
+            spouts[i].close();
+        }
+    }
+
+    private void insertTestURL(String url, int bucket, String host, Instant time)
+            throws Exception {
+        String sql =
+                """
+                INSERT INTO urls (url, status, nextfetchdate, metadata, bucket, host)
+                VALUES (?, ?, ?, ?, ?, ?)
+                """;
+ """; + + try (PreparedStatement ps = testConnection.prepareStatement(sql)) { + ps.setString(1, url); + ps.setString(2, "DISCOVERED"); + ps.setTimestamp(3, Timestamp.from(time)); + ps.setString(4, "\tkey=value\tdepth=0"); + ps.setInt(5, bucket); + ps.setString(6, host); + ps.executeUpdate(); + } + } + + private Map createTestConfig() { + Map conf = new HashMap<>(); + conf.put("sql.connection", createSqlConnectionConfig()); + conf.put("sql.status.table", "urls"); + conf.put("sql.max.urls.per.bucket", 5); + conf.put("sql.spout.max.results", 100); + conf.put( + "urlbuffer.class", "org.apache.stormcrawler.persistence.urlbuffer.SimpleURLBuffer"); + return conf; + } + + private URLBuffer getBufferFromSpout(SQLSpout spoutInstance) throws Exception { + Field bufferField = spoutInstance.getClass().getSuperclass().getDeclaredField("buffer"); + bufferField.setAccessible(true); + return (URLBuffer) bufferField.get(spoutInstance); + } + + private void assertURLsEmitted( + SQLSpout spout, + TestOutputCollector collector, + int numTuples, + List expectedURLs) { + assertEquals(0, collector.getEmitted().size()); + + // Emit all URLs + Set urls = new HashSet<>(); + for (int i = 0; i < numTuples; i++) { + spout.nextTuple(); + } + for (List tuple : collector.getEmitted()) { + urls.add((String) tuple.get(0)); + } + + for (String url : expectedURLs) { + assertTrue(urls.contains(url)); + } + } + + private SQLSpout createSpout(TestOutputCollector collector, TopologyContext context) { + SQLSpout singleSpout = new SQLSpout(); + Map conf = createTestConfig(); + singleSpout.open(conf, context, new SpoutOutputCollector(collector)); + singleSpout.activate(); + return singleSpout; + } +} diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java new file mode 100644 index 000000000..4108150ff --- /dev/null +++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java
new file mode 100644
index 000000000..4108150ff
--- /dev/null
+++ b/external/sql/src/test/java/org/apache/stormcrawler/sql/StatusUpdaterBoltTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.sql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.apache.stormcrawler.persistence.Status;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class StatusUpdaterBoltTest extends AbstractSQLTest {
+
+    private TestOutputCollector output;
+    private StatusUpdaterBolt bolt;
+
+    @Override
+    protected void setupTestTables() throws Exception {
+        execute("DROP TABLE IF EXISTS urls");
+        execute(
+                """
+                CREATE TABLE IF NOT EXISTS urls (
+                    url VARCHAR(255),
+                    status VARCHAR(16) DEFAULT 'DISCOVERED',
+                    nextfetchdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                    metadata TEXT,
+                    bucket SMALLINT DEFAULT 0,
+                    host VARCHAR(128),
+                    PRIMARY KEY(url)
+                )
+                """);
+    }
+
+    @BeforeEach
+    void setup() {
+        output = new TestOutputCollector();
+        bolt = createBolt();
+    }
+
+    @AfterEach
+    void close() {
+        bolt.cleanup();
+    }
+
+    @Test
+    void testStoreDiscoveredURL() throws Exception {
+        String url = "http://example.com/page1";
+        Metadata metadata = new Metadata();
+        metadata.addValue("key1", "value1");
+
+        Tuple tuple = createTuple(url, Status.DISCOVERED, metadata);
+        bolt.execute(tuple);
+
+        // Trigger batch execution by sending another tuple (which will also check the batch)
+        String url2 = "http://example.com/page1-trigger";
+        Tuple triggerTuple = createTuple(url2, Status.DISCOVERED, metadata);
+        bolt.execute(triggerTuple);
+
+        // Verify URL was stored
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs = stmt.executeQuery("SELECT * FROM urls WHERE url = '" + url + "'")) {
+            assertTrue(rs.next(), "URL should be stored in database after batch execution");
+            assertEquals("DISCOVERED", rs.getString("status"));
+            assertNotNull(rs.getString("metadata"));
+        }
+        bolt.cleanup();
+    }
+
+    @Test
+    void testUpdateURL() throws Exception {
+        String url = "http://example.com/page2";
+        Metadata metadata = new Metadata();
+        metadata.addValue("key1", "value1");
+
+        // First store as DISCOVERED
+        Tuple tuple1 = createTuple(url, Status.DISCOVERED, metadata);
+        bolt.execute(tuple1);
+
+        // Now update to FETCHED
+        Tuple tuple2 = createTuple(url, Status.FETCHED, metadata);
+        bolt.execute(tuple2);
+
+        // Verify URL was updated
+        try (Statement stmt = testConnection.createStatement();
+                ResultSet rs = stmt.executeQuery("SELECT * FROM urls WHERE url = '" + url + "'")) {
+            assertTrue(rs.next());
+            assertEquals("FETCHED", rs.getString("status"));
+            assertNotNull(rs.getString("metadata"));
+        }
+        bolt.cleanup();
+    }
+
+    private Tuple createTuple(String url, Status status, Metadata metadata) {
+        Tuple tuple = mock(Tuple.class);
+        when(tuple.getStringByField("url")).thenReturn(url);
+        when(tuple.getValueByField("status")).thenReturn(status);
+        when(tuple.getValueByField("metadata")).thenReturn(metadata);
+        return tuple;
+    }
conf.put("sql.status.table", "urls"); + conf.put("sql.status.max.urls.per.bucket", 10); + conf.put("scheduler.class", "org.apache.stormcrawler.persistence.DefaultScheduler"); + conf.put("status.updater.cache.spec", "maximumSize=10000,expireAfterAccess=1h"); + conf.put("sql.update.batch.size", 1); + return conf; + } + + private StatusUpdaterBolt createBolt() { + StatusUpdaterBolt statusUpdaterBolt = new StatusUpdaterBolt(); + Map conf = createTestConfig(); + statusUpdaterBolt.prepare( + conf, TestUtil.getMockedTopologyContext(), new OutputCollector(output)); + return statusUpdaterBolt; + } +} diff --git a/pom.xml b/pom.xml index 5c9ee3f67..2ad34c1de 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ under the License. 1.00 1.00 1.00 + 9.3.0 true @@ -673,6 +674,12 @@ under the License. ${commons.codec.version} + + com.mysql + mysql-connector-j + ${mysql-connector-j} + +