Description
The problem I'm facing is a schema compatibility error in Kafka Connect: the error message says the top-level Kafka Connect schema must be of type 'struct', but the schema delivered with my Kafka messages does not meet that requirement. This suggests a mismatch between the schema the connector expects and the one provided with the message.
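For context, my understanding (an assumption on my part, not something taken from the connector docs) is that with value.converter.schemas.enable=true the JsonConverter expects each value to be a schema/payload envelope, which it turns into a Connect Struct, while with schemas.enable=false a bare JSON object is deserialized into a schemaless Map rather than a Struct. A minimal illustration of the two shapes, with made-up field names:

Bare JSON value (schemas.enable=false, deserialized as a schemaless Map, no Struct):

{"product_id": "123", "price": 9.99}

Schema-enveloped value (schemas.enable=true, deserialized as a Struct):

{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "product_id", "type": "string", "optional": false},
      {"field": "price", "type": "double", "optional": true}
    ]
  },
  "payload": {"product_id": "123", "price": 9.99}
}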
Here's what I've tried:
Checking the Kafka Message Schema: I've verified the schema of my Kafka messages to make sure they are correctly structured, but the schema still does not appear to satisfy the 'struct' type requirement.
Connector Configuration: I've checked my connector's schema-related settings, in particular that key.converter.schemas.enable and value.converter.schemas.enable are both set to true when embedded schemas are being used.
Version Compatibility: I've verified that the versions of my Kafka connector, Kafka cluster, and schema are compatible, since version mismatches can sometimes cause schema compatibility issues.
Data Examination: I've examined the incoming data to make sure it matches the expected schema, since malformed data can cause schema conversion errors (see the inspection command right after this list).
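For the data examination step, this is roughly how I inspect the raw records (a sketch; the bootstrap address and topic name are taken from the configuration below, and the exact paths may differ in your environment):

# Print keys and values of a few raw records to check the actual JSON shape on the topic
bin/kafka-console-consumer.sh \
  --bootstrap-server strimzi-cluster:9092 \
  --topic projects-indep-price-purchase-unit-test \
  --property print.key=true \
  --property key.separator=" | " \
  --from-beginning \
  --max-messages 5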
Despite these efforts the error persists, which tells me there is still a discrepancy between the schema the connector expects and the one delivered with the Kafka message. I probably need to dig further into the schema configuration and the format of the incoming data to make sure they line up with what the connector requires, and any insight into the connector's exact schema expectations would help.
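For reference, this is how I check what the Connect worker actually applied (a sketch; the service name and port 8083 are assumptions based on the usual Strimzi naming for this KafkaConnect resource):

# Inspect the connector configuration and status through the Connect REST API
curl -s http://connect-json-bigquery-sink-connect-api:8083/connectors/project-indep-connector/config
curl -s http://connect-json-bigquery-sink-connect-api:8083/connectors/project-indep-connector/status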
Here is an example of the configuration of the KafkaConnect cluster and the KafkaConnector:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-json-bigquery-sink
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  logging:
    type: external
    valueFrom:
      configMapKeyRef:
        name: strimzi-kafka-connect-log4j-properties
        key: log4j.properties
  bootstrapServers: strimzi-cluster:9092
  template:
    pod:
      imagePullSecrets:
        - name: $(AGILE_FABRIC_SECRET)
    serviceAccount:
      metadata:
        annotations:
          iam.gke.io/gcp-service-account: phx-kafka-bigquery-connector@my-gcp-project-id.iam.gserviceaccount.com
  version: 3.5.2
  replicas: 3
  image: phenix-docker-virtual.enterpriserepo.fr.carrefour.com/phenix/platform/phx-strimzi-kafka-connect:2.0.0-SNAPSHOT
  config:
    config.providers: env,secrets
    config.providers.env.class: io.strimzi.kafka.EnvVarConfigProvider
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: strimzi-connect-json-bigquery-sink
    offset.storage.topic: strimzi-connect-json-bigquery-sink-offsets
    config.storage.topic: strimzi-connect-json-bigquery-sink-configs
    status.storage.topic: strimzi-connect-json-bigquery-sink-status
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
  resources:
    limits:
      memory: 8Gi
    requests:
      cpu: 100m
      memory: 8Gi
  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: jmx-kafka-connect-prometheus
        key: jmx-kafka-connect-prometheus.yml
  externalConfiguration:
    env:
      - name: SCHEMA_REGISTRY_URL
        valueFrom:
          configMapKeyRef:
            name: kafka-config
            key: SCHEMA_REGISTRY
      - name: GCP_PROJECT
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: GCP_PROJECT
      - name: CONNECT_INTERNAL_KEY_CONVERTER
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_INTERNAL_KEY_CONVERTER
      - name: CONNECT_INTERNAL_VALUE_CONVERTER
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_INTERNAL_VALUE_CONVERTER
      - name: CONNECT_CONSUMER_MAX_POLL_RECORDS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONSUMER_MAX_POLL_RECORDS
      - name: CONNECT_CONSUMER_MAX_POLL_INTERVAL_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONSUMER_MAX_POLL_INTERVAL_MS
      - name: CONNECT_CONSUMER_ISOLATION_LEVEL
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONSUMER_ISOLATION_LEVEL
      - name: CONNECT_CONSUMER_AUTO_OFFSET_RESET
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONSUMER_AUTO_OFFSET_RESET
      - name: CONNECT_CONSUMER_SESSION_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONSUMER_SESSION_TIMEOUT_MS
      - name: CONNECT_SESSION_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_SESSION_TIMEOUT_MS
      - name: CONNECT_HEARTBEAT_INTERVAL_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_HEARTBEAT_INTERVAL_MS
      - name: CONNECT_METRICS_RECORDING_LEVEL
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_METRICS_RECORDING_LEVEL
      - name: CONNECT_OFFSET_FLUSH_INTERVAL_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_OFFSET_FLUSH_INTERVAL_MS
      - name: CONNECT_OFFSET_FLUSH_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_OFFSET_FLUSH_TIMEOUT_MS
      - name: CONNECTOR_RETRIES
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECTOR_RETRIES
      - name: CONNECTOR_TIMEOUT_SECONDS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECTOR_TIMEOUT_SECONDS
      - name: CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS
      - name: CONNECT_REQUEST_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_REQUEST_TIMEOUT_MS
      - name: CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY
      - name: CONNECT_WORKER_SYNC_TIMEOUT_MS
        valueFrom:
          configMapKeyRef:
            name: bigquery-connect-config
            key: CONNECT_WORKER_SYNC_TIMEOUT_MS
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connect-json-bigquery-sink-role
  namespace: platform-dev
rules:
  - apiGroups: [""]
    resources: ["secrets"]
    verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connect-json-bigquery-sink-role-binding
  namespace: platform-dev
subjects:
  - kind: ServiceAccount
    name: connect-json-bigquery-sink-connect
    namespace: platform-dev
roleRef:
  kind: Role
  name: connect-json-bigquery-sink-role
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: project-indep-connector
  labels:
    strimzi.io/cluster: connect-json-bigquery-sink
spec:
  class: com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
  tasksMax: 1
  config:
    topics: projects-indep-price-purchase-unit-test
    project: "my-project-id"
    defaultDataset: project_indep_test
    keyfile: "${secrets:platform-dev/bigquery-connect-secrets:bigquery-connect.keyfile}"
    keySource: JSON
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    key.converter.schemas.enable: false
    bigquery.retry.wait.max.seconds: 600
    bigquery.retry.backoff.max.ms: 600000
    bigquery.retry.backoff.initial.delay.ms: 1000
    errors.tolerance: all
    errors.retry.delay.max.ms: 60000
    errors.log.enable: true
    errors.deadletterqueue.topic.name: "project_indep_dlq_bigquery_sink_dead_letter"
    errors.deadletterqueue.context.headers.enable: true
    allBQFieldsNullable: true
    sanitizeTopics: false
    autoCreateTables: false
    autoUpdateSchemas: false
    schemaRetriever: com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
    bufferSize: 100
    maxWriteSize: 100
    tableWriteWait: 1000
    timestamp: UTC
    bigQueryPartitionDecorator: false
PS: The data sent to Kafka must be in JSON format, but the schema is dynamic. Therefore, I cannot specify the schema by having a static BigQuery table already created. Instead, I need the connector to create the table dynamically while injecting the JSON data. My Kafka message must have a string key and a JSON-formatted value.
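To make the message shape concrete, here is roughly how the test records are produced (a sketch with a made-up key and payload; in reality the JSON structure varies from message to message):

# Produce one record with a string key and a plain JSON value (no embedded schema)
echo 'price-123|{"product_id": "123", "price": 9.99, "currency": "EUR"}' | \
  bin/kafka-console-producer.sh \
    --bootstrap-server strimzi-cluster:9092 \
    --topic projects-indep-price-purchase-unit-test \
    --property parse.key=true \
    --property key.separator='|'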
Can anyone help, please?