@@ -83,20 +83,85 @@ jobs:
8383 - name : Install Kafka
8484 id : install_kafka
8585 run : |
86- kubectl apply -k ${{ github.workspace }}/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced
87- kubectl wait kafka beam-testing-cluster --for=condition=Ready --timeout=1800s
86+ echo "Deploying Kafka cluster using existing .test-infra/kubernetes/kafka-cluster configuration..."
87+ kubectl apply -R -f ${{ github.workspace }}/.test-infra/kubernetes/kafka-cluster/
88+
89+ # Wait for pods to be created and ready
90+ echo "Waiting for Kafka cluster to be ready..."
91+ sleep 180
92+
93+ # Check pod status
94+ echo "Checking pod status..."
95+ kubectl get pods -l app=kafka
96+ kubectl get pods -l app=zookeeper
97+
98+ # Wait for at least one Kafka pod to be ready
99+ echo "Waiting for Kafka pods to be ready..."
100+ kubectl wait --for=condition=ready pod -l app=kafka --timeout=300s || echo "Kafka pods not ready, continuing anyway"
101+
102+ # Wait for Zookeeper to be ready
103+ echo "Waiting for Zookeeper pods to be ready..."
104+ kubectl wait --for=condition=ready pod -l app=zookeeper --timeout=300s || echo "Zookeeper pods not ready, continuing anyway"
105+
88106 - name : Set up Kafka brokers
89107 id : set_brokers
90108 run : |
109+ echo "Setting up Kafka brokers for existing cluster configuration..."
91110 declare -a kafka_service_brokers
92111 declare -a kafka_service_brokers_ports
112+
93113 for INDEX in {0..2}; do
94- kubectl wait svc/beam-testing-cluster-kafka-${INDEX} --for=jsonpath='{.status.loadBalancer.ingress[0].ip}' --timeout=1200s
95- kafka_service_brokers[$INDEX]=$(kubectl get svc beam-testing-cluster-kafka-${INDEX} -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
96- kafka_service_brokers_ports[$INDEX]=$(kubectl get svc beam-testing-cluster-kafka-${INDEX} -o jsonpath='{.spec.ports[0].port}')
114+ echo "Setting up broker ${INDEX}..."
115+
116+ # Try to get LoadBalancer IP
117+ LB_IP=$(kubectl get svc outside-${INDEX} -o jsonpath='{.status.loadBalancer.ingress[0].ip}' 2>/dev/null || echo "")
118+
119+ if [ -n "$LB_IP" ] && [ "$LB_IP" != "null" ]; then
120+ echo "Using LoadBalancer IP: $LB_IP"
121+ kafka_service_brokers[$INDEX]=$LB_IP
122+ else
123+ echo "LoadBalancer IP not available, using NodePort approach..."
124+ # Get the first node's internal IP
125+ NODE_IP=$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')
126+ kafka_service_brokers[$INDEX]=$NODE_IP
127+ fi
128+
129+ # Get the port
130+ PORT=$(kubectl get svc outside-${INDEX} -o jsonpath='{.spec.ports[0].port}' 2>/dev/null || echo "9094")
131+ kafka_service_brokers_ports[$INDEX]=$PORT
132+
97133 echo "KAFKA_SERVICE_BROKER_${INDEX}=${kafka_service_brokers[$INDEX]}" >> $GITHUB_OUTPUT
98134 echo "KAFKA_SERVICE_BROKER_PORTS_${INDEX}=${kafka_service_brokers_ports[$INDEX]}" >> $GITHUB_OUTPUT
135+
136+ echo "Broker ${INDEX}: ${kafka_service_brokers[$INDEX]}:${kafka_service_brokers_ports[$INDEX]}"
99137 done
138+
139+ - name : Create Kafka topic
140+ id : create_topic
141+ run : |
142+ echo "Creating Kafka topic 'beam'..."
143+
144+ # Get the first available Kafka pod
145+ KAFKA_POD=$(kubectl get pods -l app=kafka -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")
146+
147+ if [ -z "$KAFKA_POD" ]; then
148+ echo "No Kafka pods found, skipping topic creation"
149+ exit 0
150+ fi
151+
152+ echo "Using Kafka pod: $KAFKA_POD"
153+
154+ # Wait a bit more for the pod to be fully operational
155+ echo "Waiting for pod to be fully operational..."
156+ sleep 60
157+
158+ # Create the topic using the correct container and path
159+ echo "Creating topic 'beam'..."
160+ kubectl exec $KAFKA_POD -c broker -- /opt/kafka/bin/kafka-topics.sh --create --topic beam --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 || echo "Topic may already exist"
161+
162+ # Verify topic was created
163+ echo "Verifying topic creation..."
164+ kubectl exec $KAFKA_POD -c broker -- /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181 || echo "Could not list topics"
100165 - name : Prepare test arguments
101166 uses : ./.github/actions/test-arguments-action
102167 with :
@@ -105,8 +170,11 @@ jobs:
105170 argument-file-paths : |
106171 ${{ github.workspace }}/.github/workflows/performance-tests-pipeline-options/xlang_KafkaIO_Python.txt
107172 arguments : |
108- --filename_prefix=gs://temp-storage-for-perf-tests/${{ matrix.job_name }}/${{github.run_id}}/
173+ --test_class=KafkaIOPerfTest
174+ --kafka_topic=beam
109175 --bootstrap_servers=${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_0 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_0 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_1 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_1 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_2 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_2 }}
176+ --read_timeout=3000
177+ --filename_prefix=gs://temp-storage-for-perf-tests/${{ matrix.job_name }}/${{github.run_id}}/
110178 - name : run shadowJar
111179 uses : ./.github/actions/gradle-command-self-hosted-action
112180 with :
@@ -120,4 +188,4 @@ jobs:
120188 -Prunner=DataflowRunner \
121189 -PloadTest.mainClass=apache_beam.io.external.xlang_kafkaio_perf_test \
122190 -PpythonVersion=3.9 \
123- '-PloadTest.args=${{ env.beam_PerfTests_xlang_KafkaIO_Python_test_arguments_1 }}'
191+ '-PloadTest.args=${{ env.beam_PerfTests_xlang_KafkaIO_Python_test_arguments_1 }}'
0 commit comments