Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions THIRD-PARTY.txt
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ List of third-party dependencies grouped by their license type.
* Jackson dataformat: CBOR (com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.2 - https://github.com/FasterXML/jackson-dataformats-binary)
* Jackson dataformat: Smile (com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.18.2 - https://github.com/FasterXML/jackson-dataformats-binary)
* Jackson-dataformat-YAML (com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.18.2 - https://github.com/FasterXML/jackson-dataformats-text)
* Jackson datatype: jdk8 (com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.18.1 - https://github.com/FasterXML/jackson-modules-java8/jackson-datatype-jdk8)
* java-libpst (com.pff:java-libpst:0.9.3 - https://github.com/rjohnsondev/java-libpst)
* JCL 1.2 implemented over SLF4J (org.slf4j:jcl-over-slf4j:2.0.17 - http://www.slf4j.org)
* JetBrains Java Annotations (org.jetbrains:annotations:26.0.2-1 - https://github.com/JetBrains/java-annotations)
Expand Down
195 changes: 195 additions & 0 deletions core/src/main/java/org/apache/stormcrawler/pii/PiiBolt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.stormcrawler.pii;

import org.apache.commons.lang3.StringUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.util.ConfUtils;
import org.apache.stormcrawler.util.InitialisationUtil;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
* StormCrawler bolt that performs PII redaction on the content of web pages
* before they are passed to the indexing or persistence bolt.<br>
* If enabled, the HTML content will be overwritten with a dummy HTML page (containing just "REDACTED")<br><br>
* <b>pii.redacter.class</b> is the name of the class implementing the PiiInterface interface (e.g. org.apache.stormcrawler.pii.PresidioRedacter)<br>
* <b>pii.language.field</b>, if set, allows to set the name of a Metadata field that contains the language to be passed to the PII redacter instance
*
*/
@SuppressWarnings("serial")
public class PiiBolt extends BaseRichBolt {

private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(PiiBolt.class);

/*
* Name of config field defining the PII Redacter class
* (This class must implement the PiiRedacter interface
*/
public static final String PII_REDACTER_CLASS_PARAM = "pii.redacter.class";

/*
* Name of the field for configurating language detection
*/
public static final String PII_DETECT_LANGUAGE_PARAM = "pii.detect.language";

/*
* Name of the field for defining Metadata field containing language
*/
public static final String PII_LANGUAGE_FIELD = "pii.language.field";

/*
* Name of the field for disabling PII removal
*/
public static final String PII_ENABLE_FIELD = "pii.removal.enable";

private static final String FIELD_URL = "url";
private static final String FIELD_CONTENT = "content";
private static final String FIELD_METADATA = "metadata";
private static final String FIELD_TEXT = "text";


// Default value for language metadata field
private String languageFieldName = "parse.lang";

protected OutputCollector collector;

protected PiiRedacter piiRedacter;

private boolean piiEnabled = false;

public static final String REDACTED_HTML = "<!DOCTYPE html><html lang='en'><head><meta charset='UTF-8'><title>REDACTED</title></head><body>REDACTED</body></html>";

public static final byte[] REDACTED_BYTES = REDACTED_HTML.getBytes(StandardCharsets.UTF_8);

/**
* Returns a Scheduler instance based on the configuration *
*/
public static PiiRedacter getInstance(Map<String, Object> stormConf) {
PiiRedacter redacter;

String className = ConfUtils.getString(stormConf, PII_REDACTER_CLASS_PARAM);
if (className == null || className.isEmpty()) {
throw new RuntimeException("PiiRedacter class name must be defined in the configuration (pii.redacter.class)");
}

LOG.info("Loading PII Redacter class, name={}", className);
try {
redacter = InitialisationUtil.initializeFromQualifiedName(className, PiiRedacter.class);
} catch (Exception e) {
throw new RuntimeException("Can't instantiate " + className, e);
}

LOG.info("Initializing PII Redacter instance");
try {
redacter.init(stormConf);
} catch (Exception e) {
LOG.error("Error while initializing PII Redacter", e);
}

return redacter;
}

public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
// Uncomment if extending StatusEmitterBolt
//super.prepare(topoConf, context, collector);

this.collector = collector;

this.piiRedacter = getInstance(topoConf);
LOG.info("Initialized PiiRedacter instance");

// Get language metadata field name
String confLanguageField = ConfUtils.getString(topoConf, "pii.language.field");
if (confLanguageField != null && !confLanguageField.isEmpty()) {
languageFieldName = confLanguageField;
}
LOG.info("PII language field: {}", languageFieldName);

piiEnabled = ConfUtils.getBoolean(topoConf, PII_ENABLE_FIELD, false);
LOG.info("PII enabled: {}", piiEnabled);

}

@Override
public void execute(Tuple input) {

if (!piiEnabled) {
this.collector.emit(input, input.getValues());
this.collector.ack(input);
return;
}

String url = input.getStringByField(FIELD_URL);
LOG.info("Processing URL for PII redaction: {}", url);

Metadata metadata = (Metadata) input.getValueByField(FIELD_METADATA);
String text = input.getStringByField(FIELD_TEXT);
byte[] originalBytes = input.getBinaryByField(FIELD_CONTENT);

if (StringUtils.isBlank(text)) {
LOG.info("No text to process for URL: {}", url);
metadata.addValue("pii.processed", "false");
// Force the binary content to a dummy content
emitTuple(input, url, REDACTED_BYTES, metadata, "");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsure if we should default to a redacted html here, if the original content was empty. Why not just return (similar to pii is disabled). Any reason for this?

this.collector.ack(input);
return;
}

try {
String language = metadata.getFirstValue(languageFieldName);
String redacted = (language != null) ?
piiRedacter.redact(text, language) :
piiRedacter.redact(text);

if (redacted == null) {
throw new Exception("PII Redacter returned null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? Shouldn't we fallback or just default to something instead of raising a hard exception here; triggering a re-try in the topology?

}

metadata.addValue("pii.processed", "true");

// Force the binary content to a dummy content
emitTuple(input, url, REDACTED_BYTES, metadata, redacted);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we force the binary content to dummy content here, if we have redacted? Shouldn't we set it to redacted.getBytes(...) ?

} catch (Exception e) {
LOG.error("Error during PII redaction for URL {}: {}", url, e.getMessage());
metadata.addValue("pii.error", e.getMessage());

// How to handle the content in case of error ?
emitTuple(input, url, originalBytes, metadata, text);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fail soft imho. So just return the original data + the added metadata?

}

this.collector.ack(input);
}

private void emitTuple(Tuple input, String url, byte[] content, Metadata metadata, String text) {
this.collector.emit(input, new Values(url, content, metadata, text));
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(FIELD_URL, FIELD_CONTENT, FIELD_METADATA, FIELD_TEXT));
}
}
44 changes: 44 additions & 0 deletions core/src/main/java/org/apache/stormcrawler/pii/PiiRedacter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.stormcrawler.pii;

import java.util.Map;

/**
* An interface for bolts implementing PII redaction
*/
public interface PiiRedacter {
void init(Map<String, Object> topologyConf) throws Exception;

/**
* Redacts PII from the input string using default language settings
* (e.g. no language or a default language configured at initialization)
*
* @param input the input string possibly containing PII
* @return the input string with PII redacted
*/
String redact(String input);

/**
* Redacts PII from the input string using the specified language
* @param input the input string possibly containing PII
* @param language the language to use for PII redaction
* @return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing

*/
String redact(String input, String language);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.stormcrawler.pii;

import java.util.Map;

/**
* Mock PII Redacter implementation for testing purposes.
* This class simulates redaction by replacing occurrences of the word"secret"
* with "*****".
*/

public class MockPiiRedacter implements PiiRedacter {

@Override public void init(Map<String, Object> conf) {}

@Override public String redact(String content) {
return redact(content, null);
}

@Override public String redact(String content, String language) {
// simple redaction logic for the test
return content.replaceAll("secret", "*****");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can just be replace, no regex needed.

}
}
Loading
Loading