diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/pom.xml b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/pom.xml new file mode 100644 index 000000000..c4930daa4 --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/pom.xml @@ -0,0 +1,85 @@ + + + 4.0.0 + + org.apache.dubbo + dubbo-samples-async-result + 1.0-SNAPSHOT + + + dubbo-samples-async-result-consumer + + + 17 + 17 + UTF-8 + + + + + + org.apache.dubbo + dubbo + ${dubbo.version} + provided + + + org.apache.dubbo + dubbo-spring-boot-starter + + + org.apache.dubbo + dubbo-zookeeper-curator5-spring-boot-starter + + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-log4j2 + + + + + org.apache.dubbo + dubbo-samples-async-result-interface + 1.0-SNAPSHOT + + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.jupiter + junit-jupiter + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + + diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/main/java/org/apache/dubbo/ConsumerApplication.java b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/main/java/org/apache/dubbo/ConsumerApplication.java new file mode 100644 index 000000000..0d414346b --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/main/java/org/apache/dubbo/ConsumerApplication.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo; + +import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +@EnableDubbo +public class ConsumerApplication { + public static void main(String[] args) { + SpringApplication.run(ConsumerApplication.class, args); + } +} diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/main/java/org/apache/dubbo/HiConsumer.java b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/main/java/org/apache/dubbo/HiConsumer.java new file mode 100644 index 000000000..d9b954250 --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/main/java/org/apache/dubbo/HiConsumer.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo; + +import org.apache.dubbo.config.annotation.DubboReference; +import org.apache.dubbo.rpc.RpcContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +@Component +public class HiConsumer { + private static final Logger logger = LoggerFactory.getLogger(HiConsumer.class); + + @DubboReference(async = true, check = false) + private HiService hiService; + + + public void asyncCallMethod1() { + + CompletableFuture future = hiService.sayHelloAsync("Alice"); + + future.whenComplete((result, error) -> { + if (error != null) { + logger.debug("failed: {}", error.getMessage()); + } else { + logger.info("result: {}", result); + } + }); + + future.thenApply(result -> "[PROCESSED] " + result.toUpperCase()) + .thenAccept(processed -> logger.info("result: {}", processed)); + + try { + String result = future.get(3000, TimeUnit.MILLISECONDS); + logger.info("sync wait result: {}", result); + } catch (Exception e) { + logger.debug(e.getMessage()); + } + } + + + public void asyncCallMethod2() { + + String syncResult = hiService.sayHello("Bob"); + logger.info("sync return value (usually null, meaningless): {}", syncResult); + + CompletableFuture future = RpcContext.getServiceContext().getCompletableFuture(); + + if (future != null) { + future.whenComplete((result, error) -> { + if (error != null) { + logger.error(error.getMessage()); + } else { + logger.info("async callback result: {}", result); + } + }); + + future.thenApply(result -> "[PROCESSED] " + ((String) result).toUpperCase()) + .thenAccept(processed -> logger.info("result: {}", processed)); + + try { + Object result = future.get(3000, TimeUnit.MILLISECONDS); + logger.info("sync wait result: {}", result); + } catch (Exception e) { + logger.debug(e.getMessage()); + } + } + } + + + public void compareDemo() { + asyncCallMethod1(); + asyncCallMethod2(); + } + +} diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/main/resources/application.yaml b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/main/resources/application.yaml new file mode 100644 index 000000000..3d527caf4 --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/main/resources/application.yaml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dubbo: + application: + name: dubbo-samples-async-result-consumer + protocol: + name: tri + port: -1 + registry: + address: zookeeper://${zookeeper.address:127.0.0.1}:2181 + +logging: + level: + org.apache.dubbo: DEBUG + org.apache.dubbo.rpc.AsyncRpcResult: DEBUG + pattern: + console: "%d{HH:mm:ss.SSS} %highlight{%-5p} [%t] %c{1.}:%L - %m%n" diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/test/java/org/apache/dubbo/async/result/consumer/ConsumerTest.java b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/test/java/org/apache/dubbo/async/result/consumer/ConsumerTest.java new file mode 100644 index 000000000..9a0a69e1f --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-consumer/src/test/java/org/apache/dubbo/async/result/consumer/ConsumerTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.async.result.consumer; + +import org.apache.dubbo.ConsumerApplication; +import org.apache.dubbo.HiConsumer; +import org.apache.dubbo.HiService; +import org.apache.dubbo.config.annotation.DubboReference; +import org.apache.dubbo.rpc.RpcContext; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +@SpringBootTest(classes = ConsumerApplication.class) +public class ConsumerTest { + private static final Logger log = LoggerFactory.getLogger(ConsumerTest.class); + + @Autowired + private HiConsumer consumer; + + @DubboReference(async = true, check = false) + private HiService hiService; + + @DubboReference(check = false) + private HiService hiServiceSync; + + @Test + public void testAsyncCallMethod1WithFilterChain() throws Exception { + CompletableFuture future = hiService.sayHelloAsync("Alice"); + if (future == null) { + throw new RuntimeException("Future should not be null"); + } + String result = future.get(5000, TimeUnit.MILLISECONDS); + + if (result == null) { + throw new RuntimeException("Result should not be null"); + } + + if (!result.contains("[short-circuit]")) { + throw new RuntimeException("Should contain AsyncShortCircuitFilter short-circuit identifier"); + } + + if (!result.contains("sayHelloAsync")) { + throw new RuntimeException("Should contain method name"); + } + + if (result.contains("[decorated]") || result.contains("[attach&decorated]") || result.contains("[Event-driven]")) { + throw new RuntimeException("In short-circuit mode, subsequent Filters should not be executed"); + } + log.info("successfully"); + } + + @Test + public void testAsyncCallMethod2WithAttachment() throws Exception { + + String ignored = hiService.sayHello("Bob"); + log.info("Sync return value: {}", ignored); + + CompletableFuture future = RpcContext.getServiceContext().getCompletableFuture(); + if (future == null) { + throw new RuntimeException("Should be able to get Future from RpcContext"); + } + + Object result = future.get(5000, TimeUnit.MILLISECONDS); + + if (!(result instanceof String)) { + throw new RuntimeException("Result should be String type"); + } + + String resultStr = (String) result; + if (!resultStr.contains("[short-circuit]")) { + throw new RuntimeException("Should contain short-circuit identifier"); + } + + if (!resultStr.contains("sayHello")) { + throw new RuntimeException("Should contain method name"); + } + log.info("successfully"); + } + + @Test + public void testHiConsumerAsyncMethods() throws Exception { + consumer.asyncCallMethod1(); + Thread.sleep(2000); // Wait for async callback to complete + + consumer.asyncCallMethod2(); + Thread.sleep(2000); // Wait for async callback to complete + } + + @Test + public void testSyncCallWithAttachment() throws Exception { + String result = hiServiceSync.sayHello("Charlie"); + log.info(" Sync call result: {}", result); + + if (!result.contains("[short-circuit]")) { + throw new RuntimeException("Should contain short-circuit identifier"); + } + + if (!result.contains("sayHello")) { + throw new RuntimeException("Should contain method name"); + } + + if (result.contains("[decorated]") || result.contains("[attach&decorated]") || result.contains("[Event-driven]")) { + throw new RuntimeException("In short-circuit mode, subsequent Filters should not be executed"); + } + log.info("successfully"); + } + + @Test + public void testCompareDemo() throws Exception { + consumer.compareDemo(); + Thread.sleep(3000); + } + + @Test + public void testFilterChainOrder() throws Exception { + CompletableFuture future = hiService.sayHelloAsync("FilterChainTest"); + String result = future.get(5000, TimeUnit.MILLISECONDS); + + log.info("Complete result: {}", result); + + int shortCircuitIdx = result.indexOf("[short-circuit]"); + int decoratedIdx = result.indexOf("[decorated]"); + int attachDecoratedIdx = result.indexOf("[attach&decorated]"); + int eventDrivenIdx = result.indexOf("[Event-driven]"); + + if (shortCircuitIdx < 0) { + throw new RuntimeException("Should contain short-circuit identifier"); + } + + if (decoratedIdx >= 0 || attachDecoratedIdx >= 0 || eventDrivenIdx >= 0) { + throw new RuntimeException("In short-circuit mode, subsequent Filters should not execute"); + } + log.info("successfully"); + } + + @Test + public void testConcurrentAsyncCalls() throws Exception { + + @SuppressWarnings("unchecked") + CompletableFuture[] futures = new CompletableFuture[5]; + for (int i = 0; i < 5; i++) { + String name = "User" + i; + futures[i] = hiService.sayHelloAsync(name); + } + CompletableFuture allOf = CompletableFuture.allOf(futures); + allOf.get(10000, TimeUnit.MILLISECONDS); + + for (int i = 0; i < 5; i++) { + String result = futures[i].get(); + if (result == null) { + throw new RuntimeException("Result #" + i + " should not be null"); + } + + if (!result.contains("[short-circuit]")) { + throw new RuntimeException("Result #" + i + " should contain short-circuit identifier"); + } + + if (!result.contains("sayHelloAsync")) { + throw new RuntimeException("Result #" + i + " should contain method name"); + } + + log.info("Async call #{} result: {}", i, result); + } + + log.info("successfully"); + } + +} diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-interface/pom.xml b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-interface/pom.xml new file mode 100644 index 000000000..90c3e906b --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-interface/pom.xml @@ -0,0 +1,20 @@ + + + 4.0.0 + + org.apache.dubbo + dubbo-samples-async-result + 1.0-SNAPSHOT + + + dubbo-samples-async-result-interface + + + 17 + 17 + UTF-8 + + + diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-interface/src/main/java/org/apache/dubbo/HiService.java b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-interface/src/main/java/org/apache/dubbo/HiService.java new file mode 100644 index 000000000..80d9c786f --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-interface/src/main/java/org/apache/dubbo/HiService.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo; + +import java.util.concurrent.CompletableFuture; + +public interface HiService { + public String sayHello(String name); + + public CompletableFuture sayHelloAsync(String name); + +} diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/pom.xml b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/pom.xml new file mode 100644 index 000000000..803fdcb6c --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/pom.xml @@ -0,0 +1,90 @@ + + + 4.0.0 + + org.apache.dubbo + dubbo-samples-async-result + 1.0-SNAPSHOT + + + dubbo-samples-async-result-provider + + + 17 + 17 + UTF-8 + + + + + org.apache.dubbo + dubbo + ${dubbo.version} + + + org.apache.dubbo + dubbo-spring-boot-starter + + + org.apache.dubbo + dubbo-zookeeper-curator5-spring-boot-starter + + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-log4j2 + + + + org.apache.dubbo + dubbo-samples-async-result-interface + 1.0-SNAPSHOT + + + + io.dropwizard.metrics + metrics-core + 4.1.12.1 + + + org.xerial.snappy + snappy-java + 1.1.10.5 + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + 17 + 17 + 17 + + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + + diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/EmbeddedZooKeeper.java b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/EmbeddedZooKeeper.java new file mode 100644 index 000000000..c4d5b8cc2 --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/EmbeddedZooKeeper.java @@ -0,0 +1,305 @@ + +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.ServerSocket; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.SmartLifecycle; +import org.springframework.util.ErrorHandler; + +/** + * from: https://github.com/spring-projects/spring-xd/blob/v1.3.1.RELEASE/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/zookeeper/ZooKeeperUtils.java + *

+ * Helper class to start an embedded instance of standalone (non clustered) ZooKeeper. + *

+ * NOTE: at least an external standalone server (if not an ensemble) are recommended, even for + * {@link org.springframework.xd.dirt.server.singlenode.SingleNodeApplication} + */ +public class EmbeddedZooKeeper implements SmartLifecycle { + + private static final Random RANDOM = new Random(); + + /** + * Logger. + */ + private static final Logger logger = LoggerFactory.getLogger(EmbeddedZooKeeper.class); + + /** + * ZooKeeper client port. This will be determined dynamically upon startup. + */ + private final int clientPort; + + /** + * Whether to auto-start. Default is true. + */ + private boolean autoStartup = true; + + /** + * Lifecycle phase. Default is 0. + */ + private int phase = 0; + + /** + * Thread for running the ZooKeeper server. + */ + private volatile Thread zkServerThread; + + /** + * ZooKeeper server. + */ + private volatile ZooKeeperServerMain zkServer; + + /** + * {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. + */ + private ErrorHandler errorHandler; + + private boolean daemon = true; + + /** + * Construct an EmbeddedZooKeeper with a random port. + */ + public EmbeddedZooKeeper() { + clientPort = findRandomPort(30000, 65535); + } + + /** + * Construct an EmbeddedZooKeeper with the provided port. + * + * @param clientPort port for ZooKeeper server to bind to + */ + public EmbeddedZooKeeper(int clientPort, boolean daemon) { + this.clientPort = clientPort; + this.daemon = daemon; + } + + /** + * Returns the port that clients should use to connect to this embedded server. + * + * @return dynamically determined client port + */ + public int getClientPort() { + return this.clientPort; + } + + /** + * Specify whether to start automatically. Default is true. + * + * @param autoStartup whether to start automatically + */ + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isAutoStartup() { + return this.autoStartup; + } + + /** + * Specify the lifecycle phase for the embedded server. + * + * @param phase the lifecycle phase + */ + public void setPhase(int phase) { + this.phase = phase; + } + + /** + * {@inheritDoc} + */ + @Override + public int getPhase() { + return this.phase; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isRunning() { + return (zkServerThread != null); + } + + /** + * Start the ZooKeeper server in a background thread. + *

+ * Register an error handler via {@link #setErrorHandler} in order to handle + * any exceptions thrown during startup or execution. + */ + @Override + public synchronized void start() { + if (zkServerThread == null) { + zkServerThread = new Thread(new ServerRunnable(), "ZooKeeper Server Starter"); + zkServerThread.setDaemon(daemon); + zkServerThread.start(); + } + } + + /** + * Shutdown the ZooKeeper server. + */ + @Override + public synchronized void stop() { + if (zkServerThread != null) { + // The shutdown method is protected...thus this hack to invoke it. + // This will log an exception on shutdown; see + // https://issues.apache.org/jira/browse/ZOOKEEPER-1873 for details. + try { + Method shutdown = ZooKeeperServerMain.class.getDeclaredMethod("shutdown"); + shutdown.setAccessible(true); + shutdown.invoke(zkServer); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // It is expected that the thread will exit after + // the server is shutdown; this will block until + // the shutdown is complete. + try { + zkServerThread.join(5000); + zkServerThread = null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Interrupted while waiting for embedded ZooKeeper to exit"); + // abandoning zk thread + zkServerThread = null; + } + } + } + + /** + * Stop the server if running and invoke the callback when complete. + */ + @Override + public void stop(Runnable callback) { + stop(); + callback.run(); + } + + /** + * Provide an {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. If none + * is provided, only error-level logging will occur. + * + * @param errorHandler the {@link ErrorHandler} to be invoked + */ + public void setErrorHandler(ErrorHandler errorHandler) { + this.errorHandler = errorHandler; + } + + /** + * Runnable implementation that starts the ZooKeeper server. + */ + private class ServerRunnable implements Runnable { + + @Override + public void run() { + try { + Properties properties = new Properties(); + File file = new File(System.getProperty("java.io.tmpdir") + + File.separator + UUID.randomUUID()); + file.deleteOnExit(); + properties.setProperty("dataDir", file.getAbsolutePath()); + properties.setProperty("clientPort", String.valueOf(clientPort)); + + QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig(); + quorumPeerConfig.parseProperties(properties); + + zkServer = new ZooKeeperServerMain(); + ServerConfig configuration = new ServerConfig(); + configuration.readFrom(quorumPeerConfig); + + System.setProperty("zookeeper.admin.enableServer", "false"); + + zkServer.runFromConfig(configuration); + } catch (Exception e) { + if (errorHandler != null) { + errorHandler.handleError(e); + } else { + logger.error("Exception running embedded ZooKeeper", e); + } + } + } + } + + /** + * Workaround for SocketUtils.findRandomPort() deprecation. + * + * @param min min port + * @param max max port + * @return a random generated available port + */ + private static int findRandomPort(int min, int max) { + if (min < 1024) { + throw new IllegalArgumentException("Max port shouldn't be less than 1024."); + } + + if (max > 65535) { + throw new IllegalArgumentException("Max port shouldn't be greater than 65535."); + } + + if (min > max) { + throw new IllegalArgumentException("Min port shouldn't be greater than max port."); + } + + int port = 0; + int counter = 0; + + // Workaround for legacy JDK doesn't support Random.nextInt(min, max). + List randomInts = RANDOM.ints(min, max + 1) + .limit(max - min) + .mapToObj(Integer::valueOf) + .collect(Collectors.toList()); + + do { + if (counter > max - min) { + throw new IllegalStateException("Unable to find a port between " + min + "-" + max); + } + + port = randomInts.get(counter); + counter++; + } while (isPortInUse(port)); + + return port; + } + + private static boolean isPortInUse(int port) { + try (ServerSocket ignored = new ServerSocket(port)) { + return false; + } catch (IOException e) { + // continue + } + return true; + } +} diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/ProviderApplication.java b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/ProviderApplication.java new file mode 100644 index 000000000..ad4376c0a --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/ProviderApplication.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo; + +import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +@EnableDubbo +public class ProviderApplication { + public static void main(String[] args) { + // new EmbeddedZooKeeper(2181, false).start(); + SpringApplication.run(ProviderApplication.class,args); + } +} diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/AsyncShortCircuitFilter.java b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/AsyncShortCircuitFilter.java new file mode 100644 index 000000000..1e6f93104 --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/AsyncShortCircuitFilter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.filter; + +import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.rpc.AppResponse; +import org.apache.dubbo.rpc.AsyncRpcResult; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +@Activate(group = {CommonConstants.PROVIDER}, order = -2000) +public class AsyncShortCircuitFilter implements Filter { + private static final Logger logger = LoggerFactory.getLogger(AsyncShortCircuitFilter.class); + private final Executor executor = Executors.newCachedThreadPool(); + + @Override + public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { + + String method = invocation.getMethodName(); + // if (someCondition) { + // return invoker.invoke(invocation); // Pass through when condition not met + // } + + logger.debug(" method={}, not calling actual service", method); + + CompletableFuture future = new CompletableFuture<>(); + + executor.execute(() -> { + try { + String payload = "[short-circuit] async result for " + method; + AppResponse resp = new AppResponse(invocation); + resp.setValue(payload); + future.complete(resp); + logger.debug(payload); + } catch (Throwable t) { + AppResponse resp = new AppResponse(invocation); + resp.setException(t); + future.complete(resp); + logger.debug(t.getMessage()); + } + }); + + return new AsyncRpcResult(future, invocation); + } +} diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/AttachAndDecorateFilter.java b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/AttachAndDecorateFilter.java new file mode 100644 index 000000000..f3518ee5a --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/AttachAndDecorateFilter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.filter; + +import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.rpc.AppResponse; +import org.apache.dubbo.rpc.AsyncRpcResult; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +@Activate(group = {CommonConstants.PROVIDER}, order = -900) +public class AttachAndDecorateFilter implements Filter { + private static final Logger logger = LoggerFactory.getLogger(AttachAndDecorateFilter.class); + + @Override + public Result invoke(Invoker invoker, Invocation inv) throws RpcException { + Result r = invoker.invoke(inv); + + if (r instanceof AsyncRpcResult) { + AsyncRpcResult ar = (AsyncRpcResult) r; + + CompletableFuture decorated = ar.getResponseFuture().thenApply(app -> { + + String traceId = UUID.randomUUID().toString().substring(0, 8); + app.setAttachment("trace-id", traceId); + app.setAttachment("filter", "AttachAndDecorateFilter"); + + if (app.hasException()) { + return app; + } + + if (app.getValue() instanceof String s) { + app.setValue("[attach&decorated] " + s); + } + + return app; + }); + + return new AsyncRpcResult(decorated, inv); + } else { + + AppResponse app = new AppResponse(inv); + + String traceId = UUID.randomUUID().toString().substring(0, 8); + app.setAttachment("trace-id", traceId); + app.setAttachment("filter", "AttachAndDecorateFilter"); + + Object originalValue = r.getValue(); + if (originalValue instanceof String) { + app.setValue("[attach&decorated] " + originalValue); + } else { + app.setValue(originalValue); + } + return AsyncRpcResult.newDefaultAsyncResult(app, inv); + } + } +} diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/DecorateResultFilter.java b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/DecorateResultFilter.java new file mode 100644 index 000000000..e77539e90 --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/DecorateResultFilter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.filter; + +import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.rpc.AppResponse; +import org.apache.dubbo.rpc.AsyncRpcResult; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; + +import java.util.concurrent.CompletableFuture; + +@Activate(group = {CommonConstants.PROVIDER}, order = -1000) +public class DecorateResultFilter implements Filter { + + @Override + public AsyncRpcResult invoke(Invoker invoker, Invocation invocation) throws RpcException { + + Result result = invoker.invoke(invocation); + + if (result instanceof AsyncRpcResult) { + AsyncRpcResult ar = (AsyncRpcResult) result; + CompletableFuture f = ar.getResponseFuture(); + + CompletableFuture decorated = f.thenApply(app -> { + if (!app.hasException()) { + Object v = app.getValue(); + AppResponse newResp = new AppResponse(invocation); + newResp.setValue("[decorated] " + v); + return newResp; + } + return app; + }); + + return new AsyncRpcResult(decorated, invocation); + } else { + String decorated = "[decorated] " + result.getValue(); + return AsyncRpcResult.newDefaultAsyncResult(decorated, invocation); + } + } +} diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/EventDrivenFilter.java b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/EventDrivenFilter.java new file mode 100644 index 000000000..ede089b84 --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/filter/EventDrivenFilter.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.filter; + +import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.rpc.AppResponse; +import org.apache.dubbo.rpc.AsyncRpcResult; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Activate(group = {CommonConstants.PROVIDER}, order = -800) +public class EventDrivenFilter implements Filter { + private static final Logger logger = LoggerFactory.getLogger(EventDrivenFilter.class); + + private final MockAsyncClient client = new MockAsyncClient(); + + @Override + public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { + String methodName = invocation.getMethodName(); + + /* ========== Pass-through mode (uncomment if needed) ========== + // Directly call subsequent services, no event-driven processing + logger.debug("EventDrivenFilter: Pass-through mode, method={}", methodName); + return invoker.invoke(invocation); + ========== Pass-through mode end ========== */ + + // ========== Default: Event-driven mode ========== + // Can selectively apply event-driven mode to certain methods + // if (!"sayHelloAsync".equals(methodName)) { + // return invoker.invoke(invocation); // Pass through + // } + + logger.debug("EventDrivenFilter: Event-driven mode, intercepting method {}, delegating to external async client", methodName); + + CompletableFuture future = new CompletableFuture<>(); + AsyncRpcResult ar = new AsyncRpcResult(future, invocation); + + client.send(invocation, (value, err) -> { + AppResponse resp = new AppResponse(invocation); + if (err != null) { + resp.setException(err); + } else { + resp.setValue(value); + } + future.complete(resp); + }); + + ar.whenCompleteWithContext((r, t) -> { + if (t != null) { + logger.error("exceptionally, method={}", methodName, t); + } else { + logger.debug("normally, method={}", methodName); + } + }); + + return ar; + } + + private static class MockAsyncClient { + private static final Logger logger = LoggerFactory.getLogger(MockAsyncClient.class); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); + + public void send(Invocation invocation, Callback callback) { + String methodName = invocation.getMethodName(); + Object[] args = invocation.getArguments(); + + logger.debug("method={}", methodName); + + scheduler.schedule(() -> { + try { + String result = String.format( + "[Event-driven] External service processing result: method=%s, args=%s", + methodName, + args.length > 0 ? args[0] : "none" + ); + callback.onComplete(result, null); + } catch (Exception e) { + callback.onComplete(null, e); + } + }, 500, TimeUnit.MILLISECONDS); + } + } + + @FunctionalInterface + interface Callback { + void onComplete(Object value, Throwable error); + } +} diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/impl/HiServiceImpl.java b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/impl/HiServiceImpl.java new file mode 100644 index 000000000..2f187a7a3 --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/java/org/apache/dubbo/impl/HiServiceImpl.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.impl; + +import org.apache.dubbo.HiService; +import org.apache.dubbo.config.annotation.DubboService; + +import java.util.concurrent.CompletableFuture; + +@DubboService +public class HiServiceImpl implements HiService { + @Override + public String sayHello(String name) { + return "Hello " + name; + } + + @Override + public CompletableFuture sayHelloAsync(String name){ + return CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "async result: " + name; + }); + } +} diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter new file mode 100644 index 000000000..90f796554 --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter @@ -0,0 +1,4 @@ +AsyncShortCircuit=org.apache.dubbo.filter.AsyncShortCircuitFilter +decorateResult=org.apache.dubbo.filter.DecorateResultFilter +attachAndDecorate=org.apache.dubbo.filter.AttachAndDecorateFilter +eventDriven=org.apache.dubbo.filter.EventDrivenFilter diff --git a/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/resources/application.yaml b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/resources/application.yaml new file mode 100644 index 000000000..63d0a62ed --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/dubbo-samples-async-result-provider/src/main/resources/application.yaml @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dubbo: + application: + name: dubbo-samples-async-result-provider + protocol: + name: tri + port: -1 + registry: + address: zookeeper://${zookeeper.address:127.0.0.1}:2181 + scan: + base-packages: org.apache.dubbo + +logging: + level: + org.apache.dubbo: DEBUG + org.apache.dubbo.rpc.AsyncRpcResult: DEBUG + pattern: + console: "%d{HH:mm:ss.SSS} %highlight{%-5p} [%t] %c{1.}:%L - %m%n" diff --git a/2-advanced/dubbo-samples-async-result/pom.xml b/2-advanced/dubbo-samples-async-result/pom.xml new file mode 100644 index 000000000..374e89323 --- /dev/null +++ b/2-advanced/dubbo-samples-async-result/pom.xml @@ -0,0 +1,96 @@ + + + + org.apache + apache + 23 + + + pom + 4.0.0 + + org.apache.dubbo + dubbo-samples-async-result + 1.0-SNAPSHOT + + + dubbo-samples-async-result-consumer + dubbo-samples-async-result-provider + dubbo-samples-async-result-interface + + + + 17 + 17 + UTF-8 + 3.3.0 + 2.7.18 + 5.9.3 + 1.7.36 + + + + + + + org.apache.dubbo + dubbo-spring-boot-starter + ${dubbo.version} + provided + + + org.apache.dubbo + dubbo + ${dubbo.version} + compile + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + org.apache.dubbo + dubbo-zookeeper-curator5-spring-boot-starter + ${dubbo.version} + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + org.junit.jupiter + junit-jupiter + ${junit.version} + test + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + 17 + 17 + + + + + +