http://garydai.github.io/2018/01/17/db_pool.html
数据库连接不释放,放到池子里
取链接的时候从池子里拿,如果没有的话,唤醒创建连接的线程,创建连接
单独一个线程判断连接池里的连接是否一段时间内空闲,是的话则关闭连接
// init connections,创建连接,使得连接数达到initialSize大小
while (poolingCount < initialSize) {
try {
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000);
}
}
}
createAndLogThread();
// 创建线程用于创建连接
createAndStartCreatorThread();
// 创建线程用于关闭连接
createAndStartDestroyThread();
protected void createAndStartCreatorThread() {
if (createScheduler == null) {
String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
createConnectionThread = new CreateConnectionThread(threadName);
createConnectionThread.start();
return;
}
initedLatch.countDown();
}
如果连接一段时间内空闲,则关闭连接,timeBetweenEvictionRunsMillis间隔处理一次,默认1分钟
DestroyConnectionThread
// timeBetweenEvictionRunsMillis间隔检测一次
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000); //
}
if (Thread.interrupted()) {
break;
}
destroyTask.run();
public class DestroyTask implements Runnable {
public DestroyTask() {
}
@Override
public void run() {
shrink(true, keepAlive);
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
shrink 销毁线程
将销毁连接放入evictConnections数组,存活线程放入keepAliveConnections数组
销毁逻辑:大于minEvictableIdleTimeMillis要连接要关闭,keepAliveConnections要活性检测
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
// int checkCount = poolingCount - minIdle;
// checkCount = poolingCount - minIdle个连接,保留>=minIdle个连接,因为i不连续
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
// 空闲时间大于maxEvictableIdleTimeMillis,则销毁
evictConnections[evictCount++] = connection;
continue;
}
}
// keepAliveBetweenTimeMillis默认2分钟
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
// 设置了keepAlive,空闲时间>keepAliveBetweenTimeMillis,则进入keepAliveConnections数组
keepAliveConnections[keepAliveCount++] = connection;
}
// 新建连接池,剔除evictConnections和keepAliveConnections里的连接
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
// 遍历evictConnections数组,close连接
if (evictCount > 0) {
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
// 遍历keepAliveConnections里的连接,验证连接
this.validateConnection(connection);
// 连接通过验证,则加入连接池,否则关闭连接
boolean putOk = put(holer, 0L);
// 如果连接数没达到minIdle,则增加连接
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
emptySignal();
}
public void shrink(boolean checkTime, boolean keepAlive) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
int evictCount = 0;
int keepAliveCount = 0;
try {
if (!inited) {
return;
}
// 超出最小空闲连接数的部分,都要回收
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
if (checkTime) {
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
// 连接空闲时间
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
// 小于最小允许的空闲时间,则break;因为数据后面的连接,取得的时间,比前面的的晚,则空闲时间一定是小于前面的
if (idleMillis < minEvictableIdleTimeMillis) {
break;
}
// 以下逻辑是,空闲时间大于minEvictableIdleTimeMillis的情况
if (checkTime && i < checkCount) {
// 还没达到回收数,回收该连接
evictConnections[evictCount++] = connection;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
//已经达到了回收数,若该线程空闲时间超出maxEvictableIdleTimeMillis,也要丢弃,即目前连接池数量小于最小空闲数
evictConnections[evictCount++] = connection;
} else if (keepAlive) {
// 已经达到了回收数,且没有超出maxEvictableIdleTimeMillis,且keepAlive设置true,默认是false;连接保活;minEvictableIdleTimeMillis ~ maxEvictableIdleTimeMillis之间的要保活
keepAliveConnections[keepAliveCount++] = connection;
}
} else {
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
} finally {
lock.unlock();
}
if (evictCount > 0) {
// 关闭evictCount里的连接
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCount.incrementAndGet();
}
Arrays.fill(evictConnections, null);
}
if (keepAliveCount > 0) {
// 检测keepAlive的连接,即保活了连接,令active,并放入连接池中
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}
if (validate) {
holer.lastActiveTimeMillis = System.currentTimeMillis();
put(holer);
} else {
JdbcUtils.close(connection);
}
}
Arrays.fill(keepAliveConnections, null);
}
}
CreateConnectionThread
// 线程在等待唤起
empty.wait();
connection = createPhysicalConnection();
// 放入线程池,put里会执行notEmpty.signal();
put(physicalConnection);
private boolean put(DruidConnectionHolder holder) {
lock.lock();
try {
if (poolingCount >= maxActive) {
return false;
}
// 放到数组最后一个
connections[poolingCount] = holder;
incrementPoolingCount();
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
notEmpty.signal();
notEmptySignalCount++;
if (createScheduler != null) {
createTaskCount--;
if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
&& activeCount + poolingCount + createTaskCount < maxActive) {
emptySignal();
}
}
} finally {
lock.unlock();
}
return true;
}
Druid是在获取连接时,才会做存活检测, 而并不是定时做存活检测
// 获取到连接poolcount会减1
poolableConnection = getConnectionInternal(maxWaitMillis);
if(testOnBorrow) {
// 检测连接是否正常
testConnectionInternal
} else if(testWhileIdle) {
if(currentTimeMillis - lastActiveTimeMillis >= timeBetweenEvictionRunsMillis) {
// 距离上次活动的时候达到空闲检测的时间
testConnectionInternal
}
}
getConnectionInternal
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
pollLast
if(poolingCount == 0) {
// 如果连接池为空,通知生产者生成连接
emptySignal();
// 等待生产连接成功
notEmpty.awaitNanos(estimate);
}
// 从连接池的最后一项获取连接poolingCount--
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
takeLast
if(poolingCount == 0) {
// 如果连接池为空,通知生产者生成连接
emptySignal();
// 等待生产连接成功
notEmpty.await();
}
// poolingCount--
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
DruidAbstractDataSource
if (validConnectionChecker != null) {
validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout)
if (valid && isMySql) { // unexcepted branch
long lastPacketReceivedTimeMs = MySqlUtils.getLastPacketReceivedTimeMs(conn);
if (lastPacketReceivedTimeMs > 0) {
long mysqlIdleMillis = currentTimeMillis - lastPacketReceivedTimeMs;
if (lastPacketReceivedTimeMs > 0 //
&& mysqlIdleMillis >= timeBetweenEvictionRunsMillis) {
// 如果连接长时间未使用,超出timeBetweenEvictionRunsMillis时间,则抛弃连接
discardConnection(conn);
String errorMsg = "discard long time none received connection. "
+ ", jdbcUrl : " + jdbcUrl
+ ", jdbcUrl : " + jdbcUrl
+ ", lastPacketReceivedIdleMillis : " + mysqlIdleMillis;
LOG.error(errorMsg);
return false;
}
}
}
}
如果没有设置checker
Statement stmt = null;
ResultSet rset = null;
try {
stmt = conn.createStatement();
if (getValidationQueryTimeout() > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
rset = stmt.executeQuery(validationQuery);
if (!rset.next()) {
return false;
}
} finally {
JdbcUtils.close(rset);
JdbcUtils.close(stmt);
}
/**
* 抛弃连接,不进行回收,而是抛弃
*
* @param realConnection
*/
public void discardConnection(Connection realConnection) {
JdbcUtils.close(realConnection);
lock.lock();
try {
activeCount--;
discardCount++;
if (activeCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
mysql连接检查器
public class MSSQLValidConnectionChecker extends ValidConnectionCheckerAdapter implements ValidConnectionChecker, Serializable {
public boolean isValidConnection(final Connection c, String validateQuery, int validationQueryTimeout) throws Exception {
if (c.isClosed()) {
return false;
}
Statement stmt = null;
try {
stmt = c.createStatement();
if (validationQueryTimeout > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
stmt.execute(validateQuery);
return true;
} catch (SQLException e) {
throw e;
} finally {
JdbcUtils.close(stmt);
}
}
}
recycle
if (testOnReturn) {
testConnectionInternal(holder, physicalConnection);
}
// 放回连接池
putLast(holder, currentTimeMillis);
putLast
e.lastActiveTimeMillis = lastActiveTimeMillis;
connections[poolingCount] = e;
incrementPoolingCount();
notEmpty.signal();
notEmptySignalCount++;
ConnectionPool
|---------------------------|
| |
getConnection | |
(notEmpty.await) | |
(lowWater.signal) | |
(maxActive.await) | |
<-------------------------- | |
<-------------------------- | |
<-------------------------- | |
| |
| |
--------------------------> | |
--------------------------> | | 销毁多余连接的线程
--------------------------> | | (highWater.awati, idleTimeout.await)
close | | -------------------------------------->
(highWater.signal) | |
(maxActive.signal) | |
| |
| |
产生连接的线程 | |
(lowWater.await) | |
(notEmpty.signal) | |
--------------------------> | |
| |
|---------------------------|
五个Condition:notEmpty、maxActive、lowWater、hightWater, idleTime
druid加锁,从数组里取连接
hikari减少加锁粒度,先从本地线程,再从数组里取,但是用的cas取,最后取不到,才阻塞
druid配置说明:https://github.com/alibaba/druid/wiki/DruidDataSource%E9%85%8D%E7%BD%AE%E5%B1%9E%E6%80%A7%E5%88%97%E8%A1%A8