Examples

AVRO input

Here is an example of uploading data from a Kafka topic whose message values are stored as Avro. The key converter is set to StringConverter, but the key information is not used.

The connector uploads the accumulated file when the first of these thresholds is reached:

  • a 10 MB file size, or
  • 100,000 records, or
  • 30 seconds since the last write, to cover the case where no further records arrive for a while

The connector also flushes the Parquet file every 1,000 records; the file size check is only updated when a flush happens.

name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
topics=payments
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key="AppKey ***"
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
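
A common way to deploy such a configuration is through the Kafka Connect REST API. Below is a minimal Python sketch, assuming a Connect worker listening on the default port 8083 and the third-party requests library; the masked values must of course be replaced with real ones.

# Minimal sketch: registering the connector above via the Kafka Connect
# REST API. Assumes a Connect worker on http://localhost:8083 (the default)
# and the third-party "requests" HTTP client.
import requests

connector = {
    "name": "kafka2ems",
    "config": {
        "connector.class": "com.celonis.kafka.connect.ems.sink.EmsSinkConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "topics": "payments",
        "connect.ems.endpoint": "https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items",
        "connect.ems.target.table": "payments",
        "connect.ems.connection.id": "****",
        "connect.ems.commit.size.bytes": "10000000",
        "connect.ems.commit.records": "100000",
        "connect.ems.commit.interval.ms": "30000",
        "connect.ems.tmp.dir": "/tmp/ems",
        "connect.ems.authorization.key": "AppKey ***",
        "connect.ems.error.policy": "RETRY",
        "connect.ems.max.retries": "20",
        "connect.ems.retry.interval": "60000",
        "connect.ems.parquet.write.flush.records": "1000",
        "connect.ems.debug.keep.parquet.files": "false",
    },
}

# POST /connectors creates the connector; Connect returns 201 on success.
response = requests.post("http://localhost:8083/connectors", json=connector)
response.raise_for_status()
print(response.json())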

JSON input

Here is an example of uploading data from a Kafka topic whose message values are stored as JSON. The key converter is set to StringConverter, but the key information is not used.

The connector uploads the accumulated file when the first of these thresholds is reached:

  • a 10 MB file size, or
  • 100,000 records, or
  • 30 seconds since the last write, to cover the case where no further records arrive for a while

The connector also flushes the Parquet file every 1,000 records; the file size check is only updated when a flush happens.

name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
topics=payments
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=payments
connect.ems.connection.id=****
connect.ems.commit.size.bytes=10000000
connect.ems.commit.records=100000
connect.ems.commit.interval.ms=30000
connect.ems.tmp.dir=/tmp/ems
connect.ems.authorization.key="AppKey ***"
connect.ems.error.policy=RETRY
connect.ems.max.retries=20
connect.ems.retry.interval=60000
connect.ems.parquet.write.flush.records=1000
connect.ems.debug.keep.parquet.files=false
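
Note that the configuration above leaves the JsonConverter at its default schemas.enable=true, so each message value is expected to carry an inline schema envelope. A minimal sketch of producing such a record, assuming a broker on localhost:9092 and the third-party kafka-python client (the payment field names are hypothetical):

# Minimal sketch: producing a JSON record in the envelope format the
# JsonConverter expects with its default schemas.enable=true.
# Assumes a local broker and the third-party kafka-python client.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

record = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "id", "type": "string"},
            {"field": "amount", "type": "double"},
        ],
    },
    "payload": {"id": "p-1", "amount": 42.0},
}

producer.send("payments", record)
producer.flush()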

Fix obfuscation

All the obfuscated fields are uploaded to EMS as *****. In this example, the credit_card and ssn fields are obfuscated.

...
name=kafka2ems
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.obfuscation.method="fix"
connect.ems.obfuscation.fields="credit_card, ssn"
...
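
For illustration, here is a small Python sketch of the effect (not the connector's actual code): every configured field's value is replaced with the fixed mask before the record is uploaded.

# Illustration only, not the connector's implementation: "fix" obfuscation
# replaces each configured field's value with a fixed mask.
FIXED_MASK = "*****"

def obfuscate_fix(record: dict, fields: set) -> dict:
    # Replace the value of every obfuscated field; leave the rest untouched.
    return {k: FIXED_MASK if k in fields else v for k, v in record.items()}

payment = {"id": "p-1", "credit_card": "4111111111111111", "ssn": "123-45-6789"}
print(obfuscate_fix(payment, {"credit_card", "ssn"}))
# {'id': 'p-1', 'credit_card': '*****', 'ssn': '*****'}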

SHA1 obfuscation

All the fields are hashed with SHA-1 (a one-way hash, not encryption) and the result is converted to a hex string. For example, the text "this is a test" is uploaded as "9938a75e6d10a74d6b2e9bc204177de5b95f28fe". In this example, the credit_card and ssn fields are obfuscated.

...
connect.ems.obfuscation.method="sha1"
connect.ems.obfuscation.fields="credit_card, ssn"
...
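
The equivalent transformation, sketched in Python (illustration only; hashing the field value's UTF-8 bytes is an assumption):

# Illustration only: SHA-1 obfuscation hashes the field value (presumably
# its UTF-8 bytes) and hex-encodes the digest.
import hashlib

def obfuscate_sha1(value: str) -> str:
    return hashlib.sha1(value.encode("utf-8")).hexdigest()

print(obfuscate_sha1("this is a test"))  # a 40-character hex string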

SHA512 obfuscation

All the fields are hashed with salted SHA-512 and the result is converted to a hex string. In this example, the credit_card and ssn fields are obfuscated.

...
connect.ems.obfuscation.method="sha512"
connect.ems.obfuscation.sha512.salt="very secure not"
connect.ems.obfuscation.fields="credit_card, ssn"
...
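
Sketched in Python (illustration only; how the connector combines the salt with the field value, e.g. prepending versus appending, is an assumption here):

# Illustration only: salted SHA-512 obfuscation. Prepending the salt is an
# assumption; the connector's exact salt handling may differ.
import hashlib

def obfuscate_sha512(value: str, salt: str) -> str:
    return hashlib.sha512((salt + value).encode("utf-8")).hexdigest()

print(obfuscate_sha512("4111111111111111", "very secure not"))  # 128-char hex string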