Notice
Recent Posts
Recent Comments
Link
«   2024/09   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
Archives
Today
Total
관리 메뉴

디지안의 개발일지

[실습] Kotlin + Armeria + gRPC 사용기 - gRPC 스트림편 본문

Kotlin

[실습] Kotlin + Armeria + gRPC 사용기 - gRPC 스트림편

안덕기 2021. 8. 13. 23:13

들어가기 전에

Kotlin + Armeria + gRPC 기본 예제에 대해서 궁금하다면 기본편을 확인하면 된다.

 

스트림은 왜 사용될까?

스트림이 정확하게 왜 필요한지는 잘 모른다. 다만, 경험적으로 다음과 같은 경우에 사용을 했다.

 

한순간에 많은 데이터를 전송하지 못하는 경우

인터넷에 동영상을 다운로드 한다고 생각해보자. 동영상의 경우 데이터가 너무 크기 때문에 파일을 한순간에 받을 수가 없다. 그렇기 때문에 일정 크기로 잘라서 여러번 다운로드 받으면 된다. 우리가 인터넷에서 다운로드를 할 때 이어 받기를 구현한다고 할 때 스트림을 이용해서 간단하게 구현할 수도 있을거라고 생각한다.

 

이미 만들어진 데이터를 전송하는 것이 아니라 만들어질 데이터에 대해서 전송하는 경우

예를 들어 채팅을 생각해보자. 이미 만들어진 데이터가 실시간으로 쌓이는 데이터를 전송할 때 연결을 유지한 상태에서 데이터만 오고 가는 것이 자원적으로 더 좋을 수도 있을 것이다.

 

gRPC 스트림 작성하기

먼저, gRPC-Kotlin에서 스트림은 flow라는 기술로 구현이 되어 있다. 이를 쓰기 위해서는 의존성을 추가해야한다. 의존성은 프로젝트 바로 아래에 있는 build.gradle.kts에 추가한다.

plugins {
    kotlin("jvm") version "1.5.10"
    id("com.google.protobuf") version "0.8.15"
}

allprojects {
    // 생략

    dependencies { 
        implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.1")
    }
}

protubuf 메시지에 rpc를 추가한다.

syntax = "proto3";

package me.dgahn.account.v1;

import "me/dgahn/account/v1/SignUpV1.proto";
import "me/dgahn/account/v1/GetProfileStreamV1.proto"; // 추가
import "google/protobuf/empty.proto"; // 추가

service AccountRouter {
  rpc signUpV1(SignUpRequestV1) returns (SignUpResponseV1) { }
  rpc getProfileStream(google.protobuf.Empty) returns (stream GetProfileStreamResponseV1) {} // 추가
}

rpc에 사용할 protobuf 메시지도 추가한다.

syntax = "proto3";

package me.dgahn.account.v1;

option java_multiple_files = true;
option java_outer_classname = "GetProfileStreamV1Proto";

message GetProfileStreamResponseV1 {
  bytes data = 1;
}

protubuf를 다시 빌드하고 AccountRouteService에 다음과 같이 추가한다.

class AccountRouteService: AccountRouterCoroutineImplBase() {

    // 생략

    override fun getProfileStream(request: Empty): Flow<GetProfileStreamResponseV1>  = try {
        AccountService.getProfileStream()
    } catch (e: Exception) {
        logger.error { e.stackTraceToString() }
        throw StatusException(Status.UNKNOWN.withDescription(e.stackTraceToString()))
    }

}

그리고 실제 로직에 대해서 작성한다. 로직은 간단하게 파일을 읽어서 100_000 Byte 읽어서 전달하도록 구성하였다. 실제로 돌아가는 것을 천천히 확인하기 위해서 Delay를 주었다.

private const val PIECE_SIZE = 100_000
private const val PIECE_MIN_SIZE = 0
private const val DELAY = 1_000L

object AccountService {
    fun signUp(account: Account) = account

    fun getProfileStream(): Flow<GetProfileStreamResponseV1> {
        val file = File(javaClass.classLoader.getResource("sample.jpg")!!.toURI())
        val fileSize = file.length()
        val fileInputStream = file.inputStream().buffered()
        return flow {
            getProfileStream(fileInputStream, fileSize.toInt())
        }
    }

    private suspend fun FlowCollector<GetProfileStreamResponseV1>.getProfileStream(
        fileInputStream: BufferedInputStream,
        fileSize: Int
    ) {
        var remain = fileSize
        fileInputStream.use {
            while (remain > PIECE_MIN_SIZE) {
                val readLength = if (remain < PIECE_SIZE) remain else PIECE_SIZE
                val readBytes = it.readNBytes(readLength)
                remain -= readLength
                emitProfile(readBytes)
                delay(DELAY)
            }
        }
    }

    private suspend fun FlowCollector<GetProfileStreamResponseV1>.emitProfile(
        readBytes: ByteArray
    ) = emit(
        GetProfileStreamResponseV1.newBuilder()
            .setData(ByteString.copyFrom(readBytes))
            .build()
    )

}

코드가 잘 동작하는지 확인하기 위해서 테스트 코드를 작성한다.

private val logger = KotlinLogging.logger {  }

class AccountRouteServiceTest : FunSpec({

    val service = AccountRouteService()

    afterEach {
        File("sample.jpg").delete()
    }

    test("프로필을 스트림으로 받을 수 있다") {
        val response = service.getProfileStream(Empty.getDefaultInstance())
        val savedFile = File("sample.jpg")
        val outBuffer = BufferedOutputStream(FileOutputStream(savedFile))
        outBuffer.use { stream ->
            response.collect {
                val data = it.data.toByteArray()
                logger.info { "전송 받은 데이터 크기 : ${data.size}" }
                stream.write(data)
            }
        }

        File(this.javaClass.classLoader.getResource("sample.jpg")!!.toURI()).length() shouldBe savedFile.length()
    }

})