
Examples


AVRO input

Here is an example of uploading data from a Kafka topic whose message values are stored as Avro. The key converter is set to StringConverter, but the key information is not used.

The connector upload rules are:

  • a 10MB file, or
  • 100,000 records, or
  • 30 seconds since the last write, to cover the case where no more records are available for a while

The connector also flushes the Parquet file every 1,000 records; the file size check is only updated when a flush happens.

name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
topics=payments
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key="AppKey ***"
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
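
If you run Connect in distributed mode, the same settings can be registered through the Kafka Connect REST API instead of a properties file. The sketch below is illustrative only: it assumes a Connect worker listening on localhost:8083 (the default REST port, not part of the example above) and repeats just a subset of the settings.

import json
import urllib.request

# Connector settings mirror the properties file above; only a subset is shown.
config = {
    "connector.class": "com.celonis.kafka.connect.ems.sink.EmsSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "topics": "payments",
    # ... plus the connect.ems.* settings listed above
}

# POST /connectors creates the connector on the Connect worker.
request = urllib.request.Request(
    "http://localhost:8083/connectors",  # assumed worker address
    data=json.dumps({"name": "kafka2ems", "config": config}).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(request) as response:
    print(response.status)  # 201 once the connector is created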

JSON input

Here is an example of uploading data from a Kafka topic whose message values are stored as JSON. The key converter is again set to StringConverter, but the key information is not used. The upload rules and Parquet flush behavior are the same as in the Avro example above.

name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
topics=payments
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key="AppKey ***"
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false

Primary Key(s)

Specifies a set of fields from the incoming payload to use as primary keys in Celonis. If this is not provided, all fields are used.

// single field
...
connect.ems.data.primary.key=customer_id
...

// Composite PK
...
connect.ems.data.primary.key=name,address
...

Please refer to the primary keys documentation for best practices.

Overwrite the Order field when using Primary Key(s)

If your data already contains an ordering field, use it: this improves performance and reduces the disk space required in EMS. Here is an example configuration where the processed_ts timestamp field guarantees that two records with the same PK won't share the same value:

...
connect.ems.data.primary.key=customer_id
connect.ems.order.field.name=processed_ts
...

// Composite PK
...
connect.ems.data.primary.key=name,address
connect.ems.order.field.name=processed_ts
...

Please refer to the primary keys documentation for best practices.

Fix obfuscation

All obfuscated fields are uploaded to EMS as *****. In this example, the credit_card and ssn fields are obfuscated.

...
name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.obfuscation.method="fix"
connect.ems.obfuscation.fields="credit_card, ssn"
...
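
For illustration only (the record values below are invented), a payload like:

{
  "customer_id": 42,
  "credit_card": "4111111111111111",
  "ssn": "078-05-1120"
}

would reach EMS with the obfuscated fields masked:

{
  "customer_id": 42,
  "credit_card": "*****",
  "ssn": "*****"
}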

SHA1 obfuscation

The configured fields are hashed with SHA1 and the result is converted to a hex string. For example, the text "this is a test" ends up translated to "9938a75e6d10a74d6b2e9bc204177de5b95f28fe". In this example, the credit_card and ssn fields are obfuscated.

...
connect.ems.obfuscation.method="sha1"
connect.ems.obfuscation.fields="credit_card, ssn"
...
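
The hex digest quoted above can be reproduced outside the connector; a minimal sketch of the same transformation in Python:

import hashlib

# SHA1 over the field's UTF-8 bytes, rendered as a lowercase hex string.
print(hashlib.sha1("this is a test".encode("utf-8")).hexdigest())
# per the example above: 9938a75e6d10a74d6b2e9bc204177de5b95f28fe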

SHA512 obfuscation

The configured fields are hashed with salted SHA512 and the result is converted to a hex string. In this example, the credit_card and ssn fields are obfuscated.

...
connect.ems.obfuscation.method="sha512"
connect.ems.obfuscation.sha512.salt="very secure not"
connect.ems.obfuscation.fields="credit_card, ssn"
...
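
The result is again a lowercase hex digest, this time 128 characters long. How the connector combines the salt with the field value is not documented here; the sketch below assumes a simple salt prefix and is meant only to show the shape of the output:

import hashlib

salt = "very secure not"   # the salt configured above
value = "this is a test"   # an example field value
# Assumption: salt prepended to the value; the connector's exact scheme may differ.
print(hashlib.sha512((salt + value).encode("utf-8")).hexdigest())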

Exploding Lists

The connector has limited support for 'exploding' a message: converting a message consisting of a list of records into individual records that are presented to Connect one by one.

Here is an example, presented as JSON.

{
  "employees": [
    {
      "employee_number": 1,
      "name": "Arturo",
      "birth_year": 1940
    },
    {
      "employee_number": 2,
      "name": "Mallory",
      "birth_year": 1973
    },
    {
      "employee_number": 3,
      "name": "Wells",
      "birth_year": 1972
    },
    {
      "employee_number": 4,
      "name": "Brown",
      "birth_year": 1955
    }
  ]
}

After adding the following to the connector configuration:

...
connect.ems.explode.mode=LIST
...

The sink will discard the List wrapper and pass each record to EMS independently.

First message:

{
  "employee_number": 1,
  "name": "Arturo",
  "birth_year": 1940
}

Second message:

{
  "employee_number": 2,
  "name": "Mallory",
  "birth_year": 1973
}

(... and so on.)
