프로그래밍/Kotlin

[Kotlin] Channel이란?

Lou Park 2020. 10. 7. 14:50

원문: https://proandroiddev.com/kotlin-coroutines-channels-csp-android-db441400965f

원문을 읽으면서 제가 이해하기 위해 번역한 것입니다.

개념만 남기고 예제는 거의 뺐으니 예제를 보시려면 여기로~


Channels 


Channel은 2개의 Coroutine 사이를 연결한 파이프라고 생각하면 된다.

이 파이프는 두 Coroutine 사이에서 정보를 전송할 수 있도록 한다.

하나의 Coroutine은 파이프를 통해서 정보를 보낼 수 있고, 다른 하나의 Coroutine은 정보를 받기위해 기다린다.


채널이라는 말을 일상생활에서는 TV에서 제일 많이 쓰는데..

채널을 통해서 우리는 방송국에서 쏴주는 방송을 본다. (일방적 커뮤니케이션)

사진에서 1번 Coroutine이 방송국, 2번 Coroutine이 안방의  TV일수 있겠다.


이 채널을 통한 두 Coroutine간의 커뮤니케이션은 메모리를 공유함으로서 이뤄지는게 아니라, 커뮤니케이션을 통해서 메모리를 공유한다.


Thread는 어떻게 Communicate 할까

소프트웨어를 만들면서 Resource를 Blocking하는 작업, 예를 들면 네트워킹이나 DB를 사용하거나 아니면 계산이 필요한 작업을 할때

우리는 그 작업들을 쓰레드로 뗀다. 이 쓰레드간에 공유할 자원이 필요할때 우리는 두개의 쓰레드가 동시에 그걸 쓰거나 읽게 하지 못하도록 자원을 lock하거나 메모리에 의존하게된다. 메모리 공유! 이것이 흔히 쓰레드가 커뮤니케이션하는 방식이다. 하지만 이렇게하면 데드록, 레이스 컨디션 같은 이슈가 발생할 수 있다.


*데드록: 교착상태라고도 하며 한정된 자원을 여러 곳에서 사용하려고 할때 발생

*레이스 컨디션: 한정된 공유 자원을 여러 프로세스가 동시에 이용하기 위해 경쟁을 벌이는 현상


Channel의 Communication은 어떻게 다를까


Channel은 커뮤니케이션을 통해서 공유를 한다는 점이 다르다. 이 글에서는 주문한 커피를 1명의 캐셔와 2명의 바리스타가 만드는 과정으로 설명을 하고있다.

아래 예제코드를 보자! 


캐셔는 채널을 통해서 2명의 바리스타와 커뮤니케이션을 한다. 

캐셔는 주문을 받고 채널로 주문을 보낸다. 바리스타가 커피를 만드는 작업을 끝내면 바로 다음 주문을 캐셔에게 받아서 처리한다.

더 이상 받을 주문이없다면  바리스타는 채널을 통해서 주문이 올때까지 suspend 된다.


아래 코드 14 번줄이 캐셔 Coroutine을 실행하는 부분이다.

채널에서 필요한 작업이 끝났다면 close()를 통해 닫아주는 것 잊지말자~


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
fun main(args: Array<String= runBlocking {
    // 주문한 커피 목록
    val orders = listOf(
        Menu.Cappuccino(CoffeBean.Regular, Milk.Whole),
        Menu.Cappuccino(CoffeBean.Decaf, Milk.Whole),
        Menu.Cappuccino(CoffeBean.Regular, Milk.NonFat),
        Menu.Cappuccino(CoffeBean.Premium, Milk.Whole)
    )
 
    log(orders)
 
    // Channel을 통해 주문들을 보낸다.
    val ordersChannel = Channel<Menu>()
    launch {
        for (order in orders) {
            ordersChannel.send(order)
        }
        ordersChannel.close()
    }
 
    val takenTime = measureTimeMillis {
        coroutineScope {
            // 바리스타 1, 2는 ordersChannel을 통해 내려온 주문들을 받아 커피를 만든다.
            launch(CoroutineName("barista-1")) { makeCoffee(ordersChannel) }
            launch(CoroutineName("barista-2")) { makeCoffee(ordersChannel) }
        }
    }
    println("Execution time: $takenTime ms")
}
 
private suspend fun makeCoffee(ordersChannel: ReceiveChannel<Menu>) {
    for (order in ordersChannel) {
        log("Processing order: $order")
        when (order) {
            is Menu.Cappuccino -> {
                // 대충 카푸치노 만드는 과정...
                val groundBeans = grindCoffeeBeans(order.beans()) // 커피콩 갈기
                val espressoShot = pullEspressoShot(groundBeans) // 에스프레소 샷
                val steamedMilk = steamMilk(order.milk()) // 스팀 밀크
                val cappuccino = makeCappuccino(order, espressoShot, steamedMilk) // 섞기
                log("Serve: $cappuccino"
            }
        }
    }
}
 
cs


makeCoffee()는 ReceiveChannel을 인자로 받고 채널을 순회하면서 커피를 만드는 작업을 수행한다.

Channel을 닫음으로서 그 Channel을 수신하고 있는 Function들에게 더 이상 처리할 프로세스가 없다고 알려 줄 수 있다.

이것은 makeCoffee()내의 for loop를 끝내고, Coroutine이 종료될 수 있도록한다. 


그렇다면 Channel을 닫지 않으면 어떻게 될까? 

그러면 바리스타 Coroutine은 퇴근도 못하고 계속 주문을 기다리는 무한 대기상태 (indefinite suspended state)가 될것이다.

그래서 main()에서 runBlocking을 사용중이기 때문에, main()은 영원히 안끝날거다...

runBlocking은 runBlocking 내의 suspend function이 모두 끝날때까지 기다리기 때문이다.



실행시 로그는 다음과 같이 찍힌다. 

두 명의 바리스타가 동시적으로 다른 주문을 수행하고, 하나의 스레드(main thread) 안에서 Coroutine이 실행되는걸 볼 수 있다.


채널의 속성들

전송할때 또는 수신할때 실행을 미루기(suspend)

채널에서전송하는것 또는 수신하는 것은 실행을 suspend할 수 있다. 위에서 본 것 처럼 캐셔가 Channel을 통해서 주문을 전송할때, Coroutine은 또 다른 바리스타 Coroutine이 주문을 받을 수 있을때까지 suspend 된다. 마찬가지로, 바리스타가 현재 수행하는 커피만드는 작업이 끝날때까지 새로운 주문을 받지않는 것처럼 수신할때도 실행을 미룰 수 있다.


채널 버퍼의 타입들

Channel은 여러 버퍼 타입을 통해 Coroutine과의 커뮤니케이션의 유연성을 제공한다.


1. Rendezvous (Unbuffered)


1
val channel = Channel<Menu>(capacity = Channel.RENDEZVOUS)
cs


랑데부...한글로 검색하니 뜻이 집합, 만나기로한 약속 이라고 한다.

특별한 채널 버퍼를 설정하지 않을 시 이 타입이 기본적으로 설정된다. Rendezvous는 버퍼가 없다. 이것이 위 예제코드에서 본 것처럼 수신측 Coroutine과 송신측 Coroutine이 모두 가능한 상태로 "모일때까지" suspend 되는 이유다.


2. Conflated


1
val channel = Channel<Menu>(capacity = Channel.CONFLATED)
cs

Conflate의 뜻은 "융합하다", "하나로 합치다"다.

이렇게하면 크기가 1인 고정 버퍼가 있는 채널이 생성된다. 만약에 수신하는 Coroutine이 송신하는 Coroutine을 따라잡지 못했다면, 송신하는 쪽은 새로운 값을 버퍼의 마지막 아이템에 덮어씌운다. 수신 Coroutine이 다음 값을 받을 차례가 되면, 송신 Coroutine이 보낸 마지막 값을 받는다. 즉 송신하는 쪽은 수신측 Coroutine이 가능할때까지 기다리는게 없다는 말이다. 수신측 Coroutine은 채널 버퍼에 값이 올때까지 suspend 된다.


3. Buffered

1
val channel = Channel<Menu>(capacity = 10)
cs

이 모드는 고정된 크기의 버퍼를 생성합니다. 버퍼는 Array 형식이다.

송신 Coroutine은 버퍼가 꽉 차있으면 새로운 값을 보내는 걸 중단한다. 수신 Coroutine은 버퍼가 빌때까지 계속해서 꺼내서 수행한다.


4. Unlimited


1
val channel = Channel<Menu>(capacity = Channel.UNLIMITED)
cs

이 모드는 이름처럼 제한 없는 크기의 버퍼를 가진다. 버퍼는 LinkedList 형식이다.

만약에 버퍼가 소비되지 않았다면 메모리가 ...힘들어할때까지 계속해서 아이템을 착착 채운다. 결국엔 OutOfMemeoryException을 일으키게 된다.

송신 Coroutine은 suspend 하지않지만, 수신 Coroutine은 버퍼가 비면 suspend 된다.