Lucy.SpaceLucy.Space.

RxJS 핵심 개념 정리

RxJS 핵심 개념 정리

최근 웹 환경에서는 SPA(Single Page Application)가 대세로 자리 잡았습니다. 이는 다양한 기능과 데이터가 하나의 index.html 페이지 안에 포함된다는 것을 의미합니다. 최신의 웹 애플리케이션은 상태 머신이라고 볼 수 있으며, 사용자의 입력, 로직, 기존 값에 따라 상태가 결정됩니다.

웹 애플리케이션에서 발생하는 오류는 대개 다음과 같은 경우에서 비롯됩니다:

  1. 입력값의 오류
  2. 상태 변화가 정확히 전달되지 않는 경우
  3. 로직 오류

RxJS는 이러한 문제를 효과적으로 해결하기 위해 등장했습니다. 이는 데이터 흐름을 안전하게 처리하고 상태 관리를 단순화하는 데 도움을 주는 라이브러리로, 리액티브 프로그래밍함수형 프로그래밍 기법을 활용합니다.

#1. RxJS 등장 배경

RxJS는 웹 애플리케이션의 오류를 해결하기 위한 도구로 등장했습니다. 아래는 RxJS가 각각의 오류를 해결하는 방식을 설명합니다.

#1.1. 입력 데이터의 오류

비동기 작업과 동기 작업이 섞여 있는 웹 환경에서, 데이터가 전달되는 시점이 다르기 때문에 데이터 입력 및 전달 과정에서 오류가 발생할 수 있습니다.

RxJS는 시간의 개념을 도입하여 이러한 문제를 해결합니다. 모든 데이터를 Observable 인스턴스로 처리하고, 이를 통해 동기와 비동기를 하나의 시간 축(스트림)에서 관리합니다.

코드 예시

1const { fromEvent, from, of } = rxjs
2
3// 비동기 이벤트를 Observable 인스턴스로
4const key$ = fromEvent(document, 'keydown')
5const click$ = fromEvent(document, 'click')
6
7// 동기 이벤트를 Observable 인스턴스로
8const arrayFrom$ = from([1, 2, 3])
9const iterableFrom$ = from(new Map([1, 2], [2, 4]))
10
11// Promise를 Observable 인스턴스로
12const ajaxPromiseFrom$ = from(fetch('/api/some.json'))
13
14// 단일 데이터를 연속으로 전달
15const number$ = of(10, 20, 30)

#1.2. 상태 전파 문제

웹 애플리케이션은 상태 머신으로, 각 모듈이 서로 의존 관계를 가지고 있습니다. 따라서 한 모듈의 상태 변화가 다른 모듈에 정확히 반영되지 않으면 문제가 발생합니다.

RxJS는 옵저버 패턴을 채택하여 상태 전파 문제를 해결합니다.

pull 방식의 문제점

1class User {
2  _state: { name: string; isLogin: boolean } | null = null
3  constructor() {
4    this._state = {
5      name: 'dudu',
6      isLogin: false,
7    }
8  }
9
10  getName() {
11    return this._state?.name
12  }
13
14  isLogin() {
15    return this._state?.isLogin
16  }
17
18  login(name: string) {
19    this._state = {
20      name,
21      isLogin: true,
22    }
23  }
24
25  logout() {
26    this._state = {
27      name: '',
28      isLogin: false,
29    }
30  }
31}
32
33class System {
34  _token: number | null
35  _id: string
36  _user: User
37
38  constructor(user: User) {
39    this._token = null
40    this._id = 'System'
41    this._user = user
42  }
43
44  check() {
45    const username = this._user.getName() ?? ''
46    if (this._user.isLogin()) {
47      this._token = [...username].reduce((acc, v) => acc + v.charCodeAt(0), 0)
48      console.log(`[${this._id}] ${username} 의 토큰은 ${this._token} 입니다`)
49    } else {
50      this._token = null
51      console.log(`[${this._id}] 로그인 되지 않았습니다`)
52    }
53  }
54}
55
56const user = new User()
57const system = new System(user)
58
59system.check() // [System] 로그인 되지 않았습니다
60user.login('dudu')
61system.check() // [System] dudu 의 토큰은 434 입니다
62user.logout()
63system.check() // [System] 로그인 되지 않았습니다

pull 방식에서는 상태를 확인하거나 반영하기 위해 직접 호출이 필요합니다. 상태 변화가 많아질수록 복잡성이 증가하며, 의존성이 높은 구조로 이어질 수 있습니다.

observer 패턴 도입

1type State = { name: string; isLogin: boolean } | null
2
3type Observer = {
4  update: (state: State) => void
5}
6
7class Subject {
8  _observers: Observer[]
9  constructor() {
10    this._observers = []
11  }
12
13  add(observer: Observer) {
14    this._observers.push(observer)
15  }
16
17  remove(observer: Observer) {
18    const idx = this._observers.indexOf(observer)
19    if (idx !== -1) {
20      this._observers.splice(idx, 1)
21    }
22  }
23
24  notify(status: State) {
25    this._observers.forEach((v) => {
26      v.update(status)
27    })
28  }
29}
30
31class User extends Subject {
32  _state: State = null
33  constructor() {
34    super()
35    this._state = {
36      name: 'dudu',
37      isLogin: false,
38    }
39  }
40
41  getName() {
42    return this._state?.name
43  }
44
45  login(name: string) {
46    this._state = {
47      name,
48      isLogin: true,
49    }
50    this.notify(this._state)
51  }
52
53  logout() {
54    this._state = {
55      name: '',
56      isLogin: false,
57    }
58    this.notify(this._state)
59  }
60}
61
62class System {
63  _token: number | null
64  _id: string
65
66  constructor() {
67    this._token = null
68    this._id = 'System'
69  }
70
71  update(state: State) {
72    if (state?.isLogin) {
73      this._token = [...state.name].reduce((acc, v) => acc + v.charCodeAt(0), 0)
74      console.log(`[${this._id}] ${state.name} 의 토큰은 ${this._token} 입니다`)
75    } else {
76      this._token = null
77      console.log(`[${this._id}] 로그인 되지 않았습니다`)
78    }
79  }
80}
81
82const user = new User()
83const system = new System()
84
85user.add(system)
86user.login('dudu') // [System] dudu 의 토큰은 434 입니다
87user.logout() // [System] 로그인 되지 않았습니다
88user.login('dududu') // [System] dududu 의 토큰은 651 입니다

옵저버 패턴에서는 상태가 변경될 때 구독 중인 클래스들에게 자동으로 전파됩니다. 이를 통해 직접 호출 없이 상태 전파가 이루어질 수 있습니다.

옵저버 패턴의 한계와 RxJS의 개선

옵저버 패턴은 상태 전파와 의존성 관리를 효율적으로 해결하는 데 유용하지만, 실제 사용 시 몇 가지 한계점이 존재합니다. RxJS는 이러한 한계를 개선하여 더 안정적이고 유연한 상태 관리와 데이터 처리를 가능하게 합니다.

상태 변화의 종료

  • 한계점: Subject가 서비스를 종료할 때 이를 구독 중인 클래스들에게 별도로 공지해야 합니다. 이 과정에서 추가적인 의사소통 비용이 발생하며, 구독 해제를 수동으로 관리해야 하는 부담이 있습니다.
  • RxJS의 개선: RxJS는 상태 변화 종료를 명확하게 처리하기 위해 complete 메서드를 제공합니다.

상태 변화에서의 에러 처리

  • 한계점: Subject에서 상태 변화 중 에러가 발생하면, 기본적으로 구독자들은 해당 에러를 감지할 수 없습니다. 이는 단순한 구조에서는 문제가 없지만, 구독자가 개별적으로 에러를 처리해야 하는 경우에는 어려움을 야기합니다.
  • RxJS의 개선: RxJS는 에러 처리를 구독자 수준에서 명확히 관리할 수 있도록 error 콜백과 다양한 에러 처리 연산자를 제공합니다. 이를 통해 에러가 발생해도 각 구독자가 독립적으로 처리할 수 있습니다.

Observer ↔ Subject의 역할 충돌

  • 한계점: ObserverSubject의 역할을 동시에 수행하는 클래스에서는 상태를 지속적으로 생성하거나 변경해야 하는 상황이 발생할 수 있습니다. 이로 인해 상태 전파가 무한히 반복되거나, 잘못된 방식으로 관리되어 브라우저가 뻗는 등의 문제가 생길 수 있습니다.
  • RxJS의 개선: RxJS는 상태 관리와 데이터 흐름의 복잡성을 해결하기 위해 Observable의 단방향 데이터 흐름읽기 전용 데이터 처리를 제공합니다. **Observable**은 subscribe를 통해 데이터를 전달할 대상(Observer)에게 데이터를 전달할 수는 있지만, 반대로 데이터를 전달받을 수는 없습니다.

코드 예시

1const { of } = rxjs
2const numbers$ = of([1, 2, 3, 4, 5, 6]) // Observable 인스턴스
3
4number$.subscribe({
5  next(v) {
6    console.log(v)
7  },
8  error(e) {
9    console.error(e)
10  }, // Observer에서의 에러처리
11  complete() {
12    console.log('complete')
13  }, // 상태 변화의 종료
14})

#1.3. 로직 오류

반복문, 분기문, 그리고 변수 관리가 복잡한 코드를 만들어 디버깅과 유지보수를 어렵게 만듭니다. RxJS는 이러한 로직의 복잡성을 줄이기 위해 고차 함수 기반의 오퍼레이터를 제공합니다. 이를 통해 코드를 간결하고 선언적으로 작성할 수 있습니다.

RxJS 오퍼레이터는 데이터를 변환, 필터링, 병합, 그리고 시간 기반으로 처리하는 다양한 기능을 제공합니다. 이를 활용하면 복잡한 로직을 간단한 연산으로 표현할 수 있습니다.

더 자세한 내용은 RxJS 오퍼레이터 글에서 확인할 수 있습니다.

#2. RxJS 핵심

스크린샷 2025-01-03 오후 1.17.24.png

  • Observable데이터를 시간의 축으로 다룰 수 있는 객체로, 연속적인 데이터를 표현합니다. 데이터를 생성하거나 외부 소스(API, 이벤트 등)로부터 데이터를 받아와 Observer에게 전달하는 역할을 합니다.
  • 오퍼레이터는 Observable을 생성하거나 조작하는 함수입니다. 데이터를 변환하거나 필터링하고, 기존 Observable을 기반으로 새로운 Observable을 생성합니다. 항상 새로운 Observable을 반환하기 때문에 기존 데이터 스트림을 변경하지 않는다는 점이 특징입니다
  • ObserverObservable에서 방출된 데이터를 소비하는 주체입니다. 데이터를 수신(next), 에러 처리(error), 스트림 완료 처리(complete)를 담당하는 메서드를 포함합니다.
  • SubscriptionObservable을 구독(subscribe)할 때 반환되는 객체로, 데이터 스트림을 관리합니다. 필요할 경우 unsubscribe를 호출해 스트림을 종료하고 자원을 해제할 수 있습니다.

코드예시

1const subscription = currentTarget$.subscribe(observer)
2
3// 자원 해제
4subscription.unsubscribe()

#3. Observable 구현 시 고려해야할 점

Observable은 단순히 데이터를 Observer에게 전달하는 역할만 하지 않습니다. 현재 상태를 포함한 다양한 상황(에러, 완료, 구독 해제 등)을 관리하고 Observer에게 전달해야 합니다. 이를 고려하여 Observable을 구현할 때 다음과 같은 점을 신경 써야 합니다.

#3.1. 에러 발생

1const number$ = new Observable(function subscribe(observer) {
2  try {
3    observer.next(1)
4    observer.next(2)
5    // 에러발생
6    throw new Error('데이터 전달 도중 에러가 발생했습니다.')
7    observer.next(3)
8  } catch (e) {
9    observer.error(e)
10  }
11})
12
13number$.subscribe({
14  next: (v) => console.log(v),
15  error: (e) => console.error(e),
16})
17
18// 출력:
19// 1
20// 2
21// "데이터 전달 도중 에러가 발생했습니다."

Observable에서 Observer.error가 호출되면, 현재 스트림은 종료되고 구독은 자동으로 해지됩니다. 이후 Observer는 더 이상 데이터를 전달받지 못하며, Observable도 추가적인 작업을 수행하지 않습니다.

#3.2. 데이터 전달이 완료된 경우

1const number$ = new Observable(function subscribe(observer) {
2  try {
3    observer.next(1)
4    observer.next(2)
5    observer.next(3)
6    observer.complete()
7  } catch (e) {
8    observer.error(e)
9  }
10})
11
12number$.subscribe({
13  next: (v) => console.log(v),
14  error: (e) => console.error(e),
15  complete: () => console.log('데이터 전달 완료'),
16})
17
18// 출력:
19// 1
20// 2
21// 3
22// "데이터 전달 완료"

Observer.complete가 호출되면 현재 스트림이 정상적으로 종료됩니다. 이 시점 이후로 Observer는 더 이상 데이터를 전달받지 않으며, 구독도 자동으로 해지됩니다.

#3.3. 구독 해제

유한한 데이터를 다루는 Observable은 complete 호출 시 자동으로 구독이 해지되므로 추가적인 자원 관리가 필요하지 않습니다. 그러나 무한 데이터 스트림(예: 이벤트 핸들러, WebSocket, interval 등)에서는 구독 해제 시 자원을 반드시 정리해야 합니다. 그렇지 않으면 메모리 누수와 같은 문제가 발생할 수 있습니다.

구독 해제를 위해 subscription.unsubscribe를 호출해야 하며, Observable 생성 시 할당한 자원도 함께 제거해야 합니다.

1const interval$ = new Observable(function subscribe(observer) {
2  const id = setInterval(function () {
3    observer.next(new Date().toString())
4  }, 1000)
5
6  // 자원을 해제하는 함수
7  return function () {
8    console.log('interval 제거')
9    clearInterval(id)
10  }
11})
12
13const subscription = interval$.subscribe((v) => console.log(v))
14
15// 5초 뒤 구독을 해제한다
16setTimeout(function () {
17  subscription.unsubscribe()
18}, 5000)

#4. 함수 vs. Observable vs. Promise

구분함수ObservablePromise
정의함수 선언Observable 객체 생성Promise 객체 생성
호출함수 호출Observable.subscribePromise then
호출 시 정의부 실행 여부매번 정의부 실행매번 정의부 실행생성 시 단 한번 호출
지연(lazy) 가능 여부OOX (Promise는 정해진 상탯값만 호출)
데이터한 개여러 개한 개
에러처리 지원별도로 없음error 상태reject 상태
취소 지원XOX
전달 방식PullPushPush

함수 지연 예시 코드

1function foo(value) {
2  console.log(`I'm function ${value}`)
3  return value + 1
4}
5
6const xhr = new XMLHttpRequest()
7xhr.onload = function (e) {
8  afterAjaxResult = JSON.parse(xhr.responseText)
9
10  const result = foo(afterAjaxResult)
11  console.log(result)
12}

호출 지연: 넘겨주어야 할 파라미터(afterAjaxResult)가 존재하지 않거나 아직 준비되지 않았다면, 준비될 때까지 함수 호출을 지연시킬 수 있습니다.

Promise 지연 불가 예시 코드

1const promise = new Promise((resolve, reject) => {
2  console.log('create promise')
3  try {
4    resolve(1)
5  } catch (e) {
6    reject('error promise!')
7  }
8})
9
10promise.then(
11  (value) => console.log(`첫 번째 promise ${value}`),
12  (error) => console.error(`첫 번째 promise ${error}`)
13)
14
15promise.then(
16  (value) => console.log(`두 번째 promise ${value}`),
17  (error) => console.error(`두 번째 promise ${error}`)
18)
19
20// 출력
21// 'create promise'
22// 첫 번째 promise 1
23// 첫 번째 promise 2

Promise 정의부는 생성 시 즉시 실행되며, 상태가 한 번 결정되면 변경되지 않습니다. then이나 catch는 Promise의 상태가 결정된 이후에도 호출 가능하며, 이미 결정된 상태의 값이나 에러를 반환합니다. Promise가 이미 Fulfilled 상태라면, 지연되지 않고 즉시 동일한 값을 반환합니다. 이는 Promise의 상태가 "결정된 값"을 유지하기 때문입니다.

Promise 취소 불가 예시 코드

1const promise = new Promise((resolve, reject) => {
2  try {
3    let value = 0
4    setInterval(() => {
5      console.log(`is going? ${value}`)
6      resolve(value++)
7    }, 1000)
8  } catch (e) {
9    reject('error promise!')
10  }
11})
12
13promise.then((value) => console.log(`promise value ${value}`))
14
15// is going? 0
16// promise value 0
17// is going? 1
18// is going? 2
19// is going? 3

Promise의 상태가 Fulfilled 상태가 되더라도 계속적으로 Interval은 호출됩니다.

Observable 예시 코드

1const obs$ = new Observable((observer) => {
2  let id
3  try {
4    let value = 0
5    id = setInterval(() => {
6      console.log(`is going? ${value}`)
7      observer.next(value++)
8    }, 1000)
9  } catch (e) {
10    observer.error(e)
11  }
12
13  // 자원을 해제하는 함수 지원
14  return () => {
15    clearInterval(id)
16    console.log('cancelled')
17  }
18})
19
20const subscription = obs$.subscribe((value) => console.log(`observable value ${value}`))
21
22// 3초 후에 observable의 구독을 취소
23setTimeout(() => subscription.unsubscribe(), 3000)
24
25// is going? 0
26// observable value 0
27// is going? 1
28// observable value 1
29// is going? 2
30// observable value 2
31// cancelled

#5. Reference

RxJS