Rx.js With node.js

이 문서는 @staltz의 rx.js소개 gist 를 참고하여 작성되었다.
rx.js란 무엇인가와 이를 node.js 코드를 이용하여 rx.js에 대한 간단한 예제를 쓴 글이다.

Rx.Js

Rx.Js는 Reactive Programming을 위한 도구이다.
우선 Reactive Programming에 대해 알아보자.
리액티브 프로그래밍(Reactive Programming)은 비동기적인 데이터의 스트림에 대해 listen하고 react하는 형태로 프로그래밍 하는 것이다.
간단히 하자면 비동기적인 데이터 변화에 따른 이벤트에 대한 처리를 한다고 생각하면 된다.
node.js에 익숙한 사용자라면 새로울 것이 없는 개념이다.
다만 이벤트가 일어났을때 이에 대한 처리를 이미 제공하는 함수 도구를 이용하여 처리한다.
예를 들면 map, merge, flatMap등이다.

여기까지 보면 기존에 async, lodash등의 라이브러리를 사용하는 사용자들에게는 차이점이 무엇이냐는 생각이 들 수 있다.
개인적으로 이에 대한 차이점은 Reactive Programming은 기존과 달리 데이터의 상태에 초점을 맞추어 개발을 진행하는 것이라고 생각한다.

간단한 리퀘스트 요청하기

예제를 보면서 Rx.Js의 사용법을 알아보자.
다음은 간단한 특정 URL에 대한 GET request를 보내는 예제이다.

var Rx = require('rx');
var request = require('request-promise');

var requestStream = Rx.Observable.just('YOUR_URL');

requestStream.subscribe(requestUrl => {
    var options = { method: "GET", url: requestUrl};

    var responseStream = Rx.Observable.create((observer) => {
        request(options)
          .then((res, body) => {
            observer.onNext(res);
          }).catch((err) => {
            observer.onError(err);
          }).finally(() => {
            // observe를 끝낼 때 호출
            //observer.onCompleted();
          });
    });

    responseStream.subscribe((res) => {
        console.log('res');
      });
    });

우리가 Rx.Js를 이용하여 리퀘스트를 보내기위해 처음으로 할 일은 observable을 만든는 것이다.
observable은 간단히 이야기하면 이벤트 발생기라고 생각하면 된다.
우리는 observable에 대해 지속적으로 observe하는 형태로 코드를 만들 것이다.

첫번째로 Rx.Observable.just를 이용하면 하나의 데이터를 갖는 observable을 생성한다.
just는 이 데이터를 주는 이벤트를 emit한다.
observable을 만들었다면 이에 대해 반응하기위한 listen을 하기 위해 subscribe를 호출하면 된다.

Rx.Observable.subscribe는 observable에 대해 listen을 한다.
observable에 대한 이벤트가 발생한다면 해당 observable에 대한 subscribe가 호출되어 이를 알 수 있다.

Rx.Observable.create도 역시 observable을 만든다. create()는 순서대로 다음의 세 가지 인자를 갖는다. (이것이 subscribe가 감지하는 이벤트들이다.)

  1. onNext : 데이터 이벤트 발생(즉 정상 동작)
  2. onError : 에러 이벤트 발생
  3. onCompleted : 완료(가장 마지막에 호출해야 한다.)
    세가지를 명시하여 각각의 상황에 대해 observable을 어떻게 처리할 지 명시할 수 있다.

subscribe를 통해서 create를 이용해 만든 observable을 관측한다.
즉 subscribe 호출과 동시에 create()내부에 명시한 함수를 실행시켜 observe한다.
만일 위의 예제에서 then 내부에 다음의 코드가 있다면 1초마다 subscribe내의 console.log가 호출될 것이다.

request(options).then((res, body) => {
    setInterval(function() {
        observer.onNext('customData');
        }, 1000);
    })

원래 예제에서는 리퀘스트를 보내고 응답 값을 받은 후 onNext를 호출하였으므로 subscribe에서 이에 대한res를 받을 수 있다.
(subscribe도 onNext, onError, onCompleted 으로 세가지 인자를 가질 수 있다.)
onCompleted이 호출되면 이제 더 이상의 onNext는 없다는 의미이다.

예제 코드를 요약하자면 create를 통해 리퀘스트를 보내는 이벤트를 만들고 이에 대한 응답을 지켜보는 observable을 만들어서 이에 대해 subscribe하여 response를 받는 것이다.

onNext의 res와 subscribe의 ((res)) 는 같은 값을 갖는다.

References

  • https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
  • https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/return.md
  • https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/subscribe.md
  • https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/create.md

realjays

반박시 당신말이 맞습니다.