Updated 6 March 2021
Code architecture is always a hot topic in Android application development. Better architectures and patterns keep emerging to help developers write better apps. One such approach is Reactive Programming, or Rx.
In Reactive programming there are two things of interest: the Observable and the Subscriber (also called the Observer). The Observable does all the work, and the Subscriber listens to its various states. An Observable can either complete or fail, which is reflected in the Subscriber's onComplete or onError method. There is also a method called onNext, which executes each time the Observable emits an item.
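To make that callback contract concrete, here is a minimal plain-Java sketch that does not use RxJava at all. The names `SketchObserver`, `ObserverContractSketch`, `emitAll` and `record` are hypothetical and exist only for illustration; the real Observable is far richer, but the order of callbacks is the same: zero or more onNext calls followed by at most one terminal event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified observer interface that mirrors the callback names used by RxJava.
interface SketchObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onComplete();
}

final class ObserverContractSketch {

    // Pushes every item to the observer, then signals exactly one terminal event.
    static <T> void emitAll(List<T> items, SketchObserver<T> observer) {
        try {
            for (T item : items) {
                if (item == null) {
                    throw new NullPointerException("null items are not allowed");
                }
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e); // failure: onComplete is not called afterwards
            return;
        }
        observer.onComplete(); // success: no more items will follow
    }

    // Runs emitAll and records the callbacks in the order they arrive.
    static List<String> record(List<String> items) {
        List<String> events = new ArrayList<>();
        emitAll(items, new SketchObserver<String>() {
            @Override public void onNext(String item) { events.add("next:" + item); }
            @Override public void onError(Throwable error) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList("one", "two"))); // prints [next:one, next:two, complete]
    }
}
```

Passing a list that contains a null item ends the sequence with a single error event instead of a completion, which is the "complete or fail" behaviour described above.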
So in this blog, we will see how to add Rx in an Android project along with its caveats.
We are using Retrofit2 for all our network calls. Here are the dependencies (the latest versions at the time of writing) used to add RxAndroid to the project.
    // ReactiveX Android
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    // Because RxAndroid releases are few and far between, it is recommended you also
    // explicitly depend on RxJava's latest version for bug fixes and new features.
    compile 'io.reactivex.rxjava2:rxjava:2.1.0'
    compile 'com.squareup.retrofit2:adapter-rxjava2:2.2.0' // same version as Retrofit2
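Here RxAndroid and RxJava are declared separately, at different versions, so that we pick up the bug fixes and new features added to the RxJava library. We also add the RxJava adapter to make the Retrofit2 client compatible with RxAndroid. On Android Gradle plugin 3.0 and later, use implementation in place of the deprecated compile configuration.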
We can build our Retrofit client as in the sample below. Note that we add RxJava2CallAdapterFactory so that Retrofit2 service methods can return RxJava 2 types such as Observable.
    public static Retrofit getClient(Context context) {
        if (sRetrofit == null) {
            sContext = context.getApplicationContext(); // in case you need it
            sRetrofit = new Retrofit.Builder()
                    .baseUrl(BuildConfig.BASE_URL)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // lets Retrofit2 return RxJava 2 types
                    // .addConverterFactory(ScalarsConverterFactory.create())
                    .addConverterFactory(GsonConverterFactory.create())
                    .client(getOkHttpClientBuilder(1).build())
                    .build();
        }
        return sRetrofit;
    }
By default Rx is synchronous, but we can run it asynchronously, without the tedious AsyncTask, simply by adding subscribeOn().
The Observable class has many static methods that create Observable objects. The following code uses the just() operator to create a very simple Observable that emits five Strings.
    Observable.just("one", "two", "three", "four", "five")
            .subscribeOn(Schedulers.newThread()) // the Observable does its work on a new thread
            .observeOn(AndroidSchedulers.mainThread()) // results are observed on the main thread, just like onPostExecute() of AsyncTask
            .subscribe(/* an Observer */);
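The division of labour between subscribeOn() and observeOn() can be pictured with plain `java.util.concurrent` executors. The sketch below is not RxJava: the class name `SchedulerSketch`, the two executors and the thread names are assumptions made for illustration, with a single-thread executor standing in for the Android main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SchedulerSketch {

    // Returns {name of the thread that produced the value, name of the thread that consumed it}.
    static String[] run() {
        // Stand-in for Schedulers.newThread() or Schedulers.io(): where the work is done.
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        // Stand-in for AndroidSchedulers.mainThread(): where the result is delivered.
        ExecutorService main =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-stand-in"));
        try {
            return CompletableFuture
                    // plays the role of subscribeOn: the value is produced on the background thread
                    .supplyAsync(() -> Thread.currentThread().getName(), background)
                    // plays the role of observeOn: the value is handled on the other thread
                    .thenApplyAsync(producer ->
                            new String[] {producer, Thread.currentThread().getName()}, main)
                    .join();
        } finally {
            background.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        // prints "produced on background, consumed on main-stand-in"
        System.out.println("produced on " + threads[0] + ", consumed on " + threads[1]);
    }
}
```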
Here is a sample request where we subscribe to the Observable on an available IO thread and observe the result on the main thread.
    RetrofitClient.getClient(this)
            .create(ApiInterface.class)
            .getHomePageData()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<HomePageResponse>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    // called at the beginning of the subscription
                }

                @Override
                public void onNext(@NonNull HomePageResponse homePageResponse) {
                    // called each time data arrives from the stream or data source we are observing
                    // For an HTTP/API call this is executed once.
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    // similar to onFailure() of Retrofit
                }

                @Override
                public void onComplete() {
                    // interface method to notify that the subscription is completed
                }
            });

    // Api Interface
    @POST(MOBIKUL_CATALOG_HOME_PAGE_DATA)
    Observable<HomePageResponse> getHomePageData();
There is a variety of use cases where developers enjoy using Rx in apps. Rx works well with Kotlin or Java 8, whose lambdas remove redundant boilerplate code. Rx solves many concurrency and multi-threading problems that we face in applications, such as combining API calls (fetching results from different API calls simultaneously and combining them), handling multiple data sources (for example, one from API source x and another from API source y), handling click events, and providing efficient search suggestions.
Let us take an example of combining API calls using the zip operator:
In this example, two Observables, observableOne and observableTwo, are subscribed on the IO scheduler so that the network operations run on its pooled threads without us creating new threads by hand. We then apply the static zip operator to both Observables to combine their results.
    /* subscribeOn is applied to each source rather than to zip,
       so that both requests run on IO threads at the same time. */
    Observable<FirstResponse> observableOne = ApiConnection.getFirstResponse(this).subscribeOn(Schedulers.io());
    Observable<SecondResponse> observableTwo = ApiConnection.getSecondResponse(this).subscribeOn(Schedulers.io());

    Observable.zip(observableOne, observableTwo, (firstResponse, secondResponse) -> {
                // modify the two responses or build the combined response object here
                // (this CombinedResponse constructor is assumed for illustration)
                return new CombinedResponse(firstResponse, secondResponse);
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<CombinedResponse>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }

                @Override
                public void onNext(@NonNull CombinedResponse combinedResponse) {
                    // use the combined response
                }

                @Override
                public void onError(@NonNull Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
    D/OkHttp: --> POST http://192.168.1.86:8010/mobikul/firstRequest http/1.1
    D/OkHttp: --> END POST (0-byte body)
    D/OkHttp: --> POST http://192.168.1.86:8010/mobikul/secondRequest http/1.1
    D/OkHttp: --> END POST (0-byte body)
    D/OkHttp: <-- 200 OK http://192.168.1.86:8010/mobikul/firstRequest (156ms)
    D/OkHttp: <-- END HTTP (684-byte body)
    D/OkHttp: <-- 200 OK http://192.168.1.86:8010/mobikul/secondRequest (253ms)
    D/OkHttp: <-- END HTTP (9709-byte body)
As you can see in the logs above, both requests are fired together, and we do not need redundant checks for whether each request has completed.
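The same "wait for both, then combine" idea can be sketched with the JDK alone. This is an analogue of zip for two single-result calls, not RxJava itself; `ZipSketch`, `fetchFirst`, `fetchSecond` and the string payloads are hypothetical stand-ins for the two network requests.

```java
import java.util.concurrent.CompletableFuture;

final class ZipSketch {

    // Hypothetical stand-ins for two independent network calls.
    static String fetchFirst() { return "first"; }
    static String fetchSecond() { return "second"; }

    // Starts both calls concurrently and combines the results once both have finished.
    static String fetchCombined() {
        CompletableFuture<String> one = CompletableFuture.supplyAsync(ZipSketch::fetchFirst);
        CompletableFuture<String> two = CompletableFuture.supplyAsync(ZipSketch::fetchSecond);
        // The combiner runs only after both futures have completed, whichever finishes first.
        return one.thenCombine(two, (a, b) -> a + "+" + b).join();
    }

    public static void main(String[] args) {
        System.out.println(fetchCombined()); // prints "first+second"
    }
}
```

As with zip, the caller never checks by hand whether each request has finished; the combining function is invoked only once both results are available.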
To make the application more efficient, we need to properly dispose of the subscription whenever the activity/fragment is destroyed. For this, we use the CompositeDisposable class, a disposable container that can hold onto multiple other disposables and offers O(1) add and removal complexity.
That is, each request's Disposable is added to the CompositeDisposable and cleared whenever the activity/fragment is destroyed or some other condition is met. Here is ready-made code for you.
    // DECLARATION AND INITIALIZATION
    public CompositeDisposable mCompositeDisposable = new CompositeDisposable();

    // ADDING DISPOSABLE COMPONENT TO THE COMPOSITE-DISPOSABLE LIST
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        mCompositeDisposable.add(d);
    }

    // Clearing list
    @Override
    protected void onStop() {
        super.onStop();
        mCompositeDisposable.clear();
    }
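To see why a container like this is convenient, here is a stripped-down sketch of the idea in plain Java. It is not the RxJava implementation: `SketchDisposable` and `SketchComposite` are hypothetical names, and thread safety is left out for brevity. Backing the container with a hash set is what gives add and removal O(1) cost on average.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, single-threaded stand-in for RxJava's Disposable.
final class SketchDisposable {
    private boolean disposed;

    void dispose() { disposed = true; }

    boolean isDisposed() { return disposed; }
}

// Hypothetical container in the spirit of CompositeDisposable.
final class SketchComposite {
    private final Set<SketchDisposable> resources = new HashSet<>();

    // Starts tracking a resource.
    void add(SketchDisposable d) { resources.add(d); }

    // Forgets a resource without disposing it.
    boolean delete(SketchDisposable d) { return resources.remove(d); }

    // Disposes everything held so far; the container stays usable afterwards.
    void clear() {
        for (SketchDisposable d : resources) {
            d.dispose();
        }
        resources.clear();
    }

    int size() { return resources.size(); }
}
```

In RxJava itself, calling clear() rather than dispose() matters when the screen can come back: after clear() the container accepts new disposables, whereas a disposed CompositeDisposable immediately disposes anything added to it.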
We can use a custom observer instead of the Observer provided by the ReactiveX libraries. The main advantage is that common handling lives in one base class, which promotes OOP in the architecture.
    public abstract class CustomObserver<T> implements Observer<T> {
        @SuppressWarnings("unused")
        private static final String TAG = "CustomObserver";
        private final Context mContext;
        // Supplied by the caller (an assumption: the original snippet does not show where this field comes from)
        private final CompositeDisposable mCompositeDisposable;

        protected CustomObserver(@android.support.annotation.NonNull Context context,
                                 @android.support.annotation.NonNull CompositeDisposable compositeDisposable) {
            mContext = context;
            mCompositeDisposable = compositeDisposable;
        }

        @Override
        public void onSubscribe(@NonNull Disposable d) {
            // common task performed on subscribing to any observable,
            // e.g. adding the disposable to the CompositeDisposable
            mCompositeDisposable.add(d);
        }

        @Override
        public void onNext(@NonNull T t) {
            // perform any common operation here
        }

        @Override
        public void onError(@NonNull Throwable t) {
            // handle general failures
        }

        @Override
        public void onComplete() {
            // handle general request-complete events
        }
    }
To detach an observer from its observable while the observable is still emitting data, we can call the dispose() method on the Disposable received in onSubscribe(), e.g. myDisposable.dispose(); (In RxJava 1 the equivalent was unsubscribe() on the Subscription object.)
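Detaching in the middle of a stream can be sketched without the library. In the plain-Java sketch below, the names `DetachSketch`, `Handle`, `emitRange` and `takeUntilDetached` are hypothetical: the source checks a flag before each emission, so a consumer that detaches after the third item never sees the fourth.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

final class DetachSketch {

    // Hypothetical handle the consumer uses to detach from the source.
    static final class Handle {
        private boolean detached;

        void detach() { detached = true; }

        boolean isDetached() { return detached; }
    }

    // Emits 1..count, checking before each item whether the consumer has detached.
    static void emitRange(int count, BiConsumer<Integer, Handle> consumer) {
        Handle handle = new Handle();
        for (int i = 1; i <= count && !handle.isDetached(); i++) {
            consumer.accept(i, handle);
        }
    }

    // Collects items and detaches once `limit` items have been received.
    static List<Integer> takeUntilDetached(int count, int limit) {
        List<Integer> received = new ArrayList<>();
        emitRange(count, (item, handle) -> {
            received.add(item);
            if (received.size() == limit) {
                handle.detach();
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeUntilDetached(5, 3)); // prints [1, 2, 3]
    }
}
```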
Using the subscribeOn and observeOn operators, we can explicitly specify which thread should run the background job and which thread should handle the user interface updates.
    public abstract class CustomObserver<T> implements Observer<T> {
        private Context mContext;

        protected CustomObserver(@android.support.annotation.NonNull Context context) {
            mContext = context;
        }

        @SuppressWarnings("unused")
        private static final String TAG = "CustomObserver";

        public void onNext(@NonNull T t) {
            // process request
        }

        public void onError(@NonNull Throwable t) {
            NetworkHelper.onFailure(t, (Activity) mContext);
        }
    }
https://www.toptal.com/android/functional-reactive-android-rxjava
https://medium.freecodecamp.com/rxandroid-and-kotlin-part-1-f0382dc26ed8
https://github.com/kaushikgopal/RxJava-Android-Samples
That’s all it takes for lazy programming. Write efficient and clean code.