A high-performance Apache Flink sink connector for publishing streaming data to Google Cloud Pub/Sub. This connector implements the Flink Sink V2 API and provides robust, scalable integration with GCP Pub/Sub topics.
Note: This connector is published under the `com.geotab.dna` group ID to avoid a namespace collision with the official Apache Flink connector at `org.apache.flink:flink-connector-gcp-pubsub`. This is a Geotab-maintained implementation with additional features and optimizations.
- Flink Sink V2 API: Built on Apache Flink's modern Sink API (v2) for improved performance and reliability
- Backpressure Management: Configurable in-flight request limits to control throughput and memory usage
- Batch Publishing: Support for batching messages by size, count, and delay thresholds to optimize performance
- Error Handling: Configurable error handling with retry logic and fatal exception classification
- Metrics Integration: Built-in Flink metrics for monitoring bytes out, records out, and error counts
- Compression Support: Optional message compression to reduce network bandwidth
- Custom Serialization: Flexible serialization schema support for any data type
- GCP Authentication: Full support for GCP credentials and authentication mechanisms
- Apache Flink 1.19.1 or later
- Java 11 or later
- Google Cloud Pub/Sub API access
- GCP service account credentials
```bash
# Clone the repository
git clone https://github.com/Geotab/flink-connector-gcp-pubsub.git
cd flink-connector-gcp-pubsub

# Build with Gradle
./gradlew build

# Install to the local Maven repository
./gradlew publishToMavenLocal
```

Add the following dependency to your `pom.xml`:

```xml
<dependency>
    <groupId>com.geotab.dna</groupId>
    <artifactId>flink-connector-gcp-pubsub</artifactId>
    <version>1.0.0</version>
</dependency>
```

Add the following to your `build.gradle`:
```groovy
dependencies {
    implementation 'com.geotab.dna:flink-connector-gcp-pubsub:1.0.0'
}
```

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.gcp.pubsub.sink.PubSubSinkV2;
import org.apache.flink.connector.gcp.pubsub.sink.PublisherConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import java.time.Duration;

// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create your data stream
DataStream<String> stream = env.fromElements("message1", "message2", "message3");

// Configure GCP credentials
Credentials credentials = ServiceAccountCredentials.fromStream(
        new FileInputStream("/path/to/service-account-key.json"));

// Build the publisher configuration
PublisherConfig publisherConfig = PublisherConfig.builder()
        .credentials(credentials)
        .batchRequestByteThreshold(1_000_000L)        // 1 MB
        .batchElementCountThreshold(100L)             // 100 messages
        .batchDelayThreshold(Duration.ofMillis(10))   // 10 ms
        .enableCompression(true)
        .build();

// Create the Pub/Sub sink
PubSubSinkV2<String> pubSubSink = PubSubSinkV2.<String>builder()
        .projectId("your-gcp-project-id")
        .topicId("your-topic-id")
        .serializationSchema(new SimpleStringSchema())
        .publisherConfig(publisherConfig)
        .maxInFlightRequests(1000)
        .failOnError(false)
        .build();

// Add the sink to the stream
stream.sinkTo(pubSubSink);

// Execute the Flink job
env.execute("Flink GCP Pub/Sub Sink Example");
```

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomSerializer<T> implements SerializationSchema<T> {

    // ObjectMapper is not Serializable, so it is created per task instance
    // in open() rather than shipped with the serialized schema.
    private transient ObjectMapper mapper;

    @Override
    public void open(InitializationContext context) {
        mapper = new ObjectMapper();
    }

    @Override
    public byte[] serialize(T element) {
        try {
            return mapper.writeValueAsBytes(element);
        } catch (Exception e) {
            throw new RuntimeException("Serialization failed", e);
        }
    }
}

// Use the custom serializer
PubSubSinkV2<MyDataClass> sink = PubSubSinkV2.<MyDataClass>builder()
        .serializationSchema(new CustomSerializer<>())
        // ... other configurations
        .build();
```

```java
import com.google.api.gax.retrying.RetrySettings;
import org.threeten.bp.Duration;

RetrySettings retrySettings = RetrySettings.newBuilder()
        .setInitialRetryDelay(Duration.ofMillis(100))
        .setRetryDelayMultiplier(2.0)
        .setMaxRetryDelay(Duration.ofSeconds(60))
        .setMaxAttempts(5)
        .build();

PublisherConfig config = PublisherConfig.builder()
        .credentials(credentials)
        .retrySettings(retrySettings)
        .build();
```

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `projectId` | String | Yes | - | GCP project ID containing the Pub/Sub topic |
| `topicId` | String | Yes | - | Pub/Sub topic ID to publish messages to |
| `serializationSchema` | SerializationSchema | Yes | - | Schema to serialize elements into byte arrays |
| `publisherConfig` | PublisherConfig | Yes | - | Publisher configuration (credentials, batching, etc.) |
| `maxInFlightRequests` | int | Yes | - | Maximum number of concurrent in-flight requests (must be > 0) |
| `failOnError` | boolean | Yes | - | If `true`, fail immediately on errors; if `false`, retry non-fatal errors |
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `credentials` | Credentials | Yes | - | GCP authentication credentials |
| `batchRequestByteThreshold` | Long | No | 1000 bytes | Maximum batch size in bytes before publishing |
| `batchElementCountThreshold` | Long | No | 100 | Maximum number of messages in a batch before publishing |
| `batchDelayThreshold` | Duration | No | 10 ms | Maximum delay before publishing a batch |
| `enableCompression` | Boolean | No | false | Enable message compression |
| `retrySettings` | RetrySettings | No | null | Custom retry configuration for failed publish attempts |
The connector implements backpressure through the `maxInFlightRequests` parameter. When the number of pending publish operations reaches this limit, the writer blocks until some requests complete. This prevents overwhelming the Pub/Sub service and controls memory usage.
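The blocking behavior can be modeled with a counting semaphore: a permit is taken before each publish and released by the completion callback. The following is a simplified, self-contained sketch of the pattern; the class and method names are illustrative, not the connector's actual internals.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of maxInFlightRequests-style backpressure:
// acquire a permit before publishing, release it when the publish completes.
class InFlightLimiter {
    private final Semaphore permits;
    private final AtomicInteger completed = new AtomicInteger();

    InFlightLimiter(int maxInFlightRequests) {
        this.permits = new Semaphore(maxInFlightRequests);
    }

    // Blocks when maxInFlightRequests publishes are already pending.
    CompletableFuture<Void> publish(Runnable sendOnce) {
        permits.acquireUninterruptibly();
        return CompletableFuture.runAsync(sendOnce)
                .whenComplete((ok, err) -> {
                    permits.release();
                    completed.incrementAndGet();
                });
    }

    int completedCount() {
        return completed.get();
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

With this shape, a burst of publishes simply parks the caller thread until permits free up, which is how the limit also bounds the memory held by pending requests.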
```java
// Configure for high throughput
.maxInFlightRequests(5000)

// Configure for a lower memory footprint
.maxInFlightRequests(100)
```

The connector classifies exceptions into two categories:
- Fatal Exceptions: Unrecoverable errors that fail the job immediately
  - Topic not found (`NOT_FOUND` status)
  - Project not found (`NOT_FOUND: Requested project not found`)
- Non-Fatal Exceptions: Transient errors that can be retried
  - Network timeouts
  - Temporary service unavailability
  - Rate limiting
Configure error behavior with the `failOnError` parameter:

- `failOnError = true`: Fail the job on any error (recommended for critical pipelines)
- `failOnError = false`: Retry non-fatal errors indefinitely (may cause message duplication)
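The fatal-versus-retryable split can be expressed as a predicate over the exception chain. This is only an illustrative sketch of the idea (the connector's real logic lives in `PubSubExceptionClassifiers`; the class below is hypothetical and matches on message text rather than gRPC status codes):

```java
import java.util.function.Predicate;

// Illustrative fatal-vs-retryable classification, mirroring the categories above.
class ExceptionClassifier {
    // NOT_FOUND-style errors (missing topic or project) are unrecoverable.
    static final Predicate<Throwable> IS_FATAL = t -> {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            String msg = cause.getMessage();
            if (msg != null && msg.contains("NOT_FOUND")) {
                return true;
            }
        }
        return false;
    };

    // Timeouts, unavailability, and rate limiting are transient and retryable.
    static boolean isRetryable(Throwable t) {
        return !IS_FATAL.test(t);
    }
}
```

Walking the cause chain matters because the Pub/Sub client typically surfaces the status error wrapped inside an execution or API exception.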
The connector exposes the following Flink metrics:
- `numBytesOut`: Total bytes successfully published to Pub/Sub
- `numRecordsOut`: Total number of records successfully published
- `numRecordsOutErrors`: Total number of records that failed to publish
Access metrics through Flink's metric system or monitoring dashboards.
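Conceptually, the three counters advance as publishes succeed or fail. A minimal model of that bookkeeping (the class and callback names are illustrative; in the real connector these are Flink metric counters registered on the writer's metric group):

```java
import java.util.concurrent.atomic.LongAdder;

// Illustrative bookkeeping for the three sink metrics listed above.
class SinkMetricsModel {
    final LongAdder numBytesOut = new LongAdder();
    final LongAdder numRecordsOut = new LongAdder();
    final LongAdder numRecordsOutErrors = new LongAdder();

    // Invoked when a serialized message is acknowledged by Pub/Sub.
    void onPublishSuccess(int serializedSize) {
        numBytesOut.add(serializedSize);
        numRecordsOut.increment();
    }

    // Invoked when a publish attempt ultimately fails.
    void onPublishFailure() {
        numRecordsOutErrors.increment();
    }
}
```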
The connector consists of the following key components:
- `PubSubSinkV2`: Main sink implementation following Flink's Sink V2 API
- `PubSubWriter`: Writer implementation handling message publishing and backpressure
- `PubSubWriterClient`: Wrapper around GCP's Publisher client
- `PublisherConfig`: Configuration object for Pub/Sub publisher settings
- `PubSubExceptionClassifiers`: Exception classification for error handling
- Choose appropriate batch settings: Balance latency vs. throughput by tuning `batchDelayThreshold`, `batchElementCountThreshold`, and `batchRequestByteThreshold`
- Monitor in-flight requests: Set `maxInFlightRequests` based on your memory constraints and desired throughput
- Use compression for large messages: Enable `enableCompression` when publishing large payloads to reduce network costs
- Implement proper serialization: Ensure your serialization schema is efficient and handles null values appropriately
- Handle credentials securely: Use GCP service accounts and avoid hardcoding credentials
- Enable checkpointing: Configure Flink checkpointing to ensure exactly-once or at-least-once delivery semantics
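For the checkpointing practice, delivery semantics are configured on the execution environment rather than on the sink itself. A minimal sketch (the 60-second interval and at-least-once mode are example values, not connector requirements):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 60 s with at-least-once semantics (example values).
env.enableCheckpointing(60_000, CheckpointingMode.AT_LEAST_ONCE);
```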
- Permission denied: Ensure your service account has the `pubsub.publisher` role on the target topic.
- Topic not found: Verify the `projectId` and `topicId` are correct and the topic exists in GCP.
- Low throughput: Increase `maxInFlightRequests` or tune batching parameters to improve throughput.
- High memory usage: Reduce `maxInFlightRequests` or enable compression to lower memory usage.
Geotabbers: Che-Wei Chou / Mark Ma