
extract: conntrack: level=error msg=failed to initialize pipeline: ConnectionTrack config is invalid: selector key "protocol" in scheduling group 0 is not defined in the keys #1048

@Nachtfalkeaw

Description


Hi,

I am trying to ingest IPFIX/NetFlow, add some subnets, add some services, add geolocation and then write the result to Loki as JSON. This is working.
However, the number of individual log lines is huge in my environment, and I thought the aggregation functions in flp could help me collect several flows and then, at specific intervals or at the end of a connection, aggregate these flows into a single log line or a few log lines and send those to Loki.

So my goal is to do some of the aggregation in flp and send only the aggregated logs to Loki.
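
To make the goal concrete, here is a minimal Python sketch of the kind of aggregation I have in mind. It is not how flp implements conntrack: `connection_key`, `aggregate` and the sample records are hypothetical, and only the field names (SrcAddr, SrcPort, DstAddr, DstPort, Proto, Bytes, TimeReceived) and the output names (Bytes_total, numFlowLogs, TimeFlowStart, TimeFlowEnd) are taken from my flow logs and config.

```python
def connection_key(flow):
    """Build a direction-independent key so A->B and B->A map to one connection."""
    a = (flow["SrcAddr"], flow["SrcPort"])
    b = (flow["DstAddr"], flow["DstPort"])
    return (flow["Proto"],) + tuple(sorted([a, b]))


def aggregate(flows):
    """Collapse individual flow logs into one summary record per connection."""
    conns = {}
    for flow in flows:
        key = connection_key(flow)
        conn = conns.get(key)
        if conn is None:
            # First flow of this connection: start a new summary record.
            conn = {
                "Bytes_total": 0,
                "numFlowLogs": 0,
                "TimeFlowStart": flow["TimeReceived"],
                "TimeFlowEnd": flow["TimeReceived"],
            }
            conns[key] = conn
        conn["Bytes_total"] += flow["Bytes"]
        conn["numFlowLogs"] += 1
        conn["TimeFlowStart"] = min(conn["TimeFlowStart"], flow["TimeReceived"])
        conn["TimeFlowEnd"] = max(conn["TimeFlowEnd"], flow["TimeReceived"])
    return conns


# Hypothetical sample: a DNS reply and the matching request (opposite direction).
flows = [
    {"SrcAddr": "10.196.34.28", "SrcPort": 53, "DstAddr": "10.221.144.15",
     "DstPort": 55788, "Proto": 17, "Bytes": 72, "TimeReceived": 1753824841},
    {"SrcAddr": "10.221.144.15", "SrcPort": 55788, "DstAddr": "10.196.34.28",
     "DstPort": 53, "Proto": 17, "Bytes": 100, "TimeReceived": 1753824850},
]
result = aggregate(flows)
for key, conn in result.items():
    print(key, conn)
```

In this sketch the two flow logs end up as a single record, which is the reduction in log volume I am hoping to get from the conntrack stage.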

For that reason I looked into this:
https://github.com/netobserv/flowlogs-pipeline?tab=readme-ov-file#connection-tracking

However, even with the provided example config and several attempts to modify it, I receive the selector error shown in the last line of the debug logs.
If I remove the specific UDP selector from the example and only use the default group, the flp pipeline at least starts.

My config:

log-level: debug

# Flow pipeline: defines the order in which the stages run.

pipeline:
  - name: ipfix_netflow_ingest
  - name: transform_add_subnet
    follows: ipfix_netflow_ingest
  - name: transform_tcp_flags
    follows: transform_add_subnet
  - name: transform_add_geolocation
    follows: transform_tcp_flags
  - name: transform_add_service
    follows: transform_add_geolocation
  - name: extract_conntrack
    follows: transform_add_service
  - name: loki_write
    follows: extract_conntrack



parameters:
# https://github.com/netobserv/flowlogs-pipeline/blob/main/docs/api.md#ingest-netflowipfix-api
#    hostName: the hostname to listen on; defaults to 0.0.0.0
#    port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion. If both port and portLegacy are omitted, defaults to 2055
#    portLegacy: the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion
#    workers: the number of netflow/ipfix decoding workers
#    sockets: the number of listening sockets
#    mapping: custom field mapping

  - name: ipfix_netflow_ingest
    ingest:
      type: ipfix
      ipfix:
        hostName: 0.0.0.0
        port: 2055
        workers: 4
        sockets: 1



# https://github.com/netobserv/flowlogs-pipeline/blob/main/docs/api.md#transform-network-api
# adds new fields with the /20, /16 and /8 subnets for source and destination

  - name: transform_add_subnet
    transform:
      type: network
      network:
        rules:
          - type: add_subnet
            add_subnet:
              input: SrcAddr
              output: SrcSubnet20
              subnet_mask: /20
          - type: add_subnet
            add_subnet:
              input: SrcAddr
              output: SrcSubnet16
              subnet_mask: /16
          - type: add_subnet
            add_subnet:
              input: SrcAddr
              output: SrcSubnet8
              subnet_mask: /8
          - type: add_subnet
            add_subnet:
              input: DstAddr
              output: DstSubnet20
              subnet_mask: /20
          - type: add_subnet
            add_subnet:
              input: DstAddr
              output: DstSubnet16
              subnet_mask: /16
          - type: add_subnet
            add_subnet:
              input: DstAddr
              output: DstSubnet8
              subnet_mask: /8



# https://github.com/netobserv/flowlogs-pipeline/blob/main/docs/api.md#transform-network-api
# adds geolocation fields for the source and destination addresses

  - name: transform_add_geolocation
    transform:
      type: network
      network:
        rules:
          - type: add_location
            add_location:
              input: DstAddr
              output: DstLocation
              file_path: /opt/grafana/alloy/flp/IP2LOCATION-LITE-DB9.BIN.zip
          - type: add_location
            add_location:
              input: SrcAddr
              output: SrcLocation
              file_path: /opt/grafana/alloy/flp/IP2LOCATION-LITE-DB9.BIN.zip


# https://github.com/netobserv/flowlogs-pipeline/blob/main/docs/api.md#transform-network-api
# converts the TcpFlags field which is an integer into a string

  - name: transform_tcp_flags
    transform:
      type: network
      network:
        rules:
          - type: decode_tcp_flags
            decode_tcp_flags:
              input: TcpFlags
              output: TcpFlagsString



# https://github.com/netobserv/flowlogs-pipeline/blob/main/docs/api.md#transform-network-api
# converts the DstPort into a service name string

  - name: transform_add_service
    transform:
      type: network
      network:
        rules:
          - type: add_service
            add_service:
              input: DstPort
              output: DstService
              protocol: service
        servicesFile: /opt/grafana/alloy/flp/services




  - name: extract_conntrack
    extract:
      type: conntrack
      conntrack:
        keyDefinition:
          fieldGroups:
          - name: src_grp
            fields:
            - SrcAddr
            - SrcPort
          - name: dst_grp
            fields:
            - DstAddr
            - DstPort
          - name: protocol_grp
            fields:
            - Proto
          hash:
            fieldGroupRefs:
            - protocol_grp
            fieldGroupARef: src_grp
            fieldGroupBRef: dst_grp
        outputRecordTypes:
        - newConnection
        - endConnection
        - heartbeat
        - flowLog
        outputFields:
        - name: Bytes_total
          operation: sum
          input: Bytes
        - name: Bytes
          operation: sum
          splitAB: true
        - name: numFlowLogs
          operation: count
        - name: TimeFlowStart
          operation: min
          input: TimeReceived
        - name: TimeFlowEnd
          operation: max
          input: TimeReceived
        - name: TimeReceived   # needed for the loki write timestamp mapping
          operation: min
          input: TimeReceived
        scheduling:
        - selector: # UDP connections
            Proto: 17
          endConnectionTimeout: 5s
          heartbeatInterval: 40s
          terminatingTimeout: 5s
        - selector: {} # Default group
          endConnectionTimeout: 10s
          heartbeatInterval: 30s
          terminatingTimeout: 5s
        tcpFlags:
          fieldName: TcpFlags
          detectEndConnection: true
          swapAB: true


# https://github.com/netobserv/flowlogs-pipeline/blob/main/docs/api.md#write-loki-api
# https://github.com/netobserv/flowlogs-pipeline/pull/970

  - name: loki_write
    write:
      type: loki
      loki:
        url: https://loki.sub.domain.de:3100
        tenantID: tenant_02
        batchWait: 15s
        batchSize: 15000000
        labels:
          - SamplerAddress
          - DstSubnet16
          - SrcSubnet16
          - DstSubnet20
          - SrcSubnet20
          - DstSubnet8
          - SrcSubnet8
        staticLabels:
          service_name: flowlogs-pipeline
          level: info
          instance: u999fmlab001l
        ignoreList:
          - MplsCount
          - CustomList_1
          - CustomList_2
          - CustomList_3
          - CustomList_4
          - CustomList_5
          - MplsLastTtl
          - CustomInteger_1
          - CustomInteger_2
          - CustomInteger_3
          - CustomInteger_4
          - CustomInteger_5
          - Mpls_1Label
          - Mpls_2Label
          - Mpls_3Label
          - Mpls_4Label
          - Mpls_5Label
          - CustomBytes_1
          - CustomBytes_2
          - CustomBytes_3
          - CustomBytes_4
          - CustomBytes_5
          - MplsLastLabel
          - MplsLabelIp
          - HasMpls
          - Mpls_1Ttl
          - Mpls_2Ttl
          - Mpls_3Ttl
          - Mpls_4Ttl
          - Mpls_5Ttl
          - BiFlowDirection
          - BgpCommunities
          - BgpNextHop
          - FlowDirection
          - ForwardingStatus
          - IngressVrfId
          - EgressVrfId
          - Ipv6FlowLabel
          - NextHop
          - ObservationDomainId
          - ObservationPointId
          - AsPath
          - DstAs
          - SrcAs
          - DstMac
          - SrcMac
          - DstVlan
          - SrcVlan
          - VlanId
        timestampLabel: TimeReceived
        timestampScale: 1s
        format: json
        reorder: true



# do not define in config to disable it.
# https://github.com/netobserv/flowlogs-pipeline/pull/966
# curl 127.0.0.1:9103/ready
# shows the health state of flp - does not seem to be useful

health:
#  address: 127.0.0.1
#  port: 9103



# https://github.com/netobserv/flowlogs-pipeline?tab=readme-ov-file#metrics-settings
# https://github.com/netobserv/flowlogs-pipeline/pull/967
# provides metrics of flp in prometheus format. no need for the GO metrics

metricsSettings:
  address: 127.0.0.1
  suppressDefaultMetrics: true
  prefix: flp_operational_
  port: 9102

My debug logs:

Starting flowlogs-pipeline:
=====
Build version: v1.9.1-community-5895dcb
Build date: 2025-07-09 06:53

Using configuration:
{
    "PipeLine": "[{\"name\":\"ipfix_netflow_ingest\"},{\"follows\":\"ipfix_netflow_ingest\",\"name\":\"transform_add_subnet\"},{\"follows\":\"transform_add_subnet\",\"name\":\"transform_tcp_flags\"},{\"follows\":\"transform_tcp_flags\",\"name\":\"transform_add_geolocation\"},{\"follows\":\"transform_add_geolocation\",\"name\":\"transform_add_service\"},{\"follows\":\"transform_add_service\",\"name\":\"extract_conntrack\"},{\"follows\":\"extract_conntrack\",\"name\":\"loki_write\"}]",
    "Parameters": "[{\"ingest\":{\"ipfix\":{\"hostname\":\"0.0.0.0\",\"port\":2055,\"sockets\":1,\"workers\":4},\"type\":\"ipfix\"},\"name\":\"ipfix_netflow_ingest\"},{\"name\":\"transform_add_subnet\",\"transform\":{\"network\":{\"rules\":[{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet20\",\"subnet_mask\":\"/20\"},\"type\":\"add_subnet\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet16\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet8\",\"subnet_mask\":\"/8\"},\"type\":\"add_subnet\"},{\"add_subnet\":{\"input\":\"DstAddr\",\"output\":\"DstSubnet20\",\"subnet_mask\":\"/20\"},\"type\":\"add_subnet\"},{\"add_subnet\":{\"input\":\"DstAddr\",\"output\":\"DstSubnet16\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"},{\"add_subnet\":{\"input\":\"DstAddr\",\"output\":\"DstSubnet8\",\"subnet_mask\":\"/8\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"transform_add_geolocation\",\"transform\":{\"network\":{\"rules\":[{\"add_location\":{\"file_path\":\"/opt/grafana/alloy/flp/IP2LOCATION-LITE-DB9.BIN.zip\",\"input\":\"DstAddr\",\"output\":\"DstLocation\"},\"type\":\"add_location\"},{\"add_location\":{\"file_path\":\"/opt/grafana/alloy/flp/IP2LOCATION-LITE-DB9.BIN.zip\",\"input\":\"SrcAddr\",\"output\":\"SrcLocation\"},\"type\":\"add_location\"}]},\"type\":\"network\"}},{\"name\":\"transform_tcp_flags\",\"transform\":{\"network\":{\"rules\":[{\"decode_tcp_flags\":{\"input\":\"TcpFlags\",\"output\":\"TcpFlagsString\"},\"type\":\"decode_tcp_flags\"}]},\"type\":\"network\"}},{\"name\":\"transform_add_service\",\"transform\":{\"network\":{\"rules\":[{\"add_service\":{\"input\":\"DstPort\",\"output\":\"DstService\",\"protocol\":\"service\"},\"type\":\"add_service\"}],\"servicesfile\":\"/opt/grafana/alloy/flp/services\"},\"type\":\"network\"}},{\"extract\":{\"conntrack\":{\"keydefinition\":{\"fieldgroups\":[{\"fields\":[\"SrcAddr\",\"SrcPort\"],\"name\":\"src_grp\"},{\"fields\":[\"DstAddr\",\"DstPort\"],\"name\":\"dst_grp\"},{\"fields\":[\"Proto\"],\"name\":\"protocol_grp\"}],\"hash\":{\"fieldgrouparef\":\"src_grp\",\"fieldgroupbref\":\"dst_grp\",\"fieldgrouprefs\":[\"protocol_grp\"]}},\"outputfields\":[{\"input\":\"Bytes\",\"name\":\"Bytes_total\",\"operation\":\"sum\"},{\"name\":\"Bytes\",\"operation\":\"sum\",\"splitab\":true},{\"name\":\"numFlowLogs\",\"operation\":\"count\"},{\"input\":\"TimeReceived\",\"name\":\"TimeFlowStart\",\"operation\":\"min\"},{\"input\":\"TimeReceived\",\"name\":\"TimeFlowEnd\",\"operation\":\"max\"},{\"input\":\"TimeReceived\",\"name\":\"TimeReceived\",\"operation\":\"min\"}],\"outputrecordtypes\":[\"newConnection\",\"endConnection\",\"heartbeat\",\"flowLog\"],\"scheduling\":[{\"endconnectiontimeout\":\"5s\",\"heartbeatinterval\":\"40s\",\"selector\":{\"proto\":17},\"terminatingtimeout\":\"5s\"},{\"endconnectiontimeout\":\"10s\",\"heartbeatinterval\":\"30s\",\"selector\":{},\"terminatingtimeout\":\"5s\"}],\"tcpflags\":{\"detectendconnection\":true,\"fieldname\":\"TcpFlags\",\"swapab\":true}},\"type\":\"conntrack\"},\"name\":\"extract_conntrack\"},{\"name\":\"loki_write\",\"write\":{\"loki\":{\"batchsize\":15000000,\"batchwait\":\"15s\",\"format\":\"json\",\"ignorelist\":[\"MplsCount\",\"CustomList_1\",\"CustomList_2\",\"CustomList_3\",\"CustomList_4\",\"CustomList_5\",\"MplsLastTtl\",\"CustomInteger_1\",\"CustomInteger_2\",\"CustomInteger_3\",\"CustomInteger_4\",\"CustomInteger_5\",\"Mpls_1Label\",\"Mpls_2Label\",\"Mpls_3Label\",\"Mpls_4Label\",\"Mpls_5Label\",\"CustomBytes_1\",\"CustomBytes_2\",\"CustomBytes_3\",\"CustomBytes_4\",\"CustomBytes_5\",\"MplsLastLabel\",\"MplsLabelIp\",\"HasMpls\",\"Mpls_1Ttl\",\"Mpls_2Ttl\",\"Mpls_3Ttl\",\"Mpls_4Ttl\",\"Mpls_5Ttl\",\"BiFlowDirection\",\"BgpCommunities\",\"BgpNextHop\",\"FlowDirection\",\"ForwardingStatus\",\"IngressVrfId\",\"EgressVrfId\",\"Ipv6FlowLabel\",\"NextHop\",\"ObservationDomainId\",\"ObservationPointId\",\"AsPath\",\"DstAs\",\"SrcAs\",\"DstMac\",\"SrcMac\",\"DstVlan\",\"SrcVlan\",\"VlanId\"],\"labels\":[\"SamplerAddress\",\"DstSubnet16\",\"SrcSubnet16\",\"DstSubnet20\",\"SrcSubnet20\",\"DstSubnet8\",\"SrcSubnet8\"],\"reorder\":true,\"staticlabels\":{\"instance\":\"u999fmlab001l\",\"level\":\"info\",\"service_name\":\"flowlogs-pipeline\"},\"tenantid\":\"tenant_02\",\"timestamplabel\":\"TimeReceived\",\"timestampscale\":\"1s\",\"url\":\"https://loki.sub.domain.de:3100\"},\"type\":\"loki\"}}]",
    "DynamicParameters": "",
    "MetricsSettings": "{\"address\":\"127.0.0.1\",\"port\":9102,\"prefix\":\"flp_operational_\",\"suppressdefaultmetrics\":true}",
    "Health": {
        "Address": "0.0.0.0",
        "Port": 0
    },
    "Profile": {
        "Port": 0
    }
}
time=2025-07-30T08:56:01+02:00 level=debug msg=opts.PipeLine = [{"name":"ipfix_netflow_ingest"},{"follows":"ipfix_netflow_ingest","name":"transform_add_subnet"},{"follows":"transform_add_subnet","name":"transform_tcp_flags"},{"follows":"transform_tcp_flags","name":"transform_add_geolocation"},{"follows":"transform_add_geolocation","name":"transform_add_service"},{"follows":"transform_add_service","name":"extract_conntrack"},{"follows":"extract_conntrack","name":"loki_write"}]
time=2025-07-30T08:56:01+02:00 level=debug msg=stages = [{ipfix_netflow_ingest } {transform_add_subnet ipfix_netflow_ingest} {transform_tcp_flags transform_add_subnet} {transform_add_geolocation transform_tcp_flags} {transform_add_service transform_add_geolocation} {extract_conntrack transform_add_service} {loki_write extract_conntrack}]
time=2025-07-30T08:56:01+02:00 level=debug msg=params = [{ipfix_netflow_ingest 0xc000320230 <nil> <nil> <nil> <nil>} {transform_add_subnet <nil> 0xc0004275f0 <nil> <nil> <nil>} {transform_add_geolocation <nil> 0xc000427770 <nil> <nil> <nil>} {transform_tcp_flags <nil> 0xc000427830 <nil> <nil> <nil>} {transform_add_service <nil> 0xc000427860 <nil> <nil> <nil>} {extract_conntrack <nil> <nil> 0xc0004278f0 <nil> <nil>} {loki_write <nil> <nil> <nil> <nil> 0xc0004279b0}]
time=2025-07-30T08:56:01+02:00 level=debug msg=metrics settings = {{127.0.0.1 9102 <nil>} false flp_operational_ false true}
time=2025-07-30T08:56:01+02:00 level=debug msg=entering SetupElegantExit
time=2025-07-30T08:56:01+02:00 level=debug msg=registered exit signal channel
time=2025-07-30T08:56:01+02:00 level=debug msg=exiting SetupElegantExit
time=2025-07-30T08:56:01+02:00 level=info msg=StartServerAsync: addr = 127.0.0.1:9102 component=prometheus
time=2025-07-30T08:56:01+02:00 level=debug msg=entering newPipelineFromIngester
time=2025-07-30T08:56:01+02:00 level=debug msg=stages = [{ipfix_netflow_ingest } {transform_add_subnet ipfix_netflow_ingest} {transform_tcp_flags transform_add_subnet} {transform_add_geolocation transform_tcp_flags} {transform_add_service transform_add_geolocation} {extract_conntrack transform_add_service} {loki_write extract_conntrack}]
time=2025-07-30T08:56:01+02:00 level=debug msg=configParams = [{ipfix_netflow_ingest 0xc000320230 <nil> <nil> <nil> <nil>} {transform_add_subnet <nil> 0xc0004275f0 <nil> <nil> <nil>} {transform_add_geolocation <nil> 0xc000427770 <nil> <nil> <nil>} {transform_tcp_flags <nil> 0xc000427830 <nil> <nil> <nil>} {transform_add_service <nil> 0xc000427860 <nil> <nil> <nil>} {extract_conntrack <nil> <nil> 0xc0004278f0 <nil> <nil>} {loki_write <nil> <nil> <nil> <nil> 0xc0004279b0}]
time=2025-07-30T08:56:01+02:00 level=debug msg=stage = ipfix_netflow_ingest
time=2025-07-30T08:56:01+02:00 level=debug msg=findStageType: stage = ipfix_netflow_ingest
time=2025-07-30T08:56:01+02:00 level=info msg=Ingest IPFIX config: [hostname=0.0.0.0, port=2055, portLegacy=0, workers=4, sockets=1, mapping=no] component=ingest.Ipfix
time=2025-07-30T08:56:01+02:00 level=debug msg=pipeline = [0xc00031a540]
time=2025-07-30T08:56:01+02:00 level=debug msg=stage = transform_add_subnet
time=2025-07-30T08:56:01+02:00 level=debug msg=findStageType: stage = transform_add_subnet
time=2025-07-30T08:56:01+02:00 level=debug msg=pipeline = [0xc00031a540 0xc00031ac40]
time=2025-07-30T08:56:01+02:00 level=debug msg=stage = transform_add_geolocation
time=2025-07-30T08:56:01+02:00 level=debug msg=findStageType: stage = transform_add_geolocation
time=2025-07-30T08:56:01+02:00 level=info msg=Location DB already exists in /tmp/location_db.bin, using it
time=2025-07-30T08:56:01+02:00 level=debug msg=Loading location DB
time=2025-07-30T08:56:01+02:00 level=debug msg=pipeline = [0xc00031a540 0xc00031ac40 0xc00031af50]
time=2025-07-30T08:56:01+02:00 level=debug msg=stage = transform_tcp_flags
time=2025-07-30T08:56:01+02:00 level=debug msg=findStageType: stage = transform_tcp_flags
time=2025-07-30T08:56:01+02:00 level=debug msg=pipeline = [0xc00031a540 0xc00031ac40 0xc00031af50 0xc00031b2d0]
time=2025-07-30T08:56:01+02:00 level=debug msg=stage = transform_add_service
time=2025-07-30T08:56:01+02:00 level=debug msg=findStageType: stage = transform_add_service
time=2025-07-30T08:56:01+02:00 level=debug msg=pipeline = [0xc00031a540 0xc00031ac40 0xc00031af50 0xc00031b2d0 0xc00031b5e0]
time=2025-07-30T08:56:01+02:00 level=debug msg=stage = extract_conntrack
time=2025-07-30T08:56:01+02:00 level=debug msg=findStageType: stage = extract_conntrack
time=2025-07-30T08:56:01+02:00 level=error msg=failed to initialize pipeline: ConnectionTrack config is invalid: selector key "proto" in scheduling group 0 is not defined in the keys
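
Comparing my YAML with the "Using configuration" dump, the selector key I wrote as `Proto` appears there as `proto`, while the field names listed under `fields` keep their original case. The following Python sketch shows how a case-sensitive check would then produce exactly this message. It is only my guess at the mechanism: `validate_selectors` is a hypothetical helper written for illustration, not flp code.

```python
def validate_selectors(key_fields, scheduling):
    """Return an error message for the first selector key that is not a key field."""
    for i, group in enumerate(scheduling):
        for key in group["selector"]:
            # Case-sensitive membership test: "proto" does not match "Proto".
            if key not in key_fields:
                return f'selector key "{key}" in scheduling group {i} is not defined in the keys'
    return None


# Field names as they appear in keyDefinition.fieldGroups.
key_fields = {"SrcAddr", "SrcPort", "DstAddr", "DstPort", "Proto"}

# Scheduling groups as written in my YAML and as shown in the configuration dump.
as_written = [{"selector": {"Proto": 17}}, {"selector": {}}]
as_dumped = [{"selector": {"proto": 17}}, {"selector": {}}]

print(validate_selectors(key_fields, as_written))
print(validate_selectors(key_fields, as_dumped))
```

With the key as written the check passes, and with the lowercased key from the dump it fails for scheduling group 0, which matches what I see.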

So here are my questions:

  1. Why is the selector not working? Is something wrong with the example, is it a bug, or is it my mistake?
  2. Is this extract stage able to do what I want: collect multiple flow logs and aggregate them into fewer log lines with aggregated information?
  3. I cannot see any "Bytes_total" field in the logs arriving at Loki, so it looks like the stage is not working as I expect. There are the new fields _HashId and _RecordType, but no other new fields.

Here is a log line (with the specific UDP selector scheduling group removed), including my specific labels:

{"Bytes":72,"DstAddr":"10.221.144.15","DstLocation_CityName":"-","DstLocation_CountryLongName":"-","DstLocation_CountryName":"-","DstLocation_Latitude":"0.000000","DstLocation_Longitude":"0.000000","DstLocation_RegionName":"-","DstNet":0,"DstPort":55788,"Etype":2048,"FragmentId":0,"FragmentOffset":0,"IcmpCode":0,"IcmpType":0,"InIf":286,"IpTos":0,"IpTtl":0,"NextHopAs":0,"OutIf":283,"Packets":2,"Proto":17,"SamplingRate":0,"SequenceNum":1500260832,"SrcAddr":"10.196.34.28","SrcLocation_CityName":"-","SrcLocation_CountryLongName":"-","SrcLocation_CountryName":"-","SrcLocation_Latitude":"0.000000","SrcLocation_Longitude":"0.000000","SrcLocation_RegionName":"-","SrcNet":0,"SrcPort":53,"TcpFlags":17,"TcpFlagsString":["FIN","ACK"],"TimeFlowEnd":1753824768,"TimeFlowEndMs":1753824768000,"TimeFlowStart":1753824768,"TimeFlowStartMs":1753824768000,"TimeReceived":1753824841,"Type":4,"_HashId":"704ef8b6e8d80a4b","_RecordType":"flowLog"}
