RxJS의 Operator중에 Transformation관련 Operator들에 대해 정리한 글입니다.

buffer

buffer는 말그대로 Obervable의 실행을 일정기간 미루는 것이다. 시리즈 성격의 buffer Operators는 몇가지가 있는데, 이들 모두 공통적으로 Obervable의 실행을 일정기간 미룬다.

buffer

Obervable을 인자로 받으며, 인자의 Obervable이 subscribe될 때 미루고 있던 이전의 Obervable의 subscribe값들을 배열로 반환한다.

아래 예재에서는 document 클릭할때마다 1초마다 쌓인 스트림을 배열로 반환 한다.

1
2
3
const obBuffer = Rx.Observable.interval(1000)
.buffer(Rx.Observable.fromEvent(document,'click'));
obBuffer.subscribe(x=>console.log(x));

bufferCount

bufferCount는 두개의 인자를 받는다. 첫 번째는 도출되는 buffer의 최대 사이즈가 오며, 두 번째는 새로운 buffer가 몇번의 스트림을 주기로 실행될것인지가 온다. 즉 bufferCount(3,2)라면 여기서 도출되는 스트림은, input Observable의 스트림중 최근 3개를 인자로 묶은 배열이 2번의 주기로 실행되는 것이다.(말이 어렵다… ㅠ)

1
2
3
const obBufferCount = Rx.Observable.interval(1000)
.bufferCount(3, 4);
obBufferCount.subscribe(x=>console.log(x)); //[0,1,2], [4,5,6] ...

bufferTime

Observable 값을 특정 시간단위로 지연시켜 배출한다.

1
2
const obBufferTime = Rx.Observable.interval(1000).bufferTime(3000);
obBufferTime.subscribe(x=>console.log(x)); //[0, 1], [2,3,4], [5,6,7]

bufferToggle

bufferToggle은 두개의 인자를 받는다. 첫 번째는 buffer를 시작하는, 두 번째는 buffer를 마무리 하기 위한 인자가 온다. 첫 번째는 구독가능하거나 Promise로 반환되는(Subscribable Or Promise) 값이오고, 두 번째는 구독가능하거나 Promise로 반환되는(Subscribable Or Promise) 값을 도출하는 함수가 온다. 결국 두개의 Observable을 인자로 받아 시작점과 끝나는 지점을 정하고 이 안에 있는 스트림을 배열로 묶어 배출한다. 만일 promise라면 resolve함수가 실행되는 시점이 시작하거나 마무리하는 시점이 되며, 이는 시점을 정할뿐 인자를 넘겨 줄 수 없다(아마도….)

1
2
3
4
5
6
7
8
9
const obBufferToggle = Rx.Observable.interval(1000) //1초에 1씩 증가하는 값을 반환하는 스트림
.bufferToggle(Rx.Observable.fromEvent(document, 'click'), (e)=>{ //시작 시점은 문서를 클릭할때
return new Promise(function(res, rej){
setTimeout(() =>{
res(100)//인자는 쓰이지 않는다...?
}, 2000)
})//끝나는 시점은 2초가 지난 후.
});
obBufferToggle.subscribe(x=>console.log(x)); //[n, n+1], [m, m+1]...

map

map

map은 Array method들 중에 가장 많이 쓰이는 것 중에 하나이다. RxJS의 맵도 거의 같은 역할을 한다. 차이점이라면 Array method는 배열의 인자를 순회하면서 콜백 함수에 의한 값을 리턴하는 반면, RxJS의 map은 스트림 값을 받아 콜백함수에 의한 값을 그대로 흘려보내준다는 것이다.

1
2
3
const obMap = Rx.Observable.from([1,2,3,4])
.map(x => x+10);
obMap.subscribe(x => console.log(x)); // 11, 12, 13, 14

mapTo

mapTo는 map과 거의 유사하다. 하지만 아웃풋 값이 mapTo의 인자 값으로 고정이다. 다음에 나오는 operator들에도 뒤에 to가 붙는 것들이 있는데, 대부분 기본 operator 기능에 인자 값을 subscribe로 넘기는 비슷한 역할을 한다.

1
2
3
const obMapTo = Rx.subscribe.from([1,2,3,4])
.mapTo('10');
obMapTo.subscribe(x => console.log(x)); //10, 10, 10, 10

concatMap exhaustMap mergeMap switchMap

이 Operator들은 인자로 Observable을 받는다. 그리고 인풋의 Observable과 인자의 Observable을 합쳐(flat) 하나의 스트림을 반환한다. 두 번째 인자로는 콜백함수를 포함시킬 수 있다. 인자로는 총 4개를 받는데, Outer Observable의 값, Inner Observable의 값, Outer Observable의 Index, Inner Observable의 Index로 구성된다. 각각은 다음과 같은 특징을 가지고 있다.

  • concat Observable이 끝나면 이어서 다음 Observable을 실행. 첫 번째 Observable이 끝나기 에 두 번째 Observable이 실행해도 첫 번째 Observable이 끝나고 나서 두 번째가 실행된다.
  • exhaust Observable이 실행되고 있을때 실행되는 다음 Observable을 무시한다.
  • merge Observable이 발생하는대로 모두 실행한다. (flatMap과 동일한 것으로 보인다.)
  • switch Observable이 실행되고 있을때 다음 Observable이 실행되면 먼저실행된 Observable을 중단시킨다.

모든 Operator가 비슷하게 작동하므로 소스예제는 하나만 기록한다. 중간에 Operator만 바꾸면 작동한다.

1
2
3
4
5
const clicks = Rx.Observable.fromEvent(document, 'click');
const higherOrder = clicks.concatMap(ev => Rx.Observable.interval(1000).take(3), (i,j,k,l) => {
return [i,j,k,l];
});
higherOrder.subscribe(x => console.log(x));

window

window는 Transformation Operator중 개념 이해하기가 가장 난해했다. 결과적으로 이야기 하면… Outer Observable이 발생할 때마다 Inner Observable을 각각의 Observable(window)로 쪼개서 배출한다. 이는 마치 자바스크립트의 split method와 비슷하다.
window로 시작하는 Operator들이 몇가지 있는데 기본 동작 원칙은 같다. 한 Observable의 스트림을 기반으로 새로운 Observable들을 반환한다. 그리고 이 Operator의 인자로 언제 새로운 Observable을 만들고, 언제 없앨지를 정한다.

아래 소스에서는 2초마다 스트림이 발생하는 Observable(interval)이 있고, 각 스트림이 실행될 때 마다 클릭 이벤트를 바라보는 Observable(clicks)이 생성된다. 이 clicks는 스트림이 끝날 때 같이 종료된다. 또한 clicks은 한 스트림 안에서 2번까지의 클릭만 스트림을 발생시킨다. 다소 복잡할 수 있지만 아래의 소스를 이해한다면 window의 개념을 어느정도 잡을 수 있을 것이다.

1
2
3
4
5
6
7
8
9
10
11
12
const clicks = Rx.Observable.fromEvent(document, 'click');
const interval = Rx.Observable.interval(2000);
const result = clicks.window(interval) //2초마다 클릭 이벤트를 바라보는 Observable(윈도우)틀 배출한다.
.map(win => win.take(2)) // 각 윈도우는 2개의 스트림만을 받는다.
result.subscribe(x => {
console.log('out');//2초마다 클릭이벤트 Observable 발생
x.subscribe(
y => console.log(y),//클릭할때 스트림 발생
err => console.log(err),
() => console.log('finish')//2번의 클릭 스트림 발생했거나 2초가 지났으면 complete매소드 실행
)}
);

windowCount

windowCount는 인자로 숫자를 받는다. 이 숫자 개수만큼 Outer Observable의 스트림을 묶어서 하나의 Observable로 분출한다. 위에서 살펴본 인자의 숫자만큼 대기한다는 측면에서 bufferCount과 매칭된다.

아래 소스는 클릭 스트림이 3번 쌓일때마다 클릭 Observable을 complete하고 새로운 클릭 스트림을 만든다. (순수함수라는 측면에서 전역변수를 쓰는건 안좋은 방법이지만 이해를 돕기위해 cnt라는 전역변수를 만들었다.)

1
2
3
4
5
6
7
8
9
10
11
12
13
var cnt = 0;
const clicks = Rx.Observable.fromEvent(document, 'click');
const result = clicks.windowCount(3)
result.subscribe(x => {
console.log('click Observable Start'); //클릭 Observable 생성
x.subscribe(
y => console.log(++cnt),//클릭할때마다 숫자 1씩 증가
e => e,
() => (cnt = 0, console.log('click Observable completed')) //클릭 Observable 종료
),
e => e,
() => (console.log('completed'));
});

windowTime

windowTime는 인자로 두개의 숫자를 받는다, 첫번째는 생성된 window가 지속될 시간이고, 두번째는 window의 생성 주기를 나타내는 시간이다.

1
2
3
4
5
6
7
8
9
10
11
const clicks = Rx.Observable.fromEvent(document, 'click');
const result = clicks.windowTime(2000, 5000) //Observable이 2초후 종료됨. Observable생성 5초 후에 새로운 Observable이 시작.
.map(win => win.take(2)) // 각 윈도우는 2개의 스트림만을 받는다.
result.subscribe(x => {
console.log('start')
x.subscribe(
y => console.log(y), //클릭할때 스트림 발생
err => console.log(err),
() => console.log('finish') //2초가 지나거나 클릭 스트림이 2번 발생할경우 complete
)
});

windowToggle