관리 메뉴

개발그래머

코루틴을 사용하여 조회 성능 최적화 본문

Kotlin

코루틴을 사용하여 조회 성능 최적화

임요환 2023. 12. 10. 18:57

상황

개발을 하면서 여러 DB 데이터, 외부 데이터를 조회를 하고 이 데이터들을 변형한 후 해당 다른 데이터에 상태를 변경해야 하는 작업 등을 할 때가 있었다.
예를 들어

  1. 업체 정보와 유저 정보를 조회한 후
  2. 하나로 합친 후
  3. 이 데이터를 조회한 유저의 상태를 변경

하는 작업을 해야 하는 경우가 있을 경우 이를 어떻게 해야 최적화를 할 수 있을지 고민해 보았다.
문득 코루틴을 사용하여 최적화하면 어떨까라는 생각을 하였고 코드를 작성하였다.

기본 코드 작성

@Service
class UserService(
    private val userRepository: UserRepository,
    private val userReader: UserReader,
) {
    private val logger: Logger = LoggerFactory.getLogger(UserReader::class.java)

    @Transactional
    fun getCustomers(name: String): List<Customer> {
        logger.info("start 시작")
        // 데이터 조회
        val user = userReader.getUser(name)
        val users = userReader.getUsers()
        val companies = companyReader.getCompanies()

        // users와 companies를 합쳐 customers를 만듬
        val customers: List<Customer> = users.map { Customer(it.name, it.phone) }
            .plus(companies.map { Customer(it.name, it.phone) })


        // user에 상태를 바꿈
        user.apply {
            this.isDone = true
        }

        return customers
    }
}

@Component
class UserReader(
    private val userRepository: UserRepository
) {
    private val logger: Logger = LoggerFactory.getLogger(UserReader::class.java)

    fun getUser(name: String): User {
        val time = Random.nextLong(1, 10) * 1000
        Thread.sleep(time)
        logger.info("{}ms 호출 현재 시간", time)
        return userRepository.findByName(name) ?: throw IllegalArgumentException()
    }

    fun getUsers(): MutableList<User> {
        val time = Random.nextLong(1, 10) * 1000
        Thread.sleep(time)
        logger.info("{}ms 호출 현재 시간", time)
        return userRepository.findAll()
    }
}

@Component
class CompanyReader(
    private val companyRepository: CompanyRepository
) {
    private val logger: Logger = LoggerFactory.getLogger(CompanyReader::class.java)

    fun getCompanies(): MutableList<Company> {
        val time = Random.nextLong(1, 10) * 1000
        Thread.sleep(time)
        logger.info("{}ms getCompanies 호출 현재 시간", time)
        return companyRepository.findAll()
    }
}

일반적으로는 동기적으로 코드를 작성하게 될 것이다.
동기적으로 작성해도 크게 문제가 되지 않지만 데이터를 조회할 때 외부 호출을 한다던가 조회 쿼리가 느린 경우 시간이 오래 걸리게 된다.
getUser()를 조회하는데 2초
getUsers()를 조회하는데 3초
getCompanies()를 조회하는 데 5초가 걸리면 시간은 총 2 + 3 + 5가 수행되어 10초가 걸리게 된다.
이 조회 성능을 향상시키기 위해 코루틴을 도입해 보았다.

코루틴을 간략하게 설명하자면

- 일종의 경량 스레드
- 루틴 → 메인 루틴(자바의 메인함수), 서브루틴(메인함수에서 호출되는 일반 함수)
- 협력형 멀티 태스킹을 위한 서브 루틴의 모음
- 서브 루틴을 이용해 논블락킹을 구현함
- 코루틴 블록내에서 suspend가 붙은 서브루틴을 만나는 경우 해당 서브루틴 로직을 수행하는 동안 쓰레드를 점유하지 않고 코루틴 블록을 호출한곳과 쓰레드를 공유하면서 동시성 프로그래밍을 가능하게 함
- 논블락킹 = 쓰레드와 같은 리소스를 점유하지 않는다

코루틴을 사용하여 조회 성능 향상

@Service
class UserService(
    private val userRepository: UserRepository,
    private val userReader: UserReader,
    private val companyReader: CompanyReader
) {
    private val logger: Logger = LoggerFactory.getLogger(UserReader::class.java)

    @Transactional
    fun getCustomers(name: String) = runBlocking {
        logger.info("start 시작")
        withContext(Executors.newFixedThreadPool(3).asCoroutineDispatcher()) {
            val user = async { userReader.getUser(name) }
            val users = async { userReader.getUsers() }
            val companies = async { companyReader.getCompanies() }

            // users와 companies를 합쳐 customers를 만듬
            val customers: List<Customer> = users.await().map { Customer(it.name, it.phone) }
                .plus(companies.await().map { Customer(it.name, it.phone) })

            // user에 상태를 바꿈
            userRepository.save(
                user.await().apply {
                    this.isDone = true
                }
            )

            customers
        }
    }
}

코루틴을 도입하여 코드를 수정해 보았다.
runBlocking을 사용하여 coroutine 관련 동작이 모두 수행한 후 종료할 수 있도록 하였다.
runBlocking을 작성하지 않으면 suspend 키워드가 외부로 나가게 되며 getCustomers를 호출하는 모든 곳에 suspend를 작성해야 한다.
async 키워드를 사용하여 조회들이 병렬로 작업을 진행할 수 있도록 해주었으며 await 키워드를 사용하여 해당 조회가 완료되면 작업을 진행할 수 있도록 하였다.
이제
getUser()를 조회하는데 2초
getUsers()를 조회하는데 3초
getCompanies()를 조회하는 데 5초가 걸리면 시간은 총 5 + a 정도만 걸리게 된다.
이 방식은 변경감지를 이용하지 못하는 단점이 있다.
코루틴은 쓰레드 대신 코루틴에서 생성하는 경량 쓰레드 객체를 사용하는데 withContext(Executors.newFixedThreadPool(3).asCoroutineDispatcher())로 코루틴을 사용하게 되면 실제 쓰레드가 생성되고 거기에 코루틴 작업들을 실행하게 된다.
그렇게 때문에 명시적으로 save를 해주지 않으면 user 객체는 영속성 컨텍스트에 존재하지 않아 변경 감지가 동작하지 않는다.

@Service
class UserService(
    private val userRepository: UserRepository,
    private val userReader: UserReader,
    private val companyReader: CompanyReader
) {
    private val logger: Logger = LoggerFactory.getLogger(UserReader::class.java)

    @Transactional
    fun getCustomers(name: String) = runBlocking {
        logger.info("start 시작")
        val user = async { userReader.getUser(name) }
        val users = async { userReader.getUsers() }
        val companies = async { companyReader.getCompanies() }

        // users와 companies를 합쳐 customers를 만듬
        val customers: List<Customer> = users.await().map { Customer(it.name, it.phone) }
            .plus(companies.await().map { Customer(it.name, it.phone) })

        // user에 상태를 바꿈
        user.await().apply {
            this.isDone = true
        }

        customers
    }
}

@Component
class UserReader(
    private val userRepository: UserRepository
) {
    private val logger: Logger = LoggerFactory.getLogger(UserReader::class.java)

    suspend fun getUser(name: String): User {
        val time = Random.nextLong(1, 10) * 1000
        delay(time)
        logger.info("{}ms getAsyncUser 호출 현재 시간", time)
        return userRepository.findByName(name) ?: throw IllegalArgumentException("no name")
    }

    suspend fun getUsers(): MutableList<User> {
        val time = Random.nextLong(1, 10) * 1000
        delay(time)
        logger.info("{}ms getAsyncUsers 호출 현재 시간", time)
        return userRepository.findAll()
    }
}

@Component
class CompanyReader(
    private val companyRepository: CompanyRepository
) {
    private val logger: Logger = LoggerFactory.getLogger(CompanyReader::class.java)

    suspend fun getCompanies(): MutableList<Company> {
        val time = Random.nextLong(1, 10) * 1000
        delay(10)
        logger.info("{}ms getAsyncCompanies 호출 현재 시간", time)
        return companyRepository.findAll()
    }
}

각각 조회 함수를 suspend로 만들게 되면 jpa 변경 감지도 정상적으로 사용할 수 있게 된다.

개인적인 생각

각각의 방식들은 트레이드오프가 필요할 것 같다.
첫 번째 방식은 Service 코드가 복잡해지는 대신에 Reader의 로직은 따로 수정을 하지 않아도 되며 기존에 해당 메서드들을 사용하는 곳에서 그대로 사용하면 된다.
두 번째 방식은 Reader 쪽에 suspend 키워드를 사용하게 되며 이 키워드를 처리하기 위해 여러 코드들을 수정해야 할 수도 있다.
마치 모든 메서드에 throws Exception을 사용하게 되면 이를 try-catch 하거나 throws Exceptions을 추가하기 전에 컴파일 에러가 발생하는 현상과 비슷하다.
개인적으로 기존 사용하는 코드들은 그대로 두고 성능 최적화가 필요한 부분을 위한 메서드를 새로 만드는 게 제일 나은 것 같다.
저렇게 많은 데이터를 조회해서 합치는 경우가 그리 많지 않을뿐더러 이러한 경우가 빈번하게 발생하는 경우 설계 자체가 문제일 확률이 크다.

추가적인 고민

일반적인 Spring MVC와 JPA과 함께 코루틴을 사용하면 발생하는 문제들이 예상치 못하게 일어난다.
jdbc는 기본적으로 Blocking IO이므로 위의 코드만으로 성능 최적화가 되지 않을 수도 있을 것 같다.
어찌 보면 Spring MVC + JDBC보다는 Spring Webflux + R2DBC에 좀더 최적화되지 않을까 생각이 들지만 여러 오픈소스 프로젝트에서 jpa를 Non Blocking 하게 사용할 수 있도록 노력하고 있다.
https://engineering.linecorp.com/ko/blog/kotlinjdsl-reactive-criteria-api-with-kotlin

 

Kotlin JDSL: Kotlin을 이용해 손쉽게 Reactive Criteria API를 작성해 봅시다

들어가며 안녕하세요. Global EC(Global E-Commerce, 이하 GEC)에서 주문 파트 개발을 담당하고 있는 강현식입니다. 같은 파트 서종현 님께서 'Kotlin JDSL: Kotlin을 이용해 좀 더 쉽게 JPA Criteria A...

engineering.linecorp.com

글을 작성하다 보니 개념적인 부분이 충분하지 못한 것 같고 이를 보완하기 위해 계속해서 보완해 나가야 할 것 같다.