RxAndroid and Kotlin (Part 1)
Reactive programming has become increasingly popular in recent years as a paradigm that allows developers to create asynchronous and event-based programs by using observable sequences. This is particularly relevant in Android development, where most operations happen to be asynchronous – network requests, database queries, UI events, sensor data, location updates, just to name a few.
RxJava, a Java implementation of ReactiveX, has seen wide adoption in the Android community since its 1.0 release in 2016. According to Realm‘s Android developer survey in 2021, 13% of Android developers use RxJava in their current apps, making it one of the most popular libraries for handling asynchronous events. Many high-profile apps like Netflix, Trello, Uber and Airbnb have adopted RxJava in their codebase.
At the same time, Kotlin, a modern statically-typed language developed by JetBrains, has quickly gained ground with Android developers since Google announced first-class support for it in 2017. As per Google‘s Android Kotlin Usage Survey in 2022, over 60% of the top 1000 apps on Google Play use Kotlin, and for 35.2% of developers, all new code is written in Kotlin. The conciseness, safety and expressiveness of Kotlin makes it a great fit for Android development.
When used together, RxJava and Kotlin provide a powerful combination that can make your Android code more readable, maintainable and error-free. Let‘s explore some of the key benefits and features.
Creating and Subscribing to Observables
At the heart of RxJava are two key types – Observable
and Observer
. An Observable
emits a stream of data or events, and an Observer
consumes or reacts to these emissions. To create an Observable
, you can use one of the many static factory methods like just
, from
, range
, interval
, etc. or the create
method which gives you full control of the emission process.
Here‘s a simple example of creating an Observable
from a list of integers in Kotlin:
val intObservable = Observable.fromIterable(listOf(1, 2, 3, 4, 5))
And here‘s how you can create a custom Observable
that emits a series of odd numbers:
val oddNumberObservable = Observable.create<Int> { emitter ->
var i = 1
while (i < 10) {
emitter.onNext(i)
i += 2
}
emitter.onComplete()
}
Subscribing to an Observable
is as simple as calling the subscribe
method on it and providing an Observer
implementation. Kotlin‘s lambda syntax makes this even more concise:
intObservable.subscribe {
Log.d("Rx", "Got $it")
}
Under the hood, RxJava implements the observer pattern, but it extends it with a set of powerful operators that allow you to transform, filter, combine and schedule the emissions in various ways before they reach the Observer
. This is what enables the declarative and compositional nature of reactive programming.
Operators and Marble Diagrams
RxJava provides a vast collection of operators that can be applied to an Observable
stream. These operators can be visualized using marble diagrams, which show the input and output streams as timelines with color-coded shapes representing different data or events.
The merge
operator, depicted above, combines multiple Observable
s into a single Observable
that emits items as they arrive from each source. Other commonly used transformation operators include:
map
: applies a function to each emitted item and returns a newObservable
of the transformed itemsflatMap
: applies a function that returns anObservable
to each emitted item, and merges the resultingObservable
s into a singleObservable
concatMap
: similar toflatMap
, but maintains the order of the itemsswitchMap
: also returns anObservable
for each emitted item, but unsubscribes from the previous one when a new item arrives
Filtering operators allow you to conditionally suppress or select certain items from the stream:
filter
: only emits items that match a given predicate functiontake
: only emits the first n itemsskip
: skips the first n itemsdistinct
: suppress duplicate itemselementAt
: emits only the item at a specified index
Combining operators allow you to pair up multiple Observable
s in interesting ways:
zip
: combines the emissions of multipleObservable
s together via a specified functioncombineLatest
: whenever any of the sourceObservable
s emits an item, emits the latest item from each of the other sourceswithLatestFrom
: merges the specifiedObservable
into the current one by applying a function to combine the latest emissions from both
For example, the zip
operator can be used to combine data from two separate API calls:
val userObservable = api.getUser(userId)
val postsObservable = api.getPostsByUser(userId)
Observable.zip(
userObservable,
postsObservable,
{ user, posts -> UserWithPosts(user, posts) }
)
.subscribe { userWithPosts ->
// display user with latest posts
}
Schedulers and Threading
One of the biggest advantages of using RxJava in Android is that it abstracts away complex threading and concurrency operations. By default, an Observable
will emit items on the same thread on which its subscribe
method is called. However, you can change this behavior by using subscribeOn
and observeOn
operators.
The subscribeOn
operator designates which thread the Observable
will begin operating on, no matter at what point in the chain of operators it is called. observeOn
, on the other hand, affects the thread that the Observable
will use below where it appears. For instance:
Observable.fromIterable(someList)
.doOnNext { Log.d("Rx", "Emitting on ${Thread.currentThread().name}") }
.map { /* ... */ }
.observeOn(AndroidSchedulers.mainThread())
.doOnNext { Log.d("Rx", "Observing on ${Thread.currentThread().name}") }
.subscribeOn(Schedulers.io())
.subscribe { /* ... */ }
Rx: Emitting on RxCachedThreadScheduler-1
Rx: Observing on main
Even though subscribeOn
is called last, it affects the entire chain. All upstream operators (fromIterable
, doOnNext
, map
) will be executed on the I/O thread, and downstream operators from observeOn
will be executed on the Android main thread.
RxAndroid provides a set of Android-specific schedulers in addition to those available in RxJava:
AndroidSchedulers.mainThread()
: Schedules work on the main Android UI threadAndroidSchedulers.from(Looper)
: Schedules work on the givenLooper
HandlerScheduler.from(Handler)
: Schedules work on the givenHandler
Since scheduler instances are stateless, RxAndroid maintains a singleton instance for each type of scheduler for efficiency.
Kotlin Extensions for RxJava
Kotlin provides a number of language features that further reduce boilerplate when used with RxJava. Extension functions allow you to add new functionality to existing classes, which is handy for extending RxJava‘s types.
For example, you can define an extension function on Observable
that applies a scheduler and subscribes in one go:
fun <T> Observable<T>.subscribeOnIo(): Disposable {
return subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe()
}
And then use it like:
someObservable.subscribeOnIo()
Kotlin‘s null safety also complements RxJava‘s type system well. You can use Observable<T?>
to emit nullable items, and the compiler will ensure that you handle nulls properly when subscribing:
val nullableObservable = Observable.just(1, 2, null, 4)
nullableObservable.subscribe {
it?.let { doSomething(it) }
}
Higher-order functions in Kotlin can also be used to create custom operators. For instance, here‘s how you can define a filterNotNull
operator:
fun <T> Observable<T?>.filterNotNull(): Observable<T> {
return filter { it != null }.map { it!! }
}
The built-in let
and apply
functions can also be used to simplify subscription side-effects:
userObservable.subscribe { user ->
user?.let {
nameTextView.text = it.name
ageTextView.text = it.age.toString()
}
}
RxJava and Android components
RxAndroid provides a number of bindings and extensions that make it easy to use RxJava with Android-specific components.
For handling UI events, RxView
provides a set of static factory methods that create Observable
s from various view types:
RxView.clicks(button)
.throttleFirst(500, TimeUnit.MILLISECONDS)
.subscribe {
Log.d("Rx", "Clicked!")
}
RxTextView
provides bindings for EditText
widgets:
RxTextView.textChanges(editText)
.debounce(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.filter { it.length > 3 }
.switchMap { api.searchItems(it) }
.observeOn(AndroidSchedulers.mainThread())
.subscribe { results ->
updateSearchResults(results)
}
For dealing with databases, the SQLBrite library from Square provides a lightweight wrapper around SQLiteOpenHelper and ContentResolver that makes it easy to query with RxJava.
db.createQuery("users", "SELECT * FROM users WHERE age > ?", 18)
.mapToList { User.fromCursor(it) }
.subscribe { users ->
// do something with list of adult users
}
RxBinding provides bindings for most Android UI widgets as well as some non-UI components like SharedPreferences
, Notification
, etc. There are also numerous other RxJava extension libraries for Android-specific components like RxLocation for the Fused Location API, RxFit for the Google Fit API, RxWear for Wear OS, and RxFirebase for Firebase.
Challenges and Best Practices
While RxJava can make your Android code more concise and readable, it does come with a learning curve, especially if you‘re coming from an imperative programming background. The concept of Observables, operators and schedulers can be a lot to wrap your head around initially.
When introducing RxJava to an existing codebase, it‘s best to start small and gradually refactor. Rather than rewriting everything from scratch with RxJava, identify parts of your code that can benefit most from a reactive approach – complex async flows, nested callbacks, error-prone threading, etc. You can then extract these into Observable
streams and evolve your usage from there.
In team settings, it‘s crucial to establish a set of best practices and guidelines around using RxJava to ensure consistency and maintainability. For instance, agree on a set of most commonly used operators, naming conventions for streams, how and when to use schedulers, error handling policies, and so on. Code reviews and pair programming can help disseminate this knowledge across the team.
Debugging RxJava code can also be challenging due to the asynchronous and compositional nature of the library. While RxJava itself is well tested, ensuring that your Observable chains behave as expected requires a shift in mindset. Tools like RxMarbles can help visualize the behavior of operators. And libraries like RxRelay (which ports the Subject types from RxJava 1.x) can be used to construct unit tests for your reactive code.
Here are some more best practices to follow:
- Avoid creating long Observable chains; break them up into smaller, reusable streams.
- Be mindful of the number of operators you use. Some operators like
retry
andrepeat
may cause infinite loops if not used carefully. - Use the
CompositeDisposable
to manage your subscriptions and avoid memory leaks. - Prefer
create
method only as a last resort; most common sources of events already have factory methods likefromCallable
,fromIterable
, etc. - Avoid doing too much work within
doOnNext
ordoOnComplete
as they can introduce performance bottlenecks. Prefer side effect-free operators. - Use
Flowable
in place ofObservable
if you‘re dealing with a huge number (10k+) of emissions to avoid backpressure issues. - Adopt a consistent error handling strategy for your Observables. Use
onErrorResumeNext
andretry
sparingly. - Enable RxJava assembly tracking in debug builds to get detailed stack traces for errors.
Conclusion
RxJava is a versatile and powerful library for tackling asynchronous and event-based programming on Android. When paired with Kotlin‘s features like lambdas, extension functions and null safety, it can truly transform how you write Android apps. But to use it effectively requires a good understanding of its concepts and best practices.
This article covers the basics of creating and subscribing to Observable
s, using various operators, threading and testing with RxJava on Android. It also highlights some challenges and best practices to keep in mind when adopting RxJava on a team and in production.
Here are some learning resources to check out if you want to dive deeper:
- Official RxJava Wiki – Comprehensive documentation and recipes for RxJava
- Reactive Programming with RxJava – In-depth book on RxJava for Java and Android developers
- Grokking RxJava – Excellent multi-part blog series on learning RxJava
- RxAndroid Codelab – Step-by-step codelab on using RxJava and Kotlin in Android
- RxJava Android Samples – A collection of real-world RxJava samples for Android
Hopefully this gives you a solid foundation for getting started with RxJava and Kotlin on Android. In part 2, we‘ll explore some more advanced topics like custom operators, error handling, RxJava 3.x features, and testing strategies. Stay tuned!