본문 바로가기

Coroutines

Coroutines Flow #2

 

플로우 중간 연산자 - Intermediate flow operators

Flow는 Collection이나 Sequences에서와 같이 연산자로 흐름을 변환 할 수 있다. 이 연산자들은 Flow와 마찬가지로 콜드(Cold) 스트림이다. 이러한 연산자에 대한 호출은 일시 중단 기능은 아니며 새로운 변환에 정의를 빠르게 반환한다.

 

기본 연산자에는 map, filter와 같이 익숙한 이름이 있다. Sequences와의 중요한 차이점은 연산자 내의 코드블록이 일시 중단 함수를 호출 할 수 있다는 것이다.

 

요청을 수행하는 것이 일시 중단 기능으로 장기 실행되는 작업인 경우에도 요청의 흐름을 map 연산자로 결과에 매핑할 수 있다.

 

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}


# outputs
response 1
response 2
response 3

 

각 줄은 매 초마다 나타난다.

 

변환 연산자 - Transform operator

transform은 Flow Transformation operator 중에서 가장 일반적인 연산자이다. 복잡한 변환을 구현할 뿐만 아니라 map이나 filter를 모방하여 구현할 수 있다.

 

예를 들어, 오래걸리는 비동기 요청을 수행하기 전에 문자열을 생성하고 요청의 응답이 오면 나타낼 수 있다.

(1..3).asFlow() // a flow of requests
    .transform { request ->
        emit("Making request $request") 
        emit(performRequest(request)) 
    }
    .collect { response -> println(response) }
    
    
# outputs
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

 

크기 제한 연산자 - Size-limiting operators

take와 같은 크기 제한 연산자는 최대에 도달하면 flow를 취소한다. 코루틴의 취소는 언제나 exception을 발생시키며 자원 관리 함수(ex try {...] finally {...} 블록과 같은)는 취소시 정상적으로 작동한다.

 

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}            


# outputs
1
2
Finally in numbers

 

숫자 2를 방출 한 후 numbers()함수에서 flow{...} 본문의 실행이 중지되었음을 보여준다.

 

플로우 종단 연산자 - Terminal flow operators

종단 연산자는 Flow를 시작하는 중단 함수이다. collect 연산자는 가장 기본적인 연산자이지만, 다른 종단 연산자가 있으므로 더 쉽게 만들 수 있다.

 

  • toList나 toSet으로 여러 Collection으로 변환할 수 있다.
  • 첫번째 값을 얻거나(first), flow가 단일 값(single)을 방출하게 할수 있다. 
  • flow를 reduce나 fold를 이용하여 값으로 변환할 수 있다.
val sum = (1..5).asFlow()
    .map { it * it } // squares of numbers from 1 to 5                           
    .reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)


# outputs
55

 

순차적인 플로우 - Flow are sequential

여러 Flow에서 동작하는 특수 연산자를 사용하지 않으면 Flow는 각 개별 콜렉션이 순차적으로 수행된다. 콜렉션은 종단 연산자를 호출하는 코루틴에서 직접 동작하며 기본적으로 새로운 코루틴에서 시작하지 않는다.

 

각각 방출된 값은 모든 중간 연산자를 거쳐 업 스트림에서 다운 스트림으로 처리 되고 마지막으로 종단 연산자로 전달 된다.

 

짝수를 찾아 문자열에 맵핑하는 다음 예제를 보자

 

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }   
    
    
# outputs
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

 

플로우 컨텍스트 - Flow context

Flowdml 콜렉션은 항상 호출한 코루틴의 context에서 수행된다. 예를들어 foo Flow가 있을때 세부 사항에 관계없이 지정된 context에서 다음 코드가 실행된다.

 

withContext(context) {
    foo.collect { value ->
        println(value) // run in the specified context 
    }
}

 

Flow의 이러한 특성을 컨텍스트 보존(context preservation) 이라고 부른다.

 

기본적으로 flow {...} 빌더의 코드는 해당 Flow의 collector가 제공하는 context에서 실행된다. 다음 예에서 호출 스레드를 출력하고 3개의 수를 방출하는 foo() 함수를 보자

 

fun foo(): Flow<Int> = flow {
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    foo().collect { value -> log("Collected $value") } 
}    


# outputs
[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

 

foo().collect가 메인 스레드에서 호출되므로, foo의 flow 본문은 메인 스레드에서 호출된다. 이것은 실행 컨텍스트를 신경쓰지 않고 호출자를 블럭하지 않는 빠르면서 비동기적인 가장 기본적인 방법이다.

 

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

 

withContext의 잘못된 방출 - Wrong emission withContext

하지만, 오래걸리는 CPU를 소모하는 작업들은 Dispatchers.Default context에서 실행되야 하고, UI를 업데이트 하는 코드들은 Dispatchers.Main context에서 실행 해야 된다. 보통 Kotlin coroutines에서 context 스위칭을 할때 withContext를 사용한다. 하지만 flow {...} 빌더 속 코드는 컨텍스트 보존(context preservation) 특성을 지켜야 하기 때문에 다른 context에서 방출하는것을 허용하지 않는다.

 

fun foo(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> println(value) } 
}   


# outputs
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
        but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
        Please refer to 'flow' documentation or use 'flowOn' instead
    at ...


flowOn 연산자 - flowOn operator

위 예외에서 flow 방출 컨텍스트를 변경할 수 있는 flowOn 연산자를 알려주고 있다. 아래 예제에서 flow의 올바른 context 변경을 보여주고, 스레드 이름을 표시해 스레드가 어떻게 동작하는지 보여준다.

 

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        log("Collected $value") 
    } 
}     


# outputs
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

 

메인 스레드에서 콜렉션이 일어날 동안 백그라운드 스레드에서 flow {...} 가 어떻게 작업하는지 확인해보자

 

flowOn 연산자에서 또 지켜봐야할 것은 flow의 기본 순차적인 특성을 변경했다는 것이다. 컬렉션은 하나의 코루틴("coroutine#1")에서 일어나고 동시에 방출은 다른 쓰레드에서  실행되는 다른 코루틴("coroutine#2")에서 발생한다.

 

context에서 CoroutineDispatcher를 변경할 경우 flowOn은 upstream을 위해 다른 코루틴을 생성한다. 

 

버퍼링 - Buffering

Flow를 다른 코루티에서 실행하는 것은 해당 Flow를 Collection하는데 걸리는 전체 시간의 관점으로 보면 도움이 될 수 있다. 특히, 오래걸리는 비동기 연산이면 더욱 그렇다.

 

예를들어 요소를 생산하는데 100ms 걸리는 느린 food() flow가 있고 컬렉터 또한 처리하는데 300ms가 걸린다고 고려하고 얼마나 시간일 걸리는지 고려해보자.

 

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        foo().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}


# outputs 
1
2
3
Collected in 1220 ms

 

flow에서 buffer연산자를 사용하여 순차적인 실행 대신 collect와 동시에 foo()의 방출 코드를 실행할 수 있다.

 

val time = measureTimeMillis {
    foo()
        .buffer() // buffer emissions, don't wait
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")


# outputs
1
2
3
Collected in 1071 ms

 

파이프라인을 효과적으로 만들었기 때문에 동일한 숫자를 훨씬 빠르게 방출한다. 첫 번째 숫자를 방출하기 위해 100ms만 기다린 후 다음 각 숫자를 처리하는데 300ms만 소비하면 된다.

flowOn 연산자는 CoroutineDispatcher를 변경해야 할 때 buffering과 동일한 매커니즘을 사용한다.
하지만 여기서는 실행 컨텍스트를 변경하지 않고 명시적으로 buffering을 요청한다.

 

병합 - Conflation

Flow가 작업 또는 작업 업데이트 상태의 일부 결과를 나타내는 경우 방출하는 각각의 값을 처리하는 것은 불필요 하며, 대신에 가장 최신의 값 만 처리하는 것이 필요할 것이다. 이 경우에 처리가 너무 느릴 경우 conflate 연산자를 이용해 방출 된 중간 값들을 건너 뛸 수 있다.

 

val time = measureTimeMillis {
    foo()
        .conflate() // conflate emissions, don't process each one
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")


# outputs
1
3
Collected in 758 ms

 

첫 번째 번호가 아직 처리중이고 세 번째 번호는 이미 생산되었으니 conflate에 의해 두 번째 번호는 스킵 되었고 가장 최근 번호만 collector에게 전달 된다.

'Coroutines' 카테고리의 다른 글

Coroutines Flow #1  (0) 2020.07.12