Android 공부/Coroutine

[XX캠퍼스] 19.Kotlin Coroutines & Flow ( 팬아웃, 팬인 )

Machine_웅 2022. 7. 29. 15:40
728x90
반응형

https://dalinaum.github.io/coroutines-example/19

 

팬 아웃, 팬 인

팬 아웃, 팬 인 예제 87: 팬 아웃 여러 코루틴이 동시에 채널을 구독할 수 있습니다. import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) { sen

dalinaum.github.io

 

팬 아웃

여러 코루틴이 동시에 채널을 구독할 수 있습니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) {
        send(x++)
        delay(100L)
    }
}

fun CoroutineScope.processNumber(id: Int, channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("${id}가 ${it}을 받았습니다.")
    }
}


fun main() = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat (5) {
        processNumber(it, producer)
    }
    delay(1000L)
    producer.cancel()
}

 

 팬 인

팬 인은 반대로 생산자가 많은 것입니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// 샌드채널
suspend fun produceNumbers(channel: SendChannel<Int>, from: Int, interval: Long) {
    var x = from
    while (true) {
        channel.send(x)
        x += 2
        delay(interval)
    }
}

fun CoroutineScope.processNumber(channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("${it}을 받았습니다.")
    }
}


fun main() = runBlocking<Unit> {
    val channel = Channel<Int>() // 리시브채널 , 샌드채널 
    launch {
        produceNumbers(channel, 1, 100L)
    }// 생산자1
    launch {
        produceNumbers(channel, 2, 150L)
    }// 생산자2 
    
    processNumber(channel)
    delay(1000L)
    coroutineContext.cancelChildren()
}
  • coroutineContext의 자식이 아닌 본인을 취소하면 어떻게 될까요?
  • processNumber를 suspend 함수의 형태로 변형하면 어떻게 될까요?
  • 다른 방법으로 취소할 수 있을까요?

 

 

공정한 채널

두 개의 코루틴에서 채널을 서로 사용할 때 공정하게 기회를 준다는 것을 알 수 있습니다.

 

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

suspend fun someone(channel: Channel<String>, name: String) {
    for (comment in channel) {
        println("${name}: ${comment}")
        channel.send(comment.drop(1) + comment.first())
        delay(100L)
    }
}

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        someone(channel, "민준")
    }
    launch {
        someone(channel, "서연")
    }
    channel.send("패스트 캠퍼스")
    delay(1000L)
    coroutineContext.cancelChildren()
}

수행결과 
민준: 패스트 캠퍼스
서연: 스트 캠퍼스패
민준: 트 캠퍼스패스
서연:  캠퍼스패스트
민준: 캠퍼스패스트 
서연: 퍼스패스트 캠
민준: 스패스트 캠퍼
서연: 패스트 캠퍼스
민준: 스트 캠퍼스패
서연: 트 캠퍼스패스
민준:  캠퍼스패스트

* 골고루 수행하게 된다. 

* 채널이 누구에게 쏠리지 않고 공평하게 처리함. 

 

 select

먼저 끝나는 요청을 처리하는 것이 중요할 수 있습니다.

이 경우에 select를 쓸 수 있습니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// 리턴값 : 리시브체널
fun CoroutineScope.sayFast() = produce<String> {
    // 코루틴 스코프 + 샌드채널
    while (true) {
        delay(100L)
        send("패스트")
    }
}

// 리턴값 : 리시브체널
fun CoroutineScope.sayCampus() = produce<String> {
     // 코루틴 스코프 + 샌드채널
    while (true) {
        delay(150L)
        send("캠퍼스")
    }
}

fun main() = runBlocking<Unit> {
    val fasts = sayFast()
    val campuses = sayCampus()
    repeat (5) { //5번동안 select
        select<Unit> { // 먼저 끈내는 애만 듣겠다.
            fasts.onReceive {
                println("fast: $it")
            }
            campuses.onReceive {
                println("campus: $it")
            }
        }
    }
    coroutineContext.cancelChildren()
}

실행결과 
fast: 패스트
campus: 캠퍼스
fast: 패스트
fast: 패스트
campus: 캠퍼스

채널에 대해 onReceive를 사용하는 것 이외에도 아래의 상황에서 사용할 수 있습니다.

 

select 문을 사용할 수 있는 곳. 

  • Job - onJoin
  • Deferred - onAwait  ( async 가 반환하는게 Deferred )
  • SendChannel - onSend
  • ReceiveChannel - onReceive, onReceiveCatching
  • delay - onTimeout
728x90
반응형