对于多个请求并发连接数据的操作,在没有看gorm以及标准库的源码之前,凭借我使用swoole的经验,我的想法是:在启动进程时,一次性创建一系列数据库长连接,然后使用通道来存储这些连接。这么做既可以防止同一个连接被同时使用,还可以跨协程分发连接。

事实上标准库也是这么做的。

[本文由笔者一边思考写就,可能语言不够简练,留待日后修整]

# 数据库连接的处理

这部分go标准库database/sql已经帮我们完成了。简要了解一下。

我们知道,使用gorm创建数据库连接,需要调用Open方法。查看此方法源码,发现它会返回一个gorm.DB结构体的实例。而这个结构体中,有一个字段是db,此字段的类型是gorm.SQLCommon接口类:

以mysql为例,该字段值通过调用database/sql包的Open方法生成。而sql.Open又调用了sql.OpenDB方法。此方法会返回一个sql.DB结构体的实例,这个结构体的方法有点多,仔细看可以发现实现了Exec、Prepare、Query、QueryRow这四个方法,因此可以说它就是SQLCommon的实现(golang的接口判定规则真是奇妙)。

sql.OpenDB方法说明

// The returned DB is safe for concurrent use by multiple goroutines

// and maintains its own pool of idle connections. Thus, the OpenDB

// function should be called just once. It is rarely necessary to

// close a DB.

通过上面的说明,可知sql.DB结构体自己维护着一个空闲连接池,并发使用是安全的。只需调用一次,几乎不需要关闭。

*gorm.DB的db字段就是管理数据库连接的地方。接下来我们简单阅读以下sql.DB结构体的内容。

type DB struct {
	// Atomic access only. At top of struct to prevent mis-alignment
	// on 32-bit platforms. Of type time.Duration.
	waitDuration int64 // Total time waited for new connections.
	connector driver.Connector
	// numClosed is an atomic counter which represents a total number of
	// closed connections. Stmt.openStmt checks it before cleaning closed
	// connections in Stmt.css.
	numClosed uint64
	mu           sync.Mutex // protects following fields
	freeConn     []*driverConn
	connRequests map[uint64]chan connRequest
	nextRequest  uint64 // Next key to use in connRequests.
	numOpen      int    // number of opened and pending open connections
	// 用于信号通知新连接的请求
	// 一个goroutine运行connectionOpener()方法来监听这个通道
	// maybeOpenNewConnections往这个通道中发送信号(每个连接请求发送一次)
	// 在db.Close()中被关闭,并且让运行connectionOpener的goroutine退出
	openerCh          chan struct{}
	resetterCh        chan *driverConn
	closed            bool
	dep               map[finalCloser]depSet
	lastPut           map[*driverConn]string // stacktrace of last conn's put; debug only
	maxIdle           int                    // 最大空闲连接数,0表示默认,即2个
	maxOpen           int                    // 连接数上限,0表示无限
	maxLifetime       time.Duration          // maximum amount of time a connection may be reused
	cleanerCh         chan struct{}
	waitCount         int64 // Total number of connections waited for.
	maxIdleClosed     int64 // Total number of connections closed due to idle.
	maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
	stop func() // stop cancels the connection opener and the session resetter.
}

关键的部位已用中文标示。

openerCh字段就是通知生成新连接的通道队列,默认最大值为1000000connRequests字段的类型为连接请求通道的字典(map[uint64]chan connRequest)集合,用于保存连接的请求通道队列。

使用DB.connectionOpenner方法监听openerCh通道,此方法调用DB.openNewConnection创建新连接。然后通过DB.connector.Connect(此例中,connector是sql.dsnConnector,而它调用的Connect方法由标准库中go-sql-driver/mysqlMySQLDriver.Open方法实现)真正发起数据库连接,包裹在一个driverConn结构体实例中返回。如果失败则调用DB.maybeOpenNewConnections方法再次通知openerCh生成新连接;成功则通过DB.putConnDBLocked方法获取一个connRequests字典中的连接请求通道队列(即chann connRequest),将连接注入到该通道,并使用DB.addDepLocked添加一个依赖标记(类似于gc)——当引用为0时释放这个连接。

就这样,携带着一个sql.DB结构体的gorm.DB结构体实例返回给了gopher,这就是我们通过gorm.Open得到的“数据库连接”。

# gorm和数据库的交互

上面我们已经了解了database/sql标准库是如何创建新连接的。在不考虑gorm的情况下,我们直接使用该标准库,发起查询时应是如下操作:

row, err := db.Query("select * from tbl_n")
// do something with row
row.Close()

当我们调用DB.Query方法时,它会调用私有方法DB.query,而此方法会通过DB.conn尝试获取缓存的连接或者新建的连接。现在,我们是第一次连接,所以没有缓存的连接,该方法直接走创建连接的流程,调用DB.connector.Connect方法——这个方法前文提到过。返回给DB.query方法一个连接,接着DB.query又调用了DB.queryDC方法经过一系列查询(此处不是我们关注的重点,略过),生成一个Rows结构体,返回给gopher使用。

然后我们调用Row.Close方法关闭查询。Close方法会调用Rows结构体中的releaseConn方法释放连接,这个方法实际上是由driverConn携带的:

func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
	dc, err := db.conn(ctx, strategy)
	if err != nil {
		return nil, err
	}
	return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}
func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
    // ...
    rows := &Rows{
		dc:          dc,
		releaseConn: releaseConn,
		rowsi:       rowsi,
		closeStmt:   ds,
	}
	rows.initContextClose(ctx, txctx)
	return rows, nil
}

driverConn通过putConn方法再次调用DB.putConnDBLocked方法,如果没有排队等待的连接请求,将连接放入DB.freeConn中,成为被缓存的空闲连接。


在gorm中,当使用调用DB.Find等方法时,在进行一系列的组装之后,其最终都殊途同归于调用DB.callCallbacks方法。此方法接收一组闭包函数数组为参数,以回调的方式调用函数。这些闭包方法来自于DB.parent.callbacks.queries也就是Callback结构体中的增删改查数组。

有趣的是,在同一个包的另一个文件gorm/callback_query.go中,初始化注册了curd的默认回调函数。比如queryCallback函数:实质上在此函数中调用了SQLCommon.Query方法——前面我们提过,database/sql中的DB结构体是SQLCommon的实现,因此,此函数的作用就是进行调用上面所说的Query的方法。换言之,gorm把查询操作也视为回调队列的成员之一。