본문 바로가기

Coroutines

Coroutines Flow #1

 

일시 중단 함수는 비동기적으로 하나의 값만 리턴한다. 어떻게 하면  비동기적으로 계산된 여러 값을 리턴할 수 있을까?

 

# 다중값 표현 - Representing multiple values

Kotlin Collections를 이용하여 여러값을 나타낼 수 있다.

 

fun foo(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    foo().forEach { value -> println(value) } 
}


# outputs
1
2
3

 

# 시퀀스 - Sequences

CPU 연산이 요구되는 어떤 일들을 나타내고자 할 때 Sequences를 이용할 수 있다. (각 계산에 100ms 소요)

 

fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    foo().forEach { value -> println(value) } 
}


# outputs
1
2
3

 

이 코드는 이전과 같은 숫자를 출력하지만 각 숫자를 나타내기 전에 100ms를 기다린다.

 

 

# 중단 함수 - Suspending Functions

위 예제에서와 같은 코드는 코드를 실행하고 있는 메인 쓰레드를 정지시킨다. 이러한 연산들을 비동기 코드에서 실행될 때 food()에 suspend키워드를 붙여 함수를 중단 함수로 정의할 수 있다. 이 함수를 코루틴 스코프에서 소출하여 호출하는 쓰레드의 정지 없이 실행할 수 있고 결과를 리스트로 반환할 수 있도록 만둘 수 있다.

 

suspend fun foo(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    foo().forEach { value -> println(value) } 
}


# outputs
1
2
3

 

# Flows

List<Int>를 리턴 타입으로 사용하면 한 번에 결과를 반환할 수 있다. 비동기적으로 계산되는 스트림을 나타내기 위해 동기적인 계산에서 Sequence<Int>와 마찬가지로 Flow<Int>를 사용할 수 있다.

 

fun foo(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    foo().collect { value -> println(value) } 
}


# outputs
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

 

 위 코드는 메인 쓰레드를 차단하지 않고 숫자를 나타내기 전 100ms 동안 기다린다. 메인 스레드에서 실행되는 별도의 코루틴 스코프에서 100ms 마다 I'm not blocked 를 실행하는 것으로 확인할 수 있다.

 

이전 예제들과의 차이점

  • Flow를 사용하기 위해 flow builder 사용
  • flow {} 빌더 블럭내 코드는 일시 중단될 수 있다.
  • foo() 함수는 더이상 suspend를 사용하지 않음
  • emit 함수를 사용하여 결과값을 방출
  • collect 함수를 사용하여 flow로 부터 나온 값들을 수집한다.

flow {...} 본문에서 delay를 Thread.sleep로 바꾸면 메인 스레드가 blocked 되는 걸 볼 수 있다.

 

# Flows are cold

Flows는 Sequences와 비슷하게 차가운 스트림이다. flow builder 속 코드는 collect가 실행되기 전까지 동작하지 않는다.

 

fun foo(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}


# outputs
Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

 

flow를 리턴하는 foo 함수가 suspend를 사용하지 않는 이유이다. 자체적으로 foo()는 빠르게 리턴하고, 아무것도 기다리지 않는다.

Flow는 Collect 될 때마다 시작되며, Collect를 다시 호출하면 "Flow started"를 표시한다.

 

# Flow Cancellation

Flow는 Coroutines의 일반적인 취소를 준수한다. 하지만 Flow 인프라스트럭쳐는 추가 취소지점을 도입하지 않는다. 취소에 있어 보다 명확함을 제공한다. 평소와 같이, Flow가 취소 가능한 일시 중단 기능(ex delay)으로 중단되면 Flow collect를 취소 할 수 있으며 그렇지 않으면 취소 할 수 없다.

 

다음 예제에서 withTimeoutOrNull 블록에서 실행될 때 제한시간에 Flow가 취소되고 코드 실행을 중지하는 방법을 보여준다.

 

fun foo(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        foo().collect { value -> println(value) } 
    }
    println("Done")
}


# outputs
Emitting 1
1
Emitting 2
2
Done

 

# Flow builders

이전 예제의 flow { ... } builder가 가장 기본적인 방법이다. 보다 쉬운 flow선언을 위한 다른 builder가 있다.

 

  • 고정된 값들을 방출하는 flowOf {} builder
  • .asFlow() 확장함수를 사용하여 다양한 컬렉션과 시퀀스를 Flow로 변환 가능
fun main() = runBlocking<Unit> {
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
}


# outpus
1
2
3

'Coroutines' 카테고리의 다른 글

Coroutines Flow #2  (0) 2020.07.16