import com.caster.test.entity.BookDTO;
import com.github.javafaker.Book;
import com.github.javafaker.Faker;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static org.junit.jupiter.api.Assertions.*;
/**
* @author caster.hsu
* @since 2023/9/13
* Examples adapted from: https://mahmoudanouti.wordpress.com/2018/01/26/20-examples-of-using-javas-completablefuture/
*/
@Slf4j
public class CompletableFutureServiceTests {
/**
 * Renders the current local date-time for use in log messages.
 *
 * @return the current time formatted as {@code yyyy-MM-dd HH:mm:ss.SSSSSSSSS}
 */
private String nowTime() {
    LocalDateTime current = LocalDateTime.now();
    DateTimeFormatter nanoPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
    return nanoPattern.format(current);
}
/**
 * Blocks the calling thread for a random number of whole seconds obtained from
 * {@code RandomUtils.nextInt(2, 5)}.
 *
 * @throws RuntimeException wrapping the {@link InterruptedException} when the sleep is interrupted
 */
private void randomSleep() {
    int seconds = RandomUtils.nextInt(2, 5);
    try {
        TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Example 1: starts a task with {@link CompletableFuture#runAsync(Runnable)} without supplying
 * an executor.
 * <p>
 * The returned future is not awaited, so this method returns without waiting for the task.
 */
public void case_1() {
    log.debug("Thread name:{}", Thread.currentThread().getName());
    // One placeholder per argument: without them the logger silently drops the thread name and time.
    CompletableFuture.runAsync(() ->
            log.debug("case_1 > runAsync, thread:{}, time:{}", Thread.currentThread().getName(), nowTime()));
}
/**
 * Example 2: starts a task with {@link CompletableFuture#runAsync(Runnable, java.util.concurrent.Executor)}
 * on a dedicated single-thread executor, which is shut down once the task has been submitted.
 */
public void case_2() {
    String callerName = Thread.currentThread().getName();
    log.debug("Thread name:{}", callerName);
    ExecutorService singleWorker = Executors.newSingleThreadExecutor();
    Runnable task = () -> log.debug("case_2 > runAsync");
    try {
        CompletableFuture.runAsync(task, singleWorker);
    } finally {
        // shutdown() only stops new submissions; the task handed over above is not cancelled by it.
        singleWorker.shutdown();
    }
}
/**
 * Example 3: applies a slow transformation to an already-completed future with
 * {@code thenApplyAsync} and then blocks on {@code get()} for the upper-cased value.
 *
 * @throws ExecutionException   if the asynchronous function fails
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void case_3() throws ExecutionException, InterruptedException {
    CompletableFuture<String> source = CompletableFuture.completedFuture("Message");
    // thenApply (synchronous) would run the function on the calling thread; with thenApplyAsync the
    // original author observed it running on a ForkJoinPool.commonPool-worker-XX thread.
    CompletableFuture<String> future_str = source.thenApplyAsync(text -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.debug("> thenApplyAsync ");
        return text.toUpperCase();
    });
    log.debug("Main thread is coming....");
    String upper = future_str.get();
    log.debug("futureReturn:{}", upper);
}
/**
 * Example 4: chains three stages across two named single-thread pools. The supplier is
 * submitted to {@code secondPool}, a synchronous {@code thenApply} replaces the value, and a
 * final {@code thenApplyAsync} upper-cases it on {@code firstPool}.
 * <p>
 * Both pools are shut down in a {@code finally} block after the result has been read.
 *
 * @throws ExecutionException   if any stage fails
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void case_4() throws ExecutionException, InterruptedException {
    ExecutorService firstPool = Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder().setNameFormat("First_Pool-%d").get());
    ExecutorService secondPool = Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder().setNameFormat("Second_Pool-%d").get());
    try {
        CompletableFuture<String> produced = CompletableFuture.supplyAsync(() -> {
            log.debug("First block.");
            return "Message";
        }, secondPool);
        CompletableFuture<String> replaced = produced.thenApply(ignored -> {
            log.debug("Change word. -> 'Disconnected'");
            return "Disconnected";
        });
        // Asynchronous variant with an explicit executor: the function is handed to firstPool.
        CompletableFuture<String> future_str = replaced.thenApplyAsync(word -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            log.debug("Second block.");
            return word.toUpperCase();
        }, firstPool);
        log.debug("Main thread is coming....");
        String finalValue = future_str.get();
        log.debug("futureReturn:{}", finalValue);
    } finally {
        firstPool.shutdown();
        secondPool.shutdown();
    }
}
/**
 * Example 5: completes a still-running future manually with {@code complete(...)} and logs
 * whether the manual completion took effect.
 * <p>
 * Ways to read the value afterwards (not exercised here):
 * <ul>
 *   <li>{@code get(timeout, unit)} waits up to the timeout and throws {@link TimeoutException} when it elapses;</li>
 *   <li>{@code get()} without a timeout blocks until a value is available;</li>
 *   <li>{@code getNow(fallback)} never blocks and returns the fallback when the future is not complete yet.</li>
 * </ul>
 */
public void case_5() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<String> future_str = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.debug("First block.");
        return "Message";
    });
    log.debug("Main thread is coming....");
    // complete(...) sets the result that later reads will see. If the future had already completed,
    // the existing result is kept; the returned boolean tells which of the two happened.
    boolean cr1 = future_str.complete("Disconnected");
    String completionLine = "cr1 >" + cr1;
    log.debug(completionLine);
}
/**
 * Example 6: a supplier that always throws, recovered with {@code exceptionally}, which
 * substitutes the fallback value {@code "hello world"}.
 */
public void case_6() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Always-true condition: forces the failure while keeping the return statement compilable.
        if (1 == 1) {
            throw new RuntimeException("测试一下异常情况");
        }
        return "s1";
    });
    CompletableFuture<String> recovered = failing.exceptionally(error -> {
        error.printStackTrace();
        return "hello world";
    });
    // join() has no timeout, so the calling thread waits here until the value is available.
    String result = recovered.join();
    log.debug("Main thread is coming....");
    log.debug("futureReturn:{}", result);
}
/**
 * Example 7: combines {@code exceptionally}, {@code thenCompose} and {@code thenCombine}.
 * The first supplier always fails, the failure is mapped to a fallback value, the result is
 * composed with a second asynchronous supplier and finally combined with a third, independent one.
 */
public void case_7() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<String> composed = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Always-true condition: forces the failure while keeping the return statement compilable.
        if (1 == 1) {
            throw new RuntimeException("G啦");
        }
        return "A_001";
    }).exceptionally(error -> {
        log.debug(error.getMessage());
        return "C_003";
    }).thenCompose(previous -> CompletableFuture.supplyAsync(() -> {
        log.debug("Compose block");
        return "B_002";
    }));
    // Started here, i.e. independently of the (slow) chain above.
    CompletableFuture<String> independent = CompletableFuture.supplyAsync(() -> {
        log.debug("Combine block one");
        return "Message_001";
    });
    CompletableFuture<String> future_str = composed.thenCombine(independent, (left, right) -> {
        log.debug("Combine block two, s1:{}, s2:{}", left, right);
        return "Message_002";
    });
    log.debug("Main thread is coming....");
    // join() has no timeout, so the calling thread waits here until the value is available.
    log.debug("futureReturn:{}", future_str.join());
    /*
     * Sample output recorded by the original author:
     * 14:49:18.197 [main] DEBUG com.caster.test.future.CompletableFutureService - Main thread is coming....
     * 14:49:18.197 [ForkJoinPool.commonPool-worker-37] DEBUG com.caster.test.future.CompletableFutureService - Combine block one
     * 14:49:20.205 [ForkJoinPool.commonPool-worker-51] DEBUG com.caster.test.future.CompletableFutureService - java.lang.RuntimeException: G啦
     * 14:49:20.206 [ForkJoinPool.commonPool-worker-37] DEBUG com.caster.test.future.CompletableFutureService - Compose block
     * 14:49:20.206 [ForkJoinPool.commonPool-worker-51] DEBUG com.caster.test.future.CompletableFutureService - Combine block two, s1:B_002, s2:Message_001
     * 14:49:20.208 [main] DEBUG com.caster.test.future.CompletableFutureService - futureReturn:Message_002
     */
}
/**
 * Example 8: logs the daemon flag of the calling thread and of the thread running the
 * supplier, reports via {@code isDone()} whether the future has finished, and then blocks on
 * {@code get()}.
 */
public void case_8() throws ExecutionException, InterruptedException, TimeoutException {
    boolean callerIsDaemon = Thread.currentThread().isDaemon();
    log.debug("Main process isDaemon ? => " + callerIsDaemon);
    CompletableFuture<String> slowValue = CompletableFuture.supplyAsync(() -> {
        try {
            log.debug(String.valueOf(Thread.currentThread().isDaemon()));
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "A_001";
    });
    CompletableFuture<String> future_str = slowValue.exceptionally(error -> {
        log.debug(error.getMessage());
        return "C_003";
    });
    log.debug("Main thread is coming....");
    log.debug(future_str.isDone() ? "program completed." : "program processing....");
    log.debug("futureReturn:{}", future_str.get());
}
/**
 * Example 9: {@code thenApplyAsync} on a completed future. Asserts that the function runs on
 * a daemon thread, that {@code getNow(null)} yields {@code null} while the function is still
 * sleeping, and that {@code get()} eventually returns the upper-cased value.
 */
@Test
public void case_9() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<String> seed = CompletableFuture.completedFuture("message");
    CompletableFuture<String> cf = seed.thenApplyAsync(text -> {
        assertTrue(Thread.currentThread().isDaemon());
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return text.toUpperCase();
    });
    log.debug("Main thread is coming....");
    String notYetAvailable = cf.getNow(null);
    assertNull(notYetAvailable);
    log.debug("Main thread waiting....");
    String finalValue = cf.get();
    assertEquals("MESSAGE", finalValue);
}
/**
 * {@code thenAccept} on an already-completed future: the consumer appends the value to a
 * local buffer, which is then logged and asserted to be non-empty.
 */
@Test
public void test_6() throws ExecutionException, InterruptedException, TimeoutException {
    StringBuilder sink = new StringBuilder();
    CompletableFuture<String> completed = CompletableFuture.completedFuture("thenAccept message");
    completed.thenAccept(text -> sink.append(text));
    String collected = sink.toString();
    log.debug(collected);
    assertTrue(sink.length() > 0, "Result was empty");
}
// Shared buffer used by test_7_: appended to inside the asynchronous stage and read by the test thread.
static volatile StringBuffer result = new StringBuffer();
// Log template for the first read on the test thread: time, then buffer content.
static final String LOG_STR = "time:{} > 1:{}";
// Log template for the second read on the test thread: time, then buffer content.
static final String LOG_STR_TWO = "time:{} > 2:{}";
// Log template used inside the asynchronous stage: time, then buffer content.
static final String LOG_STR_THREE = "time:{} > result:{}";
/**
 * Demonstrates the timing gap of {@code thenAcceptAsync}: the test thread reads the shared
 * {@code result} buffer right after scheduling the asynchronous consumer, so the two reads
 * logged here may still see an empty buffer.
 * <p>
 * The stage is joined at the end so that it is not abandoned while still running and so that a
 * failure inside it fails the test.
 */
@Test
public void test_7_() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<Void> cf = CompletableFuture.completedFuture("thenAcceptAsync message")
            .thenAcceptAsync(s -> {
                result.append(s);
                log.debug(LOG_STR_THREE, nowTime(), result);
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
    // The template is "time:{} > 1:{}", so the timestamp goes first and the buffer content second.
    // The buffer may still be empty here: the asynchronous stage may not have appended yet.
    log.debug(LOG_STR, nowTime(), result.toString());
    log.debug(LOG_STR_TWO, nowTime(), result);
    cf.join();
}
/**
 * {@code thenAcceptAsync} on a completed future: the consumer appends to a local buffer, and
 * the buffer is asserted to be non-empty once the stage has been joined.
 */
@Test
public void test_7() throws ExecutionException, InterruptedException, TimeoutException {
    StringBuilder sink = new StringBuilder();
    CompletableFuture<String> completed = CompletableFuture.completedFuture("thenAcceptAsync message");
    CompletableFuture<Void> cf = completed.thenAcceptAsync(text -> sink.append(text));
    // Wait for the asynchronous consumer before inspecting the buffer.
    cf.join();
    boolean hasContent = sink.length() > 0;
    assertTrue(hasContent, "Result was empty");
}
/**
 * Completes a delayed stage exceptionally by hand and checks three things: the stage reports
 * exceptional completion, {@code join()} throws a {@link CompletionException} carrying the
 * original cause, and a dependent {@code handle} stage sees the failure.
 */
@Test
public void test_8() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message")
            .thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture<String> exceptionHandler = cf.handle((value, failure) -> {
        if (failure == null) {
            return "";
        }
        return "message upon cancel";
    });
    RuntimeException injected = new RuntimeException("completed exceptionally");
    cf.completeExceptionally(injected);
    assertTrue(cf.isCompletedExceptionally(), "Was not completed exceptionally");
    try {
        cf.join();
        fail("Should have thrown an exception");
    } catch (CompletionException expected) { // expected path of this test
        String causeMessage = expected.getCause().getMessage();
        assertEquals("completed exceptionally", causeMessage);
    }
    String handled = exceptionHandler.join();
    assertEquals("message upon cancel", handled);
}
/**
 * Schedules an upper-casing function through a one-second {@code delayedExecutor} and keeps
 * the test thread asleep for five seconds so the delayed log line can be observed.
 */
@Test
public void test_8_1() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<String> seed = CompletableFuture.completedFuture("message");
    // delayedExecutor postpones the start of the function by the given delay.
    CompletableFuture<String> cf = seed.thenApplyAsync(text -> {
        log.debug("doing.....");
        return text.toUpperCase();
    }, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    log.debug("Main thread is coming....");
    TimeUnit.SECONDS.sleep(5);
}
/**
 * Chains {@code handle} directly after a delayed {@code thenApplyAsync}. In this chained form
 * {@code cf} is the {@code handle} stage, so {@code cf.get()} yields the handler's message
 * rather than the upper-cased text.
 * <p>
 * Alternative layout (not used here): keep the {@code thenApplyAsync} stage in {@code cf} and
 * attach {@code handle} as a separate {@code exceptionHandler} stage, so that both values can
 * be read.
 */
@Test
public void test_8_2() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<String> upperCased = CompletableFuture.completedFuture("message")
            .thenApplyAsync(text -> {
                log.debug("doing.....");
                return text.toUpperCase();
            }, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); // delayed start
    CompletableFuture<String> cf = upperCased.handle((value, failure) -> {
        if (failure != null) {
            return "happen exception.";
        }
        return "nothing happen.";
    });
    log.debug("Main thread is coming....");
    log.debug("result:{}", cf.get());
    /*
     * Sample output recorded by the original author. It shows result:MESSAGE plus a separate
     * _result line, so it appears to come from the alternative two-variable layout:
     * 2023-09-14 13:48:48.000849 [main] DEBUG c.c.t.f.CompletableFutureServiceTests - Main thread is coming....
     * 2023-09-14 13:48:49.000858 [ForkJoinPool.commonPool-worker-51] DEBUG c.c.t.f.CompletableFutureServiceTests - doing.....
     * 2023-09-14 13:48:49.000860 [main] DEBUG c.c.t.f.CompletableFutureServiceTests - result:MESSAGE
     * 2023-09-14 13:48:49.000861 [main] DEBUG c.c.t.f.CompletableFutureServiceTests - _result:nothing happen.
     */
    TimeUnit.SECONDS.sleep(5);
}
/**
 * Cancels a delayed stage and checks that cancellation is reported as exceptional completion
 * and that a dependent {@code exceptionally} stage supplies the fallback value.
 */
@Test
public void test_9() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message")
            .thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture<String> cf2 = cf.exceptionally(ignored -> "canceled message");
    boolean cancelled = cf.cancel(true);
    assertTrue(cancelled, "Was not canceled");
    assertTrue(cf.isCompletedExceptionally(), "Was not completed exceptionally");
    String fallback = cf2.join();
    assertEquals("canceled message", fallback);
}
/**
 * An asynchronous function that always throws: the dependent {@code exceptionally} stage
 * distinguishes cancellation from other failures and returns a matching fallback message.
 * <p>
 * The test joins the dependent stage instead of sleeping for a fixed time. The dependent stage
 * can only complete after {@code cf} has completed, so the later check on {@code cf} is
 * deterministic.
 */
@Test
public void test_9_1() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        // Always-true condition: forces the failure while keeping the return statement compilable.
        if (1 == 1) {
            throw new RuntimeException("G La......");
        }
        return s;
    });
    CompletableFuture<String> cf2 = cf.exceptionally(throwable -> {
        if (throwable instanceof CancellationException) {
            return "canceled message";
        }
        log.error("canceled message, info:{}", throwable.getMessage());
        return "happened other Exception.....";
    });
    // Waiting on cf2 replaces the former one-second sleep.
    assertEquals("happened other Exception.....", cf2.join());
    // isCompletedExceptionally() is true when the stage finished in any exceptional way.
    assertTrue(cf.isCompletedExceptionally(), "Was not completed exceptionally");
}
/**
 * {@code applyToEither}: two asynchronous stages are started and the function is applied to
 * the result of one of them. There is no guarantee which of the two results is passed to the
 * function.
 */
@Test
public void test_10() throws ExecutionException, InterruptedException, TimeoutException {
    String original = "Message";
    CompletableFuture<String> cf1 = CompletableFuture.completedFuture(original)
            .thenApplyAsync(CompletableFutureServiceTests::delayedUpperCase);
    CompletableFuture<String> lowerStage = CompletableFuture.completedFuture(original)
            .thenApplyAsync(CompletableFutureServiceTests::delayedLowerCase);
    CompletableFuture<String> cf2 = cf1.applyToEither(lowerStage, winner -> winner + " from applyToEither");
    log.debug("result:{}", cf2.get());
    String joined = cf2.join();
    assertTrue(joined.endsWith(" from applyToEither"));
    TimeUnit.SECONDS.sleep(5);
}
/**
 * Sleeps for one second, logs the input and returns it upper-cased.
 *
 * @param s the text to convert
 * @return {@code s} converted with {@link String#toUpperCase()}
 * @throws RuntimeException wrapping the {@link InterruptedException} when the sleep is interrupted
 */
private static String delayedUpperCase(String s) {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the executing thread's owner can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    log.debug("delayedUpperCase >> {}", s);
    return s.toUpperCase();
}
/**
 * Sleeps for two seconds, logs the input and returns it lower-cased.
 *
 * @param s the text to convert
 * @return {@code s} converted with {@link String#toLowerCase()}
 * @throws RuntimeException wrapping the {@link InterruptedException} when the sleep is interrupted
 */
private static String delayedLowerCase(String s) {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the executing thread's owner can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    log.debug("delayedLowerCase >> {}", s);
    return s.toLowerCase();
}
/**
 * {@code acceptEither}: two asynchronous stages are started and the consumer is applied to
 * the result of one of them, appending it plus the marker {@code "acceptEither"} to a buffer.
 */
@Test
public void test_11() throws ExecutionException, InterruptedException, TimeoutException {
    String original = "Message";
    StringBuffer sink = new StringBuffer();
    CompletableFuture<String> upperStage = CompletableFuture.completedFuture(original)
            .thenApplyAsync(CompletableFutureServiceTests::delayedUpperCase);
    CompletableFuture<String> lowerStage = CompletableFuture.completedFuture(original)
            .thenApplyAsync(CompletableFutureServiceTests::delayedLowerCase);
    CompletableFuture<Void> cf = upperStage.acceptEither(lowerStage,
            winner -> sink.append(winner).append("acceptEither"));
    log.debug("Main thread is coming....");
    cf.join();
    log.debug("processing....");
    String collected = sink.toString();
    assertTrue(collected.endsWith("acceptEither"), "Result was empty");
}
/**
 * Fan-out example: sends every message through {@code sendNotification} on a four-thread pool,
 * maps any failure to the marker {@code "Error"}, waits for all requests and prints the
 * successful results.
 * <p>
 * Results are read from the futures after {@code allOf} has completed rather than appended to
 * a shared {@code ArrayList} from pool threads, which was a data race. The executor is shut
 * down in a {@code finally} block so its worker threads do not outlive the test, and a failed
 * wait now propagates instead of being printed and ignored.
 *
 * @throws ExecutionException   if waiting for the fan-out fails
 * @throws InterruptedException if the test thread is interrupted while waiting
 */
@Test
public void test_11_1() throws ExecutionException, InterruptedException, TimeoutException {
    List<String> messages = Arrays.asList("Msg1", "Msg2", "Msg3", "Msg4", "Msg5", "Msg6", "Msg7", "Msg8", "Msg9",
            "Msg10", "Msg11", "Msg12");
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
        // collect() is terminal, so every request has been submitted before any result is awaited.
        List<CompletableFuture<String>> fanoutRequests = messages.stream()
                .map(msg -> CompletableFuture
                        .supplyAsync(() -> sendNotification(msg), executor)
                        .exceptionally(ex -> "Error"))
                .collect(Collectors.toList());
        CompletableFuture.allOf(fanoutRequests.toArray(new CompletableFuture[0])).get();
        // Every future is complete at this point, so join() returns immediately.
        fanoutRequests.stream()
                .map(CompletableFuture::join)
                .filter(s -> !s.equalsIgnoreCase("Error"))
                .forEach(System.out::println);
    } finally {
        executor.shutdown();
    }
}
/**
 * Builds a fake notification result: ten random alphabetic characters, an underscore and the
 * given message.
 *
 * @param msg the message to embed
 * @return the random prefix joined to {@code msg} with {@code "_"}
 */
private String sendNotification(String msg) {
    String randomPrefix = RandomStringUtils.randomAlphabetic(10);
    return randomPrefix + "_" + msg;
}
/**
 * {@code runAfterBoth}: runs an action once both synchronous stages have completed and
 * asserts that the action left its marker in the buffer.
 */
@Test
public void test_12() throws ExecutionException, InterruptedException, TimeoutException {
    String original = "Message";
    StringBuilder sink = new StringBuilder();
    CompletableFuture<String> upperStage = CompletableFuture.completedFuture(original)
            .thenApply(String::toUpperCase);
    CompletableFuture<String> lowerStage = CompletableFuture.completedFuture(original)
            .thenApply(String::toLowerCase);
    upperStage.runAfterBoth(lowerStage, () -> sink.append("done"));
    log.debug("Main thread is coming....");
    boolean actionRan = sink.length() > 0;
    assertTrue(actionRan, "Result was empty");
}
/**
 * {@code thenAcceptBoth}: consumes the results of two synchronous stages together. Useful for
 * observing in the logs which thread runs each step.
 */
@Test
public void test_13() throws ExecutionException, InterruptedException, TimeoutException {
    String original = "Message";
    StringBuilder sink = new StringBuilder();
    CompletableFuture<String> upperStage = CompletableFuture.completedFuture(original)
            .thenApply(text -> delayedUpperCase(text));
    CompletableFuture<String> lowerStage = CompletableFuture.completedFuture(original)
            .thenApply(text -> delayedLowerCase(text));
    upperStage.thenAcceptBoth(lowerStage, (upper, lower) -> sink.append(upper + lower));
    String combined = sink.toString();
    assertEquals("MESSAGEmessage", combined);
}
/**
 * {@code thenCombine} over two synchronous stages: the combined value is read with
 * {@code getNow(null)} without an explicit wait. Useful for observing in the logs which thread
 * runs each step.
 */
@Test
public void test_14() throws ExecutionException, InterruptedException, TimeoutException {
    String original = "Message";
    CompletableFuture<String> upperStage = CompletableFuture.completedFuture(original)
            .thenApply(CompletableFutureServiceTests::delayedUpperCase);
    CompletableFuture<String> lowerStage = CompletableFuture.completedFuture(original)
            .thenApply(CompletableFutureServiceTests::delayedLowerCase);
    CompletableFuture<String> cf = upperStage.thenCombine(lowerStage, (upper, lower) -> upper + lower);
    String combined = cf.getNow(null);
    assertEquals("MESSAGEmessage", combined);
}
/**
 * {@code thenCombine} over two asynchronous stages: the test joins the combined stage before
 * checking the value. Useful for observing in the logs which thread runs each step.
 */
@Test
public void test_15() throws ExecutionException, InterruptedException, TimeoutException {
    String original = "Message";
    CompletableFuture<String> upperStage = CompletableFuture.completedFuture(original)
            .thenApplyAsync(CompletableFutureServiceTests::delayedUpperCase);
    CompletableFuture<String> lowerStage = CompletableFuture.completedFuture(original)
            .thenApplyAsync(CompletableFutureServiceTests::delayedLowerCase);
    CompletableFuture<String> cf = upperStage.thenCombine(lowerStage, (upper, lower) -> upper + lower);
    String combined = cf.join();
    assertEquals("MESSAGEmessage", combined);
}
/**
 * {@code thenCompose}: the upper-cased result of the first stage is fed into a function that
 * builds a second stage, whose lower-cased result is appended to it.
 */
@Test
public void test_16() throws ExecutionException, InterruptedException, TimeoutException {
    String original = "Message";
    CompletableFuture<String> upperStage = CompletableFuture.completedFuture(original)
            .thenApply(CompletableFutureServiceTests::delayedUpperCase);
    CompletableFuture<String> cf = upperStage.thenCompose(upper -> {
        CompletableFuture<String> lowerStage = CompletableFuture.completedFuture(original)
                .thenApply(CompletableFutureServiceTests::delayedLowerCase);
        return lowerStage.thenApply(lower -> upper + lower);
    });
    String combined = cf.join();
    assertEquals("MESSAGEmessage", combined);
}
/**
 * {@code anyOf}: builds one synchronous upper-casing stage per message and reacts to whichever
 * completion {@code anyOf} reports, asserting that its value is upper case.
 * <p>
 * Note from the original author: with {@code thenApplyAsync} instead of {@code thenApply} the
 * stages would have to be awaited with {@code join} or {@code get}.
 */
@Test
public void test_17() throws ExecutionException, InterruptedException, TimeoutException {
    StringBuilder sink = new StringBuilder();
    List<String> messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture<String>> futures = new ArrayList<>();
    for (String msg : messages) {
        futures.add(CompletableFuture.completedFuture(msg)
                .thenApply(CompletableFutureServiceTests::delayedUpperCase));
    }
    log.debug("Main thread is coming....");
    CompletableFuture<?>[] asArray = futures.toArray(new CompletableFuture[0]);
    CompletableFuture.anyOf(asArray).whenComplete((res, th) -> {
        if (th != null) {
            return;
        }
        String value = (String) res;
        assertTrue(isUpperCase(value));
        sink.append(value);
    }).join();
    assertTrue(sink.length() > 0, "Result was empty");
}
/**
 * Logs the argument and reports whether {@code StringUtils.isAllUpperCase} accepts it.
 *
 * @param s the text to check
 * @return the result of {@code StringUtils.isAllUpperCase(s)}
 */
private boolean isUpperCase(String s) {
    log.debug("check isUpperCase param:{}", s);
    boolean allUpper = StringUtils.isAllUpperCase(s);
    return allUpper;
}
/**
 * {@code allOf} over synchronous stages: once all stages are complete, every value is checked
 * to be upper case and a marker is appended to the buffer.
 * <p>
 * The stage returned by {@code whenComplete} is joined so that an assertion failure raised
 * inside the callback fails the test with its own cause. Previously that stage was discarded
 * and such a failure only showed up as "Result was empty".
 * <p>
 * Note from the original author: with {@code thenApplyAsync} instead of {@code thenApply} the
 * stages would have to be awaited with {@code join} or {@code get}.
 */
@Test
public void test_18() throws ExecutionException, InterruptedException, TimeoutException {
    StringBuilder result = new StringBuilder();
    List<String> messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture<String>> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    log.debug("Main thread is coming....");
    CompletableFuture<Void> allChecked = CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .whenComplete((v, th) -> {
                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                result.append("done");
            });
    // Surfaces any failure captured by the callback stage.
    allChecked.join();
    assertTrue(result.length() > 0, "Result was empty");
}
/**
 * {@code allOf} over asynchronous stages: the combined stage is joined before the buffer is
 * inspected, because the upper-casing functions are started with {@code thenApplyAsync}.
 */
@Test
public void test_19() throws ExecutionException, InterruptedException, TimeoutException {
    StringBuilder sink = new StringBuilder();
    List<String> messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture<String>> futures = new ArrayList<>();
    for (String msg : messages) {
        // Asynchronous stages: the result must be awaited with join or get.
        futures.add(CompletableFuture.completedFuture(msg)
                .thenApplyAsync(CompletableFutureServiceTests::delayedUpperCase));
    }
    log.debug("Main thread is coming....");
    CompletableFuture<?>[] asArray = futures.toArray(new CompletableFuture[0]);
    CompletableFuture<Void> allOf = CompletableFuture.allOf(asArray).whenComplete((ignored, th) -> {
        for (CompletableFuture<String> each : futures) {
            assertTrue(isUpperCase(each.getNow(null)));
        }
        sink.append("done");
    });
    allOf.join();
    assertTrue(sink.length() > 0, "Result was empty");
}
/**
 * Real-world style example: generates 101 fake books, requests a rating for each one, waits
 * for all ratings with {@code allOf} inside {@code thenCompose}, and prints the rated books.
 * A failure of the pipeline is rethrown as a {@link RuntimeException}.
 */
@Test
public void test_20() throws ExecutionException, InterruptedException, TimeoutException {
    List<BookDTO> generated = LongStream.rangeClosed(0, 100)
            .mapToObj(index -> {
                Book fake = Faker.instance(Locale.TAIWAN).book();
                return new BookDTO()
                        .setAuthor(fake.author())
                        .setGenre(fake.genre())
                        .setPublisher(fake.publisher())
                        .setTitle(fake.title())
                        .setTime(LocalDateTime.now());
            })
            .collect(Collectors.toList());
    CompletableFuture.completedFuture(generated)
            .thenCompose(books -> {
                // One rating stage per book; each stage yields the book after its rating has been set.
                List<CompletionStage<BookDTO>> ratedStages = books.stream()
                        .map(book -> rating().thenApply(score -> book.setRating(score)))
                        .collect(Collectors.toList());
                CompletableFuture<Void> allRated = CompletableFuture
                        .allOf(ratedStages.toArray(new CompletableFuture[ratedStages.size()]));
                // After allOf completes, join() on each stage returns without blocking.
                return allRated.thenApply(ignored -> ratedStages.stream()
                        .map(CompletionStage::toCompletableFuture)
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
            })
            .whenComplete((books, failure) -> {
                if (failure != null) {
                    throw new RuntimeException(failure);
                }
                books.forEach(System.out::println);
            })
            .toCompletableFuture()
            .join();
}
/**
 * Flattened variant of {@code test_20}: generates 101 fake books, requests a rating for each,
 * waits for all ratings with {@code allOf} and prints the rated books, without the enclosing
 * {@code thenCompose}/{@code whenComplete} pipeline.
 */
@Test
public void test_20_chatGPT_optimization() throws ExecutionException, InterruptedException, TimeoutException {
    List<BookDTO> generated = LongStream.rangeClosed(0, 100)
            .mapToObj(index -> {
                Book fake = Faker.instance(Locale.TAIWAN).book();
                return new BookDTO()
                        .setAuthor(fake.author())
                        .setGenre(fake.genre())
                        .setPublisher(fake.publisher())
                        .setTitle(fake.title())
                        .setTime(LocalDateTime.now());
            })
            .collect(Collectors.toList());
    List<CompletableFuture<BookDTO>> updatedBooks = generated.stream()
            .map(book -> rating().thenApply(score -> book.setRating(score)))
            .collect(Collectors.toList());
    CompletableFuture<Void> allOf = CompletableFuture.allOf(updatedBooks.toArray(new CompletableFuture[0]));
    // After allOf completes, join() on each future returns without blocking.
    CompletableFuture<List<BookDTO>> collected = allOf.thenApply(ignored -> updatedBooks.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
    List<BookDTO> completedBooks = collected.join();
    for (BookDTO completed : completedBooks) {
        System.out.println(completed);
    }
}
/**
 * Produces an already-completed future holding a random rating obtained from
 * {@code RandomUtils.nextFloat(0f, 1f)}.
 *
 * @return a completed future with the random rating
 */
private CompletableFuture<Float> rating() {
    Float score = RandomUtils.nextFloat(0f, 1f);
    return CompletableFuture.completedFuture(score);
}