hystrix源码分析

2021/12/25 1364点热度 0人点赞 0条评论

最近遇到了一些生产问题涉及到了hystrix,想要了解下底层的原理。hystrix中大量运用了Rxjava的响应式编程,不懂Rxjava,理解起来有点费劲。

基本准备

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spring-boot.version>2.3.12.RELEASE</spring-boot.version>
        <spring-cloud.version>Hoxton.SR11</spring-cloud.version>
    </properties>      
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>

    <dependencyManagement>
        <dependencies>
            <!--导入依赖的pom文件-->
            <!--顺序靠前,先被引入-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>        
        </dependencies>
    </dependencyManagement>
</project>            

启用hystrix

//在主类上添加
@EnableHystrix

我们看下源码
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {
}

最终@EnableHystrix继承了@EnableCircuitBreaker

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {

}

按照之前的模式,我们一定是去分析EnableCircuitBreakerImportSelector,至于如何执行@Import这里不做过多说明,之前已经讲过,感兴趣的看下这里:具体代码在ConfigurationClassParser.processImports里的String[] importClassNames = selector.selectImports(currentSourceClass.getMetadata());

通过EnableCircuitBreakerImportSelector导入HystrixCircuitBreakerConfiguration

//优先级不是特别高,毕竟是运行时的切面
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
        extends SpringFactoryImportSelector<EnableCircuitBreaker> {

    @Override
    protected boolean isEnabled() {
        //默认是开启的
        return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
                Boolean.class, Boolean.TRUE);
    }
}

Import导入后,会通过selectImports从配置文件里加载对应的类 org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker

public abstract class SpringFactoryImportSelector<T>
        implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {
    //将泛型T解析为this.annotationClass
    protected SpringFactoryImportSelector() {
        this.annotationClass = (Class<T>) GenericTypeResolver.resolveTypeArgument(this.getClass(), SpringFactoryImportSelector.class);
    }
    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        //通过SpringFactoriesLoader.loadFactoryNames从spring.factories中获取org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker对应的配置
        List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader.loadFactoryNames(this.annotationClass, this.beanClassLoader)));
    }           
}

具体到最后对应的是2.2.8.RELEASE/spring-cloud-netflix-hystrix-2.2.8.RELEASE.jar!/META-INF/spring.factories

spring.factories文件的内容

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration

org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration

在@SpringBootApplication中,对@EnableAutoConfiguration有引用,所以一启动,就会加载HystrixAutoConfiguration这些类

@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(excludeFilters = { @Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),
        @Filter(type = FilterType.CUSTOM, classes = AutoConfigurationExcludeFilter.class) })
public @interface SpringBootApplication {}

HystrixCircuitBreakerConfiguration解析

@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {
    //核心是这个切面
    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }
    ....
}

在HystrixCommandAspect中主要是对HystrixCommand和HystrixCollapser这两个注解进行处理

@Aspect
public class HystrixCommandAspect {
    static {
        //通过静态方法将两个注解的两个工厂实例化
        META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
            .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
            .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
            .build();
    }
    //定义切入点注解HystrixCommand
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }
    //定义切入点注解HystrixCollapser
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }

    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        //获取目标方法
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        //只处理这两种注解标注的方法
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        //获取MetaHolderFactory不同实现的factory
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        //获取目标方法的的元数据,方法签名,参数等
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        /**
         * 创建处理器CommandCollapser 或 GenericCommand (同步) 或GenericObservableCommand(异步)
         * GenericCommand里有很多super,最终通过HystrixCommandBuilderFactory.getInstance().create(metaHolder) 构建了一个HystrixCommandBuilder作为GenericCommad的参数
         * new  GenericCommand 通过super到AbstractHystrixCommand,
         * AbstractHystrixCommand 通过super到HystrixCommand,
         * HystrixCommand最终到了AbstractCommand  一路传递
         * 一会在AbstractCommand中分析下
         */
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        //根据返回值推断执行类型
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
        //返回结果
        Object result;
        try {
            //不使用observable模式
            if (!metaHolder.isObservable()) {
                //execute执行
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }

    //HystrixCommand的时候MetaHolder的创建
    private static class CommandMetaHolderFactory extends MetaHolderFactory {
        @Override
        public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
            //获取注解HystrixCommand
            HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
            //根据返回结果推断任务类型,可以知道以哪种方式执行
            ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
            MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
            if (isCompileWeaving()) {
                builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
            }
            //这里没有多少参数,最重要的一个hystrixCommand,你在注解里加了什么
            return builder.defaultCommandKey(method.getName())
                            .hystrixCommand(hystrixCommand)
                            .observableExecutionMode(hystrixCommand.observableExecutionMode())  //执行模式
                            .executionType(executionType) //执行方式
                            .observable(ExecutionType.OBSERVABLE == executionType)
                            .build();
        }
    }
}

//在枚举ExecutionType类里
    public static ExecutionType getExecutionType(Class<?> type) {
        if (Future.class.isAssignableFrom(type)) {
            return ExecutionType.ASYNCHRONOUS;
        } else if (Observable.class.isAssignableFrom(type)) {
            return ExecutionType.OBSERVABLE;
        } else {
            return ExecutionType.SYNCHRONOUS;
        }
    }

我们重点分析下同步处理,通过代码我们可以看到HystrixInvokable 是 GenericCommand,我们同步里的看下 CommandExecutor.execute(invokable, executionType, metaHolder)

public class CommandExecutor {
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);

        switch (executionType) {
            case SYNCHRONOUS: {
                //重点看同步处理这个,先把GenericCommand 转成HystrixExecutable 再执行execute
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }
}

我们先看下类关系

通过GenericCommand一层层的往上翻,最终定位到HystrixCommand有个execute()

HystrixCommand

public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
    //同步执行
    public R execute() {
        try {
            //通过queue().get()来同步执行
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
   //异步执行,什么时候get(),由调用者决定,get()的时候会阻塞
   public Future<R> queue() {
        //核心处理,最终定位到了AbstractCommand里的toObservable()里
        final Future<R> delegate = toObservable().toBlocking().toFuture();

        final Future<R> f = new Future<R>() {
            .....
            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }

        };
        //特殊处理了下,已经执行完了,get()也不会阻塞了
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                ...
            }
        }
        return f;
    }
}

我们看下官方的流程图:(来源地址:https://github.com/Netflix/Hystrix/wiki/How-it-Works)通过代码,我们也可以看到确实到了这里

AbstractCommand

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
   // 一个 new GenericCommand 最终走到了这里,参数都包装到了这里,只是从GenericCommand 来说
    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
        //设置group
        this.commandGroup = initGroupKey(group);
        //正常为方法名,不设置,就是类
        this.commandKey = initCommandKey(key, getClass());
        //这里做了优化,将组装好的CommandProperties放入了ConcurrentHashMap缓存了起来(如果是动态生成方法溢出?)
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        //初始化线程池的key,如果为null,则用groupKey.name()
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        //初始化指标,以commandKey缓存
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        //初始化断路器,也是以commandKey缓存
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        //初始化线程池,传入有就用,没有就创建一个缓存,还是commandKey维度,最终是通过concurrencyStrategy.getThreadPool(threadPoolKey, properties)创建
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

        //Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
        this.executionHook = initExecutionHook(executionHook);

        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;

        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }
    //核心处理逻辑
    public Observable<R> toObservable() {
        //拿到当前的对象
        final AbstractCommand<R> _cmd = this;
        /**
         *  terminateCommandCleanup  用于中断命令清理的回调函数
         *  unsubscribeCommandCleanup 取消订阅命令的清理回调函数
         *  applyHystrixSemantics  创建带执行逻辑的Observable,用于订阅后回调
         *  wrapWithAllOnNextHooks onNext执行的钩子
         *  fireOnCompletedHook  完成后触发的钩子
         */
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                //没有订阅不执行
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                //执行
                return applyHystrixSemantics(_cmd);
            }
        };
        //onNext执行后的钩子
        final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
            @Override
            public R call(R r) {
                R afterFirstApplication = r;

                try {
                    afterFirstApplication = executionHook.onComplete(_cmd, r);
                } catch (Throwable hookEx) {
                    logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
                }

                try {
                    return executionHook.onEmit(_cmd, afterFirstApplication);
                } catch (Throwable hookEx) {
                    logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
                    return afterFirstApplication;
                }
            }
        };
        //通过defer创建Observable
        return Observable.defer(new Func0<Observable<R>>() {
            //创建观察者,观察者的执行,得由subscribe()触发
            @Override
            public Observable<R> call() {
                 //通过cas来判断命令状态,初始为NOT_STARTED,如果进来不是NOT_STARTED说明已经有别线程执行了,就抛异常
                if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                    IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                    throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                }
                //命令开始时间
                commandStartTimestamp = System.currentTimeMillis();
                if (properties.requestLogEnabled().get()) {
                    // 不管发生什么都要用日志记录下来
                    if (currentRequestLog != null) {
                        currentRequestLog.addExecutedCommand(_cmd);
                    }
                }
                //是否允许请求缓存,
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                //cacheKey是需要设置@CacheKey 在HystrixCacheKeyGenerator中会把方法和指定的key的value进行拼接
                final String cacheKey = getCacheKey();

                //允许请求缓存的情况下,先从缓存中获取,最终缓存用的是ConcurrentHashMap
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
                //通过defer创建Observable
                Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 开启缓存的情况下,重新放入缓存
                if (requestCacheEnabled && cacheKey != null) {
                    // 对hystrixObservable进行包装后放入缓存
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);

                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        .doOnTerminate(terminateCommandCleanup)    //中断命令清理
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // 不订阅也清理
                        .doOnCompleted(fireOnCompletedHook);//完成触发的钩子
            }
        });
    }

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        //开始执行
        executionHook.onStart(_cmd);

        //断路器允许执行,里面有好多逻辑,具体看下面的解说
        if (circuitBreaker.allowRequest()) {
            //信号量处理(前提是你配置的是信号量处理机制,如果使用线程池则使用TryableSemaphoreNoOp,申请信号量为true)
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            //执行完释放信号量
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
            //申请信号量
            if (executionSemaphore.tryAcquire()) {
                try {
                    //设置调用时间
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)  //执行命令,这块是核心
                            .doOnError(markExceptionThrown) //异常处理
                            .doOnTerminate(singleSemaphoreRelease)//中断时,信号量释放
                            .doOnUnsubscribe(singleSemaphoreRelease);//不订阅,也释放信号量
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                //申请不到信号量就直接执行Fallback
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            //断路器不允许执行,就直接执行Fallback方法
            return handleShortCircuitViaFallback();
        }
    }

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        //HystrixRequestContext hystrix线程的上下文,可以通过它传递一些内容,防止异步过程中数据丢失
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
        /**
         *  又是定义一堆Action和Func
         *  markEmits 用于onNext回调 
         *  markOnCompleted 用于完成后处理,会采样一个SUCCESS事件
         *  handleFallback,有返回结果 用于处理Fallback,
         *  setRequestContext 设置请求上下文
         *  markEmits、markOnCompleted、handleFallback都会产生一个事件
         */

        Observable<R> execution;
        //执行命令
        if (properties.executionTimeoutEnabled().get()) {
            //配置超时时间的情况下会加一个HystrixObservableTimeoutOperator 这里会发送超时事件
            execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        //真正执行逻辑,区分了线程池和信号量
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // 
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {

                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    //设置线程状态,计数,并执行
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        //计数
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        threadPool.markThreadExecution();

                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {

                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(new Action0() {
                @Override
                public void call() {
                    //异常处理
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                        handleThreadEnd(_cmd);
                    }
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    //未订阅处理
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                        handleThreadEnd(_cmd);
                    }
                }
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
            //信号量的执行
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                    // semaphore isolated
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    try {
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                    } catch (Throwable ex) {
                        //If the above hooks throw, then use that as the result of the run method
                        return Observable.error(ex);
                    }
                }
            });
        }
    }
}    
断路器
public interface HystrixCircuitBreaker {
    //初始化实现
    public static class Factory {
        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            // 缓存中有,就直接返回
            HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // 第一次进来,没有就初始化并缓存到map里

            HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
            if (cbForCommand == null) {
                // this means the putIfAbsent step just created a new one so let's retrieve and return it
                return circuitBreakersByCommand.get(key.name());
            } else {
                return cbForCommand;
            }
        }
    }
    //断路器实现
    static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        //hystrix 的配置都在HystrixCommandProperties
        @Override
        public boolean allowRequest() {
            if (properties.circuitBreakerForceOpen().get()) {
                //强制打开断路器,就直接返回,这个时候就熔断了
                return false;
            }
            //断路器关闭的情况下,再次验证
            if (properties.circuitBreakerForceClosed().get()) {
                isOpen();
                //不管本次什么样的结果,至少我进来的是开着的,设置为关闭以后,我还得继续执行
                return true;
            }
            return !isOpen() || allowSingleTest();
        }
        @Override
        public boolean isOpen() {
            //如果是open的,拦截,直接返回true
            if (circuitOpen.get()) {
                return true;
            }

            /**
             * HealthCounts里存储的是一个滑动窗口期间的请求数。
             *  totalCount 总请求数 包含:数(失败+成功+超时+ threadPoolRejected +信号量拒绝)
             *  errorCount 异常数据 刨除成功就是失败
             *  errorPercentage  异常百分比= errorCount/totalCount *100;
             */
            HealthCounts health = metrics.getHealthCounts();

            //如果总请求数小于配置的值,不拦截,配置值是hystrix.对应的comandKey.circuitBreaker.requestVolumeThreshold  默认值是20
            if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                return false;
            }
            //如果异常率小于配置的值,也不拦截,配置值是hystrix.对应的comandKey.circuitBreaker.errorThresholdPercentage 默认值是50,也就是有50%异常,就熔断
            if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                return false;
            } else {
                // 异常比率较高,就设置断路器为open
                if (circuitOpen.compareAndSet(false, true)) {
                    // 设置断路器器打开的时间
                    circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                    return true;
                } else {
                    //走到这里,说明本线程没有设置成功,别的线程已经打开了断路器,直接返回
                    return true;
                }
            }
        }

    }
    }
}
  • 初始化时通过ConcurrentHashMap来缓存HystrixCircuitBreaker来提升性能。

断路器的三种状态:转化关系如下:

断路器初始是Closed状态,如果调用持续出错或者超时,断路器会进入Open状态,熔断请求,后续的一段时间内所有的调用都会触发fallback

  • Open 状态:请求不再进行调用当前服务,内部设置时钟一般为MTTR(平均故障处理时间),当打开时长达到所设时钟则进入半熔断状态

  • Closed 关闭:路器器关闭不会对服务进行熔断部分请求

  • Half Open 半开:根据规则调用当前服务,如果请求成功且符合规则则认为当前服务恢复正常,关闭熔断

看下官方的流程图 在官方:https://github.com/Netflix/Hystrix/wiki/How-it-Works 有图有说明

  • 断路器是以commandKey为维度

  • 断路器是打开状态,直接熔断

  • 在窗口期内,如果请求的量大于设置的值,熔断(默认是10秒超过20个请求)

  • 如果异常率大于配置的值,熔断(默认是10秒超过50%的失败)

  • 一段时间之后,这个断路器是掰开状态,会允许一个请求进来,如果成功,断路器会关闭

ps: HystrixCommand 的配置属性在类:HystrixCommandProperties

最后上下逻辑图

同时,在hystrix-core的jar包com.netflix.hystrix.metric.consumer下,有很多的HystrixEvent的消费流,这些根据配置执行不同的限流手段,有滑动窗口,有令牌桶,这些流都会在HystrixCommandMetrics实例化的时候启动。如果想了解这一块可以看下HystrixConfigurationStream和HystrixCommandMetrics。后期也会针对这块分析下。

流程是理顺了,但是整起来磕磕绊绊的,里面一层套一层的,想要真正的了解它的底层运作,需要好好的了解下Rxjava,后续研究下再分享。

yxkong

这个人很懒,什么都没留下