Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
dbe1be8
Refactor SQL module to use PreparedStatement in SQLSpout and IndexerB…
Dec 27, 2025
5ff026e
Refactor SQL module to use PreparedStatement in SQLSpout and IndexerB…
Dec 27, 2025
03916ef
Fix forbidden violations
Dec 27, 2025
4e37c55
changes following PR review. Added tests for SQLSpout
Jan 1, 2026
42dd9f3
changes following PR review. Added tests for SQLSpout
Jan 1, 2026
37d595f
changes following PR review. Added tests for SQLSpout
Jan 1, 2026
747aa9b
changes following PR review. Added tests for StatusUpdaterBolt
Jan 1, 2026
a69212d
Add test dependencies and update MySQL container version in tests
Jan 1, 2026
3e3d6bd
Fix flakiness in testStoreDiscoveredURL
Jan 1, 2026
9245499
ScheduledExecutorService should be shut down when the bolt is cleaned…
Jan 1, 2026
fb7febe
Refactor IndexerBolt to improve code clarity and add unit tests for i…
Jan 1, 2026
c14419b
Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
Jan 1, 2026
051f0d9
Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
Jan 1, 2026
28a8596
Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
Jan 1, 2026
c43e08f
Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
Jan 1, 2026
bf44537
Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
Jan 1, 2026
5e47e81
Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
Jan 1, 2026
a1f3691
Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
Jan 1, 2026
0a6ec5c
Introduce AbstractSQLTest for shared MySQL container setup in SQL tests
Jan 1, 2026
43658c2
Refactor StatusUpdaterBoltTest to improve setup and cleanup methods
Jan 2, 2026
86e9f3a
Undo changes in MetricsConsumer
Jan 2, 2026
f7859ee
minor changes
Jan 2, 2026
190efb2
minor changes
Jan 2, 2026
9e37c1c
Merge branch 'apache:main' into main
rajucomp Jan 7, 2026
b8f1757
Changes following PR review.
Jan 7, 2026
5ef9728
Changes following PR review.
Jan 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions core/src/test/java/org/apache/stormcrawler/TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static org.mockito.Mockito.when;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.task.TopologyContext;
Expand All @@ -48,6 +50,42 @@ public IMetric answer(InvocationOnMock invocation) throws Throwable {
return context;
}

/**
 * Builds a mocked {@link TopologyContext} whose task identity and task list let
 * bucket-partitioning logic be exercised in tests.
 *
 * @param taskIndex index reported for this (simulated) task — determines its bucket
 * @param totalTasks how many tasks the component is declared to have
 * @param componentId the component ID the context reports
 * @return the mocked {@link TopologyContext}
 */
public static TopologyContext getMockedTopologyContextWithBucket(
        int taskIndex, int totalTasks, String componentId) {
    TopologyContext context = mock(TopologyContext.class);

    // Metric registration just hands the supplied metric instance back.
    when(context.registerMetric(anyString(), any(IMetric.class), anyInt()))
            .thenAnswer(invocation -> invocation.getArgument(1, IMetric.class));

    // Task identity consulted by bucket-assignment code.
    when(context.getThisTaskIndex()).thenReturn(taskIndex);
    when(context.getThisComponentId()).thenReturn(componentId);

    // The component owns task IDs 0 .. totalTasks-1.
    List<Integer> taskIds = new ArrayList<>(totalTasks);
    for (int id = 0; id < totalTasks; id++) {
        taskIds.add(id);
    }
    when(context.getComponentTasks(componentId)).thenReturn(taskIds);

    return context;
}

public static Tuple getMockedTestTuple(String url, String content, Metadata metadata) {
Tuple tuple = mock(Tuple.class);
when(tuple.getStringByField("url")).thenReturn(url);
Expand Down
29 changes: 22 additions & 7 deletions external/sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,28 @@ under the License.
<description>SQL-based resources for StormCrawler</description>

<dependencies>
<!--
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.31</version>
</dependency>
-->

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.stormcrawler</groupId>
<artifactId>stormcrawler-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.OutputCollector;
Expand All @@ -42,12 +46,14 @@ public class IndexerBolt extends AbstractIndexerBolt {

public static final String SQL_INDEX_TABLE_PARAM_NAME = "sql.index.table";

private OutputCollector _collector;
private OutputCollector collector;

private MultiCountMetric eventCounter;

private Connection connection;

private PreparedStatement preparedStmt;

private String tableName;

private Map<String, Object> conf;
Expand All @@ -56,7 +62,7 @@ public class IndexerBolt extends AbstractIndexerBolt {
public void prepare(
Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
super.prepare(conf, context, collector);
_collector = collector;
this.collector = collector;

this.eventCounter = context.registerMetric("SQLIndexer", new MultiCountMetric(), 10);

Expand All @@ -74,51 +80,24 @@ public void execute(Tuple tuple) {
String normalisedurl = valueForURL(tuple);

Metadata metadata = (Metadata) tuple.getValueByField("metadata");
String text = tuple.getStringByField("text");

boolean keep = filterDocument(metadata);
if (!keep) {
eventCounter.scope("Filtered").incrBy(1);
// treat it as successfully processed even if
// we do not index it
_collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
_collector.ack(tuple);
collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
collector.ack(tuple);
return;
}

try {

// which metadata to display?
Map<String, String[]> keyVals = filterMetadata(metadata);
List<String> keys = new ArrayList<>(keyVals.keySet());

StringBuilder query =
new StringBuilder(" insert into ")
.append(tableName)
.append(" (")
.append(fieldNameForURL());

Object[] keys = keyVals.keySet().toArray();

for (Object o : keys) {
query.append(", ").append((String) o);
}

query.append(") values(?");

for (int i = 0; i < keys.length; i++) {
query.append(", ?");
}

query.append(")");

query.append(" ON DUPLICATE KEY UPDATE ");
for (int i = 0; i < keys.length; i++) {
String key = (String) keys[i];
if (i > 0) {
query.append(", ");
}
query.append(key).append("=VALUES(").append(key).append(")");
}
String query = buildQuery(keys);

if (connection == null) {
try {
Expand All @@ -131,9 +110,8 @@ public void execute(Tuple tuple) {

LOG.debug("PreparedStatement => {}", query);

// create the mysql insert preparedstatement
PreparedStatement preparedStmt = connection.prepareStatement(query.toString());

// create the mysql insert PreparedStatement
preparedStmt = connection.prepareStatement(query);
// TODO store the text of the document?
if (StringUtils.isNotBlank(fieldNameForText())) {
// builder.field(fieldNameForText(), trimText(text));
Expand All @@ -144,21 +122,21 @@ public void execute(Tuple tuple) {
preparedStmt.setString(1, normalisedurl);
}

for (int i = 0; i < keys.length; i++) {
insert(preparedStmt, i + 2, (String) keys[i], keyVals);
// Set all metadata parameters
for (int i = 0; i < keys.size(); i++) {
insert(preparedStmt, i + 2, keys.get(i), keyVals);
}

// Execute the statement (single row insert)
preparedStmt.executeUpdate();

eventCounter.scope("Indexed").incrBy(1);

_collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
_collector.ack(tuple);

collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
collector.ack(tuple);
} catch (Exception e) {
// do not send to status stream so that it gets replayed
LOG.error("Error inserting into SQL", e);
_collector.fail(tuple);
collector.fail(tuple);
if (connection != null) {
// reset the connection
try {
Expand Down Expand Up @@ -188,4 +166,41 @@ private void insert(
}
preparedStmt.setString(position, value);
}

/**
 * Builds the MySQL upsert statement for the index table: an INSERT of the URL
 * column plus one column per metadata key, with an ON DUPLICATE KEY UPDATE
 * clause so re-indexed URLs overwrite their previous row.
 *
 * @param keys metadata column names to include after the URL column
 * @return the parameterized SQL text (one "?" per column, URL first)
 */
private String buildQuery(final List<String> keys) {
    // Assemble the column list, the "?" placeholders and the
    // "col=VALUES(col)" assignments in a single pass over the keys.
    final StringBuilder columnList = new StringBuilder();
    final StringBuilder placeholderList = new StringBuilder();
    final StringBuilder assignments = new StringBuilder();
    for (final String key : keys) {
        if (columnList.length() > 0) {
            columnList.append(", ");
            placeholderList.append(", ");
            assignments.append(", ");
        }
        columnList.append(key);
        placeholderList.append('?');
        assignments.append(key).append("=VALUES(").append(key).append(')');
    }

    // With no metadata keys, fall back to a no-op update of the URL field
    // so the ON DUPLICATE KEY UPDATE clause remains syntactically valid.
    final String updateClause =
            assignments.length() == 0
                    ? String.format(
                            Locale.ROOT, "%s=VALUES(%s)", fieldNameForURL(), fieldNameForURL())
                    : assignments.toString();

    return String.format(
            Locale.ROOT,
            """
            INSERT INTO %s (%s%s)
            VALUES (?%s)
            ON DUPLICATE KEY UPDATE %s
            """,
            tableName,
            fieldNameForURL(),
            columnList.length() == 0 ? "" : ", " + columnList,
            placeholderList.length() == 0 ? "" : ", " + placeholderList,
            updateClause);
}

@Override
public void cleanup() {
    // Called when the topology shuts this bolt down: release JDBC resources.
    // Statement is closed before its owning connection, per JDBC convention.
    SQLUtil.closeResource(preparedStmt, "prepared statement");
    SQLUtil.closeResource(connection, "connection");
}
}
Loading