안드로이드/KOTLIN

[KOTLIN] 코루틴 와다다 훑기 (~잡과 자식 코루틴 기다리기)

개굴이모자 2024. 5. 18. 21:25
반응형

장점

  • 멈추고 재개 가능
  • 메인 스레드에서 실행하면서 잠깐 중단도 가능
  • 중단되어도 스레드는 블로킹이 되지 않으며 뷰를 바꾸거나 다른 코루틴을 실행 가능
  • 데이터가 준비되면 코루틴은 메인 스레드에서 대기하다가 멈춘 지점에서 다시 작업을 수행
  • 비용이 매우 저렴하다

 

Sequence

  • 시퀀스 빌더는 중간에 yield 를 이용해 값을 돌려줄 수 있다
  • 반환만 가능하며, 중단 함수는 사용할 수 없다
    • 중단이 필요하다면 flow 를 사용하자
val fibonacci: Sequence<BigInteger> = sequence {
	var first = 0.toBigInteger()
	var second = 1.toBigInteger()
	while (true) {
		yield(first)
		val temp = first
		first += second
		second = temp
	}
}

fun main() {
	print(fibonacci.take(10).toList())
}
// [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

 

 

중단 함수

  • 비디오 게임에서 체크포인트에 저장 후 차후 재개 시 같은 포인트에서 시작하는 것과 유사하다
  • 코루틴은 Continuation 객체를 반환해 멈췄던 곳에서 다시 실행할 수 있도록 제공한다
    • Continuation 객체는 직렬화/역직렬화가 가능하기에 체크 포인트로써의 역할을 수행할 수 있다
  • runblocking, launch 등과 같은 코루틴 빌더를 이용해 코루틴을 만들면 그 내부에서 중단함수를 호출할 수 있다
suspend fun requestUser(): User {
	// do something
	return suspendCoroutine<User> { cont ->
		requestUser { user -> // request user 가 서버에 찔러 받는 함수라 가정한다
        	// suspendCoroutine 으로 중단한 코루틴에서 requestUser 의 값을 기다려 받은 후
			// 이후 로직을 진행한다고 볼 수 있다.
			cont.resume(user) 
		}
	}
}

suspend fun main() {
	println("Before")
	val user = requestUser()
	println(user)
	println("After")
}
  • suspendCoroutine 은 중단함수의 예시이다
  • 중단 된 코루틴은 continuation 객체를 이용해 재개할 수 있다
  • suspendCoroutine 호출 시, Continuation 객체로 반환 될 값의 타입을 지정할 수 있으며 resume 을 통해 해당 타입의 값을 전달 받게 된다
  • 예외가 발생하는 경우, resume 대신 resumeWithException 을 호출해 중단함수에 exception 을 throw 할 수 있다

 

 

코루틴의 내부 구조

  • Continuation 객체는 내부에 상태를 나타내는 숫자와 로컬 데이터들을 지닌다
  • 함수의 Continuation 객체가 해당 함수를 부르는 다른 함수의 Continuation 객체를 decorate 하는 관계로 실행을 재개하거나 재개된 함수를 완료 시 사용 되는 콜 스택처럼 사용된다
  • Continuation 은 내부에 label 로 중단 지점 및 재개될 시점을 알 수 있게한다
  • 함수가 재개 후 사용할 지역 변수나 파라미터와 같은 상태를 가지고 있는 경우 Continuation 객체에 상태를 저장한다
    • 값은 중단되기 직전에 저장되며, 함수가 resume 될 때 복구된다
  • 콜 스택과 유사하게 중단 시의 상태 및 지역변수/파라미터, 재개될 위치 정보를 지닌 채 부르는 다른 Continuation 객체를 참조하고 또 다른 Continuation 객체를 참조해 마치 양파와 같은 형태를 지닌다
suspend fun a() {
	val user = readUser()
	b()
	b()
	b()
	println(user)
}

suspend fun b() {
	for (i in 1..10) {
		c(i)
	}
}

suspend fun c(i: Int) {
	delay(i * 100L)
	println("Tick")
}


// Continuation 으로 표현 시 아래와 유사하다
CContinuation(
	i = 4,
	label = 1,
	completion = BContinuation(
		i = 4,
		label = 1,
		completion = AContinuation(
			label = 2,
			user = User@1234,
			completion = ...
		)
	)
)

 

 

언어 차원에서와 라이브러리 기준의 코루틴

  • 코틀린 언어 자체에서 즉, 컴파일러가 지원하는 코틀린 기본 라이브러리에 포함된 코루틴과
    kotlinx.coroutines 라이브러리의 코루틴은 다르다
  • 언어 차원에서는 자유도를 위해 코루틴을 최소한으로 지원하며,
    라이브러리에서는 사용하기 쉬우며 동시성을 명확히 구현할 수 있도록 한다

 

코루틴 빌더

  • 일반 함수와 suspend 함수 사이를 연결시키는 다리 역할을 한다
  • kotlinx.coroutines 라이브러리에서는 'launch', 'runBlocking', 'async' 의 코루틴 빌더를 제공한다
  • launch
    • CoroutineScope 인터페이스의 extension 이다
      • CoroutineScope 는 부모 코루틴과 자식 코루틴 사이의 관계를 정립하기 위한 목적으로 사용되는 구조화된 동시성의 핵심이다
    • 중단된 코루틴을 유지하는 것은 공짜나 다름 없기 때문에 가볍다
  • runBlocking
    • CoroutineSope의 확장 함수가 아니므로, 자식 코루틴이 될 수 없다
    • 코루틴은 스레드를 블로킹하지 않고 작업을 중단 시키는 것이 일반적인 법칙이지만, 블로킹이 필요한 경우에 사용된다
    • 내부에서 delay 를 호출할 경우 실행이 완료될 때 까지 현재 스레드를 중단 가능한 상태로 블로킹하므로 Thread.sleep 과 유사하게 동작한다
    • 프로그램이 끝나는 것을 방지하기 위해 메인 함수 등에서 스레드를 블로킹할 경우, 스레드를 블로킹할 필요가 있는 유닛 테스트 등에서 사용된다
  • async
    • launch 와 유사하지만, 값을 생성하며 이를 람다 표현식에 의해 반환한다
    • Deferred<T> 타입의 객체를 반환하며, T는 생성되는 값의 타입이다
    • Deferred 는  suspend 메서드인 await 을 통해 작업이 끝났을 때 값을 반환한다
    • 값이 생성되기 전에 await 을 호출하면 값이 나올 때 까지 기다리게 된다
fun main() = runBlocking {
	val res1 = GlobalScope.async {
		delay(1000L)
		"Text 1"
	}
	val res2 = GlobalScope.async {
		delay(3000L)
		"Text 2"
	}
	val res3 = GlobalScope.async {
		delay(2000L)
		"Text 3"
	}
	println(res1.await())
	println(res2.await())
	println(res3.await())
}
// (1초후)
// Text 1
// (2초후)
// Text 2
// Text 3

 

  • 구조화된 동시성
    • 코루틴에서 부모와 자식간의 관계가 된다는 것은 부모가 자식들 모두를 기다리는 것을 말한다
    • 부모는 자식들을 위한 스코프를 제공하고 자식들을 해당 스코프 내에서 호출하게 되므로 구조화된 동시성이라는 관계가 성립된다
    • 가장 중요한 특징
      • 자식은 부모로부터 컨텍스트를 상속 받는다
      • 부모는 모든 자식이 작업을 마칠 때까지 기다린다
      • 부모 코루틴이 취소되면 자식 코루틴도 취소된다
      • 자식 코루틴에서 예외가 발생하면 부모 코루틴 또한 에러로 소멸한다
  • coroutineScope 는 suspend function 밖에서 스코프를 만들 수 있다
    • suspend function 을 coroutineSope 로 함께 시작할 수도 있으며 이는 main 함수와 runBlocking 을 함께 사용하는 것보다 세련되게 사용할 수 있다
suspend fun main(): Unit = coroutineScope {
	launch {
		delay(1000L)
		println("World!")
	}
	println("Hello,")
}
// Hello,
// (1초 후)
// World!

 

 

CoroutineContext

  • CoroutineContext는 원소나 원소들의 집합을 나타내는 인터페이스로, Job, CoroutineName, coroutineDispatcher 와 같은 Element 객체들이 인덱싱된 집합으로 collection 과 유사하다
  • 컨텍스트에서 모든 원소는 식별을 위한 key 를 가지고 있고, 이는 주소로 비교된다
  • CoroutineContext 는 get 을 이용해 키를 가진 원소를 찾을 수 있다
  • CoroutineName 을 찾기 위해서는 CoroutineName("Sample name") 과 같이 사용해 차후 ctx["Sample name"] 와 같이 참조할 수 있다
  • CoroutineContext 는 다른 CoroutineContext 를 합쳐 하나로 만들 수 있으며, 합쳐지면 두가지 키를 모두 가지게 된다
    • 만일 같은 키를 가진 새로운 원소가 더해지면 기존 원소를 대체한다
  • CoroutineContext 는 컬렉션과 유사해서 빈 컨텍스트 또한 만들 수 있으며 원소가 없으므로 더해도 아무런 변화가 없다
  • minusKey 함수에 제거할 키를 넣어 원소를 Context 에서 제거할 수 있다
  • 각 원소를 조작해야하는 경우 fold 메서드를 사용할 수 있다
  • CoroutineContext 는 코루틴의 데이터를 저장하고 전달하는 방식으로, 자식은 부모로부터 이를 상속받는다
  • 모든 자식 빌더는 인자 안에서 정의된 특정 컨텍스트를 가질 수 있는데 이는 부모로부터 상속 받은 컨텍스트를 대체한다
  • defaultContext  < parentContext < childContext 순으로 CoroutineContext 가 우선 된다.
  • DefaultContext 는 어디서도 키가 지정되지 않았을 경우에 사용된다
  • CoroutineContext 를 커스텀하게 만들고자 할 경우, CoroutineContext.Element 인터페이스를 구현 할 수 있다
  • suspend function 에서는 부모의 context 에 접근하는 것이 가능하다
suspend fun printName() {
	println(coroutineContext[CoroutineName]?.name)
}

suspend fun main() = withContext(CoroutineName("Outer")) {
	printName() // Outer
	launch(CoroutineName("Inner")) {
		printName() // Inner
	}
	delay(10)
	printName() // Outer
}

 

 

Job 과 Child Coroutine 기다리기

  • 부모와 자식 간의 특성
    • 자식은 부모로부터 컨텍스트를 상속받는다
    • 부모는 모든 자식이 작업을 마칠 때까지 기다린다
    • 부모 코루틴이 취소되면 자식 코루틴 또한 취소된다
    • 자식 코루틴에서 에러가 발생하면 부모 코루틴 또한 에러로 소멸한다
  • Job 이란, 수명을 지니며, 취소가 가능하다
  • Job 의 lifecycle 에 대하여
    • 지연 시작되는 코루틴만 NEW 상태에서 시작하며, 이 코루틴들은 ACTIVE 가 되기 위해 작업이 실행되어야한다
    • 대부분의 코루틴은 ACTIVE 상태에서 시작하여, Job 이 실행되고, 코루틴은 이를 수행한다
      • 실행이 완료되면 상태는 COMPLETING 으로 변경되고, 자식의 실행이 완료되기를 기다린다
      • 자식의 실행이 모두 끝났을 경우 COMPLETED 로 바뀐다
      • 만일 Job 이 ACTIVE, COMPLETING 단계에서 실행 도중 취소되거나 실패한다면 CANCELLING 상태로 바뀐다
      • CANCELLING 상태에서 연결을 끊거나 자원을 반납하는 처리를 할 수 있다
  • Job 이 어느 단계에 있는지 로깅하기 위해서는 toString 으로 상태를 확인할 수 있으며, 상태에 따라 처리를 분기하기 위해서는 isActive, isCompleted, isCancelled 프로퍼티를 사용해야한다
  • Coroutine 라이브러리의 모든 코루틴 빌더는 자신만의 Job 을 생성해 반환하므로 어느 곳에서나 사용할 수 있다
  • Job 또한 CoroutineContext 이므로 coroutineContext[Job] 으로 접근할 수도 있으나, coroutineContext.job 과 같이 확장 property 로 편하게 접근할 수 있다
  • 모든 코루틴은 자신만의 Job 을 생성하며, 부모/인자 로부터 온 Job 은 새로운 Job 의 부모로 사용된다
  • 부모 Job 은 모든 자식 Job을 children 프로퍼티로 참조할 수 있기 때문에 CoroutineScope 내에서 기다리거나, 취소/예외 구현을 할 수 있다
  • 새로운 Job Context 가 부모의 Job 을 대체할 경우 구조화된 동시성이 유효하지 않아진다
// 새로운 job 이 부모로부터 상속 받은 job 을 대체하여 인과관계가 없게 만들었고, 구조화된 동시성을 잃음

fun main(): Unit = runBlocking {
	launch(Job()) { // 새로운 Job이 부모로부터 상속받은 Job을 대체
		delay(1000)
		println("Will not bew printed")
	}
}
// 아무것도 출력되지 않고, 즉시 종료됨

 

  • Job 은 코루틴이 Completed 나 Cancelled 상태로 완료될 때까지 중단함수인 join 메서드를 이용해 기다릴 수 있다
  • Job 은 팩토리 함수를 이용할 경우 코루틴 없이도 생성할 수 있으나, Job을 생성한 후 다른 코루틴의 부모로 지정한 뒤 join 을 호출할 경우 프로그램이 종료되지 않을 수 있으니 주의해야한다
fun main(): Unit = coroutineScope {
	val job = Job()
	launch(job) { // 새로운 Job이 상속받은 Job을 대체한다.
		delay(1000)
		println("Test1")
	}
	launch(job) { // 새로운 Job이 상속받은 Job을 대체한다.
		delay(2000)
		println("Test2")
	}
	job.join() // 여기서 영원히 대기하게 된다.
    // 정상적인 호출을 위해 job.children.forEach {it.join} 을 수행할 수 있다
	println("Will not be printed")
}
// (1초후)
// Test1
// (2초후)
// Test2
// (영원히 실행된다.)

 

 

취소

  • Job 인터페이스는 아래와 같은 효과를 갖는 cancel 메서드를 지닌다
    • 호출한 코루틴은 첫번째 중단점에서 Job 을 끝낸다
    • Job 이 자식을 가지고 있다면 그 또한 취소되나, 부모는 영향을 받지 않는다
    • Job 이 취소되면 취소된 Job 은 새로운 코루틴의 부모로 사용될 수 없다
  • Cancel 함수는 예외들을 인자로 넣어 취소된 원인을 명확히하고 분기할 수 있다
  • 코루틴을 취소하기 위해 사용되는 Exception 은 CancelationException 혹은 그것의 서브 타입이어야한다
  • cancel 이 호출된 후 다음 작업의 진행 전, 취소 과정를 기다리기 위해 join 을 일반적으로 호출하며 그러지 않을 경우 race condition 이 유발될 수 있다
    • kotlinx.coroutines 라이브러리는 cancel 과 join 을 함께 호출하는 방법으로 cancelAndJoin 이라는 확장함수를 제공한다
public suspend fun Job.cancelAndJoin() {
	cancel()
	return join()
}

suspend fun main(): Unit = coroutineScope {
	val job = Job()
	launch(job) {
		repeat(1_000) { i ->
			delay(200)
			println("Printing $i")
		}
	}
	
	delay(1100)
	job.cancelAndJoin()
	println("Cancelled successfully")
}
// Printing 0
// Printing 1
// Printing 2
// Printing 3
// Printing 4
// Cancelled Successfully

 

  • Job 팩토리 함수로 생성되어 Job에 딸린 다수의 코루틴을 한번에 취소할 경우, coroutineContext.cancelChildren() 을 사용할 수 있다
  • Job 을 취소할 경우, try-catch 를 이용해 exception 을 잡고 finally 블록 등에서 관련된 정리를 할 수 있다
suspend fun main(): Unit = coroutineScope {
	val job = Job()
	launch(job) {
		try {
			delay(Random.nextLong(2000))
			println("Done")
		} finally {
			print("Will always be printed")
		}
	}
	delay(1000)
	job.cancelAndJoin()
}
// Will always be printed
// 또는 (이를테면 random이 1000 미만일 경우)
// Done
// Will always be printed
  • Exception 을 catch 한 뒤 코드 중에 suspend 를 허용하지 않으므로 있을 경우 무시된다
  • 취소된 후의 처리에 suspend 함수를 반드시 호출해야하는 경우 withContext(NonCancellable) 로 랩핑하여 블럭의 컨텍스트를 취소될 수 없는 Job 으로 만들어 사용할 수 있다
  • 자원을 해제할 때 사용되는 invokeOnCompletion 메서드는 Job 이 Completed 나 Cancelled 와 같은 마지막 상태에 도달했을 때 호출될 핸들러를 지정한다
suspend fun main(): Unit = coroutineScope {
	val job = launch {
		delay(1000)
	}
	job.invokeOnCompletion { e: Throwable? ->
		println("Finished")
	
	/}
	delay(400)
	job.cancelAndJoin()
}
// Finished

 

  • 중단점이 없으면 취소를 할 수 없으며, 중단점을 만들기 위해 코루틴을 중단하고 즉시 재시작하는 yield 를 사용할 수 있다
    • 중단이 가능하지 않으면서 집약적인 연산들이 suspend 함수에 있을 경우 yield 를 넣어주는 것이 좋다
  • job 의 상태를 추적할 수도 있는데, coroutinecontext 의 job 에 접근해 active 하지 않은지 확인하고 중지하도록 하면 된다
  • job 이 active 가 아닐 때 ensureActive() 로 CancelationException 을 던질 수 있다
    • yield 함수는 최상위 suspend 함수이기에 scope 가 필요하지 않아 일반적인 suspend 함수에서도 사용할 수 있지만 ensureActive 함수의 경우 coroutineScope 에서 호출되어야하며 job 이 더이상 active 상태가 아니라면 예외를 던지는 역할을 한다
    • suspendCancellableCoroutine 함수는 suspendCoroutine 과 유사하지만 Continuation 객체를 wrapping 하여 코루틴이 취소되었을 때 행동을 정의할 수 있다
반응형