Spring 5.0 Projects
上QQ阅读APP看书,第一时间看更新

Observer event calls

What we discussed so far is very high-level information about how we can use Observable in RxJava. It basically pushes (emits) the items (data or events) of a given type through a series of operators (if defined) until it reaches  Observer. Let's dig into more details to understand which mechanism works under the hood between this interaction and how RxJava complies with Reactive Streams specifications.

Observable interacts with Observers through the following event calls:

  • onNext: This is the call from where data/events are being sent, one at a time, down to all registered Observers.
  • onComplete: This event is used to signal completion of communication to all. 
  •  Observers: It simply denotes no more onNext calls happen.
  • onError: In case any error occurs before an onComplete() call, an onError() event is used to signal the error from Observable to Observers. Observable will stop emitting the data and Observers will handle the error.

These events are defined as an abstract method in the Observer type, and we will see the implementation type later in this chapter. First let's see how these event calls happen during the interaction with the following code:

public class RxJavaCreateDemo {

public static void main(String[] args) {
Observable<String> daysOfWeek = Observable.create(
sourceEmitter -> {
try {
sourceEmitter.onNext("Sunday");
sourceEmitter.onNext("Monday");
sourceEmitter.onNext("Tuesday");
sourceEmitter.onNext("Wednesday");
sourceEmitter.onNext("Thursday");
sourceEmitter.onNext("Friday");
sourceEmitter.onNext("Saturday");
sourceEmitter.onComplete();
}catch(Exception e) {
sourceEmitter.onError(e);
}
});
Observable<String> daysInUpperCase= daysOfWeek.map(day->day.toUpperCase())
.filter(day->day.startsWith("S"));
daysInUpperCase.subscribe(day->System.out.println("Day is -->"+day));
}
}

 Observable.create() is a factory method and used to create Observable with the emitter. The onNext() method of the emitter is used to emit (send) the data/events (one at a time) to the Observable chain (and finally to registered Observers). The onComplete() method is used to terminate further communication.

If you try to make an onNext() call after onComplete(), the data will not be transmitted. In case any error occurs, the onError() method is called. It is used to push up the error to the Observable chain where it is handled by Observer. In this code, there is no chance of any exception, but you can handle any error with onError().

We have used the map and filter operators to refine the data to uppercase and starting with D respectively. Finally, they are printed by Observer. The flow of data will happen from onNext()mapfilterObserver.  Each operator will return new Observable class in the chain. 

You notice that in the first example we used the Observable.just() method to emit the data. It internally invokes the onNext() method for each of the values pushed. On getting the last value, it will call onComplete(). So Observable.just() is equivalent to Observable.create() calling onNext() on each data and onComplete() on last one. The create() method is generally used for sources that are not reactive in nature.