diff --git a/THIRD-PARTY.txt b/THIRD-PARTY.txt
index c2694b46e..eb6b5b08c 100644
--- a/THIRD-PARTY.txt
+++ b/THIRD-PARTY.txt
@@ -157,6 +157,7 @@ List of third-party dependencies grouped by their license type.
* Jackson dataformat: CBOR (com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.2 - https://github.com/FasterXML/jackson-dataformats-binary)
* Jackson dataformat: Smile (com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.18.2 - https://github.com/FasterXML/jackson-dataformats-binary)
* Jackson-dataformat-YAML (com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.18.2 - https://github.com/FasterXML/jackson-dataformats-text)
+ * Jackson datatype: jdk8 (com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.18.1 - https://github.com/FasterXML/jackson-modules-java8/jackson-datatype-jdk8)
* java-libpst (com.pff:java-libpst:0.9.3 - https://github.com/rjohnsondev/java-libpst)
* JCL 1.2 implemented over SLF4J (org.slf4j:jcl-over-slf4j:2.0.17 - http://www.slf4j.org)
* JetBrains Java Annotations (org.jetbrains:annotations:26.0.2-1 - https://github.com/JetBrains/java-annotations)
diff --git a/core/src/main/java/org/apache/stormcrawler/pii/PiiBolt.java b/core/src/main/java/org/apache/stormcrawler/pii/PiiBolt.java
new file mode 100644
index 000000000..564d114ca
--- /dev/null
+++ b/core/src/main/java/org/apache/stormcrawler/pii/PiiBolt.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.pii;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.util.ConfUtils;
+import org.apache.stormcrawler.util.InitialisationUtil;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * StormCrawler bolt that performs PII redaction on the content of web pages
+ * before they are passed to the indexing or persistence bolt.
+ * If enabled, the HTML content will be overwritten with a dummy HTML page (containing just "REDACTED")
+ * pii.redacter.class is the name of the class implementing the PiiInterface interface (e.g. org.apache.stormcrawler.pii.PresidioRedacter)
+ * pii.language.field, if set, allows to set the name of a Metadata field that contains the language to be passed to the PII redacter instance
+ *
+ */
+@SuppressWarnings("serial")
+public class PiiBolt extends BaseRichBolt {
+
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(PiiBolt.class);
+
+ /*
+ * Name of config field defining the PII Redacter class
+ * (This class must implement the PiiRedacter interface
+ */
+ public static final String PII_REDACTER_CLASS_PARAM = "pii.redacter.class";
+
+ /*
+ * Name of the field for configurating language detection
+ */
+ public static final String PII_DETECT_LANGUAGE_PARAM = "pii.detect.language";
+
+ /*
+ * Name of the field for defining Metadata field containing language
+ */
+ public static final String PII_LANGUAGE_FIELD = "pii.language.field";
+
+ /*
+ * Name of the field for disabling PII removal
+ */
+ public static final String PII_ENABLE_FIELD = "pii.removal.enable";
+
+ private static final String FIELD_URL = "url";
+ private static final String FIELD_CONTENT = "content";
+ private static final String FIELD_METADATA = "metadata";
+ private static final String FIELD_TEXT = "text";
+
+
+ // Default value for language metadata field
+ private String languageFieldName = "parse.lang";
+
+ protected OutputCollector collector;
+
+ protected PiiRedacter piiRedacter;
+
+ private boolean piiEnabled = false;
+
+ public static final String REDACTED_HTML = "
REDACTEDREDACTED";
+
+ public static final byte[] REDACTED_BYTES = REDACTED_HTML.getBytes(StandardCharsets.UTF_8);
+
+ /**
+ * Returns a Scheduler instance based on the configuration *
+ */
+ public static PiiRedacter getInstance(Map stormConf) {
+ PiiRedacter redacter;
+
+ String className = ConfUtils.getString(stormConf, PII_REDACTER_CLASS_PARAM);
+ if (className == null || className.isEmpty()) {
+ throw new RuntimeException("PiiRedacter class name must be defined in the configuration (pii.redacter.class)");
+ }
+
+ LOG.info("Loading PII Redacter class, name={}", className);
+ try {
+ redacter = InitialisationUtil.initializeFromQualifiedName(className, PiiRedacter.class);
+ } catch (Exception e) {
+ throw new RuntimeException("Can't instantiate " + className, e);
+ }
+
+ LOG.info("Initializing PII Redacter instance");
+ try {
+ redacter.init(stormConf);
+ } catch (Exception e) {
+ LOG.error("Error while initializing PII Redacter", e);
+ }
+
+ return redacter;
+ }
+
+ public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) {
+ // Uncomment if extending StatusEmitterBolt
+ //super.prepare(topoConf, context, collector);
+
+ this.collector = collector;
+
+ this.piiRedacter = getInstance(topoConf);
+ LOG.info("Initialized PiiRedacter instance");
+
+ // Get language metadata field name
+ String confLanguageField = ConfUtils.getString(topoConf, "pii.language.field");
+ if (confLanguageField != null && !confLanguageField.isEmpty()) {
+ languageFieldName = confLanguageField;
+ }
+ LOG.info("PII language field: {}", languageFieldName);
+
+ piiEnabled = ConfUtils.getBoolean(topoConf, PII_ENABLE_FIELD, false);
+ LOG.info("PII enabled: {}", piiEnabled);
+
+ }
+
+ @Override
+ public void execute(Tuple input) {
+
+ if (!piiEnabled) {
+ this.collector.emit(input, input.getValues());
+ this.collector.ack(input);
+ return;
+ }
+
+ String url = input.getStringByField(FIELD_URL);
+ LOG.info("Processing URL for PII redaction: {}", url);
+
+ Metadata metadata = (Metadata) input.getValueByField(FIELD_METADATA);
+ String text = input.getStringByField(FIELD_TEXT);
+ byte[] originalBytes = input.getBinaryByField(FIELD_CONTENT);
+
+ if (StringUtils.isBlank(text)) {
+ LOG.info("No text to process for URL: {}", url);
+ metadata.addValue("pii.processed", "false");
+ // Force the binary content to a dummy content
+ emitTuple(input, url, REDACTED_BYTES, metadata, "");
+ this.collector.ack(input);
+ return;
+ }
+
+ try {
+ String language = metadata.getFirstValue(languageFieldName);
+ String redacted = (language != null) ?
+ piiRedacter.redact(text, language) :
+ piiRedacter.redact(text);
+
+ if (redacted == null) {
+ throw new Exception("PII Redacter returned null");
+ }
+
+ metadata.addValue("pii.processed", "true");
+
+ // Force the binary content to a dummy content
+ emitTuple(input, url, REDACTED_BYTES, metadata, redacted);
+ } catch (Exception e) {
+ LOG.error("Error during PII redaction for URL {}: {}", url, e.getMessage());
+ metadata.addValue("pii.error", e.getMessage());
+
+ // How to handle the content in case of error ?
+ emitTuple(input, url, originalBytes, metadata, text);
+ }
+
+ this.collector.ack(input);
+ }
+
+ private void emitTuple(Tuple input, String url, byte[] content, Metadata metadata, String text) {
+ this.collector.emit(input, new Values(url, content, metadata, text));
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(FIELD_URL, FIELD_CONTENT, FIELD_METADATA, FIELD_TEXT));
+ }
+}
diff --git a/core/src/main/java/org/apache/stormcrawler/pii/PiiRedacter.java b/core/src/main/java/org/apache/stormcrawler/pii/PiiRedacter.java
new file mode 100644
index 000000000..cdaa203f7
--- /dev/null
+++ b/core/src/main/java/org/apache/stormcrawler/pii/PiiRedacter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.pii;
+
+import java.util.Map;
+
+/**
+ * An interface for bolts implementing PII redaction
+ */
+public interface PiiRedacter {
+ void init(Map topologyConf) throws Exception;
+
+ /**
+ * Redacts PII from the input string using default language settings
+ * (e.g. no language or a default language configured at initialization)
+ *
+ * @param input the input string possibly containing PII
+ * @return the input string with PII redacted
+ */
+ String redact(String input);
+
+ /**
+ * Redacts PII from the input string using the specified language
+ * @param input the input string possibly containing PII
+ * @param language the language to use for PII redaction
+ * @return
+ */
+ String redact(String input, String language);
+}
diff --git a/core/src/test/java/org/apache/stormcrawler/pii/MockPiiRedacter.java b/core/src/test/java/org/apache/stormcrawler/pii/MockPiiRedacter.java
new file mode 100644
index 000000000..01b46fe09
--- /dev/null
+++ b/core/src/test/java/org/apache/stormcrawler/pii/MockPiiRedacter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.pii;
+
+import java.util.Map;
+
+/**
+ * Mock PII Redacter implementation for testing purposes.
+ * This class simulates redaction by replacing occurrences of the word"secret"
+ * with "*****".
+ */
+
+public class MockPiiRedacter implements PiiRedacter {
+
+ @Override public void init(Map conf) {}
+
+ @Override public String redact(String content) {
+ return redact(content, null);
+ }
+
+ @Override public String redact(String content, String language) {
+ // simple redaction logic for the test
+ return content.replaceAll("secret", "*****");
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/stormcrawler/pii/PiiBoltTest.java b/core/src/test/java/org/apache/stormcrawler/pii/PiiBoltTest.java
new file mode 100644
index 000000000..5be57b92b
--- /dev/null
+++ b/core/src/test/java/org/apache/stormcrawler/pii/PiiBoltTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.pii;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.*;
+import org.apache.stormcrawler.Metadata;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+/** Unit tests for {@link PiiBolt}. */
+class PiiBoltTest {
+
+ private PiiBolt bolt;
+ private OutputCollector collector;
+ private TopologyContext context;
+
+ private static final String MOCK_REDACTER_CLASS = "org.apache.stormcrawler.pii.MockPiiRedacter";
+
+ @BeforeEach
+ void setUp() {
+ // Mock Storm infrastructure
+ collector = mock(OutputCollector.class);
+ context = mock(TopologyContext.class);
+
+ // Prepare a minimal configuration
+ Map conf = new HashMap<>();
+ conf.put(PiiBolt.PII_REDACTER_CLASS_PARAM, MOCK_REDACTER_CLASS);
+ conf.put(PiiBolt.PII_ENABLE_FIELD, false);
+ conf.put(PiiBolt.PII_DETECT_LANGUAGE_PARAM, false);
+
+ bolt = new PiiBolt();
+ bolt.prepare(conf, context, collector);
+ }
+
+ /** A simple redacter used only for the test – it replaces “secret” with “*****”. */
+ public static class MockPiiRedacter implements PiiRedacter {
+ @Override public void init(Map conf) { /* no‑op */ }
+ @Override public String redact(String content) { return redact(content, null); }
+ @Override public String redact(String content, String language) {
+ return content.replaceAll("secret", "*****");
+ }
+ }
+
+ @Test
+ void testRedactionAndMetadata() {
+
+ Map conf = new HashMap<>();
+ conf.put(PiiBolt.PII_REDACTER_CLASS_PARAM, MOCK_REDACTER_CLASS);
+ conf.put(PiiBolt.PII_ENABLE_FIELD, true);
+ bolt.prepare(conf, context, collector);
+
+ // Input tuple
+ String url = "http://example.com";
+ String html = "this is a secret page";
+ byte[] contentBytes = html.getBytes(StandardCharsets.UTF_8);
+ Metadata md = new Metadata();
+ md.addValue("some", "value");
+ Tuple input = mock(Tuple.class);
+ when(input.getStringByField("url")).thenReturn(url);
+ when(input.getBinaryByField("content")).thenReturn(contentBytes);
+ when(input.getValueByField("metadata")).thenReturn(md);
+ when(input.getStringByField("text")).thenReturn("this is a secret page");
+
+
+ // Execute bolt
+ bolt.execute(input);
+
+ // Capture emitted tuple
+ ArgumentCaptor valuesCaptor = ArgumentCaptor.forClass(Values.class);
+ verify(collector).emit(eq(input), valuesCaptor.capture());
+ Values emitted = valuesCaptor.getValue();
+
+ // Verify URL unchanged
+ assertEquals(url, emitted.get(0));
+
+ // Verify HTML binary content has been replaced by dummy content
+ byte[] emittedBytes = (byte[]) emitted.get(1);
+ String emittedContent = new String(emittedBytes, StandardCharsets.UTF_8);
+ assertEquals(PiiBolt.REDACTED_HTML, emittedContent);
+
+ String emittedText = (String)emitted.get(3);
+
+ // Verify text has been properly redacted
+ assertTrue(emittedText.contains("*****"));
+ assertFalse(emittedText.contains("secret"));
+
+ // Verify metadata flag
+ Metadata outMd = (Metadata) emitted.get(2);
+ assertEquals("true", outMd.getFirstValue("pii.processed"));
+
+ // Verify ack
+ verify(collector).ack(input);
+ }
+
+ @Test
+ void testDisabledRedaction() {
+ // Re‑prepare bolt with disabling flag set
+ Map conf = new HashMap<>();
+ conf.put(PiiBolt.PII_REDACTER_CLASS_PARAM, MOCK_REDACTER_CLASS);
+ conf.put(PiiBolt.PII_ENABLE_FIELD, false);
+ bolt.prepare(conf, context, collector);
+
+ String url = "http://example.com";
+ byte[] contentBytes = "secret".getBytes(StandardCharsets.UTF_8);
+ Metadata md = new Metadata();
+ Tuple input = mock(Tuple.class);
+ when(input.getStringByField("url")).thenReturn(url);
+ when(input.getBinaryByField("content")).thenReturn(contentBytes);
+ when(input.getValueByField("metadata")).thenReturn(md);
+ when(input.getStringByField("text")).thenReturn("irrelevant");
+
+ bolt.execute(input);
+
+ // Should emit original bytes unchanged
+ ArgumentCaptor valuesCaptor = ArgumentCaptor.forClass(Values.class);
+ verify(collector).emit(eq(input), valuesCaptor.capture());
+ Values emitted = valuesCaptor.getValue();
+ assertArrayEquals(contentBytes, (byte[]) emitted.get(1));
+
+ // No pii.processed flag added
+ Metadata outMd = (Metadata) emitted.get(2);
+ assertNull(outMd.getFirstValue("pii.processed"));
+
+ verify(collector).ack(input);
+ }
+
+
+ @Test
+ void testRedactionEmptyText() {
+
+ Map conf = new HashMap<>();
+ conf.put(PiiBolt.PII_REDACTER_CLASS_PARAM, MOCK_REDACTER_CLASS);
+ conf.put(PiiBolt.PII_ENABLE_FIELD, true);
+ bolt.prepare(conf, context, collector);
+
+ // Input tuple
+ String url = "http://example.com";
+ String html = "this is a secret page";
+ byte[] contentBytes = html.getBytes(StandardCharsets.UTF_8);
+ Metadata md = new Metadata();
+ md.addValue("some", "value");
+ Tuple input = mock(Tuple.class);
+ when(input.getStringByField("url")).thenReturn(url);
+ when(input.getBinaryByField("content")).thenReturn(contentBytes);
+ when(input.getValueByField("metadata")).thenReturn(md);
+ when(input.getStringByField("text")).thenReturn(null);
+
+
+ // Execute bolt
+ bolt.execute(input);
+
+ // Capture emitted tuple
+ ArgumentCaptor valuesCaptor = ArgumentCaptor.forClass(Values.class);
+ verify(collector).emit(eq(input), valuesCaptor.capture());
+ Values emitted = valuesCaptor.getValue();
+
+ // Verify URL unchanged
+ assertEquals(url, emitted.get(0));
+
+ // Verify HTML binary content has been replaced by dummy content
+ byte[] emittedBytes = (byte[]) emitted.get(1);
+ String emittedContent = new String(emittedBytes, StandardCharsets.UTF_8);
+ assertEquals(PiiBolt.REDACTED_HTML, emittedContent);
+
+ String emittedText = (String)emitted.get(3);
+
+ // Verify text is empty string
+ assertEquals("", emittedText);
+
+ // Verify metadata flag
+ Metadata outMd = (Metadata) emitted.get(2);
+ assertEquals("false", outMd.getFirstValue("pii.processed"));
+
+ // Verify ack
+ verify(collector).ack(input);
+ }
+}
diff --git a/external/presidio/README.md b/external/presidio/README.md
new file mode 100644
index 000000000..525a62161
--- /dev/null
+++ b/external/presidio/README.md
@@ -0,0 +1,42 @@
+
+## PII Redaction with Microsoft Presidio
+
+StormCrawler provides a bolt (`PiiBolt`) that can redact personally‑identifiable information (PII) from page content before it reaches the indexing or persistence stages.
+The actual redaction work is delegated to a **PresidioRedacter** implementation that talks to Microsoft Presidio’s *analyzer* and *anonymizer* services.
+
+### How it works
+1. `PiiBolt` reads the page text (field **text**) and optional language metadata.
+2. It creates an instance of the class configured via `pii.redacter.class` (default is `org.apache.stormcrawler.pii.PresidioRedacter`).
+3. The redacter:
+ * Sends the text to the **Presidio Analyzer** (`presidio.analyzer.endpoint`) – optionally with a language code.
+ * Sends the analyzer results to the **Presidio Anonymizer** (`presidio.anonymizer.endpoint`).
+ * Returns the anonymised text, which `PiiBolt` stores in a dummy HTML payload (`REDACTED_BYTES`) while preserving the original metadata.
+
+### Required configuration
+
+| Property | Description | Example |
+|----------|-------------|---------|
+| `pii.redacter.class` | Fully‑qualified class name of the PII redacter. Must implement `PiiRedacter`. | `org.apache.stormcrawler.pii.PresidioRedacter` |
+| `pii.detect.language` | Set to `true` if you want `PiiBolt` to look for a language field in the metadata. | `true` |
+| `pii.language.field` | Name of the metadata field that contains the language code (e.g. `parse.lang`). | `parse.lang` |
+| `pii.removal.enable` | Enable/disable the whole redaction step. | `true` |
+| `presidio.analyzer.endpoint` | URL of the Presidio Analyzer service. | `https://my-presidio.example.com/analyze` |
+| `presidio.anonymizer.endpoint` | URL of the Presidio Anonymizer service. | `https://my-presidio.example.com/anonymize` |
+| `presidio.analyzer.entities` *(optional)* | Comma‑separated list of entity types to request from the analyzer. | `PERSON,EMAIL,PHONE_NUMBER` |
+| `presidio.supported.languages` *(optional)* | Comma‑separated list of ISO language codes supported by your Presidio deployment. If a language is not listed, the redacter falls back to the multi‑lingual model (`xx`). | `en,fr,de,es` |
+
+### Minimal topology example
+
+```java
+// add the bolt to your topology
+builder.setBolt("piiRedactor", new PiiBolt())
+ .shuffleGrouping("fetcher");
+
+// make sure the required config entries are present
+Map conf = new HashMap<>();
+conf.put(PiiBolt.PII_REDACTER_CLASS_PARAM, "org.apache.stormcrawler.pii.PresidioRedacter");
+conf.put(PiiBolt.PII_ENABLE_FIELD, true);
+conf.put("presidio.analyzer.endpoint", "https://presidio.mycorp.com/analyze");
+conf.put("presidio.anonymizer.endpoint", "https://presidio.mycorp.com/anonymize");
+// optional: language field
+conf.put("pii.language.field", "parse.lang");
diff --git a/external/presidio/pom.xml b/external/presidio/pom.xml
new file mode 100644
index 000000000..decf1e926
--- /dev/null
+++ b/external/presidio/pom.xml
@@ -0,0 +1,54 @@
+
+
+
+
+
+ 4.0.0
+
+
+ org.apache.stormcrawler
+ stormcrawler-external
+ 3.5.1-SNAPSHOT
+ ../pom.xml
+
+
+ presidio
+ jar
+
+ stormcrawler-presidio
+ https://github.com/apache/stormcrawler/tree/master/external/presidio
+ Interface with Microsoft Presisio for Data Protection and De-identification
+
+
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jdk8
+ ${jackson.version}
+
+
+
+ org.mockito
+ mockito-junit-jupiter
+ ${mockito.version}
+ test
+
+
+
diff --git a/external/presidio/src/main/java/org/apache/stormcrawler/pii/PresidioRedacter.java b/external/presidio/src/main/java/org/apache/stormcrawler/pii/PresidioRedacter.java
new file mode 100644
index 000000000..df0222c9e
--- /dev/null
+++ b/external/presidio/src/main/java/org/apache/stormcrawler/pii/PresidioRedacter.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.pii;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.LoggerFactory;
+
+import org.apache.stormcrawler.util.ConfUtils;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
+/**
+ * PII Redacter implementation for Microsoft Presidio
+ */
+public class PresidioRedacter implements PiiRedacter {
+
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(PresidioRedacter.class);
+
+ private static final String PRESIDIO_ANONYMIZER_ENDPOINT = "presidio.anonymizer.endpoint";
+
+ private static final String PRESIDIO_ANALYZER_ENDPOINT = "presidio.analyzer.endpoint";
+
+ private static final String PRESIDIO_ANALYZER_ENTITIES = "presidio.analyzer.entities";
+
+ private static final String PRESIDIO_SUPPORTED_LANGUAGES = "presidio.supported.languages";
+
+ private String analyzerEndpoint = "https://your-presidio-endpoint/analyze";
+
+ private String anonymizerEndpoint = "https://your-presidio-endpoint/anonymize"; ;
+
+ private List analyzerEntities = null;
+
+ private List supportedLanguages = Arrays.asList("en", "fr", "de", "xx");
+
+ private OkHttpClient httpClient = new OkHttpClient();
+
+ public static final MediaType JSON = MediaType.get("application/json");
+
+ public PresidioRedacter() {
+ LOG.info("Created PresidioRedactor instance");
+ }
+
+ // Dependency with Jdk8Module should be removed when Storm dependency will use a version > 2.20
+ private ObjectMapper objectMapper = new ObjectMapper().registerModule(new Jdk8Module());
+
+ static class AnalyzerPayload {
+
+ public String text;
+ public String language;
+ public Optional> entities;
+
+ AnalyzerPayload(String input, String lang, Optional> entities) {
+ this.text = input;
+ this.language = lang;
+ this.entities = entities;
+ }
+ }
+
+ static class AnonymizerPayload {
+
+ public String text;
+ public JsonNode analyzer_results;
+
+ AnonymizerPayload(String input, JsonNode results) {
+ this.text = input;
+ this.analyzer_results = results;
+ }
+ }
+
+ public PresidioRedacter(String analyzerEndpoint, String anonymizerEndpoint) {
+ this.analyzerEndpoint = analyzerEndpoint;
+ this.anonymizerEndpoint = anonymizerEndpoint;
+
+ LOG.info("Created PresidioRedactor instance with:");
+ LOG.info("Analyzer endpoint: {}", analyzerEndpoint);
+ LOG.info("Anonymizer endpoint: {}", anonymizerEndpoint);
+ }
+
+ protected PresidioRedacter(String analyzerEndpoint, String anonymizerEndpoint, OkHttpClient client) {
+ this(analyzerEndpoint, anonymizerEndpoint);
+ this.httpClient = client;
+ }
+
+ /**
+ * Retrieve the endpoints from the topology configuration
+ */
+ @Override
+ public void init(Map topologyConf) throws MalformedURLException {
+ LOG.info("Initializating PresidioRedacter...");
+
+ this.analyzerEndpoint = ConfUtils.getString(topologyConf, PRESIDIO_ANALYZER_ENDPOINT);
+ this.anonymizerEndpoint = ConfUtils.getString(topologyConf, PRESIDIO_ANONYMIZER_ENDPOINT);
+
+ String entitiesString = ConfUtils.getString(topologyConf, PRESIDIO_ANALYZER_ENTITIES);
+
+ if (entitiesString != null && StringUtils.isNotBlank(entitiesString)) {
+ List entities = Arrays.asList(entitiesString.split(",", -1));
+ if (!entities.isEmpty()) {
+ this.analyzerEntities = entities;
+ LOG.info("Analyzer entities: {}", entitiesString);
+ }
+ }
+
+ String supLangs = ConfUtils.getString(topologyConf, PRESIDIO_SUPPORTED_LANGUAGES);
+ if (supLangs != null && StringUtils.isNotBlank(supLangs)) {
+ List langs = Arrays.asList(supLangs.split(",", -1));
+ if (!langs.isEmpty()) {
+ this.supportedLanguages = langs;
+ }
+ }
+ LOG.info("Analyzer supported languages: {}", this.supportedLanguages);
+
+
+ if (StringUtils.isBlank(analyzerEndpoint) || StringUtils.isBlank(anonymizerEndpoint)) {
+ String msg = "Presidio Analyzer and anonymizer endpoints can not be null or empty !";
+ LOG.error(msg);
+ throw new MalformedURLException(msg);
+ }
+
+
+ LOG.info("Analyzer endpoint: {}", analyzerEndpoint);
+ LOG.info("Anonymizer endpoint: {}", anonymizerEndpoint);
+
+ }
+
+ /**
+ * Calls the analyzer with the multi lingual model
+ * and then the anonymizer
+ */
+ @Override
+ public String redact(String input) {
+ String analyzerResult = analyze(input, "xx");
+ String anonymizerResult = null;
+
+ if (analyzerResult != null) {
+ anonymizerResult = anonymize(input, analyzerResult);
+ }
+
+ return anonymizerResult;
+ }
+
+ /**
+ * Calls the analyzer in a specific language
+ * and then the anonymizer
+ */
+ @Override
+ public String redact(String input, String language) {
+ String analyzerResult = null;
+ String anonymizerResult = null;
+
+ if (supportedLanguages.contains(language)) {
+ analyzerResult = analyze(input, language);
+ } else {
+ LOG.warn("Language {} not supported by PresidioRedactor. Falling back to multi-lingual model (xx)", language);
+ analyzerResult = analyze(input, "xx");
+ }
+
+ if (analyzerResult != null) {
+ anonymizerResult = anonymize(input, analyzerResult);
+ }
+
+ return anonymizerResult;
+ }
+
+ public List getSupportedLanguages() {
+ return supportedLanguages;
+ }
+
+ /**
+ * Calls the Presidio analyzer endpoint
+ * @param input
+ * @param language
+ * @return
+ */
+ protected String analyze(String input, String language) {
+ String analyzerResult = null;
+
+ LOG.info("Calling presidio for analyzing text in language {}:", language);
+
+ AnalyzerPayload objPayload = new AnalyzerPayload(input, language, Optional.ofNullable(analyzerEntities));
+ String payload;
+ try {
+ payload = objectMapper.writeValueAsString(objPayload);
+ } catch (JsonProcessingException e) {
+ LOG.error(e.getMessage(), e);
+ LOG.error("Input was:");
+ LOG.error(input);
+ return null;
+ }
+
+ RequestBody body = RequestBody.create(payload, JSON);
+ Request request = new Request.Builder()
+ .url(analyzerEndpoint)
+ .post(body)
+ .build();
+
+ Instant start = Instant.now();
+
+ try (Response response = httpClient.newCall(request).execute()) {
+ if (response.isSuccessful()) {
+ analyzerResult = response.body().string();
+ } else {
+ LOG.error("Response not successfull: {}", response.code());
+ String errbody = response.body().string();
+ LOG.error(errbody);
+ LOG.error("Input was:");
+ LOG.error(input);
+ }
+ } catch (IOException e) {
+ LOG.error("Error calling Presidio analyzer:");
+ LOG.error(e.getMessage(), e);
+ }
+
+ Instant finish = Instant.now();
+ long elapsed = Duration.between(start, finish).toMillis();
+ LOG.info("Analyzer request took {} ms", elapsed);
+
+ return analyzerResult;
+ }
+
+ /**
+ * Calls the Presidio anonymizer endpoint
+ * @param input
+ * @param analyzerResults
+ * @return
+ */
+ protected String anonymize(String input, String analyzerResults) {
+ String anonymizedText = null;
+
+ LOG.info("Calling presidio anonymizer");
+
+ String payload;
+ try {
+ JsonNode tree = objectMapper.readTree(analyzerResults);
+ AnonymizerPayload objPayload = new AnonymizerPayload(input, tree);
+ payload = objectMapper.writeValueAsString(objPayload);
+ } catch (JsonProcessingException e) {
+ LOG.error(e.getMessage(), e);
+ LOG.error("Analyzer results was:");
+ LOG.error(analyzerResults);
+ return null;
+ }
+
+ RequestBody body = RequestBody.create(payload, JSON);
+ Request request = new Request.Builder()
+ .url(anonymizerEndpoint)
+ .post(body)
+ .build();
+
+ Instant start = Instant.now();
+
+ String responseBody = null;
+ try (Response response = httpClient.newCall(request).execute()) {
+ if (response.isSuccessful()) {
+ responseBody = response.body().string();
+ } else {
+ LOG.error("Response not successfull: {}", response.code());
+ String errbody = response.body().string();
+ LOG.error(errbody);
+ }
+ } catch (IOException e) {
+ LOG.error("Error calling Presidio analyzer:");
+ LOG.error(e.getMessage(), e);
+ }
+
+ Instant finish = Instant.now();
+ long elapsed = Duration.between(start, finish).toMillis();
+ LOG.info("Anonymizer request took {} ms", elapsed);
+
+ if (Objects.nonNull(responseBody)) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ JsonNode root = mapper.readTree(responseBody);
+ anonymizedText = root.path("text").asText();
+ } catch (JsonProcessingException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ return anonymizedText;
+ }
+}
diff --git a/external/presidio/src/test/java/org/apache/stormcrawler/pii/PresidioRedacterTest.java b/external/presidio/src/test/java/org/apache/stormcrawler/pii/PresidioRedacterTest.java
new file mode 100644
index 000000000..105689cc7
--- /dev/null
+++ b/external/presidio/src/test/java/org/apache/stormcrawler/pii/PresidioRedacterTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.pii;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Request.Builder;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+@ExtendWith(MockitoExtension.class)
+class PresidioRedacterTest extends PresidioRedacter {
+
+ @Mock
+ private OkHttpClient mockHttpClient;
+
+ @Mock
+ private Call mockCall;
+
+ private PresidioRedacter fakeRedacter;
+
+ private String analyzerEndpoint = "http://localhost:5002/analyze";
+
+ private String anonymizerEndpoint = "http://localhost:5001/anonymize";
+
+ private static String input = "My name is John Doe (SSN: 489-36-8350) and I live in New York, US.";
+
+ private static String analyzerResult = "[{\"analysis_explanation\": null, \"end\": 24, \"entity_type\": \"PERSON\", \"recognition_metadata\": {\"recognizer_identifier\": \"SpacyRecognizer_140697834419632\", \"recognizer_name\": \"SpacyRecognizer\"}, \"score\": 0.85, \"start\": 11}, {\"analysis_explanation\": null, \"end\": 29, \"entity_type\": \"ORGANIZATION\", \"recognition_metadata\": {\"recognizer_identifier\": \"SpacyRecognizer_140697834419632\", \"recognizer_name\": \"SpacyRecognizer\"}, \"score\": 0.85, \"start\": 26}, {\"analysis_explanation\": null, \"end\": 42, \"entity_type\": \"US_SSN\", \"recognition_metadata\": {\"recognizer_identifier\": \"UsSsnRecognizer_140698153812224\", \"recognizer_name\": \"UsSsnRecognizer\"}, \"score\": 0.85, \"start\": 31}, {\"analysis_explanation\": null, \"end\": 66, \"entity_type\": \"LOCATION\", \"recognition_metadata\": {\"recognizer_identifier\": \"SpacyRecognizer_140697834419632\", \"recognizer_name\": \"SpacyRecognizer\"}, \"score\": 0.85, \"start\": 58}, {\"analysis_explanation\": null, \"end\": 70, \"entity_type\": \"LOCATION\", \"recognition_metadata\": {\"recognizer_identifier\": \"SpacyRecognizer_140697834419632\", \"recognizer_name\": \"SpacyRecognizer\"}, \"score\": 0.85, \"start\": 68}]\r\n"
+ + "";
+
+ private static String expected = "My name is (: ) and I live in , .";
+
+ @BeforeEach
+ void setUp() {
+
+ fakeRedacter = new PresidioRedacter(analyzerEndpoint, anonymizerEndpoint, mockHttpClient);
+ }
+
+ @Test
+ void testInitOk() {
+ PresidioRedacter redacter = new PresidioRedacter();
+
+ Map conf = new HashMap<>();
+ conf.put("presidio.analyzer.endpoint", "http://analyzer.org:10000");
+ conf.put("presidio.anonymizer.endpoint", "http://anonymizer.endpoint:20000");
+ Assertions.assertDoesNotThrow(() -> redacter.init(conf));
+ }
+
+ @Test
+ void testInitfail() {
+ PresidioRedacter redacter = new PresidioRedacter();
+
+ Map conf = new HashMap<>();
+ conf.put("presidio.analyzer.endpoint", "http://analyzer.org:10000");
+ Assertions.assertThrows(MalformedURLException.class, () -> redacter.init(conf));
+ }
+
+ @Test
+ void testMockedAnalyzer() throws Exception {
+ String language = "en";
+
+ Request.Builder reqBuilder = new Builder();
+ Request req1 = reqBuilder
+ .url(analyzerEndpoint)
+ .build();
+
+ ResponseBody respBody = ResponseBody.create(JSON, analyzerResult);
+
+ Response.Builder builder = new Response.Builder();
+ Response response = builder
+ .request(req1)
+ .body(respBody)
+ .protocol(Protocol.HTTP_1_0)
+ .message("Fake response")
+ .code(200)
+ .build();
+
+
+ when(mockHttpClient.newCall(any())).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(response);
+
+ String result = fakeRedacter.analyze(input, language);
+ assertNotNull(result);
+ assertEquals(analyzerResult, result);
+ }
+
+ @Test
+ void testMockedAnonymizer() throws Exception {
+ String expectedBody = "{\"text\": \"My name is (: ) and I live in , .\", \"items\": [{\"start\": 73, \"end\": 83, \"entity_type\": \"LOCATION\", \"text\": \"\", \"operator\": \"replace\"}, {\"start\": 61, \"end\": 71, \"entity_type\": \"LOCATION\", \"text\": \"\", \"operator\": \"replace\"}, {\"start\": 37, \"end\": 45, \"entity_type\": \"US_SSN\", \"text\": \"\", \"operator\": \"replace\"}, {\"start\": 21, \"end\": 35, \"entity_type\": \"ORGANIZATION\", \"text\": \"\", \"operator\": \"replace\"}, {\"start\": 11, \"end\": 19, \"entity_type\": \"PERSON\", \"text\": \"\", \"operator\": \"replace\"}]}";
+
+ Request.Builder reqBuilder = new Builder();
+ Request req1 = reqBuilder
+ .url(anonymizerEndpoint)
+ .build();
+
+ ResponseBody respBody = ResponseBody.create(JSON, expectedBody);
+
+ Response.Builder builder = new Response.Builder();
+ Response response = builder
+ .request(req1)
+ .body(respBody)
+ .protocol(Protocol.HTTP_1_0)
+ .message("Fake response")
+ .code(200)
+ .build();
+
+
+ when(mockHttpClient.newCall(any())).thenReturn(mockCall);
+ when(mockCall.execute()).thenReturn(response);
+
+ String result = fakeRedacter.anonymize(input, analyzerResult);
+ assertNotNull(result);
+ assertEquals(expected, result);
+ }
+
+ @Test
+ @Disabled("For local tests only")
+ void testLocalAnalyzer() {
+
+ PresidioRedacter redacter = new PresidioRedacter(analyzerEndpoint, anonymizerEndpoint);
+ String result = redacter.analyze(input, "en");
+ assertNotNull(result);
+ assertTrue(result.contains("analysis_explanation"));
+ }
+
+ @Test
+ @Disabled("For local tests only")
+ void testLocalAnonymizer() {
+
+ PresidioRedacter redacter = new PresidioRedacter(analyzerEndpoint, anonymizerEndpoint);
+ String result = redacter.analyze(input, "en");
+ String redacted = redacter.anonymize(input, result);
+
+ assertNotNull(redacted);
+ assertEquals(expected, redacted);
+ }
+
+ @Test
+ @Disabled("For local tests only")
+ void testLocalRedact() {
+ PresidioRedacter redacter = new PresidioRedacter(analyzerEndpoint, anonymizerEndpoint);
+
+ String redacted = redacter.redact(input, "en");
+ assertNotNull(redacted);
+ assertEquals(expected, redacted);
+ }
+
+ @Test
+ void testMockedAnonymizerWithWrongJson() throws Exception {
+ // Create a bad object that ObjectMapper might fail to serialize as JSON
+ // string
+ String badObject = "{ \"key\": { \"value\": [1, 2, 3, \"nestedKey\": { \"nestedValue\": null } } }";
+
+ // Ensure the method returs null
+ String result = fakeRedacter.anonymize(input, badObject);
+ assertNull(result);
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index d50091163..6eb79c0fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -692,6 +692,7 @@ under the License.
archetype
external/opensearch/archetype
external/solr/archetype
+ external/presidio