안녕하세요~!
Coroutine 에서 제공하는 다양한 Interfa
ce 중에서 Queue 와 비슷한 개념을 가진
Channel 에 대해서 알아볼려고 해요
Channel 이란
Channel 은 쉽게 말씀드리면 데이터를 stream 처럼 전송하기 위한 인터페이스라고 보시면 됩니다.
마치 Kafka 처럼 데이터를 제공하는 producer 와 데이터를 소비하는 consumer 의 형태로 구성됩니다.
위 그림처럼 인터페이스도 데이터를 제공하는 SendChannel 과 데이터를 소비하는 ReceiveChannel 로 이루어진 것을 볼 수 있죠.
public interface SendChannel<in E> {
public suspend fun send(element: E)
}
public interface ReceiveChannel<out E> {
public suspend fun receive(): E
}
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
Queue Collection 과 동일하게 Channel 에서 첫번째로 제공된 데이터는 항상 첫번째로 소비 되는 것이 보장됩니다.
First come First served
개념자체는 아주 간단하기 때문에 설명은 여기까지 하고 자세한 것은 예제와 함께 깊이 들어가보도록 할게요
Channel 예제
가장 기본적인 채널을 만들어보겠습니다.
// 버퍼가 없는 채널. 수신자가 receive 를 호출하기 전까지 send 를 호출해도 대기한다
val rendezvousChannel = Channel<Int>()
CoroutineScope(Dispatchers.Default).launch {
repeat(10) {
rendezvousChannel.send(it)
println("$rendezvousChannel send $it")
delay(10)
}
rendezvousChannel.close()
}
위 코드는 10ms 의 주기로 channel 에 데이터를 보내는 데이터 생산 코드입니다.
생산 코드를 만들고, 간단하게 channel 에 있는 모든 데이터를 소비하는 함수를 하나 만들어볼게요.
private suspend fun <E> consumeAll(channel: Channel<E>) {
for (data in channel) {
println("[$channel] consuming data: $data")
}
}
consumeAll(rendezvousChannel)
위 코드를 실행하면 놀랍게도 produce 와 consume 이 섞이는 현상을 볼 수 있습니다.
왜 그럴까요?
Channel 의 기본 생성자를 들어가보면 다음과 같습니다.
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
아하 capacity 정책이 RENDEZVOUS 이고, BufferOverflow 정책이 SUSPEND 라서 그렇구나
라는 것을요
그렇다면 capacity 정책과 bufferOverflow 정책이 뭐길래 그럴까요? 간단하게 하나씩 탐구해보겠습니다.
Capacity 정책
Coroutine 에서 제공하는 Capacity 정책은 여러가지가 있습니다.
RENDEZVOUS
RENDEZVOUS 는 쉽게 말씀드리면 Buffer 가 존재하지 않는 정책입니다.
따라서 위 예제처럼 데이터를 소비해야만 그 다음 데이터가 생산될 수 있는 형태기 때문에 하나씩 교차하면서 데이터를 생산 / 소비 하게 되는 것이죠.
UNLIMITED
UNLIMITED 는 버퍼가 무한한 채널입니다.
데이터의 소비 유무에 상관없이 무조건 데이터를 생산하기 때문에 자칫하면 OutOfMemory 가 발생될 위험이 있습니다.
val unlimitedChannel = Channel<Int>(Channel.UNLIMITED)
CoroutineScope(Dispatchers.Default).launch {
repeat(10) {
unlimitedChannel.send(it)
println("$unlimitedChannel send $it")
delay(10)
}
unlimitedChannel.close()
}
delay(500)
consumeAll(unlimitedChannel)
위와 같은 예제 코드를 만들고 돌려보면 어떻게 될까요?
데이터의 소비 유무와 상관없이 무조건 생산되는 것을 볼 수 있네요
BUFFERED
BUFFERED 는 크기가 고정된 buffer 를 사용하는 Channel 입니다.
Buffer 가 가득차게 되면 데이터를 더 이상 생산하지 않고 생산자도 대기하게 되는 것이죠
실제로 현업에서 가장 사용할일이 많은 Channel 유형중의 하나라고 보셔도 되겠습니다.
데이터 생산 소비에 대한 역할을 조절할 수 있기 때문에 리액티브 진영의 배압(Backpressue) 와 같은 역할을 할 수 있죠
val arrayChannel = Channel<Int>(5)
CoroutineScope(Dispatchers.Default).launch {
repeat(10) {
arrayChannel.send(it)
println("$arrayChannel send $it")
delay(10)
}
arrayChannel.close()
}
delay(500)
consumeAll(arrayChannel)
5개 buffer size 를 갖는 채널을 만들고 10개의 데이터를 생산하고, 소비는 모든 데이터를 순차적으로 소비하는 코드를 작성해보았어요.
5개의 size 를 유지하는 모습을 볼 수 있고, 소비되기 시작함과 동시에 새로운 데이터를 생산하는 것을 볼 수 있네요.
CONFLATED
CONFLATED 는 buffer 가 1인 channel 를 유지하지만, 이미 생산된 데이터가 있는 경우 데이터를 override 하게 됩니다.
val conflatedChannel = Channel<Int>(Channel.CONFLATED)
CoroutineScope(Dispatchers.Default).launch {
repeat(10) {
conflatedChannel.send(it)
println("$conflatedChannel send $it")
delay(10)
}
conflatedChannel.close()
}
delay(500)
consumeAll(conflatedChannel)
데이터 생산자는 데이터를 10개를 보내고, 500ms 후에 소비한다고 가정하면
이전에 생산된 데이터는 모두 읽지 못하고, 마지막에 생산된 데이터만 소비하는 것을 볼 수 있네요
요약하면 아래와 같습니다.
분류 | 설명 |
RENDEZVOUS | Buffer 가 존재하지 않는 Channel |
UNLIMITED | Buffer 가 무한대인 Channel |
BUFFERED | Buffer 의 크기를 한정해서 사용하는 Channel |
CONFLATE | Buffer 의 크기는 1이고, 마지막에 제공된 데이터만 보장하는 Channel |
BufferOverflow 정책
Channel 에서 Buffer 를 어떻게 사용할 수 있는지를 보았다면 Overflow 정책은 어떤게 있을까요?
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*/
SUSPEND,
/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*/
DROP_OLDEST,
/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
DROP_LATEST
}
Buffer 가 넘치게 되는 경우 취할 수 있는 행동은 크게 3가지 종류가 있습니다.
종류 | 설명 |
SUSPEND | 더 이상 데이터를 Channel 에 받아들이지 않고 대기한다. ( default ) |
DROP_OLDEST | 가장 오래된 데이터를 제거하고 새로운 데이터를 받아들인다. ( queue ) |
DROP_LATEST | 가장 최신의 데이터를 제거하고 새로운 데이터를 받아들인다. ( stack ) |
종류가 딱 원하는 만큼 있기 때문에 요구사항에 따라 입맛에 맞는 전략을 취할 수 있습니다 : )
정리하며
오늘은 Channel 의 종류에 대해 알아보았네요. 비교적 간단하지만 개념에 대해서는 딱 정리하고 가면 좋겠네요 ㅎㅎ
- Channel 은 Coroutine 간에 데이터를 주고 받기 위해 만들어진 인터페이스다
- Channel 은 데이터를 제공하는 생산자, 데이터를 소비하는 소비자가 있다
- Channel 은 Buffer 에 대한 정책을 다양하게 지정할 수 있다
- Channel 에서 BufferOverflow 발생시에 대한 정책을 다양하게 지정할 수 있다
참고
Coroutine Channel
'Developer > Kotlin & Java' 카테고리의 다른 글
Java21 Virtual thread vs Kotlin Coroutine - 가상쓰레드와 코루틴에 대해 고찰해보자 (2) | 2024.01.27 |
---|---|
[Kotlin] Kotlin Annotation 에 대해 톧아보기 (0) | 2023.04.01 |
[Kotlin] Coroutine - 4. 코루틴에서의 예외(exception) 핸들링 (0) | 2023.02.04 |
[Kotlin] Coroutine - 3. 코루틴의 Flow 활용 (2) | 2022.11.27 |
[Kotlin] Coroutine - 2. CoroutineScope & Context & Dispathcer 을 파헤쳐보자 (2) | 2022.03.01 |