diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CefParser.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CefParser.java new file mode 100644 index 0000000000000..4e5022c1abe9f --- /dev/null +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CefParser.java @@ -0,0 +1,301 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.common; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.regex.Matcher; + +import org.elasticsearch.ingest.IngestDocument; + +final class CefParser { + + private final IngestDocument ingestDocument; + private final boolean removeEmptyValue; + + CefParser(IngestDocument ingestDocument, boolean removeEmptyValue) { + this.ingestDocument = ingestDocument; + this.removeEmptyValue = removeEmptyValue; + } + + // Existing patterns... + private static final Pattern HEADER_PATTERN = Pattern.compile("(?:\\\\\\||\\\\\\\\|[^|])*?"); + private static final Pattern HEADER_NEXT_FIELD_PATTERN = Pattern.compile("(" + HEADER_PATTERN.pattern() + ")\\|"); + private static final Pattern HEADER_ESCAPE_CAPTURE = Pattern.compile("\\\\([\\\\|])"); + + // New patterns for extension parsing + private static final String EXTENSION_KEY_PATTERN = "(?:\\w+(?:\\.[^\\.=\\s\\|\\\\\\[\\]]+)*(?:\\[[0-9]+\\])?(?==))"; + private static final Pattern EXTENSION_KEY_ARRAY_CAPTURE = Pattern.compile("^([^\\[\\]]+)((?:\\[[0-9]+\\])+)$"); + private static final String EXTENSION_VALUE_PATTERN = "(?:\\S|\\s(?!" + EXTENSION_KEY_PATTERN + "=))*"; + private static final Pattern EXTENSION_NEXT_KEY_VALUE_PATTERN = Pattern + .compile("(" + EXTENSION_KEY_PATTERN + ")=(" + EXTENSION_VALUE_PATTERN + ")(?:\\s+|$)"); + + private static final Map HEADER_FIELD_SANITIZER_MAPPING = new HashMap<>(); + private static final Map EXTENSION_VALUE_SANITIZER_MAPPING = new HashMap<>(); + private static final Map EXTENSION_VALUE_SANITIZER_REVERSE_MAPPING = new HashMap<>(); + + private static final Map FIELD_MAPPING = new HashMap<>(); + private static final Map DECODE_MAPPING = new HashMap<>(); + + static { + HEADER_FIELD_SANITIZER_MAPPING.put("\\", "\\\\"); + HEADER_FIELD_SANITIZER_MAPPING.put("|", "\\|"); + HEADER_FIELD_SANITIZER_MAPPING.put("\n", " "); + HEADER_FIELD_SANITIZER_MAPPING.put("\r", " "); + + EXTENSION_VALUE_SANITIZER_MAPPING.put("\\", "\\\\"); + EXTENSION_VALUE_SANITIZER_MAPPING.put("=", "\\="); + EXTENSION_VALUE_SANITIZER_MAPPING.put("\n", "\\n"); + EXTENSION_VALUE_SANITIZER_MAPPING.put("\r", "\\n"); + + EXTENSION_VALUE_SANITIZER_REVERSE_MAPPING.put("\\\\", "\\"); + EXTENSION_VALUE_SANITIZER_REVERSE_MAPPING.put("\\=", "="); + EXTENSION_VALUE_SANITIZER_REVERSE_MAPPING.put("\\n", "\n"); + EXTENSION_VALUE_SANITIZER_REVERSE_MAPPING.put("\\r", "\n"); + + FIELD_MAPPING.put("src", "source.ip"); + FIELD_MAPPING.put("spt", "source.port"); + FIELD_MAPPING.put("dst", "destination.ip"); + FIELD_MAPPING.put("dpt", "destination.port"); + FIELD_MAPPING.put("suser", "source.user.name"); + FIELD_MAPPING.put("duser", "destination.user.name"); + // Add more mappings as needed + + // Initialize decode mappings + DECODE_MAPPING.put("src", "sourceAddress"); + DECODE_MAPPING.put("dst", "destinationAddress"); + DECODE_MAPPING.put("spt", "sourcePort"); + DECODE_MAPPING.put("dpt", "destinationPort"); + } + + void process(String cefString) { + List headerFields = new ArrayList<>(); + Matcher headerMatcher = HEADER_NEXT_FIELD_PATTERN.matcher(cefString); + int extensionStart = 0; + + for (int i = 0; i < 7 && headerMatcher.find(); i++) { + String field = headerMatcher.group(1); + field = HEADER_ESCAPE_CAPTURE.matcher(field).replaceAll("$1"); + headerFields.add(field); + extensionStart = headerMatcher.end(); + } + + if (headerFields.size() != 7 || !headerFields.get(0).startsWith("CEF:")) { + throw new IllegalArgumentException("Invalid CEF format"); + } + + CEFEvent event = new CEFEvent(); + event.setVersion(headerFields.get(0).substring(4)); + event.setDeviceVendor(headerFields.get(1)); + event.setDeviceProduct(headerFields.get(2)); + event.setDeviceVersion(headerFields.get(3)); + event.setDeviceEventClassId(headerFields.get(4)); + event.setName(headerFields.get(5)); + event.setSeverity(headerFields.get(6)); + + String extensionString = cefString.substring(extensionStart); + Map extensions = parseExtensions(extensionString); + + if (removeEmptyValue) { + extensions = removeEmptyValue(extensions); + } + + event.setExtensions(extensions); + + Map translatedFields = new HashMap<>(); + for (Map.Entry entry : extensions.entrySet()) { + String translatedKey = FIELD_MAPPING.getOrDefault(entry.getKey(), entry.getKey()); + translatedFields.put(translatedKey, entry.getValue()); + } + event.setTranslatedFields(translatedFields); + + ingestDocument.setFieldValue("cef", event.toObject()); + } + + private Map parseExtensions(String extensionString) { + Map extensions = new HashMap<>(); + Matcher matcher = EXTENSION_NEXT_KEY_VALUE_PATTERN.matcher(extensionString); + int lastEnd = 0; + while (matcher.find()) { + String key = matcher.group(1); + String value = matcher.group(2); + + // Expand abbreviated extension field keys + key = DECODE_MAPPING.getOrDefault(key, key); + + // Convert extension field name to strict legal field_reference + if (key.endsWith("]")) { + key = convertArrayLikeKey(key); + } + + extensions.put(key, desanitizeExtensionVal(value.trim())); + lastEnd = matcher.end(); + } + // If there's any remaining unparsed content, throw an exception + if (lastEnd < extensionString.length()) { + throw new IllegalArgumentException("Invalid extensions; keyless value present: " + + extensionString.substring(lastEnd)); + } + return extensions; + } + + private Map removeEmptyValue(Map map) { + map.entrySet().removeIf(entry -> entry.getValue().isEmpty()); + return map; + } + + private String convertArrayLikeKey(String key) { + Matcher matcher = EXTENSION_KEY_ARRAY_CAPTURE.matcher(key); + if (matcher.matches()) { + return "[" + matcher.group(1) + "]" + matcher.group(2); + } + return key; + } + + public static String sanitizeExtensionKey(String value) { + return value.replaceAll("[^a-zA-Z0-9]", ""); + } + + public static String sanitizeExtensionVal(String value) { + String sanitized = value.replace("\r\n", "\n"); + for (Map.Entry entry : EXTENSION_VALUE_SANITIZER_MAPPING.entrySet()) { + sanitized = sanitized.replace(entry.getKey(), entry.getValue()); + } + return sanitized; + } + + public static String desanitizeExtensionVal(String value) { + String desanitized = value; + for (Map.Entry entry : EXTENSION_VALUE_SANITIZER_REVERSE_MAPPING.entrySet()) { + desanitized = desanitized.replace(entry.getKey(), entry.getValue()); + } + return desanitized; + } + + public static String sanitizeHeaderField(String field) { + StringBuilder result = new StringBuilder(); + for (char c : field.toCharArray()) { + String replacement = HEADER_FIELD_SANITIZER_MAPPING.get(String.valueOf(c)); + result.append(replacement != null ? replacement : c); + } + return result.toString(); + } + + public static String sanitizeExtensionValue(String value) { + StringBuilder result = new StringBuilder(); + for (char c : value.toCharArray()) { + String replacement = EXTENSION_VALUE_SANITIZER_MAPPING.get(String.valueOf(c)); + result.append(replacement != null ? replacement : c); + } + return result.toString(); + } + + public static class CEFEvent { + private String version; + private String deviceVendor; + private String deviceProduct; + private String deviceVersion; + private String deviceEventClassId; + private String name; + private String severity; + private Map extensions; + private Map translatedFields; + + // Getters and setters for all fields + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getDeviceVendor() { + return deviceVendor; + } + + public void setDeviceVendor(String deviceVendor) { + this.deviceVendor = deviceVendor; + } + + public String getDeviceProduct() { + return deviceProduct; + } + + public void setDeviceProduct(String deviceProduct) { + this.deviceProduct = deviceProduct; + } + + public String getDeviceVersion() { + return deviceVersion; + } + + public void setDeviceVersion(String deviceVersion) { + this.deviceVersion = deviceVersion; + } + + public String getDeviceEventClassId() { + return deviceEventClassId; + } + + public void setDeviceEventClassId(String deviceEventClassId) { + this.deviceEventClassId = deviceEventClassId; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getSeverity() { + return severity; + } + + public void setSeverity(String severity) { + this.severity = severity; + } + + public Map getExtensions() { + return extensions; + } + + public void setExtensions(Map extensions) { + this.extensions = extensions; + } + + public Map getTranslatedFields() { + return translatedFields; + } + + public void setTranslatedFields(Map translatedFields) { + this.translatedFields = translatedFields; + } + + public Object toObject() { + Map event = new HashMap<>(); + event.put("version", version); + event.put("deviceVendor", deviceVendor); + event.put("deviceProduct", deviceProduct); + event.put("deviceVersion", deviceVersion); + event.put("deviceEventClassId", deviceEventClassId); + event.put("name", name); + event.put("severity", severity); + event.put("extensions", extensions); + event.put("translatedFields", translatedFields); + return event; + } + + } +} diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CefProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CefProcessor.java new file mode 100644 index 0000000000000..6c4c7d01c3829 --- /dev/null +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CefProcessor.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.common; + +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; + +import java.util.Map; + +/** + * A processor that breaks line from CSV file into separate fields. + * If there's more fields requested than there is in the CSV, extra field will + * not be present in the document after processing. + * In the same way this processor will skip any field that is empty in CSV. + *

+ * By default it uses rules according to + * RCF 4180 with one + * exception: whitespaces are + * allowed before or after quoted field. Processor can be tweaked with following + * parameters: + *

+ * quote: set custom quote character (defaults to ") + * separator: set custom separator (defaults to ,) + * trim: trim leading and trailing whitespaces in unquoted fields + * empty_value: sets custom value to use for empty fields (field is skipped if + * null) + */ +public final class CefProcessor extends AbstractProcessor { + + public static final String TYPE = "cef"; + + // visible for testing + final String field; + final boolean ignoreMissing; + final boolean removeEmptyValue; + + CefProcessor( + String tag, + String description, + String field, + String targetField, + boolean ignoreMissing, + boolean removeEmptyValue) { + super(tag, description); + this.field = field; + this.ignoreMissing = ignoreMissing; + this.removeEmptyValue = removeEmptyValue; + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) { + String line = ingestDocument.getFieldValue(field, String.class, ignoreMissing); + if (line == null && ignoreMissing) { + return ingestDocument; + } else if (line == null) { + throw new IllegalArgumentException("field [" + field + "] is null, cannot process it."); + } + new CefParser(ingestDocument, removeEmptyValue).process(line); + return ingestDocument; + } + + @Override + public String getType() { + return TYPE; + } + + public static final class Factory implements org.elasticsearch.ingest.Processor.Factory { + @Override + public CefProcessor create( + Map registry, + String processorTag, + String description, + Map config) { + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + + boolean removeEmptyValue = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, + "ignore_empty_value", true); + + boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", + false); + String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field"); + return new CefProcessor(processorTag, description, field, targetField, ignoreMissing, removeEmptyValue); + } + } +} \ No newline at end of file diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 9bf3c17e1ee63..5dc5fa19b8b91 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -36,65 +36,66 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin { - public IngestCommonPlugin() {} + public IngestCommonPlugin() { + } @Override public Map getProcessors(Processor.Parameters parameters) { return Map.ofEntries( - entry(AppendProcessor.TYPE, new AppendProcessor.Factory(parameters.scriptService)), - entry(BytesProcessor.TYPE, new BytesProcessor.Factory()), - entry(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory()), - entry(ConvertProcessor.TYPE, new ConvertProcessor.Factory()), - entry(CsvProcessor.TYPE, new CsvProcessor.Factory()), - entry(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)), - entry(DateProcessor.TYPE, new DateProcessor.Factory(parameters.scriptService)), - entry(DissectProcessor.TYPE, new DissectProcessor.Factory()), - entry(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory()), - entry(DropProcessor.TYPE, new DropProcessor.Factory()), - entry(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)), - entry(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory()), - entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)), - entry(GrokProcessor.TYPE, new GrokProcessor.Factory(parameters.matcherWatchdog)), - entry(GsubProcessor.TYPE, new GsubProcessor.Factory()), - entry(HtmlStripProcessor.TYPE, new HtmlStripProcessor.Factory()), - entry(JoinProcessor.TYPE, new JoinProcessor.Factory()), - entry(JsonProcessor.TYPE, new JsonProcessor.Factory()), - entry(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory(parameters.scriptService)), - entry(LowercaseProcessor.TYPE, new LowercaseProcessor.Factory()), - entry(NetworkDirectionProcessor.TYPE, new NetworkDirectionProcessor.Factory(parameters.scriptService)), - entry(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)), - entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()), - entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)), - entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)), - entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()), - entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)), - entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)), - entry(SortProcessor.TYPE, new SortProcessor.Factory()), - entry(SplitProcessor.TYPE, new SplitProcessor.Factory()), - entry(TrimProcessor.TYPE, new TrimProcessor.Factory()), - entry(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()), - entry(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory()), - entry(UriPartsProcessor.TYPE, new UriPartsProcessor.Factory()) - ); + entry(AppendProcessor.TYPE, new AppendProcessor.Factory(parameters.scriptService)), + entry(BytesProcessor.TYPE, new BytesProcessor.Factory()), + entry(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory()), + entry(ConvertProcessor.TYPE, new ConvertProcessor.Factory()), + entry(CefProcessor.TYPE, new CefProcessor.Factory()), + entry(CsvProcessor.TYPE, new CsvProcessor.Factory()), + entry(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)), + entry(DateProcessor.TYPE, new DateProcessor.Factory(parameters.scriptService)), + entry(DissectProcessor.TYPE, new DissectProcessor.Factory()), + entry(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory()), + entry(DropProcessor.TYPE, new DropProcessor.Factory()), + entry(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)), + entry(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory()), + entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)), + entry(GrokProcessor.TYPE, new GrokProcessor.Factory(parameters.matcherWatchdog)), + entry(GsubProcessor.TYPE, new GsubProcessor.Factory()), + entry(HtmlStripProcessor.TYPE, new HtmlStripProcessor.Factory()), + entry(JoinProcessor.TYPE, new JoinProcessor.Factory()), + entry(JsonProcessor.TYPE, new JsonProcessor.Factory()), + entry(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory(parameters.scriptService)), + entry(LowercaseProcessor.TYPE, new LowercaseProcessor.Factory()), + entry(NetworkDirectionProcessor.TYPE, new NetworkDirectionProcessor.Factory(parameters.scriptService)), + entry(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)), + entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()), + entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)), + entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)), + entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()), + entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)), + entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)), + entry(SortProcessor.TYPE, new SortProcessor.Factory()), + entry(SplitProcessor.TYPE, new SplitProcessor.Factory()), + entry(TrimProcessor.TYPE, new TrimProcessor.Factory()), + entry(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()), + entry(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory()), + entry(UriPartsProcessor.TYPE, new UriPartsProcessor.Factory())); } @Override public List> getActions() { - return List.of(new ActionHandler<>(GrokProcessorGetAction.INSTANCE, GrokProcessorGetAction.TransportAction.class)); + return List + .of(new ActionHandler<>(GrokProcessorGetAction.INSTANCE, GrokProcessorGetAction.TransportAction.class)); } @Override public List getRestHandlers( - Settings settings, - NamedWriteableRegistry namedWriteableRegistry, - RestController restController, - ClusterSettings clusterSettings, - IndexScopedSettings indexScopedSettings, - SettingsFilter settingsFilter, - IndexNameExpressionResolver indexNameExpressionResolver, - Supplier nodesInCluster, - Predicate clusterSupportsFeature - ) { + Settings settings, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster, + Predicate clusterSupportsFeature) { return List.of(new GrokProcessorGetAction.RestAction()); }