CompletableFuture
經資深工程師Justin友人,提醒我Java8還有這東西可以搞一搞,就來試看看發現驚為天人,真是厲害的東西。
CompletableFuture
是 Java 8 中引入的一個非常有用的工具,用於處理非同步任務和並行計算。它提供了一個功能強大的 API,使得處理非同步操作變得簡單且具有可讀性。以下是一些解析和衍生用法:
非同步操作:
CompletableFuture
主要用於處理非同步操作。您可以使用supplyAsync
、runAsync
等方法來啟動非同步操作,並使用thenApply
、thenAccept
、thenCombine
等方法來處理操作的結果。這使得您可以在不阻塞主線程的情況下執行耗時的操作。CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42); future.thenApply(result -> result * 2) .thenAccept(finalResult -> System.out.println("Final result: " + finalResult));
組合操作:
CompletableFuture
允許您組合多個非同步操作,以便在某些操作完成時執行其他操作。這通常使用thenCompose
、thenCombine
、thenComposeAsync
等方法實現。CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20); CompletableFuture<Integer> combined = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
異常處理:您可以使用
exceptionally
、handle
等方法來處理異常情況。這使得您能夠優雅地處理錯誤,而不必使用傳統的try-catch
塊。CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { // 可能引發異常的代碼 }); future.exceptionally(ex -> { System.out.println("An exception occurred: " + ex); return 0; // 錯誤時返回默認值 });
等待多個 CompletableFuture 完成:如果您需要等待多個 CompletableFuture 完成,可以使用
CompletableFuture.allOf
或CompletableFuture.anyOf
。CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2); allOfFuture.join(); // 等待所有 CompletableFuture 完成
串行化和並行化:
CompletableFuture
允許您以串行或並行的方式執行操作,具體取決於您的需求。您可以使用thenApply
等方法來串行執行操作,或使用thenApplyAsync
等方法以並行方式執行操作。CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42); future.thenApplyAsync(result -> result * 2); // 以並行方式執行操作
等待和獲取結果:最後,要獲取 CompletableFuture 的結果,可以使用
join
或get
方法。請注意,join
是一個無異常版本,如果操作引發異常,它會將其包裝為CompletionException
。CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42); int result = future.join(); // 等待並獲取結果
除了 CompletableFuture
,還有其他類似的工具,如 Guava 的 ListenableFuture
和 RxJava 的 Observable
,它們提供了不同風格和功能的非同步處理。選擇使用哪個取決於您的項目需求和代碼風格。
還有參考到網路上的資料進行練習
import com.caster.test.entity.BookDTO;
import com.github.javafaker.Book;
import com.github.javafaker.Faker;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static org.junit.jupiter.api.Assertions.*;
/**
* @author caster.hsu
* @Since 2023/9/13
* example web : https://mahmoudanouti.wordpress.com/2018/01/26/20-examples-of-using-javas-completablefuture/
*/
@Slf4j
public class CompletableFutureServiceTests {
private String nowTime() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"));
}
private void randomSleep() {
try {
TimeUnit.SECONDS.sleep(RandomUtils.nextInt(2, 5));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void case_1() {
log.debug("Thread name:{}", Thread.currentThread().getName());
CompletableFuture.runAsync(() -> log.debug("case_1 > runAsync", Thread.currentThread().getName(), nowTime()));
}
public void case_2() {
log.debug("Thread name:{}", Thread.currentThread().getName());
ExecutorService fixOne = Executors.newSingleThreadExecutor();
try {
CompletableFuture.runAsync(() -> log.debug("case_2 > runAsync"), fixOne);
} finally {
fixOne.shutdown();
}
}
public void case_3() throws ExecutionException, InterruptedException {
CompletableFuture<String> future_str = CompletableFuture.completedFuture("Message")
// .thenApply(s -> { // 同步 所以都會看到是 Main thread
.thenApplyAsync(s -> { // 非同步 thread 會看到是 ForkJoinPool.commonPool-worker-XX
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("> thenApplyAsync ");
return s.toUpperCase();
});
log.debug("Main thread is coming....");
log.debug("futureReturn:{}", future_str.get());
}
public void case_4() throws ExecutionException, InterruptedException {
ExecutorService firstPool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("First_Pool-%d").get());
ExecutorService secondPool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Second_Pool-%d").get());
try {
CompletableFuture<String> future_str = CompletableFuture.supplyAsync(() -> {
log.debug("First block.");
return "Message";
}, secondPool)
.thenApply(s -> {
log.debug("Change word. -> 'Disconnected'");
return "Disconnected";
})
// .thenApply(s -> { // 同步 所以都會看到是 Main thread
.thenApplyAsync(s -> { // 非同步 thread 會看到是 ForkJoinPool.commonPool-worker-XX
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("Second block.");
return s.toUpperCase();
}, firstPool);
log.debug("Main thread is coming....");
log.debug("futureReturn:{}", future_str.get());
} finally {
firstPool.shutdown();
secondPool.shutdown();
}
}
public void case_5() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> future_str = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("First block.");
return "Message";
});
log.debug("Main thread is coming....");
boolean cr1 = future_str.complete("Disconnected"); // 設定完成值,後續get 就會取用到這個值. 如果已經完成了,在呼叫這方法,也不會覆蓋原本的結果. 可根據回傳boolean 值來判斷.
log.debug("cr1 >" + cr1);
// log.debug("futureReturn:{}", StringUtils.defaultString(future_str.get(1, TimeUnit.SECONDS), "defaultString"));// 設定等待值. 超時會throw TimeoutException
// log.debug("futureReturn:{}", StringUtils.defaultString(future_str.get(5, TimeUnit.SECONDS), "defaultString"));// 設定等待值. 超時會throw TimeoutException
// log.debug("futureReturn:{}", future_str.get()); // 沒設定超時,就會hand在這裡,等到回傳值。
// log.debug("futureReturn:{}", future_str.getNow("NowDefaultString")); // 如果還沒完成,則會直接取用設定值來繼續往下走,不會hand在這裡。
}
public void case_6() throws ExecutionException, InterruptedException, TimeoutException {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("测试一下异常情况");
}
return "s1";
}).exceptionally(e -> {
e.printStackTrace();
return "hello world";
}).join();
log.debug("Main thread is coming....");
log.debug("futureReturn:{}", result); // 沒設定超時,就會hand在這裡,等到回傳值。
}
public void case_7() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> future_str = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1)
throw new RuntimeException("G啦");
return "A_001";
}).exceptionally(e -> {
log.debug(e.getMessage());
return "C_003";
}).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
log.debug("Compose block");
return "B_002";
})).thenCombine(CompletableFuture.supplyAsync(() -> {
log.debug("Combine block one");
return "Message_001";
}), (s1, s2) -> {
log.debug("Combine block two, s1:{}, s2:{}", s1, s2);
return "Message_002";
});
log.debug("Main thread is coming....");
log.debug("futureReturn:{}", future_str.join()); // 沒設定超時,就會hand在這裡,等到回傳值。
/**
* 結果
* 14:49:18.197 [main] DEBUG com.caster.test.future.CompletableFutureService - Main thread is coming....
* 14:49:18.197 [ForkJoinPool.commonPool-worker-37] DEBUG com.caster.test.future.CompletableFutureService - Combine block one
* 14:49:20.205 [ForkJoinPool.commonPool-worker-51] DEBUG com.caster.test.future.CompletableFutureService - java.lang.RuntimeException: G啦
* 14:49:20.206 [ForkJoinPool.commonPool-worker-37] DEBUG com.caster.test.future.CompletableFutureService - Compose block
* 14:49:20.206 [ForkJoinPool.commonPool-worker-51] DEBUG com.caster.test.future.CompletableFutureService - Combine block two, s1:B_002, s2:Message_001
* 14:49:20.208 [main] DEBUG com.caster.test.future.CompletableFutureService - futureReturn:Message_002
*/
}
public void case_8() throws ExecutionException, InterruptedException, TimeoutException {
log.debug("Main process isDaemon ? => " + Thread.currentThread().isDaemon());
CompletableFuture<String> future_str = CompletableFuture.supplyAsync(() -> {
try {
log.debug("" + Thread.currentThread().isDaemon());
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "A_001";
}).exceptionally(e -> {
log.debug(e.getMessage());
return "C_003";
});
log.debug("Main thread is coming....");
if (future_str.isDone())
log.debug("program completed.");
else
log.debug("program processing....");
log.debug("futureReturn:{}", future_str.get());
}
@Test
public void case_9() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
assertTrue(Thread.currentThread().isDaemon());
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return s.toUpperCase();
});
log.debug("Main thread is coming....");
assertNull(cf.getNow(null));
log.debug("Main thread waiting....");
assertEquals("MESSAGE", cf.get());
}
@Test
public void test_6() throws ExecutionException, InterruptedException, TimeoutException {
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture("thenAccept message")
.thenAccept(s -> result.append(s));
log.debug(result.toString());
assertTrue(result.length() > 0, "Result was empty");
}
static volatile StringBuffer result = new StringBuffer();
static final String LOG_STR = "time:{} > 1:{}";
static final String LOG_STR_TWO = "time:{} > 2:{}";
static final String LOG_STR_THREE = "time:{} > result:{}";
@Test
public void test_7_() throws ExecutionException, InterruptedException, TimeoutException {
// StringBuilder result = new StringBuilder();
CompletableFuture<Void> cf = CompletableFuture.completedFuture("thenAcceptAsync message")
.thenAcceptAsync(s -> {
result.append(s);
log.debug(LOG_STR_THREE, nowTime(), result);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
log.debug(LOG_STR, result.toString(), nowTime()); // result 會取不到值, 特別的時間差狀況.
log.debug(LOG_STR_TWO, nowTime(), result);
}
@Test
public void test_7() throws ExecutionException, InterruptedException, TimeoutException {
StringBuilder result = new StringBuilder();
CompletableFuture<Void> cf = CompletableFuture.completedFuture("thenAcceptAsync message")
.thenAcceptAsync(s -> result.append(s));
cf.join();
assertTrue(result.length() > 0, "Result was empty");
}
@Test
public void test_8() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
CompletableFuture<String> exceptionHandler = cf.handle((s, th) -> (th != null) ? "message upon cancel" : "");
cf.completeExceptionally(new RuntimeException("completed exceptionally"));
assertTrue(cf.isCompletedExceptionally(), "Was not completed exceptionally");
try {
cf.join();
fail("Should have thrown an exception");
} catch (CompletionException ex) { // just for testing
assertEquals("completed exceptionally", ex.getCause().getMessage());
}
assertEquals("message upon cancel", exceptionHandler.join());
}
@Test
public void test_8_1() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> cf = CompletableFuture.completedFuture("message")
.thenApplyAsync(s -> {
log.debug("doing.....");
return s.toUpperCase();
},
CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); // 可延遲執行
log.debug("Main thread is coming....");
TimeUnit.SECONDS.sleep(5);
}
@Test
public void test_8_2() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> cf = CompletableFuture.completedFuture("message")
.thenApplyAsync(s -> {
log.debug("doing.....");
return s.toUpperCase();
},
CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)) // 可延遲執行
.handle((s, th) -> (th != null) ? "happen exception." : "nothing happen.");
// CompletableFuture<String> cf = CompletableFuture.completedFuture("message")
// .thenApplyAsync(s -> {
// log.debug("doing.....");
// return s.toUpperCase();
// },
// CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); // 可延遲執行
// CompletableFuture<String> exceptionHandler = cf.handle((s, th) -> (th != null) ? "happen exception." : "nothing happen.");
log.debug("Main thread is coming....");
log.debug("result:{}", cf.get());
// log.debug("_result:{}", exceptionHandler.get());
/**
* 2023-09-14 13:48:48.000849 [main] DEBUG c.c.t.f.CompletableFutureServiceTests - Main thread is coming....
* 2023-09-14 13:48:49.000858 [ForkJoinPool.commonPool-worker-51] DEBUG c.c.t.f.CompletableFutureServiceTests - doing.....
* 2023-09-14 13:48:49.000860 [main] DEBUG c.c.t.f.CompletableFutureServiceTests - result:MESSAGE
* 2023-09-14 13:48:49.000861 [main] DEBUG c.c.t.f.CompletableFutureServiceTests - _result:nothing happen.
*/
TimeUnit.SECONDS.sleep(5);
}
@Test
public void test_9() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
CompletableFuture<String> cf2 = cf.exceptionally(throwable -> "canceled message");
assertTrue(cf.cancel(true), "Was not canceled");
assertTrue(cf.isCompletedExceptionally(), "Was not completed exceptionally");
assertEquals("canceled message", cf2.join());
}
@Test
public void test_9_1() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
if (1 == 1)
throw new RuntimeException("G La......");
return s;
});
CompletableFuture<String> cf2 = cf.exceptionally(throwable -> {
if (throwable instanceof CancellationException)
return "canceled message";
log.error("canceled message, info:{}", throwable.getMessage());
return "happened other Exception.....";
});
TimeUnit.SECONDS.sleep(1);
// assertTrue(cf.cancel(true), "Was not canceled");
// assertTrue(cf.isDone(), "Was not completed");
assertTrue(cf.isCompletedExceptionally(), "Was not completed exceptionally"); // 以非正常方式完成,則回傳true
// assertEquals("canceled message", cf2.join());
assertEquals("happened other Exception.....", cf2.join());
}
@Test
public void test_10() throws ExecutionException, InterruptedException, TimeoutException {
/**
* 兩個階段,再將回傳值傳給Function,但不保證哪個階段優先處理完結束。
* no guarantees on which one will be passed to the Function
*/
String original = "Message";
CompletableFuture<String> cf1 = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s));
CompletableFuture<String> cf2 = cf1.applyToEither(
CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
s -> s + " from applyToEither");
log.debug("result:{}", cf2.get());
assertTrue(cf2.join().endsWith(" from applyToEither"));
TimeUnit.SECONDS.sleep(5);
}
private static String delayedUpperCase(String s) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("delayedUpperCase >> {}", s);
return s.toUpperCase();
}
private static String delayedLowerCase(String s) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("delayedLowerCase >> {}", s);
return s.toLowerCase();
}
@Test
public void test_11() throws ExecutionException, InterruptedException, TimeoutException {
String original = "Message";
StringBuffer result = new StringBuffer();
CompletableFuture<Void> cf = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> {
return delayedUpperCase(s);
})
.acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
s -> result.append(s).append("acceptEither"));
log.debug("Main thread is coming....");
cf.join();
log.debug("processing....");
assertTrue(result.toString().endsWith("acceptEither"), "Result was empty");
}
@Test
public void test_11_1() throws ExecutionException, InterruptedException, TimeoutException {
List<String> messages = Arrays.asList("Msg1", "Msg2", "Msg3", "Msg4", "Msg5", "Msg6", "Msg7", "Msg8", "Msg9",
"Msg10", "Msg11", "Msg12");
ExecutorService executor = Executors.newFixedThreadPool(4);
List<String> mapResult = new ArrayList<>();
CompletableFuture<?>[] fanoutRequestList = new CompletableFuture[messages.size()];
int count = 0;
for (String msg : messages) {
CompletableFuture<?> future = CompletableFuture
.supplyAsync(() -> sendNotification(msg), executor).exceptionally(ex -> "Error")
.thenAccept(mapResult::add);
fanoutRequestList[count++] = future;
}
try {
// log.debug("Main thread is coming....");
CompletableFuture.allOf(fanoutRequestList).get();
//CompletableFuture.allOf(fanoutRequestList).join();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// log.debug("Main thread print mapResult....");
mapResult.stream().filter(s -> !s.equalsIgnoreCase("Error")).forEach(System.out::println);
}
private String sendNotification(String msg) {
return RandomStringUtils.randomAlphabetic(10) + "_" + msg;
}
@Test
public void test_12() throws ExecutionException, InterruptedException, TimeoutException {
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
() -> result.append("done"));
log.debug("Main thread is coming....");
assertTrue(result.length() > 0, "Result was empty");
}
@Test
public void test_13() throws ExecutionException, InterruptedException, TimeoutException {
// 可以觀察 thread
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original).thenApply(CompletableFutureServiceTests::delayedUpperCase).thenAcceptBoth(
CompletableFuture.completedFuture(original).thenApply(CompletableFutureServiceTests::delayedLowerCase),
(s1, s2) -> result.append(s1 + s2));
assertEquals("MESSAGEmessage", result.toString());
}
@Test
public void test_14() throws ExecutionException, InterruptedException, TimeoutException {
// 可以觀察 thread
String original = "Message";
CompletableFuture<String> cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
.thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
(s1, s2) -> s1 + s2);
assertEquals("MESSAGEmessage", cf.getNow(null));
}
@Test
public void test_15() throws ExecutionException, InterruptedException, TimeoutException {
// 可以觀察 thread
String original = "Message";
CompletableFuture<String> cf = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s))
.thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
(s1, s2) -> s1 + s2);
assertEquals("MESSAGEmessage", cf.join());
}
@Test
public void test_16() throws ExecutionException, InterruptedException, TimeoutException {
String original = "Message";
CompletableFuture<String> cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
.thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)).thenApply(s -> upper + s));
assertEquals("MESSAGEmessage", cf.join());
}
@Test
public void test_17() throws ExecutionException, InterruptedException, TimeoutException {
StringBuilder result = new StringBuilder();
List<String> messages = Arrays.asList("a", "b", "c");
List<CompletableFuture<String>> futures = messages.stream()
// .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) // 要加 join or get
.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
.collect(Collectors.toList());
log.debug("Main thread is coming....");
CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
if (th == null) {
assertTrue(isUpperCase((String) res));
result.append(res);
}
}).join();
assertTrue(result.length() > 0, "Result was empty");
}
private boolean isUpperCase(String s) {
log.debug("check isUpperCase param:{}", s);
return StringUtils.isAllUpperCase(s);
}
@Test
public void test_18() throws ExecutionException, InterruptedException, TimeoutException {
StringBuilder result = new StringBuilder();
List<String> messages = Arrays.asList("a", "b", "c");
List<CompletableFuture<String>> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
// .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) // 要加 join or get
.collect(Collectors.toList());
log.debug("Main thread is coming....");
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});
assertTrue(result.length() > 0, "Result was empty");
}
@Test
public void test_19() throws ExecutionException, InterruptedException, TimeoutException {
StringBuilder result = new StringBuilder();
List<String> messages = Arrays.asList("a", "b", "c");
List<CompletableFuture<String>> futures = messages.stream()
// .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
.map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) // 要加 join or get
.collect(Collectors.toList());
log.debug("Main thread is coming....");
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});
allOf.join();
assertTrue(result.length() > 0, "Result was empty");
}
@Test
public void test_20() throws ExecutionException, InterruptedException, TimeoutException {
List<BookDTO> books_ = LongStream.rangeClosed(0, 100).mapToObj(o -> {
Book book = Faker.instance(Locale.TAIWAN).book();
return new BookDTO().setAuthor(book.author()).setGenre(book.genre()).setPublisher(book.publisher()).setTitle(book.title()).setTime(LocalDateTime.now());
}).collect(Collectors.toList());
CompletableFuture.completedFuture(books_).thenCompose(books -> {
List<CompletionStage<BookDTO>> updatedCars = books.stream()
.map(book -> rating().thenApply(r -> book.setRating(r))).collect(Collectors.toList());
CompletableFuture<Void> done = CompletableFuture
.allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
return done.thenApply(v ->
updatedCars.stream()
.map(CompletionStage::toCompletableFuture)
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}).whenComplete((books, th) -> {
if (th == null) {
books.forEach(System.out::println);
} else {
throw new RuntimeException(th);
}
}).toCompletableFuture().join();
}
@Test
public void test_20_chatGPT_optimization() throws ExecutionException, InterruptedException, TimeoutException {
List<BookDTO> books_ = LongStream.rangeClosed(0, 100).mapToObj(o -> {
Book book = Faker.instance(Locale.TAIWAN).book();
return new BookDTO().setAuthor(book.author()).setGenre(book.genre()).setPublisher(book.publisher()).setTitle(book.title()).setTime(LocalDateTime.now());
}).collect(Collectors.toList());
List<CompletableFuture<BookDTO>> updatedBooks = books_.stream()
.map(book -> rating().thenApply(r -> book.setRating(r)))
.collect(Collectors.toList());
CompletableFuture<Void> allOf = CompletableFuture.allOf(updatedBooks.toArray(new CompletableFuture[0]));
List<BookDTO> completedBooks = allOf.thenApply(v ->
updatedBooks.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
).join();
completedBooks.forEach(System.out::println);
}
private CompletableFuture<Float> rating() {
return CompletableFuture.completedFuture(RandomUtils.nextFloat(0f, 1f));
}
接過這20個範例練習過後,基本上可以掌握到不少用法,並且可以自行研究衍生用法。 前陣子研究玩Redisson 分布式鎖機制,藉由此次機會來個異種結合,並且觀看其變化。
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import java.util.concurrent.*;
import java.util.stream.LongStream;
/**
* @author caster.hsu
* @Since 2023/4/27
*/
@Slf4j
public class CyclicBarrierRedisLockTest {
private CyclicBarrier cyclicBarrier;
private CyclicBarrier cyclicBarrierSecond;
private static long startTime;
public void mainProcess(int totalWorkers) {
cyclicBarrier = new CyclicBarrier(totalWorkers, () -> startTime = System.currentTimeMillis());
cyclicBarrierSecond = new CyclicBarrier(totalWorkers, () -> log.debug("共 {} 條 Async Thread, 總花費時間:{}(ms)", totalWorkers, (System.currentTimeMillis() - startTime)));
ExecutorService defaultPool = Executors.newFixedThreadPool(totalWorkers, new ThreadFactoryBuilder().setNameFormat("Customer_Pool-%d").get());
LongStream.rangeClosed(1, totalWorkers).forEach(o -> {
CompletableFuture.completedFuture(cyclicBarrier)
.thenAcceptBothAsync(
CompletableFuture.completedFuture(cyclicBarrierSecond), (s1, s2) -> {
try {
log.debug("await()");
s1.await();
// todo: do something
tryToLockByRedisson();
log.debug("second await()");
s2.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}, defaultPool);
});
}
public static final String REDISSON_LOCK = "redisson";
private void tryToLockByRedisson() {
try {
RLock rLock = redissonClient.getLock(REDISSON_LOCK);
// 嘗試鎖定
boolean lockSuccess = rLock.tryLock(5, 0, TimeUnit.SECONDS); // 等待時間內收到釋放鎖消息,則重新去競爭鎖
if (lockSuccess) {
log.debug("got key and do something.......");
// 如果當前線程鎖住並且持有該鎖
if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
log.debug("release lock success.");
rLock.unlock();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static RedissonClient redissonClient = null;
@Test
public void redissonLockWithCyclicBarrierTest() throws InterruptedException {
RedissonLockTest redissonLockTest = new RedissonLockTest();
redissonClient = redissonLockTest.redissonClient();
try {
mainProcess(5);
} catch (Exception e) {
e.printStackTrace();
} finally {
TimeUnit.SECONDS.sleep(5);
// 關閉 Redisson 客戶端
redissonClient.shutdown();
}
}
}
console log
##redis connection pool init
2023-09-18 15:16:39.170 [redisson-netty-2-19] INFO o.r.c.pool.MasterConnectionPool - 24 connections initialized for 172.20.160.120/172.20.160.120:6379
## async thread ready to process & sync start
2023-09-18 15:16:39.196 [Customer_Pool-0] DEBUG c.c.test.CyclicBarrierRedisLockTest - await()
2023-09-18 15:16:39.196 [Customer_Pool-1] DEBUG c.c.test.CyclicBarrierRedisLockTest - await()
2023-09-18 15:16:39.196 [Customer_Pool-2] DEBUG c.c.test.CyclicBarrierRedisLockTest - await()
2023-09-18 15:16:39.196 [Customer_Pool-3] DEBUG c.c.test.CyclicBarrierRedisLockTest - await()
2023-09-18 15:16:39.196 [Customer_Pool-4] DEBUG c.c.test.CyclicBarrierRedisLockTest - await()
## start processing
2023-09-18 15:16:39.222 [Customer_Pool-4] DEBUG c.c.test.CyclicBarrierRedisLockTest - got key, and do something.......
2023-09-18 15:16:39.226 [Customer_Pool-4] DEBUG c.c.test.CyclicBarrierRedisLockTest - release lock success.
2023-09-18 15:16:39.229 [Customer_Pool-4] DEBUG c.c.test.CyclicBarrierRedisLockTest - second await()
2023-09-18 15:16:39.237 [Customer_Pool-2] DEBUG c.c.test.CyclicBarrierRedisLockTest - got key, and do something.......
2023-09-18 15:16:39.240 [Customer_Pool-2] DEBUG c.c.test.CyclicBarrierRedisLockTest - release lock success.
2023-09-18 15:16:39.242 [Customer_Pool-2] DEBUG c.c.test.CyclicBarrierRedisLockTest - second await()
2023-09-18 15:16:39.244 [Customer_Pool-0] DEBUG c.c.test.CyclicBarrierRedisLockTest - got key, and do something.......
2023-09-18 15:16:39.259 [Customer_Pool-0] DEBUG c.c.test.CyclicBarrierRedisLockTest - release lock success.
2023-09-18 15:16:39.262 [Customer_Pool-0] DEBUG c.c.test.CyclicBarrierRedisLockTest - second await()
2023-09-18 15:16:39.264 [Customer_Pool-3] DEBUG c.c.test.CyclicBarrierRedisLockTest - got key, and do something.......
2023-09-18 15:16:39.267 [Customer_Pool-3] DEBUG c.c.test.CyclicBarrierRedisLockTest - release lock success.
2023-09-18 15:16:39.268 [Customer_Pool-3] DEBUG c.c.test.CyclicBarrierRedisLockTest - second await()
2023-09-18 15:16:39.272 [Customer_Pool-1] DEBUG c.c.test.CyclicBarrierRedisLockTest - got key, and do something.......
2023-09-18 15:16:39.274 [Customer_Pool-1] DEBUG c.c.test.CyclicBarrierRedisLockTest - release lock success.
2023-09-18 15:16:39.276 [Customer_Pool-1] DEBUG c.c.test.CyclicBarrierRedisLockTest - second await()
2023-09-18 15:16:39.276 [Customer_Pool-1] DEBUG c.c.test.CyclicBarrierRedisLockTest - 共 5 條 Async Thread, 總花費時間:80(ms)
## redisson client shutdown
2023-09-18 15:16:44.223 [redisson-netty-2-2] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-2
2023-09-18 15:16:44.223 [redisson-netty-2-1] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-1
2023-09-18 15:16:44.223 [redisson-netty-2-4] DEBUG io.netty.buffer.PoolThreadCache - Freed 4 thread-local buffer(s) from thread: redisson-netty-2-4
2023-09-18 15:16:44.223 [redisson-netty-2-3] DEBUG io.netty.buffer.PoolThreadCache - Freed 4 thread-local buffer(s) from thread: redisson-netty-2-3
2023-09-18 15:16:44.223 [redisson-netty-2-8] DEBUG io.netty.buffer.PoolThreadCache - Freed 9 thread-local buffer(s) from thread: redisson-netty-2-8
2023-09-18 15:16:44.224 [redisson-netty-2-10] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-10
2023-09-18 15:16:44.223 [redisson-netty-2-6] DEBUG io.netty.buffer.PoolThreadCache - Freed 9 thread-local buffer(s) from thread: redisson-netty-2-6
2023-09-18 15:16:44.225 [redisson-netty-2-12] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-12
2023-09-18 15:16:44.225 [redisson-netty-2-14] DEBUG io.netty.buffer.PoolThreadCache - Freed 5 thread-local buffer(s) from thread: redisson-netty-2-14
2023-09-18 15:16:44.225 [redisson-netty-2-16] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-16
2023-09-18 15:16:44.225 [redisson-netty-2-20] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-20
2023-09-18 15:16:44.225 [redisson-netty-2-22] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-22
2023-09-18 15:16:44.226 [redisson-netty-2-18] DEBUG io.netty.buffer.PoolThreadCache - Freed 9 thread-local buffer(s) from thread: redisson-netty-2-18
2023-09-18 15:16:44.226 [redisson-netty-2-24] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-24
2023-09-18 15:16:44.226 [redisson-netty-2-28] DEBUG io.netty.buffer.PoolThreadCache - Freed 4 thread-local buffer(s) from thread: redisson-netty-2-28
2023-09-18 15:16:44.226 [redisson-netty-2-26] DEBUG io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: redisson-netty-2-26
2023-09-18 15:16:44.226 [redisson-netty-2-30] DEBUG io.netty.buffer.PoolThreadCache - Freed 5 thread-local buffer(s) from thread: redisson-netty-2-30
一方面可以確認分散式鎖正常運作,一方面透過async方式確認程式運作順暢,Java8真的藏很多東西可以學。
Last updated