관리 메뉴

피터의 개발이야기

[kotlin] 코틀린 코루틴의 정석 - 비동기 다중 스레드 시 공유 데이터 정합성 문제 본문

Programming/Kotlin

[kotlin] 코틀린 코루틴의 정석 - 비동기 다중 스레드 시 공유 데이터 정합성 문제

기록하는 백앤드개발자 2024. 6. 8. 10:10
반응형

ㅁ 들어가며

 코틀린 코루틴의 정석 책을 보고 정리한 글입니다.

11장 코루틴의 심화 - 비동기 다중 스레드의 공유 데이터 정합성 문제

 대량 트래픽을 다루는 비동기 시스템에서 프로세스의 경합으로 인해 데이터의 충돌이 발생하는 경우가 종종 있다. 예를 들어 30초후 종료처리해야하는 프로세스와 완료 결과를 받아 정상종료하는 두 모듈이 1ms 미만에 동일 시점에 프로세스가 시작되는 경우 경합이 발생하게 된다.  스레드 간의 공유 데이터도 이와 비슷한 문제점을 지니고 있다.

 

 아래의 소스는 만번의 카운트를 다중 스레드로 처리하는 로직이다. 

fun main() = runBlocking<Unit> {
  val startTime = System.currentTimeMillis()
  withContext(Dispatchers.IO) {
    repeat(10_000) {
      launch {
        count += 1
        println("[${Thread.currentThread().name}][${getElapsedTime(startTime)}]  count = ${count}")
      }
    }
  }
  println("[${Thread.currentThread().name}][${getElapsedTime(startTime)}]  count = ${count}")
}

/*
// 결과1:
[DefaultDispatcher-worker-9 @coroutine#9920][지난 시간: 129ms]  count = 9892
[DefaultDispatcher-worker-51 @coroutine#9919][지난 시간: 129ms]  count = 9891
[DefaultDispatcher-worker-57 @coroutine#9918][지난 시간: 129ms]  count = 9890
[DefaultDispatcher-worker-61 @coroutine#9916][지난 시간: 129ms]  count = 9888
[DefaultDispatcher-worker-66 @coroutine#9915][지난 시간: 129ms]  count = 9887
[DefaultDispatcher-worker-59 @coroutine#9912][지난 시간: 129ms]  count = 9884
[main @coroutine#1][지난 시간: 131ms]  count = 9973
// 결과2:
[DefaultDispatcher-worker-21 @coroutine#9865][지난 시간: 115ms]  count = 9802
[DefaultDispatcher-worker-7 @coroutine#9861][지난 시간: 115ms]  count = 9799
[DefaultDispatcher-worker-49 @coroutine#9858][지난 시간: 115ms]  count = 9796
[DefaultDispatcher-worker-41 @coroutine#9855][지난 시간: 115ms]  count = 9793
[DefaultDispatcher-worker-34 @coroutine#9853][지난 시간: 115ms]  count = 9791
[DefaultDispatcher-worker-37 @coroutine#9844][지난 시간: 115ms]  count = 9783
[DefaultDispatcher-worker-15 @coroutine#9840][지난 시간: 115ms]  count = 9778
[main @coroutine#1][지난 시간: 116ms]  count = 9938
// 결과3:
[DefaultDispatcher-worker-12 @coroutine#9975][지난 시간: 114ms]  count = 9938
[DefaultDispatcher-worker-33 @coroutine#9973][지난 시간: 114ms]  count = 9936
[DefaultDispatcher-worker-2 @coroutine#9965][지난 시간: 114ms]  count = 9928
[DefaultDispatcher-worker-43 @coroutine#9964][지난 시간: 114ms]  count = 9927
[DefaultDispatcher-worker-62 @coroutine#9963][지난 시간: 114ms]  count = 9926
[DefaultDispatcher-worker-64 @coroutine#9961][지난 시간: 114ms]  count = 9924
[main @coroutine#1][지난 시간: 115ms]  count = 9964
*/

ㅇ 결과를 보면 27번, 62번, 36번 프로세스 경합이 발생하게 되어 정확히 10,000으로 연산되지 못하였다.

ㅇ 이처럼 멀티 스레드 환경에서 실행되는 복수의 코루틴이 특정 값을 공통으로 사용하면 메모리 가시성 문제나 프로세스 경합 문제가 발생하여 데이터 정합성에 문제가 생긴다.

 

메모리 가시성 문제

  스레드가 변수를 읽는 메모리 공간이 CPU 캐시와 메인 메모리로 이루어져서, CPU 연산 후 데이터가 CPU캐시에서 메모리로 저장되는 과정 중 다른 스레드가 메모리를 참조하면서 발생하는 문제이다.

[DefaultDispatcher-worker-6 @coroutine#5][지난 시간: 8ms]  count = 25
[DefaultDispatcher-worker-8 @coroutine#83][지난 시간: 25ms]  count = 75
[main @coroutine#1][지난 시간: 28ms]  count = 120

 이를 해결하기 위한 가장 쉬운 방법은 스레드가 처리할 수 있는 최소 시간을 보장해 주는 방법일 것이다. 1ms에 40번의 연산은 경합없이 충분히 처리할 수 있지만, 그 이상으로 연산을 수행할 경우 확률적으로 스레드간의 메모리 가시성 문제가 발생할 수 있다.

 

스레드 경합 문제

 2개 이상의 스레드가 동시에 연산을 수행하면서 같은 연산이 두 번 일어나는 경우이다. 첫번재 연산이 100+1이 CPU에서 끝나 CPU 캐시에 있는 상태에서 다른 스레드가 메모리에 있는 100을 읽어 다시 1을 더해 메모리에 넣게 되면, 메모리상에서는 101을 두번 업데이트하게 되게 결론적으로 첫번째 연산은 사라지게 된다. 이 문제는 하드웨어 메모리 구조가 JVM의 스택, 힙 영역을 구분하지 않기 때문에 모든 영역에서 발생할 수 있다.

 

 비동기 처리 방식에 있어서 모니터링이 중요한데, 이와 관련해서 [Redis] 쿠버네티스 환경에서 Redis 모니터링의 필요성에 정리하였는데, 스레드의 경합을 최소화 하기 위해서는 지연률을 모니터링하고 적절하게 클러스터링이나 리팩토링을 해주어 성능 개선을 해주어야 한다. 

 

해결방법: @Volatile 어노테이션

@Volatile 어노테이션은 CPU 캐시 메모리를 사용하지 않고 메인 메모리를 사용하도록 한다. 하지만 메모리 가시성 문제는 여전히 해결되지는 않았다. 여러 스레드가 동시에 메인 메모리에 저장되 값을 참조하면서 CPU에서 연산 중인 값과도 경합이 이루어지기 때문이다. 하나의 변수를 여러 스레드들이 접근하지 못하도록 하는 방법에 대해 알아보자.

 

해결방법: Mutex

Mutext 객체를 사용해 다중 스레드가 특정 코드 블록의 접근을 싱글톤으로 만들어 문제를 해결했다. Mutext의 lock과 unlock 함수를 이용해 다른 코루틴이 진입을 제한하여 마치 RDMS의 트렌젝션 처럼 데이터의 일원성을 보장하게 된다.

var count = 0
val mutex = Mutex()

fun main() = runBlocking<Unit> {
  val startTime = System.currentTimeMillis()
  withContext(Dispatchers.Default) {
    repeat(10_000) {
      launch {
        mutex.lock()
        count += 1
        mutex.unlock()
        println("[${Thread.currentThread().name}][${getElapsedTime(startTime)}]  count = ${count}")
      }
    }
  }
  println("[${Thread.currentThread().name}][${getElapsedTime(startTime)}]  count = ${count}")
}
/*
~~~
[DefaultDispatcher-worker-9 @coroutine#9998][지난 시간: 122ms]  count = 9997
[DefaultDispatcher-worker-9 @coroutine#9999][지난 시간: 122ms]  count = 9998
[DefaultDispatcher-worker-9 @coroutine#10000][지난 시간: 122ms]  count = 9999
[DefaultDispatcher-worker-9 @coroutine#10001][지난 시간: 122ms]  count = 10000
[main @coroutine#1][지난 시간: 122ms]  count = 10000
*/

ㅇ mutex가 싱글톤으로 만들어줘서 결국 스케줄링된 하나의 스레드가 계속 연상을 수행하는 특징이 생겼다.

 

        mutex.lock()
        count += 1
        mutex.unlock()
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        mutex.withLock {
          count += 1
        }

ㅇ 문법적으로 lock-unlock과 withLock이 있다.

 

해결방법: 싱글스레드

var count = 0
val singleThread = newSingleThreadContext("singleThread")

fun main() = runBlocking<Unit> {
  val startTime = System.currentTimeMillis()
  withContext(singleThread) {
    repeat(10_000) {
      launch {
        count += 1
        println("[${Thread.currentThread().name}][${getElapsedTime(startTime)}]  count = ${count}")
      }
    }
  }
  println("[${Thread.currentThread().name}][${getElapsedTime(startTime)}]  count = ${count}")
}

[singleThread @coroutine#9998][지난 시간: 93ms]  count = 9997
[singleThread @coroutine#9999][지난 시간: 93ms]  count = 9998
[singleThread @coroutine#10000][지난 시간: 93ms]  count = 9999
[singleThread @coroutine#10001][지난 시간: 93ms]  count = 10000
[main @coroutine#1][지난 시간: 93ms]  count = 10000

ㅇ 공유 스레드풀에서 싱글 스레드 Context를 생성하여 코루틴을 실행하면, 다중 스레드에 의한 메모리 가시성 문제를 해결할 수 있다.

ㅇ 이와 같은 방법이 Redis가 처리하는 방식이기도 하다.

  ㄴ Redis는 RDMS의 트렌젝션처리와 다르게 싱글스레드로 처리하여 데이터의 경합을 원천 차단하고 있다. 

  ㄴ 대신 하나의 명령어를 10㎲미만으로 처리하도록 설계되어 속도에 있어서는 RDMS보다 100배 정도 빠르다.

  ㄴ 다만 RDMS처럼 관계성있는 데이터, 예를 들어 Join, Group by와 같은 작업을 수행할 수 없다.

 

해결방법: AtomicInteger

var count = AtomicInteger(0)

fun main() = runBlocking<Unit> {
  val startTime = System.currentTimeMillis()
  withContext(Dispatchers.Default) {
    repeat(10_000) {
      launch {
        count.getAndAdd(1)
        println("[${Thread.currentThread().name}][${getElapsedTime(startTime)}]  count = ${count}")
      }
    }
  }
  println("[${Thread.currentThread().name}][${getElapsedTime(startTime)}]  count = ${count}")
}

[DefaultDispatcher-worker-2 @coroutine#9992][지난 시간: 98ms]  count = 9992
[DefaultDispatcher-worker-11 @coroutine#9987][지난 시간: 98ms]  count = 9987
[DefaultDispatcher-worker-13 @coroutine#9985][지난 시간: 98ms]  count = 9985
[DefaultDispatcher-worker-10 @coroutine#9983][지난 시간: 98ms]  count = 9983
[DefaultDispatcher-worker-12 @coroutine#9978][지난 시간: 98ms]  count = 9978
[main @coroutine#1][지난 시간: 98ms]  count = 10000

ㅇ AtomicInteger은 값 자체에 원자성을 부여하는 방식이다.

ㅇ Atomin{Type}의 다른 타입도 있다.

ㅇ 이런 경우 다중 스레드가 스케줄이 할당되기는 하지만 하나의 스레드가 count에 연산하는 동안 다른 스레드는 기다려야 한다.

~~~~~      
      launch {
        val currentCount = count.get()
        count.set(currentCount + 1)
        println("[${Thread.currentThread().name}][${getElapsedTime(startTime)}]  count = ${count}")
      }
~~~~~      
[DefaultDispatcher-worker-12 @coroutine#9996][지난 시간: 99ms]  count = 9959
[DefaultDispatcher-worker-10 @coroutine#9995][지난 시간: 99ms]  count = 9958
[DefaultDispatcher-worker-6 @coroutine#9992][지난 시간: 99ms]  count = 9955
[DefaultDispatcher-worker-7 @coroutine#9990][지난 시간: 99ms]  count = 9953
[main @coroutine#1][지난 시간: 99ms]  count = 9963

ㅇ 원자성이 주입된 값을 다른 메모리에 주입한 후 다시 값을 set하는 경우 다른 스레드가 count의 값을 읽거나 변경할 수 있다.

 

ㅁ 함께 보면 좋은 사이트

 조세영 - 코틀린 코루틴의 정석

반응형
Comments