结果缓存,缓存同样key的请求结果
故障熔断,错误率高时熔断后续请求,快速失败
请求隔离,⽤线程池⼤⼩和信号量限制并发数(@Async其实也隔离线程)
服务降级,异常时回退执⾏
请求合并,将单个逻辑合并成批量量逻辑
利用异步Servlet,请求处理在单独的线程池里执行
com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
aspect的作用就是把一个普通的Java方法转换成HystrixCommand
/**
* Calls a method of {@link HystrixExecutable} in accordance with specified execution type.
*
* @param invokable {@link HystrixInvokable}
* @param metaHolder {@link MetaHolder}
* @return the result of invocation of specific method.
* @throws RuntimeException
*/
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch (executionType) {
case SYNCHRONOUS: {
return castToExecutable(invokable, executionType).execute();
}
case ASYNCHRONOUS: {
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
// 返回一个Future
return executable.queue();
}
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
@EnableHystrix
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}
springcloud用一个工具类从spring.factories文件里Import范型类EnableCircuitBreaker
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
extends SpringFactoryImportSelector<EnableCircuitBreaker> {
@Override
protected boolean isEnabled() {
return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
Boolean.class, Boolean.TRUE);
}
}
public abstract class SpringFactoryImportSelector<T>
implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {
private final Log log = LogFactory.getLog(SpringFactoryImportSelector.class);
private ClassLoader beanClassLoader;
private Class<T> annotationClass;
private Environment environment;
@SuppressWarnings("unchecked")
protected SpringFactoryImportSelector() {
this.annotationClass = (Class<T>) GenericTypeResolver
.resolveTypeArgument(this.getClass(), SpringFactoryImportSelector.class);
}
@Override
public String[] selectImports(AnnotationMetadata metadata) {
if (!isEnabled()) {
return new String[0];
}
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
metadata.getAnnotationAttributes(this.annotationClass.getName(), true));
Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
+ metadata.getClassName() + " annotated with @" + getSimpleName() + "?");
// Find all possible auto configuration classes, filtering duplicates
List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
.loadFactoryNames(this.annotationClass, this.beanClassLoader)));
if (factories.isEmpty() && !hasDefaultFactory()) {
throw new IllegalStateException("Annotation @" + getSimpleName()
+ " found, but there are no implementations. Did you forget to include a starter?");
}
if (factories.size() > 1) {
// there should only ever be one DiscoveryClient, but there might be more than
// one factory
this.log.warn("More than one implementation " + "of @" + getSimpleName()
+ " (now relying on @Conditionals to pick one): " + factories);
}
return factories.toArray(new String[factories.size()]);
}
protected boolean hasDefaultFactory() {
return false;
}
protected abstract boolean isEnabled();
protected String getSimpleName() {
return this.annotationClass.getSimpleName();
}
protected Class<T> getAnnotationClass() {
return this.annotationClass;
}
protected Environment getEnvironment() {
return this.environment;
}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.beanClassLoader = classLoader;
}
}
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
注入hystrixCommandAspect aop
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
@Bean
public HystrixShutdownHook hystrixShutdownHook() {
return new HystrixShutdownHook();
}
@Bean
public HasFeatures hystrixFeature() {
return HasFeatures
.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
}
/**
* {@link DisposableBean} that makes sure that Hystrix internal state is cleared when
* {@link ApplicationContext} shuts down.
*/
private class HystrixShutdownHook implements DisposableBean {
@Override
public void destroy() throws Exception {
// Just call Hystrix to reset thread pool etc.
Hystrix.reset();
}
}
}
执行Command的方式一共四种,直接看官方文档(https://github.com/Netflix/Hystrix/wiki/How-it-Works ),具体区别如下:
• execute():以同步堵塞方式执行run()。调用execute()后,hystrix先创建一个新线程运行run(),接着调用程序要在execute()调用处一直堵塞着,直到run()运行完成。
• queue():以异步非堵塞方式执行run()。调用queue()就直接返回一个Future对象,同时hystrix创建一个新线程运行run(),调用程序通过Future.get()拿到run()的返回结果,而Future.get()是堵塞执行的。
• observe():事件注册前执行run()/construct()。第一步是事件注册前,先调用observe()自动触发执行run()/construct()(如果继承的是HystrixCommand,hystrix将创建新线程非堵塞执行run();如果继承的是HystrixObservableCommand,将以调用程序线程堵塞执行construct()),第二步是从observe()返回后调用程序调用subscribe()完成事件注册,如果run()/construct()执行成功则触发onNext()和onCompleted(),如果执行异常则触发onError()。
• toObservable():事件注册后执行run()/construct()。第一步是事件注册前,调用toObservable()就直接返回一个Observable对象,第二步调用subscribe()完成事件注册后自动触发执行run()/construct()(如果继承的是HystrixCommand,hystrix将创建新线程非堵塞执行run(),调用程序不必等待run();如果继承的是HystrixObservableCommand,将以调用程序线程堵塞执行construct(),调用程序等待construct()执行完才能继续往下走),如果run()/construct()执行成功则触发onNext()和onCompleted(),如果执行异常则触发onError() 注: execute()和queue()是HystrixCommand中的方法,observe()和toObservable()是HystrixObservableCommand 中的方法。从底层实现来讲,HystrixCommand其实也是利用Observable实现的(如果我们看Hystrix的源码的话,可以发现里面大量使用了RxJava),虽然HystrixCommand只返回单个的结果,但HystrixCommand的queue方法实际上是调用了toObservable().toBlocking().toFuture(),而execute方法实际上是调用了queue().get()
com.netflix.hystrix.AbstractCommand#applyHystrixSemantics
默认10s内超过20次请求,且10s内有超过一半失败,熔断器打开,过5s,熔断器半开,允许访问一次,如果这次访问成功,则熔断器关闭,否则熔断器再次打开
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
// 回调通知
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
// 查看是否允许访问,熔断器关闭或者半开
if (circuitBreaker.allowRequest()) {
// 查看是否配置了信号量隔离,如果没配置返回TryableSemaphoreNoOp
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
// 执行方法
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
// properties have asked us to force the circuit open so we will allow NO requests
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
// we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
isOpen();
// properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
return true;
}
return !isOpen() || allowSingleTest();
}
public boolean isOpen() {
if (circuitOpen.get()) {
// if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
return true;
}
// we're closed, so let's see if errors have made us so we should trip the circuit open
HealthCounts health = metrics.getHealthCounts();
// check if we are past the statisticalWindowVolumeThreshold
// 10s内请求小于20个请求,不会继续下去,所以达到20个才计算错误率
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
return false;
}
// 10s内错误百分比<50%
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// our failure rate is too high, trip the circuit
if (circuitOpen.compareAndSet(false, true)) {
// if the previousValue was false then we want to set the currentTime
// 设置熔断器打开或者半开的时间,
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
// How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
// caused another thread to set it to true already even though we were in the process of doing the same
// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
return true;
}
}
}
10秒内20个请求
default_circuitBreakerRequestVolumeThreshold = 20;// default => statisticalWindowVolumeThreshold: 20 requests in 10 seconds must occur before statistics matter
10秒内50%错误
default_circuitBreakerErrorThresholdPercentage = 50;// default => errorThresholdPercentage = 50 = if 50%+ of requests in 10 seconds are failures or latent then we will trip the circuit
允许一次请求,即半开状态,
public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 1) if the circuit is open
// 2) and it's been longer than 'sleepWindow' since we opened the circuit
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// 熔断器打开并且距离上次打开时间超过默认5s
// We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
// If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
// if this returns true that means we set the time so we'll return true to allow the singleTest
// if it returned false it means another thread raced us and allowed the singleTest before we did
return true;
}
}
return false;
}
default_circuitBreakerSleepWindowInMilliseconds = 5000;// default => sleepWindow: 5000 = 5 seconds that we will sleep before trying again after tripping the circuit
class Bucket {
// 标识是哪一秒的桶数据
final long windowStart;
// 如果是简单自增统计数据,那么将使用adderForCounterType
final LongAdder[] adderForCounterType;
// 如果是最大并发类的统计数据,那么将使用updaterForCounterType
final LongMaxUpdater[] updaterForCounterType;
Bucket(long startTime) {
this.windowStart = startTime;
// 预分配内存,提高效率,不同事件对应不同的数组index
adderForCounterType = new LongAdder[HystrixRollingNumberEvent.values().length];
for (HystrixRollingNumberEvent type : HystrixRollingNumberEvent.values()) {
if (type.isCounter()) {
adderForCounterType[type.ordinal()] = new LongAdder();
}
}
// 预分配内存,提高效率,不同事件对应不同的数组index
updaterForCounterType = new LongMaxUpdater[HystrixRollingNumberEvent.values().length];
for (HystrixRollingNumberEvent type : HystrixRollingNumberEvent.values()) {
if (type.isMaxUpdater()) {
updaterForCounterType[type.ordinal()] = new LongMaxUpdater();
// initialize to 0 otherwise it is Long.MIN_VALUE
updaterForCounterType[type.ordinal()].update(0);
}
}
}
...略...
}
class ListState {
/*
* 这里的data之所以用AtomicReferenceArray而不是普通数组,是因为data需要
* 在不同的ListState对象中跨线程来引用,需要可见性和并发性的保证。
*/
private final AtomicReferenceArray<Bucket> data;
private final int size;
private final int tail;
private final int head;
private ListState(AtomicReferenceArray<Bucket> data, int head, int tail) {
this.head = head;
this.tail = tail;
if (head == 0 && tail == 0) {
size = 0;
} else {
this.size = (tail + dataLength - head) % dataLength;
}
this.data = data;
}
...略...
}
class BucketCircularArray implements Iterable<Bucket> {
// 持有最新的ListState
private final AtomicReference<ListState> state;
...略...
}
public class HystrixRollingNumber {
// 环形桶数组
final BucketCircularArray buckets;
// 获取该事件类型当前滑动窗口的统计值
public long getRollingSum(HystrixRollingNumberEvent type) {
Bucket lastBucket = getCurrentBucket();
if (lastBucket == null)
return 0;
long sum = 0;
// BucketCircularArray实现了迭代器接口环形桶数组
for (Bucket b : buckets) {
sum += b.getAdder(type).sum();
}
return sum;
}
...略...
}
public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
final Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = window -> window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
this.sourceStream = bucketedStream // 数据流,每个对象代表单元窗口产生的桶 stream broken up into buckets
.window(numBuckets, 1) // 按照滑动窗口桶的个数进行桶的聚集 emit overlapping windows of buckets
.flatMap(reduceWindowToSummary) // 将一系列的桶聚集成最后的数据对象 convert a window of bucket-summaries into a single summary
.doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
.doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
.share() // 共享。不同的订阅者看到的数据是一致的 multiple subscribers should get same data
.onBackpressureDrop(); // 被压流量控制,当消费者消费速度过慢时就丢弃数据,不进行积压 if there are slow consumers, data should not buffer
}
@Override
public Observable<Output> observe() {
return sourceStream;
}
/* package-private */ boolean isSourceCurrentlySubscribed() {
return isSourceCurrentlySubscribed.get();
}
}
public static class HealthCounts {
private final long totalCount;// 总数
private final long errorCount;// 错误总数
private final int errorPercentage;// 错误百分比
}
一个滑动窗口例子
/**
* 自定义滑动时间窗口demo - Hystrix也是类似采用这种。
* - 实现runnable方法:用于控制滑动动作,重置桶的值以及总量值
*
* @author lidishan
*/
public class MyDefinedSlideWinDemoLimiter implements RateLimiter, Runnable {
/** 每秒最多允许5个请求,这是默认值,你可以通过构造方法指定 **/
private static final int DEFAULT_ALLOWED_VISIT_PER_SECOND = 5;
/** 最大访问每秒 **/
private long maxVisitPerSecond;
/** 默认把1s分为十个桶,这是默认值 **/
private static final int DEFAULT_BUCKET = 10;
private int bucket;
/** 每个桶对应当前的请求数 **/
private static AtomicInteger[] countPerBucket = null;
/** 总请求数 **/
private AtomicInteger count;
private volatile int index;
/** 构造器 **/
public MyDefinedSlideWinDemoLimiter() {
this(DEFAULT_BUCKET, DEFAULT_ALLOWED_VISIT_PER_SECOND);
}
public MyDefinedSlideWinDemoLimiter(int bucket, long maxVisitPerSecond) {
this.bucket = bucket;
this.maxVisitPerSecond = maxVisitPerSecond;
countPerBucket = new AtomicInteger[bucket];
for (int i = 0; i < bucket; i++) {
countPerBucket[i] = new AtomicInteger();
}
count = new AtomicInteger(0);
}
/**
* 是否超过限制:当前QPS总数是否超过了最大值(默认每秒5个)
* 注意:这里应该是>=。因为其实如果桶内访问数量已经等于5了,就应该限制住外面的再进来
*/
@Override
public boolean isOverLimit() {
return currentQps() >= maxVisitPerSecond;
}
@Override
public int currentQps() {
return count.get();
}
/**
* 访问一次,次数+1(只要请求进来了就+1),并且告知是否加载
* 请注意:放在指定的桶
*/
@Override
public boolean visit() {
countPerBucket[index].incrementAndGet();
count.incrementAndGet();
return isOverLimit();
}
@Override
public void run() {
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~窗口向后滑动一下~~~~~~~~~~~~~~~~~~~~~~~~~~");
// 桶内的指针向前滑动一下:表示后面的visit请求应该打到下一个桶内
index = (index + 1) % bucket;
// 初始化新桶。并且拿出旧值(其实就是把当前这个桶的值释放出来,然后看下这个桶之前是否有访问过,有的话就对count总数减去,然后告诉可以进行访问)
int val = countPerBucket[index].getAndSet(0);
// 这个步骤一定不要变了:因为废弃了一个桶,所以总值要减去~
if (val == 0) {
// 这个桶等于0,说明这个时刻没有流量进来
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~窗口没能释放出流量,继续保持限流~~~~~~~~~~~~~~~~~~~~~~~~~~");
} else {
count.addAndGet(-val);
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~窗口释放出了[" + val + "]个访问名额,你可以访问了~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
}
public static void main(String[] args) throws Exception {
MyDefinedSlideWinDemoLimiter rateLimiter = new MyDefinedSlideWinDemoLimiter();
// 使用一个线程定时滑动这个窗口:100ms滑动一次(一般保持个桶的跨度保持一致)
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(rateLimiter, 100, 100, TimeUnit.MILLISECONDS);
// 此处使用单线程访问,你可以改造成多线程版本
while (true) {
String currThreadName = Thread.currentThread().getName();
boolean overLimit = rateLimiter.isOverLimit();
if (overLimit) {
System.out.printf("线程[%s]===被限流了===,因为访问次数已经超过阈值[%s]\n%n", currThreadName, rateLimiter.currentQps());
} else {
rateLimiter.visit();
System.out.printf("线程[%s]访问成功,当前访问总数[%s]\n%n", currThreadName, rateLimiter.currentQps());
}
Thread.sleep(10);
}
}
}
public interface RateLimiter {
// 是否要限流
boolean isOverLimit();
// 当前QPS总数值(也就是窗口期内的访问总量)
int currentQps();
// touch一下;增加一次访问量
boolean visit();
}
Hystrix内部使用了响应式编程框架-RxJava。
https://www.infoq.cn/article/vcwckaoqbdcax1wywphr
http://www.saily.top/2020/04/19/springcloud/hystrix05/
https://developer.aliyun.com/article/183592
https://blog.csdn.net/manzhizhen/article/details/80296655
https://juejin.cn/post/7012610414580088839