Issue: When using the connector with the Strimzi Operator for Kubernetes, I get the following error:
2021-12-29 10:47:14,093 INFO [Consumer clientId=connector-consumer-alerts-bq-0, groupId=connect-alerts-bq] Resetting offset for partition alerts-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka-cluster-kafka-0.kafka-cluster-kafka-brokers.message-bus.svc:9093 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-alerts-bq-0]
2021-12-29 10:48:49,051 ERROR WorkerSinkTask{id=alerts-bq-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Error getting access token for service account: oauth2.googleapis.com (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-alerts-bq-0]
com.google.cloud.bigquery.BigQueryException: Error getting access token for service account: oauth2.googleapis.com
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:113)
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:285)
at com.google.cloud.bigquery.BigQueryImpl$17.call(BigQueryImpl.java:678)
at com.google.cloud.bigquery.BigQueryImpl$17.call(BigQueryImpl.java:675)
at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105)
at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
at com.google.cloud.bigquery.BigQueryImpl.getTable(BigQueryImpl.java:674)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.lambda$retrieveCachedTable$2(BigQuerySinkTask.java:338)
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1133)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.retrieveCachedTable(BigQuerySinkTask.java:338)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getRecordTable(BigQuerySinkTask.java:210)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:245)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Error getting access token for service account: oauth2.googleapis.com
at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:444)
at com.google.auth.oauth2.OAuth2Credentials.refresh(OAuth2Credentials.java:157)
at com.google.auth.oauth2.OAuth2Credentials.getRequestMetadata(OAuth2Credentials.java:145)
at com.google.auth.oauth2.ServiceAccountCredentials.getRequestMetadata(ServiceAccountCredentials.java:603)
at com.google.auth.http.HttpCredentialsAdapter.initialize(HttpCredentialsAdapter.java:91)
at com.google.cloud.http.HttpTransportOptions$1.initialize(HttpTransportOptions.java:159)
at com.google.api.client.http.HttpRequestFactory.buildRequest(HttpRequestFactory.java:88)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.buildHttpRequest(AbstractGoogleClientRequest.java:422)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:283)
... 22 more
Caused by: java.net.UnknownHostException: oauth2.googleapis.com
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:220)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:299)
at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:474)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:569)
at java.base/sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:266)
at java.base/sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:373)
at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:203)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1187)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:189)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1367)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1342)
at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:246)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:113)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:441)
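To make the trace above easier to read, here is a minimal sketch that walks the "Caused by:" chain and reports the innermost exception. The helper name cause_chain and the regular expression are my own illustration, and the sample is a shortened copy of the trace above, so treat this as a rough aid rather than a general stack trace parser.

```python
import re

# Matches the first line of each exception in a Java stack trace,
# e.g. "Caused by: java.io.IOException: some message".
_EXC = re.compile(r"^(?:Caused by: )?([\w.$]+(?:Exception|Error)): (.*)$")


def cause_chain(trace):
    """Return (exception class, message) pairs, outermost first."""
    chain = []
    for line in trace.splitlines():
        match = _EXC.match(line.strip())
        if match:
            chain.append((match.group(1), match.group(2)))
    return chain


# Shortened copy of the trace from this issue (one frame kept per exception).
sample = """\
com.google.cloud.bigquery.BigQueryException: Error getting access token for service account: oauth2.googleapis.com
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:113)
Caused by: java.io.IOException: Error getting access token for service account: oauth2.googleapis.com
at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:444)
Caused by: java.net.UnknownHostException: oauth2.googleapis.com
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:220)
"""

root_class, root_message = cause_chain(sample)[-1]
print(root_class, root_message)
```

The innermost entry is the java.net.UnknownHostException, which suggests the task's JVM could not resolve oauth2.googleapis.com by name, rather than the credentials themselves being rejected.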
My setup looks like this:
Dockerfile:
FROM confluentinc/cp-server-connect-base:5.5.1
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt wepay/kafka-connect-bigquery:2.1.10 && confluent-hub install --no-prompt castorm/kafka-connect-http:0.8.6
FROM quay.io/strimzi/kafka:0.26.0-kafka-3.0.0
USER root:root
COPY --from=0 /usr/share/confluent-hub-components /usr/share/confluent-hub-components
COPY --from=0 /usr/share/java /usr/share/java
RUN 'mkdir' '-p' '/opt/kafka/plugins' \
&& 'cp' '-R' '/usr/share/confluent-hub-components' '/opt/kafka/plugins'
RUN echo "plugin.path=/opt/kafka/plugins,/usr/share/java,/usr/share/confluent-hub-components," >> /opt/kafka/config/connect-standalone.properties
RUN echo "plugin.path=/opt/kafka/plugins,/usr/share/java,/usr/share/confluent-hub-components," >> /opt/kafka/config/connect-distributed.properties
USER 1001
KafkaConnect:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  namespace: message-bus
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.0.0
  resources:
    requests:
      cpu: 12
      memory: 64Gi
    limits:
      cpu: 12
      memory: 64Gi
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: kafka-cluster-cluster-ca-cert
        certificate: ca.crt
  image: docker.io/alexanghel23/kafka-connect-plugins:v0.2.0
  template:
    connectContainer:
      env:
        - name: https_proxy
          value: http://XXX.XXX.XX.XXX:3128
        - name: http_proxy
          value: http://XXX.XXX.XX.XXX:3128
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: /opt/kafka/external-configuration/gcp-credentials/kafka-bq.json
  externalConfiguration:
    volumes:
      - name: gcp-credentials
        secret:
          secretName: kafka-bq
  config:
    config.providers: env
    config.providers.env.class: io.strimzi.kafka.EnvVarConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
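A possible explanation, stated as an assumption rather than a confirmed diagnosis: the https_proxy and http_proxy variables set in the KafkaConnect resource above are read by tools such as curl, but Java's built-in HTTP client (the HttpURLConnection classes visible in the stack trace) takes its proxy from the http.proxyHost, http.proxyPort, https.proxyHost and https.proxyPort system properties instead. The sketch below only shows how a proxy URL maps onto those properties. The function name jvm_proxy_flags and the address 10.0.0.1 are placeholders of mine (the real address is redacted in this issue), and how the flags are handed to the Connect JVM depends on the deployment.

```python
from urllib.parse import urlsplit


def jvm_proxy_flags(proxy_url):
    """Turn a proxy URL into the standard Java networking -D flags."""
    parts = urlsplit(proxy_url)
    if not parts.hostname or not parts.port:
        raise ValueError(f"proxy URL needs a host and a port: {proxy_url!r}")
    flags = []
    # The same proxy is used for plain HTTP and for HTTPS traffic,
    # mirroring the two identical env values in the KafkaConnect resource.
    for scheme in ("http", "https"):
        flags.append(f"-D{scheme}.proxyHost={parts.hostname}")
        flags.append(f"-D{scheme}.proxyPort={parts.port}")
    return flags


# 10.0.0.1 is a placeholder; the issue redacts the real proxy address.
print(" ".join(jvm_proxy_flags("http://10.0.0.1:3128")))
```

If this assumption holds, it would be consistent with curl succeeding from inside the pod while the connector task fails with UnknownHostException.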
KafkaConnector:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: alerts-bq
  namespace: message-bus
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
  tasksMax: 1
  config:
    project: ai4neo-dev
    defaultDataset: ".*=test_kafka"
    topics: alerts
    keySource: FILE
    keyfile: "/opt/kafka/external-configuration/gcp-credentials/kafka-bq.json"
    proxy.url: "http://xxx.xxx.xx.xxx:3128"
Service account:
{
  "type": "service_account",
  "project_id": "ai4neo-dev",
  "private_key_id": "81<omitted>1e",
  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEv<omitted>qpefw=\n-----END PRIVATE KEY-----\n",
  "client_email": "kafka-bq@ai4neo-dev.iam.gserviceaccount.com",
  "client_id": "10<omitted>21",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/kafka-bq%40ai4neo-dev.iam.gserviceaccount.com"
}
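The host named in the error (oauth2.googleapis.com) matches the token_uri field of the key file above. As a quick sanity check, a sketch like the following reads a key file and prints the host that the token request would go to. The function name token_host and the set of fields it checks are my own choices for illustration, and the sample key holds placeholder values only.

```python
import json
from urllib.parse import urlsplit

# Fields this sketch chooses to check; not an official list.
REQUIRED = ("type", "client_email", "private_key", "token_uri")


def token_host(key_json):
    """Return the host of token_uri from a service account key (JSON text)."""
    key = json.loads(key_json)
    missing = [field for field in REQUIRED if field not in key]
    if missing:
        raise ValueError(f"key file is missing fields: {missing}")
    return urlsplit(key["token_uri"]).hostname


# Placeholder key with the same shape as the one in this issue.
sample_key = json.dumps({
    "type": "service_account",
    "client_email": "placeholder@example.iam.gserviceaccount.com",
    "private_key": "<omitted>",
    "token_uri": "https://oauth2.googleapis.com/token",
})

print(token_host(sample_key))
```

For the real file, the JSON text would be read from the path given in keyfile instead of the inline sample.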
Running curl oauth2.googleapis.com inside the Kafka Connect cluster pod returns this:
<!DOCTYPE html>
<html lang=en>
<meta charset=utf-8>
<meta name=viewport content="initial-scale=1, minimum-scale=1, width=device-width">
<title>Error 404 (Not Found)!!1</title>
<style>
*{margin:0;padding:0}html,code{font:15px/22px arial,sans-serif}html{background:#fff;color:#222;padding:15px}body{margin:7% auto 0;max-width:390px;min-height:180px;padding:30px 0 15px}* > body{background:url(//www.google.com/images/errors/robot.png) 100% 5px no-repeat;padding-right:205px}p{margin:11px 0 22px;overflow:hidden}ins{color:#777;text-decoration:none}a img{border:0}@media screen and (max-width:772px){body{background:none;margin-top:0;max-width:none;padding-right:0}}#logo{background:url(//www.google.com/images/branding/googlelogo/1x/googlelogo_color_150x54dp.png) no-repeat;margin-left:-5px}@media only screen and (min-resolution:192dpi){#logo{background:url(//www.google.com/images/branding/googlelogo/2x/googlelogo_color_150x54dp.png) no-repeat 0% 0%/100% 100%;-moz-border-image:url(//www.google.com/images/branding/googlelogo/2x/googlelogo_color_150x54dp.png) 0}}@media only screen and (-webkit-min-device-pixel-ratio:2){#logo{background:url(//www.google.com/images/branding/googlelogo/2x/googlelogo_color_150x54dp.png) no-repeat;-webkit-background-size:100% 100%}}#logo{display:inline-block;height:54px;width:150px}
</style>
<a href=//www.google.com/><span id=logo aria-label=Google></span></a>
<p><b>404.</b> <ins>That’s an error.</ins>
<p>The requested URL <code>/</code> was not found on this server. <ins>That’s all we know.</ins>
Thank you!