본문 바로가기

개발

[kotlin] Kotlin Coroutine - 2

CoroutineBuilder 

- CoroutineScope의 확장함수로, 다양한 요구사항에 맞게 개별적인 Coroutine(코루틴)을 만드는 방법

 

launch() - Job

- 결과가 없는 코루틴을 생성하는 빌더

- 여기서 결과는 반환인스턴스가 아닌 결과값(Value)을 의미. 반환하는 Job인스턴스는 생성된 해당 코루틴을 제어

- 반환된 job을 가지고 해당 코루틴을 제어 'cancel(), cancelAndJoin(), join()' 등

public fun CoroutineScope.launch(
  context: CoroutineContext = EmptyCoroutineContext,	// context, 스레드영역 설정(Main/IO/Default)
  start: CoroutineStart = CoroutineStart.DEFAULT,	// start, 코루틴 실행시점 설정(DEFAULT/LAZY/)
  block: suspend CoroutineScope.() -> Unit		// suspend, 코루틴 실행 블록
): Job
launch {
    delay(1000L)
    println("Sample Main Thread Running")	// Sample Coroutin 출력
}

// context인수 값을 Dispatchers.IO로 설정 (백그라운드 작업)
launch(Dispatchers.IO) {	
    delay(1000L)
    println("Sample Background Thread Running")
}

val job = launch(Dispatchers.IO) {	
    delay(1000L)
    println("Sample Background Thread Running2")
}
job.join()

async() - Deferred<T>

- launch와 다르게 결과를 가지는 코루틴을 생성하는 빌더

- Deferred의 await() 함수를 통해 코루틴 영역의 마지막 라인(결과)를 내가 원하는 시점에 await()으로 결과를 받는다 - 지연 

public fun <T> CoroutineScope.async(
  context: CoroutineContext = EmptyCoroutineContext,
  start: CoroutineStart = CoroutineStart.DEFAULT,
  block: suspend CoroutineScope.() -> T
): Deferred<T>
fun main() = runBlocking<Unit> {
    val deferred = async {	
        delay(100)	
        println("async Start")
        "async result"	// 해당 코루틴블럭(Deferred)의 await() 호출 시 반환 값
    }

    println("Test")

    val job = GlobalScope.launch {	
        try {
            delay(200)		
            println("launch Start")
            delay(Long.MAX_VALUE)	
        } finally {
            println("launch Cancelled")	// 해당 job의 cancel() 취소 시 호출되는 finally
        }
    }

    delay(500)
    job.cancelAndJoin()	// job 취소, 대기

    delay(500)	
    println(deferred.await())// deferred의 결과 값 얻기
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 호출 순서
Test
async Start
launch Start
launch Cancelled
async result

 

즉시실행

- 기본적으로 코루틴 시작은 즉시 실행

- 실행을 미루고 싶을 경우 launch/async함수 호출시 start인수를 CoroutineStart.LAZY로 설정

- 실행을 위한 호출 launch - join(): 즉시실행 / start(): 지연실행,  async - await(): 즉시실행, 지연실행

- start()와 join()/await()의 차이는 start는 실행만 하고 종료를 대기하지 않는다.

- start() - non blocking, join()/await() - blocking()

 

fun main() = runBlocking<Unit> {
    val deferred = async(start = CoroutineStart.LAZY) {	
        println("async Start")
        "async Result"
    }

    println("Test")

    val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
        println("launch Start")
    }

    job.start()	//start
    // job.join() start + 대기


    // deferred.start()	// start
    deferred.await()	// start + 대기
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
Test
launch Start
async Start

 

withContext() - T

- async와 유사하나 withContext()는 Deferred<T>객체로 반환하지 않고, 결과(T)를 그 자리에서 반납 

 

 

runBlocking() - T

- runBlocking은 Scope내의 코루틴(루틴)들이 모두 완료할 때 까지 스레드를 점유

 

actor() - SendChannel<E>

- 채널을 통해 코루틴 블럭(Scope)와 외부에서 통신을 통해 전송 / 처리의 루틴을 실행하는 빌더
- actor 빌더는 SendChannel<E>를 반환

- Send채널을 통해 actor()블록으로 채널을 통해 전송을 할 수 있다. 

-  actor{} 블록 내부는 수신자(Receiver), 반환된 SendChannel이 송신자(Sender)

 

produce() - ReceiveChannel<E>

-  produce도 actor와 같이 채널을 통해 전송 / 처리 루틴을 실행하는 빌더

- produce()빌더는 ReceiveChannel<E>를 반환

- produce{} 블록 내부는 송신자(Sender), 반환된 ReceiveChannel이 수신자(Receiver)

 


 

coroutineScope

- 코루틴 스코프를 생성하고 모든 실행된 자식 코루틴이 완료될때까지 종료하지 않는다.

 - runBlocking이랑 body와 모든 자식들이 완료될 때 까지 기다리는 점이 유사해보이지만

runBlocking은 thread를 block하지만 coroutineScope는 잠시 중단하고 다른 자원의 사용을 위해 thread를 해제한다.

- runblocking은 일반함수, coroutineScope는 suspend 함수와 같다

fun main() = runBlocking {
    doWorld()
}

suspend fun doWorld() = coroutineScope {  // this: CoroutineScope
    launch {
        delay(1000L)
        println("World!")
    }
    println("Hello")
}
------------------
Hello
World!

 


 멀티 스레드 환경의 문제 

- 어플리케이션의 퍼포먼스 측면에서 싱글 스레드에 비해 큰 이득을 가져다주지만, race condition을 적절히 제어하지 않을 경우 데이터의 손실이 발생

- race condition이란 여러 개의 스레드가 하나의 공유 가능하고 변경 가능한 자원에 접근하게 되는 상황

- synchronization을 통해 race condition을 적절히 제어하는 것은 멀티스레드 환경의 개발을 할 때에 매우 중요

 

Single Thread

- 스레드를 하나만 이용

- 싱글스레드를 사용하도록 코루틴의 context를 지정해주면 데이터의 결함 없이 결과를 도출할 수 있지만 멀티스레드 환경에 비해 매우 느리다

suspend fun GlobalScope.massiveRun(action: suspend () -> Unit) {
    val n = 100     
    val k = 1000
    val time = measureTimeMillis {
        val jobs = List(n) {   
            launch {
                repeat(k) {     
                    action()   
                }
            }
        }
        jobs.forEach { it.join() } 
    }
}


@ObsoleteCoroutinesApi
val counterContext = newSingleThreadContext("CounterContext")

@ObsoleteCoroutinesApi
fun main() {
    runBlocking {
        GlobalScope.massiveRun {
            withContext(counterContext) {
                counter++
            }
        }
        println("Counter = $counter")
    }
}
--------------------------------------
Completed 100000 actions in 1553 ms
Counter = 100000

Process finished with exit code 0

 

Mutual Exclusion ( Mutex )

- mutual exclusion은 공유자원에 변경이 일어나는 순간에 적절한 block을 통해 race condition의 발생을 막는 동기화 제어 기법

val mutex = Mutex();
var counter = 0

fun main() = runBlocking {
    GlobalScope.massiveRun {
        mutex.withLock {
            counter++
        }
    }

    println("Counter = $counter")
}
-----------------------------------
Completed 100000 actions in 789 ms
Counter = 100000

Process finished with exit code 0

Actor

- 동기화 이슈 자원을 actor 클래스의 멤버변수로 정의되어 있는 Channel을 통해 자원으로의 접근이 가능하다.

- channel은 FIFO방식의 queue형태로 구현되기 때문에 sequential한 접근을 보장해 동기화 이슈를 해결한다.

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

@ObsoleteCoroutinesApi
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) {
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

@ObsoleteCoroutinesApi
fun main() {
    runBlocking {
        val counter = counterActor()
        GlobalScope.massiveRun {
            counter.send(IncCounter)
        }

        val response = CompletableDeferred<Int>()
        counter.send(GetCounter(response))
        println("Counter = ${response.await()}")
        counter.close() 
    }
}

--------------------------------------
Completed 100000 actions in 1202 ms
Counter = 100000

Process finished with exit code 0

 


참고:

https://kotlinlang.org/docs/coroutines-basics.html#scope-builder

https://jaejong.tistory.com/64

https://velog.io/@dvmflstm/Kotlin-Coroutine%EC%97%90%EC%84%9C%EC%9D%98-%EB%8F%99%EA%B8%B0%ED%99%94-%EC%A0%9C%EC%96%B4

'개발' 카테고리의 다른 글

[kotlin] Kotlin Coroutine - 1  (0) 2023.01.13
[kotlin] Coroutines under the hood - 번역  (0) 2022.12.28
[kotlin] 코루틴 개념 정리  (0) 2021.11.09