[TIL] 코틀린 코루틴에서 어떻게 지연 - 재개가 이루어지는가?
이 문서는 Kt.Academy를 보고 배운 내용을 작성하였습니다. suspendCoroutine
을 이해하고 코틀린에서 어떤 방식으로 지연 - 재개가 이루어지는 살펴봅니다.
suspendCoroutine 특징 살펴보기
아래와 같이 코루틴 코드를 작성하면 순차적으로 실행이 되면서 결과가 나옵니다.
fun main() = runBlocking<Unit> {
launch {
println("1")
println("2")
println("3")
println("4")
}
}
// 결과 1, 2, 3, 4
중간에 suspendCoroutine을 넣으면 supendCoroutine을 선언한 부분에서 실행이 멈추게 됩니다.
fun main() = runBlocking<Unit> {
launch {
println("before")
suspendCoroutine<Unit> { }
println("after")
}
}
// before 까지 출력되고 멈춤
suspendCoroutine 함수는 아래와 같이 정의되어 있습니다.
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()
}
}
인라인 함수이기 때문에 컴파일 시점에 블록이 해제될 것이고 suspendCoroutineUninterceptedOrReturn
함수를 리턴할 것입니다. 여기서 suspendCoroutineUninterceptedOrReturn
함수도 인라인 함수이기 때문에 안의 블록이 해제될 것입니다.
결과적으로 c: Continuation<T>
는 현재 호출된 코루틴의 continuation
이 될 것이고 현재 Continuation
상태를 가지고 SafeContinuation
이라는 새로운 Continutation을 만들 것입니다.
길게 이야기했지만 결국 현재 Continuation
자체를 suspendCoroutine
의 람다식 파라미터로 사용할 수 있게 됩니다.
그래서 아래와 같은 코드를 수행해보면 true가 프린트 되는 것을 확인할 수 있습니다.
fun main() = runBlocking<Unit> {
launch {
println("before")
suspendCoroutine<Unit> {
println(it.context == this.coroutineContext)
}
println("after")
}
}
// before, true
여기서 함수가 다시 재개할 수 있게 하려면 다음과 같이 작성하면 됩니다.
fun main() = runBlocking<Unit> {
launch {
println("before")
suspendCoroutine<Unit> {
println("progress~")
it.resume(Unit)
}
println("after")
}
}
// before, progress~, after
suspendCoroutine 응용
suspendCoroutine을 사용하여 delay
함수를 사용하지 않고 딜레이를 주는 함수를 작성해봅시다.
일단 무작정 Thread.sleep을 선언해보겠습니다.
suspend fun main() {
println("Before")
Thread.sleep(1_000L)
println("After")
}
그러면 다음과 같은 경고를 줍니다.
Possibly blocking call in non-blocking context could lead to thread starvation
Non-blocking
블럭에서 sleep
와 같은 Blocking
함수를 호출하는 것은 쓰레드를 무작정 멈추기 때문에 CPU를 쓸 수 없는 상태를 만들 수도 있다는 의미입니다.
코드를 다음과 같이 바꿔보도록 하겠습니다.
suspend fun main() {
println("Before")
suspendCoroutine<Unit> { continuation ->
thread {
println("Suspended")
sleep(1_000L)
continuation.resume(Unit)
println("Resumed")
}
}
println("After")
}
위와 같이 작성하면 suspendCoroutine
이 코루틴을 멈추게 하기 때문에 아까와 같은 경고창은 발생하지 않을 겁니다.
코드를 정리해서 다음과 같이 작성해봅시다.
fun invokeAfterSecond(operation: () -> Unit) {
thread {
Thread.sleep(1000)
operation.invoke()
}
}
suspend fun main() {
println("Before")
suspendCoroutine<Unit> { continuation ->
invokeAfterSecond {
continuation.resume(Unit)
}
}
println("After")
}
이 함수는 잘 동작하지만 1초 동안 비활성 상태인 스레드를 생성하기 때문에 비용이 매우 비쌉니다. 자바의 ScheduledExecutorService를 활용해서 개선해보겠습니다.
private val executor =
Executors.newSingleThreadScheduledExecutor {
Thread(it, "scheduler").apply { isDaemon = true }
}
suspend fun main() {
println("Before")
suspendCoroutine<Unit> { continuation ->
executor.schedule({
continuation.resume(Unit)
}, 1000, TimeUnit.MILLISECONDS)
}
println("After")
}
newSingleThreadScheduledExecutor
는 하나의 싱글 스레드만 사용하면서 여러개의 딜레이를 줄 수 있는 함수입니다.
그리고 여기서, 다시 suspendCoroutine
을 함수로 추출해보겠습니다.
private val executor =
Executors.newSingleThreadScheduledExecutor {
Thread(it, "scheduler").apply { isDaemon = true }
}
private suspend fun delay(time: Long) {
suspendCoroutine<Unit> { continuation ->
executor.schedule({
continuation.resume(Unit)
}, time, TimeUnit.MILLISECONDS)
}
}
suspend fun main() {
println("Before")
delay(1000L)
println("After")
}
그러면 우리가 아는 delay
함수를 만들 수 있습니다. 여전히 스레드를 생성하지만 모든 코루틴에 대해서 하나의 스레드만 사용합니다. Thread.sleep
을 선언할 때마다 스레드를 차단하는 것보다는 훨씬 낫습니다.
이 방법이 실제 delay
함수를 구현하는 핵심 아이디어입니다.
값 반환
suspendCoroutine
함수를 살펴보면 suspendCoroutine<T>
의 와 resume(T)
의 타입이 같을 것을 확인할 수 있습니다. 이는 resume
으로 넣은 값이 suspendCoroutine
의 반환값이 되기 때문입니다. 다음 예제 코드들을 봅시다.
fun main() = runBlocking<Unit> {
val i: Int = suspendCoroutine<Int> { cont ->
cont.resume(42)
}
println(i) // 42
val str: String = suspendCoroutine<String> { cont ->
cont.resume("Some text")
}
println(str) // Some text
val b: Boolean = suspendCoroutine<Boolean> { cont ->
cont.resume(true)
}
println(b) // true
}
resume
으로 넣은 파라미터 값들이 suspendCoroutine
의 반환 값으로 나오는 것을 확인할 수 있습니다.
이런 특징은 다른 서버에 무언가 요청을 했을 때 활용할 수 있습니다. 다음과 같이 다른 서버로 요청을 했다고 가정해봅시다.
fun main() = runBlocking {
println("request start ")
val result = suspendCoroutine<String> { continuation ->
request { str ->
continuation.resume(str)
}
}
println("result : $result")
println("request end ")
}
inline fun request(block: (String) -> Unit) {
block.invoke("result")
}
request
함수로 람다식을 넘기는데 continuation.resume
을 . 이렇게 되면 request
함수에서 요청을 보냈을 때 결과 값을 block.invoke()
파라미터로 넘겨주기만 한다면 suspendCoroutine
의 반환 값으로 요청에 대한 결과 값을 받을 수 있다.
코드의 설명을 여기서 마치고 중요한 점은 코루틴으로 구성되기 때문에 요청 - 응답
이 긴 경우에는 해당 스레드의 코루틴 작업이 쉬는 것이 스레드 자체가 쉬는 것
은 아닙니다. 해당 스레드는 다른 코루틴 Job
을 수행할 수 있기 때문입니다.
예외가 있는 resume
우리가 호출하는 대부분의 함수는 예외를 반환할 수 있습니다. suspendCoroutine
도 마찬가지입니다.
아래 예제를 살펴보겠습니다.
fun main() = runBlocking {
try {
suspendCoroutine<Unit> { cont ->
cont.resumeWithException(IllegalStateException())
}
} catch (e: IllegalStateException) {
println("Caught!")
}
}
resumeWithException
을 통해서 예외를 전달할 수 있고 그 밖에 try-catch 문을 통해서 예외를 잡을 수도 있습니다.
함수를 지연하는 것이 아니라 코루틴을 지연하는 것이다.
아래의 예제를 보자.
// Do not do this
var continuation: Continuation<Unit>? = null
suspend fun suspendAndSetContinuation() {
suspendCoroutine<Unit> { cont ->
continuation = cont
}
}
suspend fun main() {
println("Before")
suspendAndSetContinuation()
continuation?.resume(Unit)
println("After")
}
// Before
위 예제에서는 suspendAndSetContinuation
함수에서 continuation
라는 전역 변수를 초기화합니다.. 그리고 suspendAndSetContinuation
함수 호출 후에 resume
을 하는데 이렇게 하면 아무런 의미가 없습니다. 왜냐하면 suspendCroutine
은 함수를 지연하는 것이 아니라 코루틴을 지연하는 것이 때문에 코루틴 자체가 지연되서 다른 코루틴에서 continuation
을 resume
하지 않으면 의미가 없습니다.
아래 예제는 다른 코루틴에서 함수를 재개하는 것입니다.
// Do not do this, potential memory leak
var continuation: Continuation<Unit>? = null
suspend fun suspendAndSetContinuation() {
suspendCoroutine<Unit> { cont ->
continuation = cont
}
}
suspend fun main() = coroutineScope {
println("Before")
launch {
delay(1000)
continuation?.resume(Unit)
}
suspendAndSetContinuation()
println("After")
}
// Before
// (1 second delay)
// After
다른 코루틴에서 재개를 하면 지연되 코루틴을 계속 진행할 수 있습니다.
참고 자료
https://le0nidas.gr/2021/03/28/use-suspendcoroutine-to-connect-callbacks-and-coroutines/