검색결과 리스트
Sync에 해당되는 글 1건
- 2021.05.20 java 8 정리5
글
java 8 정리5
인프런 강의 7일차.
- 더 자바, Java 8 (백기선 강사님)
1. CompletableFuture 1
- 자바에서 비동기(Asynchronous) 프로그래밍을 가능하게하는 인터페이스.
> Future를 사용해서도 어느정도 가능했지만 하기 힘든 일들이 많았다.
- Future로는 하기 어렵던 작업들
> Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
> 블로킹 코드(get())을 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
> 여러 Future를 조합할 수 없다. ex) Event 정보 가져온 다음 Event에 참석하는 회원의 목록 가져오기
> 예외 처리용 API를 제공하지 않는다.
- CompletableFuture
> Implements Future
> Impletments CompletableFuture
- 비동기로 작업 실행하기
> 리턴값이 없는 경우 : runAsync()
> 리턴값이 있는 경우 : supplySync()
> 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다.(기본은 ForkJoinPool.commonPool())
- 콜백 제공하기
> thenApply(Function) : 리턴값을 받아서 다른 값으로 바꾸는 콜백
> thenAccept(Consumer) : 리턴값을 또 다른 작업으로 처리하는 콜백 (리턴없이)
> thenRun(Runnable) : 리턴값을 받아 다른 작업을 처리하는 콜백
> 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.
- 조합하기
> thenCompose() : 두 작업이 서로 이어서 실행하도록 조합
> thenCombine() : 두 작업을 독립적으로 실행하고, 둘 다 종료했을 때 콜백 실행
> allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
> anyOf() : 여러 작업 중 가장 빨리 끝난 하나의 결과에 콜백 실행
- 예외처리
> exceptionally(Function) : exception 발생 시 처리하는 코드
> handle(BiFunction) : handle은 예외가 발생했을 때, 발생하지 않았을 때 둘 다 사용가능하다
package me.whiteship.java8to11;
import ch.qos.logback.core.util.ExecutorServiceUtil;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class App {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> future = executorService.submit(() -> "hello");
future.get(); //get이 블로킹 코드이니 get 이후에 콜백 로직이 들어와야한다.
//Case1. 명시적으로 dhpark 값 세팅해서 선언
CompletableFuture<String> completableFuture = new CompletableFuture<>();
completableFuture.complete("dhpark"); //completablefuture의 기본 값을 dhpark으로 정의
System.out.println(completableFuture.get()); //dhpark 출력
//Case2. static factory method 사용
CompletableFuture<String> completableFuture1 = CompletableFuture.completedFuture("dhpark");
System.out.println(completableFuture1.get()); //dhpark 출력
//리턴이 없는 작업은 runAsync 사용
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName()); //future만 정의했기 때문에 수행되지 않고, join()이나 get()을 해야 동작한다.
});
System.out.println(completableFuture2.get());
//리턴이 있는 경우 supplyAsync 사용
CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
System.out.println(completableFuture3.get());
//위의 예제들은 Future로도 구현 가능한 기능이라고 봐도 무방하다.
//CompletableFuture는 추가로 callback 구현이 가능.
//리턴이 있는 supplyAsync에 callback구현(thenApply)
CompletableFuture<String> completableFuture4 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).thenApply((s) -> { //java5때의 Future로는 get 호출하기 이전에 콜백 정의가 불가능했다.
System.out.println(Thread.currentThread().getName());
return s.toUpperCase();
});
System.out.println(completableFuture4.get()); //get을 호출하지 않으면 아무 일이 일어나지 않는건 동일함
//리턴이 없는 콜백의 경우 thenAccept
CompletableFuture<Void> completableFuture5 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).thenAccept((s) -> { //java5때의 Future로는 get 호출하기 이전에 콜백 정의가 불가능했다.
System.out.println(Thread.currentThread().getName());
System.out.println(s.toUpperCase());
});
completableFuture5.get(); //get을 호출하지 않으면 아무 일이 일어나지 않는건 동일함
//전달되는 파라미터(리턴) 없이 동작을 하기만 하면 되는 것은 thenRun
//별다른 Executor 정의가 없다면 ForkJoinPool.commonPool을 사용하게 된다.
//ForkJoinPool : Java7에 도입된 기능, Executor를 구현한 구현체. 작업을 DeQueue를 사용해서 자기 쓰레드가 할일이 없으면 할일을 가져와서 처리하는 방식
//자기가 파생시키는 subTask들을 다른 쓰레드에 분산시켜서 작업을 처리하고, 결과를 모아서 최종 결과값을 도출해냄
//원할 경우 Executor를 새로 정의하는 것도 가능하다.
//새로 정의한 경우 supplyAsync의 2번째 인자로 전달해야한다.
ExecutorService executorService1 = Executors.newFixedThreadPool(4);
CompletableFuture<Void> completableFuture6 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}, executorService1).thenRun(() -> { //새로 정의한 executorService를 사용해서 쓰레드 호출
//ExecutorService를 추가 하지 않은 경우 Hello ForkJoinPool.commonPool-worker 출력
//ExecutorService를 추가한 경우 Hello pool-1-thread-1 출력
//java5때의 Future로는 get 호출하기 이전에 콜백 정의가 불가능했다.
System.out.println(Thread.currentThread().getName());
});
//}, executorService1); //callback에서도 정의된 executor를 사용할 수 있다. 이 때 callback은 @async를 사용해야 한다. thenRunAsync, thenApplyAsync, ...
//다만 supplyAsync때 사용된 쓰레드와 callback에서 사용된 쓰레드가 다를 수 있다!
//쓰레드 조합해서 사용하기
//thenCompose
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
//Case1. 2개의 쓰레드 작업 조합
CompletableFuture<String> completableFuture7 = hello.thenCompose(App::getWorld);
System.out.println(completableFuture7.get()); //Hello World 출력
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("World" + Thread.currentThread().getName());
return "World";
});
//Case2. 입력 값은 2개, 결과 값은 1개로 처리
CompletableFuture<String> completableFuture8 = hello.thenCombine(world, (h, w) -> h + " " + w); //BiFunction에 해당하는 람다 실행
System.out.println(completableFuture8.get()); //hello world 출력
//Case3. 2개 이상의 태스크를 합쳐서 한번에 실행
//인자로 넘어가는 태스크의 결과가 동일한 타입인지 보장할 수 없고, 태스크가 전부 성공한다는 보장도 없기에 결과가 무의미하다.
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(hello, world)
.thenAccept(System.out::println); //thenAccept가 실행되어서 null이 출력됨
System.out.println(voidCompletableFuture.get()); //결과가 null이다.
//Case3의 결과가 null이기에 해당 결과를 출력시킬 수 있는 방법은 Collection으로 모아서 한데 처리해야한다.
List<CompletableFuture<String>> completableFutures = Arrays.asList(hello, world);
CompletableFuture[] futuresArray = completableFutures.toArray(new CompletableFuture[completableFutures.size()]);
CompletableFuture<List<String>> listCompletableFuture = CompletableFuture.allOf(futuresArray)
.thenApply(v -> { //결과인 v는 무의미하고, return이 중요하다.
return completableFutures.stream()
//thenApply가 호출되는 시점은 futuresArray의 모든 작업이 끝난 상태이다!
//CompletableFuture.get()을 써도 되나 get은 checkedException이 발생하므로 exception까지 정의해줘야 가능함.
.map(CompletableFuture::join) //join은 uncheckedException이 발생
.collect(Collectors.toList());
});
listCompletableFuture.get().forEach(System.out::println); //Hello \n World 출력됨
//Case4. 아무거나 빨리 끝나는거 결과 하나 받아서 실행
CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
voidCompletableFuture1.get(); //hello랑 world중에 먼저 끝나는 작업 출력
//Case5. Exception 처리
boolean throwError = true;
CompletableFuture<String> exceptionHello = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalStateException();
}
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).exceptionally(ex -> { //에러 발생 시 수행하는 코드
System.out.println(ex);
return "Error!";
});
//Case6. handle은 예외가 발생했을 때, 발생하지 않았을 때 둘 다 사용가능하다 (BiFunction)
CompletableFuture<String> exceptionHello1 = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalStateException();
}
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).handle((result, ex) -> {
if(ex != null){
System.out.println(ex);
return "Error!";
}
return result; //에러가 없으면 결과 리턴
});
}
private static CompletableFuture<String> getWorld(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("World " + Thread.currentThread().getName());
return message + "World";
});
}
}
'Java 정리' 카테고리의 다른 글
java 8 정리7 (0) | 2021.05.26 |
---|---|
java 8 정리6 (0) | 2021.05.21 |
java 8 정리4 (0) | 2021.05.19 |
java 8 정리3 (0) | 2021.05.17 |
java 8 정리2 (0) | 2021.05.13 |