[Java] CompletableFuture 완벽 가이드: 비동기 프로그래밍 핵심 사용법

Java CompletableFuture, Sense

자바 8은 개발자들에게 강력한 기능들을 선물했습니다. 그중에서도 비동기 프로그래밍을 혁신적으로 개선한 CompletableFuture는 단연 돋보이는 기능입니다. 기존의 Future 인터페이스가 가진 한계점들을 극복하고, 보다 유연하고 조합 가능한 비동기 처리를 가능하게 해주었죠. 이번 포스팅에서는 CompletableFuture의 등장 배경과 핵심 기능, 그리고 사용법을 예제 코드와 함께 자세히 살펴보겠습니다.

1. Future의 한계와 CompletableFuture의 등장 배경

Java 5에서 Future가 도입되면서 비동기 작업의 결과를 가져올 수 있게 되었지만, 몇 가지 불편한 점들이 존재했습니다.

  • 외부에서의 완료 불가: Future는 일단 시작되면 외부에서 명시적으로 완료시킬 수 없었습니다. 오직 작업이 끝나거나 get() 메소드의 타임아웃을 통해서만 상태를 알 수 있었습니다.
  • 블로킹 get(): 작업 결과를 얻기 위해 get() 메소드를 호출하면, 결과가 준비될 때까지 현재 스레드가 블로킹(대기) 상태가 됩니다. 이는 비동기 처리의 이점을 상쇄시키는 요인이었습니다.
  • 조합의 어려움: 여러 Future 객체를 연쇄적으로 처리하거나(예: 첫 번째 작업의 결과를 두 번째 작업의 입력으로 사용), 여러 작업의 결과를 조합하는 것이 복잡했습니다.
  • 예외 처리의 번거로움: 비동기 작업 중 발생한 예외를 섬세하게 처리하기 어려웠습니다.

이러한 Future의 한계를 극복하고, 보다 선언적이고 함수형 프로그래밍 스타일에 맞는 비동기 처리를 지원하기 위해 Java 8에서 CompletableFuture가 등장했습니다. CompletableFutureFuture 인터페이스뿐만 아니라 CompletionStage 인터페이스도 구현하여, 다양한 조합 및 콜백 기능을 제공합니다.

2. CompletableFuture 핵심 기능 살펴보기

CompletableFuture는 크게 다음과 같은 기능들을 제공합니다.

  • 비동기 작업 실행
  • 작업 완료 후 콜백 처리
  • 여러 작업 조합
  • 예외 처리

각 기능들을 예제 코드와 함께 살펴보겠습니다.

2.1. 비동기 작업 실행

가장 기본적인 기능은 작업을 별도의 스레드에서 비동기적으로 실행하는 것입니다.

  • runAsync(Runnable runnable): 반환 값이 없는 작업을 비동기로 실행합니다.
  • supplyAsync(Supplier<U> supplier): 반환 값이 있는 작업을 비동기로 실행합니다.

기본적으로 이 메소드들은 ForkJoinPool.commonPool()을 사용하여 스레드를 할당받지만, Executor를 인자로 전달하여 사용할 스레드 풀을 직접 지정할 수도 있습니다.

runAsync() 예제:

import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureRunTest {

    @Test
    void runAsyncExample() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("비동기 작업 실행 (반환값 없음) - 현재 스레드: " + Thread.currentThread().getName());
            try {
                Thread.sleep(1000); // 작업 시뮬레이션
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        });

        System.out.println("메인 스레드 계속 실행 중: " + Thread.currentThread().getName());
        future.get(); // 비동기 작업이 완료될 때까지 대기
        System.out.println("비동기 작업 완료 후 메인 스레드: " + Thread.currentThread().getName());
    }
}

supplyAsync() 예제:

import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureSupplyTest {

    @Test
    void supplyAsyncExample() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("비동기 작업 실행 (반환값 있음) - 현재 스레드: " + Thread.currentThread().getName());
            try {
                Thread.sleep(1000); // 작업 시뮬레이션
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "비동기 작업 결과!";
        });

        System.out.println("메인 스레드 계속 실행 중: " + Thread.currentThread().getName());
        String result = future.get(); // 비동기 작업의 결과를 가져옴 (블로킹)
        System.out.println("결과: " + result);
        System.out.println("비동기 작업 완료 후 메인 스레드: " + Thread.currentThread().getName());
    }
}

2.2. 콜백 처리

비동기 작업이 완료된 후, 그 결과를 받아 추가적인 작업을 수행하는 콜백을 등록할 수 있습니다.

  • thenApply(Function<? super T,? extends U> fn): 작업 결과를 받아 다른 타입의 값으로 변환합니다.
  • thenAccept(Consumer<? super T> action): 작업 결과를 받아 소비하고, 반환 값은 없습니다.
  • thenRun(Runnable action): 작업 결과와 관계없이 특정 Runnable 작업을 실행합니다.

thenApply() 예제:

import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureCallbackTest {

    @Test
    void thenApplyExample() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s -> s + " World")
                .thenApply(String::toUpperCase);

        System.out.println(future.get()); // HELLO WORLD
    }
}

thenAccept() 예제:

// CompletableFutureCallbackTest 클래스에 추가
    @Test
    void thenAcceptExample() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello Callback")
                .thenAccept(s -> System.out.println("결과 소비: " + s.toLowerCase()));

        future.get(); // "결과 소비: hello callback" 출력
    }

thenRun() 예제:

// CompletableFutureCallbackTest 클래스에 추가
    @Test
    void thenRunExample() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            // 어떤 작업 수행
            return "작업 완료";
        }).thenRun(() -> {
            System.out.println("이전 작업 완료 후 실행되는 작업입니다. 결과는 사용하지 않습니다.");
        });

        future.get();
    }

2.3. 작업 조합

여러 CompletableFuture를 조합하여 더 복잡한 비동기 흐름을 만들 수 있습니다.

  • thenCompose(Function<? super T, ? extends CompletionStage<U>> fn): 두 작업이 순차적으로 실행되어야 할 때 유용합니다. 첫 번째 작업의 결과를 두 번째 작업(새로운 CompletableFuture를 반환하는 함수)의 입력으로 사용합니다.
  • thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn): 두 작업을 독립적으로 실행하고, 두 작업이 모두 완료되면 결과를 조합하여 새로운 값을 만듭니다.
  • allOf(CompletableFuture<?>... cfs): 여러 CompletableFuture 작업을 동시에 실행하고, 모든 작업이 완료될 때까지 기다립니다. 반환 타입은 CompletableFuture<Void>이며, 각 작업의 결과는 직접 가져와야 합니다.
  • anyOf(CompletableFuture<?>... cfs): 여러 CompletableFuture 작업 중 가장 먼저 완료되는 작업의 결과를 사용합니다.

thenCompose() 예제:

import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureCombineTest {

    private CompletableFuture<String> getUserNameAsync() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("사용자 이름 가져오는 중...");
            try { Thread.sleep(500); } catch (InterruptedException ignored) {}
            return "Alice";
        });
    }

    private CompletableFuture<String> getGreetingAsync(String name) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println(name + "에게 인사말 생성 중...");
            try { Thread.sleep(500); } catch (InterruptedException ignored) {}
            return "Hello, " + name + "!";
        });
    }

    @Test
    void thenComposeExample() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = getUserNameAsync()
                .thenCompose(this::getGreetingAsync);

        System.out.println(future.get()); // Hello, Alice!
    }
}

thenCombine() 예제:

// CompletableFutureCombineTest 클래스에 추가
    private CompletableFuture<String> getOrderInfoAsync() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("주문 정보 가져오는 중...");
            try { Thread.sleep(700); } catch (InterruptedException ignored) {}
            return "Order #12345";
        });
    }

    @Test
    void thenCombineExample() throws ExecutionException, InterruptedException {
        CompletableFuture<String> userNameFuture = getUserNameAsync();
        CompletableFuture<String> orderInfoFuture = getOrderInfoAsync();

        CompletableFuture<String> combinedFuture = userNameFuture
                .thenCombine(orderInfoFuture, (userName, orderInfo) ->
                    userName + "님의 주문: " + orderInfo
                );

        System.out.println(combinedFuture.get()); // Alice님의 주문: Order #12345
    }

allOf() 예제:

// CompletableFutureCombineTest 클래스에 추가
    @Test
    void allOfExample() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(300); } catch (InterruptedException ignored) {}
            return "작업1 완료";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(500); } catch (InterruptedException ignored) {}
            return "작업2 완료";
        });

        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(future1, future2);
        allDoneFuture.get(); // 모든 작업이 끝날 때까지 대기

        // 각 Future의 결과는 join() 등으로 직접 가져와야 함
        System.out.println("future1 결과: " + future1.join());
        System.out.println("future2 결과: " + future2.join());
    }

anyOf() 예제:

// CompletableFutureCombineTest 클래스에 추가
    @Test
    void anyOfExample() throws ExecutionException, InterruptedException {
        CompletableFuture<String> fastFuture = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(200); } catch (InterruptedException ignored) {}
            return "빠른 작업 완료";
        });
        CompletableFuture<String> slowFuture = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
            return "느린 작업 완료";
        });

        CompletableFuture<Object> firstCompletedFuture = CompletableFuture.anyOf(fastFuture, slowFuture);

        // 가장 먼저 완료된 작업의 결과 (타입은 Object)
        System.out.println("가장 먼저 완료된 결과: " + firstCompletedFuture.get()); // 빠른 작업 완료
    }

2.4. 예외 처리

비동기 작업 중 발생할 수 있는 예외를 처리하는 방법도 제공합니다.

  • exceptionally(Function<Throwable, ? extends T> fn): 작업 중 예외가 발생했을 때, 해당 예외를 받아 처리하고 대체 값을 반환할 수 있습니다.
  • handle(BiFunction<? super T, Throwable, ? extends U> fn): 작업의 결과 (정상 완료 시) 또는 예외 (예외 발생 시)를 모두 받아 처리할 수 있는 더 유연한 방법을 제공합니다.

exceptionally() 예제:

import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExceptionTest {

    @Test
    void exceptionallyExample() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("랜덤 예외 발생!");
            }
            return "정상 처리 완료";
        }).exceptionally(ex -> {
            System.err.println("예외 발생: " + ex.getMessage());
            return "예외 발생 시 기본값";
        });

        System.out.println("최종 결과: " + future.get());
    }
}

handle() 예제:

// CompletableFutureExceptionTest 클래스에 추가
    @Test
    void handleExample() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("또 다른 랜덤 예외!");
            }
            return "성공적인 데이터";
        }).handle((result, ex) -> {
            if (ex != null) {
                System.err.println("handle에서 예외 감지: " + ex.getMessage());
                return "에러 시 대체 메시지";
            }
            return "결과: " + result.toUpperCase();
        });

        System.out.println("처리된 결과: " + future.get());
    }

3. CompletableFuture 사용 시 고려사항

CompletableFuture는 매우 강력하지만, 사용할 때 몇 가지 주의할 점이 있습니다.

  • 스레드 풀 관리: runAsyncsupplyAsync 등을 사용할 때 별도의 Executor를 지정하지 않으면 ForkJoinPool.commonPool()이 사용됩니다. CPU 집약적인 작업이 아닌 I/O 바운드 작업이 많을 경우, commonPool의 스레드가 모두 소진되어 다른 작업에 영향을 줄 수 있습니다. 따라서 작업의 성격에 맞게 적절한 스레드 풀을 구성하고 지정하는 것이 좋습니다.
  • get() 사용 주의: get() 메소드는 결과를 얻을 때까지 현재 스레드를 블로킹합니다. 무한정 대기하는 것을 방지하기 위해 타임아웃을 설정한 get(long timeout, TimeUnit unit)을 사용하거나, 콜백 기반의 비동기 체인을 최대한 활용하여 get() 호출을 최소화하는 것이 좋습니다.
  • 에러 전파: 콜백 체인 중 한 곳에서 예외가 발생하면, 후속 콜백들은 실행되지 않고 exceptionallyhandle 등으로 예외가 전파됩니다. 이를 명확히 이해하고 예외 처리 로직을 구성해야 합니다.

결론

CompletableFuture는 자바에서 비동기 프로그래밍을 훨씬 더 쉽고 효과적으로 다룰 수 있게 해주는 강력한 도구입니다. 콜백, 조합, 예외 처리 등 다양한 기능을 통해 복잡한 비동기 로직을 간결하고 읽기 쉽게 표현할 수 있습니다. 기존 Future의 한계를 넘어선 CompletableFuture를 적극적으로 활용하여, 더욱 반응성 좋고 효율적인 애플리케이션을 구축해 보시기 바랍니다.