ConcurrentBag
来实现,下文会展开。HikariCP利用了一个第三方的Java字节码修改类库Javassist来生成委托实现动态代理
微观上 HiKariCP 程序编译出的字节码执行效率更高,站在字节码的角度去优化 Java 代码,HiKariCP 的作者对性能的执着可见一斑,不过遗憾的是他并没有详细解释都做了哪些优化。而宏观上主要是和两个数据结构有关,一个是 FastList,另一个是 ConcurrentBag。下面我们来看看它们是如何提升 HiKariCP 的性能的。
FastList 适用于逆序删除场景;而 ConcurrentBag 通过 ThreadLocal 做一次预分配,避免直接竞争共享资源,非常适合池化资源的分配。
假设一个 Connection 依次创建 6 个 Statement,分别是 S1、S2、S3、S4、S5、S6,按照正常的编码习惯,关闭 Statement 的顺序一般是逆序的,关闭的顺序是:S6、S5、S4、S3、S2、S1,而 ArrayList 的 remove(Object o) 方法是顺序遍历查找,逆序删除而顺序查找,这样的查找效率就太慢了。
如何优化呢?很简单,优化成逆序查找就可以了。逆序删除示意图HiKariCP 中的 FastList 相对于 ArrayList 的一个优化点就是将 remove(Object element) 方法的查找顺序变成了逆序查找。除此之外,FastList 还有另一个优化点,是 get(int index) 方法没有对 index 参数进行越界检查,HiKariCP 能保证不会越界,所以不用每次都进行越界检查。
HiKariCP 并没有使用 Java SDK 中的阻塞队列,而是自己实现了一个叫做 ConcurrentBag 的并发容器。ConcurrentBag 的设计最初源自 C#,它的一个核心设计是使用 ThreadLocal 避免部分并发问题,不过 HiKariCP 中的 ConcurrentBag 并没有完全参考 C# 的实现
//用于存储所有的数据库连接
CopyOnWriteArrayList<T> sharedList;
//线程本地存储中的数据库连接
ThreadLocal<List<Object>> threadList;
//等待数据库连接的线程数
AtomicInteger waiters;
//分配数据库连接的工具
SynchronousQueue<T> handoffQueue;
当线程池创建了一个数据库连接时,通过调用 ConcurrentBag 的 add() 方法加入到 ConcurrentBag 中,下面是 add() 方法的具体实现,逻辑很简单,就是将这个连接加入到共享队列 sharedList 中,如果此时有线程在等待数据库连接,那么就通过 handoffQueue 将这个连接分配给等待的线程。
//将空闲连接添加到队列
void add(final T bagEntry){
//加入共享队列
sharedList.add(bagEntry);
//如果有等待连接的线程,
//则通过handoffQueue直接分配给等待的线程
while (waiters.get() > 0
&& bagEntry.getState() == STATE_NOT_IN_USE
&& !handoffQueue.offer(bagEntry)) {
yield();
}
}
通过 ConcurrentBag 提供的 borrow() 方法,可以获取一个空闲的数据库连接,borrow() 的主要逻辑是:
首先查看线程本地存储是否有空闲连接,如果有,则返回一个空闲的连接;
如果线程本地存储中无空闲连接,则从共享队列中获取。
如果共享队列中也没有空闲的连接,则请求线程需要等待。
T borrow(long timeout, final TimeUnit timeUnit){
// 先查看线程本地存储是否有空闲连接
final List<Object> list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
final Object entry = list.remove(i);
// 弱引用
final T bagEntry = weakThreadLocals
? ((WeakReference<T>) entry).get()
: (T) entry;
//线程本地存储中的连接也可以被窃取,
//所以需要用CAS方法防止重复分配
if (bagEntry != null
&& bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// 线程本地存储中无空闲连接,则从共享队列中获取
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
//如果共享队列中有空闲连接,则返回
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
//共享队列中没有连接,则需要等待
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null
|| bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
//重新计算等待时间
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
//超时没有获取到连接,返回null
return null;
} finally {
waiters.decrementAndGet();
}
}
/**
* The house keeping task to retire and maintain minimum idle connections.
*/
保持最小空闲连接数,如果当前空闲连接数大于最小空闲连接数的话,则关闭连接
这个参数控制了一个空闲连接能够在hikari中能够存活的最长时间,只有当minimumIdle
比maxiumnPoolSize
小时,才会生效。最小值10秒,默认值10分钟。
idleTimeout > 0
才会进行空闲连接回收minimumIdle < maximumPoolSize
才会进行空闲连接回收private final class HouseKeeper implements Runnable
{
private volatile long previous = plusMillis(currentTime(), -housekeepingPeriodMs);
@Override
public void run()
{
try {
// refresh values in case they changed via MBean
connectionTimeout = config.getConnectionTimeout();
validationTimeout = config.getValidationTimeout();
leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
catalog = (config.getCatalog() != null && !config.getCatalog().equals(catalog)) ? config.getCatalog() : catalog;
final long idleTimeout = config.getIdleTimeout();
final long now = currentTime();
// Detect retrograde time, allowing +128ms as per NTP spec.
if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) {
logger.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
poolName, elapsedDisplayString(previous, now));
previous = now;
softEvictConnections();
return;
}
else if (now > plusMillis(previous, (3 * housekeepingPeriodMs) / 2)) {
// No point evicting for forward clock motion, this merely accelerates connection retirement anyway
logger.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));
}
previous = now;
String afterPrefix = "Pool ";
if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
logPoolState("Before cleanup ");
afterPrefix = "After cleanup ";
final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
int toRemove = notInUse.size() - config.getMinimumIdle();
for (PoolEntry entry : notInUse) {
// 如果连接的上次使用时间距今是否已经过去了idleTimeout
if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
closeConnection(entry, "(connection has passed idleTimeout)");
toRemove--;
}
}
}
logPoolState(afterPrefix);
fillPool(); // Try to maintain minimum connections
}
catch (Exception e) {
logger.error("Unexpected exception in housekeeping task", e);
}
}
}
这个参数控制了连接在hikari中的最长存活时间(这句话存疑,本例case就是一个活生生的反例)。hikari强烈建议设置该值,该值需要比数据库或者基础设施的(空闲)连接时间限制要短一点。默认值30分钟。
如果maxLifetime的时候,如果连接正在被使用,则不会被剔除,可以通过idleTimeout剔除空闲连接。
private PoolEntry createPoolEntry()
{
try {
final PoolEntry poolEntry = newPoolEntry();
final long maxLifetime = config.getMaxLifetime();
if (maxLifetime > 0) {
// variance up to 2.5% of the maxlifetime
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
final long lifetime = maxLifetime - variance;
// 一个只运行一次的定时任务,在maxlifetime的时候执行一次
poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
() -> {
// 先软剔除连接,1.标记剔除 2.如果正在使用,不剔除,下次获取连接的时候,看到剔除标记再剔除,或者HouseKeeper的周期性任务做兜底,保证连接空闲时间达到idleTimeout后被回收
if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
// 提交创建连接的任务
addBagItem(connectionBag.getWaitingThreadCount());
}
},
lifetime, MILLISECONDS));
}
return poolEntry;
}
...
}
poolEntry只有4个状态:
maxLifetime
idleTimeout
void closeConnection(final PoolEntry poolEntry, final String closureReason)
{
// 将连接移除sharedList
if (connectionBag.remove(poolEntry)) {
final Connection connection = poolEntry.close();
closeConnectionExecutor.execute(() -> {
// 关闭实际连接
quietlyCloseConnection(connection, closureReason);
if (poolState == POOL_NORMAL) {
fillPool();
}
});
}
}
public boolean remove(final T bagEntry)
{
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
}
final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}
return removed;
}
com.zaxxer.hikari.pool.PoolBase#quietlyCloseConnection
void quietlyCloseConnection(final Connection connection, final String closureReason)
{
if (connection != null) {
try {
logger.debug("{} - Closing connection {}: {}", poolName, connection, closureReason);
try {
setNetworkTimeout(connection, SECONDS.toMillis(15));
}
catch (SQLException e) {
// ignore
}
finally {
connection.close(); // continue with the close even if setNetworkTimeout() throws
}
}
catch (Exception e) {
logger.debug("{} - Closing connection {} failed", poolName, connection, e);
}
}
}
public Connection getConnection(final long hardTimeout) throws SQLException
{
suspendResumeLock.acquire();
final long startTime = currentTime();
try {
long timeout = hardTimeout;
do {
PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
if (poolEntry == null) {
break; // We timed out... break and throw exception
}
final long now = currentTime();
// 只有距离连接上次使用超过500MS,才会验证连接是否有效,不是每次拿到连接都要校验连接有效性
if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
timeout = hardTimeout - elapsedMillis(startTime);
}
else {
//距离上次连接使用不超过500MS;或者超过500ms且校验过连接,这两种情况会返回连接
metricsTracker.recordBorrowStats(poolEntry, startTime);
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
}
} while (timeout > 0L);
metricsTracker.recordBorrowTimeoutStats(startTime);
throw createTimeoutException(startTime);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
}
finally {
suspendResumeLock.release();
}
}
释放连接需要调用 ConcurrentBag 提供的 requite() 方法,该方法的逻辑很简单,首先将数据库连接状态更改为 STATE_NOT_IN_USE,之后查看是否存在等待线程,如果有,则分配给等待线程;如果没有,则将该数据库连接保存到线程本地存储里。
/**
* Release this entry back to the pool.
*
* @param lastAccessed last access time-stamp
*/
void recycle(final long lastAccessed)
{
if (connection != null) {
this.lastAccessed = lastAccessed;
hikariPool.recycle(this);
}
}
/**
* Recycle PoolEntry (add back to the pool)
*
* @param poolEntry the PoolEntry to recycle
*/
@Override
void recycle(final PoolEntry poolEntry)
{
metricsTracker.recordConnectionUsage(poolEntry);
connectionBag.requite(poolEntry);
}
public void requite(final T bagEntry)
{
bagEntry.setState(STATE_NOT_IN_USE);
// 如果有线程在等待,先给他们
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
yield();
}
}
// 如果没有等待的线程,则进入线程本地存储
final List<Object> threadLocalList = threadList.get();
if (threadLocalList.size() < 50) {
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}
public ConcurrentBag(final IBagStateListener listener)
{
this.listener = listener;
this.weakThreadLocals = useWeakThreadLocals();
// 队列长度0的并发队列
this.handoffQueue = new SynchronousQueue<>(true);
// 等待获取连接的线程
this.waiters = new AtomicInteger();
// 全局共享的连接
this.sharedList = new CopyOnWriteArrayList<>();
if (weakThreadLocals) {
// 当前线程缓存的连接
this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
}
else {
// 当前线程缓存的连接
this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
}
}
并发获取连接
concurrentBag.borrow
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first
// 先从当前线程里取
final List<Object> list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
// 取完就把连接从list里删除,防止一直呆在本地线程,因为该连接可能被删除或者被其他线程取走
final Object entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// Otherwise, scan the shared list ... then poll the handoff queue
final int waiting = waiters.incrementAndGet();
try {
// 从全局的连接池里取
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
// 可能取了其他线程请求增加到连接,所以重新申请添加连接
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
listener.addBagItem(waiting);
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
// 阻塞获取连接
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
waiters.decrementAndGet();
}
}
先从当前线程里取(不需要加锁)
从全局的连接池里取(cas;bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE))
阻塞获取连接(handoffQueue.poll(timeout, NANOSECONDS);)
borrow 和 requite 对于 ConcurrentBag
而言是只读的操作,addConnectionExecutor 只开启一个线程执行任务,所以 add 操作是单线程的,唯一存在锁竞争的就是 remove 方法
下面这份网上的配置是最佳实践吗
hikari:
validationTimeout: 1500
connection-timeout: 2000
idleTimeout: 60000
max-lifetime: 0
maximumPoolSize: 50
minimum-idle: 5
每隔300s会将连接重连,起到保活功能
hikari:
validationTimeout: 1500
connection-timeout: 2000
max-lifetime: 300000
// minimum-idle不设置默认和maximumPoolSize一样大,此时idleTimeout失效
maximumPoolSize: 50
public final class ProxyFactory
{
private ProxyFactory()
{
// unconstructable
}
/**
* Create a proxy for the specified {@link Connection} instance.
* @param poolEntry the PoolEntry holding pool state
* @param connection the raw database Connection
* @param openStatements a reusable list to track open Statement instances
* @param leakTask the ProxyLeakTask for this connection
* @param now the current timestamp
* @param isReadOnly the default readOnly state of the connection
* @param isAutoCommit the default autoCommit state of the connection
* @return a proxy that wraps the specified {@link Connection}
*/
static ProxyConnection getProxyConnection(final PoolEntry poolEntry, final Connection connection, final FastList<Statement> openStatements, final ProxyLeakTask leakTask, final long now, final boolean isReadOnly, final boolean isAutoCommit)
{
// Body is replaced (injected) by JavassistProxyFactory
throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
}
static Statement getProxyStatement(final ProxyConnection connection, final Statement statement)
{
// Body is replaced (injected) by JavassistProxyFactory
throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
}
static CallableStatement getProxyCallableStatement(final ProxyConnection connection, final CallableStatement statement)
{
// Body is replaced (injected) by JavassistProxyFactory
throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
}
static PreparedStatement getProxyPreparedStatement(final ProxyConnection connection, final PreparedStatement statement)
{
// Body is replaced (injected) by JavassistProxyFactory
throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
}
static ResultSet getProxyResultSet(final ProxyConnection connection, final ProxyStatement statement, final ResultSet resultSet)
{
// Body is replaced (injected) by JavassistProxyFactory
throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
}
static DatabaseMetaData getProxyDatabaseMetaData(final ProxyConnection connection, final DatabaseMetaData metaData)
{
// Body is replaced (injected) by JavassistProxyFactory
throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
}
}
编译时调用JavassistProxyFactory才生成的
com.zaxxer.hikari.util.JavassistProxyFactory
public final class JavassistProxyFactory
{
private static ClassPool classPool;
private static String genDirectory = "";
public static void main(String... args) throws Exception {
classPool = new ClassPool();
classPool.importPackage("java.sql");
classPool.appendClassPath(new LoaderClassPath(JavassistProxyFactory.class.getClassLoader()));
if (args.length > 0) {
genDirectory = args[0];
}
// Cast is not needed for these
String methodBody = "{ try { return delegate.method($$); } catch (SQLException e) { throw checkException(e); } }";
generateProxyClass(Connection.class, ProxyConnection.class.getName(), methodBody);
generateProxyClass(Statement.class, ProxyStatement.class.getName(), methodBody);
generateProxyClass(ResultSet.class, ProxyResultSet.class.getName(), methodBody);
generateProxyClass(DatabaseMetaData.class, ProxyDatabaseMetaData.class.getName(), methodBody);
// For these we have to cast the delegate
methodBody = "{ try { return ((cast) delegate).method($$); } catch (SQLException e) { throw checkException(e); } }";
generateProxyClass(PreparedStatement.class, ProxyPreparedStatement.class.getName(), methodBody);
generateProxyClass(CallableStatement.class, ProxyCallableStatement.class.getName(), methodBody);
modifyProxyFactory();
}
private static void modifyProxyFactory() throws NotFoundException, CannotCompileException, IOException {
System.out.println("Generating method bodies for com.zaxxer.hikari.proxy.ProxyFactory");
String packageName = ProxyConnection.class.getPackage().getName();
CtClass proxyCt = classPool.getCtClass("com.zaxxer.hikari.pool.ProxyFactory");
for (CtMethod method : proxyCt.getMethods()) {
switch (method.getName()) {
case "getProxyConnection":
method.setBody("{return new " + packageName + ".HikariProxyConnection($$);}");
break;
case "getProxyStatement":
method.setBody("{return new " + packageName + ".HikariProxyStatement($$);}");
break;
case "getProxyPreparedStatement":
method.setBody("{return new " + packageName + ".HikariProxyPreparedStatement($$);}");
break;
case "getProxyCallableStatement":
method.setBody("{return new " + packageName + ".HikariProxyCallableStatement($$);}");
break;
case "getProxyResultSet":
method.setBody("{return new " + packageName + ".HikariProxyResultSet($$);}");
break;
case "getProxyDatabaseMetaData":
method.setBody("{return new " + packageName + ".HikariProxyDatabaseMetaData($$);}");
break;
default:
// unhandled method
break;
}
}
proxyCt.writeFile(genDirectory + "target/classes");
}
}
之所以使用Javassist生成动态代理,是因为其速度更快,相比于JDK Proxy生成的字节码更少,精简了很多不必要的字节码。
编译的时候生成代理类
<plugin>
<!-- Generate proxies -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
<executions>
<execution>
<phase>compile</phase>
<!-- phase>generate-test-sources</phase -->
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
<configuration>
<mainClass>com.zaxxer.hikari.util.JavassistProxyFactory</mainClass>
</configuration>
</plugin>
https://ayonel.github.io/2020/08/18/hikari-keeplive/
https://www.cnblogs.com/ZhangZiSheng001/p/12329937.html
https://www.jianshu.com/p/f93fc70922c3