hirari       

hirari

  1. 通过代码设计和优化大幅减少线程间的锁竞争。这一点主要通过 ConcurrentBag 来实现,下文会展开。
  2. 引入了更多 JDK 的特性,尤其是 concurrent 包的工具。DBCP 和 C3P0 出现时间较早,基于早期的 JDK 进行开发,也就很难享受到后面更新带来的福利;
  3. 使用 javassist 直接修改 class 文件生成动态代理,精简了很多不必要的字节码,提高代理方法运行速度。相比 JDK 和 cglib 的动态代理,通过 javassist 直接修改 class 文件生成的代理类在运行上会更快一些(这是网上找到的说法,但是目前 JDK 和 cglib 已经经过了多次优化,在代理类的运行速度上应该不会差一个数量级)
  4. 重视代码细节对性能的影响。下文到的 fastPathPool 就是一个例子,仔细琢磨 HikariCP 的代码就会发现许多类似的细节优化,除此之外还有 FastList 等自定义集合类;

HikariCP利用了一个第三方的Java字节码修改类库Javassist来生成委托实现动态代理

微观上 HiKariCP 程序编译出的字节码执行效率更高,站在字节码的角度去优化 Java 代码,HiKariCP 的作者对性能的执着可见一斑,不过遗憾的是他并没有详细解释都做了哪些优化。而宏观上主要是和两个数据结构有关,一个是 FastList,另一个是 ConcurrentBag。下面我们来看看它们是如何提升 HiKariCP 的性能的。

FastList 适用于逆序删除场景;而 ConcurrentBag 通过 ThreadLocal 做一次预分配,避免直接竞争共享资源,非常适合池化资源的分配。

FastList

假设一个 Connection 依次创建 6 个 Statement,分别是 S1、S2、S3、S4、S5、S6,按照正常的编码习惯,关闭 Statement 的顺序一般是逆序的,关闭的顺序是:S6、S5、S4、S3、S2、S1,而 ArrayList 的 remove(Object o) 方法是顺序遍历查找,逆序删除而顺序查找,这样的查找效率就太慢了。

如何优化呢?很简单,优化成逆序查找就可以了。逆序删除示意图HiKariCP 中的 FastList 相对于 ArrayList 的一个优化点就是将 remove(Object element) 方法的查找顺序变成了逆序查找。除此之外,FastList 还有另一个优化点,是 get(int index) 方法没有对 index 参数进行越界检查,HiKariCP 能保证不会越界,所以不用每次都进行越界检查。

ConcurrentBag

HiKariCP 并没有使用 Java SDK 中的阻塞队列,而是自己实现了一个叫做 ConcurrentBag 的并发容器。ConcurrentBag 的设计最初源自 C#,它的一个核心设计是使用 ThreadLocal 避免部分并发问题,不过 HiKariCP 中的 ConcurrentBag 并没有完全参考 C# 的实现

//用于存储所有的数据库连接
CopyOnWriteArrayList<T> sharedList;
//线程本地存储中的数据库连接
ThreadLocal<List<Object>> threadList;
//等待数据库连接的线程数
AtomicInteger waiters;
//分配数据库连接的工具
SynchronousQueue<T> handoffQueue;

当线程池创建了一个数据库连接时,通过调用 ConcurrentBag 的 add() 方法加入到 ConcurrentBag 中,下面是 add() 方法的具体实现,逻辑很简单,就是将这个连接加入到共享队列 sharedList 中,如果此时有线程在等待数据库连接,那么就通过 handoffQueue 将这个连接分配给等待的线程。

//将空闲连接添加到队列
void add(final T bagEntry){
  //加入共享队列
  sharedList.add(bagEntry);
  //如果有等待连接的线程,
  //则通过handoffQueue直接分配给等待的线程
  while (waiters.get() > 0 
    && bagEntry.getState() == STATE_NOT_IN_USE 
    && !handoffQueue.offer(bagEntry)) {
      yield();
  }
}

通过 ConcurrentBag 提供的 borrow() 方法,可以获取一个空闲的数据库连接,borrow() 的主要逻辑是:

首先查看线程本地存储是否有空闲连接,如果有,则返回一个空闲的连接;

如果线程本地存储中无空闲连接,则从共享队列中获取。

如果共享队列中也没有空闲的连接,则请求线程需要等待。


T borrow(long timeout, final TimeUnit timeUnit){
  // 先查看线程本地存储是否有空闲连接
  final List<Object> list = threadList.get();
  for (int i = list.size() - 1; i >= 0; i--) {
    final Object entry = list.remove(i);
    // 弱引用
    final T bagEntry = weakThreadLocals 
      ? ((WeakReference<T>) entry).get() 
      : (T) entry;
    //线程本地存储中的连接也可以被窃取,
    //所以需要用CAS方法防止重复分配
    if (bagEntry != null 
      && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
      return bagEntry;
    }
  }

  // 线程本地存储中无空闲连接,则从共享队列中获取
  final int waiting = waiters.incrementAndGet();
  try {
    for (T bagEntry : sharedList) {
      //如果共享队列中有空闲连接,则返回
      if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
        return bagEntry;
      }
    }
    //共享队列中没有连接,则需要等待
    timeout = timeUnit.toNanos(timeout);
    do {
      final long start = currentTime();
      final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
      if (bagEntry == null 
        || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
          return bagEntry;
      }
      //重新计算等待时间
      timeout -= elapsedNanos(start);
    } while (timeout > 10_000);
    //超时没有获取到连接,返回null
    return null;
  } finally {
    waiters.decrementAndGet();
  }
}

houseKeeper

/**
 * The house keeping task to retire and maintain minimum idle connections.
 */

保持最小空闲连接数,如果当前空闲连接数大于最小空闲连接数的话,则关闭连接

idleTimeout

这个参数控制了一个空闲连接能够在hikari中能够存活的最长时间,只有当minimumIdlemaxiumnPoolSize小时,才会生效。最小值10秒,默认值10分钟。

private final class HouseKeeper implements Runnable
{
   private volatile long previous = plusMillis(currentTime(), -housekeepingPeriodMs);

   @Override
   public void run()
   {
      try {
         // refresh values in case they changed via MBean
         connectionTimeout = config.getConnectionTimeout();
         validationTimeout = config.getValidationTimeout();
         leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
         catalog = (config.getCatalog() != null && !config.getCatalog().equals(catalog)) ? config.getCatalog() : catalog;

         final long idleTimeout = config.getIdleTimeout();
         final long now = currentTime();

         // Detect retrograde time, allowing +128ms as per NTP spec.
         if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) {
            logger.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
                        poolName, elapsedDisplayString(previous, now));
            previous = now;
            softEvictConnections();
            return;
         }
         else if (now > plusMillis(previous, (3 * housekeepingPeriodMs) / 2)) {
            // No point evicting for forward clock motion, this merely accelerates connection retirement anyway
            logger.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));
         }

         previous = now;

         String afterPrefix = "Pool ";
         if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
            logPoolState("Before cleanup ");
            afterPrefix = "After cleanup  ";

            final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
            int toRemove = notInUse.size() - config.getMinimumIdle();
            for (PoolEntry entry : notInUse) {
                // 如果连接的上次使用时间距今是否已经过去了idleTimeout
               if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
                  closeConnection(entry, "(connection has passed idleTimeout)");
                  toRemove--;
               }
            }
         }

         logPoolState(afterPrefix);

         fillPool(); // Try to maintain minimum connections
      }
      catch (Exception e) {
         logger.error("Unexpected exception in housekeeping task", e);
      }
   }
}

maxLifetime

这个参数控制了连接在hikari中的最长存活时间(这句话存疑,本例case就是一个活生生的反例)。hikari强烈建议设置该值,该值需要比数据库或者基础设施的(空闲)连接时间限制要短一点。默认值30分钟。

如果maxLifetime的时候,如果连接正在被使用,则不会被剔除,可以通过idleTimeout剔除空闲连接。

private PoolEntry createPoolEntry()
{
   try {
      final PoolEntry poolEntry = newPoolEntry();

      final long maxLifetime = config.getMaxLifetime();
      if (maxLifetime > 0) {
         // variance up to 2.5% of the maxlifetime
         final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
         final long lifetime = maxLifetime - variance;
         // 一个只运行一次的定时任务,在maxlifetime的时候执行一次
         poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
            () -> {
               // 先软剔除连接,1.标记剔除 2.如果正在使用,不剔除,下次获取连接的时候,看到剔除标记再剔除,或者HouseKeeper的周期性任务做兜底,保证连接空闲时间达到idleTimeout后被回收
               if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
                 	// 提交创建连接的任务
                  addBagItem(connectionBag.getWaitingThreadCount());
               }
            },
            lifetime, MILLISECONDS));
      }

      return poolEntry;
   }
   ...
}

poolEntry只有4个状态:

关闭连接,然后补充连接至最少连接数

  1. 连接的生命周期到达maxLifetime
  2. 连接的空闲时间到达idleTimeout
void closeConnection(final PoolEntry poolEntry, final String closureReason)
{
   // 将连接移除sharedList
   if (connectionBag.remove(poolEntry)) {
      final Connection connection = poolEntry.close();
      closeConnectionExecutor.execute(() -> {
        // 关闭实际连接
         quietlyCloseConnection(connection, closureReason);
         if (poolState == POOL_NORMAL) {
            fillPool();
         }
      });
   }
}


   public boolean remove(final T bagEntry)
   {
      if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
         LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
         return false;
      }

      final boolean removed = sharedList.remove(bagEntry);
      if (!removed && !closed) {
         LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
      }

      return removed;
   }

com.zaxxer.hikari.pool.PoolBase#quietlyCloseConnection

void quietlyCloseConnection(final Connection connection, final String closureReason)
{
   if (connection != null) {
      try {
         logger.debug("{} - Closing connection {}: {}", poolName, connection, closureReason);

         try {
            setNetworkTimeout(connection, SECONDS.toMillis(15));
         }
         catch (SQLException e) {
            // ignore
         }
         finally {
            connection.close(); // continue with the close even if setNetworkTimeout() throws
         }
      }
      catch (Exception e) {
         logger.debug("{} - Closing connection {} failed", poolName, connection, e);
      }
   }
}

获取连接

public Connection getConnection(final long hardTimeout) throws SQLException
{
   suspendResumeLock.acquire();
   final long startTime = currentTime();

   try {
      long timeout = hardTimeout;
      do {
         PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
         if (poolEntry == null) {
            break; // We timed out... break and throw exception
         }

         final long now = currentTime();
         // 只有距离连接上次使用超过500MS,才会验证连接是否有效,不是每次拿到连接都要校验连接有效性
         if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
            closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
            timeout = hardTimeout - elapsedMillis(startTime);
         }
         else {
            //距离上次连接使用不超过500MS;或者超过500ms且校验过连接,这两种情况会返回连接
            metricsTracker.recordBorrowStats(poolEntry, startTime);
            return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
         }
      } while (timeout > 0L);

      metricsTracker.recordBorrowTimeoutStats(startTime);
      throw createTimeoutException(startTime);
   }
   catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
   }
   finally {
      suspendResumeLock.release();
   }
}

回收连接

释放连接需要调用 ConcurrentBag 提供的 requite() 方法,该方法的逻辑很简单,首先将数据库连接状态更改为 STATE_NOT_IN_USE,之后查看是否存在等待线程,如果有,则分配给等待线程;如果没有,则将该数据库连接保存到线程本地存储里。

/**
 * Release this entry back to the pool.
 *
 * @param lastAccessed last access time-stamp
 */
void recycle(final long lastAccessed)
{
   if (connection != null) {
      this.lastAccessed = lastAccessed;
      hikariPool.recycle(this);
   }
}
/**
 * Recycle PoolEntry (add back to the pool)
 *
 * @param poolEntry the PoolEntry to recycle
 */
@Override
void recycle(final PoolEntry poolEntry)
{
   metricsTracker.recordConnectionUsage(poolEntry);

   connectionBag.requite(poolEntry);
}
public void requite(final T bagEntry)
{
   bagEntry.setState(STATE_NOT_IN_USE);

   // 如果有线程在等待,先给他们
   for (int i = 0; waiters.get() > 0; i++) {
      if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
         return;
      }
      else if ((i & 0xff) == 0xff) {
         parkNanos(MICROSECONDS.toNanos(10));
      }
      else {
         yield();
      }
   }

   // 如果没有等待的线程,则进入线程本地存储
   final List<Object> threadLocalList = threadList.get();
   if (threadLocalList.size() < 50) {
      threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
   }
}

并发场景

public ConcurrentBag(final IBagStateListener listener)
{
   this.listener = listener;
   this.weakThreadLocals = useWeakThreadLocals();

   // 队列长度0的并发队列
   this.handoffQueue = new SynchronousQueue<>(true);
   // 等待获取连接的线程
   this.waiters = new AtomicInteger();
   // 全局共享的连接
   this.sharedList = new CopyOnWriteArrayList<>();
   if (weakThreadLocals) {
     	// 当前线程缓存的连接
      this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
   }
   else {
      // 当前线程缓存的连接
      this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
   }
}

并发获取连接

concurrentBag.borrow

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
   // Try the thread-local list first
   // 先从当前线程里取
   final List<Object> list = threadList.get();
   for (int i = list.size() - 1; i >= 0; i--) {
      // 取完就把连接从list里删除,防止一直呆在本地线程,因为该连接可能被删除或者被其他线程取走
      final Object entry = list.remove(i);
      @SuppressWarnings("unchecked")
      final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
      if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
         return bagEntry;
      }
   }

   // Otherwise, scan the shared list ... then poll the handoff queue
   final int waiting = waiters.incrementAndGet();
   try {
      // 从全局的连接池里取
      for (T bagEntry : sharedList) {
         if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            // If we may have stolen another waiter's connection, request another bag add.
            // 可能取了其他线程请求增加到连接,所以重新申请添加连接
            if (waiting > 1) {
               listener.addBagItem(waiting - 1);
            }
            return bagEntry;
         }
      }

      listener.addBagItem(waiting);

      timeout = timeUnit.toNanos(timeout);
      do {
         final long start = currentTime();
         // 阻塞获取连接
         final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
         if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }

         timeout -= elapsedNanos(start);
      } while (timeout > 10_000);

      return null;
   }
   finally {
      waiters.decrementAndGet();
   }
}

先从当前线程里取(不需要加锁)

从全局的连接池里取(cas;bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE))

阻塞获取连接(handoffQueue.poll(timeout, NANOSECONDS);)

image-20210121092445277

borrow 和 requite 对于 ConcurrentBag 而言是只读的操作,addConnectionExecutor 只开启一个线程执行任务,所以 add 操作是单线程的,唯一存在锁竞争的就是 remove 方法

下面这份网上的配置是最佳实践吗

    hikari:
      validationTimeout: 1500
      connection-timeout: 2000
      idleTimeout: 60000
      max-lifetime: 0
      maximumPoolSize: 50
      minimum-idle: 5

每隔300s会将连接重连,起到保活功能

    hikari:
      validationTimeout: 1500
      connection-timeout: 2000
      max-lifetime: 300000
 // minimum-idle不设置默认和maximumPoolSize一样大,此时idleTimeout失效
      maximumPoolSize: 50

代理

public final class ProxyFactory
{
   private ProxyFactory()
   {
      // unconstructable
   }

   /**
    * Create a proxy for the specified {@link Connection} instance.
    * @param poolEntry the PoolEntry holding pool state
    * @param connection the raw database Connection
    * @param openStatements a reusable list to track open Statement instances
    * @param leakTask the ProxyLeakTask for this connection
    * @param now the current timestamp
    * @param isReadOnly the default readOnly state of the connection
    * @param isAutoCommit the default autoCommit state of the connection
    * @return a proxy that wraps the specified {@link Connection}
    */
   static ProxyConnection getProxyConnection(final PoolEntry poolEntry, final Connection connection, final FastList<Statement> openStatements, final ProxyLeakTask leakTask, final long now, final boolean isReadOnly, final boolean isAutoCommit)
   {
      // Body is replaced (injected) by JavassistProxyFactory
      throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
   }

   static Statement getProxyStatement(final ProxyConnection connection, final Statement statement)
   {
      // Body is replaced (injected) by JavassistProxyFactory
      throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
   }

   static CallableStatement getProxyCallableStatement(final ProxyConnection connection, final CallableStatement statement)
   {
      // Body is replaced (injected) by JavassistProxyFactory
      throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
   }

   static PreparedStatement getProxyPreparedStatement(final ProxyConnection connection, final PreparedStatement statement)
   {
      // Body is replaced (injected) by JavassistProxyFactory
      throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
   }

   static ResultSet getProxyResultSet(final ProxyConnection connection, final ProxyStatement statement, final ResultSet resultSet)
   {
      // Body is replaced (injected) by JavassistProxyFactory
      throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
   }

   static DatabaseMetaData getProxyDatabaseMetaData(final ProxyConnection connection, final DatabaseMetaData metaData)
   {
      // Body is replaced (injected) by JavassistProxyFactory
      throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
   }
}

编译时调用JavassistProxyFactory才生成的

com.zaxxer.hikari.util.JavassistProxyFactory

public final class JavassistProxyFactory
{
   private static ClassPool classPool;
   private static String genDirectory = "";

   public static void main(String... args) throws Exception {
      classPool = new ClassPool();
      classPool.importPackage("java.sql");
      classPool.appendClassPath(new LoaderClassPath(JavassistProxyFactory.class.getClassLoader()));

      if (args.length > 0) {
         genDirectory = args[0];
      }

      // Cast is not needed for these
      String methodBody = "{ try { return delegate.method($$); } catch (SQLException e) { throw checkException(e); } }";
      generateProxyClass(Connection.class, ProxyConnection.class.getName(), methodBody);
      generateProxyClass(Statement.class, ProxyStatement.class.getName(), methodBody);
      generateProxyClass(ResultSet.class, ProxyResultSet.class.getName(), methodBody);
      generateProxyClass(DatabaseMetaData.class, ProxyDatabaseMetaData.class.getName(), methodBody);

      // For these we have to cast the delegate
      methodBody = "{ try { return ((cast) delegate).method($$); } catch (SQLException e) { throw checkException(e); } }";
      generateProxyClass(PreparedStatement.class, ProxyPreparedStatement.class.getName(), methodBody);
      generateProxyClass(CallableStatement.class, ProxyCallableStatement.class.getName(), methodBody);

      modifyProxyFactory();
   }
 
  private static void modifyProxyFactory() throws NotFoundException, CannotCompileException, IOException {
      System.out.println("Generating method bodies for com.zaxxer.hikari.proxy.ProxyFactory");

      String packageName = ProxyConnection.class.getPackage().getName();
      CtClass proxyCt = classPool.getCtClass("com.zaxxer.hikari.pool.ProxyFactory");
      for (CtMethod method : proxyCt.getMethods()) {
         switch (method.getName()) {
            case "getProxyConnection":
               method.setBody("{return new " + packageName + ".HikariProxyConnection($$);}");
               break;
            case "getProxyStatement":
               method.setBody("{return new " + packageName + ".HikariProxyStatement($$);}");
               break;
            case "getProxyPreparedStatement":
               method.setBody("{return new " + packageName + ".HikariProxyPreparedStatement($$);}");
               break;
            case "getProxyCallableStatement":
               method.setBody("{return new " + packageName + ".HikariProxyCallableStatement($$);}");
               break;
            case "getProxyResultSet":
               method.setBody("{return new " + packageName + ".HikariProxyResultSet($$);}");
               break;
            case "getProxyDatabaseMetaData":
               method.setBody("{return new " + packageName + ".HikariProxyDatabaseMetaData($$);}");
               break;
            default:
               // unhandled method
               break;
         }
      }

      proxyCt.writeFile(genDirectory + "target/classes");
   }
  
}

之所以使用Javassist生成动态代理,是因为其速度更快,相比于JDK Proxy生成的字节码更少,精简了很多不必要的字节码。

编译的时候生成代理类

<plugin>
   <!-- Generate proxies -->
   <groupId>org.codehaus.mojo</groupId>
   <artifactId>exec-maven-plugin</artifactId>
   <version>1.6.0</version>
   <executions>
      <execution>
         <phase>compile</phase>
         <!-- phase>generate-test-sources</phase -->
         <goals>
            <goal>java</goal>
         </goals>
      </execution>
   </executions>
   <configuration>
      <mainClass>com.zaxxer.hikari.util.JavassistProxyFactory</mainClass>
   </configuration>
</plugin>

参考

https://ayonel.github.io/2020/08/18/hikari-keeplive/

https://www.cnblogs.com/ZhangZiSheng001/p/12329937.html

https://www.jianshu.com/p/f93fc70922c3