비동기(Asynchronized)란?
안녕하세요~ ㅎㅎ
오늘은 서버 개발에 있어서 아주아주 중요한 항목인~
비동기 처리에 대해 알아보려고 해요!!
일반적인 REST API로 서버를 만들 때
보통의 경우에는 단일 쓰레드로, Request / Response 형태를 가지고 있지만,
요청에 대한 요구사항이 크거나 동일한 속성이 여러개를 띄고 있거나 빠른 응답이 필요하다면
고려해야되는 것중의 하나가 바로 멀티쓰레드 처리 인데요.!
요번 시간에는 자바(Spring)에서 어떻게 멀티쓰레드로 처리할지 알아보도록 하겠습니다.!
먼저 동기, 비동기 처리가 어떤 것인지 알아보도록 할까요?
위 그림처럼 단일 쓰레드에서는 요청이 길어지면 길어질 수록 처리시간이 선형적으로 늘어날 수 밖에 없습니다 ㅠ
그러니까 놀고 있는 CPU 자원들을 최대한 끌어올려서 놀지 않게 처리하는 것이죠.!
그렇다면 Java에서는 어떠한 기능들이 있을까요?
Java 비동기 처리
자바에서는 예전부터 내려오던 Future라는 클래스가 있습니다!
무려 Jdk 1.5 버젼부터 내려오던 것이죠
하지만 Future에는 get() 하기 위한 너무 복잡한 try - catch와
여러개의 Future List들을 처리할려면 정말 코드가 산으로가는 현상들이 많이 발생합니다 ㅠㅠ
그래서 나온 것이 바로.! CompletableFuture ( Jdk 1.8 )
주요 기능들은(Jdk 1.8) 버젼에 많이 추가가 되었고,
더 추가된 항목들은(Jdk 1.9) 버젼에 더 추가된 기능들이 있습니다!
Completable Future에는 많은 기능들이 있습니다!
한번 살펴보도록 할까요.?
CompletableFuture의 주요기능
먼저 일반적으로는 정적 팩토리 메서드를 호출해서 사용하는 것이 일반적입니다.!
객체 생성보다는 훨씬 가독성이 좋기 때문이죠
Function | Parameter Type | Return | Description |
runAsync | Runnable, (Option) Executor | CompletableFuture<Void> | Task를 받아서 실행하고 반환한다 |
supplyAsync | Supplier<U>, (Option) Executor | CompletableFuture<U> | Lambda를 받아서 리턴타입(U)를 반환한다 |
allOf | CompletableFuture<?>... | CompletableFuture<Void> | CompletableFuture 리스트를 받아서 하나로 반환한다 |
anyOf | CompletableFuture<?>... | CompletableFuture<Void> | CompleatableFuture 리스트를 받아서 하나로 반환한다 |
일단 대표적으로 쓰이는 것이 바로 runAsync와 supplyAsync입니다.!
바로 코드로 볼까요?
RunAsync
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(10L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ": hi");
});
Runnable은 익명 클래스로 Lambda로 던질 수 있습니다!
리턴 타입은 따로 없고, 그냥 Task를 받아서 소비하는 역할이죠.!
실행해보면?
내부적으로 알아서 ForkJoinPool을 사용한 것을 볼 수 있습니다.!
이거는 조금 위험한(?) 방식이죠.
만일 시스템이 거대하고 쓰레드를 직접 컨트롤하고 싶은 경우가 많을텐데
그렇다면 어떻게 해야할 까요?
간단합니다.
Executor를 파라미터로 보내면 됩니다!
Executor executor = Executors.newFixedThreadPool(30);
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(10L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ": hi");
}, executor);
우리가 지정한 ThreadPool을 보내게 되면?
이렇게 지정한 쓰레드 풀 내에서 작업을 실행하게 됩니다!!
그럼 이제 가장 중요한 supplyAsync를 알아보도록 할까요?
SupplyAsync
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getName() + ": hi";
});
runAsync와의 차이점은 단순히 리턴값이 있냐 없냐 그 차이에요!
AllOf, AnyOf
AllOf, AnyOf는 CompletableFuture들을 하나로 묶어줍니다
대신 2개의 차이점은
AllOf 은 모든 CompletableFuture들이 Task들을 완료한다는 조건으로 사용하고
AnyOf 은 하나라도 Task들을 성공한다는 조건으로 사용합니다!
CompletableFuture<Void> future = CompletableFuture.allOf(
IntStream.range(0, 10)
.limit(5) // 5개의 Intstream을 만들어서
.boxed() // Stream으로 묶고
.map(time -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getName() + ": hi";
}, executor)
) // CompletableFuture로 묶은다음
.toArray(CompletableFuture[]::new) // 배열로 수집한다.!
);
이러한 느낌으로 사용할 수 있어요!
Exceptionally
멀티쓰레드 환경에서 가장 중요한 것은
바로 예외상황이죠!!!
예외가 발생하면 Exceptionally를 통해 핸들링 할 수 있습니다
@Test
@DisplayName("supplyAsync 테스트")
void supplyAsync_test() {
Executor executor = Executors.newFixedThreadPool(30); // Thread Pool 만들고
final List<CompletableFuture<String>> futures = IntStream.range(0, 100)
.limit(10) // 10개의 랜덤한 0 ~ 100 숫자
.boxed()
.map(time -> CompletableFuture.supplyAsync(() -> { // 해당 숫자만큼 쓰레드 정지
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getName() + ": hi";
}, executor)
.orTimeout(2L, TimeUnit.MILLISECONDS) // 대신 타임아웃은 2ms
.exceptionally(e -> Thread.currentThread().getName() + ": Failed") // 예외 발생시 실행할 태스크
)
.collect(Collectors.toList());
futures.stream()
.map(CompletableFuture::join)
.forEach(System.out::println);
}
CompletableFuture에 타임아웃을 걸어주고
예외상황이면 실패라는 메세지를 받도록 해볼게요!
해당 테스트를 돌려보면?
실패한 Task와 성공한 Task리스트들을 볼 수 있네요.!
추가된 기능
그렇다면 Java9에 추가된 기능은 무엇일까요?
여기서 우리가 제일 중요하게 살펴보아야 할 것은 바로
timeOut 기능과 completeAsync인데요!
TimeOut은 해당 시간내에 작업을 처리하지 못하면,
해당 Thread에서 TimeOutException을 던지게 됩니다!
completeAsync는 supplyAsync와 똑같다고 생각하면 됩니다!
한마디로, supplyAsync가 completeAsync의 정적 팩터리 버젼이죠!
그럼 이제 실제 도메인을 예시로 한번 실습해볼까요?
Cafe 만들기
먼저 예시 Cafe라는 도메인을 예시로 해볼게요
package com.huisam.springstudy.completablefuture;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@Getter
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class Coffee {
private final String name;
static Coffee of(String coffeeName) {
delay();
return new Coffee(coffeeName);
}
private static void delay() {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Coffee를 만들고 싶은데, Coffee를 만들 때는 아쉽게 제작하는데 시간이 걸리네요 ㅠ
무려 0.1초나 걸리는 어려움이 있습니다 ㅠㅠ
그래서 이러한 Coffee들을 만드는 Cafe를 차려보도록 할게요!
package com.huisam.springstudy.completablefuture;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
class Cafe {
List<Coffee> asyncOrder(List<String> coffeeNames) {
final List<CompletableFuture<Coffee>> futures = coffeeNames.stream() // 멀티쓰레드
.map(coffeeName ->
CompletableFuture.supplyAsync(() // 비동기 처리를 위한 supplyAsync
-> Coffee.of(coffeeName))
.orTimeout(1L, TimeUnit.SECONDS) // Time제한
.exceptionally(e -> Coffee.of("Failed Coffee")) // 예외가 발생하면 망한 커피
)
.collect(Collectors.toUnmodifiableList());
return futures.stream() // Stream의 Lazy한 속성 때문에 분리
.map(CompletableFuture::join)
.collect(Collectors.toUnmodifiableList());
}
List<Coffee> order(List<String> coffeeNames) {
return coffeeNames.stream() // 단일 쓰레드로 동기처리
.map(Coffee::of)
.collect(Collectors.toUnmodifiableList());
}
}
일반적으로 처리하는 order와 멀티쓰레드로 처리하는 asyncOrder를 만들어보았어요!
Order 메서드의 경우
- CoffeeName List를 순회하면서
- Coffee를 만들고
- 이를 리스트로 수집합니다!
AsyncOrder 메서드의 경우
- CoffeName List를 순회하면서
- Coffee를 비동기로 만들고.!
- 멀티쓰레도 처리하는데
- 시간이 Over되면 예외를 발생
- 예외가 발생된 항목은 "Failed Coffee" 로 생산!
- 이를 CompletableFuture로 수집
- 다시 이에 대한 stream을 순회하면서
- Join을 통해 멀티쓰레드를 다시 하나의 메인쓰레드로 수집
- 리스트로 반환
라는 작업으로 볼 수 있습니다.!
그럼 이제 정말 제대로 되는지 테스트해봅시다.!
class CafeTest {
private final Cafe cafe = new Cafe();
@Test
@Timeout(value = 200L, unit = TimeUnit.MILLISECONDS)
@DisplayName("동기 처리 테스트")
void sync_test() {
/* given */
List<String> coffeeNames = List.of("americano", "latte", "moca", "water");
/* when */
final List<Coffee> coffees = cafe.order(coffeeNames);
/* then */
coffees.stream()
.map(Coffee::getName)
.forEach(name ->
assertThat(coffeeNames.contains(name))
);
}
@Test
@Timeout(value = 200L, unit = TimeUnit.MILLISECONDS)
@DisplayName("비동기 처리 테스트")
void async_test() {
/* given */
List<String> coffeeNames = List.of("americano", "latte", "moca", "water");
/* when */
final List<Coffee> coffees = cafe.asyncOrder(coffeeNames);
/* then */
coffees.stream()
.map(Coffee::getName)
.forEach(name ->
assertThat(coffeeNames.contains(name))
);
}
}
4개의 주문에 대해서 200ms라는 시간제한을 둔 테스트에요!
동기처리는 단일쓰레드로 100ms가 4번 블록킹 될것이고,
비동기처리는 멀티쓰레드로 각자의 쓰레드가 100ms 블록킹 될 것이에요!
실행해보면?
아아 아쉽게도 동기 처리는 성공하지 못했네요 ㅠㅠ
마치면서
오늘은 CompletableFuture에 대해서 알아보았는데요
이 개념이 앞으로 배울 Reactive에 있어서 가장 기초가 되는 항목이기에
정말 꼼꼼한 공부가 필요해요
물론 저도 모든 개념을 정리하지는 못했지만,
모자른 점이나 정리하지 못한점이 있다면 댓글로 남겨주시면 좋을 것 같습니다!
미약하게나마 읽어주셔서 감사하고,
멀티쓰레드 시스템은 항상 신중하고 조심히 설계하시길 바래요!
( 항상 모든 예외를 생각해주는 것이 좋습니다 )
참고
Java9 - CompletableFuture
Oracle - CompletableFuture
'Developer > Kotlin & Java' 카테고리의 다른 글
Kotlin - Null 을 다루는 방법 / 체이닝 / lateinit (0) | 2021.01.16 |
---|---|
JVM과 Garbage Collection - G1GC vs ZGC (6) | 2020.11.02 |
Java8 - Stream과 함수형 인터페이스(lambda 표현식)에 대해서 (0) | 2020.05.01 |
자바 직렬화 - Java Serialization (0) | 2020.03.20 |
SOLID - DIP(Dependency Inversion Principle)란 : 의존성 역전 원칙 (8) | 2019.11.27 |