java 8 정리7

Java 정리 2021. 5. 26. 12:03

인프런 강의 9일차.

 - 더 자바, Java 8 (백기선 강사님)

 

1. Callable

 - Runnable과 유사하지만 작업의 결과를 받을 수 있다.

 

2. Future

 - 비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.

 - 결과를 가져오기 get()

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> helloFuture = executorService.submit(() -> {
	Thread.sleep(1000);
   	return "Callable";
});
System.out.println("Hello");
String result = helloFuture.get();
System.out.println(result);
executorService.shutdown();

 - 블록킹 콜이다.

 - 타임아웃(최대한 기다릴 시간)을 정할 수 있다.

 

3. 작업 상태 확인하기 isDone()

 - 완료 했으면 true, 아니면 false를 리턴

 

4. 작업 취소하기 cancel()

 - 취소 했으면 true, 못했으면 false를 리턴

 - parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그러지 않으면 현재 진행중인 작업이 끝날 때 까지 기다린다.

 

5. 여러 작업 동시에 실행하기 invokeAll()

 - 동시에 실행한 작업 중에 제일 오래 걸리는 작업 만큼 시간이 걸린다.

 

6. 여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 invokeAny()

 - 동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.

 - 블록킹 콜이다.

 

package me.whiteship.java8to11;

import java.sql.SQLOutput;
import java.util.Arrays;
import java.util.concurrent.*;

public class App {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "Hello";
        };

        Callable<String> java = () -> {
            Thread.sleep(3000L);
            return "Java";
        };

        Callable<String> dhpark = () -> {
            Thread.sleep(1000L);
            return "dhpark";
        };

        //3개의 Callable을 하나의 Collection으로 넘겨서 주면 Future의 List가 나오고, 그에 대해 3개의 Callable을 한번에 수행한다.
        executorService.invokeAll(Arrays.asList(hello, java, dhpark));

        Future<String> helloFuture = executorService.submit(hello);
        System.out.println("Started!!!");       //get을 수행하기 전 까지는 코드가 쭉 실행이 된다,

        helloFuture.cancel(false);  //cancel을 하게 되면 isDone은 무조건 true가 된다!

        //get을 만나는 순간 결과값을 가져올 때 까지 기다린다(=블록킹 콜)
        //만약 cancel이 된 Future를 get하게 된다면 익셉션이 발생한다.
        helloFuture.get();

        String s = executorService.invokeAny(Arrays.asList(hello, java, dhpark));//any이므로 하나가 끝나면 바로 결과가 나온다
        //가장 짧은 timeout인 dhpark이 호출된다.
        //단 이 때 쓰레드의 타입 및 개수에 따라 다음과 같은 아웃풋이 나온다.
        //SingleThread로 선언된 경우 첫 Callable이 끝날 때 가지 다음 Callable을 호출하지 못 하므로 첫번째 Callable인 Hello 호출
        //fixedThread(2)로 선언 된 경우 dhpark은 들어갈 자리가 없으므로 앞의 2개의 Callable 중 먼저 끝나는 Hello가 출력
        //fixedThread(4)로 선언 된 경우 가장 짧은 Callable인 dhpark이 출력
        System.out.println(s);

        System.out.println(helloFuture.isDone());
        System.out.println("End!!!!");
        executorService.shutdown();
    }
}

'Java 정리' 카테고리의 다른 글

java 8 정리9  (0) 2021.06.17
java 8 정리8  (0) 2021.06.02
java 8 정리6  (0) 2021.05.21
java 8 정리5  (0) 2021.05.20
java 8 정리4  (0) 2021.05.19

java 8 정리5

Java 정리 2021. 5. 20. 15:40

인프런 강의 7일차.

 - 더 자바, Java 8 (백기선 강사님)

 

1. CompletableFuture 1

 - 자바에서 비동기(Asynchronous) 프로그래밍을 가능하게하는 인터페이스.

   > Future를 사용해서도 어느정도 가능했지만 하기 힘든 일들이 많았다.

 

 - Future로는 하기 어렵던 작업들

   > Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.

   > 블로킹 코드(get())을 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.

   > 여러 Future를 조합할 수 없다. ex) Event 정보 가져온 다음 Event에 참석하는 회원의 목록 가져오기

   > 예외 처리용 API를 제공하지 않는다.

 

 - CompletableFuture

   > Implements Future

   > Impletments CompletableFuture

 

 - 비동기로 작업 실행하기

   > 리턴값이 없는 경우 : runAsync()

   > 리턴값이 있는 경우 : supplySync()

   > 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다.(기본은 ForkJoinPool.commonPool())

 

 - 콜백 제공하기

   > thenApply(Function) : 리턴값을 받아서 다른 값으로 바꾸는 콜백

   > thenAccept(Consumer) : 리턴값을 또 다른 작업으로 처리하는 콜백 (리턴없이)

   > thenRun(Runnable) : 리턴값을 받아 다른 작업을 처리하는 콜백

   > 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.

 

 - 조합하기

   > thenCompose() : 두 작업이 서로 이어서 실행하도록 조합

   > thenCombine() : 두 작업을 독립적으로 실행하고, 둘 다 종료했을 때 콜백 실행

   > allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행

   > anyOf() : 여러 작업 중 가장 빨리 끝난 하나의 결과에 콜백 실행

 

 - 예외처리

   > exceptionally(Function) : exception 발생 시 처리하는 코드

   > handle(BiFunction) : handle은 예외가 발생했을 때, 발생하지 않았을 때 둘 다 사용가능하다

 

package me.whiteship.java8to11;

import ch.qos.logback.core.util.ExecutorServiceUtil;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class App {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        Future<String> future = executorService.submit(() -> "hello");

        future.get();   //get이 블로킹 코드이니 get 이후에 콜백 로직이 들어와야한다.

        //Case1. 명시적으로 dhpark 값 세팅해서 선언
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        completableFuture.complete("dhpark");   //completablefuture의 기본 값을 dhpark으로 정의
        System.out.println(completableFuture.get());    //dhpark 출력

        //Case2. static factory method 사용
        CompletableFuture<String> completableFuture1 = CompletableFuture.completedFuture("dhpark");
        System.out.println(completableFuture1.get());    //dhpark 출력

        //리턴이 없는 작업은 runAsync 사용
        CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());        //future만 정의했기 때문에 수행되지 않고, join()이나 get()을 해야 동작한다.
        });
        System.out.println(completableFuture2.get());

        //리턴이 있는 경우 supplyAsync 사용
        CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
        System.out.println(completableFuture3.get());

        //위의 예제들은 Future로도 구현 가능한 기능이라고 봐도 무방하다.
        //CompletableFuture는 추가로 callback 구현이 가능.
        //리턴이 있는 supplyAsync에 callback구현(thenApply)
        CompletableFuture<String> completableFuture4 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenApply((s) -> {       //java5때의 Future로는 get 호출하기 이전에 콜백 정의가 불가능했다.
            System.out.println(Thread.currentThread().getName());
            return s.toUpperCase();
        });
        System.out.println(completableFuture4.get());       //get을 호출하지 않으면 아무 일이 일어나지 않는건 동일함

        //리턴이 없는 콜백의 경우 thenAccept
        CompletableFuture<Void> completableFuture5 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).thenAccept((s) -> {       //java5때의 Future로는 get 호출하기 이전에 콜백 정의가 불가능했다.
            System.out.println(Thread.currentThread().getName());
            System.out.println(s.toUpperCase());
        });
        completableFuture5.get();       //get을 호출하지 않으면 아무 일이 일어나지 않는건 동일함

        //전달되는 파라미터(리턴) 없이 동작을 하기만 하면 되는 것은 thenRun
        //별다른 Executor 정의가 없다면 ForkJoinPool.commonPool을 사용하게 된다.
        //ForkJoinPool : Java7에 도입된 기능, Executor를 구현한 구현체. 작업을 DeQueue를 사용해서 자기 쓰레드가 할일이 없으면 할일을 가져와서 처리하는 방식
        //자기가 파생시키는 subTask들을 다른 쓰레드에 분산시켜서 작업을 처리하고, 결과를 모아서 최종 결과값을 도출해냄
        //원할 경우 Executor를 새로 정의하는 것도 가능하다.
        //새로 정의한 경우 supplyAsync의 2번째 인자로 전달해야한다.
        ExecutorService executorService1 = Executors.newFixedThreadPool(4);
        CompletableFuture<Void> completableFuture6 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }, executorService1).thenRun(() -> {    //새로 정의한 executorService를 사용해서 쓰레드 호출
            //ExecutorService를 추가 하지 않은 경우 Hello ForkJoinPool.commonPool-worker 출력
            //ExecutorService를 추가한 경우 Hello pool-1-thread-1 출력
            //java5때의 Future로는 get 호출하기 이전에 콜백 정의가 불가능했다.
            System.out.println(Thread.currentThread().getName());
        });
        //}, executorService1);  //callback에서도 정의된 executor를 사용할 수 있다. 이 때 callback은 @async를 사용해야 한다. thenRunAsync, thenApplyAsync, ...
        //다만 supplyAsync때 사용된 쓰레드와 callback에서 사용된 쓰레드가 다를 수 있다!

        //쓰레드 조합해서 사용하기
        //thenCompose

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });

        //Case1. 2개의 쓰레드 작업 조합
        CompletableFuture<String> completableFuture7 = hello.thenCompose(App::getWorld);
        System.out.println(completableFuture7.get());       //Hello World 출력

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World" + Thread.currentThread().getName());
            return "World";
        });

        //Case2. 입력 값은 2개, 결과 값은 1개로 처리
        CompletableFuture<String> completableFuture8 = hello.thenCombine(world, (h, w) -> h + " " + w); //BiFunction에 해당하는 람다 실행
        System.out.println(completableFuture8.get());   //hello world 출력

        //Case3. 2개 이상의 태스크를 합쳐서 한번에 실행
        //인자로 넘어가는 태스크의 결과가 동일한 타입인지 보장할 수 없고, 태스크가 전부 성공한다는 보장도 없기에 결과가 무의미하다.
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(hello, world)
                .thenAccept(System.out::println);       //thenAccept가 실행되어서 null이 출력됨
        System.out.println(voidCompletableFuture.get());        //결과가 null이다.

        //Case3의 결과가 null이기에 해당 결과를 출력시킬 수 있는 방법은 Collection으로 모아서 한데 처리해야한다.
        List<CompletableFuture<String>> completableFutures = Arrays.asList(hello, world);
        CompletableFuture[] futuresArray = completableFutures.toArray(new CompletableFuture[completableFutures.size()]);
        CompletableFuture<List<String>> listCompletableFuture = CompletableFuture.allOf(futuresArray)
                .thenApply(v -> {   //결과인 v는 무의미하고, return이 중요하다.
                    return completableFutures.stream()
                            //thenApply가 호출되는 시점은 futuresArray의 모든 작업이 끝난 상태이다!
                            //CompletableFuture.get()을 써도 되나 get은 checkedException이 발생하므로 exception까지 정의해줘야 가능함.
                            .map(CompletableFuture::join)       //join은 uncheckedException이 발생
                            .collect(Collectors.toList());

                });
        listCompletableFuture.get().forEach(System.out::println);       //Hello \n World 출력됨

        //Case4. 아무거나 빨리 끝나는거 결과 하나 받아서 실행
        CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
        voidCompletableFuture1.get();    //hello랑 world중에 먼저 끝나는 작업 출력

        //Case5. Exception 처리
        boolean throwError = true;
        CompletableFuture<String> exceptionHello = CompletableFuture.supplyAsync(() -> {
            if (throwError) {
                throw new IllegalStateException();
            }

            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).exceptionally(ex -> {    //에러 발생 시 수행하는 코드
            System.out.println(ex);
            return "Error!";
        });

        //Case6. handle은 예외가 발생했을 때, 발생하지 않았을 때 둘 다 사용가능하다 (BiFunction)
        CompletableFuture<String> exceptionHello1 = CompletableFuture.supplyAsync(() -> {
            if (throwError) {
                throw new IllegalStateException();
            }

            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).handle((result, ex) -> {
            if(ex != null){
                System.out.println(ex);
                return "Error!";
            }
            return result;      //에러가 없으면 결과 리턴
        });

    }

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + "World";
        });
    }
}

'Java 정리' 카테고리의 다른 글

java 8 정리7  (0) 2021.05.26
java 8 정리6  (0) 2021.05.21
java 8 정리4  (0) 2021.05.19
java 8 정리3  (0) 2021.05.17
java 8 정리2  (0) 2021.05.13