RxJS
以訂閱+推播的方式管理數據
Observable
用來推播數據的東西。
- 推播數據的方式可以 sync/async,端看實作的方式。
- 使用
subscriber.next()推播數據。 subscriber.complete()後的subscriber.next()不會被推播。- 可以使用
subscriber.error()拋錯。
import { Observable } from 'rxjs'
const observable = new Observable(subscriber => {
subscriber.next(1)
subscriber.next(2)
setTimeout(() => {
subscriber.next(3)
subscriber.complete()
subscriber.next(4)
})
})
Observer
用來接 Observable 的東西,由 next、error、complete 三個 API 組成。
import { Observable } from 'rxjs'
const observer = {
next(v) { console.log(`next value: ${v}`) },
error(err) { console.log(`error: ${err}`) },
complete() { console.log('complete') }
}
const observable = new Observable((subscriber) => {
subscriber.next(1)
subscriber.next(2)
})
observable.subscribe(observer)
實務上也可以單純是個 function
observable.subscribe((v) => console.log(`next value: ${v}`))
Operator
Operator 為用來產生 Observables 的 methods,分成兩類-Pipeable Operators 及 Creation Operators。
Pipeable Operators
可以透過 observableInstance.pipe(operator()) 使用。Pipeable Operator 將 Observable 作為 Input,並回傳新的 Observable。
observableInstance.pipe(
operator1(),
operator2(),
operator3(),
)
Creation Operators
可以獨立使用,產生 Observable。
import { interval } from 'rxjs'
const observable = interval(1000)
Higher-order Observables
接手 Observable 的 Observable
例如|發放 URL 的 Observable 被執行 URL 的 Observable 接手
const fileObservable = urlObservable.pipe(
map(url => http.get(url)),
concatAll(),
)
Subscription
observable.subscribe() 的回傳值,可以用來:
- 取消/終止 Observable
- 增加/移除 childSubscription
import { interval } from 'rxjs';
const observable1 = interval(400);
const observable2 = interval(300);
const subscription = observable1.subscribe(x => console.log('first: ' + x));
const childSubscription = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubscription);
setTimeout(() => {
// Unsubscribes BOTH subscription and childSubscription
subscription.unsubscribe();
// subscription.remove(childSubscription)
}, 1000);
Subject
- 可以把資料發給其他 Observables 使用的 Observable。
- 比較像 EventEmitters。
- 同時身兼 Observable 及 Observer 身份。
- Observer 只能存取到 subscribe 後執行的
next()派發的值。
import { Subject } from 'rxjs'
const subject = new Subject<number>()
// MARK: Subject as Observable
subject.subscribe((v) => console.log(`observer A: ${v}`))
subject.subscribe((v) => console.log(`observer B: ${v}`))
// MARK: Subject as Observer
subject.next(1)
subject.next(2)
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
因為是 Observer,所以可以配合 Observable.prototype.subscribe() 使用:
import { Subject, from } from 'rxjs'
const subject = new Subject<number>()
// MARK: Subject as Observable
subject.subscribe((v) => console.log(`observer A: ${v}`))
subject.subscribe((v) => console.log(`observer B: ${v}`))
// MARK: Subject as Observer
const observable = from([1, 2, 3])
observable.subscribe(subject)
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
multicast Observable
muticast operator observable 類似 source observable 及 subject observable 的中間層:
- 可以透過
multicast.subscribe達到同subject.subscribe(someObserver)的效果。 - 可以透過
multicast.connect達到同source.subscribe(subject)的效果。
import { from, Subject } from 'rxjs'
import { multicast } from 'rxjs/operators'
const subject = new Subject<number>()
const source = from([1, 2, 3])
const multicasted = source.pipe(multicast(subject))
// MARK: Multicast proxy to Subject (Subject as Observable)
multicasted.subscribe((v) => console.log(`observer A: ${v}`))
multicasted.subscribe((v) => console.log(`observer B: ${v}`))
// MARK: Multicast proxy to Source (Subject as Observer)
const subscriptionConnet = multicasted.connect() // start execution
// subscriptionConnet.unsubscribe()
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
refCount Observable
refCount 會在被 subscribe 時自動呼叫 multicast.prototype.connect() 並在 subscribe 降為 0 時自動 multicast.prototype.connect().unsubscribe()。
import { interval, Subject } from 'rxjs'
import { multicast, refCount } from 'rxjs/operators'
const subject = new Subject<number>()
const source = interval(500)
const refCounted = source.pipe(
multicast(subject),
refCount()
)
console.log('Start connect after subscribe')
const subscriptionA = refCounted.subscribe((v) => console.log(`observer A: ${v}`))
let subscriptionB
setTimeout(() => {
subscriptionB = refCounted.subscribe((v) => console.log(`observer B: ${v}`))
}, 1000)
setTimeout(() => {
subscriptionA.unsubscribe()
console.log('Unsubscribe Observer A')
}, 2000)
setTimeout(() => {
subscriptionB.unsubscribe()
console.log('Unsubscribe Observer B')
console.log('Stop connection after unsubscribe all')
}, 3000)
BehaviorSubject
會暫存當前的數值,新訂閱的用戶必定會觸發一次 next()。
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value
subject.subscribe((v) => console.log(`observerA: ${v}`));
subject.next(1);
subject.next(2);
subject.subscribe((v) => console.log(`observerB: ${v}`));
subject.next(3);
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
ReplaySubject
功能跟 BehaviorSubject 差不多,但可以定義要記錄幾個值、多少時間內的值。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3, 100); // buffer 3 values for new subscribers
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
AsyncSubject
只有在呼叫 compelete() 後會將最後一次 next() 的值拋給 Obsserver。
Scheduler
用來控制 Subscription 何時開始、何時派發資料(when notifications are delivered.)。
- Scheduler 是一個資料結構。 它知道如何根據優先級或其他標準來儲存並佇列任務。
- Scheduler 是一個執行環境。 它意味著任務何時何地被執行,比如像是 立即執行、在回呼(callback)中執行、setTimeout 中執行、animation frame 中執行
- Scheduler 是一個虛擬時鐘。 它透過
now()這個方法提供了時間的概念,我們可以讓任務在特定的時間點被執行。
Reference https://ithelp.ithome.com.tw/articles/10188988
每個 operator 都有預設的 Scheduler,在使用時也可以定義要使用哪些 Scheduler。根據 Operator 的特性,會有不同的預設 Scheduler。
Creation Operator 通常可接受 Scheduler 做為參數。
subscribeOn
定義 subscribe() 實際執行時機。
- 預設情況下,呼叫
subscribe()後會馬上執行。
observeOn
定義通知發送的時機。
- 夾在 Observer 及 Observable 中間。