CompletableFuture

經資深工程師Justin友人,提醒我Java8還有這東西可以搞一搞,就來試看看發現驚為天人,真是厲害的東西。

CompletableFuture 是 Java 8 中引入的一個非常有用的工具,用於處理非同步任務和並行計算。它提供了一個功能強大的 API,使得處理非同步操作變得簡單且具有可讀性。以下是一些解析和衍生用法:

  1. 非同步操作CompletableFuture 主要用於處理非同步操作。您可以使用 supplyAsyncrunAsync 等方法來啟動非同步操作,並使用 thenApplythenAcceptthenCombine 等方法來處理操作的結果。這使得您可以在不阻塞主線程的情況下執行耗時的操作。

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);
    future.thenApply(result -> result * 2)
          .thenAccept(finalResult -> System.out.println("Final result: " + finalResult));
  2. 組合操作CompletableFuture 允許您組合多個非同步操作,以便在某些操作完成時執行其他操作。這通常使用 thenComposethenCombinethenComposeAsync 等方法實現。

    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
    
    CompletableFuture<Integer> combined = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
  3. 異常處理:您可以使用 exceptionallyhandle 等方法來處理異常情況。這使得您能夠優雅地處理錯誤,而不必使用傳統的 try-catch 塊。

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        // 可能引發異常的代碼
    });
    
    future.exceptionally(ex -> {
        System.out.println("An exception occurred: " + ex);
        return 0; // 錯誤時返回默認值
    });
  4. 等待多個 CompletableFuture 完成:如果您需要等待多個 CompletableFuture 完成,可以使用 CompletableFuture.allOfCompletableFuture.anyOf

    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
    
    CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);
    allOfFuture.join(); // 等待所有 CompletableFuture 完成
  5. 串行化和並行化CompletableFuture 允許您以串行或並行的方式執行操作,具體取決於您的需求。您可以使用 thenApply 等方法來串行執行操作,或使用 thenApplyAsync 等方法以並行方式執行操作。

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);
    future.thenApplyAsync(result -> result * 2); // 以並行方式執行操作
  6. 等待和獲取結果:最後,要獲取 CompletableFuture 的結果,可以使用 joinget 方法。請注意,join 是一個無異常版本,如果操作引發異常,它會將其包裝為 CompletionException

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);
    int result = future.join(); // 等待並獲取結果

除了 CompletableFuture,還有其他類似的工具,如 Guava 的 ListenableFuture 和 RxJava 的 Observable,它們提供了不同風格和功能的非同步處理。選擇使用哪個取決於您的項目需求和代碼風格。

還有參考到網路上的資料進行練習

參考網址:https://mahmoudanouti.wordpress.com/2018/01/26/20-examples-of-using-javas-completablefuture/

import com.caster.test.entity.BookDTO;
import com.github.javafaker.Book;
import com.github.javafaker.Faker;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static org.junit.jupiter.api.Assertions.*;


/**
 * @author caster.hsu
 * @Since 2023/9/13
 * example web : https://mahmoudanouti.wordpress.com/2018/01/26/20-examples-of-using-javas-completablefuture/
 */
@Slf4j
public class CompletableFutureServiceTests {
    private String nowTime() {
        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"));
    }

    private void randomSleep() {
        try {
            TimeUnit.SECONDS.sleep(RandomUtils.nextInt(2, 5));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }


    public void case_1() {
        log.debug("Thread name:{}", Thread.currentThread().getName());
        CompletableFuture.runAsync(() -> log.debug("case_1 > runAsync", Thread.currentThread().getName(), nowTime()));
    }

    public void case_2() {
        log.debug("Thread name:{}", Thread.currentThread().getName());
        ExecutorService fixOne = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture.runAsync(() -> log.debug("case_2 > runAsync"), fixOne);
        } finally {
            fixOne.shutdown();
        }
    }

    public void case_3() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future_str = CompletableFuture.completedFuture("Message")
//                .thenApply(s -> { // 同步 所以都會看到是 Main thread
                .thenApplyAsync(s -> { // 非同步 thread 會看到是 ForkJoinPool.commonPool-worker-XX
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    log.debug("> thenApplyAsync ");
                    return s.toUpperCase();
                });
        log.debug("Main thread is coming....");
        log.debug("futureReturn:{}", future_str.get());
    }

    public void case_4() throws ExecutionException, InterruptedException {
        ExecutorService firstPool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("First_Pool-%d").get());
        ExecutorService secondPool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Second_Pool-%d").get());
        try {
            CompletableFuture<String> future_str = CompletableFuture.supplyAsync(() -> {
                        log.debug("First block.");
                        return "Message";
                    }, secondPool)
                    .thenApply(s -> {
                        log.debug("Change word. -> 'Disconnected'");
                        return "Disconnected";
                    })
//                .thenApply(s -> { // 同步 所以都會看到是 Main thread
                    .thenApplyAsync(s -> { // 非同步 thread 會看到是 ForkJoinPool.commonPool-worker-XX
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        log.debug("Second block.");
                        return s.toUpperCase();
                    }, firstPool);
            log.debug("Main thread is coming....");
            log.debug("futureReturn:{}", future_str.get());
        } finally {
            firstPool.shutdown();
            secondPool.shutdown();
        }
    }

    public void case_5() throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> future_str = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            log.debug("First block.");
            return "Message";
        });
        log.debug("Main thread is coming....");

        boolean cr1 = future_str.complete("Disconnected"); // 設定完成值,後續get 就會取用到這個值. 如果已經完成了,在呼叫這方法,也不會覆蓋原本的結果. 可根據回傳boolean 值來判斷.
        log.debug("cr1 >" + cr1);
//        log.debug("futureReturn:{}", StringUtils.defaultString(future_str.get(1, TimeUnit.SECONDS), "defaultString"));// 設定等待值. 超時會throw TimeoutException
//        log.debug("futureReturn:{}", StringUtils.defaultString(future_str.get(5, TimeUnit.SECONDS), "defaultString"));// 設定等待值. 超時會throw TimeoutException
//        log.debug("futureReturn:{}", future_str.get()); // 沒設定超時,就會hand在這裡,等到回傳值。
//        log.debug("futureReturn:{}", future_str.getNow("NowDefaultString")); // 如果還沒完成,則會直接取用設定值來繼續往下走,不會hand在這裡。
    }

    public void case_6() throws ExecutionException, InterruptedException, TimeoutException {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("测试一下异常情况");
            }
            return "s1";
        }).exceptionally(e -> {
            e.printStackTrace();
            return "hello world";
        }).join();
        log.debug("Main thread is coming....");
        log.debug("futureReturn:{}", result); // 沒設定超時,就會hand在這裡,等到回傳值。
    }

    public void case_7() throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> future_str = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1)
                throw new RuntimeException("G啦");
            return "A_001";
        }).exceptionally(e -> {
            log.debug(e.getMessage());
            return "C_003";
        }).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
            log.debug("Compose block");
            return "B_002";
        })).thenCombine(CompletableFuture.supplyAsync(() -> {
            log.debug("Combine block one");
            return "Message_001";
        }), (s1, s2) -> {
            log.debug("Combine block two, s1:{}, s2:{}", s1, s2);
            return "Message_002";
        });
        log.debug("Main thread is coming....");
        log.debug("futureReturn:{}", future_str.join()); // 沒設定超時,就會hand在這裡,等到回傳值。
        /**
         * 結果
         * 14:49:18.197 [main] DEBUG com.caster.test.future.CompletableFutureService - Main thread is coming....
         * 14:49:18.197 [ForkJoinPool.commonPool-worker-37] DEBUG com.caster.test.future.CompletableFutureService - Combine block one
         * 14:49:20.205 [ForkJoinPool.commonPool-worker-51] DEBUG com.caster.test.future.CompletableFutureService - java.lang.RuntimeException: G啦
         * 14:49:20.206 [ForkJoinPool.commonPool-worker-37] DEBUG com.caster.test.future.CompletableFutureService - Compose block
         * 14:49:20.206 [ForkJoinPool.commonPool-worker-51] DEBUG com.caster.test.future.CompletableFutureService - Combine block two, s1:B_002, s2:Message_001
         * 14:49:20.208 [main] DEBUG com.caster.test.future.CompletableFutureService - futureReturn:Message_002
         */
    }

    public void case_8() throws ExecutionException, InterruptedException, TimeoutException {
        log.debug("Main process isDaemon ? => " + Thread.currentThread().isDaemon());
        CompletableFuture<String> future_str = CompletableFuture.supplyAsync(() -> {
            try {
                log.debug("" + Thread.currentThread().isDaemon());
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "A_001";
        }).exceptionally(e -> {
            log.debug(e.getMessage());
            return "C_003";
        });
        log.debug("Main thread is coming....");

        if (future_str.isDone())
            log.debug("program completed.");
        else
            log.debug("program processing....");
        log.debug("futureReturn:{}", future_str.get());
    }

    @Test
    public void case_9() throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
            assertTrue(Thread.currentThread().isDaemon());
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return s.toUpperCase();
        });

        log.debug("Main thread is coming....");

        assertNull(cf.getNow(null));
        log.debug("Main thread waiting....");
        assertEquals("MESSAGE", cf.get());
    }


    @Test
    public void test_6() throws ExecutionException, InterruptedException, TimeoutException {
        StringBuilder result = new StringBuilder();
        CompletableFuture.completedFuture("thenAccept message")
                .thenAccept(s -> result.append(s));
        log.debug(result.toString());
        assertTrue(result.length() > 0, "Result was empty");
    }


    static volatile StringBuffer result = new StringBuffer();
    static final String LOG_STR = "time:{} > 1:{}";
    static final String LOG_STR_TWO = "time:{} > 2:{}";
    static final String LOG_STR_THREE = "time:{} > result:{}";

    @Test
    public void test_7_() throws ExecutionException, InterruptedException, TimeoutException {
//        StringBuilder result = new StringBuilder();

        CompletableFuture<Void> cf = CompletableFuture.completedFuture("thenAcceptAsync message")
                .thenAcceptAsync(s -> {
                    result.append(s);
                    log.debug(LOG_STR_THREE, nowTime(), result);
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });

        log.debug(LOG_STR, result.toString(), nowTime()); // result 會取不到值, 特別的時間差狀況.
        log.debug(LOG_STR_TWO, nowTime(), result);

    }

    @Test
    public void test_7() throws ExecutionException, InterruptedException, TimeoutException {
        StringBuilder result = new StringBuilder();
        CompletableFuture<Void> cf = CompletableFuture.completedFuture("thenAcceptAsync message")
                .thenAcceptAsync(s -> result.append(s));
        cf.join();
        assertTrue(result.length() > 0, "Result was empty");
    }

    @Test
    public void test_8() throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
                CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
        CompletableFuture<String> exceptionHandler = cf.handle((s, th) -> (th != null) ? "message upon cancel" : "");
        cf.completeExceptionally(new RuntimeException("completed exceptionally"));
        assertTrue(cf.isCompletedExceptionally(), "Was not completed exceptionally");
        try {
            cf.join();
            fail("Should have thrown an exception");
        } catch (CompletionException ex) { // just for testing
            assertEquals("completed exceptionally", ex.getCause().getMessage());
        }
        assertEquals("message upon cancel", exceptionHandler.join());
    }

    @Test
    public void test_8_1() throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message")
                .thenApplyAsync(s -> {
                            log.debug("doing.....");
                            return s.toUpperCase();
                        },
                        CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); // 可延遲執行
        log.debug("Main thread is coming....");
        TimeUnit.SECONDS.sleep(5);
    }

    @Test
    public void test_8_2() throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message")
                .thenApplyAsync(s -> {
                            log.debug("doing.....");
                            return s.toUpperCase();
                        },
                        CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)) // 可延遲執行
                .handle((s, th) -> (th != null) ? "happen exception." : "nothing happen.");

//        CompletableFuture<String> cf = CompletableFuture.completedFuture("message")
//                .thenApplyAsync(s -> {
//                            log.debug("doing.....");
//                            return s.toUpperCase();
//                        },
//                        CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); // 可延遲執行
//        CompletableFuture<String> exceptionHandler = cf.handle((s, th) -> (th != null) ? "happen exception." : "nothing happen.");


        log.debug("Main thread is coming....");


        log.debug("result:{}", cf.get());
//        log.debug("_result:{}", exceptionHandler.get());
        /**
         * 2023-09-14 13:48:48.000849 [main] DEBUG c.c.t.f.CompletableFutureServiceTests - Main thread is coming....
         * 2023-09-14 13:48:49.000858 [ForkJoinPool.commonPool-worker-51] DEBUG c.c.t.f.CompletableFutureServiceTests - doing.....
         * 2023-09-14 13:48:49.000860 [main] DEBUG c.c.t.f.CompletableFutureServiceTests - result:MESSAGE
         * 2023-09-14 13:48:49.000861 [main] DEBUG c.c.t.f.CompletableFutureServiceTests - _result:nothing happen.
         */

        TimeUnit.SECONDS.sleep(5);
    }

    @Test
    public void test_9() throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
                CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
        CompletableFuture<String> cf2 = cf.exceptionally(throwable -> "canceled message");
        assertTrue(cf.cancel(true), "Was not canceled");
        assertTrue(cf.isCompletedExceptionally(), "Was not completed exceptionally");
        assertEquals("canceled message", cf2.join());
    }

    @Test
    public void test_9_1() throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
            if (1 == 1)
                throw new RuntimeException("G La......");
            return s;
        });
        CompletableFuture<String> cf2 = cf.exceptionally(throwable -> {
            if (throwable instanceof CancellationException)
                return "canceled message";
            log.error("canceled message, info:{}", throwable.getMessage());
            return "happened other Exception.....";
        });
        TimeUnit.SECONDS.sleep(1);
//        assertTrue(cf.cancel(true), "Was not canceled");
//        assertTrue(cf.isDone(), "Was not completed");
        assertTrue(cf.isCompletedExceptionally(), "Was not completed exceptionally"); // 以非正常方式完成,則回傳true
//        assertEquals("canceled message", cf2.join());
        assertEquals("happened other Exception.....", cf2.join());
    }

    @Test
    public void test_10() throws ExecutionException, InterruptedException, TimeoutException {
        /**
         * 兩個階段,再將回傳值傳給Function,但不保證哪個階段優先處理完結束。
         * no guarantees on which one will be passed to the Function
         */
        String original = "Message";
        CompletableFuture<String> cf1 = CompletableFuture.completedFuture(original)
                .thenApplyAsync(s -> delayedUpperCase(s));
        CompletableFuture<String> cf2 = cf1.applyToEither(
                CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                s -> s + " from applyToEither");
        log.debug("result:{}", cf2.get());
        assertTrue(cf2.join().endsWith(" from applyToEither"));
        TimeUnit.SECONDS.sleep(5);
    }

    private static String delayedUpperCase(String s) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.debug("delayedUpperCase >> {}", s);
        return s.toUpperCase();
    }

    private static String delayedLowerCase(String s) {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.debug("delayedLowerCase >> {}", s);
        return s.toLowerCase();
    }


    @Test
    public void test_11() throws ExecutionException, InterruptedException, TimeoutException {
        String original = "Message";
        StringBuffer result = new StringBuffer();
        CompletableFuture<Void> cf = CompletableFuture.completedFuture(original)
                .thenApplyAsync(s -> {
                    return delayedUpperCase(s);
                })
                .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                        s -> result.append(s).append("acceptEither"));
        log.debug("Main thread is coming....");
        cf.join();
        log.debug("processing....");
        assertTrue(result.toString().endsWith("acceptEither"), "Result was empty");
    }

    @Test
    public void test_11_1() throws ExecutionException, InterruptedException, TimeoutException {
        List<String> messages = Arrays.asList("Msg1", "Msg2", "Msg3", "Msg4", "Msg5", "Msg6", "Msg7", "Msg8", "Msg9",
                "Msg10", "Msg11", "Msg12");
        ExecutorService executor = Executors.newFixedThreadPool(4);

        List<String> mapResult = new ArrayList<>();

        CompletableFuture<?>[] fanoutRequestList = new CompletableFuture[messages.size()];
        int count = 0;
        for (String msg : messages) {
            CompletableFuture<?> future = CompletableFuture
                    .supplyAsync(() -> sendNotification(msg), executor).exceptionally(ex -> "Error")
                    .thenAccept(mapResult::add);

            fanoutRequestList[count++] = future;
        }

        try {
//            log.debug("Main thread is coming....");
            CompletableFuture.allOf(fanoutRequestList).get();
            //CompletableFuture.allOf(fanoutRequestList).join();
        } catch (InterruptedException | ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

//        log.debug("Main thread print mapResult....");
        mapResult.stream().filter(s -> !s.equalsIgnoreCase("Error")).forEach(System.out::println);
    }

    private String sendNotification(String msg) {
        return RandomStringUtils.randomAlphabetic(10) + "_" + msg;
    }

    @Test
    public void test_12() throws ExecutionException, InterruptedException, TimeoutException {
        String original = "Message";
        StringBuilder result = new StringBuilder();
        CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
                CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
                () -> result.append("done"));
        log.debug("Main thread is coming....");
        assertTrue(result.length() > 0, "Result was empty");
    }

    @Test
    public void test_13() throws ExecutionException, InterruptedException, TimeoutException {
        // 可以觀察 thread
        String original = "Message";
        StringBuilder result = new StringBuilder();
        CompletableFuture.completedFuture(original).thenApply(CompletableFutureServiceTests::delayedUpperCase).thenAcceptBoth(
                CompletableFuture.completedFuture(original).thenApply(CompletableFutureServiceTests::delayedLowerCase),
                (s1, s2) -> result.append(s1 + s2));
        assertEquals("MESSAGEmessage", result.toString());
    }

    @Test
    public void test_14() throws ExecutionException, InterruptedException, TimeoutException {
        // 可以觀察 thread
        String original = "Message";
        CompletableFuture<String> cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
                .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
                        (s1, s2) -> s1 + s2);
        assertEquals("MESSAGEmessage", cf.getNow(null));
    }

    @Test
    public void test_15() throws ExecutionException, InterruptedException, TimeoutException {
        // 可以觀察 thread
        String original = "Message";
        CompletableFuture<String> cf = CompletableFuture.completedFuture(original)
                .thenApplyAsync(s -> delayedUpperCase(s))
                .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                        (s1, s2) -> s1 + s2);
        assertEquals("MESSAGEmessage", cf.join());
    }

    @Test
    public void test_16() throws ExecutionException, InterruptedException, TimeoutException {
        String original = "Message";
        CompletableFuture<String> cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
                .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)).thenApply(s -> upper + s));
        assertEquals("MESSAGEmessage", cf.join());
    }

    @Test
    public void test_17() throws ExecutionException, InterruptedException, TimeoutException {
        StringBuilder result = new StringBuilder();
        List<String> messages = Arrays.asList("a", "b", "c");
        List<CompletableFuture<String>> futures = messages.stream()
//                .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) // 要加 join or get
                .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
                .collect(Collectors.toList());
        log.debug("Main thread is coming....");
        CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
            if (th == null) {
                assertTrue(isUpperCase((String) res));
                result.append(res);
            }
        }).join();
        assertTrue(result.length() > 0, "Result was empty");
    }

    private boolean isUpperCase(String s) {
        log.debug("check isUpperCase param:{}", s);
        return StringUtils.isAllUpperCase(s);
    }

    @Test
    public void test_18() throws ExecutionException, InterruptedException, TimeoutException {
        StringBuilder result = new StringBuilder();
        List<String> messages = Arrays.asList("a", "b", "c");
        List<CompletableFuture<String>> futures = messages.stream()
                .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
//                .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) // 要加 join or get
                .collect(Collectors.toList());
        log.debug("Main thread is coming....");
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
            futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
            result.append("done");
        });
        assertTrue(result.length() > 0, "Result was empty");
    }

    @Test
    public void test_19() throws ExecutionException, InterruptedException, TimeoutException {
        StringBuilder result = new StringBuilder();
        List<String> messages = Arrays.asList("a", "b", "c");
        List<CompletableFuture<String>> futures = messages.stream()
//                .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
                .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) // 要加 join or get
                .collect(Collectors.toList());
        log.debug("Main thread is coming....");
        CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
            futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
            result.append("done");
        });
        allOf.join();
        assertTrue(result.length() > 0, "Result was empty");
    }

    @Test
    public void test_20() throws ExecutionException, InterruptedException, TimeoutException {
        List<BookDTO> books_ = LongStream.rangeClosed(0, 100).mapToObj(o -> {
            Book book = Faker.instance(Locale.TAIWAN).book();
            return new BookDTO().setAuthor(book.author()).setGenre(book.genre()).setPublisher(book.publisher()).setTitle(book.title()).setTime(LocalDateTime.now());
        }).collect(Collectors.toList());


        CompletableFuture.completedFuture(books_).thenCompose(books -> {

            List<CompletionStage<BookDTO>> updatedCars = books.stream()
                    .map(book -> rating().thenApply(r -> book.setRating(r))).collect(Collectors.toList());

            CompletableFuture<Void> done = CompletableFuture
                    .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));

            return done.thenApply(v ->
                    updatedCars.stream()
                            .map(CompletionStage::toCompletableFuture)
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));

        }).whenComplete((books, th) -> {
            if (th == null) {
                books.forEach(System.out::println);
            } else {
                throw new RuntimeException(th);
            }
        }).toCompletableFuture().join();
    }

    @Test
    public void test_20_chatGPT_optimization() throws ExecutionException, InterruptedException, TimeoutException {
        List<BookDTO> books_ = LongStream.rangeClosed(0, 100).mapToObj(o -> {
            Book book = Faker.instance(Locale.TAIWAN).book();
            return new BookDTO().setAuthor(book.author()).setGenre(book.genre()).setPublisher(book.publisher()).setTitle(book.title()).setTime(LocalDateTime.now());
        }).collect(Collectors.toList());

        List<CompletableFuture<BookDTO>> updatedBooks = books_.stream()
                .map(book -> rating().thenApply(r -> book.setRating(r)))
                .collect(Collectors.toList());

        CompletableFuture<Void> allOf = CompletableFuture.allOf(updatedBooks.toArray(new CompletableFuture[0]));

        List<BookDTO> completedBooks = allOf.thenApply(v ->
                updatedBooks.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())
        ).join();
        completedBooks.forEach(System.out::println);
    }

    private CompletableFuture<Float> rating() {
        return CompletableFuture.completedFuture(RandomUtils.nextFloat(0f, 1f));
    }

接過這20個範例練習過後,基本上可以掌握到不少用法,並且可以自行研究衍生用法。 前陣子研究玩Redisson 分布式鎖機制,藉由此次機會來個異種結合,並且觀看其變化。

import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

import java.util.concurrent.*;
import java.util.stream.LongStream;

/**
 * @author caster.hsu
 * @Since 2023/4/27
 */

@Slf4j
public class CyclicBarrierRedisLockTest {
    private CyclicBarrier cyclicBarrier;
    private CyclicBarrier cyclicBarrierSecond;
    private static long startTime;

    public void mainProcess(int totalWorkers) {
        cyclicBarrier = new CyclicBarrier(totalWorkers, () -> startTime = System.currentTimeMillis());

        cyclicBarrierSecond = new CyclicBarrier(totalWorkers, () -> log.debug("共 {} 條 Async Thread, 總花費時間:{}(ms)", totalWorkers, (System.currentTimeMillis() - startTime)));

        ExecutorService defaultPool = Executors.newFixedThreadPool(totalWorkers, new ThreadFactoryBuilder().setNameFormat("Customer_Pool-%d").get());
        LongStream.rangeClosed(1, totalWorkers).forEach(o -> {
            CompletableFuture.completedFuture(cyclicBarrier)
                    .thenAcceptBothAsync(
                            CompletableFuture.completedFuture(cyclicBarrierSecond), (s1, s2) -> {
                                try {
                                    log.debug("await()");
                                    s1.await();

                                    // todo: do something
                                    tryToLockByRedisson();

                                    log.debug("second await()");
                                    s2.await();
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                } catch (BrokenBarrierException e) {
                                    throw new RuntimeException(e);
                                }
                            }, defaultPool);
        });

    }

    public static final String REDISSON_LOCK = "redisson";

    private void tryToLockByRedisson() {
        try {
            RLock rLock = redissonClient.getLock(REDISSON_LOCK);

            // 嘗試鎖定
            boolean lockSuccess = rLock.tryLock(5, 0, TimeUnit.SECONDS); // 等待時間內收到釋放鎖消息,則重新去競爭鎖
            if (lockSuccess) {
                log.debug("got key and do something.......");

                // 如果當前線程鎖住並且持有該鎖
                if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                    log.debug("release lock success.");
                    rLock.unlock();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static RedissonClient redissonClient = null;

    @Test
    public void redissonLockWithCyclicBarrierTest() throws InterruptedException {
        RedissonLockTest redissonLockTest = new RedissonLockTest();
        redissonClient = redissonLockTest.redissonClient();

        try {
            mainProcess(5);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            TimeUnit.SECONDS.sleep(5);

            // 關閉 Redisson 客戶端
            redissonClient.shutdown();
        }
    }
}

console log

##redis connection pool init
2023-09-18 15:16:39.170 [redisson-netty-2-19] INFO  o.r.c.pool.MasterConnectionPool - 24 connections initialized for 172.20.160.120/172.20.160.120:6379

## async thread ready to process & sync start
2023-09-18 15:16:39.196 [Customer_Pool-0] DEBUG c.c.test.CyclicBarrierRedisLockTest - await()
2023-09-18 15:16:39.196 [Customer_Pool-1] DEBUG c.c.test.CyclicBarrierRedisLockTest - await()
2023-09-18 15:16:39.196 [Customer_Pool-2] DEBUG c.c.test.CyclicBarrierRedisLockTest - await()
2023-09-18 15:16:39.196 [Customer_Pool-3] DEBUG c.c.test.CyclicBarrierRedisLockTest - await()
2023-09-18 15:16:39.196 [Customer_Pool-4] DEBUG c.c.test.CyclicBarrierRedisLockTest - await()

## start processing
2023-09-18 15:16:39.222 [Customer_Pool-4] DEBUG c.c.test.CyclicBarrierRedisLockTest - got key, and do something.......
2023-09-18 15:16:39.226 [Customer_Pool-4] DEBUG c.c.test.CyclicBarrierRedisLockTest - release lock success.
2023-09-18 15:16:39.229 [Customer_Pool-4] DEBUG c.c.test.CyclicBarrierRedisLockTest - second await()
2023-09-18 15:16:39.237 [Customer_Pool-2] DEBUG c.c.test.CyclicBarrierRedisLockTest - got key, and do something.......
2023-09-18 15:16:39.240 [Customer_Pool-2] DEBUG c.c.test.CyclicBarrierRedisLockTest - release lock success.
2023-09-18 15:16:39.242 [Customer_Pool-2] DEBUG c.c.test.CyclicBarrierRedisLockTest - second await()
2023-09-18 15:16:39.244 [Customer_Pool-0] DEBUG c.c.test.CyclicBarrierRedisLockTest - got key, and do something.......
2023-09-18 15:16:39.259 [Customer_Pool-0] DEBUG c.c.test.CyclicBarrierRedisLockTest - release lock success.
2023-09-18 15:16:39.262 [Customer_Pool-0] DEBUG c.c.test.CyclicBarrierRedisLockTest - second await()
2023-09-18 15:16:39.264 [Customer_Pool-3] DEBUG c.c.test.CyclicBarrierRedisLockTest - got key, and do something.......
2023-09-18 15:16:39.267 [Customer_Pool-3] DEBUG c.c.test.CyclicBarrierRedisLockTest - release lock success.
2023-09-18 15:16:39.268 [Customer_Pool-3] DEBUG c.c.test.CyclicBarrierRedisLockTest - second await()
2023-09-18 15:16:39.272 [Customer_Pool-1] DEBUG c.c.test.CyclicBarrierRedisLockTest - got key, and do something.......
2023-09-18 15:16:39.274 [Customer_Pool-1] DEBUG c.c.test.CyclicBarrierRedisLockTest - release lock success.
2023-09-18 15:16:39.276 [Customer_Pool-1] DEBUG c.c.test.CyclicBarrierRedisLockTest - second await()
2023-09-18 15:16:39.276 [Customer_Pool-1] DEBUG c.c.test.CyclicBarrierRedisLockTest - 共 5 條 Async Thread, 總花費時間:80(ms)

## redisson client shutdown
2023-09-18 15:16:44.223 [redisson-netty-2-2] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-2
2023-09-18 15:16:44.223 [redisson-netty-2-1] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-1
2023-09-18 15:16:44.223 [redisson-netty-2-4] DEBUG io.netty.buffer.PoolThreadCache - Freed 4 thread-local buffer(s) from thread: redisson-netty-2-4
2023-09-18 15:16:44.223 [redisson-netty-2-3] DEBUG io.netty.buffer.PoolThreadCache - Freed 4 thread-local buffer(s) from thread: redisson-netty-2-3
2023-09-18 15:16:44.223 [redisson-netty-2-8] DEBUG io.netty.buffer.PoolThreadCache - Freed 9 thread-local buffer(s) from thread: redisson-netty-2-8
2023-09-18 15:16:44.224 [redisson-netty-2-10] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-10
2023-09-18 15:16:44.223 [redisson-netty-2-6] DEBUG io.netty.buffer.PoolThreadCache - Freed 9 thread-local buffer(s) from thread: redisson-netty-2-6
2023-09-18 15:16:44.225 [redisson-netty-2-12] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-12
2023-09-18 15:16:44.225 [redisson-netty-2-14] DEBUG io.netty.buffer.PoolThreadCache - Freed 5 thread-local buffer(s) from thread: redisson-netty-2-14
2023-09-18 15:16:44.225 [redisson-netty-2-16] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-16
2023-09-18 15:16:44.225 [redisson-netty-2-20] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-20
2023-09-18 15:16:44.225 [redisson-netty-2-22] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-22
2023-09-18 15:16:44.226 [redisson-netty-2-18] DEBUG io.netty.buffer.PoolThreadCache - Freed 9 thread-local buffer(s) from thread: redisson-netty-2-18
2023-09-18 15:16:44.226 [redisson-netty-2-24] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-24
2023-09-18 15:16:44.226 [redisson-netty-2-28] DEBUG io.netty.buffer.PoolThreadCache - Freed 4 thread-local buffer(s) from thread: redisson-netty-2-28
2023-09-18 15:16:44.226 [redisson-netty-2-26] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-26
2023-09-18 15:16:44.226 [redisson-netty-2-30] DEBUG io.netty.buffer.PoolThreadCache - Freed 5 thread-local buffer(s) from thread: redisson-netty-2-30

一方面可以確認分散式鎖正常運作,一方面透過async方式確認程式運作順暢,Java8真的藏很多東西可以學。

Last updated