Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

import static java.util.Map.entry;
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;

Expand Down Expand Up @@ -131,26 +131,26 @@ public boolean getIgnoreMissing() {
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String sourceIp = ingestDocument.getFieldValue(sourceIpField, String.class, ignoreMissing);
String destinationIp = ingestDocument.getFieldValue(destinationIpField, String.class, ignoreMissing);
Object ianaNumber = ingestDocument.getFieldValue(ianaNumberField, Object.class, true);
Supplier<Object> transport = () -> ingestDocument.getFieldValue(transportField, Object.class, ignoreMissing);
Supplier<Object> sourcePort = () -> ingestDocument.getFieldValue(sourcePortField, Object.class, ignoreMissing);
Supplier<Object> destinationPort = () -> ingestDocument.getFieldValue(destinationPortField, Object.class, ignoreMissing);
Object icmpType = ingestDocument.getFieldValue(icmpTypeField, Object.class, true);
Object icmpCode = ingestDocument.getFieldValue(icmpCodeField, Object.class, true);
public IngestDocument execute(IngestDocument document) throws Exception {
String sourceIp = document.getFieldValue(sourceIpField, String.class, ignoreMissing);
String destinationIp = document.getFieldValue(destinationIpField, String.class, ignoreMissing);
Object ianaNumber = document.getFieldValue(ianaNumberField, Object.class, true);
Supplier<Object> transport = () -> document.getFieldValue(transportField, Object.class, ignoreMissing);
Supplier<Object> sourcePort = () -> document.getFieldValue(sourcePortField, Object.class, ignoreMissing);
Supplier<Object> destinationPort = () -> document.getFieldValue(destinationPortField, Object.class, ignoreMissing);
Object icmpType = document.getFieldValue(icmpTypeField, Object.class, true);
Object icmpCode = document.getFieldValue(icmpCodeField, Object.class, true);
Flow flow = buildFlow(sourceIp, destinationIp, ianaNumber, transport, sourcePort, destinationPort, icmpType, icmpCode);
if (flow == null) {
if (ignoreMissing) {
return ingestDocument;
return document;
} else {
throw new IllegalArgumentException("unable to construct flow from document");
}
}

ingestDocument.setFieldValue(targetField, flow.toCommunityId(seed));
return ingestDocument;
document.setFieldValue(targetField, flow.toCommunityId(seed));
return document;
}

public static String apply(
Expand All @@ -164,7 +164,6 @@ public static String apply(
Object icmpCode,
int seed
) {

Flow flow = buildFlow(
sourceIpAddrString,
destIpAddrString,
Expand Down Expand Up @@ -256,6 +255,7 @@ public String getType() {
/**
* Converts an integer in the range of an unsigned 16-bit integer to a big-endian byte pair
*/
// visible for testing
static byte[] toUint16(int num) {
if (num < 0 || num > 65535) {
throw new IllegalStateException("number [" + num + "] must be a value between 0 and 65535");
Expand All @@ -266,7 +266,7 @@ static byte[] toUint16(int num) {
/**
* Attempts to coerce an object to an integer
*/
static int parseIntFromObjectOrString(Object o, String fieldName) {
private static int parseIntFromObjectOrString(Object o, String fieldName) {
if (o == null) {
return 0;
} else if (o instanceof Number number) {
Expand Down Expand Up @@ -296,28 +296,28 @@ public static final class Factory implements Processor.Factory {
@Override
public CommunityIdProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String tag,
String description,
Map<String, Object> config,
ProjectId projectId
) throws Exception {
String sourceIpField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_ip", DEFAULT_SOURCE_IP);
String sourcePortField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_port", DEFAULT_SOURCE_PORT);
String destIpField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "destination_ip", DEFAULT_DEST_IP);
String destPortField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "destination_port", DEFAULT_DEST_PORT);
String ianaNumberField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "iana_number", DEFAULT_IANA_NUMBER);
String transportField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "transport", DEFAULT_TRANSPORT);
String icmpTypeField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "icmp_type", DEFAULT_ICMP_TYPE);
String icmpCodeField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "icmp_code", DEFAULT_ICMP_CODE);
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET);
int seedInt = ConfigurationUtils.readIntProperty(TYPE, processorTag, config, "seed", 0);
String sourceIpField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "source_ip", DEFAULT_SOURCE_IP);
String sourcePortField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "source_port", DEFAULT_SOURCE_PORT);
String destIpField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "destination_ip", DEFAULT_DEST_IP);
String destPortField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "destination_port", DEFAULT_DEST_PORT);
String ianaNumberField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "iana_number", DEFAULT_IANA_NUMBER);
String transportField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "transport", DEFAULT_TRANSPORT);
String icmpTypeField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "icmp_type", DEFAULT_ICMP_TYPE);
String icmpCodeField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "icmp_code", DEFAULT_ICMP_CODE);
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field", DEFAULT_TARGET);
int seedInt = ConfigurationUtils.readIntProperty(TYPE, tag, config, "seed", 0);
if (seedInt < 0 || seedInt > 65535) {
throw newConfigurationException(TYPE, processorTag, "seed", "must be a value between 0 and 65535");
throw newConfigurationException(TYPE, tag, "seed", "must be a value between 0 and 65535");
}

boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", true);
boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", true);
return new CommunityIdProcessor(
processorTag,
tag,
description,
sourceIpField,
sourcePortField,
Expand All @@ -335,9 +335,13 @@ public CommunityIdProcessor create(
}

/**
* Represents flow data per https://github.com/corelight/community-id-spec
* Represents flow data per the <a href="https://github.com/corelight/community-id-spec">Community ID</a> spec.
*/
public static final class Flow {
private static final class Flow {

private Flow() {
// this is only constructable from inside this file
}

private static final List<Transport.Type> TRANSPORTS_WITH_PORTS = List.of(
Transport.Type.Tcp,
Expand All @@ -357,7 +361,7 @@ public static final class Flow {

/**
* @return true iff the source address/port is numerically less than the destination address/port as described
* at https://github.com/corelight/community-id-spec
* in the <a href="https://github.com/corelight/community-id-spec">Community ID</a> spec.
*/
boolean isOrdered() {
int result = new BigInteger(1, source.getAddress()).compareTo(new BigInteger(1, destination.getAddress()));
Expand Down Expand Up @@ -401,7 +405,7 @@ String toCommunityId(byte[] seed) {
}
}

static class Transport {
static final class Transport {
public enum Type {
Unknown(-1),
Icmp(1),
Expand All @@ -417,22 +421,19 @@ public enum Type {

private final int transportNumber;

private static final Map<String, Type> TRANSPORT_NAMES;

static {
TRANSPORT_NAMES = new HashMap<>();
TRANSPORT_NAMES.put("icmp", Icmp);
TRANSPORT_NAMES.put("igmp", Igmp);
TRANSPORT_NAMES.put("tcp", Tcp);
TRANSPORT_NAMES.put("udp", Udp);
TRANSPORT_NAMES.put("gre", Gre);
TRANSPORT_NAMES.put("ipv6-icmp", IcmpIpV6);
TRANSPORT_NAMES.put("icmpv6", IcmpIpV6);
TRANSPORT_NAMES.put("eigrp", Eigrp);
TRANSPORT_NAMES.put("ospf", Ospf);
TRANSPORT_NAMES.put("pim", Pim);
TRANSPORT_NAMES.put("sctp", Sctp);
}
private static final Map<String, Type> TRANSPORT_NAMES = Map.ofEntries(
entry("icmp", Icmp),
entry("igmp", Igmp),
entry("tcp", Tcp),
entry("udp", Udp),
entry("gre", Gre),
entry("ipv6-icmp", IcmpIpV6),
entry("icmpv6", IcmpIpV6),
entry("eigrp", Eigrp),
entry("ospf", Ospf),
entry("pim", Pim),
entry("sctp", Sctp)
);

Type(int transportNumber) {
this.transportNumber = transportNumber;
Expand All @@ -443,15 +444,15 @@ public int getTransportNumber() {
}
}

private Type type;
private int transportNumber;
private final Type type;
private final int transportNumber;

Transport(int transportNumber, Type type) { // Change constructor to public
private Transport(int transportNumber, Type type) {
this.transportNumber = transportNumber;
this.type = type;
}

Transport(Type type) { // Change constructor to public
private Transport(Type type) {
this.transportNumber = type.getTransportNumber();
this.type = type;
}
Expand All @@ -464,7 +465,8 @@ public int getTransportNumber() {
return transportNumber;
}

public static Transport fromNumber(int transportNumber) {
// visible for testing
static Transport fromNumber(int transportNumber) {
if (transportNumber < 0 || transportNumber >= 255) {
// transport numbers range https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
throw new IllegalArgumentException("invalid transport protocol number [" + transportNumber + "]");
Expand All @@ -487,7 +489,7 @@ public static Transport fromNumber(int transportNumber) {
return new Transport(transportNumber, type);
}

public static Transport fromObject(Object o) {
private static Transport fromObject(Object o) {
if (o instanceof Number number) {
return fromNumber(number.intValue());
} else if (o instanceof String protocolStr) {
Expand Down Expand Up @@ -537,34 +539,31 @@ public enum IcmpType {
V6HomeAddressDiscoveryRequest(144),
V6HomeAddressDiscoveryResponse(145);

private static final Map<Integer, Integer> ICMP_V4_CODE_EQUIVALENTS;
private static final Map<Integer, Integer> ICMP_V6_CODE_EQUIVALENTS;

static {
ICMP_V4_CODE_EQUIVALENTS = new HashMap<>();
ICMP_V4_CODE_EQUIVALENTS.put(EchoRequest.getType(), EchoReply.getType());
ICMP_V4_CODE_EQUIVALENTS.put(EchoReply.getType(), EchoRequest.getType());
ICMP_V4_CODE_EQUIVALENTS.put(TimestampRequest.getType(), TimestampReply.getType());
ICMP_V4_CODE_EQUIVALENTS.put(TimestampReply.getType(), TimestampRequest.getType());
ICMP_V4_CODE_EQUIVALENTS.put(InfoRequest.getType(), InfoReply.getType());
ICMP_V4_CODE_EQUIVALENTS.put(RouterSolicitation.getType(), RouterAdvertisement.getType());
ICMP_V4_CODE_EQUIVALENTS.put(RouterAdvertisement.getType(), RouterSolicitation.getType());
ICMP_V4_CODE_EQUIVALENTS.put(AddressMaskRequest.getType(), AddressMaskReply.getType());
ICMP_V4_CODE_EQUIVALENTS.put(AddressMaskReply.getType(), AddressMaskRequest.getType());

ICMP_V6_CODE_EQUIVALENTS = new HashMap<>();
ICMP_V6_CODE_EQUIVALENTS.put(V6EchoRequest.getType(), V6EchoReply.getType());
ICMP_V6_CODE_EQUIVALENTS.put(V6EchoReply.getType(), V6EchoRequest.getType());
ICMP_V6_CODE_EQUIVALENTS.put(V6RouterSolicitation.getType(), V6RouterAdvertisement.getType());
ICMP_V6_CODE_EQUIVALENTS.put(V6RouterAdvertisement.getType(), V6RouterSolicitation.getType());
ICMP_V6_CODE_EQUIVALENTS.put(V6NeighborAdvertisement.getType(), V6NeighborSolicitation.getType());
ICMP_V6_CODE_EQUIVALENTS.put(V6NeighborSolicitation.getType(), V6NeighborAdvertisement.getType());
ICMP_V6_CODE_EQUIVALENTS.put(V6MLDv1MulticastListenerQueryMessage.getType(), V6MLDv1MulticastListenerReportMessage.getType());
ICMP_V6_CODE_EQUIVALENTS.put(V6WhoAreYouRequest.getType(), V6WhoAreYouReply.getType());
ICMP_V6_CODE_EQUIVALENTS.put(V6WhoAreYouReply.getType(), V6WhoAreYouRequest.getType());
ICMP_V6_CODE_EQUIVALENTS.put(V6HomeAddressDiscoveryRequest.getType(), V6HomeAddressDiscoveryResponse.getType());
ICMP_V6_CODE_EQUIVALENTS.put(V6HomeAddressDiscoveryResponse.getType(), V6HomeAddressDiscoveryRequest.getType());
}
private static final Map<Integer, Integer> ICMP_V4_CODE_EQUIVALENTS = Map.ofEntries(
entry(EchoRequest.getType(), EchoReply.getType()),
entry(EchoReply.getType(), EchoRequest.getType()),
entry(TimestampRequest.getType(), TimestampReply.getType()),
entry(TimestampReply.getType(), TimestampRequest.getType()),
entry(InfoRequest.getType(), InfoReply.getType()),
entry(RouterSolicitation.getType(), RouterAdvertisement.getType()),
entry(RouterAdvertisement.getType(), RouterSolicitation.getType()),
entry(AddressMaskRequest.getType(), AddressMaskReply.getType()),
entry(AddressMaskReply.getType(), AddressMaskRequest.getType())
);

private static final Map<Integer, Integer> ICMP_V6_CODE_EQUIVALENTS = Map.ofEntries(
entry(V6EchoRequest.getType(), V6EchoReply.getType()),
entry(V6EchoReply.getType(), V6EchoRequest.getType()),
entry(V6RouterSolicitation.getType(), V6RouterAdvertisement.getType()),
entry(V6RouterAdvertisement.getType(), V6RouterSolicitation.getType()),
entry(V6NeighborAdvertisement.getType(), V6NeighborSolicitation.getType()),
entry(V6NeighborSolicitation.getType(), V6NeighborAdvertisement.getType()),
entry(V6MLDv1MulticastListenerQueryMessage.getType(), V6MLDv1MulticastListenerReportMessage.getType()),
entry(V6WhoAreYouRequest.getType(), V6WhoAreYouReply.getType()),
entry(V6WhoAreYouReply.getType(), V6WhoAreYouRequest.getType()),
entry(V6HomeAddressDiscoveryRequest.getType(), V6HomeAddressDiscoveryResponse.getType()),
entry(V6HomeAddressDiscoveryResponse.getType(), V6HomeAddressDiscoveryRequest.getType())
);

private final int type;

Expand Down Expand Up @@ -606,7 +605,7 @@ public static IcmpType fromNumber(int type) {
};
}

public static Integer codeEquivalent(int icmpType, boolean isIpV6) {
private static Integer codeEquivalent(int icmpType, boolean isIpV6) {
return isIpV6 ? ICMP_V6_CODE_EQUIVALENTS.get(icmpType) : ICMP_V4_CODE_EQUIVALENTS.get(icmpType);
}
}
Expand Down
Loading