Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,17 +1,5 @@
org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
Expand All @@ -21,4 +9,4 @@ org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: on
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.sink.callback.SinkUserCallback;
import org.apache.flink.connector.pulsar.sink.callback.SinkUserCallbackFactory;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittableSerializer;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
Expand Down Expand Up @@ -90,6 +92,7 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
private final TopicRouter<IN> topicRouter;
private final MessageDelayer<IN> messageDelayer;
private final PulsarCrypto pulsarCrypto;
private final SinkUserCallbackFactory<IN> sinkUserCallbackFactory;

PulsarSink(
SinkConfiguration sinkConfiguration,
Expand All @@ -98,7 +101,8 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
TopicRoutingMode topicRoutingMode,
@Nullable TopicRouter<IN> topicRouter,
MessageDelayer<IN> messageDelayer,
PulsarCrypto pulsarCrypto) {
PulsarCrypto pulsarCrypto,
@Nullable SinkUserCallbackFactory<IN> sinkUserCallbackFactory) {
this.sinkConfiguration = checkNotNull(sinkConfiguration);
this.serializationSchema = checkNotNull(serializationSchema);
this.metadataListener = checkNotNull(metadataListener);
Expand All @@ -115,6 +119,7 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta

this.messageDelayer = checkNotNull(messageDelayer);
this.pulsarCrypto = checkNotNull(pulsarCrypto);
this.sinkUserCallbackFactory = sinkUserCallbackFactory;
}

/**
Expand All @@ -131,14 +136,20 @@ public static <IN> PulsarSinkBuilder<IN> builder() {
@Override
public PrecommittingSinkWriter<IN, PulsarCommittable> createWriter(InitContext initContext)
throws PulsarClientException {
SinkUserCallback<IN> userCallback = null;
if (sinkUserCallbackFactory != null) {
userCallback = sinkUserCallbackFactory.create();
}

return new PulsarWriter<>(
sinkConfiguration,
serializationSchema,
metadataListener,
topicRouter,
messageDelayer,
pulsarCrypto,
initContext);
initContext,
userCallback);
}

@Internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.sink.callback.SinkUserCallback;
import org.apache.flink.connector.pulsar.sink.callback.SinkUserCallbackFactory;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
Expand Down Expand Up @@ -112,6 +115,7 @@ public class PulsarSinkBuilder<IN> {
private TopicRouter<IN> topicRouter;
private MessageDelayer<IN> messageDelayer;
private PulsarCrypto pulsarCrypto;
private SinkUserCallbackFactory<IN> userCallbackFactory;

// private builder constructor.
PulsarSinkBuilder() {
Expand Down Expand Up @@ -387,6 +391,19 @@ public PulsarSinkBuilder<IN> setProperties(Properties properties) {
return this;
}

/**
* Set a factory for the {@link SinkUserCallback}. A callback is instantiated in each {@link PulsarWriter}
* and disposed of when the app shuts down.
*
* @param userCallbackFactory the factory.
* @return this PuslarSourceBuilder
*/
public PulsarSinkBuilder<IN> setUserCallbackFactory(
SinkUserCallbackFactory<IN> userCallbackFactory) {
this.userCallbackFactory = userCallbackFactory;
return this;
}

/**
* Build the {@link PulsarSink}.
*
Expand Down Expand Up @@ -483,7 +500,8 @@ public PulsarSink<IN> build() {
topicRoutingMode,
topicRouter,
messageDelayer,
pulsarCrypto);
pulsarCrypto,
userCallbackFactory);
}

// ------------- private helpers --------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.pulsar.sink.callback;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;

import org.apache.pulsar.client.api.MessageId;

/**
* An optional callback interface that users can plug into PulsarSink.
*
* @param <IN> The input type of the sink
*/
@PublicEvolving
public interface SinkUserCallback<IN> extends AutoCloseable {
/**
* This method is called before the message is sent to the topic. The user can modify the
* message. By default, the same message will be returned.
*
* @param element the element received from the previous operator.
* @param message the message wrapper with the element already serialized.
* @param topic the pulsar topic or partition that the message will be routed to.
*/
void beforeSend(IN element, PulsarMessage<?> message, String topic);

/**
* This method is called after producer has tried to write the message to the topic.
*
* @param element the element received from the previous operator.
* @param message the message that was sent to the topic.
* @param messageId the topic MessageId, if the send operation was successful.
*/
void onSendSucceeded(IN element, PulsarMessage<?> message, String topic, MessageId messageId);

/**
* This method is called after producer has tried to write the message to the topic.
*
* @param element the element received from the previous operator.
* @param message the message that was sent to the topic.
* @param topic the topic or partition that the message was sent to.
* @param ex the exception.
*/
void onSendFailed(IN element, PulsarMessage<?> message, String topic, Throwable ex);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.pulsar.sink.callback;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/**
* A serializable factory for SinkUserCallback.
*
* @param <IN> the input type of the Sink
*/
@PublicEvolving
public interface SinkUserCallbackFactory<IN> extends Serializable {
SinkUserCallback<IN> create();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.sink.callback.SinkUserCallback;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
Expand All @@ -43,12 +44,15 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -80,6 +84,8 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
private final MailboxExecutor mailboxExecutor;
private final AtomicLong pendingMessages;

private final SinkUserCallback<IN> userCallback;

/**
* Constructor creating a Pulsar writer.
*
Expand All @@ -91,8 +97,10 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
* @param serializationSchema Transform the incoming records into different message properties.
* @param metadataListener The listener for querying topic metadata.
* @param topicRouter Topic router to choose the topic by incoming records.
* @param messageDelayer Used to delay sending messages downstream in {@link SubscriptionType#Shared} subscription
* @param pulsarCrypto Used for end-to-end encryption.
* @param initContext Context to provide information about the runtime environment.
* @param userCallback The callback to, optionally, trigger when the writer takes action.
*/
public PulsarWriter(
SinkConfiguration sinkConfiguration,
Expand All @@ -101,7 +109,8 @@ public PulsarWriter(
TopicRouter<IN> topicRouter,
MessageDelayer<IN> messageDelayer,
PulsarCrypto pulsarCrypto,
InitContext initContext)
InitContext initContext,
@Nullable SinkUserCallback<IN> userCallback)
throws PulsarClientException {
checkNotNull(sinkConfiguration);
this.serializationSchema = checkNotNull(serializationSchema);
Expand Down Expand Up @@ -131,6 +140,8 @@ public PulsarWriter(
throw new FlinkRuntimeException("Cannot initialize schema.", e);
}

this.userCallback = userCallback;

// Create this producer register after opening serialization schema!
SinkWriterMetricGroup metricGroup = initContext.metricGroup();
this.producerRegister = new ProducerRegister(sinkConfiguration, pulsarCrypto, metricGroup);
Expand All @@ -157,10 +168,17 @@ public void write(IN element, Context context) throws IOException, InterruptedEx
builder.deliverAt(deliverAt);
}

// invoke user callback before send
invokeUserCallbackBeforeSend(element, message, topic);

// Perform message sending.
if (deliveryGuarantee == DeliveryGuarantee.NONE) {
// We would just ignore the sending exception. This may cause data loss.
builder.sendAsync();
CompletableFuture<MessageId> future = builder.sendAsync();
future.whenComplete(
(id, ex) -> {
invokeUserCallbackAfterSend(element, message, topic, id, ex);
});
} else {
// Increase the pending message count.
pendingMessages.incrementAndGet();
Expand All @@ -175,10 +193,44 @@ public void write(IN element, Context context) throws IOException, InterruptedEx
} else {
LOG.debug("Sent message to Pulsar {} with message id {}", topic, id);
}
invokeUserCallbackAfterSend(element, message, topic, id, ex);
});
}
}

private void callSafely(Runnable r) {
try {
r.run();
} catch (Throwable t) {
LOG.warn("Exception from user callback", t);
}
}

private void invokeUserCallbackBeforeSend(IN element, PulsarMessage<?> message, String topic) {
if (userCallback == null) {
return;
}

callSafely(() -> userCallback.beforeSend(element, message, topic));
}

private void invokeUserCallbackAfterSend(
IN element,
PulsarMessage<?> message,
String topic,
MessageId messageId,
Throwable exception) {
if (userCallback == null) {
return;
}

if (exception == null) {
callSafely(() -> userCallback.onSendSucceeded(element, message, topic, messageId));
} else {
callSafely(() -> userCallback.onSendFailed(element, message, topic, exception));
}
}

private void throwSendingException(String topic, Throwable ex) {
throw new FlinkRuntimeException("Failed to send data to Pulsar: " + topic, ex);
}
Expand Down Expand Up @@ -275,6 +327,6 @@ public Collection<PulsarCommittable> prepareCommit() {
@Override
public void close() throws Exception {
// Close all the resources and throw the exception at last.
closeAll(metadataListener, producerRegister);
closeAll(metadataListener, producerRegister, userCallback);
}
}
Loading