Kotlin

[TIL] 코틀린 코루틴에서 어떻게 지연 - 재개가 이루어지는가?

안덕기 2022. 2. 12. 01:24

이 문서는 Kt.Academy를 보고 배운 내용을 작성하였습니다. suspendCoroutine을 이해하고 코틀린에서 어떤 방식으로 지연 - 재개가 이루어지는 살펴봅니다.

suspendCoroutine 특징 살펴보기

아래와 같이 코루틴 코드를 작성하면 순차적으로 실행이 되면서 결과가 나옵니다.

fun main() = runBlocking<Unit> {
    launch {
        println("1")
        println("2")
        println("3")
        println("4")
    }
}

// 결과 1, 2, 3, 4

중간에 suspendCoroutine을 넣으면 supendCoroutine을 선언한 부분에서 실행이 멈추게 됩니다.

fun main() = runBlocking<Unit> {
    launch {
        println("before")
        suspendCoroutine<Unit> {  }
        println("after")
    }
}

// before 까지 출력되고 멈춤

suspendCoroutine 함수는 아래와 같이 정의되어 있습니다.

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }
}

인라인 함수이기 때문에 컴파일 시점에 블록이 해제될 것이고 suspendCoroutineUninterceptedOrReturn 함수를 리턴할 것입니다. 여기서 suspendCoroutineUninterceptedOrReturn 함수도 인라인 함수이기 때문에 안의 블록이 해제될 것입니다.

결과적으로 c: Continuation<T> 는 현재 호출된 코루틴의 continuation이 될 것이고 현재 Continuation 상태를 가지고 SafeContinuation 이라는 새로운 Continutation을 만들 것입니다.

길게 이야기했지만 결국 현재 Continuation 자체를 suspendCoroutine 의 람다식 파라미터로 사용할 수 있게 됩니다.

그래서 아래와 같은 코드를 수행해보면 true가 프린트 되는 것을 확인할 수 있습니다.

fun main() = runBlocking<Unit> {
    launch {
        println("before")
        suspendCoroutine<Unit> {
            println(it.context == this.coroutineContext)
        }
        println("after")
    }
}

// before, true

여기서 함수가 다시 재개할 수 있게 하려면 다음과 같이 작성하면 됩니다.

fun main() = runBlocking<Unit> {
    launch {
        println("before")
        suspendCoroutine<Unit> {
            println("progress~")
            it.resume(Unit)
        }
        println("after")
    }
}

// before, progress~, after

suspendCoroutine 응용

suspendCoroutine을 사용하여 delay 함수를 사용하지 않고 딜레이를 주는 함수를 작성해봅시다.

일단 무작정 Thread.sleep을 선언해보겠습니다.

suspend fun main() {
    println("Before")
    Thread.sleep(1_000L)
    println("After")
}

그러면 다음과 같은 경고를 줍니다.

Possibly blocking call in non-blocking context could lead to thread starvation

Non-blocking 블럭에서 sleep와 같은 Blocking 함수를 호출하는 것은 쓰레드를 무작정 멈추기 때문에 CPU를 쓸 수 없는 상태를 만들 수도 있다는 의미입니다.

코드를 다음과 같이 바꿔보도록 하겠습니다.

suspend fun main() {
    println("Before")
    suspendCoroutine<Unit> { continuation ->
        thread {
            println("Suspended")
            sleep(1_000L)
            continuation.resume(Unit)
            println("Resumed")
        }
    }
    println("After")
}

위와 같이 작성하면 suspendCoroutine 이 코루틴을 멈추게 하기 때문에 아까와 같은 경고창은 발생하지 않을 겁니다.

코드를 정리해서 다음과 같이 작성해봅시다.

fun invokeAfterSecond(operation: () -> Unit) {
    thread {
        Thread.sleep(1000)
        operation.invoke()
    }
}

suspend fun main() {
    println("Before")

    suspendCoroutine<Unit> { continuation ->
        invokeAfterSecond {
            continuation.resume(Unit)
        }
    }

    println("After")
}

이 함수는 잘 동작하지만 1초 동안 비활성 상태인 스레드를 생성하기 때문에 비용이 매우 비쌉니다. 자바의 ScheduledExecutorService를 활용해서 개선해보겠습니다.

private val executor =
    Executors.newSingleThreadScheduledExecutor {
        Thread(it, "scheduler").apply { isDaemon = true }
    }

suspend fun main() {
    println("Before")

    suspendCoroutine<Unit> { continuation ->
        executor.schedule({
            continuation.resume(Unit)
        }, 1000, TimeUnit.MILLISECONDS)
    }

    println("After")
}

newSingleThreadScheduledExecutor 는 하나의 싱글 스레드만 사용하면서 여러개의 딜레이를 줄 수 있는 함수입니다.

그리고 여기서, 다시 suspendCoroutine 을 함수로 추출해보겠습니다.

private val executor =
    Executors.newSingleThreadScheduledExecutor {
        Thread(it, "scheduler").apply { isDaemon = true }
    }

private suspend fun delay(time: Long) {
    suspendCoroutine<Unit> { continuation ->
        executor.schedule({
            continuation.resume(Unit)
        }, time, TimeUnit.MILLISECONDS)
    }
}

suspend fun main() {
    println("Before")
    delay(1000L)
    println("After")
}

그러면 우리가 아는 delay 함수를 만들 수 있습니다. 여전히 스레드를 생성하지만 모든 코루틴에 대해서 하나의 스레드만 사용합니다. Thread.sleep 을 선언할 때마다 스레드를 차단하는 것보다는 훨씬 낫습니다.

이 방법이 실제 delay 함수를 구현하는 핵심 아이디어입니다.

값 반환

suspendCoroutine 함수를 살펴보면 suspendCoroutine<T>의 와 resume(T)의 타입이 같을 것을 확인할 수 있습니다. 이는 resume으로 넣은 값이 suspendCoroutine의 반환값이 되기 때문입니다. 다음 예제 코드들을 봅시다.

fun main() = runBlocking<Unit> {
    val i: Int = suspendCoroutine<Int> { cont ->
        cont.resume(42)
    }
    println(i) // 42

    val str: String = suspendCoroutine<String> { cont ->
        cont.resume("Some text")
    }
    println(str) // Some text

    val b: Boolean = suspendCoroutine<Boolean> { cont ->
        cont.resume(true)
    }
    println(b) // true
}

resume으로 넣은 파라미터 값들이 suspendCoroutine의 반환 값으로 나오는 것을 확인할 수 있습니다.

이런 특징은 다른 서버에 무언가 요청을 했을 때 활용할 수 있습니다. 다음과 같이 다른 서버로 요청을 했다고 가정해봅시다.

fun main() = runBlocking {
    println("request start ")
    val result = suspendCoroutine<String> { continuation ->
        request { str ->
            continuation.resume(str)
        }
    }
    println("result : $result")
    println("request end ")
}

inline fun request(block: (String) -> Unit) {
    block.invoke("result")
}

request 함수로 람다식을 넘기는데 continuation.resume을 . 이렇게 되면 request 함수에서 요청을 보냈을 때 결과 값을 block.invoke() 파라미터로 넘겨주기만 한다면 suspendCoroutine의 반환 값으로 요청에 대한 결과 값을 받을 수 있다.

코드의 설명을 여기서 마치고 중요한 점은 코루틴으로 구성되기 때문에 요청 - 응답이 긴 경우에는 해당 스레드의 코루틴 작업이 쉬는 것이 스레드 자체가 쉬는 것은 아닙니다. 해당 스레드는 다른 코루틴 Job을 수행할 수 있기 때문입니다.

예외가 있는 resume

우리가 호출하는 대부분의 함수는 예외를 반환할 수 있습니다. suspendCoroutine도 마찬가지입니다.

아래 예제를 살펴보겠습니다.

fun main() = runBlocking {
    try {
        suspendCoroutine<Unit> { cont ->
            cont.resumeWithException(IllegalStateException())
        }
    } catch (e: IllegalStateException) {
        println("Caught!")
    }
}

resumeWithException을 통해서 예외를 전달할 수 있고 그 밖에 try-catch 문을 통해서 예외를 잡을 수도 있습니다.

함수를 지연하는 것이 아니라 코루틴을 지연하는 것이다.

아래의 예제를 보자.

// Do not do this
var continuation: Continuation<Unit>? = null

suspend fun suspendAndSetContinuation() {
    suspendCoroutine<Unit> { cont ->
        continuation = cont
    }
}

suspend fun main() {
    println("Before")

    suspendAndSetContinuation()
    continuation?.resume(Unit)

    println("After")
}
// Before

위 예제에서는 suspendAndSetContinuation 함수에서 continuation라는 전역 변수를 초기화합니다.. 그리고 suspendAndSetContinuation 함수 호출 후에 resume을 하는데 이렇게 하면 아무런 의미가 없습니다. 왜냐하면 suspendCroutine은 함수를 지연하는 것이 아니라 코루틴을 지연하는 것이 때문에 코루틴 자체가 지연되서 다른 코루틴에서 continuationresume하지 않으면 의미가 없습니다.

아래 예제는 다른 코루틴에서 함수를 재개하는 것입니다.

// Do not do this, potential memory leak
var continuation: Continuation<Unit>? = null

suspend fun suspendAndSetContinuation() {
    suspendCoroutine<Unit> { cont ->
        continuation = cont
    }
}

suspend fun main() = coroutineScope {
    println("Before")

    launch {
        delay(1000)
        continuation?.resume(Unit)
    }

    suspendAndSetContinuation()
    println("After")
}
// Before
// (1 second delay)
// After

다른 코루틴에서 재개를 하면 지연되 코루틴을 계속 진행할 수 있습니다.

참고 자료

https://le0nidas.gr/2021/03/28/use-suspendcoroutine-to-connect-callbacks-and-coroutines/

https://kt.academy/article/cc-suspension