2017/11/14

FluxJava 新增 RxJava2 的支援功能

FluxJava 最初的設計就是以 Add-on 的方式來提供對於 RxJava 的支援,所以這次增加 RxJava2 的部份也依照相同的模式,在 Project 中加上了 fluxjava-rx2 的 Module。新的 Module 功能上與 fluxjava-rx 大致上相同,只是原本以 RxJava 規格運作的部份,改為使用 RxJava2。

由於 RxJava 與 RxJava2 不太有機會共存在同一個 Module 裡,所以 fluxjava-rx2 沿用了 fluxjava-rx 的 Package 名稱,在使用上這二個 Add-on 必須要擇一引用。不過也帶來了一個額外的好處,如果想要由 fluxjava-rx 昇級到 fluxjava-rx2 時,只要修改成 RxJava2 的呼叫規格,不用再特別調整 Import 的內容。

配合 RxJava2 的更新,按照慣例增加了一篇文章「FluxJava 與 RxJava2 結合的使用示範」做為使用上的參考。

在 fluxjava-rx2 中與 fluxjava-rx 最大的差異,主要是因應 RxJava2 把原本的 Observable 分成了有背壓版本的 Flowable 與沒有背壓版本的 Observable。因此 RxBus 與 RxStore 中都分別再提供了 toFlowable 的 Method 來取得 Flowable,不過 Observable 本來就可以再轉換為 Flowable,此處的功能只是為了增加便利性、簡化程式碼之用。

同時,利用這篇文章補充說明一下一些使用上的技術細節。在 RxStore 中使用的是沒有背壓版本的 Observable 來接收外部傳來的訊息,只是接收到之後就會被分派到不同的 Thread 上去處理後續的工作,所以在這個部份是不大有機會遇上 MissingBackpressureException 問題的。但是,這並不代表背壓所造成的情況就不存在了,只是瓶頸移到了 ThreadPool 的承受能力或是執行的環境可以產生 Thread 的數量上。

就算是使用有背壓功能的 Flowable,也不代表就可以高枕無憂了。說穿了,背壓本身不是什麼神奇的黑科技,原理上只是在上下游中間加了個水池,讓下游有喘息的空間。水池畢竟還是有一定的物理限制,沒有控制好,依舊會讓水池承載不下而出現錯誤。

因此,不論使用哪一種方法,前端發送的數量仍然應該要被謹慎地控制,避免海量的訊息把接收端給淹沒了。像是把 RecyclerView 滾動時產生位移的 Event 毫無選擇地往 RxStore 送。

在發送端就要篩選訊息,除了要減少 MissingBackpressureException 可能會出現的機會外,還有一個目的是要節省執行成本。當訊息數量大到無法處理時,能做的只有挑選值得處理的部份來進行。當挑選的工作被移到發送後,RxJava 的傳送機制表面上看起來就是簡單地轉了一手,但是如果去追蹤其程式碼,就可以發現其實底下做了不少的工作。而每一次傳送都要執行這些內容,可以想見在數量到達一定的程度之後,就會顯現出可觀的效能差距。

由於 Observable 在定義上所形成的限制使然,同一個發送源無法把的訊息分配至不同的 Thread 上送出。RxStore 為了要讓每個資料處理要求可以獨立、同步地進行,所以才會在接收到訊息後,以不同的 Thread 進行後續的工作。

在 RxStore 裡提供了一個 getExecutor Method,可以使用 ThreadPool 來做為背壓的替代方案,但是並沒有像背壓一樣有可以控制上游的功能。在實作 getExecutor 時要注意,不要直接在回傳時 New 一個 ThreadPool 的 Instance。因為 getExecutor 是在每一次收到訊息後呼叫一次。以上的做法,也等同於每一次收到訊息就拿到一個 ThreadPool 的 Instance,每一個 ThreadPool 都只會產生一個 Thread,這就失去了使用 ThreadPool 的用意。

最容易出現以上問題的情況是使用 Executors 來取得 ThreadPool,一般的情況下很容易忽略 Executors 其實是每次呼叫就產生一個 Instance。所以當以 return Executors.newFixedThreadPool(poolSize); 的方式回傳 getExecutor 時,一開始也許顯示不出問題而被略過,但是一但訊息爆量後,就會因為產生 Thread 的數量到達上限而中止運作。

以上,是這次補充的內容,讓使用 FluxJava 的朋友做為參考。






0 意見:

張貼留言