UniRx 기초 - soo:bak
작성일 :
UniRx(Unity Reactive Extensions)
는 Unity
개발자들이 반응형 프로그래밍
을 적용할 수 있도록 해주는 라이브러리이다.
이는 반응형 확장(Rx, Reactive Extention)
의 개념을 Unity
에 적용한 것으로, 일본인 @neuecc
가 Unity
용으로 최적화하여 공개하였다.
GitHub - neuecc/UniRx (링크)
이를 통해, 기존의 Unity
이벤트 처리 방식보다 더 선언적이고, 간결하며, 관리하기 쉬운 코드 작성을 가능하게 한다.
> 관련 글 - 반응형 프로그래밍 패러다임(Reactive Programming Paradigm) - soo:bak
UniRx 의 핵심 개념 - Observable
Observable
은 UniRx
의 기반을 이루는 클래스로, 데이터나 이벤트의 시퀀스를 표현하며,
“시간이 지나면서 변화하는 어떠한 것”, 즉, 데이터 스트림이나 이벤트 흐름을 추상화하고 관리한다.
시퀀스 생성
Observable.Create
가장 기본적인 시퀀스 생성 방법으로, Observable
을 만드는 함수이다.
데이터 발행(Publish)
로직을 직접 정의할 수 있다.
1
2
3
4
5
6
var myObservable = Observable.Create<int>(observer => {
// 이 곳에 데이터 발행(Publish) 로직을 작성
observer.OnNext(1); // 값을 발행(Emmitting)
observer.OnCompleted(); // 시퀀스 완료
return Disposable.Empty; // 리소스 해제
});
Observable.Interval
일정한 시간 간격마다 값을 발행(Emmitting)
하는 Observable
을 생성한다.
1
2
3
var intervalObservable = Observable.Interval(TimeSpan.FromSeconds(1))
.Subscribe(x => Debug.Log(x));
// 매 초마다 숫자를 증가시키면서 출력
Observable.FromCoroutine
코루틴에서 반환된 값을 Observable
시퀀스로 변환한다.
1
2
3
4
5
6
7
8
IEnumerator MyCoroutine() {
yield return new WaitForSeconds(1);
yield return 2;
}
var coroutineObservable = Observable.FromCoroutine<int>(observer => MyCoroutine())
.Subscribe(x => Debug.Log(x));
// 코루틴이 반환하는 값을 구독
구독과 스트림
구독(Subscribe)
은 Observable
에서 발행(Publish)
하는 데이터를 받아 처리하는 과정이다.
1
2
3
4
5
6
7
var observable = Observable.Range(1, 10); // 1부터 10까지의 값을 발행(Emmitting)하고, 이 데이터 스트림을 발행(Publish)하는 Observable
var subscription = observable.Subscribe(
x => Debug.Log($"OnNext: {x}"); // 각 값에 대하여 처리
ex => Debug.LogError($"OnError: {ex}"); // 에러 발생시 처리
() => Debug.Log("OnCompleted") // 스트림 완료시 처리
);
// 구독을 통해 발행된 데이터 스트림을 처리
여기서, Subscribe
메소드는 세 가지 파라미터를 받는다.
OnNext
값이 발행(Emmitting)될 때마다 호출된다.
OnError
에러가 발생(Occur)
했을 때 호출된다.
OnCompleted
데이터 시퀀스가 완료되었을 때 호출된다.
종료와 에러 처리
Observable
시퀀스는 정상적으로 종료될 수도 있지만, 에러로 인하여 종료될 수 있다.
따라서, 구독자는 이를 적절히 처리할 수 있어야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
var myObservable = Observable.Create<int>(observer => {
observer.OnNext(1);
observer.OnError(new Exception("오류 발생!"));
observer.OnNext(2); // 오류 발생 후에는 무시됨
observer.OnCompleted(); // 오류 발생 후에는 무시됨
return Disposable.Empty;
});
myObservable.Subscribe(
x => Debug.Log($"OnNext: {x}"),
ex => Debug.Log($"OnError: {ex.Message}"),
() => Debug.Log("OnCompleted")
);
// "1" 출력 후 "오류 발생!" 출력, 이후 이벤트는 무시
UniRx 의 핵심 개념 - Operators
Operator
는 Observable
스트림에 적용되는 함수들로, 스트림을 변형하거나 조작하는 역할을 한다.
이를 통해 복잡한 데이터 처리 작업을 쉽게 만든다.
필터링 : Where
Where
연산자는 특정 조건을 만족하는 데이터만 통과시키는 필터 역할을 한다.
이를 통해, 불필요한 데이터를 제거하고, 관심있는 데이터만 선택할 수 있다.
1
2
3
4
Observable.Range(1, 10)
.Where(x => x % 2 == 0) // 짝수만 필터링
.Subscribe(x => Debug.Log(x));
// 출력 : 2, 4, 6, 8, 10
변환 : Select
Select
연산자는 스트림의 각 요소를 변환하는 데 사용된다.
이를 통해, 데이터를 원하는 형태로 매핑(Mapping)
할 수 있다.
1
2
3
4
Observable.Range(1, 5)
.Select(x => x * x) // 각 요소를 제곱
.Subscribe(x => Debug.Log(x));
// 출력 : 1, 4, 9, 16, 25
결합 : Merge
와 Concat
Merge
연산자는 여러 스트림을 병렬로 결합한다.
즉, 각 스트림에서 발행(Emmitting)되는 값은 시간 순서대로 합쳐진다.
1
2
3
4
5
6
var first = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);
var second = Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(3);
first.Merge(second)
.Subscribe(x => Debug.Log(x));
// 출력 : 0, 0, 1, 1, 2, 2 (두 스트림의 값이 병렬로 병합됨)
Concat
연산자는 여러 스트림을 순차적으로 결합한다.
즉, 하나의 스트림이 완료된 후 다음 스트림의 값이 발행(Emmitting)된다.
1
2
3
4
5
6
7
var first = Observable.Range(1, 3);
var second = Observable.Range(4, 3);
first.Concat(second)
.Subscribe(x => Debug.Log(x));
// 출력 : 1, 2, 3, 4, 5, 6 (첫 번쨰 스트림 후에 두 번째 스트림의 값이 이어서 출력됨)
에러 처리 : Catch
와 Retry
Catch
연산자는 Observable
에서 발생한 에러를 처리하고, 다른 Observable
로 대체할 수 있도록 한다.
1
2
3
4
5
6
7
8
Observable.Throw<int>(new Exception("에러 발생"))
.Catch(Observable.Return(0)) // 에러 발생 시 0 을 반환하는 Observable 로 대체
.Subscribe(
x => Debug.Log($"OnNext: {x}"),
ex => Debug.Log($"OnError: {ex}"),
() => Debug.Log("OnCompleted")
);
// 출력 : OnNext: 0, OnCompleted
Retry
연산자는 에러 발생 시 Observable
을 다시 구독하여 처음부터 다시 실행한다.
즉, Observable
내부의 로직이 처음부터 다시 시작되어, 이전에 실패했던 작업을 새롭게 시도한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var returyCount = 0;
Observable.Create<int>(observer => {
if (++retryCount < 3)
observer.OnError(new Exception("에러 발생"));
else {
observer.OnNext(42);
observer.OnCompleted();
}
return Disposable.Empty;
})
.Retry(3) // 최대 3회까지 재시도
.Subscribe(
x => Debug.Log($"OnNext: {x}"),
ex => Debug.Log($"OnError: {ex}"),
() => Debug.Log("OnCompleted")
);
// 처음 두 번은 에러, 세 번째 시도에서 42 출력 후 완료
UniRx 의 핵심 개념 - Schedulers
Schedulers
는 UniRx
에서 매우 중요한 역할을 하는데, 이는 Schedulers
들이 Observable
의 작업이 실행될 스레드 또는 컨텍스트를 결정하기 때문이다.
이를 통해 개발자는 작업의 실행 컨텍스트를 세밀하게 제어할 수 있으며, 이는 특히 멀티스레딩 환경에서 중요성을 가진다.
MainThreadScheduler
MainThreadScheduler
는 Observable
의 작업을 Unity
의 메인 스레드에서 실행하도록 한다.
Unity
의 많은 API
들은 메인 스레드에서만 안전하게 호출될 수 있기 때문에, 이 스케줄러는 UI 업데이트, 게임 오브젝트와의 상호작용 등 Unity
의 주요 기능과 관련된 작업에 사용된다.
1
2
3
4
5
6
7
8
9
Observable.Start(() => {
// 백그라운드 스레드에서 실행되는 코드
return "결과";
})
.ObserveOnMainThread() // 메인 스레드로 전환
.Subscribe(x => {
// 메인 스레드에서 실행되는 코드
Debug.Log("이 코드는 메인 스레드에서 실행됩니다.: " + x);
});
위 예시 코드에서 Observable.Start
는 백그라운드 스레드에서 작업을 시작하지만, ObserveOnMainThread
를 통해 결과를 메인 스레드로 전환하여 Unity
의 메인 스레드 관련 작업을 안전하게 처리한다.
ThreadPoolScheduler
ThreadPoolScheduler
는 작업을 백그라운드 스레드에서 실행한다.
이는 네트워크 요청, 파일 I/O, 복잡한 계산 등과 같이 시간이 오래 걸리는 작업에 적합하다.
이를 사용하면, 메인 스레드의 블로킹을 방지하고, 애플리케이션의 반응성을 향상시킬 수 있다.
1
2
3
4
5
6
7
8
9
Observable.Start(() => {
// 시간이 많이 걸리는 작업
Thread.Sleep(1000); // 대기를 표현하기 위한 예시
return "작업 완료";
}, Scheduler.ThreadPool) // ThreadPoolScheduler 사용
.Subscribe(x => {
// 메인 스레드에서 실행될 결과 처리
Debug.Log("결과 처리: " + x);
});
위 예시코드는 Schduler.ThreadPool
을 사용하여 백그라운드 스레드에서 시간이 많이 걸리는 작업을 처리하고, 작업이 완료되면 결과를 메인 스레드에서 구독하여 처리한다.
UniRx 의 핵심 개념 - Subject
Subject
는 UniRx
에서 특별한 역할을 하는 클래스로, Observable
과 Observer
의 역할을 동시에 수행한다.
이벤트 발생
Subject
는 외부에서 이벤트를 수동으로 발생시킬 수 있다.
이는 Subject
가 직접적으로 값을 발행할 수 있음을 의미하며, 이를 통해 다양한 이벤트 소스를 통합하고 관리할 수 있다.
다중 구독자 관리
하나의 Subject
에 여러 구독자가 구독할 수 있다.
이 때, Subject
에서 발생하는 이벤트는 모든 구독자에게 전달된다.
이는 하나의 이벤트 소스에서 여러 구독자가 관련 데이터를 받아야할 경우 유용하다.
Subject
의 종류
PublishSubject
가장 기본적인 형태의Subject
로, 구독 이후에 발생하는 이벤트만 구독자에게 전달한다.
1
2
3
4
5
6
7
8
var subject = new PublishSubject<int>();
subject.Subscribe(x => Debug.Log($"첫 번쨰 구독: {x}"));
subject.OnNext(1);
subject.OnNext(2);
subject.Subscribe(x => Debug.Log($"두 번쨰 구독: {x}"));
subject.OnNext(3);
//출력 : 첫 번째 구독 : 1, 첫 번째 구독: 2, 첫 번째 구독: 3, 두 번째 구독: 3
ReplaySubject
구독자가 구독을 시작한 시점에 상관없이, 모든 데이터 이벤트를 저장하고 새로운 구독자에게 전달한다.
1
2
3
4
5
6
7
var replaySubject = new ReplaySubject<int>();
replaySubject.OnNext(1);
replaySubject.OnNext(2);
replaySubject.Subscribe(x => Debug.Log($"구독: {x}"));
replaySubject.OnNext(3);
// 출력 : 구독: 1, 구독: 2, 구독: 3
BehaviorSubject
생성 시 초기값을 가지며, 구독자에게 가장 최근의 값 혹은 초기값을 전달한다.
1
2
3
4
5
var behaviorSubject = new BehaviorSubject<int>(0);
behaviorSubject.Subscribe(x => Debug.Log($"구독: {x}"));
behaviorSubject.OnNext(1);
behaviorSubject.OnNext(2);
// 출력 : 구독: 0, 구독: 1, 구독: 2
AsyncSubject
Observable
이 완료될 때 까지, 발행(Emmitting)되는 값을 구독자에게 전달(Delivering)하지 않고, 완료가 된 후 마지막 값을 구독자에게 전달한다.
1
2
3
4
5
6
var asyncSubject = new AsyncSubject<int>();
asyncSubject.Subscribe(x => Debug.Log($"구독: {x}"));
asyncSubject.OnNext(1);
asyncSubject.OnNext(2);
asyncSubject.OnCompleted();
// 출력 : 구독: 2