안녕하세요~! 이전에 Coroutine 에 대한 기본 개념들을 알아보았죠
이제 학습했던 Coroutine 기본 지식들을 기반으로 Spring Webflux 에 접목시킬 시간인데요.
어떻게 Spring Webflux 에서는 Coroutine 을 접목시켰는지 코드를 보면서 연결 포인트들을 알아보려고 합니다.
혹시나 Coroutine 에 익숙하지 않거나 개념을 모르신다면 아래 게시글을 참조해주세요 ^-^
https://huisam.tistory.com/entry/coroutine2
Spring Webflux
Spring WebFlux 는 Reactor 의 철학을 기반으로 Spring 의 기술과 접목하여
비동기 서버에 대한 인터페이스를 제공하는 것을 목표로 하는데요.
https://docs.spring.io/spring-framework/reference/web/webflux/new-framework.html
그렇기 때문에 Reactor 의 Mono / Flux 인터페이스를 앞장세워 모든 비동기 처리를 진행하게 됩니다.
결과적으로, 비동기 서버를 구축하고 개발하기 위해서는 Reactor 의 근본이 되는 Observer 패턴을 기반으로 한
Mono / Flux 의 복잡한 인터페이스들을 배워야 했었습니다.
이는 MVC 에 익숙한 많은 Spring 개발자들에게 큰 허들이 되고 난관으로 다가왔습니다.
특히나 MVC 신규 입사자가 처음 보는 Reactor 를 익히기에는 울상을 지으면서, 많은 시간과 노력이 필요했죠 😭
그러나 시간이 지나면서 jvm 기반의 언어인 kotlin 이 나왔고, Coroutine 이라는 비동기 인터페이스를 제공하면서
Spring 에서도 Reactor 와 Coroutine 을 접목시켜 Coroutine 철학을 기반으로한 비동기 서버를 제공하려는 움직임들이 보이고 있습니다.
특히 Spring Boot 3.x(=Spring Core 6.x) 부터는 CoWebFilter 라는 인터페이스도 제공하고 있습니다.
abstract class CoWebFilter : WebFilter {
final override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
return mono(Dispatchers.Unconfined) {
filter(exchange, object : CoWebFilterChain {
override suspend fun filter(exchange: ServerWebExchange) {
return chain.filter(exchange).cast(Unit.javaClass).awaitSingleOrNull() ?: Unit
}
})}.then()
}
protected abstract suspend fun filter(exchange: ServerWebExchange, chain: CoWebFilterChain)
}
interface CoWebFilterChain {
suspend fun filter(exchange: ServerWebExchange)
}
또한 Spring Boot 에서는 이미 공식적으로 Kotlin Coroutine 에 대한 버젼 관리도 같이 하고 있습니다.
https://docs.spring.io/spring-boot/docs/current/reference/html/dependency-versions.html
이러한 정황들을 바탕으로 Spring 은 Kotlin 의 라이브러리들을 접목하려는 노력을 가하고 있다 라고 볼 수 있습니다.
결과적으로, 이미 여러분들이 Kotlin 에 능숙한 개발자라면 굳이 Reactor 의 철학을 익혀가면서 비동기 서버를 도입할 이유는 없다고 봅니다.
그래서 본격적으로 WebFlux 가 어떻게 Coroutine 을 접목시켰는지 확인해보려고 합니다.
WebFlux 와 Coroutine 의 접목점은 어디일까?
예제 Controller 와 함께 찾아가보도록 해볼게요
@RestController
@RequestMapping("/api/v1/coroutine")
class CoroutineController {
@GetMapping("/hello")
suspend fun hello(
@RequestParam(required = false) param: String?
): String {
return "Hello client: ${param ?: "default"}"
}
}
간단하게 Controller 로 만들고 suspend 함수를 선언하여 API 요청을 받을 수 있도록 지정하였습니다.
WebFlux 에서 HTTP 요청의 흐름은
1. org.springframework.web.server.WebFilter
2. org.springframework.web.reactive.DisptacherHandler
3. org.springframework.web.reactive.HandlerAdaptor
순으로 요청이 흘러가게 됩니다.
요청의 마지막 단계, HandlerAdpator 의 구현체인 RequestMappingHandlerAdpater 를 한번 살펴보도록 해볼게요.
DisptacherHandler 에서 받은 CoroutineHandler 를 기반으로 invocableMethod 를 가져와 실행하게 되는데요.
invoke 메서드를 따라가다 보면 아래와 같은 함수를 만나게 됩니다.
invoke target method 가 suspend 함수인지 여부를 체크하여 suspend 함수라면 별도의 invoke 메서드를 호출하게 되는군요
suspend 함수를 decompile 해보시면 Continutation 객체가 함수의 마지막 파라미터로 넘겨서 각각의 함수 호출처마다 중단 포인트들을 선언하게 됩니다.
invokeSuspendingFunction 을 따라가다 보면 최종적으로 아래와 같은 코드를 보게 되는데요!
바로 여기가 우리가 찾던 최종 접목점입니다
핵심은 Mono 에서 Coroutine 으로 변환되는 이 코드입니다.
Mono<Object> mono = MonoKt.mono(context, (scope, continuation) ->
KCallables.callSuspend(function, getSuspendedFunctionArgs(method, target, args), continuation))
.filter(result -> !Objects.equals(result, Unit.INSTANCE))
.onErrorMap(InvocationTargetException.class, InvocationTargetException::getTargetException);
즉, Mono / Flux 기반의 인터페이스에서 Coroutine 인터페이스로 옮길 수 있게 되는 것입니다.
public fun <T> mono(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> {
require(context[Job] === null) { "Mono context cannot contain job in it." +
"Its lifecycle should be managed via Disposable handle. Had $context" }
return monoInternal(GlobalScope, context, block)
}
private fun <T> monoInternal(
scope: CoroutineScope, // support for legacy mono in scope
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
val reactorContext = context.extendReactorContext(sink.currentContext())
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
/**
* Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values.
*/
internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext =
(this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext()
심지어 Reactor 에서 관리되는 Context 들을 CoroutineContext 로 변환하여 저장할 수 있도록 하고 있습니다.
즉, Reactor 진영에서 관리하는 객체들을 Coroutine 진영의 객체로 변환하여 가져올 수 있다는 것을 의미합니다.
모든 비밀은 풀렸군요.
HTTP 요청이 어떻게 Mono Flux 기반의 Reactor 인터페이스에서
Coroutine 기반의 인터페이스로 들어오게 되는지 원리를 알게 되었습니다.
그러면 마지막에 언급드렸던 Context 들이 잘 넘어오는지만 더 한번 확인해볼까요?
Reactor Context 에서 Coroutine Context 로 데이터 전달해보기
먼저 Reactor 진영에서 Context 데이터를 만들어주기 위한 Custom WebFilter 를 하나 만들어보겠습니다.
@Component
class ContextFilter : WebFilter {
override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
return chain.filter(exchange)
.contextWrite { it.put("key", "test value") }
}
}
Filter chain 을 하기전에 ReactorContext 객체에 key value 쌍으로 데이터를 만들어 지정하였습니다.
위에서 변환되는 모든 과정을 지나, RestController 에 CoroutineContext 에 접근할 수 있는 코드를 작성해보겠습니다.
import kotlinx.coroutines.reactor.ReactorContext
@RestController
@RequestMapping("/api/v1/coroutine")
class CoroutineController {
@GetMapping("/hello")
suspend fun hello(
@RequestParam(required = false) param: String?
): String {
coroutineContext[ReactorContext]?.context
return "Hello client: ${param ?: "default"}"
}
}
갑자기 나타난 ReactorContext 가 뜬금 없을 수 있습니다.
하지만 위 코드에 있는 ReactorContext 는 Coroutine 진영에 있는 CoroutineContext 입니다.
public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
// `Context.of` is zero-cost if the argument is a `Context`
public constructor(contextView: ContextView): this(Context.of(contextView))
public companion object Key : CoroutineContext.Key<ReactorContext>
override fun toString(): String = context.toString()
}
CoroutineContext 는 Key 를 기반으로 각각의 CoroutineContext 에 접근할 수 있습니다.
특히, CoroutineContext 는 독립적인 CoroutineContext 를 선언하여 사용하지 않는 이상 관리되고 있는 CoroutineContext 는 계속 전파하게 됩니다.
자 그러면 debug 를 찍고 한번 자세하게 확인해보겠습니다.
아까 WebFilter 에서 지정한 reactor.util.context.Context 의 key value 가
CoroutineContext 기반으로 잘 들어가 있는 것을 볼 수 있었네요 ^-^
그렇다면 MVC 기반의 동기 서버 개발자들에겐 의문점이 들 수 있습니다.
왜 비동기 서버에서는 Thread 기반이 아닌 객체(Context) 기반으로 데이터를 전달하게 될까요??
비동기 방식의 흐름은 쓰레드를 고정시켜놓고 대기 시간이 있다면 쓰레드를 대기하는 것이 아닌
여러 쓰레드를 왔다 갔다 하면서 처리하면서 시스템 자원을 최대로 활용하기 위함입니다.
이는 동시성 이라는 개념하고도 연결되는데요. 동시성에 대해서는 아래 게시글을 참고해주세요!
https://huisam.tistory.com/entry/coroutine1
이는 비동기 서버 공통 매커니즘에 해당하는데요
비동기 서버를 운영한다고 하면 고정 쓰레드 기반의 사고방식에서 벗어나 생각하는 것이 매우 중요합니다.
정리하며
Spring WebFlux 가 어떻게 Coroutine 기반의 인터페이스를 제공하는지에 대해 알아보았네요 ㅎㅎ
Spring 진영에서도 Coroutine 인터페이스를 제공하기 위한 개발들을 진행하고 있고, 기술이 점점 발전하고 있는 추세입니다.
비동기 서버를 운영하고 싶다면 Coroutine 기반의 Spring WebFlux 로 점차 도전해보는 것이 어떤가 싶네요.
'Developer > Spring' 카테고리의 다른 글
Micrometer Tracing 에 대해 알아보자(a.k.a. spring cloud sleuth) (0) | 2023.10.28 |
---|---|
Spring 에서 Transactional 을 사용할 때 Exception 이 발생하는 상황에 주의하자 (1) | 2023.06.11 |
[JPA] Hibernate 에서 Query statement caching 은 어떻게 이루어질까? (5) | 2023.01.08 |
[Test] 효과적인 Integration Test 를 위한 Spring Test Context 를 구성해보자 (2) | 2022.09.24 |
[Kotlin/Feign] Apache Http Client5 에 대해 알아보자 (0) | 2021.11.06 |