diff options
Diffstat (limited to 'libgo/go/database')
-rw-r--r-- | libgo/go/database/sql/driver/driver.go | 61 | ||||
-rw-r--r-- | libgo/go/database/sql/fakedb_test.go | 45 | ||||
-rw-r--r-- | libgo/go/database/sql/sql.go | 350 | ||||
-rw-r--r-- | libgo/go/database/sql/sql_test.go | 296 |
4 files changed, 617 insertions, 135 deletions
diff --git a/libgo/go/database/sql/driver/driver.go b/libgo/go/database/sql/driver/driver.go index 316e7ce..5bbcf20 100644 --- a/libgo/go/database/sql/driver/driver.go +++ b/libgo/go/database/sql/driver/driver.go @@ -6,6 +6,35 @@ // drivers as used by package sql. // // Most code should use package sql. +// +// The driver interface has evolved over time. Drivers should implement +// Connector and DriverContext interfaces. +// The Connector.Connect and Driver.Open methods should never return ErrBadConn. +// ErrBadConn should only be returned from Validator, SessionResetter, or +// a query method if the connection is already in an invalid (e.g. closed) state. +// +// All Conn implementations should implement the following interfaces: +// Pinger, SessionResetter, and Validator. +// +// If named parameters or context are supported, the driver's Conn should implement: +// ExecerContext, QueryerContext, ConnPrepareContext, and ConnBeginTx. +// +// To support custom data types, implement NamedValueChecker. NamedValueChecker +// also allows queries to accept per-query options as a parameter by returning +// ErrRemoveArgument from CheckNamedValue. +// +// If multiple result sets are supported, Rows should implement RowsNextResultSet. +// If the driver knows how to describe the types present in the returned result +// it should implement the following interfaces: RowsColumnTypeScanType, +// RowsColumnTypeDatabaseTypeName, RowsColumnTypeLength, RowsColumnTypeNullable, +// and RowsColumnTypePrecisionScale. A given row value may also return a Rows +// type, which may represent a database cursor value. +// +// Before a connection is returned to the connection pool after use, IsValid is +// called if implemented. Before a connection is reused for another query, +// ResetSession is called if implemented. If a connection is never returned to the +// connection pool but immediately reused, then ResetSession is called prior to +// reuse but IsValid is not called. package driver import ( @@ -67,7 +96,7 @@ type Driver interface { // If a Driver implements DriverContext, then sql.DB will call // OpenConnector to obtain a Connector and then invoke -// that Connector's Conn method to obtain each needed connection, +// that Connector's Connect method to obtain each needed connection, // instead of invoking the Driver's Open method for each connection. // The two-step sequence allows drivers to parse the name just once // and also provides access to per-Conn contexts. @@ -94,7 +123,9 @@ type Connector interface { // // The provided context.Context is for dialing purposes only // (see net.DialContext) and should not be stored or used for - // other purposes. + // other purposes. A default timeout should still be used + // when dialing as a connection pool may call Connect + // asynchronously to any query. // // The returned connection is only used by one goroutine at a // time. @@ -205,6 +236,9 @@ type Conn interface { // connections and only calls Close when there's a surplus of // idle connections, it shouldn't be necessary for drivers to // do their own connection caching. + // + // Drivers must ensure all network calls made by Close + // do not block indefinitely (e.g. apply a timeout). Close() error // Begin starts and returns a new transaction. @@ -255,15 +289,23 @@ type ConnBeginTx interface { // SessionResetter may be implemented by Conn to allow drivers to reset the // session state associated with the connection and to signal a bad connection. type SessionResetter interface { - // ResetSession is called while a connection is in the connection - // pool. No queries will run on this connection until this method returns. - // - // If the connection is bad this should return driver.ErrBadConn to prevent - // the connection from being returned to the connection pool. Any other - // error will be discarded. + // ResetSession is called prior to executing a query on the connection + // if the connection has been used before. If the driver returns ErrBadConn + // the connection is discarded. ResetSession(ctx context.Context) error } +// Validator may be implemented by Conn to allow drivers to +// signal if a connection is valid or if it should be discarded. +// +// If implemented, drivers may return the underlying error from queries, +// even if the connection should be discarded by the connection pool. +type Validator interface { + // IsValid is called prior to placing the connection into the + // connection pool. The connection will be discarded if false is returned. + IsValid() bool +} + // Result is the result of a query execution. type Result interface { // LastInsertId returns the database's auto-generated ID @@ -283,6 +325,9 @@ type Stmt interface { // // As of Go 1.1, a Stmt will not be closed if it's in use // by any queries. + // + // Drivers must ensure all network calls made by Close + // do not block indefinitely (e.g. apply a timeout). Close() error // NumInput returns the number of placeholder parameters. diff --git a/libgo/go/database/sql/fakedb_test.go b/libgo/go/database/sql/fakedb_test.go index a0028be..7605a2a 100644 --- a/libgo/go/database/sql/fakedb_test.go +++ b/libgo/go/database/sql/fakedb_test.go @@ -390,12 +390,19 @@ func setStrictFakeConnClose(t *testing.T) { func (c *fakeConn) ResetSession(ctx context.Context) error { c.dirtySession = false + c.currTx = nil if c.isBad() { return driver.ErrBadConn } return nil } +var _ driver.Validator = (*fakeConn)(nil) + +func (c *fakeConn) IsValid() bool { + return !c.isBad() +} + func (c *fakeConn) Close() (err error) { drv := fdriver.(*fakeDriver) defer func() { @@ -728,6 +735,9 @@ var hookExecBadConn func() bool func (s *fakeStmt) Exec(args []driver.Value) (driver.Result, error) { panic("Using ExecContext") } + +var errFakeConnSessionDirty = errors.New("fakedb: session is dirty") + func (s *fakeStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) { if s.panic == "Exec" { panic(s.panic) @@ -740,7 +750,7 @@ func (s *fakeStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (d return nil, driver.ErrBadConn } if s.c.isDirtyAndMark() { - return nil, errors.New("fakedb: session is dirty") + return nil, errFakeConnSessionDirty } err := checkSubsetTypes(s.c.db.allowAny, args) @@ -854,7 +864,7 @@ func (s *fakeStmt) QueryContext(ctx context.Context, args []driver.NamedValue) ( return nil, driver.ErrBadConn } if s.c.isDirtyAndMark() { - return nil, errors.New("fakedb: session is dirty") + return nil, errFakeConnSessionDirty } err := checkSubsetTypes(s.c.db.allowAny, args) @@ -887,6 +897,37 @@ func (s *fakeStmt) QueryContext(ctx context.Context, args []driver.NamedValue) ( } } } + if s.table == "tx_status" && s.colName[0] == "tx_status" { + txStatus := "autocommit" + if s.c.currTx != nil { + txStatus = "transaction" + } + cursor := &rowsCursor{ + parentMem: s.c, + posRow: -1, + rows: [][]*row{ + []*row{ + { + cols: []interface{}{ + txStatus, + }, + }, + }, + }, + cols: [][]string{ + []string{ + "tx_status", + }, + }, + colType: [][]string{ + []string{ + "string", + }, + }, + errPos: -1, + } + return cursor, nil + } t.mu.Lock() diff --git a/libgo/go/database/sql/sql.go b/libgo/go/database/sql/sql.go index 0f5bbc0..b3d0653 100644 --- a/libgo/go/database/sql/sql.go +++ b/libgo/go/database/sql/sql.go @@ -421,17 +421,18 @@ type DB struct { // It is closed during db.Close(). The close tells the connectionOpener // goroutine to exit. openerCh chan struct{} - resetterCh chan *driverConn closed bool dep map[finalCloser]depSet lastPut map[*driverConn]string // stacktrace of last conn's put; debug only - maxIdle int // zero means defaultMaxIdleConns; negative means 0 + maxIdleCount int // zero means defaultMaxIdleConns; negative means 0 maxOpen int // <= 0 means unlimited maxLifetime time.Duration // maximum amount of time a connection may be reused + maxIdleTime time.Duration // maximum amount of time a connection may be idle before being closed cleanerCh chan struct{} waitCount int64 // Total number of connections waited for. - maxIdleClosed int64 // Total number of connections closed due to idle. - maxLifetimeClosed int64 // Total number of connections closed due to max free limit. + maxIdleClosed int64 // Total number of connections closed due to idle count. + maxIdleTimeClosed int64 // Total number of connections closed due to idle time. + maxLifetimeClosed int64 // Total number of connections closed due to max connection lifetime limit. stop func() // stop cancels the connection opener and the session resetter. } @@ -458,15 +459,16 @@ type driverConn struct { sync.Mutex // guards following ci driver.Conn + needReset bool // The connection session should be reset before use if true. closed bool finalClosed bool // ci.Close has been called openStmt map[*driverStmt]bool - lastErr error // lastError captures the result of the session resetter. // guarded by db.mu inUse bool - onPut []func() // code (with db.mu held) run when conn is next returned - dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked + returnedAt time.Time // Time the connection was created or returned. + onPut []func() // code (with db.mu held) run when conn is next returned + dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked } func (dc *driverConn) releaseConn(err error) { @@ -486,6 +488,36 @@ func (dc *driverConn) expired(timeout time.Duration) bool { return dc.createdAt.Add(timeout).Before(nowFunc()) } +// resetSession checks if the driver connection needs the +// session to be reset and if required, resets it. +func (dc *driverConn) resetSession(ctx context.Context) error { + dc.Lock() + defer dc.Unlock() + + if !dc.needReset { + return nil + } + if cr, ok := dc.ci.(driver.SessionResetter); ok { + return cr.ResetSession(ctx) + } + return nil +} + +// validateConnection checks if the connection is valid and can +// still be used. It also marks the session for reset if required. +func (dc *driverConn) validateConnection(needsReset bool) bool { + dc.Lock() + defer dc.Unlock() + + if needsReset { + dc.needReset = true + } + if cv, ok := dc.ci.(driver.Validator); ok { + return cv.IsValid() + } + return true +} + // prepareLocked prepares the query on dc. When cg == nil the dc must keep track of // the prepared statements in a pool. func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) { @@ -511,19 +543,6 @@ func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, que return ds, nil } -// resetSession resets the connection session and sets the lastErr -// that is checked before returning the connection to another query. -// -// resetSession assumes that the embedded mutex is locked when the connection -// was returned to the pool. This unlocks the mutex. -func (dc *driverConn) resetSession(ctx context.Context) { - defer dc.Unlock() // In case of panic. - if dc.closed { // Check if the database has been closed. - return - } - dc.lastErr = dc.ci.(driver.SessionResetter).ResetSession(ctx) -} - // the dc.db's Mutex is held. func (dc *driverConn) closeDBLocked() func() error { dc.Lock() @@ -713,14 +732,12 @@ func OpenDB(c driver.Connector) *DB { db := &DB{ connector: c, openerCh: make(chan struct{}, connectionRequestQueueSize), - resetterCh: make(chan *driverConn, 50), lastPut: make(map[*driverConn]string), connRequests: make(map[uint64]chan connRequest), stop: cancel, } go db.connectionOpener(ctx) - go db.connectionResetter(ctx) return db } @@ -839,7 +856,7 @@ func (db *DB) Close() error { const defaultMaxIdleConns = 2 func (db *DB) maxIdleConnsLocked() int { - n := db.maxIdle + n := db.maxIdleCount switch { case n == 0: // TODO(bradfitz): ask driver, if supported, for its default preference @@ -851,6 +868,14 @@ func (db *DB) maxIdleConnsLocked() int { } } +func (db *DB) shortestIdleTimeLocked() time.Duration { + min := db.maxIdleTime + if min > db.maxLifetime { + min = db.maxLifetime + } + return min +} + // SetMaxIdleConns sets the maximum number of connections in the idle // connection pool. // @@ -864,14 +889,14 @@ func (db *DB) maxIdleConnsLocked() int { func (db *DB) SetMaxIdleConns(n int) { db.mu.Lock() if n > 0 { - db.maxIdle = n + db.maxIdleCount = n } else { // No idle connections. - db.maxIdle = -1 + db.maxIdleCount = -1 } // Make sure maxIdle doesn't exceed maxOpen if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen { - db.maxIdle = db.maxOpen + db.maxIdleCount = db.maxOpen } var closing []*driverConn idleCount := len(db.freeConn) @@ -912,13 +937,13 @@ func (db *DB) SetMaxOpenConns(n int) { // // Expired connections may be closed lazily before reuse. // -// If d <= 0, connections are reused forever. +// If d <= 0, connections are not closed due to a connection's age. func (db *DB) SetConnMaxLifetime(d time.Duration) { if d < 0 { d = 0 } db.mu.Lock() - // wake cleaner up when lifetime is shortened. + // Wake cleaner up when lifetime is shortened. if d > 0 && d < db.maxLifetime && db.cleanerCh != nil { select { case db.cleanerCh <- struct{}{}: @@ -930,11 +955,34 @@ func (db *DB) SetConnMaxLifetime(d time.Duration) { db.mu.Unlock() } +// SetConnMaxIdleTime sets the maximum amount of time a connection may be idle. +// +// Expired connections may be closed lazily before reuse. +// +// If d <= 0, connections are not closed due to a connection's idle time. +func (db *DB) SetConnMaxIdleTime(d time.Duration) { + if d < 0 { + d = 0 + } + db.mu.Lock() + defer db.mu.Unlock() + + // Wake cleaner up when idle time is shortened. + if d > 0 && d < db.maxIdleTime && db.cleanerCh != nil { + select { + case db.cleanerCh <- struct{}{}: + default: + } + } + db.maxIdleTime = d + db.startCleanerLocked() +} + // startCleanerLocked starts connectionCleaner if needed. func (db *DB) startCleanerLocked() { - if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil { + if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil { db.cleanerCh = make(chan struct{}, 1) - go db.connectionCleaner(db.maxLifetime) + go db.connectionCleaner(db.shortestIdleTimeLocked()) } } @@ -953,15 +1001,30 @@ func (db *DB) connectionCleaner(d time.Duration) { } db.mu.Lock() - d = db.maxLifetime + + d = db.shortestIdleTimeLocked() if db.closed || db.numOpen == 0 || d <= 0 { db.cleanerCh = nil db.mu.Unlock() return } - expiredSince := nowFunc().Add(-d) - var closing []*driverConn + closing := db.connectionCleanerRunLocked() + db.mu.Unlock() + for _, c := range closing { + c.Close() + } + + if d < minInterval { + d = minInterval + } + t.Reset(d) + } +} + +func (db *DB) connectionCleanerRunLocked() (closing []*driverConn) { + if db.maxLifetime > 0 { + expiredSince := nowFunc().Add(-db.maxLifetime) for i := 0; i < len(db.freeConn); i++ { c := db.freeConn[i] if c.createdAt.Before(expiredSince) { @@ -974,17 +1037,26 @@ func (db *DB) connectionCleaner(d time.Duration) { } } db.maxLifetimeClosed += int64(len(closing)) - db.mu.Unlock() - - for _, c := range closing { - c.Close() - } + } - if d < minInterval { - d = minInterval + if db.maxIdleTime > 0 { + expiredSince := nowFunc().Add(-db.maxIdleTime) + var expiredCount int64 + for i := 0; i < len(db.freeConn); i++ { + c := db.freeConn[i] + if db.maxIdleTime > 0 && c.returnedAt.Before(expiredSince) { + closing = append(closing, c) + expiredCount++ + last := len(db.freeConn) - 1 + db.freeConn[i] = db.freeConn[last] + db.freeConn[last] = nil + db.freeConn = db.freeConn[:last] + i-- + } } - t.Reset(d) + db.maxIdleTimeClosed += expiredCount } + return } // DBStats contains database statistics. @@ -1000,6 +1072,7 @@ type DBStats struct { WaitCount int64 // The total number of connections waited for. WaitDuration time.Duration // The total time blocked waiting for a new connection. MaxIdleClosed int64 // The total number of connections closed due to SetMaxIdleConns. + MaxIdleTimeClosed int64 // The total number of connections closed due to SetConnMaxIdleTime. MaxLifetimeClosed int64 // The total number of connections closed due to SetConnMaxLifetime. } @@ -1020,6 +1093,7 @@ func (db *DB) Stats() DBStats { WaitCount: db.waitCount, WaitDuration: time.Duration(wait), MaxIdleClosed: db.maxIdleClosed, + MaxIdleTimeClosed: db.maxIdleTimeClosed, MaxLifetimeClosed: db.maxLifetimeClosed, } return stats @@ -1058,23 +1132,6 @@ func (db *DB) connectionOpener(ctx context.Context) { } } -// connectionResetter runs in a separate goroutine to reset connections async -// to exported API. -func (db *DB) connectionResetter(ctx context.Context) { - for { - select { - case <-ctx.Done(): - close(db.resetterCh) - for dc := range db.resetterCh { - dc.Unlock() - } - return - case dc := <-db.resetterCh: - dc.resetSession(ctx) - } - } -} - // Open one new connection func (db *DB) openNewConnection(ctx context.Context) { // maybeOpenNewConnctions has already executed db.numOpen++ before it sent @@ -1097,9 +1154,10 @@ func (db *DB) openNewConnection(ctx context.Context) { return } dc := &driverConn{ - db: db, - createdAt: nowFunc(), - ci: ci, + db: db, + createdAt: nowFunc(), + returnedAt: nowFunc(), + ci: ci, } if db.putConnDBLocked(dc, err) { db.addDepLocked(dc, dc) @@ -1150,19 +1208,20 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] conn.inUse = true - db.mu.Unlock() if conn.expired(lifetime) { + db.maxLifetimeClosed++ + db.mu.Unlock() conn.Close() return nil, driver.ErrBadConn } - // Lock around reading lastErr to ensure the session resetter finished. - conn.Lock() - err := conn.lastErr - conn.Unlock() - if err == driver.ErrBadConn { + db.mu.Unlock() + + // Reset the session if required. + if err := conn.resetSession(ctx); err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } + return conn, nil } @@ -1177,7 +1236,7 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn db.waitCount++ db.mu.Unlock() - waitStart := time.Now() + waitStart := nowFunc() // Timeout the connection request with the context. select { @@ -1204,18 +1263,25 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn if !ok { return nil, errDBClosed } - if ret.err == nil && ret.conn.expired(lifetime) { + // Only check if the connection is expired if the strategy is cachedOrNewConns. + // If we require a new connection, just re-use the connection without looking + // at the expiry time. If it is expired, it will be checked when it is placed + // back into the connection pool. + // This prioritizes giving a valid connection to a client over the exact connection + // lifetime, which could expire exactly after this point anyway. + if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) { + db.mu.Lock() + db.maxLifetimeClosed++ + db.mu.Unlock() ret.conn.Close() return nil, driver.ErrBadConn } if ret.conn == nil { return nil, ret.err } - // Lock around reading lastErr to ensure the session resetter finished. - ret.conn.Lock() - err := ret.conn.lastErr - ret.conn.Unlock() - if err == driver.ErrBadConn { + + // Reset the session if required. + if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn { ret.conn.Close() return nil, driver.ErrBadConn } @@ -1235,10 +1301,11 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn } db.mu.Lock() dc := &driverConn{ - db: db, - createdAt: nowFunc(), - ci: ci, - inUse: true, + db: db, + createdAt: nowFunc(), + returnedAt: nowFunc(), + ci: ci, + inUse: true, } db.addDepLocked(dc, dc) db.mu.Unlock() @@ -1275,17 +1342,29 @@ const debugGetPut = false // putConn adds a connection to the db's free pool. // err is optionally the last error that occurred on this connection. func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { + if err != driver.ErrBadConn { + if !dc.validateConnection(resetSession) { + err = driver.ErrBadConn + } + } db.mu.Lock() if !dc.inUse { + db.mu.Unlock() if debugGetPut { fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc]) } panic("sql: connection returned that was never out") } + + if err != driver.ErrBadConn && dc.expired(db.maxLifetime) { + db.maxLifetimeClosed++ + err = driver.ErrBadConn + } if debugGetPut { db.lastPut[dc] = stack() } dc.inUse = false + dc.returnedAt = nowFunc() for _, fn := range dc.onPut { fn() @@ -1305,41 +1384,13 @@ func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { if putConnHook != nil { putConnHook(db, dc) } - if db.closed { - // Connections do not need to be reset if they will be closed. - // Prevents writing to resetterCh after the DB has closed. - resetSession = false - } - if resetSession { - if _, resetSession = dc.ci.(driver.SessionResetter); resetSession { - // Lock the driverConn here so it isn't released until - // the connection is reset. - // The lock must be taken before the connection is put into - // the pool to prevent it from being taken out before it is reset. - dc.Lock() - } - } added := db.putConnDBLocked(dc, nil) db.mu.Unlock() if !added { - if resetSession { - dc.Unlock() - } dc.Close() return } - if !resetSession { - return - } - select { - default: - // If the resetterCh is blocking then mark the connection - // as bad and continue on. - dc.lastErr = driver.ErrBadConn - dc.Unlock() - case db.resetterCh <- dc: - } } // Satisfy a connRequest or put the driverConn in the idle pool and return true @@ -1701,7 +1752,11 @@ func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStra // beginDC starts a transaction. The provided dc must be valid and ready to use. func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) { var txi driver.Tx + keepConnOnRollback := false withLock(dc, func() { + _, hasSessionResetter := dc.ci.(driver.SessionResetter) + _, hasConnectionValidator := dc.ci.(driver.Validator) + keepConnOnRollback = hasSessionResetter && hasConnectionValidator txi, err = ctxDriverBegin(ctx, opts, dc.ci) }) if err != nil { @@ -1713,12 +1768,13 @@ func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), // The cancel function in Tx will be called after done is set to true. ctx, cancel := context.WithCancel(ctx) tx = &Tx{ - db: db, - dc: dc, - releaseConn: release, - txi: txi, - cancel: cancel, - ctx: ctx, + db: db, + dc: dc, + releaseConn: release, + txi: txi, + cancel: cancel, + keepConnOnRollback: keepConnOnRollback, + ctx: ctx, } go tx.awaitDone() return tx, nil @@ -1980,6 +2036,11 @@ type Tx struct { // Use atomic operations on value when checking value. done int32 + // keepConnOnRollback is true if the driver knows + // how to reset the connection's session and if need be discard + // the connection. + keepConnOnRollback bool + // All Stmts prepared for this transaction. These will be closed after the // transaction has been committed or rolled back. stmts struct { @@ -2005,7 +2066,10 @@ func (tx *Tx) awaitDone() { // transaction is closed and the resources are released. This // rollback does nothing if the transaction has already been // committed or rolled back. - tx.rollback(true) + // Do not discard the connection if the connection knows + // how to reset the session. + discardConnection := !tx.keepConnOnRollback + tx.rollback(discardConnection) } func (tx *Tx) isDone() bool { @@ -2016,14 +2080,10 @@ func (tx *Tx) isDone() bool { // that has already been committed or rolled back. var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back") -// close returns the connection to the pool and -// must only be called by Tx.rollback or Tx.Commit. -func (tx *Tx) close(err error) { - tx.cancel() - - tx.closemu.Lock() - defer tx.closemu.Unlock() - +// closeLocked returns the connection to the pool and +// must only be called by Tx.rollback or Tx.Commit while +// closemu is Locked and tx already canceled. +func (tx *Tx) closeLocked(err error) { tx.releaseConn(err) tx.dc = nil tx.txi = nil @@ -2090,6 +2150,15 @@ func (tx *Tx) Commit() error { if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) { return ErrTxDone } + + // Cancel the Tx to release any active R-closemu locks. + // This is safe to do because tx.done has already transitioned + // from 0 to 1. Hold the W-closemu lock prior to rollback + // to ensure no other connection has an active query. + tx.cancel() + tx.closemu.Lock() + defer tx.closemu.Unlock() + var err error withLock(tx.dc, func() { err = tx.txi.Commit() @@ -2097,16 +2166,31 @@ func (tx *Tx) Commit() error { if err != driver.ErrBadConn { tx.closePrepared() } - tx.close(err) + tx.closeLocked(err) return err } +var rollbackHook func() + // rollback aborts the transaction and optionally forces the pool to discard // the connection. func (tx *Tx) rollback(discardConn bool) error { if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) { return ErrTxDone } + + if rollbackHook != nil { + rollbackHook() + } + + // Cancel the Tx to release any active R-closemu locks. + // This is safe to do because tx.done has already transitioned + // from 0 to 1. Hold the W-closemu lock prior to rollback + // to ensure no other connection has an active query. + tx.cancel() + tx.closemu.Lock() + defer tx.closemu.Unlock() + var err error withLock(tx.dc, func() { err = tx.txi.Rollback() @@ -2117,7 +2201,7 @@ func (tx *Tx) rollback(discardConn bool) error { if discardConn { err = driver.ErrBadConn } - tx.close(err) + tx.closeLocked(err) return err } @@ -2709,10 +2793,17 @@ func (rs *Rows) lasterrOrErrLocked(err error) error { return err } +// bypassRowsAwaitDone is only used for testing. +// If true, it will not close the Rows automatically from the context. +var bypassRowsAwaitDone = false + func (rs *Rows) initContextClose(ctx, txctx context.Context) { if ctx.Done() == nil && (txctx == nil || txctx.Done() == nil) { return } + if bypassRowsAwaitDone { + return + } ctx, rs.cancel = context.WithCancel(ctx) go rs.awaitDone(ctx, txctx) } @@ -2922,10 +3013,11 @@ func (ci *ColumnType) Nullable() (nullable, ok bool) { } // DatabaseTypeName returns the database system name of the column type. If an empty -// string is returned the driver type name is not supported. +// string is returned, then the driver type name is not supported. // Consult your driver documentation for a list of driver data types. Length specifiers // are not included. -// Common type include "VARCHAR", "TEXT", "NVARCHAR", "DECIMAL", "BOOL", "INT", "BIGINT". +// Common type names include "VARCHAR", "TEXT", "NVARCHAR", "DECIMAL", "BOOL", +// "INT", and "BIGINT". func (ci *ColumnType) DatabaseTypeName() string { return ci.databaseType } @@ -3140,6 +3232,14 @@ func (r *Row) Scan(dest ...interface{}) error { return r.rows.Close() } +// Err provides a way for wrapping packages to check for +// query errors without calling Scan. +// Err returns the error, if any, that was encountered while running the query. +// If this error is not nil, this error will also be returned from Scan. +func (r *Row) Err() error { + return r.err +} + // A Result summarizes an executed SQL command. type Result interface { // LastInsertId returns the integer generated by the database diff --git a/libgo/go/database/sql/sql_test.go b/libgo/go/database/sql/sql_test.go index 6f59260..5727f0d 100644 --- a/libgo/go/database/sql/sql_test.go +++ b/libgo/go/database/sql/sql_test.go @@ -80,6 +80,11 @@ func newTestDBConnector(t testing.TB, fc *fakeConnector, name string) *DB { exec(t, db, "CREATE|magicquery|op=string,millis=int32") exec(t, db, "INSERT|magicquery|op=sleep,millis=10") } + if name == "tx_status" { + // Magic table name and column, known by fakedb_test.go. + exec(t, db, "CREATE|tx_status|tx_status=string") + exec(t, db, "INSERT|tx_status|tx_status=invalid") + } return db } @@ -437,6 +442,7 @@ func TestTxContextWait(t *testing.T) { } t.Fatal(err) } + tx.keepConnOnRollback = false // This will trigger the *fakeConn.Prepare method which will take time // performing the query. The ctxDriverPrepare func will check the context @@ -449,6 +455,35 @@ func TestTxContextWait(t *testing.T) { waitForFree(t, db, 5*time.Second, 0) } +// TestTxContextWaitNoDiscard is the same as TestTxContextWait, but should not discard +// the final connection. +func TestTxContextWaitNoDiscard(t *testing.T) { + db := newTestDB(t, "people") + defer closeDB(t, db) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond) + defer cancel() + + tx, err := db.BeginTx(ctx, nil) + if err != nil { + // Guard against the context being canceled before BeginTx completes. + if err == context.DeadlineExceeded { + t.Skip("tx context canceled prior to first use") + } + t.Fatal(err) + } + + // This will trigger the *fakeConn.Prepare method which will take time + // performing the query. The ctxDriverPrepare func will check the context + // after this and close the rows and return an error. + _, err = tx.QueryContext(ctx, "WAIT|1s|SELECT|people|age,name|") + if err != context.DeadlineExceeded { + t.Fatalf("expected QueryContext to error with context deadline exceeded but returned %v", err) + } + + waitForFree(t, db, 5*time.Second, 1) +} + // TestUnsupportedOptions checks that the database fails when a driver that // doesn't implement ConnBeginTx is used with non-default options and an // un-cancellable context. @@ -788,6 +823,24 @@ func TestQueryRow(t *testing.T) { } } +func TestRowErr(t *testing.T) { + db := newTestDB(t, "people") + + err := db.QueryRowContext(context.Background(), "SELECT|people|bdate|age=?", 3).Err() + if err != nil { + t.Errorf("Unexpected err = %v; want %v", err, nil) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = db.QueryRowContext(ctx, "SELECT|people|bdate|age=?", 3).Err() + exp := "context canceled" + if err == nil || !strings.Contains(err.Error(), exp) { + t.Errorf("Expected err = %v; got %v", exp, err) + } +} + func TestTxRollbackCommitErr(t *testing.T) { db := newTestDB(t, "people") defer closeDB(t, db) @@ -1525,6 +1578,37 @@ func TestConnTx(t *testing.T) { } } +// TestConnIsValid verifies that a database connection that should be discarded, +// is actually discarded and does not re-enter the connection pool. +// If the IsValid method from *fakeConn is removed, this test will fail. +func TestConnIsValid(t *testing.T) { + db := newTestDB(t, "people") + defer closeDB(t, db) + + db.SetMaxOpenConns(1) + + ctx := context.Background() + + c, err := db.Conn(ctx) + if err != nil { + t.Fatal(err) + } + + err = c.Raw(func(raw interface{}) error { + dc := raw.(*fakeConn) + dc.stickyBad = true + return nil + }) + if err != nil { + t.Fatal(err) + } + c.Close() + + if len(db.freeConn) > 0 && db.freeConn[0].ci.(*fakeConn).stickyBad { + t.Fatal("bad connection returned to pool; expected bad connection to be discarded") + } +} + // Tests fix for issue 2542, that we release a lock when querying on // a closed connection. func TestIssue2542Deadlock(t *testing.T) { @@ -2658,6 +2742,163 @@ func TestManyErrBadConn(t *testing.T) { } } +// Issue 34775: Ensure that a Tx cannot commit after a rollback. +func TestTxCannotCommitAfterRollback(t *testing.T) { + db := newTestDB(t, "tx_status") + defer closeDB(t, db) + + // First check query reporting is correct. + var txStatus string + err := db.QueryRow("SELECT|tx_status|tx_status|").Scan(&txStatus) + if err != nil { + t.Fatal(err) + } + if g, w := txStatus, "autocommit"; g != w { + t.Fatalf("tx_status=%q, wanted %q", g, w) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tx, err := db.BeginTx(ctx, nil) + if err != nil { + t.Fatal(err) + } + + // Ignore dirty session for this test. + // A failing test should trigger the dirty session flag as well, + // but that isn't exactly what this should test for. + tx.txi.(*fakeTx).c.skipDirtySession = true + + defer tx.Rollback() + + err = tx.QueryRow("SELECT|tx_status|tx_status|").Scan(&txStatus) + if err != nil { + t.Fatal(err) + } + if g, w := txStatus, "transaction"; g != w { + t.Fatalf("tx_status=%q, wanted %q", g, w) + } + + // 1. Begin a transaction. + // 2. (A) Start a query, (B) begin Tx rollback through a ctx cancel. + // 3. Check if 2.A has committed in Tx (pass) or outside of Tx (fail). + sendQuery := make(chan struct{}) + // The Tx status is returned through the row results, ensure + // that the rows results are not cancelled. + bypassRowsAwaitDone = true + hookTxGrabConn = func() { + cancel() + <-sendQuery + } + rollbackHook = func() { + close(sendQuery) + } + defer func() { + hookTxGrabConn = nil + rollbackHook = nil + bypassRowsAwaitDone = false + }() + + err = tx.QueryRow("SELECT|tx_status|tx_status|").Scan(&txStatus) + if err != nil { + // A failure here would be expected if skipDirtySession was not set to true above. + t.Fatal(err) + } + if g, w := txStatus, "transaction"; g != w { + t.Fatalf("tx_status=%q, wanted %q", g, w) + } +} + +// Issue32530 encounters an issue where a connection may +// expire right after it comes out of a used connection pool +// even when a new connection is requested. +func TestConnExpiresFreshOutOfPool(t *testing.T) { + execCases := []struct { + expired bool + badReset bool + }{ + {false, false}, + {true, false}, + {false, true}, + } + + t0 := time.Unix(1000000, 0) + offset := time.Duration(0) + offsetMu := sync.RWMutex{} + + nowFunc = func() time.Time { + offsetMu.RLock() + defer offsetMu.RUnlock() + return t0.Add(offset) + } + defer func() { nowFunc = time.Now }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db := newTestDB(t, "magicquery") + defer closeDB(t, db) + + db.SetMaxOpenConns(1) + + for _, ec := range execCases { + ec := ec + name := fmt.Sprintf("expired=%t,badReset=%t", ec.expired, ec.badReset) + t.Run(name, func(t *testing.T) { + db.clearAllConns(t) + + db.SetMaxIdleConns(1) + db.SetConnMaxLifetime(10 * time.Second) + + conn, err := db.conn(ctx, alwaysNewConn) + if err != nil { + t.Fatal(err) + } + + afterPutConn := make(chan struct{}) + waitingForConn := make(chan struct{}) + + go func() { + conn, err := db.conn(ctx, alwaysNewConn) + if err != nil { + t.Fatal(err) + } + db.putConn(conn, err, false) + close(afterPutConn) + }() + go func() { + for { + db.mu.Lock() + ct := len(db.connRequests) + db.mu.Unlock() + if ct > 0 { + close(waitingForConn) + return + } + time.Sleep(10 * time.Millisecond) + } + }() + + <-waitingForConn + + offsetMu.Lock() + if ec.expired { + offset = 11 * time.Second + } else { + offset = time.Duration(0) + } + offsetMu.Unlock() + + conn.ci.(*fakeConn).stickyBad = ec.badReset + + db.putConn(conn, err, true) + + <-afterPutConn + }) + } +} + // TestIssue20575 ensures the Rows from query does not block // closing a transaction. Ensure Rows is closed while closing a trasaction. func TestIssue20575(t *testing.T) { @@ -3593,6 +3834,61 @@ func TestStatsMaxIdleClosedTen(t *testing.T) { } } +func TestMaxIdleTime(t *testing.T) { + list := []struct { + wantMaxIdleTime time.Duration + wantIdleClosed int64 + timeOffset time.Duration + }{ + {time.Nanosecond, 1, 10 * time.Millisecond}, + {time.Hour, 0, 10 * time.Millisecond}, + } + baseTime := time.Unix(0, 0) + defer func() { + nowFunc = time.Now + }() + for _, item := range list { + nowFunc = func() time.Time { + return baseTime + } + t.Run(fmt.Sprintf("%v", item.wantMaxIdleTime), func(t *testing.T) { + db := newTestDB(t, "people") + defer closeDB(t, db) + + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + db.SetConnMaxIdleTime(item.wantMaxIdleTime) + db.SetConnMaxLifetime(0) + + preMaxIdleClosed := db.Stats().MaxIdleTimeClosed + + if err := db.Ping(); err != nil { + t.Fatal(err) + } + + nowFunc = func() time.Time { + return baseTime.Add(item.timeOffset) + } + + db.mu.Lock() + closing := db.connectionCleanerRunLocked() + db.mu.Unlock() + for _, c := range closing { + c.Close() + } + if g, w := int64(len(closing)), item.wantIdleClosed; g != w { + t.Errorf("got: %d; want %d closed conns", g, w) + } + + st := db.Stats() + maxIdleClosed := st.MaxIdleTimeClosed - preMaxIdleClosed + if g, w := maxIdleClosed, item.wantIdleClosed; g != w { + t.Errorf(" got: %d; want %d max idle closed conns", g, w) + } + }) + } +} + type nvcDriver struct { fakeDriver skipNamedValueCheck bool |