코딩하는 개굴이

[Spring] 리액티브 프로그래밍 기초 (with Kotlin) 본문

Spring

[Spring] 리액티브 프로그래밍 기초 (with Kotlin)

개굴이모자 2023. 9. 2. 22:43
반응형

해당 포스팅은 Fastcampus 의 실무 프로젝트로 배우는 Kotlin & Spring: 리팩토링부터 서비스 구현까지 강의를 기반으로 작성되었습니다.

 

 

 

 

리액티브 프로그래밍에 대해 이해하기 위해서는 먼저

비동기 프로그래밍과, Observer 패턴, Iterator 패턴에 대해 알아야한다.

순차적으로 차근히 알아보도록 하자.

 

 

 

비동기 프로그래밍 구현

Thread 사용하기

가장 기본이 되는 비동기 처리 방식으로, Runnable 인터페이스를 사용한다.

스레드가 1개일 경우, Single Thread 라고 부르고 하나보다 많을 경우 Multi Thread 라고 부른다.

멀티 스레드를 사용하면 애플리케이션에서 여러 작업들을 동시에 수행할 수 있는데, 스케줄링 알고리즘에 의해 스레드가 전환되어 작업을 처리하는 컨텍스트 스위칭 때문이다.

 

하나의 프로세스에는 최소한 하나 이상의 스레드가 존재하고, 프로세스 내의 스레드들은 동일한 메모리를 공유한다. 그러나, 스레드가 많아지면 메모리의 사용량이 높아지기 때문에 OOME (OutOfMemoryError) 가 발생할 수 있고, 동시 처리가 많을수록 대기 시간 때문에 응답 지연이 발생할 수 있다.

 

따라서, 이를 해결하기 위해 스레드 풀을 사용해야한다. 스레드풀을 사용해 애플리케이션에서 사용할 총 스레드 수를 제한하고 기존에 생성된 스레드를 재사용하여 빠른 응답이 가능하도록 한다. 직접 만들 수도 있지만 검증된 라이브러리를 사용하는 것을 권장하며, 예를들어 java.util.concurrent.ExecutorService를 사용할 수 있다.

  • Main Thread: JVM 언어에서 최초의 프로세스가 실행 될 때 가장 기본이 되는 스레드를 말한다.
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

fun main() {
for(i in 0..5) {
        Thread {
            println("current thread is: ${Thread.currentThread().name}")
        }.start()
    }
    println("current thread is: ${Thread.currentThread().name}")

    /**
     * <RESULT>
     * current thread is: Thread-0
     * current thread is: Thread-1
     * current thread is: Thread-2
     * current thread is: Thread-3
     * current thread is: Thread-4
     * current thread is: main
     * current thread is: Thread-5
     * */

    val pool: ExecutorService = Executors.newFixedThreadPool(5) //thread 를 유지하는 개수를 5개로 설정함
    try {
        for (i in 0..5) {
            pool.execute {
                println("current-thread-name : ${Thread.currentThread().name}")
            }
        }
    } finally {
        pool.shutdown()
    }
    println("current-thread-name : ${Thread.currentThread().name}")

    /**
     * <RESULT> - 반납된 스레드를 다시 사용
     * current-thread-name : pool-1-thread-1
     * current-thread-name : pool-1-thread-3
     * current-thread-name : pool-1-thread-2
     * current-thread-name : pool-1-thread-4
     * current-thread-name : pool-1-thread-5
     * current-thread-name : pool-1-thread-4
     * current-thread-name : main
     * */
}

 

Future

비동기 작업에 대한 결과를 얻고 싶을 경우 사용하게 된다. 작업에 대한 결과를 기다리면서 다른 작업을 병행하고 싶은 경우 사용하며, 결과를 얻기 위해 Callable 인터페이스를 사용한다.

import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

fun sum(a: Int, b: Int) = a + b
fun main() {
    val pool = Executors.newSingleThreadExecutor()
    val future = pool.submit(Callable {
        sum(100, 200)
    })
    println("계산 시작")
    val futureResult = future.get(1000L, TimeUnit.MICROSECONDS) // 비동기 작업의 결과를 기다린다. 만일 timeout 을 지정하지 않는다면 스레드가 무한정 블록킹될 수 있다.
    println(futureResult)
    println("계산 종료")
}

위 예시처럼, 무한정 블록킹이 된다거나, 동시에 실행되는 한 개 이상의 비동기 작업에 대한 결과를 하나로 조합하여 처리한다거나, 수동으로 complete 할 수 있는 방법을 지원하지 않는다.

 

Completable Future

fun completableFuture() {
    val completableFuture = CompletableFuture.supplyAsync {
        Thread.sleep(2000)
        sum(100, 200)
    }
    println("계산 시작")
    completableFuture.thenApplyAsync(::println) // 논블로킹으로 동작
    while (!completableFuture.isDone) {
        Thread.sleep(500)
        println("계산 결과를 집계 중입니다.")
    }
    println("계산 종료")
}

supplyAsync 로 비동기 함수를 생성하고, thenApplyAsync 를 이용해 논블로킹으로 뒤에 로직들을 별개로 진행한다. 별개의 진행 과정에서 completableFuture.isDone 을 통해 비동기 작업의 완료 여부를 체크한다.

CompletableFuture 를 쓰더라도 동일하게 get을 쓰면 Future 와 같이 블로킹 코드를 만들 수 있다.

 


 

Observer 패턴

옵저버 패턴이란, GoF 가 소개한 디자인 패턴 중 하나로, 관찰 대상이 되는 객체가 변경되면 대상 객체를 관찰하고 있는 옵저버(Observer) 에게 변경사항을 통지(notify)하는 디자인 패턴을 말한다.

관찰 대상인 Subject와 이를 관찰하는 Observer 로 이루어져있으며, 하나의 서브젝트에는 1개 이상의 옵저버를 등록할 수 있다. 서브젝트의 상태가 변경되면 자신을 관찰하는 옵저버들에게 이를 통지하게되고, 서브젝트의 변경사항을 통지받은 옵저버는 부가적인 처리를 수행한다. 옵저버 패턴은 옵저버를 상속하는 구체화(Concrete)클래스가 존재한다.

 

Subject 의 함수

  • add: 서브젝트의 상태를 관찰할 옵저버를 등록한다.
  • remove: 등록된 옵저버를 삭제한다.
  • notify: 변경된 상태를 옵저버에 통지한다.

 

Observer 의 함수

  • update: 서브젝트의 notify 내부에서 호출되며, Observer 는 변경사항을 받아 그에 따른 부가 기능을 처리한다.
import java.util.*
class Coffee(val name: String)
// Subject
class Barista : Observable() {
    private lateinit var coffeeName: String
    fun orderCoffee(name: String) {
        this.coffeeName = name
    }
    fun makeCoffee() {
        setChanged()
        notifyObservers(Coffee(this.coffeeName))
    }
}
// Observer
class Customer(val name: String) : Observer {
    override fun update(o: Observable?, arg: Any?) {
        val coffee = arg as Coffee
        println("${name}이 ${coffee.name}을 받았습니다")
    }
}
fun main() {
    val barista = Barista()
    barista.orderCoffee("아이스 아메리카노")
    val customer = Customer("고객1")
    val customer2 = Customer("고객2")
    val customer3 = Customer("고객3")
    barista.addObserver(customer)
    barista.addObserver(customer2)
    barista.addObserver(customer3)
    barista.addObserver(customer)
    barista.makeCoffee()
}
// 고객1이 아이스 아메리카노을 받았습니다

위 예시에서 Customer 클래스는 옵저버 인터페이스를 구현하여 Barista 클래스가 커피를 완성하면 notify 를 받아 update 함수에서 커피를 받는 로그를 출력한다.

Barista 클래스는 Observable 클래스를 상속해 makeCoffee 메서드를 실행하여 주문한 커피가 만들어지면 nofiyObserver 로 만들어진 Coffee 객체를 전달하며 그 전에 setChanged 를 호출하여 coffeeName 값의 변경 여부를 내부에 저장한다.

 

옵저버 패턴의 장점

옵저버 패턴을 사용하지 않으면 값의 변경에 따라 동작해야하는 로직에서는 일정 간격으로 값의 변경을 확인하는 처리가 있어야한다.

간격을 짧게 두고 확인한다면 빠르게 확인할 수는 있지만 불필요한 호출이 지속적으로 발생할 수 있으며, 간격이 너무 길 경우에는 즉시 변경 사항을 확인할 수 없다. 따라서, 옵저버 패턴은 관찰자인 옵저버가 서브젝트의 변화를 신경쓰지 않고 상태 변경의 주체인 서브젝트가 변경 사항을 옵저버에게 알려줌으로써 단점을 해결하는 푸시( Push-Based ) 방식을 사용한다.

 


 

Iterator 패턴

데이터의 집합에서 데이터를 순차적으로 꺼내기 위해 만들어진 디자인 패턴을 말하며, 컬렉션이 변경되더라도 동일한 인터페이스를 사용해 꺼내올 수 있기 때문에, 변경 사항 없이 사용할 수 있다. 데이터의 집합이 얼만큼의 크기인지 알 수 없는 경우 해당 패턴을 사용하면 순차적으로 꺼내올 수 있다.

data class Car(val brand: String)
class CarIterable(val cars: List<Car> = listOf()) : Iterable<Car> { //jvm 에서는 Iterable 로 Aggregate 를 구현할 수 있다.
    override fun iterator() = CarIterator(cars)
}
class CarIterator(val cars: List<Car> = listOf(), var index: Int = 0)
    : Iterator<Car> {
    override fun hasNext() = cars.size > index
    override fun next() = cars[index++]
}
fun main() {
    val carIterable = CarIterable(listOf(Car("람보르기니"), Car("페라리")))
    val carIterator = carIterable.iterator()
    while (carIterator.hasNext()) {
        println("브랜드 : ${carIterator.next()}")
    }
}

Iterable 인터페이스를 구현한 CarIterable 클래스는 CarsIterator 를 생성하는 iterator 함수를 오버라이드한다. CarIterator 클래스는 Iterator 인터페이스를 구현하여 데이터가 존재하는지 여부를 확인하는 hasNext와 데이터가 존재하면 데이터를 가져오는 next 함수를 오버라이드한다.

main 에서는 carIterator 가 다음 값이 있을 경우 로그를 출력하도록 하며 다음 값으로 넘기는 로직을 가진다.

 

Iterator 와 Observer 방식의 차이점

데이터를 제공한다는 면에서 두 패턴은 유사하다. 그러나 Iterator 패턴은 Aggregate 이 내부에 데이터를 저장하고, Iterator 를 사용해 데이터를 순차적으로 당겨오는 방식이기 때문에, 풀 기반 (Pull-Based) 방식이나, Observer 패턴은 데이터 제공자가 소비하는 측에 통지하는 푸시 기반이라는 차이점이 존재한다.

 


 

 

Reactive Programming

리액티브 프로그래밍은 데이터나 이벤트의 변경이 발생할 경우 이에 반응하여 처리하는 프로그래밍 기법으로, 비동기 프로그래밍을 처리하는 새로운 접근 방식이다.

 

리액티브 프로그래밍이 나오기 전 비동기 프로그래밍은 콜백 기반의 비동기 처리 방식을 사용하였기 때문에, 아래와 같이 코드의 복잡도가 높아지는 콜백 헬 현상을 해결하기 쉽지 않았다.

// Before with Callback only
fetch("/api/users/me") { user->
    fetch("/api/users/${user.id}/followers") { followers ->
        fetch("/api/users/${user.id}/likes") { likes ->
            fetch("/api/users/${user.id}/contacts") { contacts ->
        // 콜백 헬
            }
        }
    }
}

//======================================================================

// After with Reactive Programming
fetchReactive("/api/users/me")
    .zip { user -> fetchReactive("/api/users/${user.id}/followers") }
    .zip { user -> fetchReactive("/api/users/${user.id}/likes") }
    .zip { user -> fetchReactive("/api/users/${user.id}/contacts") }
    .flatMap { followers, likes, contacts ->
    // 로직 구현
}

그러나 위의 차이점이 보이는가? 함수형 프로그래밍 관점에서 리액티브 프로그래밍은 단점을 해결하고 쉽게 작성할 수 있도록 새로운 방식을 제시하였다.

 

리액티브 스트림의 구성

리액티브 스트림은 리액티브 프로그램의 표준 API 사양을 말한다.

  • Publisher: 데이터를 생성 후, Subscriber에게 통지한다. Publisher 가 제공할 수 있는 데이터의 양은 무한(Unbounded) 하고, 순차적(Sequential)인 처리가 보장된다.
  • Subscriber: 데이터를 구독하고 통지 받은 데이터를 처리한다.
    • onSubscribe: 구독 시, 최초에 한 번 호출한다.
    • onNext: 구독자가 요구하는 데이터의 수 만큼 호출한다.
    • onError: 에러 또는 더 이상 처리할 수 없는 경우 호출한다.
    • onComplete: 모든 처리가 정상적으로 완료된 경우 호출한다.
  • Subscription: Publisher/Subscriber 간의 데이터 교환이 가능하도록 연결하며, 전달 받을 데이터의 개수를 설정하고 요청하거나 구독을 해지 하는 등 데이터를 조정하는 역할을 수행한다.
  • Processor: Publisher, Subscriber 를 모두 상속받은 인터페이스로, 데이터를 가공하는 중간 단계에서 사용된다.

 

데이터의 처리 흐름

각 메서드의 호출은 Signal 이라고 부르며 각 시그널은 호출되는 순서가 다르다.

 

  • Subscriber 가 Publisher.subscribe 로 구독을 요청한다.
  • Publisher 는 Subscriber.onSubscribe 로 Subscriber 가 초기화 로직 등을 구현할 수 있도록 구독 시 최초의 한번만 호출한다.
  • request 로 받을 데이터의 개수를 요청한다.
  • onNext 로 받을 데이터가 있을 경우 호출하여 처리한다.
  • Publisher 에서는 처리 중 에러가 발생하면 Subscriber.onError 를 이용해 통지하고, 해당 시그널 이후 더이상 통지하지 않는다. 구독자는 이를 받아 별도의 처리를 수행한다.
  • onComplete 는 모든 데이터가 통지 된 시점에 발생자에서 호출하며, 이후에는 어떠한 시그널도 발생하지 않아야한다.
반응형
Comments