Android RxJava3

Study notes (4): RxJava3 in detail and its use in Android, basic concepts

Add the dependencies:

    implementation 'io.reactivex.rxjava3:rxjava:3.0.13'
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

Enable Java 8 compilation:

    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }

1) Basic concepts

RxJava is a Java implementation of Reactive Extensions (ReactiveX). It provides an asynchronous programming interface based on the observer pattern.

Basic terminology:

Iterable: an iterable collection, used for traversal.

Observable: the observed object (the source of changes), which notifies its observers when the data changes.

Observer: the observer, which watches for changes.

emit: the act of launching, publishing, or sending data, or signalling a change.

item: an entry, that is, a single emitted piece of data.

Stream object: in the RxJava documentation, emission, emits, item, event, signal, data and message all refer to the data objects passed along the data stream.

Backpressure: when the upstream and downstream of an Observable chain emit, process, and respond to the data stream on different threads, and the upstream emits data faster than the downstream can receive and process it, unprocessed data piles up. That data is neither dropped nor reclaimed by the garbage collector; it is held in an asynchronous buffer. If the buffer is never drained, it keeps growing and eventually causes an out-of-memory error. This is the backpressure problem in reactive programming.

For this reason, RxJava introduces the concept of backpressure. Backpressure is a flow-control mechanism: without knowing how much data is still waiting upstream, the downstream indicates how much data it can handle, which keeps memory use under control. In other words, in an asynchronous scenario where the observable emits events much faster than the observer can process them, backpressure is the strategy that tells the upstream to slow down.

In RxJava 1.x, some Observables supported backpressure and some did not. To resolve this, RxJava 2.x separated the two: Flowable supports backpressure, while Observable, Single, Maybe and Completable do not.

1. If you subscribe with a FlowableSubscriber, you must actively request items from upstream, for example with s.request(Long.MAX_VALUE). If a backpressure error occurs, it is caught and delivered through onError(), so the program does not crash.

2. If you subscribe with a Consumer (event consumer), you do not need to request upstream data yourself; s.request(Long.MAX_VALUE) is called by default. If a backpressure error occurs and no Consumer<Throwable> has been supplied to handle errors, the program crashes.

3. The default buffer size used for backpressure is 128.

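Backpressure strategy types (BackpressureStrategy)

ERROR: signals a MissingBackpressureException when the downstream cannot keep up (the default buffer size is 128)

BUFFER: buffers all events until the downstream consumes them

DROP: discards events that the downstream cannot accept

LATEST: keeps only the latest event

MISSING: applies no strategy; events are emitted without buffering or dropping, and the downstream has to deal with any overflow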
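To make the strategies concrete, here is a minimal sketch that pushes five items at a subscriber that has only requested two. The helper name emitFive is hypothetical, and TestSubscriber (from RxJava's own test utilities) is used only so the result can be inspected without Android.

```kotlin
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.exceptions.MissingBackpressureException
import io.reactivex.rxjava3.subscribers.TestSubscriber

// Hypothetical helper: emits 1..5 synchronously to a subscriber that has
// requested only `initialRequest` items up front.
fun emitFive(strategy: BackpressureStrategy, initialRequest: Long): TestSubscriber<Int> =
    Flowable.create<Int>({ emitter ->
        for (i in 1..5) emitter.onNext(i)
        emitter.onComplete()
    }, strategy).test(initialRequest)

fun main() {
    // DROP: items that arrive without an outstanding request are discarded.
    println("DROP   -> ${emitFive(BackpressureStrategy.DROP, 2).values()}")

    // LATEST: only the most recent unrequested item is kept for a later request.
    val latest = emitFive(BackpressureStrategy.LATEST, 2).requestMore(1)
    println("LATEST -> ${latest.values()}")

    // BUFFER: everything is kept until the downstream asks for it.
    val buffered = emitFive(BackpressureStrategy.BUFFER, 2).requestMore(Long.MAX_VALUE)
    println("BUFFER -> ${buffered.values()}")

    // ERROR: the first unrequested item ends the stream with an exception.
    val failed = emitFive(BackpressureStrategy.ERROR, 2)
    failed.assertError(MissingBackpressureException::class.java)
    println("ERROR  -> ${failed.values()} then MissingBackpressureException")
}
```

Because the emitter here runs synchronously on the calling thread, the outcome is deterministic; with a real asynchronous source, which items are dropped depends on timing.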

Event scheduler: RxJava subscriptions should not be left unmanaged. They need a manager that releases them at the right time so that they do not cause memory leaks. The manager used here is CompositeDisposable, which these notes call the event scheduler (or event manager).

2) Thread schedulers (Schedulers)

Schedulers make thread switching convenient on Android.

Scheduler: function

AndroidSchedulers.mainThread(): requires the rxandroid dependency; switches to the UI thread

Schedulers.computation(): for computation tasks such as event loops and callback processing; the default number of threads equals the number of processors

Schedulers.io(): for IO-intensive tasks such as asynchronous blocking IO operations; its thread pool grows and shrinks as needed, and by default it behaves like a cached thread pool

Schedulers.newThread(): creates a new thread for each task

Schedulers.trampoline(): runs the task on the current thread; if the thread is already running a task, the new task is queued and runs after the current one finishes

Schedulers.from(executor): uses the given Executor as the scheduler
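As a rough illustration of thread switching, the sketch below runs the source on Schedulers.io() and the mapping step on Schedulers.computation(). The helper name threadsUsed is hypothetical. AndroidSchedulers.mainThread() is left out on purpose so the example runs on a plain JVM; on Android it would typically take the place of the observeOn argument.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers

// Hypothetical helper: returns the name of the thread that ran the source
// and the name of the thread that ran the map step.
fun threadsUsed(): Pair<String, String> =
    Single.fromCallable { Thread.currentThread().name }   // source work
        .subscribeOn(Schedulers.io())                      // where the source runs
        .observeOn(Schedulers.computation())               // where downstream steps run
        .map { sourceThread -> sourceThread to Thread.currentThread().name }
        .blockingGet()                                     // block only for demonstration

fun main() {
    val (sourceThread, mapThread) = threadsUsed()
    println("caller thread: ${Thread.currentThread().name}")
    println("source thread: $sourceThread")
    println("map thread:    $mapThread")
}
```

subscribeOn affects where the source does its work, while observeOn affects everything after it in the chain, which is why the two names differ.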
3) Base classes

Compared with RxJava 2, the base classes in RxJava 3 have not changed much. The main ones are:

io.reactivex.rxjava3.core.Flowable: emits 0 to N items; supports Reactive Streams and backpressure

io.reactivex.rxjava3.core.Observable: emits 0 to N items; does not support backpressure

io.reactivex.rxjava3.core.Single: emits exactly one item or an error

io.reactivex.rxjava3.core.Completable: emits no items; only signals onComplete or onError

io.reactivex.rxjava3.core.Maybe: emits 0 or 1 item, or an error
4) The "hot" and "cold" of Observables

When does Observable start to emit data sequence? It depends on the implementation of Observable. A "hot" Observable may start emitting data as soon as it is created, so all subsequent observers subscribing to it may start to receive data from a certain position in the middle of the sequence (some data are missed). A "cold" Observable will wait until there is an observer subscribed to it before it starts transmitting data, so the observer can ensure that it will receive the entire data sequence.

In some ReactiveX implementations there is also a Connectable Observable. Regardless of whether any observer has subscribed to it, it does not start emitting data until its connect() method is called.
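The three behaviours can be compared side by side in a small sketch. The function names are hypothetical, and PublishSubject stands in for a hot source here as an assumption; real hot sources are usually things like sensor or UI event streams.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

// Cold: every subscriber gets its own full run of the sequence.
fun coldExample(): List<List<Int>> {
    val cold = Observable.just(1, 2, 3)
    return listOf(cold.test().values().toList(), cold.test().values().toList())
}

// Hot: items emitted before a subscriber arrives are missed.
fun hotExample(): List<Int> {
    val hot = PublishSubject.create<Int>()
    hot.onNext(1)              // nobody is subscribed yet
    val late = hot.test()      // subscribes in the middle of the sequence
    hot.onNext(2)
    hot.onNext(3)
    return late.values().toList()
}

// Connectable: subscribing alone does nothing until connect() is called.
fun connectableExample(): Pair<Int, List<Int>> {
    val connectable = Observable.just(1, 2, 3).publish()
    val observer = connectable.test()
    val receivedBeforeConnect = observer.values().size
    connectable.connect()
    return receivedBeforeConnect to observer.values().toList()
}

fun main() {
    println("cold:        ${coldExample()}")
    println("hot:         ${hotExample()}")
    println("connectable: ${connectableExample()}")
}
```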

5) Simple use of RxJava

RxJava is built around the observer pattern, and there are two common observable/observer pairs:

Observable (observed)/Observer (observer)

Flowable (observed)/Subscriber (observer)

In RxJava 2/3, an Observable is subscribed to by an Observer and does not support backpressure, while a Flowable is subscribed to by a Subscriber and supports backpressure.

(1) Observable/Observer

The most basic usage:

    // Observed side (Observable abstract class, ObservableOnSubscribe interface)
    val observable = Observable.create(object : ObservableOnSubscribe<Int> {
        override fun subscribe(emitter: ObservableEmitter<Int>) {
            emitter.onNext(1)
            emitter.onNext(2)
            emitter.onComplete()
        }
    })

    // Observer (Observer interface)
    val observer = object : Observer<Int> {
        override fun onSubscribe(d: Disposable) {
        }

        override fun onNext(o: Int) {
            Log.d("", "Data received: ${o}")
        }

        override fun onError(e: Throwable) {
        }

        override fun onComplete() {
        }
    }

    // Associate the two
    observable.subscribe(observer)

This observer model does not support backpressure. When the observable emits a large amount of data quickly, the downstream applies no flow control: even if a large backlog builds up, the call chain never throws MissingBackpressureException, and excessive memory consumption simply ends in an OOM. So when using Observable/Observer, consider whether the amount of data is very large (the official guidance gives about 1000 events as a reference point).

(2) Flowable/Subscriber

The most basic usage:

    // Flowable
    var sub: Subscription? = null
    Flowable.range(0, 3).subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription) {
            sub = s
            sub?.request(1) // Request one item
            // Calling s.cancel() here would stop any events from being sent
        }

        override fun onNext(t: Int?) {
            if (t == 1) {
                // Cancel midway: onComplete is not called, and further request()
                // calls have no effect because the data source has been cleared
                sub?.cancel()
            } else {
                sub?.request(1)
            }
            Log.d("Flowable", "onNext:${t}")
        }

        override fun onError(t: Throwable?) {
            Log.d("Flowable", "onError:${t}")
        }

        override fun onComplete() {
            Log.d("Flowable", "onComplete")
        }
    })

Flowable supports backpressure. Generally speaking, the upstream Flowable responds to the downstream subscriber's requests: the downstream calls request(n) to tell the upstream how many items to send. This prevents data from piling up along the call chain and keeps memory use low.

Of course, a Flowable can also be created with create():

    // Flowable
    Flowable.create(object : FlowableOnSubscribe<Int> {
        override fun subscribe(emitter: FlowableEmitter<Int>) {
            emitter.onNext(1)
            emitter.onComplete()
        }
    }, BackpressureStrategy.BUFFER) // A backpressure strategy must be specified
        .subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription?) {
            }

            override fun onNext(t: Int?) {
            }

            override fun onError(t: Throwable?) {
            }

            override fun onComplete() {
            }
        })

Although a Flowable can be created with create(), you must specify a backpressure strategy so that the Flowable you create supports backpressure.

From the output of the code above, we can see that calling subscription.request(n) may run onNext immediately, without waiting for the rest of the code in onSubscribe() to finish. So if onNext uses objects that need to be initialized, do that initialization before calling subscription.request(n).

This is not absolute, though. In my tests, when a Flowable was created with create(), onNext was only called after the remaining code in onSubscribe() had run, even though subscription.request(n) had already been called.
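The ordering described above can be observed with a small sketch that records callbacks in the order they happen. It assumes the subscriber implements RxJava's FlowableSubscriber; a plain Reactive Streams Subscriber is wrapped by RxJava before use and may not show the same ordering. The helper name callbackOrder is hypothetical.

```kotlin
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.FlowableSubscriber
import org.reactivestreams.Subscription

// Hypothetical helper: subscribes to `source` and returns the callback order.
fun callbackOrder(source: Flowable<Int>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(object : FlowableSubscriber<Int> {
        override fun onSubscribe(s: Subscription) {
            events += "onSubscribe: before request"
            s.request(Long.MAX_VALUE)
            events += "onSubscribe: after request"
        }

        override fun onNext(t: Int) { events += "onNext: $t" }
        override fun onError(t: Throwable) { events += "onError" }
        override fun onComplete() { events += "onComplete" }
    })
    return events
}

fun main() {
    // range() emits from inside request(), so onNext lands in the middle of onSubscribe.
    callbackOrder(Flowable.range(0, 2)).forEach(::println)
    println("----")
    // create() only runs the emitter after onSubscribe has returned.
    val created = Flowable.create<Int>({ emitter ->
        emitter.onNext(0)
        emitter.onNext(1)
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)
    callbackOrder(created).forEach(::println)
}
```

Either way, doing any initialization before the request(n) call is the safe choice, because it works for both kinds of source.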

(3) Single/SingleObserver

Single is similar to Observable. The difference is that it always emits exactly one value or one error notification instead of a series of values (so there is no backpressure problem). When an operation produces just one result, you can use Single. A Single observer handles only two events: onSuccess for a successful result and onError for a failure. So unlike Observable, which needs the three methods onNext, onError and onComplete, subscribing to a Single needs only two:

onSuccess: Single passes its single value to this method

onError: if the value cannot be emitted, Single passes a Throwable to this method

Single calls only one of these two methods, and only once. After either is called, the subscription ends.

Single operators

Operator (return type): description

compose (Single): creates a custom operator

concat and concatWith (Flowable in RxJava 3): concatenates the items emitted by multiple Singles

create (Single): creates a Single from a SingleOnSubscribe callback

error (Single): returns a Single that immediately sends an error notification to subscribers

flatMap (Single): returns a Single that emits the result of applying flatMap to the original Single's item

flatMapObservable (Observable): returns an Observable that emits the result of applying flatMap to the original Single's item

fromFuture (Single): converts a Future into a Single

just (Single): returns a Single that emits the specified value

map (Single): returns a Single that emits the result of applying map to the original Single's item

merge (Single): converts a Single that emits another Single (call it B) into a Single that emits B's item

merge and mergeWith (Flowable in RxJava 3): merges and emits the items from multiple Singles

observeOn (Single): tells the Single to call the subscriber's methods on the specified scheduler

onErrorReturn (Single): converts a Single that emits an error notification into a Single that emits a specified item

subscribeOn (Single): tells the Single to do its work on the specified scheduler

timeout (Single): adds timeout control to the original Single and sends an error notification if it times out

toSingle (Single; Observable.singleOrError() in RxJava 3): converts an Observable that emits a single value into a Single

zip and zipWith (Single): combines multiple Singles into one, which emits the result of applying a function to their items

Simple usage:

    Single.create(object : SingleOnSubscribe<String> {
        override fun subscribe(emitter: SingleEmitter<String>) {
            // Send data (of the two calls below, only the first one is delivered;
            // later events are not processed)
            emitter.onError(NumberFormatException())
            emitter.onSuccess("onSuccess")
        }
    }).subscribe(object : SingleObserver<String> {
        override fun onSubscribe(d: Disposable) {
            Log.d("SingleObserver", "onSubscribe")
        }

        override fun onSuccess(t: String) {
            Log.d("SingleObserver", "onSuccess:${t}")
        }

        override fun onError(e: Throwable) {
            Log.d("SingleObserver", "onError:${e}")
        }
    })
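A few of the operators from the table can be tried out in a short sketch. The function names are hypothetical, and blockingGet() is used only to read the result synchronously for demonstration; it should not be called on the Android main thread.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.functions.BiFunction

// map: transform the single item.
fun mapped(): Int =
    Single.just(2).map { it * 10 }.blockingGet()

// flatMap: continue with another Single built from the item.
fun flatMapped(): Int =
    Single.just(2).flatMap { Single.just(it + 1) }.blockingGet()

// zipWith: combine the items of two Singles with a function.
fun zipped(): Int =
    Single.just(1)
        .zipWith(Single.just(2), BiFunction<Int, Int, Int> { a, b -> a + b })
        .blockingGet()

// onErrorReturn: replace an error with a fallback item.
fun recovered(): Int =
    Single.error<Int>(IllegalStateException("boom"))
        .onErrorReturn { -1 }
        .blockingGet()

fun main() {
    println("map: ${mapped()}")
    println("flatMap: ${flatMapped()}")
    println("zipWith: ${zipped()}")
    println("onErrorReturn: ${recovered()}")
}
```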
(4) Completable/CompletableObserver

If your observer does not even care about the onNext event, you can use Completable, which has only two events, onComplete and onError:

    Completable.create(object : CompletableOnSubscribe { // Observed side
        override fun subscribe(emitter: CompletableEmitter) {
            emitter.onComplete()
        }
    }).subscribe(object : CompletableObserver { // Observer
        override fun onSubscribe(d: Disposable) {
        }

        override fun onComplete() {
            Log.d("Completable", "onComplete")
        }

        override fun onError(e: Throwable) {
        }
    })

To convert a Completable to another reactive type, you can use toFlowable(), toObservable() and similar methods.

(5) Maybe/MaybeObserver

If you need to emit either one item or no item at all, use Maybe, which is roughly a mix of Single and Completable.

A Maybe ends with exactly one of the following calls (hence the name):

onSuccess: one item was emitted

onComplete: finished without emitting an item

onError: an error occurred

As you can see, onSuccess and onComplete are mutually exclusive. The example code is as follows:

    Maybe.create(object : MaybeOnSubscribe<String> { // Observed side
        override fun subscribe(emitter: MaybeEmitter<String>) {
            // The following two events are mutually exclusive; only the first is delivered
            emitter.onSuccess("test")
            emitter.onComplete()
        }
    }).subscribe(object : MaybeObserver<String> { // Observer
        override fun onSubscribe(d: Disposable) {
            Log.d("Maybe", "onSubscribe")
        }

        override fun onSuccess(s: String) {
            Log.d("Maybe", "onSuccess:${s}")
        }

        override fun onComplete() {
            Log.d("Maybe", "onComplete")
        }

        override fun onError(e: Throwable) {
            Log.d("Maybe", "onError")
        }
    })

To convert a Maybe to another reactive type, you can use toFlowable(), toObservable() and similar methods.
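A minimal sketch of such conversions follows. The function names are hypothetical, and toSingleDefault() is included as one of the "similar methods" available on Completable; blockingGet() is used only to read the results synchronously.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Maybe

// A Maybe with an item becomes an Observable that emits that one item.
fun maybeAsObservable(): List<String> =
    Maybe.just("test").toObservable().toList().blockingGet()

// An empty Maybe becomes a Flowable that completes without emitting.
fun emptyMaybeAsFlowable(): List<String> =
    Maybe.empty<String>().toFlowable().toList().blockingGet()

// A Completable can become a Single by supplying the value to emit on completion.
fun completableAsSingle(): String =
    Completable.complete().toSingleDefault("done").blockingGet()

fun main() {
    println(maybeAsObservable())
    println(emptyMaybeAsFlowable())
    println(completableAsSingle())
}
```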

An example of actual use:

    Maybe.just(isLogin()) // Determine whether the user is logged in
        .subscribe(object : MaybeObserver<Boolean> { // Observer
            override fun onSubscribe(d: Disposable) {
                Log.d("Maybe", "onSubscribe")
            }

            override fun onSuccess(b: Boolean) {
                if (b) {
                    Log.d("Maybe", "onSuccess: Already logged in")
                } else {
                    Log.d("Maybe", "onSuccess: Not logged in")
                }
            }

            override fun onComplete() {
                Log.d("Maybe", "onComplete")
            }

            override fun onError(e: Throwable) {
                Log.d("Maybe", "onError")
            }
        })

The above is the common usage of Maybe/MaybeObserver. As you can see, this pattern is not meant for sending large amounts of data but for sending a single item: when you only want the result of some event (true or false), you can use it.
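The three possible outcomes of a Maybe can be seen in a short sketch that uses the lambda form of subscribe() instead of a full MaybeObserver. The helper name outcome is hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Maybe

// Hypothetical helper: reports which terminal callback a Maybe invoked.
fun outcome(source: Maybe<String>): String {
    var result = "nothing called"
    source.subscribe(
        { value -> result = "onSuccess:$value" },        // one item
        { error -> result = "onError:${error.message}" }, // failure
        { result = "onComplete" }                         // no item
    )
    return result
}

fun main() {
    println(outcome(Maybe.just("test")))
    println(outcome(Maybe.empty()))
    println(outcome(Maybe.error(IllegalStateException("boom"))))
}
```

These sources all emit synchronously, so the result is available as soon as subscribe() returns; an asynchronous Maybe would need a different way of collecting the outcome.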

(6) Releasing events with the event scheduler

Basic code:

    // Create the event scheduler
    val mCompositeDisposable: CompositeDisposable = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val subscribe = Observable.create(object : ObservableOnSubscribe<String> {
            override fun subscribe(emitter: ObservableEmitter<String>) {
            }
        }).subscribe( // Subscribe to multiple events
            object : Consumer<String> {
                override fun accept(t: String) {
                    // Corresponds to onNext()
                }
            },
            object : Consumer<Throwable> {
                override fun accept(throwable: Throwable) {
                    // Corresponds to onError()
                }
            },
            object : Action {
                override fun run() {
                    // Corresponds to onComplete()
                }
            })

        // Using the event scheduler
        mCompositeDisposable.add(subscribe)
        // Shown here for brevity; this call usually belongs in onDestroy()
        mCompositeDisposable.clear()
    }

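CompositeDisposable provides the following methods for managing events:

dispose(): releases all events and marks the container itself as disposed, so anything added afterwards is released immediately

clear(): releases all events but leaves the container usable for new ones

add(): adds an event

addAll(): adds several events

remove(): removes an event and releases it

delete(): removes an event without releasing it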
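The differences between these methods are easy to check in a sketch that uses Disposable.empty() as a stand-in for real subscriptions. The function names are hypothetical.

```kotlin
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.disposables.Disposable

// clear() releases the contents but keeps the container usable;
// dispose() also shuts the container down for good.
fun clearVersusDispose(): List<Boolean> {
    val composite = CompositeDisposable()

    val first = Disposable.empty()
    composite.add(first)
    composite.clear()                                   // first is released

    val second = Disposable.empty()
    val acceptedAfterClear = composite.add(second)      // still accepted
    composite.dispose()                                 // second is released

    val third = Disposable.empty()
    val acceptedAfterDispose = composite.add(third)     // rejected and released at once

    return listOf(
        first.isDisposed, acceptedAfterClear, second.isDisposed,
        acceptedAfterDispose, third.isDisposed
    )
}

// remove() releases the item it takes out; delete() only takes it out.
fun removeVersusDelete(): Pair<Boolean, Boolean> {
    val composite = CompositeDisposable()
    val removed = Disposable.empty()
    val deleted = Disposable.empty()
    composite.addAll(removed, deleted)
    composite.remove(removed)
    composite.delete(deleted)
    return removed.isDisposed to deleted.isDisposed
}

fun main() {
    println(clearVersusDispose())
    println(removeVersusDelete())
}
```

In an Activity that may subscribe again later, clear() is usually the safer call, since a disposed container silently releases everything added to it afterwards.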

In RxJava, null cannot be emitted to an observer as a data item.

These are personal study notes, drawn mainly from the following blog, with thanks:

1. Xu Jinjin