RxJS

以訂閱+推播的方式管理數據

Observable

用來推播數據的東西。

  • 推播數據的方式可以 sync/async,端看實作的方式。
  • 使用 subscriber.next() 推播數據。
  • subscriber.complete() 後的 subscriber.next() 不會被推播。
  • 可以使用 subscriber.error() 拋錯。
import { Observable } from 'rxjs'

const observable = new Observable(subscriber => {
  subscriber.next(1)
  subscriber.next(2)
  setTimeout(() => {
    subscriber.next(3)
    subscriber.complete()
    subscriber.next(4)
  })
})

Observer

用來接 Observable 的東西,由 nexterrorcomplete 三個 API 組成。

import { Observable } from 'rxjs'

const observer = {
  next(v) { console.log(`next value: ${v}`) },
  error(err) { console.log(`error: ${err}`) },
  complete() { console.log('complete') }
}

const observable = new Observable((subscriber) => {
  subscriber.next(1)
  subscriber.next(2)
})

observable.subscribe(observer)

實務上也可以單純是個 function

observable.subscribe((v) => console.log(`next value: ${v}`))

Operator

Operator 為用來產生 Observables 的 methods,分成兩類-Pipeable OperatorsCreation Operators

Pipeable Operators

可以透過 observableInstance.pipe(operator()) 使用。Pipeable Operator 將 Observable 作為 Input,並回傳新的 Observable。

observableInstance.pipe(
	operator1(),
	operator2(),
	operator3(),
)

Creation Operators

可以獨立使用,產生 Observable。

import { interval } from 'rxjs'

const observable = interval(1000)

Higher-order Observables

接手 Observable 的 Observable

例如|發放 URL 的 Observable 被執行 URL 的 Observable 接手

const fileObservable = urlObservable.pipe(
	map(url => http.get(url)),
  concatAll(),
)

Subscription

observable.subscribe() 的回傳值,可以用來:

  • 取消/終止 Observable
  • 增加/移除 childSubscription
import { interval } from 'rxjs';
 
const observable1 = interval(400);
const observable2 = interval(300);
 
const subscription = observable1.subscribe(x => console.log('first: ' + x));
const childSubscription = observable2.subscribe(x => console.log('second: ' + x));
 
subscription.add(childSubscription);
 
setTimeout(() => {
  // Unsubscribes BOTH subscription and childSubscription
  subscription.unsubscribe();
  // subscription.remove(childSubscription)
}, 1000);

Subject

  • 可以把資料發給其他 Observables 使用的 Observable。
  • 比較像 EventEmitters。
  • 同時身兼 Observable 及 Observer 身份。
  • Observer 只能存取到 subscribe 後執行的 next() 派發的值。
import { Subject } from 'rxjs'

const subject = new Subject<number>()

// MARK: Subject as Observable

subject.subscribe((v) => console.log(`observer A: ${v}`))
subject.subscribe((v) => console.log(`observer B: ${v}`))

// MARK: Subject as Observer

subject.next(1)
subject.next(2)

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

因為是 Observer,所以可以配合 Observable.prototype.subscribe() 使用:

import { Subject, from } from 'rxjs'

const subject = new Subject<number>()

// MARK: Subject as Observable

subject.subscribe((v) => console.log(`observer A: ${v}`))
subject.subscribe((v) => console.log(`observer B: ${v}`))

// MARK: Subject as Observer

const observable = from([1, 2, 3])
observable.subscribe(subject)

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

multicast Observable

muticast operator observable 類似 source observable 及 subject observable 的中間層:

  • 可以透過 multicast.subscribe 達到同 subject.subscribe(someObserver) 的效果。
  • 可以透過 multicast.connect 達到同 source.subscribe(subject) 的效果。
import { from, Subject } from 'rxjs'
import { multicast } from 'rxjs/operators'

const subject = new Subject<number>()
const source = from([1, 2, 3])
const multicasted = source.pipe(multicast(subject))

// MARK: Multicast proxy to Subject (Subject as Observable)

multicasted.subscribe((v) => console.log(`observer A: ${v}`))
multicasted.subscribe((v) => console.log(`observer B: ${v}`))

// MARK: Multicast proxy to Source (Subject as Observer)

const subscriptionConnet = multicasted.connect() // start execution
// subscriptionConnet.unsubscribe()

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

refCount Observable

refCount 會在被 subscribe 時自動呼叫 multicast.prototype.connect() 並在 subscribe 降為 0 時自動 multicast.prototype.connect().unsubscribe()

import { interval, Subject } from 'rxjs'
import { multicast, refCount } from 'rxjs/operators'

const subject = new Subject<number>()
const source = interval(500)
const refCounted = source.pipe(
  multicast(subject),
  refCount()
)

console.log('Start connect after subscribe')
const subscriptionA = refCounted.subscribe((v) => console.log(`observer A: ${v}`))

let subscriptionB
setTimeout(() => {
	subscriptionB = refCounted.subscribe((v) => console.log(`observer B: ${v}`))
}, 1000)

setTimeout(() => {
  subscriptionA.unsubscribe()
  console.log('Unsubscribe Observer A')
}, 2000)


setTimeout(() => {
  subscriptionB.unsubscribe()
  console.log('Unsubscribe Observer B')
  console.log('Stop connection after unsubscribe all')
}, 3000)

BehaviorSubject

會暫存當前的數值,新訂閱的用戶必定會觸發一次 next()

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value

subject.subscribe((v) => console.log(`observerA: ${v}`));

subject.next(1);
subject.next(2);

subject.subscribe((v) => console.log(`observerB: ${v}`));

subject.next(3);

// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

ReplaySubject

功能跟 BehaviorSubject 差不多,但可以定義要記錄幾個值、多少時間內的值。

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3, 100); // buffer 3 values for new subscribers

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

AsyncSubject

只有在呼叫 compelete() 後會將最後一次 next() 的值拋給 Obsserver。

Scheduler

用來控制 Subscription 何時開始、何時派發資料(when notifications are delivered.)。

  • Scheduler 是一個資料結構。 它知道如何根據優先級或其他標準來儲存並佇列任務。
  • Scheduler 是一個執行環境。 它意味著任務何時何地被執行,比如像是 立即執行、在回呼(callback)中執行、setTimeout 中執行、animation frame 中執行
  • Scheduler 是一個虛擬時鐘。 它透過 now() 這個方法提供了時間的概念,我們可以讓任務在特定的時間點被執行。

Reference https://ithelp.ithome.com.tw/articles/10188988

每個 operator 都有預設的 Scheduler,在使用時也可以定義要使用哪些 Scheduler。根據 Operator 的特性,會有不同的預設 Scheduler。

Creation Operator 通常可接受 Scheduler 做為參數。

subscribeOn

定義 subscribe() 實際執行時機。

  • 預設情況下,呼叫 subscribe() 後會馬上執行。

observeOn

定義通知發送的時機。

  • 夾在 Observer 及 Observable 中間。