안녕하세요 ^^ 우리는 이전 게시글들을 통하여 Coroutine 이 어떤 구조로 이루어져 있고,
어떤 방식으로 동작하게 되는 것인지를 알게 되었어요.
이번에는 Coroutine 에서 활용되는 데이터 스트림인 Flow 에 대해서 학습하고 어떻게 활용할 수 있을지에 대해 배워보겠습니다.
Flow 란
Coroutine 의 Flow 는 일종의 데이터 스트림인데요.
데이터를 제공하는 원천지(Producer) 에서 부터 시작해서, 최종적으로 데이터를 읽어들이는 소비지(Consumer) 까지 연결하는
데이터 스트림이라고 보시면 됩니다.
그렇다면 왜 이러한 Flow 를 기반으로 한 데이터 스트림을 사용하게 될까요?
이 질문은 왜 우리가 데이터 스트림을 사용하냐? 라는 질문과 연결됩니다.
데이터 스트림을 사용하는 이유는 가공의 유연성, 실시간성을 지니기 때문입니다.
우리가 소비자에게 제공하는 비즈니스는 단순한 하나의 도메인이 갖는 비즈니스가 아닌 여러 도메인을 응집한 비즈니스를 제공하게 됩니다.
이때, 화면에서 제공되는 데이터는 각각의 요구사항에 맞게 유연하게 변경하고, 빠르게 제공되어야할 필요가 있는데, 이를 데이터 스트림 기반으로 한 pipeline 으로 유연성과 실시간성을 가져올 수 있기 때문이에요
Coroutine 에서도 역시나 데이터 스트림에 기술적 요구사항을 만족하기 위해 Flow 라는 인터페이스를 제공하였고,
Suspend 함수 기반으로 쓰레드를 더 효과적으로 활용하여 이를 극대화 하였습니다.
Flow 어떻게 구성되어 있는지는 하나씩 알아보도록 해보겠습니다.
Flow 의 구성요소
제일 첫번째 그림에서 보여주었듯이 2가지 는 필수 입니다.
- Producer: Upstream 으로 데이터를 제공하는 생산자
- Consumer: Upstream 에서 제공된 데이터를 가져와서 읽어들이는 소비자
- Intermediary(선택): 데이터를 경유하는 중간자
이것을 코드로 표현하면 아래와 같습니다.
fun main(): Unit = runBlocking(Dispatchers.Default) {
flow { // producer
repeat(times = 3) { data ->
delay(300L)
emit(data)
println("produced $data")
}
}.collect { data -> // consumer
delay(700L)
println("consumed $data")
}
}
flow { } 는 Producer 로직을 구현할 수 있고, collect { } 는 Consumer 로직을 구현할 수 있습니다.
사용법은 아주 간단하죠?
여기서, Producer 로직만 있고, Consumer 로직을 제외하면 어떻게 될까요?
fun main(): Unit = runBlocking(Dispatchers.Default) {
flow {
repeat(times = 3) { data ->
delay(300L)
emit(data)
println("produced $data")
}
}
}
이 코드를 통해 Flow 의 특성을 알 수가 있는데요. 바로 Flow 는 Cold stream 이라는 것입니다.
Cold stream 은 누군가가 Consume 하기 전까지는 데이터를 발행하지 않습니다.
즉, 하나의 flow 객체는 여러 Consumer 가 붙어서 데이터를 받아갈 수 있다는 것을 의미합니다.
재사용성이 아주 좋겠죠?
또한, Flow 는 순차적인(sequence) 데이터 흐름 방식을 가지고 있습니다.
이것을 통해 우리는 기본적인 Flow 의 특성을 알아보았고, 조금 더 심화해보도록 합시다.
Flow 에 Intermediary 가 필요한 이유?
아직까지 Flow 가 어떤식으로 흐르게 되는지는 감이 안오는 것 같습니다. 로깅을 좀 더 추가해서 가동해볼까요?
fun main(): Unit = runBlocking(Dispatchers.Default) {
val flowStartTime = AtomicLong(0L)
flow {
repeat(times = 3) { data ->
delay(300L)
produceLog(flowStartTime.get(), data)
emit(data)
}
}.onStart {
flowStartTime.set(System.currentTimeMillis()) // 시작 시간 기록
}.collect { data ->
delay(700L)
consumerLog(flowStartTime.get(), data)
}
}
private fun produceLog(flowStartTime: Long, data: Int) {
println( // 1. 생산 시간 기록 2. 쓰레드 이름 기록 3. 데이터 기록
"""
Time: ${elapsedFromStartTime(flowStartTime)}ms / Producer Coroutine : ${Thread.currentThread().name} / data : $data [>>]
""".trimIndent()
)
}
private fun consumerLog(flowStartTime: Long, data: Int) {
println( // 1. 소비 시간 기록 2. 쓰레드 이름 기록 3. 데이터 기록
"""
Time: ${elapsedFromStartTime(flowStartTime)}ms / Consumer Coroutine : ${Thread.currentThread().name} / data : $data [<<]
""".trimIndent()
)
}
private fun elapsedFromStartTime(flowStartTime: Long) = System.currentTimeMillis() - flowStartTime
Flow 에서 데이터 스트림이 어떤 시간 순서로 인하여 스트림 되는지 알아보기 위해 간단하게 로그를 추가하였습니다.
놀랍게도 Producer 의 300ms latency 와 Consumer 의 700ms latency 를 합쳐서
하나의 데이터를 발행 / 구독 하는데 1000ms 가 걸리는 것을 볼 수가 있네요.
이런 방식이라면 우리는 실제 현업에 적용할 수 없습니다. 결과적으로 데이터를 효과적으로 가져오기는 커녕 latency 를 순차적으로 받게 되는 형태니까요.
왜 이렇게 될까요?
바로 하나의 Coroutine 이 Produce 와 Consume 을 동시에 처리하기 때문입니다.
그래서 우리는 이를 분리하는 Intermediary(buffer) 를 둠으로써 해결할 수 있습니다.
fun main(): Unit = runBlocking(Dispatchers.Default) {
val flowStartTime = AtomicLong(0L)
flow {
repeat(times = 3) { data ->
delay(300L)
produceLog(flowStartTime.get(), data)
emit(data)
}
}.onStart {
flowStartTime.set(System.currentTimeMillis())
}.buffer(capacity = 10, onBufferOverflow = BufferOverflow.SUSPEND)
.collect { data ->
delay(700L)
consumerLog(flowStartTime.get(), data)
}
}
channel 기반의 buffer 를 통해 Intermediary 를 선언하게 되면, 놀랍게도 아래와 같이 실행됩니다.
왜 이렇게 변화된 것일까요?
원리는 간단합니다.
Channel buffer 라는 중간 데이터 스트림 관리자를 통하여 producer 는 produce 하는 자원으로 집중할 수 있고,
consumer 는 consume 하는 주언으로 집중할 수 있기 때문입니다.
즉 producer 와 consume 의 coroutine 을 분리할 수 있습니다.
물론 buffer 의 함수 파라미터는 2가지인데요. 요구사항에 맞는 방향으로 설정할 수 있습니다.
Parameter | 설명 |
capactity | buffer 의 size 를 몇으로 지정할 것인지 설정 단, default 는 64. |
onBufferOverFlow | Buffer 의 size 가 초과했을 때 데이터에 대한 보관을 어떻게 처리할 것인지 결정 단, default 는 SUSPEND SUSPEND - 일시중지 DROP_OLDEST - 오래된 데이터 drop DROP_LATEST - 최신 데이터 drop |
위 설정만 알고 있다면, 이 Buffer 전략은 다양하게 처리할 수 있습니다.
예를 들면, conflate 라는 메서드는 내부적으로 DROP_OLDEST 라는 전략을 취하게 되는데요.
fun main(): Unit = runBlocking(Dispatchers.Default) {
val flowStartTime = AtomicLong(0L)
flow {
repeat(times = 3) { data ->
delay(300L)
produceLog(flowStartTime.get(), data)
emit(data)
}
}.onStart {
flowStartTime.set(System.currentTimeMillis())
}.conflate()
.collect { data ->
delay(700L)
consumerLog(flowStartTime.get(), data)
}
}
그렇기 때문에 Consume 과정중에 delay 가 생긴다면, 상황에 따라 Produce 된 마지막 데이터만 읽게 됩니다.
이런 과정을 통해 우리는 Flow 가 어떤식으로 동작하고, 어떻게 활용될 수 있는지를 알아보았습니다.
물론 제가 알려드린 것 외에도 참 다양한 기능들을 제공합니다 ^^
이에 대한 기능들은 직접적으로 코드를 타이핑해보면서 탐구하면 좋겠네요.
정리하며
Flow 에 대해서 종합적으로 정리하면요~!
- Flow 는 Coroutine 에서의 데이터 스트림이다.
- Flow 는 순차적으로 처리되며 Cold 스트림의 성격을 띄고 있다.
- Producer 와 Consumer 의 Coroutine 을 분리함으로써 유연한 전략을 기반으로 데이터 스트림 처리를 할 수 있다.
다들 Flow 를 적극적으로 활용하면 좋겠네요. :)
참고 자료
Android의 Kotlin 흐름
Coroutine Flow
'Developer > Kotlin & Java' 카테고리의 다른 글
[Kotlin] Coroutine - 5. 코루틴의 Channel 의 모든 것 (0) | 2023.03.05 |
---|---|
[Kotlin] Coroutine - 4. 코루틴에서의 예외(exception) 핸들링 (0) | 2023.02.04 |
[Kotlin] Coroutine - 2. CoroutineScope & Context & Dispathcer 을 파헤쳐보자 (2) | 2022.03.01 |
[Kotlin] Coroutine - 1. 코루틴에서 동시성이란? (5) | 2022.02.05 |
Kotlin Collection Util Method - 코틀린의 Collection Util 함수들을 파헤쳐보자 (2) | 2021.05.30 |