QiShunwang

Implementation Details of Connection Pools

2021/6/24 18:09:27

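1. Introduction

Software engineering has become wheels wrapped in wheels: frameworks stacked layer upon layer. Every layer exposes configuration to tune or extension points to implement. In day-to-day work, relying only on configuration notes found online is not enough to use these wheels well, and it can even cause problems.

1.1 Various database pool configurations:

Tomcat: a real-world case; incomplete, with some unreasonable settings and some missing ones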
tomcat:
      poolProperties:
        initial-size: 5
        max-wait: 2000
        maxWaitMillis: 2000
        
        max-total: 50
        max-active: 50
        
        min-evictable-idle-time-millis: 120000
        time-between-eviction-runs-millis: 10000
        removeAbandoned: true
        removeAbandonedTimeout: 10
        
        max-idle: 20
        test-while-idle: true
        validation-query: select 1 from dual
Druid:
druid:
      driverClassName: com.mysql.cj.jdbc.Driver
      type: mysql
      
      initialSize: 10
      
      minIdle: 10
      maxActive: 50
      maxWait: 2000
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 120000
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      filters: stat,wall,log4j
      poolPreparedStatements: false
      maxPoolPreparedStatementPerConnectionSize: -1
      useGlobalDataSourceStat: true
      connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=1000
      keepAlive: true

1.2 Why are these settings needed? What is database communication, fundamentally?

[Figure: image-20210615160640305]

A network communication model (client side) + a database operation abstraction + pooling

The same pattern appears in the gRPC implementation, the Zuul implementation, and thread pools

[Figure: https://raw.githubusercontent.com/WengyXu/oss/master/uPic/2021-06-15/image-20210615160829349.png]

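Problems that can arise under this model:

Network communication problems: connection issues, I/O, and threads

Invalid connections in the pool:

[Figure: https://raw.githubusercontent.com/WengyXu/oss/master/uPic/2021-06-15/image-20210615161324817.png]

If no timeout is set (there are several kinds of timeout), the caller waits forever and one "connection" is wasted.

What can be done?

Destroy the connection when a write times out

Validate before use

Inspect periodically

Detect disconnection automatically and destroy the connection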
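Two of the measures above, validating before use and inspecting periodically, can be sketched with nothing but the JDK. This is a minimal illustration, not the Tomcat or Druid implementation: `SketchPool`, `borrow`, `giveBack`, and `sweep` are hypothetical names, and the "connection" is any object with a validity check.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical minimal pool; a sketch of the idea, not a real pool implementation. */
class SketchPool<T> {
    private final LinkedBlockingQueue<T> idle = new LinkedBlockingQueue<>();
    private final Supplier<T> factory;     // creates a new resource
    private final Predicate<T> validator;  // true if the resource is still usable

    SketchPool(Supplier<T> factory, Predicate<T> validator) {
        this.factory = factory;
        this.validator = validator;
    }

    /** Validate before use: skip dead idle entries, create a new one if none is left. */
    T borrow() {
        T item;
        while ((item = idle.poll()) != null) {
            if (validator.test(item)) {
                return item;
            }
            // invalid entry: drop it and try the next idle one
        }
        return factory.get();
    }

    void giveBack(T item) {
        idle.offer(item);
    }

    /** Periodic inspection: remove idle entries that fail validation; returns how many were removed. */
    int sweep() {
        int before = idle.size();
        idle.removeIf(item -> !validator.test(item));
        return before - idle.size(); // exact only when no other thread touches the queue
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        SketchPool<AtomicBoolean> pool =
                new SketchPool<>(() -> new AtomicBoolean(true), AtomicBoolean::get);
        AtomicBoolean conn = pool.borrow();
        pool.giveBack(conn);
        conn.set(false); // the "connection" dies while it sits idle
        System.out.println("got a different connection: " + (pool.borrow() != conn));
    }
}
```

A real pool would call `sweep` from a timer thread and would also bound the number of live connections; both are left out here to keep the two checks visible.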

2. Verifying against the implementations

2.1 Tomcat pool

2.1.1 Establishing a connection and initialization:

Participants: DataSourceProxy, ConnectionPool, PooledConnection, NonRegisteringDriver(com.mysql.jdbc.Driver)

Call sequence:
getConnection
if there is no Pool, enter the Pool creation flow
new busy and idle LinkedBlockingQueue
initializePoolCleaner
new PooledConnection[]
getConnection
borrowConnection(int wait,
borrowConnection
PooledConnection con = idle.poll()
wrap the Connection, con.setTimestamp(now)
createConnection
connect
handle maxWait
connectUsingDriver
connect

NonRegisteringDriver(com.mysql.jdbc.Driver)->ConnectionImpl:getInstance
ConnectionImpl->ConnectionImpl:createNewIO
ConnectionImpl->ConnectionImpl:connectOneTryOnly
ConnectionImpl->ConnectionImpl:coreConnect
ConnectionImpl->MysqlIO:MysqlIO()
MysqlIO->StandardSocketFactory:connect 
StandardSocketFactory->StandardSocketFactory:createSocket
StandardSocketFactory->Socket:new Socket()
StandardSocketFactory->StandardSocketFactory:configureSocket
StandardSocketFactory->Socket:bind
StandardSocketFactory->Socket:connect
Note right of Socket:connectTimeout
MysqlIO->Socket:setSoTimeout
MysqlIO->StandardSocketFactory:beforeHandShake
StandardSocketFactory->Socket:setSoTimeout
Note right of MysqlIO:set timeout
ConnectionImpl->MysqlIO:doHandshake
MysqlIO->MysqlIO:send
MysqlIO->MysqlIO:bufferedOutputStream.out


Some details of acquiring a connection:

            //calculate wait time for this iteration
            long maxWait = wait;
            //if the passed in wait time is -1, means we should use the pool property value
            if (wait==-1) {
                maxWait = (getPoolProperties().getMaxWait()<=0)?Long.MAX_VALUE:getPoolProperties().getMaxWait();//Default value is 30000 (30 seconds)
            }

            long timetowait = Math.max(0, maxWait - (System.currentTimeMillis() - now));
            waitcount.incrementAndGet();
            try {
                //retrieve an existing connection
                con = idle.poll(timetowait, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ex) {
                if (getPoolProperties().getPropagateInterruptState()) {
                    Thread.currentThread().interrupt();
                }
                SQLException sx = new SQLException("Pool wait interrupted.");
                sx.initCause(ex);
                throw sx;
            } finally {
                waitcount.decrementAndGet();
            }
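The wait-budget rule in the excerpt above can be isolated into a few pure functions. The sketch below is an illustration only: `MaxWaitSketch` and its method names are hypothetical, and an empty `LinkedBlockingQueue` stands in for an exhausted idle queue.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class MaxWaitSketch {
    /** -1 means "use the pool property"; a property <= 0 means wait (practically) forever. */
    static long effectiveMaxWait(int wait, int configuredMaxWait) {
        if (wait == -1) {
            return configuredMaxWait <= 0 ? Long.MAX_VALUE : configuredMaxWait;
        }
        return wait;
    }

    /** Remaining wait budget for this iteration, never negative. */
    static long timeToWait(long maxWait, long startedAt, long now) {
        return Math.max(0, maxWait - (now - startedAt));
    }

    /** Polls an empty idle queue; returns true if the poll gave up with a null result. */
    static boolean pollTimesOut(long waitMillis) {
        LinkedBlockingQueue<Object> idle = new LinkedBlockingQueue<>();
        try {
            return idle.poll(waitMillis, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
    }

    public static void main(String[] args) {
        // max-wait: 2000 from the sample configuration in section 1.1
        System.out.println(effectiveMaxWait(-1, 2000));
        System.out.println(timeToWait(2000, 1_000, 1_500));
        System.out.println(pollTimesOut(50));
    }
}
```

Note the consequence of the first rule: setting maxWait to 0 or a negative value does not mean "fail fast", it means the borrowing thread can block without limit when the pool is exhausted.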

Connection timeout:

2.1.2 Submitting a query:

ConnectionImpl->ConnectionImpl:commit
ConnectionImpl->MysqlIO:execSQL
MysqlIO->MysqlIO:sqlQueryDirect
MysqlIO->MysqlIO:sendCommand
MysqlIO->MysqlIO:send
MysqlIO->MysqlIO:outputStream.write
MysqlIO->MysqlIO:outputStream.flush
MysqlIO->MysqlIO:readAllResults
MysqlIO->MysqlIO:getResultSet
MysqlIO->MysqlIO:reuseAndReadPacket
MysqlIO->MysqlIO:readFully
MysqlIO->MysqlIO:inputStream.read

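SoTimeout governs the read timeout; without it the read keeps waiting indefinitely (depending on the operating system). A timeout can also be set by a direct call or through a PreparedStatement.

This kind of timeout can occur when an interaction is initiated:

[Figure: image-20210615161418173]

A CancelTask thread is started to handle the timeout (Statement timeout); when the query times out, the connection is closed (at the statement level).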
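The effect of SoTimeout on a blocking read can be reproduced with a plain JDK socket. The sketch below is an illustration under assumptions: `SoTimeoutSketch` is a hypothetical class, and a local server that accepts the connection but never writes stands in for a database that has stopped responding.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

class SoTimeoutSketch {
    /** Connects to a silent local server; returns true if read() gave up with a timeout. */
    static boolean readTimesOut(int soTimeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            client.setSoTimeout(soTimeoutMillis); // 0 would mean: block without limit
            InputStream in = client.getInputStream();
            try {
                in.read(); // the server never sends a byte
                return false;
            } catch (SocketTimeoutException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read timed out: " + readTimesOut(200));
    }
}
```

With MySQL Connector/J the same two socket-level limits are normally set through the `connectTimeout` and `socketTimeout` connection properties (both in milliseconds) rather than by touching the socket directly.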

public void run() {
					if (connection.getQueryTimeoutKillsConnection()) {
						try {
							toCancel.wasCancelled = true;
							toCancel.wasCancelledByTimeout = true;
							connection.realClose(false, false, true, 
									new MySQLStatementCancelledException(Messages.getString("Statement.ConnectionKilledDueToTimeout")));
						} catch (NullPointerException npe) {
							// not worth guarding against
						} catch (SQLException sqlEx) {
							caughtWhileCancelling = sqlEx;
						}
					} else {
						Connection cancelConn = null;
						java.sql.Statement cancelStmt = null;
	
						try {
							synchronized (cancelTimeoutMutex) {
								if (origConnURL.equals(connection.getURL())) {
									//All's fine
									cancelConn = connection.duplicate();
									cancelStmt = cancelConn.createStatement();
									cancelStmt.execute("KILL QUERY " + connectionId);
								} else {
									try {
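The driver excerpt above is cut off, but the watchdog idea behind it can be shown in isolation. The sketch below is only an analogy built on the JDK: `QueryWatchdogSketch` is a hypothetical class, `Thread.sleep` stands in for the blocking read of a result, and interrupting the thread stands in for what the driver does (issuing `KILL QUERY` on a duplicate connection, or closing the connection outright).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class QueryWatchdogSketch {
    /**
     * Runs a simulated query lasting workMillis under a timeoutMillis watchdog.
     * Returns true if the watchdog fired and cancelled the "query".
     */
    static boolean runWithTimeout(long workMillis, long timeoutMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Thread worker = Thread.currentThread();
        // Stand-in for the cancel task: flag the statement and wake the waiting thread.
        ScheduledFuture<?> cancelTask = timer.schedule(() -> {
            cancelled.set(true);
            worker.interrupt();
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(workMillis); // stands in for the blocking read of the result
        } catch (InterruptedException e) {
            // woken up by the watchdog
        } finally {
            cancelTask.cancel(false); // the query is over either way: disarm the timer
            timer.shutdownNow();
            Thread.interrupted();     // clear a late interrupt so it does not leak to the caller
        }
        return cancelled.get();
    }

    public static void main(String[] args) {
        System.out.println("fast query cancelled: " + runWithTimeout(20, 2_000));
        System.out.println("slow query cancelled: " + runWithTimeout(2_000, 50));
    }
}
```

In JDBC code the timer is armed through `Statement.setQueryTimeout(int seconds)`; the application never schedules the cancel task itself.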

Handling of some exceptions:

    	} catch (OutOfMemoryError oom) {
    		try {
    			// _Try_ this
    			clearInputStream();
        	} catch (Exception ex) {
        	}
   			try {
   				this.connection.realClose(false, false, true, oom);
        	} catch (Exception ex) {
   			}
			throw oom;
    	}

Socket exceptions?

2.1.3 Check mechanism:

Participants: ConnectionPool, PooledConnection, ConnectionPool.PoolCleaner, ConnectionImpl, MysqlIO, NetworkResource

Call sequence:
init
busy = new LinkedBlockingQueue<>()
initializePoolCleaner
new PoolCleaner().run().start()
checkAbandoned
release
disconnect
close
realClose
closeAllOpenStatements
quit
forceClose
forceClose
InputStream.close()

PoolCleaner initialization:

    public void initializePoolCleaner(PoolConfiguration properties) {
        //if the evictor thread is supposed to run, start it now
        if (properties.isPoolSweeperEnabled()) {
            poolCleaner = new PoolCleaner(this, properties.getTimeBetweenEvictionRunsMillis());
            poolCleaner.start();
        } //end if
    }

isPoolSweeperEnabled:

    @Override
    public boolean isPoolSweeperEnabled() {
        boolean timer = getTimeBetweenEvictionRunsMillis()>0;
        boolean result = timer && (isRemoveAbandoned() && getRemoveAbandonedTimeout()>0);
        result = result || (timer && getSuspectTimeout()>0);
        result = result || (timer && isTestWhileIdle() && getValidationQuery()!=null);
        result = result || (timer && getMinEvictableIdleTimeMillis()>0);
        return result;
    }
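To see what this rule means for a concrete configuration, the same boolean expression can be evaluated with plain arguments. This is a sketch: `SweeperRuleSketch` is a hypothetical helper that restates the excerpt above, it is not part of the Tomcat pool.

```java
class SweeperRuleSketch {
    /** Same rule as the excerpt, with the pool properties passed in as plain arguments. */
    static boolean sweeperEnabled(int timeBetweenEvictionRunsMillis,
                                  boolean removeAbandoned, int removeAbandonedTimeout,
                                  int suspectTimeout,
                                  boolean testWhileIdle, String validationQuery,
                                  int minEvictableIdleTimeMillis) {
        boolean timer = timeBetweenEvictionRunsMillis > 0;
        return timer && ((removeAbandoned && removeAbandonedTimeout > 0)
                || suspectTimeout > 0
                || (testWhileIdle && validationQuery != null)
                || minEvictableIdleTimeMillis > 0);
    }

    public static void main(String[] args) {
        // Values from the Tomcat sample configuration in section 1.1
        System.out.println(sweeperEnabled(10_000, true, 10, 0, true, "select 1 from dual", 120_000));
        // A non-positive eviction interval switches the cleaner off regardless of everything else
        System.out.println(sweeperEnabled(0, true, 10, 0, true, "select 1 from dual", 120_000));
    }
}
```

The takeaway is that `timeBetweenEvictionRunsMillis` acts as a master switch: if it is not positive, neither abandoned-connection removal nor idle validation ever runs, whatever the other properties say.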

PoolCleaner thread:

        public void run() {
            ConnectionPool pool = this.pool.get();
            if (pool == null) {
                stopRunning();
            } else if (!pool.isClosed()) {
                try {
                    if (pool.getPoolProperties().isRemoveAbandoned()
                            || pool.getPoolProperties().getSuspectTimeout() > 0)
                        pool.checkAbandoned();
                    if (pool.getPoolProperties().getMinIdle() < pool.idle
                            .size())
                        pool.checkIdle();
                    if (pool.getPoolProperties().isTestWhileIdle())
                        pool.testAllIdle();
                } catch (Exception x) {
                    log.error("", x);
                }
            }
        }

Timeout check:

    /**
     * Iterates through all the busy connections and checks for connections that have timed out
     */
    public void checkAbandoned() {
        try {
            if (busy.size()==0) return;
            Iterator<PooledConnection> locked = busy.iterator();
            int sto = getPoolProperties().getSuspectTimeout();
            while (locked.hasNext()) {
                PooledConnection con = locked.next();
                boolean setToNull = false;
                try {
                    con.lock();
                    //the con has been returned to the pool or released
                    //ignore it
                    if (idle.contains(con) || con.isReleased())
                        continue;
                    long time = con.getTimestamp();
                    long now = System.currentTimeMillis();
                    if (shouldAbandon() && (now - time) > con.getAbandonTimeout()) {
                        busy.remove(con);
                        abandon(con);
                        setToNull = true;
                    } else if (sto > 0 && (now - time) > (sto * 1000L)) {
                        suspect(con);
                    } else {
                        //do nothing
                    } //end if
                } finally {
                    con.unlock();
                    if (setToNull)
                        con = null;
                }
            } //while
        } catch (ConcurrentModificationException e) {
            log.debug("checkAbandoned failed." ,e);
        } catch (Exception e) {
            log.warn("checkAbandoned failed, it will be retried.",e);
        }
    }
    /**
     * thread safe way to abandon a connection
     * signals a connection to be abandoned.
     * this will disconnect the connection, and log the stack trace if logAbandoned=true
     * @param con PooledConnection
     */
    protected void abandon(PooledConnection con) {
        if (con == null)
            return;
        try {
            con.lock();
            String trace = con.getStackTrace();
            if (getPoolProperties().isLogAbandoned()) {
                log.warn("Connection has been abandoned " + con + ":" + trace);
            }
            if (jmxPool!=null) {
                jmxPool.notify(org.apache.tomcat.jdbc.pool.jmx.ConnectionPool.NOTIFY_ABANDON, trace);
            }
            //release the connection
            removeAbandonedCount.incrementAndGet();
            release(con);
        } finally {
            con.unlock();
        }
    }
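The per-connection decision inside `checkAbandoned` reduces to comparing how long a connection has been checked out against two thresholds. The sketch below restates that comparison under assumptions: `AbandonRuleSketch` is a hypothetical helper, both timeouts are taken to be in seconds (as the Tomcat JDBC pool documentation describes them), and `abandonWhenPercentageFull` is assumed to be 0 so that `shouldAbandon()` depends only on `removeAbandoned`.

```java
class AbandonRuleSketch {
    enum Action { ABANDON, SUSPECT, NONE }

    /** Decides what the cleaner would do with one busy connection. */
    static Action decide(long borrowedAtMillis, long nowMillis,
                         boolean removeAbandoned, int removeAbandonedTimeoutSec,
                         int suspectTimeoutSec) {
        long heldMillis = nowMillis - borrowedAtMillis;
        if (removeAbandoned && removeAbandonedTimeoutSec > 0
                && heldMillis > removeAbandonedTimeoutSec * 1000L) {
            return Action.ABANDON; // the pool closes the connection under the borrower
        }
        if (suspectTimeoutSec > 0 && heldMillis > suspectTimeoutSec * 1000L) {
            return Action.SUSPECT; // only logged and reported, the connection stays open
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // removeAbandoned: true, removeAbandonedTimeout: 10 from the sample configuration
        System.out.println(decide(0, 11_000, true, 10, 0));
        System.out.println(decide(0, 9_000, true, 10, 0));
    }
}
```

Read against the sample configuration in section 1.1, `removeAbandonedTimeout: 10` means any statement or transaction that holds a connection for more than about ten seconds has it closed by the cleaner, which may be one of the settings the introduction calls unreasonable.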

Default parameters

org.apache.tomcat.jdbc.pool.PoolProperties

    protected static final AtomicInteger poolCounter = new AtomicInteger(0);
    private volatile Properties dbProperties = new Properties();
    private volatile String url = null;
    private volatile String driverClassName = null;
    private volatile Boolean defaultAutoCommit = null;
    private volatile Boolean defaultReadOnly = null;
    private volatile int defaultTransactionIsolation = DataSourceFactory.UNKNOWN_TRANSACTIONISOLATION;
    private volatile String defaultCatalog = null;
    private volatile String connectionProperties;
    private volatile int initialSize = 10;
    private volatile int maxActive = DEFAULT_MAX_ACTIVE;
    private volatile int maxIdle = maxActive;
    private volatile int minIdle = initialSize;
    private volatile int maxWait = 30000;
    private volatile String validationQuery;
    private volatile int validationQueryTimeout = -1;
    private volatile String validatorClassName;
    private volatile Validator validator;
    private volatile boolean testOnBorrow = false;
    private volatile boolean testOnReturn = false;
    private volatile boolean testWhileIdle = false;
    private volatile int timeBetweenEvictionRunsMillis = 5000;
    private volatile int numTestsPerEvictionRun;
    private volatile int minEvictableIdleTimeMillis = 60000;
    private volatile boolean accessToUnderlyingConnectionAllowed = true;
    private volatile boolean removeAbandoned = false;
    private volatile int removeAbandonedTimeout = 60;
    private volatile boolean logAbandoned = false;
    private volatile String name = "Tomcat Connection Pool["+(poolCounter.addAndGet(1))+"-"+System.identityHashCode(PoolProperties.class)+"]";
    private volatile String password;
    private volatile String username;
    private volatile long validationInterval = 3000;
    private volatile boolean jmxEnabled = true;
    private volatile String initSQL;
    private volatile boolean testOnConnect =false;
    private volatile String jdbcInterceptors=null;
    private volatile boolean fairQueue = true;
    private volatile boolean useEquals = true;
    private volatile int abandonWhenPercentageFull = 0;
    private volatile long maxAge = 0;
    private volatile boolean useLock = false;
    private volatile InterceptorDefinition[] interceptors = null;
    private volatile int suspectTimeout = 0;
    private volatile Object dataSource = null;
    private volatile String dataSourceJNDI = null;
    private volatile boolean alternateUsernameAllowed = false;
    private volatile boolean commitOnReturn = false;
    private volatile boolean rollbackOnReturn = false;
    private volatile boolean useDisposableConnectionFacade = true;
    private volatile boolean logValidationErrors = false;
    private volatile boolean propagateInterruptState = false;
    private volatile boolean ignoreExceptionOnPreLoad = false;
    private volatile boolean useStatementFacade = true;

When the server disconnects abnormally, the client does not necessarily notice (taking a blocking-mode socket as the example).

1) Server disconnects: the client does not notice

[Figure: https://raw.githubusercontent.com/WengyXu/oss/master/uPic/2021-06-15/image-20210615161600091.png]

2) Server disconnected, next request: detected and reset

[Figure: https://raw.githubusercontent.com/WengyXu/oss/master/uPic/2021-06-15/image-20210615161706923.png]

3) Server restarted, next request: detected and reset

[Figure: image-20210615161751823]

Can TCP keepalive detect this automatically?

[Figure: image-20210615161913884]
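For reference, this is how TCP keepalive is switched on for a Java socket. The sketch is an illustration only: `KeepAliveSketch` is a hypothetical class and it exercises just the socket option, not a database driver.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

class KeepAliveSketch {
    /** Enables SO_KEEPALIVE on an unconnected socket and reports whether the option took effect. */
    static boolean demo() {
        try (Socket socket = new Socket()) {
            socket.setKeepAlive(true); // ask the OS to probe the peer when the connection is idle
            return socket.getKeepAlive();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("keepalive enabled: " + demo());
    }
}
```

The option only turns probing on; when the first probe is sent is decided by the operating system. On Linux the default idle time before the first probe (`tcp_keepalive_time`) is 7200 seconds, so a pool that relies on keepalive alone may hold a dead connection for a long time unless the kernel setting is tuned.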

Why not rely on the Socket timeout to decide?

2.2 Druid pool

2.2.1 Creating a new connection

Entry->DruidDataSource:getConnection
DruidDataSource->DruidDataSource:getConnection
DruidDataSource->DruidDataSource:init
Note right of DruidDataSource:initialize the pool and the worker threads
DruidDataSource->DruidDataSource:getConnectionDirect
DruidDataSource->DruidAbstractDataSource:getConnectionInternal
DruidAbstractDataSource->DruidAbstractDataSource:createPhysicalConnection
DruidAbstractDataSource->DruidDriver:connect
DruidDriver->DruidDriver:getDataSource
DruidDriver->DataSourceProxy:connect
DataSourceProxy->FilterChainImpl:connection_connect
Note right of  FilterChainImpl:driver.connect(url, info);
FilterChainImpl->NonRegisteringDriver(com.mysql.jdbc.Driver):connect


NonRegisteringDriver(com.mysql.jdbc.Driver)->ConnectionImpl:getInstance
ConnectionImpl->ConnectionImpl:createNewIO
Note right of ConnectionImpl:may retry, depending on configuration
ConnectionImpl->ConnectionImpl:connectOneTryOnly
ConnectionImpl->ConnectionImpl:coreConnect
ConnectionImpl->MysqlIO:new MysqlIO()
MysqlIO->StandardSocketFactory:connect
StandardSocketFactory->StandardSocketFactory:createSocket
StandardSocketFactory->Socket:new Socket()
StandardSocketFactory->StandardSocketFactory:configureSocket
StandardSocketFactory->Socket:bind
StandardSocketFactory->Socket:connect
Note right of Socket:connectTimeout
MysqlIO->Socket:setSoTimeout
MysqlIO->StandardSocketFactory:beforeHandShake
StandardSocketFactory->Socket:setSoTimeout
Note right of MysqlIO:set timeout
ConnectionImpl->MysqlIO:doHandshake
MysqlIO->MysqlIO:send
MysqlIO->MysqlIO:bufferedOutputStream.out

2.2.2 Submitting a query:

2.2.3 Check mechanism:

DruidDataSource->DruidDataSource:init
DruidDataSource->DruidDataSource:createAndStartDestroyThread
DruidDataSource->ScheduledExecutorService:scheduleAtFixedRate
Note right of DruidDataSource:period(timeBetweenEvictionRunsMillis)
ScheduledExecutorService-->DestroyTask:run
DestroyTask->DestroyTask:shrink
DruidDataSource-->DestroyConnectionThread:run
DestroyConnectionThread-->DruidDataSource:shrink
DruidDataSource->JdbcUtils:close
JdbcUtils->FilterChainImpl:connection_close
Note right of  FilterChainImpl:connection.\ngetRawObject().close();
FilterChainImpl->ConnectionImpl:close




ConnectionImpl->ConnectionImpl:realClose
ConnectionImpl->ConnectionImpl:closeAllOpenStatements
ConnectionImpl->MysqlIO:quit
MysqlIO->MysqlIO:forceClose
MysqlIO->NetworkResource:forceClose
NetworkResource->NetworkResource:InputStream.close()

for (;;) {
                // remove from the front
                try {
                    if (closed) {
                        break;
                    }

                    if (timeBetweenEvictionRunsMillis > 0) {
                        Thread.sleep(timeBetweenEvictionRunsMillis);
                    } else {
                        Thread.sleep(1000); //
                    }

                    if (Thread.interrupted()) {
                        break;
                    }

                    destroyTask.run();
                } catch (InterruptedException e) {
                    break;
                }
            }

    public void shrink(boolean checkTime, boolean keepAlive) {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            return;
        }

        int evictCount = 0;
        int keepAliveCount = 0;
        try {
            if (!inited) {
                return;
            }

            final int checkCount = poolingCount - minIdle;
            final long currentTimeMillis = System.currentTimeMillis();
            for (int i = 0; i < poolingCount; ++i) {
                DruidConnectionHolder connection = connections[i];

                if (checkTime) {
                    if (phyTimeoutMillis > 0) {
                        long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                        if (phyConnectTimeMillis > phyTimeoutMillis) {
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }

                    long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;

                    if (idleMillis < minEvictableIdleTimeMillis) {
                        break;
                    }

                    if (checkTime && i < checkCount) {
                        evictConnections[evictCount++] = connection;
                    } else if (idleMillis > maxEvictableIdleTimeMillis) {
                        evictConnections[evictCount++] = connection;
                    } else if (keepAlive) {
                        keepAliveConnections[keepAliveCount++] = connection;
                    }
                } else {
                    if (i < checkCount) {
                        evictConnections[evictCount++] = connection;
                    } else {
                        break;
                    }
                }
            }

            int removeCount = evictCount + keepAliveCount;
            if (removeCount > 0) {
                System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
                Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
                poolingCount -= removeCount;
            }
            keepAliveCheckCount += keepAliveCount;
        } finally {
            lock.unlock();
        }

        if (evictCount > 0) {
            for (int i = 0; i < evictCount; ++i) {
                DruidConnectionHolder item = evictConnections[i];
                Connection connection = item.getConnection();
                JdbcUtils.close(connection);
                destroyCount.incrementAndGet();
            }
            Arrays.fill(evictConnections, null);
        }

        if (keepAliveCount > 0) {
            this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
            // keep order
            for (int i = keepAliveCount - 1; i >= 0; --i) {
                DruidConnectionHolder holer = keepAliveConnections[i];
                Connection connection = holer.getConnection();
                holer.incrementKeepAliveCheckCount();

                boolean validate = false;
                try {
                    this.validateConnection(connection);
                    validate = true;
                } catch (Throwable error) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("keepAliveErr", error);
                    }
                    // skip
                }

                if (validate) {
                    holer.lastActiveTimeMillis = System.currentTimeMillis();
                    put(holer);
                } else {
                    JdbcUtils.close(connection);
                }
            }
            Arrays.fill(keepAliveConnections, null);
        }
    }
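The per-connection branch inside `shrink` is easier to follow as a pure function. The sketch below restates the `checkTime == true` path of the excerpt above under assumptions: `ShrinkRuleSketch` is a hypothetical helper, the `phyTimeoutMillis` check is left out, and the 7-hour `maxEvictableIdleTimeMillis` used in `main` is an assumed value, not taken from the article.

```java
class ShrinkRuleSketch {
    enum Decision { EVICT, KEEP_ALIVE_CHECK, KEEP, STOP_SCAN }

    /**
     * Decides the fate of the idle connection at position index.
     * checkCount is poolingCount - minIdle, i.e. how many idle connections exceed minIdle.
     */
    static Decision decide(int index, int checkCount, long idleMillis,
                           long minEvictableIdleTimeMillis, long maxEvictableIdleTimeMillis,
                           boolean keepAlive) {
        if (idleMillis < minEvictableIdleTimeMillis) {
            return Decision.STOP_SCAN;        // the loop breaks: later entries were used more recently
        }
        if (index < checkCount) {
            return Decision.EVICT;            // surplus above minIdle that has idled long enough
        }
        if (idleMillis > maxEvictableIdleTimeMillis) {
            return Decision.EVICT;            // too old even though it is within minIdle
        }
        if (keepAlive) {
            return Decision.KEEP_ALIVE_CHECK; // validate it and put it back if it still works
        }
        return Decision.KEEP;
    }

    public static void main(String[] args) {
        long minIdleTime = 120_000;           // minEvictableIdleTimeMillis from section 1.1
        long maxIdleTime = 7L * 60 * 60 * 1000; // assumed value for maxEvictableIdleTimeMillis
        int checkCount = 12 - 10;             // e.g. 12 pooled connections, minIdle: 10
        System.out.println(decide(0, checkCount, 130_000, minIdleTime, maxIdleTime, true));
        System.out.println(decide(5, checkCount, 130_000, minIdleTime, maxIdleTime, true));
        System.out.println(decide(5, checkCount, 60_000, minIdleTime, maxIdleTime, true));
    }
}
```

One detail this makes visible: in the excerpt, a connection only reaches the keepalive check after it has been idle for at least `minEvictableIdleTimeMillis`, so with the sample configuration a connection inside the `minIdle` set is validated roughly every two minutes of idleness, not on every `timeBetweenEvictionRunsMillis` tick.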

Keepalive flow:

DruidDataSource->CreateConnectionTask:runInternal
Note right of CreateConnectionTask:createScheduler.schedule\n(this, timeBetweenConnectErrorMillis, \nTimeUnit.MILLISECONDS);

Other topics

The various concurrency-control mechanisms in the Tomcat pool

The design patterns used in Druid