diff options
Diffstat (limited to 'libgo/go/database/sql/sql.go')
-rw-r--r-- | libgo/go/database/sql/sql.go | 350 |
1 files changed, 225 insertions, 125 deletions
diff --git a/libgo/go/database/sql/sql.go b/libgo/go/database/sql/sql.go index 0f5bbc0..b3d0653 100644 --- a/libgo/go/database/sql/sql.go +++ b/libgo/go/database/sql/sql.go @@ -421,17 +421,18 @@ type DB struct { // It is closed during db.Close(). The close tells the connectionOpener // goroutine to exit. openerCh chan struct{} - resetterCh chan *driverConn closed bool dep map[finalCloser]depSet lastPut map[*driverConn]string // stacktrace of last conn's put; debug only - maxIdle int // zero means defaultMaxIdleConns; negative means 0 + maxIdleCount int // zero means defaultMaxIdleConns; negative means 0 maxOpen int // <= 0 means unlimited maxLifetime time.Duration // maximum amount of time a connection may be reused + maxIdleTime time.Duration // maximum amount of time a connection may be idle before being closed cleanerCh chan struct{} waitCount int64 // Total number of connections waited for. - maxIdleClosed int64 // Total number of connections closed due to idle. - maxLifetimeClosed int64 // Total number of connections closed due to max free limit. + maxIdleClosed int64 // Total number of connections closed due to idle count. + maxIdleTimeClosed int64 // Total number of connections closed due to idle time. + maxLifetimeClosed int64 // Total number of connections closed due to max connection lifetime limit. stop func() // stop cancels the connection opener and the session resetter. } @@ -458,15 +459,16 @@ type driverConn struct { sync.Mutex // guards following ci driver.Conn + needReset bool // The connection session should be reset before use if true. closed bool finalClosed bool // ci.Close has been called openStmt map[*driverStmt]bool - lastErr error // lastError captures the result of the session resetter. // guarded by db.mu inUse bool - onPut []func() // code (with db.mu held) run when conn is next returned - dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked + returnedAt time.Time // Time the connection was created or returned. + onPut []func() // code (with db.mu held) run when conn is next returned + dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked } func (dc *driverConn) releaseConn(err error) { @@ -486,6 +488,36 @@ func (dc *driverConn) expired(timeout time.Duration) bool { return dc.createdAt.Add(timeout).Before(nowFunc()) } +// resetSession checks if the driver connection needs the +// session to be reset and if required, resets it. +func (dc *driverConn) resetSession(ctx context.Context) error { + dc.Lock() + defer dc.Unlock() + + if !dc.needReset { + return nil + } + if cr, ok := dc.ci.(driver.SessionResetter); ok { + return cr.ResetSession(ctx) + } + return nil +} + +// validateConnection checks if the connection is valid and can +// still be used. It also marks the session for reset if required. +func (dc *driverConn) validateConnection(needsReset bool) bool { + dc.Lock() + defer dc.Unlock() + + if needsReset { + dc.needReset = true + } + if cv, ok := dc.ci.(driver.Validator); ok { + return cv.IsValid() + } + return true +} + // prepareLocked prepares the query on dc. When cg == nil the dc must keep track of // the prepared statements in a pool. func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) { @@ -511,19 +543,6 @@ func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, que return ds, nil } -// resetSession resets the connection session and sets the lastErr -// that is checked before returning the connection to another query. -// -// resetSession assumes that the embedded mutex is locked when the connection -// was returned to the pool. This unlocks the mutex. -func (dc *driverConn) resetSession(ctx context.Context) { - defer dc.Unlock() // In case of panic. - if dc.closed { // Check if the database has been closed. - return - } - dc.lastErr = dc.ci.(driver.SessionResetter).ResetSession(ctx) -} - // the dc.db's Mutex is held. func (dc *driverConn) closeDBLocked() func() error { dc.Lock() @@ -713,14 +732,12 @@ func OpenDB(c driver.Connector) *DB { db := &DB{ connector: c, openerCh: make(chan struct{}, connectionRequestQueueSize), - resetterCh: make(chan *driverConn, 50), lastPut: make(map[*driverConn]string), connRequests: make(map[uint64]chan connRequest), stop: cancel, } go db.connectionOpener(ctx) - go db.connectionResetter(ctx) return db } @@ -839,7 +856,7 @@ func (db *DB) Close() error { const defaultMaxIdleConns = 2 func (db *DB) maxIdleConnsLocked() int { - n := db.maxIdle + n := db.maxIdleCount switch { case n == 0: // TODO(bradfitz): ask driver, if supported, for its default preference @@ -851,6 +868,14 @@ func (db *DB) maxIdleConnsLocked() int { } } +func (db *DB) shortestIdleTimeLocked() time.Duration { + min := db.maxIdleTime + if min > db.maxLifetime { + min = db.maxLifetime + } + return min +} + // SetMaxIdleConns sets the maximum number of connections in the idle // connection pool. // @@ -864,14 +889,14 @@ func (db *DB) maxIdleConnsLocked() int { func (db *DB) SetMaxIdleConns(n int) { db.mu.Lock() if n > 0 { - db.maxIdle = n + db.maxIdleCount = n } else { // No idle connections. - db.maxIdle = -1 + db.maxIdleCount = -1 } // Make sure maxIdle doesn't exceed maxOpen if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen { - db.maxIdle = db.maxOpen + db.maxIdleCount = db.maxOpen } var closing []*driverConn idleCount := len(db.freeConn) @@ -912,13 +937,13 @@ func (db *DB) SetMaxOpenConns(n int) { // // Expired connections may be closed lazily before reuse. // -// If d <= 0, connections are reused forever. +// If d <= 0, connections are not closed due to a connection's age. func (db *DB) SetConnMaxLifetime(d time.Duration) { if d < 0 { d = 0 } db.mu.Lock() - // wake cleaner up when lifetime is shortened. + // Wake cleaner up when lifetime is shortened. if d > 0 && d < db.maxLifetime && db.cleanerCh != nil { select { case db.cleanerCh <- struct{}{}: @@ -930,11 +955,34 @@ func (db *DB) SetConnMaxLifetime(d time.Duration) { db.mu.Unlock() } +// SetConnMaxIdleTime sets the maximum amount of time a connection may be idle. +// +// Expired connections may be closed lazily before reuse. +// +// If d <= 0, connections are not closed due to a connection's idle time. +func (db *DB) SetConnMaxIdleTime(d time.Duration) { + if d < 0 { + d = 0 + } + db.mu.Lock() + defer db.mu.Unlock() + + // Wake cleaner up when idle time is shortened. + if d > 0 && d < db.maxIdleTime && db.cleanerCh != nil { + select { + case db.cleanerCh <- struct{}{}: + default: + } + } + db.maxIdleTime = d + db.startCleanerLocked() +} + // startCleanerLocked starts connectionCleaner if needed. func (db *DB) startCleanerLocked() { - if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil { + if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil { db.cleanerCh = make(chan struct{}, 1) - go db.connectionCleaner(db.maxLifetime) + go db.connectionCleaner(db.shortestIdleTimeLocked()) } } @@ -953,15 +1001,30 @@ func (db *DB) connectionCleaner(d time.Duration) { } db.mu.Lock() - d = db.maxLifetime + + d = db.shortestIdleTimeLocked() if db.closed || db.numOpen == 0 || d <= 0 { db.cleanerCh = nil db.mu.Unlock() return } - expiredSince := nowFunc().Add(-d) - var closing []*driverConn + closing := db.connectionCleanerRunLocked() + db.mu.Unlock() + for _, c := range closing { + c.Close() + } + + if d < minInterval { + d = minInterval + } + t.Reset(d) + } +} + +func (db *DB) connectionCleanerRunLocked() (closing []*driverConn) { + if db.maxLifetime > 0 { + expiredSince := nowFunc().Add(-db.maxLifetime) for i := 0; i < len(db.freeConn); i++ { c := db.freeConn[i] if c.createdAt.Before(expiredSince) { @@ -974,17 +1037,26 @@ func (db *DB) connectionCleaner(d time.Duration) { } } db.maxLifetimeClosed += int64(len(closing)) - db.mu.Unlock() - - for _, c := range closing { - c.Close() - } + } - if d < minInterval { - d = minInterval + if db.maxIdleTime > 0 { + expiredSince := nowFunc().Add(-db.maxIdleTime) + var expiredCount int64 + for i := 0; i < len(db.freeConn); i++ { + c := db.freeConn[i] + if db.maxIdleTime > 0 && c.returnedAt.Before(expiredSince) { + closing = append(closing, c) + expiredCount++ + last := len(db.freeConn) - 1 + db.freeConn[i] = db.freeConn[last] + db.freeConn[last] = nil + db.freeConn = db.freeConn[:last] + i-- + } } - t.Reset(d) + db.maxIdleTimeClosed += expiredCount } + return } // DBStats contains database statistics. @@ -1000,6 +1072,7 @@ type DBStats struct { WaitCount int64 // The total number of connections waited for. WaitDuration time.Duration // The total time blocked waiting for a new connection. MaxIdleClosed int64 // The total number of connections closed due to SetMaxIdleConns. + MaxIdleTimeClosed int64 // The total number of connections closed due to SetConnMaxIdleTime. MaxLifetimeClosed int64 // The total number of connections closed due to SetConnMaxLifetime. } @@ -1020,6 +1093,7 @@ func (db *DB) Stats() DBStats { WaitCount: db.waitCount, WaitDuration: time.Duration(wait), MaxIdleClosed: db.maxIdleClosed, + MaxIdleTimeClosed: db.maxIdleTimeClosed, MaxLifetimeClosed: db.maxLifetimeClosed, } return stats @@ -1058,23 +1132,6 @@ func (db *DB) connectionOpener(ctx context.Context) { } } -// connectionResetter runs in a separate goroutine to reset connections async -// to exported API. -func (db *DB) connectionResetter(ctx context.Context) { - for { - select { - case <-ctx.Done(): - close(db.resetterCh) - for dc := range db.resetterCh { - dc.Unlock() - } - return - case dc := <-db.resetterCh: - dc.resetSession(ctx) - } - } -} - // Open one new connection func (db *DB) openNewConnection(ctx context.Context) { // maybeOpenNewConnctions has already executed db.numOpen++ before it sent @@ -1097,9 +1154,10 @@ func (db *DB) openNewConnection(ctx context.Context) { return } dc := &driverConn{ - db: db, - createdAt: nowFunc(), - ci: ci, + db: db, + createdAt: nowFunc(), + returnedAt: nowFunc(), + ci: ci, } if db.putConnDBLocked(dc, err) { db.addDepLocked(dc, dc) @@ -1150,19 +1208,20 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] conn.inUse = true - db.mu.Unlock() if conn.expired(lifetime) { + db.maxLifetimeClosed++ + db.mu.Unlock() conn.Close() return nil, driver.ErrBadConn } - // Lock around reading lastErr to ensure the session resetter finished. - conn.Lock() - err := conn.lastErr - conn.Unlock() - if err == driver.ErrBadConn { + db.mu.Unlock() + + // Reset the session if required. + if err := conn.resetSession(ctx); err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } + return conn, nil } @@ -1177,7 +1236,7 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn db.waitCount++ db.mu.Unlock() - waitStart := time.Now() + waitStart := nowFunc() // Timeout the connection request with the context. select { @@ -1204,18 +1263,25 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn if !ok { return nil, errDBClosed } - if ret.err == nil && ret.conn.expired(lifetime) { + // Only check if the connection is expired if the strategy is cachedOrNewConns. + // If we require a new connection, just re-use the connection without looking + // at the expiry time. If it is expired, it will be checked when it is placed + // back into the connection pool. + // This prioritizes giving a valid connection to a client over the exact connection + // lifetime, which could expire exactly after this point anyway. + if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) { + db.mu.Lock() + db.maxLifetimeClosed++ + db.mu.Unlock() ret.conn.Close() return nil, driver.ErrBadConn } if ret.conn == nil { return nil, ret.err } - // Lock around reading lastErr to ensure the session resetter finished. - ret.conn.Lock() - err := ret.conn.lastErr - ret.conn.Unlock() - if err == driver.ErrBadConn { + + // Reset the session if required. + if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn { ret.conn.Close() return nil, driver.ErrBadConn } @@ -1235,10 +1301,11 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn } db.mu.Lock() dc := &driverConn{ - db: db, - createdAt: nowFunc(), - ci: ci, - inUse: true, + db: db, + createdAt: nowFunc(), + returnedAt: nowFunc(), + ci: ci, + inUse: true, } db.addDepLocked(dc, dc) db.mu.Unlock() @@ -1275,17 +1342,29 @@ const debugGetPut = false // putConn adds a connection to the db's free pool. // err is optionally the last error that occurred on this connection. func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { + if err != driver.ErrBadConn { + if !dc.validateConnection(resetSession) { + err = driver.ErrBadConn + } + } db.mu.Lock() if !dc.inUse { + db.mu.Unlock() if debugGetPut { fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc]) } panic("sql: connection returned that was never out") } + + if err != driver.ErrBadConn && dc.expired(db.maxLifetime) { + db.maxLifetimeClosed++ + err = driver.ErrBadConn + } if debugGetPut { db.lastPut[dc] = stack() } dc.inUse = false + dc.returnedAt = nowFunc() for _, fn := range dc.onPut { fn() @@ -1305,41 +1384,13 @@ func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { if putConnHook != nil { putConnHook(db, dc) } - if db.closed { - // Connections do not need to be reset if they will be closed. - // Prevents writing to resetterCh after the DB has closed. - resetSession = false - } - if resetSession { - if _, resetSession = dc.ci.(driver.SessionResetter); resetSession { - // Lock the driverConn here so it isn't released until - // the connection is reset. - // The lock must be taken before the connection is put into - // the pool to prevent it from being taken out before it is reset. - dc.Lock() - } - } added := db.putConnDBLocked(dc, nil) db.mu.Unlock() if !added { - if resetSession { - dc.Unlock() - } dc.Close() return } - if !resetSession { - return - } - select { - default: - // If the resetterCh is blocking then mark the connection - // as bad and continue on. - dc.lastErr = driver.ErrBadConn - dc.Unlock() - case db.resetterCh <- dc: - } } // Satisfy a connRequest or put the driverConn in the idle pool and return true @@ -1701,7 +1752,11 @@ func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStra // beginDC starts a transaction. The provided dc must be valid and ready to use. func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) { var txi driver.Tx + keepConnOnRollback := false withLock(dc, func() { + _, hasSessionResetter := dc.ci.(driver.SessionResetter) + _, hasConnectionValidator := dc.ci.(driver.Validator) + keepConnOnRollback = hasSessionResetter && hasConnectionValidator txi, err = ctxDriverBegin(ctx, opts, dc.ci) }) if err != nil { @@ -1713,12 +1768,13 @@ func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), // The cancel function in Tx will be called after done is set to true. ctx, cancel := context.WithCancel(ctx) tx = &Tx{ - db: db, - dc: dc, - releaseConn: release, - txi: txi, - cancel: cancel, - ctx: ctx, + db: db, + dc: dc, + releaseConn: release, + txi: txi, + cancel: cancel, + keepConnOnRollback: keepConnOnRollback, + ctx: ctx, } go tx.awaitDone() return tx, nil @@ -1980,6 +2036,11 @@ type Tx struct { // Use atomic operations on value when checking value. done int32 + // keepConnOnRollback is true if the driver knows + // how to reset the connection's session and if need be discard + // the connection. + keepConnOnRollback bool + // All Stmts prepared for this transaction. These will be closed after the // transaction has been committed or rolled back. stmts struct { @@ -2005,7 +2066,10 @@ func (tx *Tx) awaitDone() { // transaction is closed and the resources are released. This // rollback does nothing if the transaction has already been // committed or rolled back. - tx.rollback(true) + // Do not discard the connection if the connection knows + // how to reset the session. + discardConnection := !tx.keepConnOnRollback + tx.rollback(discardConnection) } func (tx *Tx) isDone() bool { @@ -2016,14 +2080,10 @@ func (tx *Tx) isDone() bool { // that has already been committed or rolled back. var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back") -// close returns the connection to the pool and -// must only be called by Tx.rollback or Tx.Commit. -func (tx *Tx) close(err error) { - tx.cancel() - - tx.closemu.Lock() - defer tx.closemu.Unlock() - +// closeLocked returns the connection to the pool and +// must only be called by Tx.rollback or Tx.Commit while +// closemu is Locked and tx already canceled. +func (tx *Tx) closeLocked(err error) { tx.releaseConn(err) tx.dc = nil tx.txi = nil @@ -2090,6 +2150,15 @@ func (tx *Tx) Commit() error { if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) { return ErrTxDone } + + // Cancel the Tx to release any active R-closemu locks. + // This is safe to do because tx.done has already transitioned + // from 0 to 1. Hold the W-closemu lock prior to rollback + // to ensure no other connection has an active query. + tx.cancel() + tx.closemu.Lock() + defer tx.closemu.Unlock() + var err error withLock(tx.dc, func() { err = tx.txi.Commit() @@ -2097,16 +2166,31 @@ func (tx *Tx) Commit() error { if err != driver.ErrBadConn { tx.closePrepared() } - tx.close(err) + tx.closeLocked(err) return err } +var rollbackHook func() + // rollback aborts the transaction and optionally forces the pool to discard // the connection. func (tx *Tx) rollback(discardConn bool) error { if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) { return ErrTxDone } + + if rollbackHook != nil { + rollbackHook() + } + + // Cancel the Tx to release any active R-closemu locks. + // This is safe to do because tx.done has already transitioned + // from 0 to 1. Hold the W-closemu lock prior to rollback + // to ensure no other connection has an active query. + tx.cancel() + tx.closemu.Lock() + defer tx.closemu.Unlock() + var err error withLock(tx.dc, func() { err = tx.txi.Rollback() @@ -2117,7 +2201,7 @@ func (tx *Tx) rollback(discardConn bool) error { if discardConn { err = driver.ErrBadConn } - tx.close(err) + tx.closeLocked(err) return err } @@ -2709,10 +2793,17 @@ func (rs *Rows) lasterrOrErrLocked(err error) error { return err } +// bypassRowsAwaitDone is only used for testing. +// If true, it will not close the Rows automatically from the context. +var bypassRowsAwaitDone = false + func (rs *Rows) initContextClose(ctx, txctx context.Context) { if ctx.Done() == nil && (txctx == nil || txctx.Done() == nil) { return } + if bypassRowsAwaitDone { + return + } ctx, rs.cancel = context.WithCancel(ctx) go rs.awaitDone(ctx, txctx) } @@ -2922,10 +3013,11 @@ func (ci *ColumnType) Nullable() (nullable, ok bool) { } // DatabaseTypeName returns the database system name of the column type. If an empty -// string is returned the driver type name is not supported. +// string is returned, then the driver type name is not supported. // Consult your driver documentation for a list of driver data types. Length specifiers // are not included. -// Common type include "VARCHAR", "TEXT", "NVARCHAR", "DECIMAL", "BOOL", "INT", "BIGINT". +// Common type names include "VARCHAR", "TEXT", "NVARCHAR", "DECIMAL", "BOOL", +// "INT", and "BIGINT". func (ci *ColumnType) DatabaseTypeName() string { return ci.databaseType } @@ -3140,6 +3232,14 @@ func (r *Row) Scan(dest ...interface{}) error { return r.rows.Close() } +// Err provides a way for wrapping packages to check for +// query errors without calling Scan. +// Err returns the error, if any, that was encountered while running the query. +// If this error is not nil, this error will also be returned from Scan. +func (r *Row) Err() error { + return r.err +} + // A Result summarizes an executed SQL command. type Result interface { // LastInsertId returns the integer generated by the database |