在hystrix中核心使用了响应式编程。不了解响应式编程的同学可能会看的云里雾里。
基本概念
官网上对rx的描述。
ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.
以下翻译来自google。
RX 是一个使用可观察序列,来编写异步和基于事件的程序的库。它扩展了观察者模式以支持数据和/或事件序列,并添加允许您以声明方式将序列组合在一起的运算符,同时抽象出对诸如低级线程、同步、线程安全、并发数据结构和非 阻塞 I/O。
看完这些,我们大概能够明白RX到底是个啥玩意了。说到底就是一个用于异步操作的jar包,只不过符合了一定的标准。
RX 结合了观察者模式、迭代器模式和函数式编程的精华。
为什么要用RX
因为简洁、易用
名词解析
-
Observable 被观察者,迭代器的强化版,同时也是观察者模式中的被观察者,一旦有了数据,就会通知观察者或订阅者
-
subscribe 是一个动作,表示观察者要订阅被观察者,将两者关联起来
-
Observer 观察者,通过监听Observable发出的数据并根据设置的对应的操作符函数进行处理
-
emit Observable在数据产生时发送通知给Observer,Observer会调用对应的方法
-
items 数据项
-
Func* 用于包装有返回值的函数(作为回调或触发时执行)
-
Action*用于包装无返回值的函数(作为回调或触发时执行)
为什么说Observable是一个强化版的迭代器呢?在上一篇我分析hystrix的时候,一层套一层的创建Observable,然后返回给上一层,间接的就成了迭代器了。在Observable中所有的操作符最后返回的也是Observable。
rx流程
-
通过Observables创建事件流或数据流
-
通过操作符变换(转换、过滤、融合、聚合、条件、数学操作)数据流;
-
通过订阅执行(一种是在创建时就订阅,来一个处理一个,一种是先创建,发送数据,等最后一次订阅执行)
我们来看下Observable
在RxJava中,一个实现了Observer接口的对象可以订阅 (subscribe) 一个Observable实例。订阅者(subscriber) 对 Observable 发出 (emit) 的任何数据或数据序列作出响应。这种模式简化了并发操作,因为它不需要阻塞等待Observable 发出的数据,而是创建了一个处于监听状态的观察者哨兵,哨兵在未来某个时刻响应Observable的通知。
看下官方的流程图
在Rxjava中的事件回调方法
-
onSubscribe() 意思是当Observable被Observer 订阅的时候触发; 这个是官方说的Cold 方式
-
onNext() 迭代处理Observable发出的items,只要onCompleted 或onError触发就不再触发
-
onCompleted() 通知Observable已经完成处理,这个和onError() 是互斥的
-
onError() 事件异常时触发,触发以后整个流程终止
同时官方把把Observables分为hot 和cold
-
hot 是创建了Observable就开始发出items,后续再过来订阅这个的Observer收到的items可能不完整,所以想要完全消费,在创建Observable的时候就得订阅才能完整消费,类似于redis的发布订阅模式,发布的时候,你在线,你能消费,不在线永远不能消费;
-
cold 是先创建Observable,等到最后有订阅者了,Observable才会发出items,一个订阅消费一套,类似于kafka的广播模式,你什么时候来都可以从头消费(不要关注消息存储的时间)
官方把Observable分了好多类,具体可以查看:https://reactivex.io/documentation/operators.html#creating
上一篇hystrix源码分析讲过的东西,这里我会一语带过。
在HystrixCommand中的 toObservable().toBlocking().toFuture(),我们重点分析了toObservable()里的逻辑。
当时并没有详细解说为什么.
//核心处理逻辑
public Observable<R> toObservable() {
return Observable.defer(new Func0<Observable<R>>() {
//通过defer创建Observable
Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
return hystrixObservable.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);
});
}
在hystrix源码分析中我在讲hystrix的时候,我只是梳理下逻辑,并没有深入探究,主要用了以下几个
-
defer():只有当订阅者订阅才创建Observable;为每个订阅创建一个新的 Observable
-
just():将一个或多个对象转换成发出这个或这些对象的一个Observable
-
from():将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
-
rror():—创建一个什么都不做直接通知错误的Observable
在调用toObservable()方法的时候,执行逻辑如下(如果一切正常的情况下):
-
如果按照commandKey已经不能执行了在executeCommandWithSpecifiedIsolation时就会中断
-
然后一步步的创建Observable,绑定对应的操作事件
-
最后通过Observable.just(run()) 将执行的方法体包装成一个Observable
-
最后形成了一个Observable嵌套链条,可能在某个地方就会触发onTerminate或onError
具体流程可以看下面代码里标注的步骤,这里标注了如何将业务方法转成Observable
public class Observable<T> {
//①将任意对象转成Observable,并发出方法执行体,这里value是一个方法执行体
public static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
}
//
public final class ScalarSynchronousObservable<T> extends Observable<T> {
final T t;
static <T> Producer createProducer(Subscriber<? super T> s, T v) {
if (STRONG_MODE) {
return new SingleProducer<T>(s, v);
}
return new WeakSingleProducer<T>(s, v);
}
//②通过create创建对应方法体的Observable
public static <T> ScalarSynchronousObservable<T> create(T t) {
return new ScalarSynchronousObservable<T>(t);
}
//③通过super也就是Observable往Observable里放一观察者(但这个观察者并没有订阅),我们再看下JustOnSubscribe
protected ScalarSynchronousObservable(final T t) {
super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t)));
this.t = t;
}
//④JustOnSubscribe持有了方法体,JustOnSubscribe是观察者
static final class JustOnSubscribe<T> implements OnSubscribe<T> {
final T value;
JustOnSubscribe(T value) {
this.value = value;
}
//当外部回调时,将方法体和消费者绑定
@Override
public void call(Subscriber<? super T> s) {
s.setProducer(createProducer(s, value));
}
}
}
public final class RxJavaHooks {
//静态代码块里会把onObservableCreate初始化
static {
init();
}
//创建一个Observable,观察者,这个观察者并没有订阅Observable
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
//⑤ 如果有观察者,则直接执行④创建的JustOnSubscribe,否则直接返回JustOnSubscribe
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
}
按照Rxjava的hot和cold,hystrix虽然创建了一个JustOnSubscribe的观察者,但是并没有订阅
-
从开始到最终创建中间有一定的逻辑
-
事件回调是从外到里
-
绑定的操作,大部分都是从里到外(最后会有示例)
示意图如下:
我们看下toBlocking()
只是转化成了BlockingObservable,里面没有太多的逻辑
toObservable().toBlocking().toFuture();
public class Observable<T> {
public final BlockingObservable<T> toBlocking() {
return BlockingObservable.from(this);
}
}
public final class BlockingObservable<T> {
//最后的Observable
private final Observable<? extends T> o;
// 将最后toObservable()的Observable转化成BlockingObservable
public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
return new BlockingObservable<T>(o);
}
}
我们继续看toFuture()
public final class BlockingObservable<T> {
public Future<T> toFuture() {
//最后的Observable转成Future
return BlockingOperatorToFuture.toFuture((Observable<T>)o);
}
public T single() {
return blockForSingle(o.single());
}
}
public final class BlockingOperatorToFuture {
//具体逻辑
public static <T> Future<T> toFuture(Observable<? extends T> that) {
//计数器
final CountDownLatch finished = new CountDownLatch(1);
final AtomicReference<T> value = new AtomicReference<T>();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
//①这里的大意就是用了一个观察者订阅了Observable
@SuppressWarnings("unchecked")
final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
finished.countDown();
}
@Override
public void onError(Throwable e) {
error.compareAndSet(null, e);
finished.countDown();
}
@Override
public void onNext(T v) {
// "single" guarantees there is only one "onNext"
value.set(v);
}
});
return new Future<T>() {
private volatile boolean cancelled;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (finished.getCount() > 0) {
cancelled = true;
s.unsubscribe();
finished.countDown();
return true;
} else {
// can't cancel
return false;
}
}
@Override
public boolean isCancelled() {
return cancelled;
}
@Override
public boolean isDone() {
return finished.getCount() == 0;
}
@Override
public T get() throws InterruptedException, ExecutionException {
//通过计数器阻塞
finished.await();
return getValue();
}
....
};
}
}
public class Observable<T> {
//②用观察者单个操作Observable
public final Observable<T> single() {
return lift(OperatorSingle.<T> instance());
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//③将operator和nSubscribe绑定到OnSubscribeLift
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
}
public final class OperatorSingle<T> implements Operator<T, T> {
public static <T> OperatorSingle<T> instance() {
return (OperatorSingle<T>) Holder.INSTANCE;
}
}
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
}
我把这块的整体逻辑抽了出来,写了个demo,也验证下对应上面的结论,大家可以体验下
public class RxJavaTest {
protected String run() throws Exception {
System.err.println(" 业务逻辑执行,并通过Observable.just包装成Observable");
return "yxkong";
}
private Observable<String> applyHystrixSemantics() {
final Action1<String> markEmits = new Action1<String>() {
@Override
public void call(String r) {
System.out.println("③ Observable doOnNext call, target: markEmits");
}
};
final Action1<Notification<? super String>> setRequestContext = new Action1<Notification<? super String>>() {
@Override
public void call(Notification<? super String> rNotification) {
System.out.println("③ Observable doOnEach call ,target:setRequestContext");
}
};
return Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
System.out.println("③ Observable is call ");
return Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
System.out.println("④ Observable is call ");
return Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
try {
System.out.println("⑤ Observable is call ");
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
System.out.println("⑤ Observable doOnSubscribe is call 在 ⑤被订阅之前执行,一般用于修改、添加或者删除事件源的数据流");
}
});
}
});
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
System.out.println("③ Observable doOnTerminate is call ");
}
}).doOnNext(markEmits)
.doOnEach(setRequestContext);
}
@Test
public void demo(){
final Func0<Observable<String>> applyHystrixSemantics = new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
System.out.println("② Observable is call");
return applyHystrixSemantics();
}
};
final Action0 terminateCommandCleanup = new Action0() {
@Override
public void call() {
System.out.println("② Observable doOnTerminate is call target:terminateCommandCleanup");
}
};
final Action0 unsubscribeCommandCleanup = new Action0() {
@Override
public void call() {
System.out.println("② Observable doOnUnsubscribe is call target:unsubscribeCommandCleanup");
}
};
final Action0 fireOnCompletedHook = new Action0() {
@Override
public void call() {
System.out.println("② Observable doOnCompleted is call target:fireOnCompletedHook");
}
};
System.out.println("demo execute");
Observable defer= Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
System.out.println("① Observable call");
Observable<String> hystrixObservable =Observable.defer(applyHystrixSemantics);
return hystrixObservable.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
}).doOnCompleted(new Action0() {
@Override
public void call() {
System.out.println("① Observable doOnCompleted is call ");
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println("① Observable doOnUnsubscribe is call");
}
});
//defer.toBlocking();
//不通过toFuture 不会执行
defer.toBlocking().toFuture();
}
}
最后输出结果为:
demo execute
① Observable call
② Observable is call
③ Observable is call
④ Observable is call
⑤ Observable doOnSubscribe is call 在 ⑤被订阅之前执行,一般用于修改、添加或者删除事件源的数据流
⑤ Observable is call
业务逻辑执行,并通过Observable.just包装成Observable
③ Observable doOnNext call, target: markEmits
③ Observable doOnEach call ,target:setRequestContext
③ Observable doOnTerminate is call
③ Observable doOnEach call ,target:setRequestContext
② Observable doOnTerminate is call target:terminateCommandCleanup
② Observable doOnCompleted is call target:fireOnCompletedHook
① Observable doOnCompleted is call
① Observable doOnUnsubscribe is call
② Observable doOnUnsubscribe is call target:unsubscribeCommandCleanup
如果觉得本文对你有用,欢迎一键三连,也可以关注微信公众号获取及时更新。