Notice
Recent Posts
Recent Comments
Link
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Archives
Today
Total
관리 메뉴

디지안의 개발일지

[실습] Kotlin + Spring WebFlux + R2DBC - R2DBC 편 본문

Kotlin

[실습] Kotlin + Spring WebFlux + R2DBC - R2DBC 편

안덕기 2021. 8. 20. 01:21

R2DBC란?

현재 Java 진영에서 주로 사용하고 있는 데이터베이스 연결 드라이버는 JDBC다. JDBC는 완벽한 블록킹 API이므로 아무리 비동기식으로 서버를 구성하더라도 결국 JDBC에서 동기적으로 동작을 하게 된다. 그래서 나온게 R2DBC다. 공식 문서를 보면 다음과 같은 구절이 있다.

Part of the answer as to why R2DBC was created is the need for a non-blocking application stack to handle concurrency with a small number of threads and scale with fewer hardware resources. This need cannot be satisfied by reusing standardized relational database access APIs — namely JDBC –- as JDBC is a fully blocking API. Attempts to compensate for blocking behavior with a ThreadPool are of limited use.
- Spring Data R2DBC - Reference Documentation  중에-

적은 하드웨어 자원과 작은 스레드 수를 가지고 동시성을 동작시키기 위한 애플리케이션에 필요한 기술이라는 의미로 결과적으로 하드웨어를 최대한 활용하기 위한 기술이다. 아무래도 요즘 클라우드를 많이 사용하니 최소한의 클라우드 비용을 위해서는 논블록킹 기술이 각광 받고 있는 것으로 보인다.

 

CoroutineCrudRepository 기본 제공 함수 테스트

Spring Data R2DBC는 기본적으로 Java를 위한 프레임워크지만 Kotlin을 지원해준다. 적극적으로 지원해준다고 느껴지는 부분은 바로 이 CoroutineCrudRepository다. 

 

먼저, Enity를 선언해보자. Spring Data JPA와 다른 점은 @Entity를 별도로 선언하지 않아도 된다. @Table을 통해서 테이블 매핑하는 정도를 사용할 수 있다. 그리고 @Id를 통해서 PK가 무엇인지 선언할 수 있다.

@Table
data class Employee(
    @Id
    val id: String? = null,
    val name: String
) {
    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (javaClass != other?.javaClass) return false

        other as Employee

        if (id != other.id) return false
        if (name != other.name) return false

        return true
    }

    override fun hashCode(): Int = id.hashCode()

}

 

CoroutineCrudRepository는 다음과 같이 선언하면 된다.

@Repository
interface EmployeeCrudRepository : CoroutineCrudRepository<Employee, String>

 

그리고 테스트 코드를 작성해보자.

@ContextConfiguration(classes = [EmployeeCrudRepository::class, R2bcConfiguration::class])
class EmployeeRepositoryTest(employeeRepo: EmployeeCrudRepository) : FunSpec({

    extension(SpringExtension)

    test("직원을 저장할 수 있다.") {
        val employee = Employee(name = "test")
        val saved = employeeRepo.save(employee)

        saved.name shouldBe employee.name
    }

    test("직원을 조회할 수 있다.") {
        val employee = Employee(name = "test")
        val saved = employeeRepo.save(employee)

        val find = employeeRepo.findById(saved.id!!)
        find shouldBe saved
    }

    test("직원을 업데이트할 수 있다.") {
        val employee = Employee(name = "test")
        val saved = employeeRepo.save(employee)

        val find1 = employeeRepo.findById(saved.id!!)
        val newName = "test1"
        employeeRepo.save(find1!!.copy(name = newName))
        val find2 = employeeRepo.findById(saved.id!!)

        find2!!.name shouldBe newName
        find1 shouldNotBe find2
    }

})

 

첫번째 테스트는 간단하게 Employee를 저장할 수 있는지 확인하는 코드다. 주의 해야할 점은 CoroutineCrudRepository에서 제공하는 save()함수는 반드시 id가 null인 경우에만 새로운 Entity로 인식한다. 애플리케이션에서 새 Entity에서 ID를 부여하고 싶은 경우에는 isNew() 함수를 오버라이딩 해야한다. (관련 이슈)

test("직원을 저장할 수 있다.") {
  val employee = Employee(name = "test")
  val saved = employeeRepo.save(employee)

  saved.name shouldBe employee.name
}

 

두번째 테스트는 간단하게 조회에 대한 테스트를 작성했다.

test("직원을 조회할 수 있다.") {
  val employee = Employee(name = "test")
  val saved = employeeRepo.save(employee)

  val find = employeeRepo.findById(saved.id!!)
  find shouldBe saved
}

 

세번째 테스트는 Entity를 업데이트 해보았다. 다음과 같이 처음에 꺼낸 객체와 업데이트 이후에 꺼낸 객체가 다른 것을 확인할 수 있다. 

test("직원을 업데이트할 수 있다.") {
  val employee = Employee(name = "test")
  val saved = employeeRepo.save(employee)

  val find1 = employeeRepo.findById(saved.id!!)
  val newName = "test1"
  employeeRepo.save(find1!!.copy(name = newName))
  val find2 = employeeRepo.findById(saved.id!!)

  find2!!.name shouldBe newName
  find1 shouldNotBe find2
}

CoroutineCrudRepository 사용자 정의 함수

CoroutineCrudRepository도 JpaRepository 처럼 자동으로 쿼리를 생성해주는 함수를 제공한다.

@Repository
interface EmployeeCrudRepository : CoroutineCrudRepository<Employee, String> {

    suspend fun findByName(name: String): Employee
    
    fun findAllByNameStartsWith(name: String): Flow<Employee>
    
}

테스트 코드는 다음과 같다

test("test로 시작하는 직원 목록을 조회할 수 있다.") {
  val name = "test"
  employeeRepo.saveAll((1..100).map { Employee(name = "test_$it") }).collect()

  val find = employeeRepo.findAllByNameStartsWith(name)
  find.toList() shouldHaveSize 100
  find.collect { employee -> employee.name.startsWith("test") }
}

test("직원 목록을 조회할 수 있다.") {
  employeeRepo.saveAll((1..1000).map { Employee(name = "test_$it") }).collect()

  val find = employeeRepo.findAll()
  find.collectIndexed { index, value ->
    println(value.name)
    value.name shouldBe "test_${index + 1}"
  }
}

비동기로 수행했을 때 뭐가 좋은데?

flow를 통해서 콜백으로 함수를 전달하면 데이터베이스로부터 응답이 필요한 시점에 다른 행위를 할 수 있다. 다음 테스트 코드를 보자. 100개의 employee를 저장하면서 employee의 이름을 출력하도록 코드를 작성하였다.

test("test로 저장하면서 다른 일을 할 수 있다.") {
  employeeRepo.saveAll((1..100).map { Employee(name = "test_$it") })
    .collect { println(it.name) }

  val find = employeeRepo.findAll()
  find.toList() shouldHaveSize 100
}

100개를 넣다가 64개를 넣고 출력하는 모습을 볼 수 있다. 지금은 그냥 이름만 출력하는 내용이지만 함수에 어떤 내용을 넣냐에 따라 다른 작업을 진행할 수 있다.

// ...생략
00:08:41.741 [pool-2-thread-1] DEBUG org.springframework.r2dbc.core.DefaultDatabaseClient - Executing SQL statement [INSERT INTO employee (name) VALUES ($1)]
00:08:41.741 [pool-2-thread-1] DEBUG io.r2dbc.h2.client.SessionClient - Request:  INSERT INTO employee (name) VALUES ($1) {1: 'test_64'}
00:08:41.743 [pool-2-thread-1] DEBUG io.r2dbc.h2.client.SessionClient - Request:  CALL H2VERSION()
00:08:41.743 [pool-2-thread-1] DEBUG io.r2dbc.h2.client.SessionClient - Response: org.h2.result.LocalResultImpl@207abdc0 columns: 1 rows: 1 pos: -1
00:08:41.743 [pool-2-thread-1] DEBUG org.springframework.r2dbc.core.DefaultDatabaseClient - Executing SQL statement [INSERT INTO employee (name) VALUES ($1)]
00:08:41.743 [pool-2-thread-1] DEBUG io.r2dbc.h2.client.SessionClient - Request:  INSERT INTO employee (name) VALUES ($1) {1: 'test_65'}
test_1
test_2
test_3
test_4
test_5
test_6
test_7
test_8
test_9
test_10
test_11
test_12
test_13
test_14
test_15
test_16
test_17
test_18
test_19
test_20
test_21
test_22
test_23
test_24
test_25
test_26
test_27
test_28
test_29
test_30
test_31
test_32
test_33
test_34
test_35
test_36
test_37
test_38
test_39
test_40
test_41
test_42
test_43
test_44
test_45
test_46
test_47
test_48
test_49
test_50
test_51
test_52
test_53
test_54
test_55
test_56
test_57
test_58
test_59
test_60
test_61
test_62
test_63
test_64
00:08:41.748 [pool-2-thread-1] DEBUG io.r2dbc.h2.client.SessionClient - Request:  CALL H2VERSION()
00:08:41.748 [pool-2-thread-1] DEBUG io.r2dbc.h2.client.SessionClient - Response: org.h2.result.LocalResultImpl@38832b32 columns: 1 rows: 1 pos: -1
00:08:41.748 [pool-2-thread-1] DEBUG org.springframework.r2dbc.core.DefaultDatabaseClient - Executing SQL statement [INSERT INTO employee (name) VALUES ($1)]
00:08:41.748 [pool-2-thread-1] DEBUG io.r2dbc.h2.client.SessionClient - Request:  INSERT INTO employee (name) VALUES ($1) {1: 'test_66'}
00:08:41.752 [pool-2-thread-1] DEBUG io.r2dbc.h2.client.SessionClient - Request:  CALL H2VERSION()
00:08:41.752 [pool-2-thread-1] DEBUG io.r2dbc.h2.client.SessionClient - Response: org.h2.result.LocalResultImpl@6499adb0 columns: 1 rows: 1 pos: -1
// ...생략

NativeQuery 작성하기

R2DBC는 org.springframework.r2dbc.core.DatabaseClient를 통해서 NativeQuery를 작성할 수 있는 기능을 제공한다. 쿼리는 다음과 같이 작성할 수 있다. 특징으로는 Java의 SAM으로 반환되는 값을 매핑 시켜야 한다는 것이다.

@Repository
class EmployeeQueryRepository(private val client: DatabaseClient) {

    suspend fun findById(id: String): Employee? = client
        .sql("SELECT * FROM employee WHERE id = $1")
        .bind(0, id)
        .map(mapper())
        .one()
        .awaitFirstOrNull()

    suspend fun save(employee: Employee) = this.client.sql("INSERT INTO  employee (name) VALUES (:name)")
        .filter { statement, _ -> statement.returnGeneratedValues("id").execute() }
        .bind("name", employee.name)
        .fetch()
        .first()
        .map { r -> r["id"] as String }
        .awaitFirstOrNull()

    fun saveAll(data: List<Employee>): Flow<String> = client.inConnectionMany { connection ->
        val statement =
            connection.createStatement("INSERT INTO  employee (name) VALUES ($1)")
                .returnGeneratedValues("id")
        for (p in data) {
            statement.bind(0, p.name).add()
        }
        Flux.from(statement.execute())
            .flatMap { r -> r.map { row, _ -> row.get("id", String::class.java) } }
    }.asFlow()

    fun findAll() = client
        .sql("SELECT * FROM employee")
        .filter { statement, _ -> statement.fetchSize(10).execute() }
        .map(mapper())
        .flow()

    private fun mapper(): BiFunction<Row, RowMetadata, Employee> = BiFunction { row, _ ->
        Employee(
            id = row["id", String::class.java],
            name = row["name", String::class.java]
        )
    }

}

@Query와 @Modifying

CoroutineCrudRepository에서 @Query를 통해서도 Native Query를 작성할 수 있다. 특히, INSERT, UPDATE, DELETE를 할 때 @Query와 @Modifying을 사용하면 영향 받은 컬럼의 수를 반환 받을 수 있다.

@Repository
interface EmployeeCrudRepository : CoroutineCrudRepository<Employee, String> {

    @Modifying
    @Query("UPDATE employee SET name = :name")
    suspend fun updateName(name: String): Int

}

테스트 코드는 다음과 같다.

test("직원을 업데이트 할 수 있다.") {
  val employee = Employee(name = "test")
  employeeRepo.save(employee)

  val newName = "new_test"
  val updateResult = employeeRepo.updateName(newName)
  val actual = employeeRepo.findAll().toList().first()
  updateResult shouldBe 1
  actual.name shouldBe newName
}

 

마무리

Spring Webflux에서는 Mono와 Flux 타입을 많이 쓴다. Kotlin으로 작성한만큼 최대한 Kotlin의 Coroutine을 활용하기 위해 노력하였다. 더 좋은 예제가 있다면 좋을거 같은데 찾지 못한 아쉬움이 남는다.