Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,24 @@ public class SinkConnectorConfig {
// timeunit: ms, default 5000ms
private int idleTimeout = 5000;

// maximum number of HTTP/1 connections a client will pool, default 5
private int maxConnectionPoolSize = 5;
// maximum number of HTTP/1 connections a client will pool, default 50
private int maxConnectionPoolSize = 50;

// retry config
private HttpRetryConfig retryConfig = new HttpRetryConfig();

// webhook config
private HttpWebhookConfig webhookConfig = new HttpWebhookConfig();

private String deliveryStrategy = "ROUND_ROBIN";

private boolean skipDeliverException = false;

// managed pipelining param, default true
private boolean isParallelized = true;

private int parallelism = 2;


/**
* Fill default values if absent (When there are multiple default values for a field)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@ public class SourceConnectorConfig {
*/
private int maxFormAttributeSize = 1024 * 1024;

// protocol, default Common
// max size of the queue, default 1000
private int maxStorageSize = 1000;

// batch size, default 10
private int batchSize = 10;

// protocol, default CloudEvent
private String protocol = "Common";

// extra config, e.g. GitHub secret
private Map<String, String> extraConfig = new HashMap<>();

// data consistency enabled, default true
private boolean dataConsistencyEnabled = false;
private boolean dataConsistencyEnabled = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public synchronized List<E> fetchRange(int start, int end, boolean removed) {
count++;
}
return items;

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.connector.http.sink;

import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.http.HttpSinkConfig;
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
Expand All @@ -32,6 +33,10 @@

import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.Getter;
import lombok.SneakyThrows;
Expand All @@ -45,6 +50,12 @@ public class HttpSinkConnector implements Sink, ConnectorCreateService<Sink> {
@Getter
private HttpSinkHandler sinkHandler;

private ThreadPoolExecutor executor;

private final LinkedBlockingQueue<ConnectRecord> queue = new LinkedBlockingQueue<>(10000);

private final AtomicBoolean isStart = new AtomicBoolean(true);

@Override
public Class<? extends Config> configClass() {
return HttpSinkConfig.class;
Expand Down Expand Up @@ -90,11 +101,30 @@ private void doInit() {
} else {
throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
}
boolean isParallelized = this.httpSinkConfig.connectorConfig.isParallelized();
int parallelism = isParallelized ? this.httpSinkConfig.connectorConfig.getParallelism() : 1;
executor = new ThreadPoolExecutor(parallelism, parallelism, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new EventMeshThreadFactory("http-sink-handler"));
}

@Override
public void start() throws Exception {
this.sinkHandler.start();
for (int i = 0; i < this.httpSinkConfig.connectorConfig.getParallelism(); i++) {
executor.execute(() -> {
while (isStart.get()) {
ConnectRecord connectRecord = null;
try {
connectRecord = queue.poll(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (connectRecord != null) {
sinkHandler.handle(connectRecord);
}
}
});
}
}

@Override
Expand All @@ -114,7 +144,18 @@ public void onException(ConnectRecord record) {

@Override
public void stop() throws Exception {
isStart.set(false);
while (!queue.isEmpty()) {
ConnectRecord record = queue.poll();
this.sinkHandler.handle(record);
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
this.sinkHandler.stop();
log.info("All tasks completed, start shut down http sink connector");
}

@Override
Expand All @@ -125,8 +166,7 @@ public void put(List<ConnectRecord> sinkRecords) {
log.warn("ConnectRecord data is null, ignore.");
continue;
}
// Handle the ConnectRecord
this.sinkHandler.handle(sinkRecord);
queue.put(sinkRecord);
} catch (Exception e) {
log.error("Failed to sink message via HTTP. ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public class HttpExportMetadata implements Serializable {

private LocalDateTime receivedTime;

private String httpRecordId;

private String recordId;

private String retriedBy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,33 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import lombok.Getter;

/**
* AbstractHttpSinkHandler is an abstract class that provides a base implementation for HttpSinkHandler.
*/
public abstract class AbstractHttpSinkHandler implements HttpSinkHandler {

@Getter
private final SinkConnectorConfig sinkConnectorConfig;

@Getter
private final List<URI> urls;

private final HttpDeliveryStrategy deliveryStrategy;

private int roundRobinIndex = 0;

protected AbstractHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
this.sinkConnectorConfig = sinkConnectorConfig;
this.deliveryStrategy = HttpDeliveryStrategy.valueOf(sinkConnectorConfig.getDeliveryStrategy());
// Initialize URLs
String[] urlStrings = sinkConnectorConfig.getUrls();
this.urls = Arrays.stream(urlStrings)
.map(URI::create)
.collect(Collectors.toList());
}

public SinkConnectorConfig getSinkConnectorConfig() {
return sinkConnectorConfig;
}

public List<URI> getUrls() {
return urls;
}

/**
* Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
*
Expand All @@ -65,23 +66,38 @@ public List<URI> getUrls() {
public void handle(ConnectRecord record) {
// build attributes
Map<String, Object> attributes = new ConcurrentHashMap<>();
attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(urls.size()));

// send the record to all URLs
for (URI url : urls) {
// convert ConnectRecord to HttpConnectRecord
String type = String.format("%s.%s.%s",
this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);

// add AttemptEvent to the attributes
HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent);

// deliver the record
deliver(url, httpConnectRecord, attributes, record);

switch (deliveryStrategy) {
case ROUND_ROBIN:
attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(1));
URI url = urls.get(roundRobinIndex);
roundRobinIndex = (roundRobinIndex + 1) % urls.size();
sendRecordToUrl(record, attributes, url);
break;
case BROADCAST:
for (URI broadcastUrl : urls) {
attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(urls.size()));
sendRecordToUrl(record, attributes, broadcastUrl);
}
break;
default:
throw new IllegalArgumentException("Unknown delivery strategy: " + deliveryStrategy);
}
}

private void sendRecordToUrl(ConnectRecord record, Map<String, Object> attributes, URI url) {
// convert ConnectRecord to HttpConnectRecord
String type = String.format("%s.%s.%s",
this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);

// add AttemptEvent to the attributes
HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent);

// deliver the record
deliver(url, httpConnectRecord, attributes, record);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.sink.handler;

public enum HttpDeliveryStrategy {
ROUND_ROBIN,
BROADCAST
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ private void doInitWebClient() {
.setIdleTimeout(sinkConnectorConfig.getIdleTimeout())
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
.setConnectTimeout(sinkConnectorConfig.getConnectionTimeout())
.setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize());
.setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize())
.setPipelining(sinkConnectorConfig.isParallelized());
this.webClient = WebClient.create(vertx, options);
}

Expand All @@ -108,7 +109,7 @@ private void doInitWebClient() {
*/
@Override
public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes,
ConnectRecord connectRecord) {
ConnectRecord connectRecord) {
// create headers
Map<String, Object> extensionMap = new HashMap<>();
Set<String> extensionKeySet = httpConnectRecord.getExtensions().keySet();
Expand Down Expand Up @@ -203,6 +204,9 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<S
// failure
record.getCallback().onException(buildSendExceptionContext(record, lastFailedEvent.getLastException()));
}
} else {
log.warn("still have requests to process, size {}|attempt num {}",
multiHttpRequestContext.getRemainingRequests(), attemptEvent.getAttempts());
}
}

Expand Down
Loading
Loading