java 8 정리5

Java 정리 2021. 5. 20. 15:40

인프런 강의 7일차.

 - 더 자바, Java 8 (백기선 강사님)

 

1. CompletableFuture 1

 - 자바에서 비동기(Asynchronous) 프로그래밍을 가능하게하는 인터페이스.

   > Future를 사용해서도 어느정도 가능했지만 하기 힘든 일들이 많았다.

 

 - Future로는 하기 어렵던 작업들

   > Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.

   > 블로킹 코드(get())을 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.

   > 여러 Future를 조합할 수 없다. ex) Event 정보 가져온 다음 Event에 참석하는 회원의 목록 가져오기

   > 예외 처리용 API를 제공하지 않는다.

 

 - CompletableFuture

   > Implements Future

   > Impletments CompletableFuture

 

 - 비동기로 작업 실행하기

   > 리턴값이 없는 경우 : runAsync()

   > 리턴값이 있는 경우 : supplySync()

   > 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다.(기본은 ForkJoinPool.commonPool())

 

 - 콜백 제공하기

   > thenApply(Function) : 리턴값을 받아서 다른 값으로 바꾸는 콜백

   > thenAccept(Consumer) : 리턴값을 또 다른 작업으로 처리하는 콜백 (리턴없이)

   > thenRun(Runnable) : 리턴값을 받아 다른 작업을 처리하는 콜백

   > 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.

 

 - 조합하기

   > thenCompose() : 두 작업이 서로 이어서 실행하도록 조합

   > thenCombine() : 두 작업을 독립적으로 실행하고, 둘 다 종료했을 때 콜백 실행

   > allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행

   > anyOf() : 여러 작업 중 가장 빨리 끝난 하나의 결과에 콜백 실행

 

 - 예외처리

   > exceptionally(Function) : exception 발생 시 처리하는 코드

   > handle(BiFunction) : handle은 예외가 발생했을 때, 발생하지 않았을 때 둘 다 사용가능하다

 

package me.whiteship.java8to11;

import ch.qos.logback.core.util.ExecutorServiceUtil;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class App {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        Future<String> future = executorService.submit(() -> "hello");

        future.get();   //get이 블로킹 코드이니 get 이후에 콜백 로직이 들어와야한다.

        //Case1. 명시적으로 dhpark 값 세팅해서 선언
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        completableFuture.complete("dhpark");   //completablefuture의 기본 값을 dhpark으로 정의
        System.out.println(completableFuture.get());    //dhpark 출력

        //Case2. static factory method 사용
        CompletableFuture<String> completableFuture1 = CompletableFuture.completedFuture("dhpark");
        System.out.println(completableFuture1.get());    //dhpark 출력

        //리턴이 없는 작업은 runAsync 사용
        CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());        //future만 정의했기 때문에 수행되지 않고, join()이나 get()을 해야 동작한다.
        });
        System.out.println(completableFuture2.get());

        //리턴이 있는 경우 supplyAsync 사용
        CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
        System.out.println(completableFuture3.get());

        //위의 예제들은 Future로도 구현 가능한 기능이라고 봐도 무방하다.
        //CompletableFuture는 추가로 callback 구현이 가능.
        //리턴이 있는 supplyAsync에 callback구현(thenApply)
        CompletableFuture<String> completableFuture4 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenApply((s) -> {       //java5때의 Future로는 get 호출하기 이전에 콜백 정의가 불가능했다.
            System.out.println(Thread.currentThread().getName());
            return s.toUpperCase();
        });
        System.out.println(completableFuture4.get());       //get을 호출하지 않으면 아무 일이 일어나지 않는건 동일함

        //리턴이 없는 콜백의 경우 thenAccept
        CompletableFuture<Void> completableFuture5 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenAccept((s) -> {       //java5때의 Future로는 get 호출하기 이전에 콜백 정의가 불가능했다.
            System.out.println(Thread.currentThread().getName());
            System.out.println(s.toUpperCase());
        });
        completableFuture5.get();       //get을 호출하지 않으면 아무 일이 일어나지 않는건 동일함

        //전달되는 파라미터(리턴) 없이 동작을 하기만 하면 되는 것은 thenRun
        //별다른 Executor 정의가 없다면 ForkJoinPool.commonPool을 사용하게 된다.
        //ForkJoinPool : Java7에 도입된 기능, Executor를 구현한 구현체. 작업을 DeQueue를 사용해서 자기 쓰레드가 할일이 없으면 할일을 가져와서 처리하는 방식
        //자기가 파생시키는 subTask들을 다른 쓰레드에 분산시켜서 작업을 처리하고, 결과를 모아서 최종 결과값을 도출해냄
        //원할 경우 Executor를 새로 정의하는 것도 가능하다.
        //새로 정의한 경우 supplyAsync의 2번째 인자로 전달해야한다.
        ExecutorService executorService1 = Executors.newFixedThreadPool(4);
        CompletableFuture<Void> completableFuture6 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }, executorService1).thenRun(() -> {    //새로 정의한 executorService를 사용해서 쓰레드 호출
            //ExecutorService를 추가 하지 않은 경우 Hello ForkJoinPool.commonPool-worker 출력
            //ExecutorService를 추가한 경우 Hello pool-1-thread-1 출력
            //java5때의 Future로는 get 호출하기 이전에 콜백 정의가 불가능했다.
            System.out.println(Thread.currentThread().getName());
        });
        //}, executorService1);  //callback에서도 정의된 executor를 사용할 수 있다. 이 때 callback은 @async를 사용해야 한다. thenRunAsync, thenApplyAsync, ...
        //다만 supplyAsync때 사용된 쓰레드와 callback에서 사용된 쓰레드가 다를 수 있다!

        //쓰레드 조합해서 사용하기
        //thenCompose

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });

        //Case1. 2개의 쓰레드 작업 조합
        CompletableFuture<String> completableFuture7 = hello.thenCompose(App::getWorld);
        System.out.println(completableFuture7.get());       //Hello World 출력

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World" + Thread.currentThread().getName());
            return "World";
        });

        //Case2. 입력 값은 2개, 결과 값은 1개로 처리
        CompletableFuture<String> completableFuture8 = hello.thenCombine(world, (h, w) -> h + " " + w); //BiFunction에 해당하는 람다 실행
        System.out.println(completableFuture8.get());   //hello world 출력

        //Case3. 2개 이상의 태스크를 합쳐서 한번에 실행
        //인자로 넘어가는 태스크의 결과가 동일한 타입인지 보장할 수 없고, 태스크가 전부 성공한다는 보장도 없기에 결과가 무의미하다.
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(hello, world)
                .thenAccept(System.out::println);       //thenAccept가 실행되어서 null이 출력됨
        System.out.println(voidCompletableFuture.get());        //결과가 null이다.

        //Case3의 결과가 null이기에 해당 결과를 출력시킬 수 있는 방법은 Collection으로 모아서 한데 처리해야한다.
        List<CompletableFuture<String>> completableFutures = Arrays.asList(hello, world);
        CompletableFuture[] futuresArray = completableFutures.toArray(new CompletableFuture[completableFutures.size()]);
        CompletableFuture<List<String>> listCompletableFuture = CompletableFuture.allOf(futuresArray)
                .thenApply(v -> {   //결과인 v는 무의미하고, return이 중요하다.
                    return completableFutures.stream()
                            //thenApply가 호출되는 시점은 futuresArray의 모든 작업이 끝난 상태이다!
                            //CompletableFuture.get()을 써도 되나 get은 checkedException이 발생하므로 exception까지 정의해줘야 가능함.
                            .map(CompletableFuture::join)       //join은 uncheckedException이 발생
                            .collect(Collectors.toList());

                });
        listCompletableFuture.get().forEach(System.out::println);       //Hello \n World 출력됨

        //Case4. 아무거나 빨리 끝나는거 결과 하나 받아서 실행
        CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
        voidCompletableFuture1.get();    //hello랑 world중에 먼저 끝나는 작업 출력

        //Case5. Exception 처리
        boolean throwError = true;
        CompletableFuture<String> exceptionHello = CompletableFuture.supplyAsync(() -> {
            if (throwError) {
                throw new IllegalStateException();
            }

            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).exceptionally(ex -> {    //에러 발생 시 수행하는 코드
            System.out.println(ex);
            return "Error!";
        });

        //Case6. handle은 예외가 발생했을 때, 발생하지 않았을 때 둘 다 사용가능하다 (BiFunction)
        CompletableFuture<String> exceptionHello1 = CompletableFuture.supplyAsync(() -> {
            if (throwError) {
                throw new IllegalStateException();
            }

            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).handle((result, ex) -> {
            if(ex != null){
                System.out.println(ex);
                return "Error!";
            }
            return result;      //에러가 없으면 결과 리턴
        });

    }

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + "World";
        });
    }
}

'Java 정리' 카테고리의 다른 글

java 8 정리7  (0) 2021.05.26
java 8 정리6  (0) 2021.05.21
java 8 정리4  (0) 2021.05.19
java 8 정리3  (0) 2021.05.17
java 8 정리2  (0) 2021.05.13