RxAndroid and Kotlin (Part 1)

Reactive programming has become increasingly popular in recent years as a paradigm that allows developers to create asynchronous and event-based programs by using observable sequences. This is particularly relevant in Android development, where many operations are asynchronous: network requests, database queries, UI events, sensor data and location updates, to name a few.

RxJava, a Java implementation of ReactiveX, has seen wide adoption in the Android community since its 1.0 release in 2014. According to Realm's Android developer survey in 2021, 13% of Android developers use RxJava in their current apps, making it one of the most popular libraries for handling asynchronous events. Many high-profile apps like Netflix, Trello, Uber and Airbnb have adopted RxJava in their codebase.

At the same time, Kotlin, a modern statically-typed language developed by JetBrains, has quickly gained ground with Android developers since Google announced first-class support for it in 2017. As per Google's Android Kotlin Usage Survey in 2022, over 60% of the top 1000 apps on Google Play use Kotlin, and for 35.2% of developers, all new code is written in Kotlin. The conciseness, safety and expressiveness of Kotlin make it a great fit for Android development.

When used together, RxJava and Kotlin provide a powerful combination that can make your Android code more readable, more maintainable and less error-prone. Let's explore some of the key benefits and features.

Creating and Subscribing to Observables

At the heart of RxJava are two key types: Observable and Observer. An Observable emits a stream of data or events, and an Observer consumes or reacts to these emissions. To create an Observable, you can use one of the many static factory methods like just, fromIterable, range, interval, etc., or the create method, which gives you full control of the emission process.

Here's a simple example of creating an Observable from a list of integers in Kotlin:

val intObservable = Observable.fromIterable(listOf(1, 2, 3, 4, 5))
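
The other factory methods follow the same pattern. The sketch below is illustrative rather than exhaustive. It assumes RxJava 3 on the classpath (the io.reactivex.rxjava3 packages; RxJava 2 uses io.reactivex instead), and emittedItems is a hypothetical helper added only to make the results easy to print.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: collects everything a finite, synchronous Observable emits.
fun <T : Any> Observable<T>.emittedItems(): List<T> = toList().blockingGet()

fun main() {
    val fromValues = Observable.just("a", "b", "c")        // a fixed set of items
    val fromRange = Observable.range(1, 5)                 // 1, 2, 3, 4, 5
    val fromArray = Observable.fromArray(10, 20, 30)       // vararg items
    val fromCallable = Observable.fromCallable { 6 * 7 }   // computed lazily, on subscription

    println(fromValues.emittedItems())    // [a, b, c]
    println(fromRange.emittedItems())     // [1, 2, 3, 4, 5]
    println(fromArray.emittedItems())     // [10, 20, 30]
    println(fromCallable.emittedItems())  // [42]
}
```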

And here's how you can create a custom Observable that emits a series of odd numbers:

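val oddNumberObservable = Observable.create<Int> { emitter ->
    var i = 1
    // Stop emitting as soon as the subscriber has disposed the stream
    while (i < 10 && !emitter.isDisposed) {
        emitter.onNext(i)
        i += 2
    }
    // onComplete is ignored if the emitter is already disposed
    emitter.onComplete()
}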

Subscribing to an Observable is as simple as calling the subscribe method on it and providing an Observer implementation. Kotlin's lambda syntax makes this even more concise:

intObservable.subscribe { 
    Log.d("Rx", "Got $it")
}
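
The single-lambda form above handles only emitted items. A slightly fuller sketch passes separate callbacks for items, errors and completion, and keeps the Disposable that subscribe returns. It assumes RxJava 3 package names, and describeStream is a hypothetical helper that records events in a list instead of logging them.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.Disposable

// Hypothetical helper: subscribes with all three callbacks and records what happened.
fun describeStream(source: Observable<Int>): List<String> {
    val events = mutableListOf<String>()
    val disposable: Disposable = source.subscribe(
        { value -> events += "next $value" },          // onNext
        { error -> events += "error ${error.message}" }, // onError
        { events += "complete" }                        // onComplete
    )
    // The sources used here are synchronous and finite, so they have already
    // terminated; disposing is still the habit to keep for long-lived streams.
    disposable.dispose()
    return events
}

fun main() {
    println(describeStream(Observable.just(1, 2, 3)))
    // [next 1, next 2, next 3, complete]

    val failing = Observable.concat(
        Observable.just(1),
        Observable.error<Int>(IllegalStateException("boom"))
    )
    println(describeStream(failing))
    // [next 1, error boom]
}
```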

Under the hood, RxJava implements the observer pattern, but it extends it with a set of powerful operators that allow you to transform, filter, combine and schedule the emissions in various ways before they reach the Observer. This is what enables the declarative and compositional nature of reactive programming.

Operators and Marble Diagrams

RxJava provides a vast collection of operators that can be applied to an Observable stream. These operators can be visualized using marble diagrams, which show the input and output streams as timelines with color-coded shapes representing different data or events.

[Figure: marble diagram of the merge operator]

The merge operator, depicted above, combines multiple Observables into a single Observable that emits items as they arrive from each source. Commonly used transformation operators include:

  • map: applies a function to each emitted item and returns a new Observable of the transformed items
  • flatMap: applies a function that returns an Observable to each emitted item, and merges the resulting Observables into a single Observable
  • concatMap: similar to flatMap, but subscribes to the inner Observables one at a time, so the output preserves the order of the source items
  • switchMap: also returns an Observable for each emitted item, but unsubscribes from the previous one when a new item arrives
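
To make the differences concrete, here is a small sketch of map, concatMap and switchMap. It assumes RxJava 3 package names, the function names are hypothetical, and PublishSubject stands in for real event sources so that the order of events is fully controlled.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

// map: one output item per input item.
fun mapExample(): List<Int> =
    Observable.just(1, 2, 3).map { it * 10 }.toList().blockingGet()

// concatMap: each item becomes an inner Observable; inner results keep source order.
fun concatMapExample(): List<Int> =
    Observable.just(1, 2, 3)
        .concatMap { n -> Observable.just(n, n * 10) }
        .toList()
        .blockingGet()

// switchMap: a new outer item unsubscribes from the previous inner Observable.
fun switchMapExample(): List<String> {
    val outer = PublishSubject.create<String>()
    val first = PublishSubject.create<String>()
    val second = PublishSubject.create<String>()
    val seen = mutableListOf<String>()

    outer.switchMap { key -> if (key == "first") first else second }
        .subscribe { seen.add(it) }

    outer.onNext("first")
    first.onNext("first-1")
    outer.onNext("second")     // switches away from `first`
    first.onNext("first-2")    // no longer observed, so it is dropped
    second.onNext("second-1")
    return seen
}

fun main() {
    println(mapExample())        // [10, 20, 30]
    println(concatMapExample())  // [1, 10, 2, 20, 3, 30]
    println(switchMapExample())  // [first-1, second-1]
}
```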

Filtering operators allow you to conditionally suppress or select certain items from the stream:

  • filter: only emits items that match a given predicate function
  • take: only emits the first n items
  • skip: skips the first n items
  • distinct: suppresses duplicate items
  • elementAt: emits only the item at a specified index
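
The filtering operators listed above can be tried out on a simple range. This is a minimal sketch assuming RxJava 3 package names; filteringDemo is a hypothetical function that gathers the results into a map for printing.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical demo: applies each filtering operator to the numbers 1..10.
fun filteringDemo(): Map<String, List<Int>> {
    val numbers = Observable.range(1, 10) // cold, so it can be subscribed repeatedly
    return mapOf(
        "filter" to numbers.filter { it % 2 == 0 }.toList().blockingGet(),
        "take" to numbers.take(3).toList().blockingGet(),
        "skip" to numbers.skip(8).toList().blockingGet(),
        "distinct" to Observable.just(1, 1, 2, 3, 3).distinct().toList().blockingGet(),
        // elementAt uses a zero-based index and returns a Maybe
        "elementAt" to numbers.elementAt(4).toObservable().toList().blockingGet()
    )
}

fun main() {
    filteringDemo().forEach { (operator, items) -> println("$operator -> $items") }
    // filter -> [2, 4, 6, 8, 10]
    // take -> [1, 2, 3]
    // skip -> [9, 10]
    // distinct -> [1, 2, 3]
    // elementAt -> [5]
}
```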

Combining operators allow you to pair up multiple Observables in interesting ways:

  • zip: combines the emissions of multiple Observables together via a specified function
  • combineLatest: once every source has emitted at least one item, emits a combination of the latest item from each source whenever any of them emits
  • withLatestFrom: emits only when the current Observable emits, combining that item with the latest item from the specified Observable via a function
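
The three operators differ mainly in when they emit. The sketch below feeds the same events into each of them; it assumes RxJava 3 package names, combineDemo is a hypothetical function, and the BiFunction is spelled out explicitly to keep Kotlin's overload resolution unambiguous.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical demo: returns what zip, combineLatest and withLatestFrom emit
// for the same sequence of events.
fun combineDemo(): Triple<List<String>, List<String>, List<String>> {
    val letters = PublishSubject.create<String>()
    val numbers = PublishSubject.create<Int>()
    val join = BiFunction<String, Int, String> { s, n -> "$s$n" }

    val zipped = mutableListOf<String>()
    val combined = mutableListOf<String>()
    val sampled = mutableListOf<String>()

    Observable.zip(letters, numbers, join).subscribe { zipped.add(it) }
    Observable.combineLatest(letters, numbers, join).subscribe { combined.add(it) }
    letters.withLatestFrom(numbers, join).subscribe { sampled.add(it) }

    letters.onNext("A")
    numbers.onNext(1)
    numbers.onNext(2)
    letters.onNext("B")

    return Triple(zipped, combined, sampled)
}

fun main() {
    val (zipped, combined, sampled) = combineDemo()
    println(zipped)    // [A1, B2]      pairs items by position
    println(combined)  // [A1, A2, B2]  re-emits on every change once both have a value
    println(sampled)   // [B2]          emits only when `letters` emits
}
```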

For example, the zip operator can be used to combine data from two separate API calls:

val userObservable = api.getUser(userId)
val postsObservable = api.getPostsByUser(userId) 

Observable.zip(
    userObservable,
    postsObservable,
    { user, posts -> UserWithPosts(user, posts) }
)
.subscribe { userWithPosts ->
    // display user with latest posts
}

Schedulers and Threading

One of the biggest advantages of using RxJava in Android is that it abstracts away complex threading and concurrency operations. By default, an Observable will emit items on the same thread on which its subscribe method is called. However, you can change this behavior by using subscribeOn and observeOn operators.

The subscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators it is called; if it appears more than once, the call closest to the source takes effect. observeOn, on the other hand, affects the thread used by the operators below where it appears. For instance:

Observable.fromIterable(someList)
    .doOnNext { Log.d("Rx", "Emitting on ${Thread.currentThread().name}") }
    .map { /* ... */ }
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext { Log.d("Rx", "Observing on ${Thread.currentThread().name}") } 
    .subscribeOn(Schedulers.io())
    .subscribe { /* ... */ }

Output:

Rx: Emitting on RxCachedThreadScheduler-1
Rx: Observing on main

Even though subscribeOn is called last, it affects the entire chain. All upstream operators (fromIterable, doOnNext, map) will be executed on the I/O thread, and downstream operators from observeOn will be executed on the Android main thread.
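
The same behavior can be checked on a plain JVM without Android. In this sketch, Schedulers.single() is a stand-in for AndroidSchedulers.mainThread(), RxJava 3 package names are assumed, and threadsUsed is a hypothetical helper that records which thread ran each side of observeOn.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Hypothetical helper: returns (thread above observeOn, thread below observeOn).
fun threadsUsed(): Pair<String, String> {
    var upstream = ""
    var downstream = ""
    Observable.just(1)
        .doOnNext { upstream = Thread.currentThread().name }
        .observeOn(Schedulers.single())   // stand-in for the Android main thread
        .doOnNext { downstream = Thread.currentThread().name }
        .subscribeOn(Schedulers.io())     // placed last, still governs the source
        .blockingSubscribe()              // wait here until the stream completes
    return upstream to downstream
}

fun main() {
    val (upstream, downstream) = threadsUsed()
    println("Emitting on $upstream")
    println("Observing on $downstream")
}
```

The upstream name should start with RxCachedThreadScheduler (the I/O scheduler) and the downstream name with RxSingleScheduler, mirroring the split shown in the log output above.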

RxAndroid provides a set of Android-specific schedulers in addition to those available in RxJava:

  • AndroidSchedulers.mainThread(): Schedules work on the main Android UI thread
  • AndroidSchedulers.from(Looper): Schedules work on the given Looper
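  • HandlerScheduler.from(Handler): Schedules work on the given Handler (public in RxAndroid 1.x only; in later versions use AndroidSchedulers.from(handler.looper) instead)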

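AndroidSchedulers.mainThread() returns a shared instance, so calling it repeatedly does not create new schedulers. AndroidSchedulers.from(Looper), by contrast, creates a scheduler for the Looper you pass in, so keep a reference to it if you plan to reuse it.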

Kotlin Extensions for RxJava

Kotlin provides a number of language features that further reduce boilerplate when used with RxJava. Extension functions allow you to add new functionality to existing classes, which is handy for extending RxJava's types.

For example, you can define an extension function on Observable that applies a scheduler and subscribes in one go:

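fun <T : Any> Observable<T>.subscribeOnIo(
    onNext: (T) -> Unit = {},
    // Without an error handler, failures would be routed to RxJavaPlugins.onError
    onError: (Throwable) -> Unit = { Log.e("Rx", "Stream failed", it) }
): Disposable {
    return subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ onNext(it) }, { onError(it) })
}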

And then use it like:

someObservable.subscribeOnIo()

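Kotlin's null safety also complements RxJava's type system well, with one caveat: RxJava 2 and later do not allow null items, so a call such as Observable.just(1, 2, null, 4) throws a NullPointerException. To model missing values, wrap them in a non-null container such as java.util.Optional (available on Android from API level 24), and the compiler will make you unwrap the value explicitly when subscribing:

val optionalObservable: Observable<Optional<Int>> = Observable.just(
    Optional.of(1), Optional.of(2), Optional.empty<Int>(), Optional.of(4)
)

optionalObservable.subscribe { item ->
    item.ifPresent { doSomething(it) }
}

Higher-order functions in Kotlin can also be used to create custom operators. For instance, here's how you can define a filterPresent operator that drops empty values and unwraps the rest:

fun <T : Any> Observable<Optional<T>>.filterPresent(): Observable<T> {
    return filter { it.isPresent }.map { it.get() }
}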

The built-in scope functions such as let, apply and with can also be used to simplify subscription side-effects. Since emitted items are never null, no safe call is needed:

userObservable.subscribe { user ->
    with(user) {
        nameTextView.text = name
        ageTextView.text = age.toString()
    }
}

RxJava and Android components

RxAndroid itself focuses on schedulers; companion libraries such as RxBinding provide the bindings and extensions that make it easy to use RxJava with Android-specific components.

For handling UI events, RxBinding's RxView class provides a set of static factory methods that create Observables from view events:

RxView.clicks(button)
    .throttleFirst(500, TimeUnit.MILLISECONDS)
    .subscribe { 
        Log.d("Rx", "Clicked!") 
    }

RxTextView provides bindings for TextView widgets, including EditText:

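RxTextView.textChanges(editText)
    // View bindings must be subscribed on the main thread, so no subscribeOn here;
    // debounce already moves emissions onto the computation scheduler
    .debounce(500, TimeUnit.MILLISECONDS)
    .map { it.toString() }
    .filter { it.length > 3 }
    // Run each search on an I/O thread; a newer query cancels the previous one
    .switchMap { query -> api.searchItems(query).subscribeOn(Schedulers.io()) }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { results ->
        updateSearchResults(results)
    }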
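
The debounce, filter and switchMap pipeline can also be exercised off-device. In the sketch below, a PublishSubject stands in for the text-change events, a TestScheduler supplies virtual time, and fakeSearch is a hypothetical replacement for api.searchItems; RxJava 3 package names are assumed.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the article's api.searchItems(query).
fun fakeSearch(query: String): Observable<List<String>> =
    Observable.just(listOf("$query-1", "$query-2"))

fun runSearchDemo(): List<List<String>> {
    val scheduler = TestScheduler()                       // virtual clock
    val textChanges = PublishSubject.create<CharSequence>()
    val results = mutableListOf<List<String>>()

    textChanges
        .debounce(500, TimeUnit.MILLISECONDS, scheduler)
        .map { it.toString() }
        .filter { it.length > 3 }
        .switchMap { query -> fakeSearch(query) }
        .subscribe { results.add(it) }

    // Rapid typing: only the last value survives the debounce window.
    textChanges.onNext("k")
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS)
    textChanges.onNext("kot")
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS)
    textChanges.onNext("kotlin")
    scheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS)

    // A query of three characters or fewer is filtered out after the debounce.
    textChanges.onNext("rx")
    scheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS)

    return results
}

fun main() {
    println(runSearchDemo())  // [[kotlin-1, kotlin-2]]
}
```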

For dealing with databases, the SQLBrite library from Square provides a lightweight wrapper around SQLiteOpenHelper and ContentResolver that makes it easy to query with RxJava.

db.createQuery("users", "SELECT * FROM users WHERE age > ?", 18)
    .mapToList { User.fromCursor(it) }
    .subscribe { users ->
        // do something with list of adult users 
    }

RxBinding provides bindings for most Android UI widgets. Separate libraries cover non-UI components, such as rx-preferences for SharedPreferences. There are also numerous other RxJava extension libraries for Android-specific components like RxLocation for the Fused Location API, RxFit for the Google Fit API, RxWear for Wear OS, and RxFirebase for Firebase.

Challenges and Best Practices

While RxJava can make your Android code more concise and readable, it does come with a learning curve, especially if you're coming from an imperative programming background. The concept of Observables, operators and schedulers can be a lot to wrap your head around initially.

When introducing RxJava to an existing codebase, it's best to start small and gradually refactor. Rather than rewriting everything from scratch with RxJava, identify parts of your code that can benefit most from a reactive approach: complex async flows, nested callbacks, error-prone threading, etc. You can then extract these into Observable streams and evolve your usage from there.

In team settings, it's crucial to establish a set of best practices and guidelines around using RxJava to ensure consistency and maintainability. For instance, agree on a set of most commonly used operators, naming conventions for streams, how and when to use schedulers, error handling policies, and so on. Code reviews and pair programming can help disseminate this knowledge across the team.

Debugging RxJava code can also be challenging due to the asynchronous and compositional nature of the library. While RxJava itself is well tested, ensuring that your Observable chains behave as expected requires a shift in mindset. Tools like RxMarbles can help visualize the behavior of operators. For unit tests, RxJava ships with TestObserver and TestScheduler, and libraries like RxRelay (which provides Subject-like relays that cannot terminate) can serve as controllable event sources.
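
As a rough illustration of unit-testing a chain, the sketch below uses RxJava's own TestObserver and TestScheduler. It assumes RxJava 3 package names, and evenSquares is a hypothetical function under test, not something from a real codebase.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical function under test: keeps even numbers and squares them.
fun evenSquares(source: Observable<Int>): Observable<Int> =
    source.filter { it % 2 == 0 }.map { it * it }

fun main() {
    // test() subscribes a TestObserver that records every event for assertions.
    evenSquares(Observable.range(1, 6))
        .test()
        .assertValues(4, 16, 36)
        .assertComplete()
        .assertNoErrors()

    // A TestScheduler replaces real time, so time-based operators run instantly.
    val scheduler = TestScheduler()
    val ticks = Observable.interval(1, TimeUnit.SECONDS, scheduler).test()
    ticks.assertNoValues()
    scheduler.advanceTimeBy(3500, TimeUnit.MILLISECONDS)
    ticks.assertValues(0L, 1L, 2L)

    println("all assertions passed")
}
```

Each assert method throws an AssertionError on mismatch, so the same calls can be dropped into a JUnit test body unchanged.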

Here are some more best practices to follow:

  • Avoid creating long Observable chains; break them up into smaller, reusable streams.
  • Be mindful of the number of operators you use. Some operators like retry and repeat may cause infinite loops if not used carefully.
  • Use a CompositeDisposable to manage your subscriptions and avoid memory leaks.
  • Use the create method only as a last resort; most common sources of events already have factory methods like fromCallable, fromIterable, etc.
  • Avoid doing too much work within doOnNext or doOnComplete as they can introduce performance bottlenecks. Prefer side effect-free operators.
  • Use Flowable in place of Observable if you're dealing with a huge number (10k+) of emissions, or with a source that can outpace its consumer, so that backpressure is handled explicitly.
  • Adopt a consistent error handling strategy for your Observables. Use onErrorResumeNext and retry sparingly.
  • Enable assembly tracking in debug builds (for example, RxJavaAssemblyTracking from the RxJava Extensions library) to get detailed stack traces for errors.
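
Two of these practices, managing subscriptions with a CompositeDisposable and bounding retries, can be sketched as follows. RxJava 3 package names are assumed; Screen and flakyRequest are hypothetical names, with Screen standing in for an Activity, Fragment or ViewModel.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical lifecycle owner that collects its subscriptions in one place.
class Screen {
    private val disposables = CompositeDisposable()
    val received = mutableListOf<Int>()

    fun start(events: Observable<Int>) {
        disposables.add(
            events.subscribe(
                { received.add(it) },
                { error -> println("stream failed: ${error.message}") }
            )
        )
    }

    // clear() disposes everything added so far but keeps the container reusable.
    fun stop() = disposables.clear()
}

// Hypothetical request that fails on its first two attempts.
fun flakyRequest(attempts: AtomicInteger): Observable<String> =
    Observable.fromCallable {
        if (attempts.incrementAndGet() < 3) throw IllegalStateException("temporary failure")
        "ok after ${attempts.get()} attempts"
    }

fun main() {
    val events = PublishSubject.create<Int>()
    val screen = Screen()
    screen.start(events)
    events.onNext(1)
    screen.stop()
    events.onNext(2)          // arrives after stop(), so it is not delivered
    println(screen.received)  // [1]

    // retry(2) allows two resubscriptions; the fallback covers a final failure.
    val result = flakyRequest(AtomicInteger())
        .retry(2)
        .onErrorReturnItem("fallback")
        .blockingFirst()
    println(result)           // ok after 3 attempts
}
```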

Conclusion

RxJava is a versatile and powerful library for tackling asynchronous and event-based programming on Android. When paired with Kotlin's features like lambdas, extension functions and null safety, it can truly transform how you write Android apps. But to use it effectively requires a good understanding of its concepts and best practices.

This article covers the basics of creating and subscribing to Observables, using various operators, threading and testing with RxJava on Android. It also highlights some challenges and best practices to keep in mind when adopting RxJava on a team and in production.

Hopefully this gives you a solid foundation for getting started with RxJava and Kotlin on Android. In part 2, we'll explore some more advanced topics like custom operators, error handling, RxJava 3.x features, and testing strategies. Stay tuned!