Android 공부/Coroutine

[XX캠퍼스] 07.Kotlin Coroutines & Flow (공유객체, Mutex, Actor )

Machine_웅 2022. 7. 25. 18:07
728x90
반응형

공유 객체 문제

import kotlin.system.*
import kotlinx.coroutines.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // 시작할 코루틴의 갯수
    val k = 1000 // 코루틴 내에서 반복할 횟수
    val elapsed = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}

var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

withContext는 수행이 완료될 때 까지 기다리는 코루틴 빌더입니다.

뒤의 println("Counter = $counter") 부분은 잠이 들었다 

withContext 블록의 코드가 모두 수행되면 깨어나 호출됩니다.

( runBlocking은 쓰레드를 잡고있고, withContext는 밖에있는 코루틴을 잠들게 한다. )

 

위의 코드는 불행히도 항상 10000이 되는 것은 아닙니다. 

Dispatchers.Default에 의해 코루틴이 어떻게 할당되냐에 따라 값이 달라집니다.

( 동시성문제 )

 


volatile을 적용하기 ( 동시성 해결 못함 )

손 쉽게 생각할 수 있는 방법은 volatile입니다.

import kotlin.system.*
import kotlinx.coroutines.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // 시작할 코루틴의 갯수
    val k = 1000 // 코루틴 내에서 반복할 횟수
    val elapsed = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}

@Volatile // 코틀린에서 어노테이션입니다.
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

volatile은 가시성 문제만을 해결할 뿐

동시에 읽고 수정해서 생기는 문제를 해결하지 못합니다.

( 다른스레드에서 현재 값을 볼수는 있으나, 현재값을 증가시키는 타이밍이 겹치는 경우 값이 1만 증가됨 )

 


스레드 안전한 자료구조 사용하기 ( 항상 적합하지는 않다 )

AtomicInteger와 같은 스레드 안전한 자료구조를 사용하는 방법이 있습니다.

import java.util.concurrent.atomic.*
import kotlin.system.*
import kotlinx.coroutines.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // 시작할 코루틴의 갯수
    val k = 1000 // 코루틴 내에서 반복할 횟수
    val elapsed = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}

val counter = AtomicInteger()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}

val counter = AtomicInteger()

counter.incrementAndGet() // 다른 스레드의 간섭없이 항상 성공한다. 

 

 

AtomicInteger가 이 문제에는 적합한데 항상 정답은 아닙니다.

 


스레드 한정

newSingleThreadContext를 이용해서 특정한 스레드를 만들고 해당 스레드를 사용할 수 있습니다.

import java.util.concurrent.atomic.*
import kotlin.system.*
import kotlinx.coroutines.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // 시작할 코루틴의 갯수
    val k = 1000 // 코루틴 내에서 반복할 횟수
    val elapsed = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}

var counter = 0
val counterContext = newSingleThreadContext("CounterContext")

fun main() = runBlocking {
    withContext(counterContext) { // 전체 소스를 하나의 스레드에서 
        massiveRun {
            counter++
        }
    }
    
    // --------------------------------
    withContext(Dispatchers.Default) {
        massiveRun {
        	withContext(counterContext) { // 더하는 코드를 하나의 스레드에서 
            	counter++
            }
        }
    }
    println("Counter = $counter")
    
}

val counterContext = newSingleThreadContext("CounterContext") 

withContext(counterContext) { ... }

// 특정 쓰레드를 하나 생성해서 그것만 쓰도록 만들어 낸다.

얼마만큼 한정지을 것인지는 자유롭게 정해보세요.  ( 모든코드 ,, 코드 일부 ) 


 뮤텍스 ( 상호배제 )

뮤텍스는 상호배제(Mutual exclusion)의 줄임말입니다.

공유 상태를 수정할 때 임계 영역(critical section)를 이용하게 하며,

(임계영역 : 한번에 한사람만 임계영역에 들어갈 수 있다 )

 

임계 영역을 동시에 접근하는 것을 허용하지 않습니다.

 

import kotlin.system.*
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // 시작할 코루틴의 갯수
    val k = 1000 // 코루틴 내에서 반복할 횟수
    val elapsed = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

val mutex = Mutex()

mutex.withLock {  // 여러 쓰레드중 한개 쓰래드만 진입을 하고 다른 쓰레드는 기다린다. 
        counter++
}

 


액터

액터는 1973년에 칼 휴이트가 만든 개념으로 액터가 독점적으로 자료를 가지며

그 자료를 다른 코루틴과 공유하지 않고 액터를 통해서만 접근하게 만듭니다.

 

먼저 실드 클래스를 만들어서 시작합시다.

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
  1. 실드(sealed) 클래스는 외부에서 확장이 불가능한 클래스이다.                                                                        CounterMsg는 IncCounter와 GetCounter 두 종류로 한정됩니다.
  2. IncCounter는 싱글톤으로 인스턴스를 만들 수 없습니다. 액터에게 값을 증가시키기 위한 신호로 쓰입니다.
  3. GetCounter는 값을 가져올 때 쓰며 CompletableDeferred<Int>를 이용해 값을 받아옵니다.                                            ( response를 이용해 향후에 데이터를 받겠다 ).

* 실드클래스를 만들면 상속을 받을수 있는 자식이 몇이고 누구인지 확정이 된다. 

 

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // 액터 안에 상태를 캡슐화해두고 다른 코루틴이 접근하지 못하게 합니다.

    for (msg in channel) { // 외부에서 보내는 것은 채널을 통해서만 받을 수 있습니다.(recieve)
        when (msg) {
            is IncCounter -> counter++ // 증가시키는 신호.
            is GetCounter -> msg.response.complete(counter) // 현재 상태를 반환합니다.
        }
    }
}

채널은 송신 측에서 값을 send할 수 있고 수신 측에서 receive를 할 수 있는 도구입니다.

3부와 4부에서 채널에 대해 상세히 다루겠습니다.

 

전체 코드는 다음과 같은 형태가 됩니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100
    val k = 1000
    val elapsed = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")  
}

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0
    for (msg in channel) { //  suspension Point  값이 오면 깨어난다. 없음 잠이듬. 
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

fun main() = runBlocking<Unit> {
    val counter = counterActor()
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter) //  suspension Point 
            // 액터에게 요청후 잠들었다가 호출이 끈나면 깨어남
        }
    }

    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response)) //  suspension Point
    println("Counter = ${response.await()}")//  suspension Point
    counter.close()
}

 

* 액터는 아직 일반적인 기술은 아니다. ( 스칼라에서는 수용해서 발전해 나가고 있음. ) 

=> 지금당장은 엑터를 쓰는 사람이 많지 않더라도 중요한 기술이 되어갈 것이다.?

728x90
반응형