작성일 :


UniRx(Unity Reactive Extensions)Unity 개발자들이 반응형 프로그래밍 을 적용할 수 있도록 해주는 라이브러리이다.

이는 반응형 확장(Rx, Reactive Extention)의 개념을 Unity에 적용한 것으로, 일본인 @neueccUnity 용으로 최적화하여 공개하였다.

GitHub - neuecc/UniRx (링크)

이를 통해, 기존의 Unity 이벤트 처리 방식보다 더 선언적이고, 간결하며, 관리하기 쉬운 코드 작성을 가능하게 한다.


> 관련 글 - 반응형 프로그래밍 패러다임(Reactive Programming Paradigm) - soo:bak

UniRx 의 핵심 개념 - Observable

ObservableUniRx 의 기반을 이루는 클래스로, 데이터나 이벤트의 시퀀스를 표현하며,

“시간이 지나면서 변화하는 어떠한 것”, 즉, 데이터 스트림이나 이벤트 흐름을 추상화하고 관리한다.



시퀀스 생성

  • Observable.Create

가장 기본적인 시퀀스 생성 방법으로, Observable 을 만드는 함수이다.

데이터 발행(Publish) 로직을 직접 정의할 수 있다.

1
2
3
4
5
6
  var myObservable = Observable.Create<int>(observer => {
    // 이 곳에 데이터 발행(Publish) 로직을 작성
    observer.OnNext(1); // 값을 발행(Emmitting)
    observer.OnCompleted(); // 시퀀스 완료
    return Disposable.Empty; // 리소스 해제
  });


  • Observable.Interval

일정한 시간 간격마다 값을 발행(Emmitting) 하는 Observable 을 생성한다.

1
2
3
  var intervalObservable = Observable.Interval(TimeSpan.FromSeconds(1))
                                     .Subscribe(x => Debug.Log(x));
  // 매 초마다 숫자를 증가시키면서 출력


  • Observable.FromCoroutine

코루틴에서 반환된 값을 Observable 시퀀스로 변환한다.

1
2
3
4
5
6
7
8
  IEnumerator MyCoroutine() {
    yield return new WaitForSeconds(1);
    yield return 2;
  }

  var coroutineObservable = Observable.FromCoroutine<int>(observer => MyCoroutine())
                                      .Subscribe(x => Debug.Log(x));
  // 코루틴이 반환하는 값을 구독




구독과 스트림

구독(Subscribe)Observable 에서 발행(Publish) 하는 데이터를 받아 처리하는 과정이다.

1
2
3
4
5
6
7
  var observable = Observable.Range(1, 10); // 1부터 10까지의 값을 발행(Emmitting)하고, 이 데이터 스트림을 발행(Publish)하는 Observable
  var subscription = observable.Subscribe(
    x => Debug.Log($"OnNext: {x}"); // 각 값에 대하여 처리
    ex => Debug.LogError($"OnError: {ex}"); // 에러 발생시 처리
    () => Debug.Log("OnCompleted") // 스트림 완료시 처리
  );
  // 구독을 통해 발행된 데이터 스트림을 처리

여기서, Subscribe 메소드는 세 가지 파라미터를 받는다.

  • OnNext

값이 발행(Emmitting)될 때마다 호출된다.

  • OnError

에러가 발생(Occur) 했을 때 호출된다.

  • OnCompleted

데이터 시퀀스가 완료되었을 때 호출된다.




종료와 에러 처리

Observable 시퀀스는 정상적으로 종료될 수도 있지만, 에러로 인하여 종료될 수 있다.

따라서, 구독자는 이를 적절히 처리할 수 있어야 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  var myObservable = Observable.Create<int>(observer => {
    observer.OnNext(1);
    observer.OnError(new Exception("오류 발생!"));
    observer.OnNext(2); // 오류 발생 후에는 무시됨
    observer.OnCompleted(); // 오류 발생 후에는 무시됨
    return Disposable.Empty;
  });

  myObservable.Subscribe(
    x => Debug.Log($"OnNext: {x}"),
    ex => Debug.Log($"OnError: {ex.Message}"),
    () => Debug.Log("OnCompleted")
  );
  // "1" 출력 후 "오류 발생!" 출력, 이후 이벤트는 무시





UniRx 의 핵심 개념 - Operators

OperatorObservable 스트림에 적용되는 함수들로, 스트림을 변형하거나 조작하는 역할을 한다.

이를 통해 복잡한 데이터 처리 작업을 쉽게 만든다.



필터링 : Where

Where 연산자는 특정 조건을 만족하는 데이터만 통과시키는 필터 역할을 한다.

이를 통해, 불필요한 데이터를 제거하고, 관심있는 데이터만 선택할 수 있다.

1
2
3
4
  Observable.Range(1, 10)
    .Where(x => x % 2 == 0) // 짝수만 필터링
    .Subscribe(x => Debug.Log(x));
  // 출력 : 2, 4, 6, 8, 10




변환 : Select

Select 연산자는 스트림의 각 요소를 변환하는 데 사용된다.

이를 통해, 데이터를 원하는 형태로 매핑(Mapping) 할 수 있다.

1
2
3
4
  Observable.Range(1, 5)
    .Select(x => x * x) // 각 요소를 제곱
    .Subscribe(x => Debug.Log(x));
  // 출력 : 1, 4, 9, 16, 25




결합 : MergeConcat

Merge 연산자는 여러 스트림을 병렬로 결합한다.

즉, 각 스트림에서 발행(Emmitting)되는 값은 시간 순서대로 합쳐진다.

1
2
3
4
5
6
  var first = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);
  var second = Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(3);

  first.Merge(second)
    .Subscribe(x => Debug.Log(x));
  // 출력 : 0, 0, 1, 1, 2, 2 (두 스트림의 값이 병렬로 병합됨)



Concat 연산자는 여러 스트림을 순차적으로 결합한다.

즉, 하나의 스트림이 완료된 후 다음 스트림의 값이 발행(Emmitting)된다.

1
2
3
4
5
6
7
  var first = Observable.Range(1, 3);
  var second = Observable.Range(4, 3);

  first.Concat(second)
    .Subscribe(x => Debug.Log(x));
  // 출력 : 1, 2, 3, 4, 5, 6 (첫 번쨰 스트림 후에 두 번째 스트림의 값이 이어서 출력됨)




에러 처리 : CatchRetry

Catch 연산자는 Observable 에서 발생한 에러를 처리하고, 다른 Observable 로 대체할 수 있도록 한다.

1
2
3
4
5
6
7
8
  Observable.Throw<int>(new Exception("에러 발생"))
    .Catch(Observable.Return(0)) // 에러 발생 시 0 을 반환하는 Observable 로 대체
    .Subscribe(
      x => Debug.Log($"OnNext: {x}"),
      ex => Debug.Log($"OnError: {ex}"),
      () => Debug.Log("OnCompleted")
    );
  // 출력 : OnNext: 0, OnCompleted



Retry 연산자는 에러 발생 시 Observable 을 다시 구독하여 처음부터 다시 실행한다.

즉, Observable 내부의 로직이 처음부터 다시 시작되어, 이전에 실패했던 작업을 새롭게 시도한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  var returyCount = 0;
  Observable.Create<int>(observer => {
    if (++retryCount < 3)
      observer.OnError(new Exception("에러 발생"));
    else {
      observer.OnNext(42);
      observer.OnCompleted();
    }
    return Disposable.Empty;
  })
  .Retry(3) // 최대 3회까지 재시도
  .Subscribe(
    x => Debug.Log($"OnNext: {x}"),
    ex => Debug.Log($"OnError: {ex}"),
    () => Debug.Log("OnCompleted")
  );
  // 처음 두 번은 에러, 세 번째 시도에서 42 출력 후 완료





UniRx 의 핵심 개념 - Schedulers

SchedulersUniRx 에서 매우 중요한 역할을 하는데, 이는 Schedulers 들이 Observable 의 작업이 실행될 스레드 또는 컨텍스트를 결정하기 때문이다.

이를 통해 개발자는 작업의 실행 컨텍스트를 세밀하게 제어할 수 있으며, 이는 특히 멀티스레딩 환경에서 중요성을 가진다.



MainThreadScheduler

MainThreadSchedulerObservable 의 작업을 Unity 의 메인 스레드에서 실행하도록 한다.

Unity 의 많은 API 들은 메인 스레드에서만 안전하게 호출될 수 있기 때문에, 이 스케줄러는 UI 업데이트, 게임 오브젝트와의 상호작용 등 Unity 의 주요 기능과 관련된 작업에 사용된다.

1
2
3
4
5
6
7
8
9
  Observable.Start(() => {
    // 백그라운드 스레드에서 실행되는 코드
    return "결과";
  })
  .ObserveOnMainThread() // 메인 스레드로 전환
  .Subscribe(x => {
    // 메인 스레드에서 실행되는 코드
    Debug.Log("이 코드는 메인 스레드에서 실행됩니다.: " + x);
  });


위 예시 코드에서 Observable.Start 는 백그라운드 스레드에서 작업을 시작하지만, ObserveOnMainThread 를 통해 결과를 메인 스레드로 전환하여 Unity 의 메인 스레드 관련 작업을 안전하게 처리한다.



ThreadPoolScheduler

ThreadPoolScheduler 는 작업을 백그라운드 스레드에서 실행한다.

이는 네트워크 요청, 파일 I/O, 복잡한 계산 등과 같이 시간이 오래 걸리는 작업에 적합하다.

이를 사용하면, 메인 스레드의 블로킹을 방지하고, 애플리케이션의 반응성을 향상시킬 수 있다.

1
2
3
4
5
6
7
8
9
  Observable.Start(() => {
    // 시간이 많이 걸리는 작업
    Thread.Sleep(1000); // 대기를 표현하기 위한 예시
    return "작업 완료";
  }, Scheduler.ThreadPool) // ThreadPoolScheduler 사용
  .Subscribe(x => {
    // 메인 스레드에서 실행될 결과 처리
    Debug.Log("결과 처리: " + x);
  });


위 예시코드는 Schduler.ThreadPool 을 사용하여 백그라운드 스레드에서 시간이 많이 걸리는 작업을 처리하고, 작업이 완료되면 결과를 메인 스레드에서 구독하여 처리한다.




UniRx 의 핵심 개념 - Subject

SubjectUniRx 에서 특별한 역할을 하는 클래스로, ObservableObserver 의 역할을 동시에 수행한다.



이벤트 발생

Subject 는 외부에서 이벤트를 수동으로 발생시킬 수 있다.

이는 Subject 가 직접적으로 값을 발행할 수 있음을 의미하며, 이를 통해 다양한 이벤트 소스를 통합하고 관리할 수 있다.



다중 구독자 관리

하나의 Subject 에 여러 구독자가 구독할 수 있다.

이 때, Subject 에서 발생하는 이벤트는 모든 구독자에게 전달된다.

이는 하나의 이벤트 소스에서 여러 구독자가 관련 데이터를 받아야할 경우 유용하다.



Subject 의 종류

  • PublishSubject 가장 기본적인 형태의 Subject 로, 구독 이후에 발생하는 이벤트만 구독자에게 전달한다.

1
2
3
4
5
6
7
8
var subject = new PublishSubject<int>();
subject.Subscribe(x => Debug.Log($"첫 번쨰 구독: {x}"));
subject.OnNext(1);
subject.OnNext(2);

subject.Subscribe(x => Debug.Log($"두 번쨰 구독: {x}"));
subject.OnNext(3);
//출력 : 첫 번째 구독 : 1, 첫 번째 구독: 2, 첫 번째 구독: 3, 두 번째 구독: 3



  • ReplaySubject 구독자가 구독을 시작한 시점에 상관없이, 모든 데이터 이벤트를 저장하고 새로운 구독자에게 전달한다.

1
2
3
4
5
6
7
var replaySubject = new ReplaySubject<int>();
replaySubject.OnNext(1);
replaySubject.OnNext(2);

replaySubject.Subscribe(x => Debug.Log($"구독: {x}"));
replaySubject.OnNext(3);
// 출력 : 구독: 1, 구독: 2, 구독: 3



  • BehaviorSubject 생성 시 초기값을 가지며, 구독자에게 가장 최근의 값 혹은 초기값을 전달한다.

1
2
3
4
5
var behaviorSubject = new BehaviorSubject<int>(0);
behaviorSubject.Subscribe(x => Debug.Log($"구독: {x}"));
behaviorSubject.OnNext(1);
behaviorSubject.OnNext(2);
// 출력 : 구독: 0, 구독: 1, 구독: 2



  • AsyncSubject Observable 이 완료될 때 까지, 발행(Emmitting)되는 값을 구독자에게 전달(Delivering)하지 않고, 완료가 된 후 마지막 값을 구독자에게 전달한다.

1
2
3
4
5
6
var asyncSubject = new AsyncSubject<int>();
asyncSubject.Subscribe(x => Debug.Log($"구독: {x}"));
asyncSubject.OnNext(1);
asyncSubject.OnNext(2);
asyncSubject.OnCompleted();
// 출력 : 구독: 2