이 글은 RxJS 공식사이트의 내용 중 subject 영역을 번역한 글입니다.

RxJS Subject는 값을 많은 관찰자(Observer)에게 multicast 할 수 있게하는 특별한 유형의 Observable입니다. 일반 Observable은 unicast(각각의 subscribe된 Observer는 바라보고 있는 Observable을 독립적으로 실행함)이지만, Subject는 multicast입니다.

Subject는 Observable과 비슷하지만 많은 Observers에게 멀티캐스트 할 수 있습니다. Subject는 EventEmitters와 유사합니다. 많은 수의 수신자(listeners)의 레지스트리를 관리합니다.

모든 Subject는 Observable입니다.
Subject가 주어진다면 Observer를 구독(subscribe)하여 값을 정상적으로 받을 수 있습니다. Observer의 관점에서는, Observable 실행이 일반 unicast Observable에서 오는지, 또는 Subject에서 오는지를 알 수 없습니다.

내부적으로 Subject에서 subscribe는 값을 전달하는 새로운 실행을 호출하지 않습니다. addListener가 다른 라이브러리 및 언어에서 일반적으로 작동하는 것과 마찬가지로 Observer를 Observer 목록에 등록합니다.

모든 Subject는 Observer입니다. Subject는 next(v), error(e), and complete() 메소드를 가진 객체입니다. Subject에 새 값을 공급하기위해 next를 호출하면 Subject를 수신하도록 등록 된 Observers로 멀티 캐스팅됩니다.

아래 예제에서는 Subject에 첨부 된 두 개의 Observer가 있으며, Subject에 값을 공급합니다.

1
2
3
4
5
6
7
8
9
10
11
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);

콘솔을 보면…

1
2
3
4
observerA: 1
observerB: 1
observerA: 2
observerB: 2

Subject는 Observer이므로, 아래 예제와 같이 Observable 구독에 대한 인수로 Subject를 제공 할 수도 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

위의 접근 방식으로, 우리는 Subject를 사용해서 unicast Observable 실행을 multicast로 변환했습니다. 이것은 Observable 실행을 여러 Observer들과 공유 할 수 있는 유일한 방법입니다.

Subject 유형에는 BehaviorSubject, ReplaySubject 및 AsyncSubject가 있습니다.




Multicasted Observables

“multicasted Observable”은 다수의 subscriber를 가질 수 있는 Subject를 통해 통지를 전달하는 반면, “unicast Observable”은 단일 Observer에게만 통지를 보냅니다.

multicasted Observable은 여러 Observers가 동일한 Observable 실행을 바라보도록 Subject를 사용합니다.

아래는 멀티캐스트 연산자가 작동하는 방식입니다.
Observer들은 아래의 Subject을 구독(subscribe)하고 Subject는 Observable 소스를 구독합니다. 다음 예제는 observable.subscribe(subject)를 사용한 이전 예제와 유사합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

multicast는 평범한 Observable처럼 보이는 Observable을 반환하지만 구독(subscribe)할 때의 Subject와 같이 작동합니다.
multicastconnectableObservable을 리턴합니다. 이것은 connect() 메소드를 가진 간단한 Observable입니다.

connect() 메소드는 공유된 Observable 실행이 언제 시작될 것인지를 정확하게 결정합니다. connect()source.subscribe(subject)를 실행하기 때문에 connect()는 공유 Observable의 구독(subscribe)을 취소 할 수 있는 Subscription을 반환합니다.

Reference counting(참조 카운팅)

connect()를 수동으로 호출하고 Subscription을 처리하는 것은 종종 번거로운 일입니다. 일반적으로 첫 Observer가 도착하면 자동으로 연결하고, 마지막 Observer가 구독을 취소하면 공유 실행을 자동으로 취소하려고합니다.

아래 목록에 설명 된대로 subscriptions이 발생하는 다음 예제를 살펴보겠습니다.

  1. 첫번째 Observer가 multicasted Observable을 구독합니다.
  2. Multicasted Observable가 연결됩니다.
  3. 다음 값 0이 첫번째 Observer에게 전달됩니다.
  4. 두번째 Observer가 multicasted Observable을 구독합니다.
  5. 다음 값 1이 첫번째 Observer에게 전달됩니다.
  6. 다음 값 1이 두번째 Observer에게 전달됩니다.
  7. 첫번째 Observer multicasted Observable 구독을 취소합니다.
  8. 다음 값 2이 두번째 Observer에게 전달됩니다.
  9. 두번째 Observer multicasted Observable 구독을 취소합니다.
  10. Multicasted Observable 연결이 unsubscribe 되었습니다.

connect()를 명시적으로 호출하면 위 내용을 달성하기 위해 다음 코드를 작성합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
subscription1.unsubscribe();
}, 1200);
// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

connect()에 대한 명시적인 호출을 피하려면 connectableObservablerefCount() 메서드(참조 카운팅)를 사용할 수 있습니다. 이 메서드는 Observable을 반환하며 Observable은 구독자 수를 추적합니다. 가입자 수가 0에서 1로 증가하면 connect()가 호출되어 공유 실행이 시작됩니다. 구독자 수가 1에서 0으로 줄어들 때만 완전히 구독 취소되어 더 이상 실행을 중지합니다.

refCount는 멀티캐스트 된 Observable에 첫 번째 subscriber가 도착하면 자동으로 실행을 시작하고 마지막 subscriber가 떠날 때 실행을 중지합니다.

아래의 예제를 봅시다

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed

refCount() 메서드는 ConnectableObservable에만 존재하며 다른 ConnectableObservable이 아니라 Observable을 return합니다.