druid       

数据库连接池druid

连接池概念

http://garydai.github.io/2018/01/17/db_pool.html

数据库连接不释放,放到池子里

取连接的时候从池子里拿,如果没有的话,唤醒创建连接的线程,创建连接

单独一个线程判断连接池里的连接是否一段时间内空闲,是的话则关闭连接

init


    // Excerpt from init(): pre-fill the pool, then start the helper threads.
    // init connections: keep creating physical connections until poolingCount reaches initialSize
    while (poolingCount < initialSize) {
        try {
            PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
            DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
            // append the new holder at the tail of the idle array
            connections[poolingCount++] = holder;
        } catch (SQLException ex) {
            LOG.error("init datasource error, url: " + this.getUrl(), ex);
            if (initExceptionThrow) {
                // remember the failure and stop pre-filling
                connectError = ex;
                break;
            } else {
                // otherwise wait 3 seconds and retry the creation
                Thread.sleep(3000);
            }
        }
    }
    
    createAndLogThread();
    // start the thread that creates connections
    createAndStartCreatorThread();
    // start the thread that closes connections
    createAndStartDestroyThread();
    /**
     * Starts the dedicated connection-creating thread when no {@code createScheduler}
     * is configured; with a scheduler present, no thread is started and
     * {@code initedLatch} is counted down here instead.
     */
    protected void createAndStartCreatorThread() {
        if (createScheduler != null) {
            // A scheduler is configured: only release the init latch.
            initedLatch.countDown();
            return;
        }

        // The thread name carries this data source's identity hash.
        final String creatorName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
        createConnectionThread = new CreateConnectionThread(creatorName);
        createConnectionThread.start();
    }

createAndStartDestroyThread关闭空闲连接线程

如果连接空闲超过一段时间则关闭连接;每隔timeBetweenEvictionRunsMillis处理一次,默认1分钟
DestroyConnectionThread

    // Body of the DestroyConnectionThread loop: sleep, then run one destroy pass.
    // One pass every timeBetweenEvictionRunsMillis.
    if (timeBetweenEvictionRunsMillis > 0) {
        Thread.sleep(timeBetweenEvictionRunsMillis);
    } else {
        Thread.sleep(1000); // non-positive interval configured: fall back to 1 second
    }

    // leave the loop once this thread has been interrupted
    if (Thread.interrupted()) {
        break;
    }

    destroyTask.run();

    /**
     * One destroy pass: shrinks the pool with time checks enabled and, when
     * {@code isRemoveAbandoned()} is true, also calls {@code removeAbandoned()}.
     */
    public class DestroyTask implements Runnable {
        public DestroyTask() {
        }

        @Override
        public void run() {
            shrink(true, keepAlive);

            if (!isRemoveAbandoned()) {
                return;
            }
            removeAbandoned();
        }
    }






shrink 销毁空闲连接(由销毁线程调用)

将要销毁的连接放入evictConnections数组,需要保活的连接放入keepAliveConnections数组

销毁逻辑:空闲时间大于minEvictableIdleTimeMillis的连接要关闭;keepAliveConnections里的连接要做活性检测

// Classification step for one pooled connection (excerpt from the shrink loop).
if (idleMillis >= minEvictableIdleTimeMillis) {
    if (checkTime && i < checkCount) {
        // int checkCount = poolingCount - minIdle;
        // evict up to checkCount = poolingCount - minIdle connections, keeping >= minIdle of them,
        // because the evicted indexes i are not necessarily contiguous
        evictConnections[evictCount++] = connection;
        continue;
    } else if (idleMillis > maxEvictableIdleTimeMillis) {
        // idle for longer than maxEvictableIdleTimeMillis: evict
        evictConnections[evictCount++] = connection;
        continue;
    }
}

// keepAliveBetweenTimeMillis defaults to 2 minutes (per the note's author)
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
    // keepAlive is enabled and the idle time is >= keepAliveBetweenTimeMillis:
    // the connection goes into the keepAliveConnections array
    keepAliveConnections[keepAliveCount++] = connection;
}

// Compact the pool array: drop as many leading slots as were moved to
// evictConnections and keepAliveConnections, then null out the freed tail.
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
    System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
    Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
    poolingCount -= removeCount;
}


// Walk the evictConnections array and close every connection in it.
if (evictCount > 0) {
    for (int i = 0; i < evictCount; ++i) {
        DruidConnectionHolder item = evictConnections[i];
        Connection connection = item.getConnection();
        JdbcUtils.close(connection);
        destroyCountUpdater.incrementAndGet(this);
    }
    // clear the scratch array so it keeps no references to closed holders
    Arrays.fill(evictConnections, null);
}

// Walk the connections in keepAliveConnections and validate each one.
this.validateConnection(connection);
// A connection that passes validation is put back into the pool; otherwise it is closed.
boolean putOk = put(holer, 0L);


// If the total connection count is still below minIdle, signal for more connections
// (one emptySignal() per missing connection).
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
    emptySignal();
}

回收空闲连接

/**
 * Shrinks the idle pool.
 *
 * <p>Under the lock, scans the pooled holders from index 0, sorting them into
 * {@code evictConnections} (to be closed) and {@code keepAliveConnections}
 * (to be validated), then compacts the {@code connections} array. After the lock
 * is released, the evicted connections are closed and the keep-alive candidates
 * are validated; those that pass are put back, the rest are closed.
 *
 * <p>Returns without doing anything if the lock wait is interrupted or the
 * data source is not yet inited.
 *
 * @param checkTime when true, idle/physical-lifetime limits decide what is evicted;
 *                  when false, the first {@code poolingCount - minIdle} holders are evicted
 * @param keepAlive when true, long-idle holders that are not evicted are queued for validation
 */
public void shrink(boolean checkTime, boolean keepAlive) {
    try {
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        return;
    }

    int evictCount = 0;
    int keepAliveCount = 0;
    try {
        if (!inited) {
            return;
        }

        // everything beyond the minimum idle count (minIdle) is a candidate for eviction
        final int checkCount = poolingCount - minIdle;
        final long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < poolingCount; ++i) {
            DruidConnectionHolder connection = connections[i];

            if (checkTime) {
                // physical lifetime limit: evict regardless of idle time once exceeded
                if (phyTimeoutMillis > 0) {
                    long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                    if (phyConnectTimeMillis > phyTimeoutMillis) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }

                // how long this connection has been idle
                long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
                // Idle for less than minEvictableIdleTimeMillis: stop scanning. The note's author
                // reasons that later slots were returned more recently, so their idle time can
                // only be shorter than that of earlier slots.
                if (idleMillis < minEvictableIdleTimeMillis) {
                    break;
                }
                // from here on, idleMillis >= minEvictableIdleTimeMillis
                // (checkTime is already known to be true inside this branch)
                if (checkTime && i < checkCount) {
                    // eviction quota (checkCount) not used up yet: evict this connection
                    evictConnections[evictCount++] = connection;
                } else if (idleMillis > maxEvictableIdleTimeMillis) {
                    // quota used up, but idle for longer than maxEvictableIdleTimeMillis: evict anyway,
                    // even though that can leave fewer than minIdle connections pooled
                    evictConnections[evictCount++] = connection;
                } else if (keepAlive) {
                    // quota used up, idle time between minEvictableIdleTimeMillis and
                    // maxEvictableIdleTimeMillis, and keepAlive is true (the note's author records
                    // it as defaulting to false): queue the connection for keep-alive validation
                    keepAliveConnections[keepAliveCount++] = connection;
                }
            } else {
                // no time checks: evict the first checkCount holders, then stop
                if (i < checkCount) {
                    evictConnections[evictCount++] = connection;
                } else {
                    break;
                }
            }
        }

        // Compact the array by dropping the first removeCount slots.
        // NOTE(review): this assumes the removed holders are exactly the leading slots. A holder
        // evicted via the phyTimeoutMillis path after an earlier holder was retained would not
        // satisfy that - confirm against the upstream fix.
        int removeCount = evictCount + keepAliveCount;
        if (removeCount > 0) {
            System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
            Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
            poolingCount -= removeCount;
        }
        keepAliveCheckCount += keepAliveCount;
    } finally {
        lock.unlock();
    }

    // The lock is released before any connection is closed or validated below.
    if (evictCount > 0) {
        // close the connections collected in evictConnections
        for (int i = 0; i < evictCount; ++i) {
            DruidConnectionHolder item = evictConnections[i];
            Connection connection = item.getConnection();
            JdbcUtils.close(connection);
            destroyCount.incrementAndGet();
        }
        Arrays.fill(evictConnections, null);
    }

    if (keepAliveCount > 0) {
        // validate the keep-alive candidates; one that passes gets its lastActiveTimeMillis
        // refreshed and is put back into the pool, one that fails is closed
        this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
        // keep order
        for (int i = keepAliveCount - 1; i >= 0; --i) {
            DruidConnectionHolder holer = keepAliveConnections[i];
            Connection connection = holer.getConnection();
            holer.incrementKeepAliveCheckCount();

            boolean validate = false;
            try {
                this.validateConnection(connection);
                validate = true;
            } catch (Throwable error) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("keepAliveErr", error);
                }
                // skip
            }

            if (validate) {
                holer.lastActiveTimeMillis = System.currentTimeMillis();
                put(holer);
            } else {
                JdbcUtils.close(connection);
            }
        }
        Arrays.fill(keepAliveConnections, null);
    }
}

新建连接线程

CreateConnectionThread

    // Excerpt from the creator thread: the thread waits here until it is woken up.
    // NOTE(review): written as wait() in these notes; if `empty` is a lock Condition the
    // actual call would be await() - confirm against the Druid source.
    empty.wait();

    connection = createPhysicalConnection();
    // put the new connection into the connection pool; put() itself calls notEmpty.signal()
    put(physicalConnection);

/**
 * Appends a holder to the tail of the idle array while holding the pool lock,
 * and signals {@code notEmpty} so a waiting borrower can proceed.
 *
 * @param holder the connection holder to add to the pool
 * @return {@code false} if the pool already holds {@code maxActive} entries
 *         (the holder is not added), {@code true} otherwise
 */
private boolean put(DruidConnectionHolder holder) {
    lock.lock();
    try {
        final boolean poolIsFull = poolingCount >= maxActive;
        if (poolIsFull) {
            return false;
        }

        // Store in the slot just past the last pooled entry, then bump the count.
        connections[poolingCount] = holder;
        incrementPoolingCount();

        // Track the high-water mark of the idle pool and when it was reached.
        final boolean reachedNewPeak = poolingCount > poolingPeak;
        if (reachedNewPeak) {
            poolingPeak = poolingCount;
            poolingPeakTime = System.currentTimeMillis();
        }

        notEmpty.signal();
        notEmptySignalCount++;

        if (createScheduler == null) {
            return true;
        }

        // Scheduler mode: one create task has finished; request another connection if
        // waiters still outnumber the supply and the total stays below maxActive.
        createTaskCount--;
        final boolean waitersExceedSupply = poolingCount + createTaskCount < notEmptyWaitThreadCount;
        if (waitersExceedSupply && activeCount + poolingCount + createTaskCount < maxActive) {
            emptySignal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

getConnection

未开启keepAlive时,Druid是在获取连接时才做存活检测(testOnBorrow / testWhileIdle),而不是定时检测;开启keepAlive后,销毁线程在shrink里也会对空闲连接做检测

    
    // taking a connection out of the pool decrements poolingCount by 1
    poolableConnection = getConnectionInternal(maxWaitMillis);

    if(testOnBorrow) {
        // check that the connection is healthy on every borrow
        testConnectionInternal
    } else if(testWhileIdle) {
        if(currentTimeMillis - lastActiveTimeMillis >= timeBetweenEvictionRunsMillis) {
            // idle for at least timeBetweenEvictionRunsMillis since its last activity: validate it
            testConnectionInternal
        }
    }

getConnectionInternal


    // with a positive maxWait, poll the tail of the pool with a time limit;
    // otherwise take from the tail without one
    if (maxWait > 0) {
        holder = pollLast(nanos);
    } else {
        holder = takeLast();
    }

pollLast

    if(poolingCount == 0) {
        // the pool is empty: tell the producer to create a connection
        emptySignal();
        // wait, for at most the remaining time, until a connection has been produced
        notEmpty.awaitNanos(estimate);
    }
    
    // take the connection from the last slot of the pool (poolingCount--)
    decrementPoolingCount();
    DruidConnectionHolder last = connections[poolingCount];
    // clear the slot so the array no longer references the borrowed holder
    connections[poolingCount] = null;

takeLast

    if(poolingCount == 0) {
        // the pool is empty: tell the producer to create a connection
        emptySignal();
        // wait (without a time limit) until a connection has been produced
        notEmpty.await();
    }

    // poolingCount--, then take the holder from the last slot
    decrementPoolingCount();
    DruidConnectionHolder last = connections[poolingCount];
    // clear the slot so the array no longer references the borrowed holder
    connections[poolingCount] = null;
    

testConnectionInternal获取连接前进行活性检测

DruidAbstractDataSource

    // Path taken when a validConnectionChecker is configured.
    if (validConnectionChecker != null) {
        validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout)

        if (valid && isMySql) { // unexpected branch
            long lastPacketReceivedTimeMs = MySqlUtils.getLastPacketReceivedTimeMs(conn);
            if (lastPacketReceivedTimeMs > 0) {
                // time since the last packet was received on this MySQL connection
                long mysqlIdleMillis = currentTimeMillis - lastPacketReceivedTimeMs;
                if (lastPacketReceivedTimeMs > 0 //
                        && mysqlIdleMillis >= timeBetweenEvictionRunsMillis) {
                    // the connection has gone unused for at least timeBetweenEvictionRunsMillis:
                    // discard it and report the connection as invalid
                    discardConnection(conn);
                    String errorMsg = "discard long time none received connection. "
                            + ", jdbcUrl : " + jdbcUrl
                            + ", jdbcUrl : " + jdbcUrl
                            + ", lastPacketReceivedIdleMillis : " + mysqlIdleMillis;
                    LOG.error(errorMsg);
                    return false;
                }
            }
        }
    }
    如果没有设置validConnectionChecker,则直接执行validationQuery来检测连接
    // Validate by running validationQuery directly on the connection.
    Statement stmt = null;
    ResultSet rset = null;
    try {
        stmt = conn.createStatement();
        // apply the query timeout only when one is configured
        if (getValidationQueryTimeout() > 0) {
            stmt.setQueryTimeout(validationQueryTimeout);
        }
        rset = stmt.executeQuery(validationQuery);
        // a validation query that yields no row means the connection is treated as invalid
        if (!rset.next()) {
            return false;
        }
    } finally {
        // always release the result set and the statement
        JdbcUtils.close(rset);
        JdbcUtils.close(stmt);
    }


    /**
     * Discards a connection: it is closed outright rather than recycled back
     * into the pool.
     *
     * @param realConnection the physical connection to close and drop
     */
    public void discardConnection(Connection realConnection) {
        JdbcUtils.close(realConnection);

        lock.lock();
        try {
            // One fewer active connection, one more discarded.
            activeCount--;
            discardCount++;

            // At or below minIdle active connections: signal for a replacement.
            final boolean atOrBelowMinIdle = activeCount <= minIdle;
            if (atOrBelowMinIdle) {
                emptySignal();
            }
        } finally {
            lock.unlock();
        }
    }

SQL Server(MSSQL)连接检查器
/**
 * Connection checker that validates a connection by executing the validation
 * query on it.
 */
public class MSSQLValidConnectionChecker extends ValidConnectionCheckerAdapter implements ValidConnectionChecker, Serializable {

    /**
     * Reports whether the connection is usable.
     *
     * @param c                      the connection to check
     * @param validateQuery          the SQL executed as the probe
     * @param validationQueryTimeout query timeout applied to the probe when greater than 0
     * @return {@code false} if the connection is already closed, {@code true} if the probe executed
     * @throws Exception whatever creating or executing the probe statement throws
     */
    public boolean isValidConnection(final Connection c, String validateQuery, int validationQueryTimeout) throws Exception {
        if (c.isClosed()) {
            return false;
        }

        Statement probe = null;
        try {
            probe = c.createStatement();
            if (validationQueryTimeout > 0) {
                probe.setQueryTimeout(validationQueryTimeout);
            }
            probe.execute(validateQuery);
        } finally {
            // Failures propagate to the caller unchanged; the statement is always released.
            JdbcUtils.close(probe);
        }
        return true;
    }

}

recycle 归还连接(连接关闭时放回连接池,而不是关闭连接池)

recycle

    // optionally validate the connection when it is handed back
    if (testOnReturn) {
        testConnectionInternal(holder, physicalConnection);
    }
    
    // put the connection back into the pool
    putLast(holder, currentTimeMillis);


putLast
    // record the holder's last-active time, then append the holder at the tail of the pool
    e.lastActiveTimeMillis = lastActiveTimeMillis;
    connections[poolingCount] = e;
    incrementPoolingCount();

    // signal notEmpty so a waiting borrower can proceed, and count the signal
    notEmpty.signal();
    notEmptySignalCount++;

使用ReentrantLock重入锁


                                   ConnectionPool
                              |---------------------------| 
                              |                           |
       getConnection          |                           |    
      (notEmpty.await)        |                           |   
      (lowWater.signal)       |                           |
      (maxActive.await)       |                           |
  <-------------------------- |                           |
  <-------------------------- |                           |
  <-------------------------- |                           |
                              |                           |
                              |                           |
  --------------------------> |                           |
  --------------------------> |                           |    销毁多余连接的线程
  --------------------------> |                           |    (highWater.await, idleTimeout.await)
     close                    |                           | --------------------------------------> 
    (highWater.signal)        |                           |
    (maxActive.signal)        |                           |  
                              |                           |
                              |                           |
              产生连接的线程    |                            |
     (lowWater.await)         |                           |
     (notEmpty.signal)        |                           |
  --------------------------> |                           |
                              |                           |
                              |---------------------------|
                              
五个Condition:notEmpty、maxActive、lowWater、highWater、idleTimeout

druid加锁,从数组里取连接

hikari减小了加锁粒度:先从线程本地缓存里取,再从共享数组里取,并且用CAS方式获取;最后都取不到,才阻塞等待

reference

druid配置说明:https://github.com/alibaba/druid/wiki/DruidDataSource%E9%85%8D%E7%BD%AE%E5%B1%9E%E6%80%A7%E5%88%97%E8%A1%A8