aboutsummaryrefslogtreecommitdiff
path: root/libgo/go/database/sql
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/database/sql')
-rw-r--r--libgo/go/database/sql/driver/driver.go61
-rw-r--r--libgo/go/database/sql/fakedb_test.go45
-rw-r--r--libgo/go/database/sql/sql.go350
-rw-r--r--libgo/go/database/sql/sql_test.go296
4 files changed, 617 insertions, 135 deletions
diff --git a/libgo/go/database/sql/driver/driver.go b/libgo/go/database/sql/driver/driver.go
index 316e7ce..5bbcf20 100644
--- a/libgo/go/database/sql/driver/driver.go
+++ b/libgo/go/database/sql/driver/driver.go
@@ -6,6 +6,35 @@
// drivers as used by package sql.
//
// Most code should use package sql.
+//
+// The driver interface has evolved over time. Drivers should implement
+// Connector and DriverContext interfaces.
+// The Connector.Connect and Driver.Open methods should never return ErrBadConn.
+// ErrBadConn should only be returned from Validator, SessionResetter, or
+// a query method if the connection is already in an invalid (e.g. closed) state.
+//
+// All Conn implementations should implement the following interfaces:
+// Pinger, SessionResetter, and Validator.
+//
+// If named parameters or context are supported, the driver's Conn should implement:
+// ExecerContext, QueryerContext, ConnPrepareContext, and ConnBeginTx.
+//
+// To support custom data types, implement NamedValueChecker. NamedValueChecker
+// also allows queries to accept per-query options as a parameter by returning
+// ErrRemoveArgument from CheckNamedValue.
+//
+// If multiple result sets are supported, Rows should implement RowsNextResultSet.
+// If the driver knows how to describe the types present in the returned result
+// it should implement the following interfaces: RowsColumnTypeScanType,
+// RowsColumnTypeDatabaseTypeName, RowsColumnTypeLength, RowsColumnTypeNullable,
+// and RowsColumnTypePrecisionScale. A given row value may also return a Rows
+// type, which may represent a database cursor value.
+//
+// Before a connection is returned to the connection pool after use, IsValid is
+// called if implemented. Before a connection is reused for another query,
+// ResetSession is called if implemented. If a connection is never returned to the
+// connection pool but immediately reused, then ResetSession is called prior to
+// reuse but IsValid is not called.
package driver
import (
@@ -67,7 +96,7 @@ type Driver interface {
// If a Driver implements DriverContext, then sql.DB will call
// OpenConnector to obtain a Connector and then invoke
-// that Connector's Conn method to obtain each needed connection,
+// that Connector's Connect method to obtain each needed connection,
// instead of invoking the Driver's Open method for each connection.
// The two-step sequence allows drivers to parse the name just once
// and also provides access to per-Conn contexts.
@@ -94,7 +123,9 @@ type Connector interface {
//
// The provided context.Context is for dialing purposes only
// (see net.DialContext) and should not be stored or used for
- // other purposes.
+ // other purposes. A default timeout should still be used
+ // when dialing as a connection pool may call Connect
+ // asynchronously to any query.
//
// The returned connection is only used by one goroutine at a
// time.
@@ -205,6 +236,9 @@ type Conn interface {
// connections and only calls Close when there's a surplus of
// idle connections, it shouldn't be necessary for drivers to
// do their own connection caching.
+ //
+ // Drivers must ensure all network calls made by Close
+ // do not block indefinitely (e.g. apply a timeout).
Close() error
// Begin starts and returns a new transaction.
@@ -255,15 +289,23 @@ type ConnBeginTx interface {
// SessionResetter may be implemented by Conn to allow drivers to reset the
// session state associated with the connection and to signal a bad connection.
type SessionResetter interface {
- // ResetSession is called while a connection is in the connection
- // pool. No queries will run on this connection until this method returns.
- //
- // If the connection is bad this should return driver.ErrBadConn to prevent
- // the connection from being returned to the connection pool. Any other
- // error will be discarded.
+ // ResetSession is called prior to executing a query on the connection
+ // if the connection has been used before. If the driver returns ErrBadConn
+ // the connection is discarded.
ResetSession(ctx context.Context) error
}
+// Validator may be implemented by Conn to allow drivers to
+// signal if a connection is valid or if it should be discarded.
+//
+// If implemented, drivers may return the underlying error from queries,
+// even if the connection should be discarded by the connection pool.
+type Validator interface {
+ // IsValid is called prior to placing the connection into the
+ // connection pool. The connection will be discarded if false is returned.
+ IsValid() bool
+}
+
// Result is the result of a query execution.
type Result interface {
// LastInsertId returns the database's auto-generated ID
@@ -283,6 +325,9 @@ type Stmt interface {
//
// As of Go 1.1, a Stmt will not be closed if it's in use
// by any queries.
+ //
+ // Drivers must ensure all network calls made by Close
+ // do not block indefinitely (e.g. apply a timeout).
Close() error
// NumInput returns the number of placeholder parameters.
diff --git a/libgo/go/database/sql/fakedb_test.go b/libgo/go/database/sql/fakedb_test.go
index a0028be..7605a2a 100644
--- a/libgo/go/database/sql/fakedb_test.go
+++ b/libgo/go/database/sql/fakedb_test.go
@@ -390,12 +390,19 @@ func setStrictFakeConnClose(t *testing.T) {
func (c *fakeConn) ResetSession(ctx context.Context) error {
c.dirtySession = false
+ c.currTx = nil
if c.isBad() {
return driver.ErrBadConn
}
return nil
}
+var _ driver.Validator = (*fakeConn)(nil)
+
+func (c *fakeConn) IsValid() bool {
+ return !c.isBad()
+}
+
func (c *fakeConn) Close() (err error) {
drv := fdriver.(*fakeDriver)
defer func() {
@@ -728,6 +735,9 @@ var hookExecBadConn func() bool
func (s *fakeStmt) Exec(args []driver.Value) (driver.Result, error) {
panic("Using ExecContext")
}
+
+var errFakeConnSessionDirty = errors.New("fakedb: session is dirty")
+
func (s *fakeStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
if s.panic == "Exec" {
panic(s.panic)
@@ -740,7 +750,7 @@ func (s *fakeStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (d
return nil, driver.ErrBadConn
}
if s.c.isDirtyAndMark() {
- return nil, errors.New("fakedb: session is dirty")
+ return nil, errFakeConnSessionDirty
}
err := checkSubsetTypes(s.c.db.allowAny, args)
@@ -854,7 +864,7 @@ func (s *fakeStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (
return nil, driver.ErrBadConn
}
if s.c.isDirtyAndMark() {
- return nil, errors.New("fakedb: session is dirty")
+ return nil, errFakeConnSessionDirty
}
err := checkSubsetTypes(s.c.db.allowAny, args)
@@ -887,6 +897,37 @@ func (s *fakeStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (
}
}
}
+ if s.table == "tx_status" && s.colName[0] == "tx_status" {
+ txStatus := "autocommit"
+ if s.c.currTx != nil {
+ txStatus = "transaction"
+ }
+ cursor := &rowsCursor{
+ parentMem: s.c,
+ posRow: -1,
+ rows: [][]*row{
+ []*row{
+ {
+ cols: []interface{}{
+ txStatus,
+ },
+ },
+ },
+ },
+ cols: [][]string{
+ []string{
+ "tx_status",
+ },
+ },
+ colType: [][]string{
+ []string{
+ "string",
+ },
+ },
+ errPos: -1,
+ }
+ return cursor, nil
+ }
t.mu.Lock()
diff --git a/libgo/go/database/sql/sql.go b/libgo/go/database/sql/sql.go
index 0f5bbc0..b3d0653 100644
--- a/libgo/go/database/sql/sql.go
+++ b/libgo/go/database/sql/sql.go
@@ -421,17 +421,18 @@ type DB struct {
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
openerCh chan struct{}
- resetterCh chan *driverConn
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
- maxIdle int // zero means defaultMaxIdleConns; negative means 0
+ maxIdleCount int // zero means defaultMaxIdleConns; negative means 0
maxOpen int // <= 0 means unlimited
maxLifetime time.Duration // maximum amount of time a connection may be reused
+ maxIdleTime time.Duration // maximum amount of time a connection may be idle before being closed
cleanerCh chan struct{}
waitCount int64 // Total number of connections waited for.
- maxIdleClosed int64 // Total number of connections closed due to idle.
- maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
+ maxIdleClosed int64 // Total number of connections closed due to idle count.
+ maxIdleTimeClosed int64 // Total number of connections closed due to idle time.
+ maxLifetimeClosed int64 // Total number of connections closed due to max connection lifetime limit.
stop func() // stop cancels the connection opener and the session resetter.
}
@@ -458,15 +459,16 @@ type driverConn struct {
sync.Mutex // guards following
ci driver.Conn
+ needReset bool // The connection session should be reset before use if true.
closed bool
finalClosed bool // ci.Close has been called
openStmt map[*driverStmt]bool
- lastErr error // lastError captures the result of the session resetter.
// guarded by db.mu
inUse bool
- onPut []func() // code (with db.mu held) run when conn is next returned
- dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked
+ returnedAt time.Time // Time the connection was created or returned.
+ onPut []func() // code (with db.mu held) run when conn is next returned
+ dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}
func (dc *driverConn) releaseConn(err error) {
@@ -486,6 +488,36 @@ func (dc *driverConn) expired(timeout time.Duration) bool {
return dc.createdAt.Add(timeout).Before(nowFunc())
}
+// resetSession checks if the driver connection needs the
+// session to be reset and if required, resets it.
+func (dc *driverConn) resetSession(ctx context.Context) error {
+ dc.Lock()
+ defer dc.Unlock()
+
+ if !dc.needReset {
+ return nil
+ }
+ if cr, ok := dc.ci.(driver.SessionResetter); ok {
+ return cr.ResetSession(ctx)
+ }
+ return nil
+}
+
+// validateConnection checks if the connection is valid and can
+// still be used. It also marks the session for reset if required.
+func (dc *driverConn) validateConnection(needsReset bool) bool {
+ dc.Lock()
+ defer dc.Unlock()
+
+ if needsReset {
+ dc.needReset = true
+ }
+ if cv, ok := dc.ci.(driver.Validator); ok {
+ return cv.IsValid()
+ }
+ return true
+}
+
// prepareLocked prepares the query on dc. When cg == nil the dc must keep track of
// the prepared statements in a pool.
func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) {
@@ -511,19 +543,6 @@ func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, que
return ds, nil
}
-// resetSession resets the connection session and sets the lastErr
-// that is checked before returning the connection to another query.
-//
-// resetSession assumes that the embedded mutex is locked when the connection
-// was returned to the pool. This unlocks the mutex.
-func (dc *driverConn) resetSession(ctx context.Context) {
- defer dc.Unlock() // In case of panic.
- if dc.closed { // Check if the database has been closed.
- return
- }
- dc.lastErr = dc.ci.(driver.SessionResetter).ResetSession(ctx)
-}
-
// the dc.db's Mutex is held.
func (dc *driverConn) closeDBLocked() func() error {
dc.Lock()
@@ -713,14 +732,12 @@ func OpenDB(c driver.Connector) *DB {
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
- resetterCh: make(chan *driverConn, 50),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}
go db.connectionOpener(ctx)
- go db.connectionResetter(ctx)
return db
}
@@ -839,7 +856,7 @@ func (db *DB) Close() error {
const defaultMaxIdleConns = 2
func (db *DB) maxIdleConnsLocked() int {
- n := db.maxIdle
+ n := db.maxIdleCount
switch {
case n == 0:
// TODO(bradfitz): ask driver, if supported, for its default preference
@@ -851,6 +868,14 @@ func (db *DB) maxIdleConnsLocked() int {
}
}
+func (db *DB) shortestIdleTimeLocked() time.Duration {
+ min := db.maxIdleTime
+ if min > db.maxLifetime {
+ min = db.maxLifetime
+ }
+ return min
+}
+
// SetMaxIdleConns sets the maximum number of connections in the idle
// connection pool.
//
@@ -864,14 +889,14 @@ func (db *DB) maxIdleConnsLocked() int {
func (db *DB) SetMaxIdleConns(n int) {
db.mu.Lock()
if n > 0 {
- db.maxIdle = n
+ db.maxIdleCount = n
} else {
// No idle connections.
- db.maxIdle = -1
+ db.maxIdleCount = -1
}
// Make sure maxIdle doesn't exceed maxOpen
if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen {
- db.maxIdle = db.maxOpen
+ db.maxIdleCount = db.maxOpen
}
var closing []*driverConn
idleCount := len(db.freeConn)
@@ -912,13 +937,13 @@ func (db *DB) SetMaxOpenConns(n int) {
//
// Expired connections may be closed lazily before reuse.
//
-// If d <= 0, connections are reused forever.
+// If d <= 0, connections are not closed due to a connection's age.
func (db *DB) SetConnMaxLifetime(d time.Duration) {
if d < 0 {
d = 0
}
db.mu.Lock()
- // wake cleaner up when lifetime is shortened.
+ // Wake cleaner up when lifetime is shortened.
if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
select {
case db.cleanerCh <- struct{}{}:
@@ -930,11 +955,34 @@ func (db *DB) SetConnMaxLifetime(d time.Duration) {
db.mu.Unlock()
}
+// SetConnMaxIdleTime sets the maximum amount of time a connection may be idle.
+//
+// Expired connections may be closed lazily before reuse.
+//
+// If d <= 0, connections are not closed due to a connection's idle time.
+func (db *DB) SetConnMaxIdleTime(d time.Duration) {
+ if d < 0 {
+ d = 0
+ }
+ db.mu.Lock()
+ defer db.mu.Unlock()
+
+ // Wake cleaner up when idle time is shortened.
+ if d > 0 && d < db.maxIdleTime && db.cleanerCh != nil {
+ select {
+ case db.cleanerCh <- struct{}{}:
+ default:
+ }
+ }
+ db.maxIdleTime = d
+ db.startCleanerLocked()
+}
+
// startCleanerLocked starts connectionCleaner if needed.
func (db *DB) startCleanerLocked() {
- if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
+ if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
db.cleanerCh = make(chan struct{}, 1)
- go db.connectionCleaner(db.maxLifetime)
+ go db.connectionCleaner(db.shortestIdleTimeLocked())
}
}
@@ -953,15 +1001,30 @@ func (db *DB) connectionCleaner(d time.Duration) {
}
db.mu.Lock()
- d = db.maxLifetime
+
+ d = db.shortestIdleTimeLocked()
if db.closed || db.numOpen == 0 || d <= 0 {
db.cleanerCh = nil
db.mu.Unlock()
return
}
- expiredSince := nowFunc().Add(-d)
- var closing []*driverConn
+ closing := db.connectionCleanerRunLocked()
+ db.mu.Unlock()
+ for _, c := range closing {
+ c.Close()
+ }
+
+ if d < minInterval {
+ d = minInterval
+ }
+ t.Reset(d)
+ }
+}
+
+func (db *DB) connectionCleanerRunLocked() (closing []*driverConn) {
+ if db.maxLifetime > 0 {
+ expiredSince := nowFunc().Add(-db.maxLifetime)
for i := 0; i < len(db.freeConn); i++ {
c := db.freeConn[i]
if c.createdAt.Before(expiredSince) {
@@ -974,17 +1037,26 @@ func (db *DB) connectionCleaner(d time.Duration) {
}
}
db.maxLifetimeClosed += int64(len(closing))
- db.mu.Unlock()
-
- for _, c := range closing {
- c.Close()
- }
+ }
- if d < minInterval {
- d = minInterval
+ if db.maxIdleTime > 0 {
+ expiredSince := nowFunc().Add(-db.maxIdleTime)
+ var expiredCount int64
+ for i := 0; i < len(db.freeConn); i++ {
+ c := db.freeConn[i]
+ if db.maxIdleTime > 0 && c.returnedAt.Before(expiredSince) {
+ closing = append(closing, c)
+ expiredCount++
+ last := len(db.freeConn) - 1
+ db.freeConn[i] = db.freeConn[last]
+ db.freeConn[last] = nil
+ db.freeConn = db.freeConn[:last]
+ i--
+ }
}
- t.Reset(d)
+ db.maxIdleTimeClosed += expiredCount
}
+ return
}
// DBStats contains database statistics.
@@ -1000,6 +1072,7 @@ type DBStats struct {
WaitCount int64 // The total number of connections waited for.
WaitDuration time.Duration // The total time blocked waiting for a new connection.
MaxIdleClosed int64 // The total number of connections closed due to SetMaxIdleConns.
+ MaxIdleTimeClosed int64 // The total number of connections closed due to SetConnMaxIdleTime.
MaxLifetimeClosed int64 // The total number of connections closed due to SetConnMaxLifetime.
}
@@ -1020,6 +1093,7 @@ func (db *DB) Stats() DBStats {
WaitCount: db.waitCount,
WaitDuration: time.Duration(wait),
MaxIdleClosed: db.maxIdleClosed,
+ MaxIdleTimeClosed: db.maxIdleTimeClosed,
MaxLifetimeClosed: db.maxLifetimeClosed,
}
return stats
@@ -1058,23 +1132,6 @@ func (db *DB) connectionOpener(ctx context.Context) {
}
}
-// connectionResetter runs in a separate goroutine to reset connections async
-// to exported API.
-func (db *DB) connectionResetter(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- close(db.resetterCh)
- for dc := range db.resetterCh {
- dc.Unlock()
- }
- return
- case dc := <-db.resetterCh:
- dc.resetSession(ctx)
- }
- }
-}
-
// Open one new connection
func (db *DB) openNewConnection(ctx context.Context) {
// maybeOpenNewConnctions has already executed db.numOpen++ before it sent
@@ -1097,9 +1154,10 @@ func (db *DB) openNewConnection(ctx context.Context) {
return
}
dc := &driverConn{
- db: db,
- createdAt: nowFunc(),
- ci: ci,
+ db: db,
+ createdAt: nowFunc(),
+ returnedAt: nowFunc(),
+ ci: ci,
}
if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc)
@@ -1150,19 +1208,20 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
- db.mu.Unlock()
if conn.expired(lifetime) {
+ db.maxLifetimeClosed++
+ db.mu.Unlock()
conn.Close()
return nil, driver.ErrBadConn
}
- // Lock around reading lastErr to ensure the session resetter finished.
- conn.Lock()
- err := conn.lastErr
- conn.Unlock()
- if err == driver.ErrBadConn {
+ db.mu.Unlock()
+
+ // Reset the session if required.
+ if err := conn.resetSession(ctx); err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
+
return conn, nil
}
@@ -1177,7 +1236,7 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
db.waitCount++
db.mu.Unlock()
- waitStart := time.Now()
+ waitStart := nowFunc()
// Timeout the connection request with the context.
select {
@@ -1204,18 +1263,25 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
if !ok {
return nil, errDBClosed
}
- if ret.err == nil && ret.conn.expired(lifetime) {
+ // Only check if the connection is expired if the strategy is cachedOrNewConns.
+ // If we require a new connection, just re-use the connection without looking
+ // at the expiry time. If it is expired, it will be checked when it is placed
+ // back into the connection pool.
+ // This prioritizes giving a valid connection to a client over the exact connection
+ // lifetime, which could expire exactly after this point anyway.
+ if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
+ db.mu.Lock()
+ db.maxLifetimeClosed++
+ db.mu.Unlock()
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
- // Lock around reading lastErr to ensure the session resetter finished.
- ret.conn.Lock()
- err := ret.conn.lastErr
- ret.conn.Unlock()
- if err == driver.ErrBadConn {
+
+ // Reset the session if required.
+ if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
@@ -1235,10 +1301,11 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
}
db.mu.Lock()
dc := &driverConn{
- db: db,
- createdAt: nowFunc(),
- ci: ci,
- inUse: true,
+ db: db,
+ createdAt: nowFunc(),
+ returnedAt: nowFunc(),
+ ci: ci,
+ inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
@@ -1275,17 +1342,29 @@ const debugGetPut = false
// putConn adds a connection to the db's free pool.
// err is optionally the last error that occurred on this connection.
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
+ if err != driver.ErrBadConn {
+ if !dc.validateConnection(resetSession) {
+ err = driver.ErrBadConn
+ }
+ }
db.mu.Lock()
if !dc.inUse {
+ db.mu.Unlock()
if debugGetPut {
fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
}
panic("sql: connection returned that was never out")
}
+
+ if err != driver.ErrBadConn && dc.expired(db.maxLifetime) {
+ db.maxLifetimeClosed++
+ err = driver.ErrBadConn
+ }
if debugGetPut {
db.lastPut[dc] = stack()
}
dc.inUse = false
+ dc.returnedAt = nowFunc()
for _, fn := range dc.onPut {
fn()
@@ -1305,41 +1384,13 @@ func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
if putConnHook != nil {
putConnHook(db, dc)
}
- if db.closed {
- // Connections do not need to be reset if they will be closed.
- // Prevents writing to resetterCh after the DB has closed.
- resetSession = false
- }
- if resetSession {
- if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
- // Lock the driverConn here so it isn't released until
- // the connection is reset.
- // The lock must be taken before the connection is put into
- // the pool to prevent it from being taken out before it is reset.
- dc.Lock()
- }
- }
added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()
if !added {
- if resetSession {
- dc.Unlock()
- }
dc.Close()
return
}
- if !resetSession {
- return
- }
- select {
- default:
- // If the resetterCh is blocking then mark the connection
- // as bad and continue on.
- dc.lastErr = driver.ErrBadConn
- dc.Unlock()
- case db.resetterCh <- dc:
- }
}
// Satisfy a connRequest or put the driverConn in the idle pool and return true
@@ -1701,7 +1752,11 @@ func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStra
// beginDC starts a transaction. The provided dc must be valid and ready to use.
func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
var txi driver.Tx
+ keepConnOnRollback := false
withLock(dc, func() {
+ _, hasSessionResetter := dc.ci.(driver.SessionResetter)
+ _, hasConnectionValidator := dc.ci.(driver.Validator)
+ keepConnOnRollback = hasSessionResetter && hasConnectionValidator
txi, err = ctxDriverBegin(ctx, opts, dc.ci)
})
if err != nil {
@@ -1713,12 +1768,13 @@ func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error),
// The cancel function in Tx will be called after done is set to true.
ctx, cancel := context.WithCancel(ctx)
tx = &Tx{
- db: db,
- dc: dc,
- releaseConn: release,
- txi: txi,
- cancel: cancel,
- ctx: ctx,
+ db: db,
+ dc: dc,
+ releaseConn: release,
+ txi: txi,
+ cancel: cancel,
+ keepConnOnRollback: keepConnOnRollback,
+ ctx: ctx,
}
go tx.awaitDone()
return tx, nil
@@ -1980,6 +2036,11 @@ type Tx struct {
// Use atomic operations on value when checking value.
done int32
+ // keepConnOnRollback is true if the driver knows
+ // how to reset the connection's session and if need be discard
+ // the connection.
+ keepConnOnRollback bool
+
// All Stmts prepared for this transaction. These will be closed after the
// transaction has been committed or rolled back.
stmts struct {
@@ -2005,7 +2066,10 @@ func (tx *Tx) awaitDone() {
// transaction is closed and the resources are released. This
// rollback does nothing if the transaction has already been
// committed or rolled back.
- tx.rollback(true)
+ // Do not discard the connection if the connection knows
+ // how to reset the session.
+ discardConnection := !tx.keepConnOnRollback
+ tx.rollback(discardConnection)
}
func (tx *Tx) isDone() bool {
@@ -2016,14 +2080,10 @@ func (tx *Tx) isDone() bool {
// that has already been committed or rolled back.
var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")
-// close returns the connection to the pool and
-// must only be called by Tx.rollback or Tx.Commit.
-func (tx *Tx) close(err error) {
- tx.cancel()
-
- tx.closemu.Lock()
- defer tx.closemu.Unlock()
-
+// closeLocked returns the connection to the pool and
+// must only be called by Tx.rollback or Tx.Commit while
+// closemu is Locked and tx already canceled.
+func (tx *Tx) closeLocked(err error) {
tx.releaseConn(err)
tx.dc = nil
tx.txi = nil
@@ -2090,6 +2150,15 @@ func (tx *Tx) Commit() error {
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}
+
+ // Cancel the Tx to release any active R-closemu locks.
+ // This is safe to do because tx.done has already transitioned
+ // from 0 to 1. Hold the W-closemu lock prior to rollback
+ // to ensure no other connection has an active query.
+ tx.cancel()
+ tx.closemu.Lock()
+ defer tx.closemu.Unlock()
+
var err error
withLock(tx.dc, func() {
err = tx.txi.Commit()
@@ -2097,16 +2166,31 @@ func (tx *Tx) Commit() error {
if err != driver.ErrBadConn {
tx.closePrepared()
}
- tx.close(err)
+ tx.closeLocked(err)
return err
}
+var rollbackHook func()
+
// rollback aborts the transaction and optionally forces the pool to discard
// the connection.
func (tx *Tx) rollback(discardConn bool) error {
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}
+
+ if rollbackHook != nil {
+ rollbackHook()
+ }
+
+ // Cancel the Tx to release any active R-closemu locks.
+ // This is safe to do because tx.done has already transitioned
+ // from 0 to 1. Hold the W-closemu lock prior to rollback
+ // to ensure no other connection has an active query.
+ tx.cancel()
+ tx.closemu.Lock()
+ defer tx.closemu.Unlock()
+
var err error
withLock(tx.dc, func() {
err = tx.txi.Rollback()
@@ -2117,7 +2201,7 @@ func (tx *Tx) rollback(discardConn bool) error {
if discardConn {
err = driver.ErrBadConn
}
- tx.close(err)
+ tx.closeLocked(err)
return err
}
@@ -2709,10 +2793,17 @@ func (rs *Rows) lasterrOrErrLocked(err error) error {
return err
}
+// bypassRowsAwaitDone is only used for testing.
+// If true, it will not close the Rows automatically from the context.
+var bypassRowsAwaitDone = false
+
func (rs *Rows) initContextClose(ctx, txctx context.Context) {
if ctx.Done() == nil && (txctx == nil || txctx.Done() == nil) {
return
}
+ if bypassRowsAwaitDone {
+ return
+ }
ctx, rs.cancel = context.WithCancel(ctx)
go rs.awaitDone(ctx, txctx)
}
@@ -2922,10 +3013,11 @@ func (ci *ColumnType) Nullable() (nullable, ok bool) {
}
// DatabaseTypeName returns the database system name of the column type. If an empty
-// string is returned the driver type name is not supported.
+// string is returned, then the driver type name is not supported.
// Consult your driver documentation for a list of driver data types. Length specifiers
// are not included.
-// Common type include "VARCHAR", "TEXT", "NVARCHAR", "DECIMAL", "BOOL", "INT", "BIGINT".
+// Common type names include "VARCHAR", "TEXT", "NVARCHAR", "DECIMAL", "BOOL",
+// "INT", and "BIGINT".
func (ci *ColumnType) DatabaseTypeName() string {
return ci.databaseType
}
@@ -3140,6 +3232,14 @@ func (r *Row) Scan(dest ...interface{}) error {
return r.rows.Close()
}
+// Err provides a way for wrapping packages to check for
+// query errors without calling Scan.
+// Err returns the error, if any, that was encountered while running the query.
+// If this error is not nil, this error will also be returned from Scan.
+func (r *Row) Err() error {
+ return r.err
+}
+
// A Result summarizes an executed SQL command.
type Result interface {
// LastInsertId returns the integer generated by the database
diff --git a/libgo/go/database/sql/sql_test.go b/libgo/go/database/sql/sql_test.go
index 6f59260..5727f0d 100644
--- a/libgo/go/database/sql/sql_test.go
+++ b/libgo/go/database/sql/sql_test.go
@@ -80,6 +80,11 @@ func newTestDBConnector(t testing.TB, fc *fakeConnector, name string) *DB {
exec(t, db, "CREATE|magicquery|op=string,millis=int32")
exec(t, db, "INSERT|magicquery|op=sleep,millis=10")
}
+ if name == "tx_status" {
+ // Magic table name and column, known by fakedb_test.go.
+ exec(t, db, "CREATE|tx_status|tx_status=string")
+ exec(t, db, "INSERT|tx_status|tx_status=invalid")
+ }
return db
}
@@ -437,6 +442,7 @@ func TestTxContextWait(t *testing.T) {
}
t.Fatal(err)
}
+ tx.keepConnOnRollback = false
// This will trigger the *fakeConn.Prepare method which will take time
// performing the query. The ctxDriverPrepare func will check the context
@@ -449,6 +455,35 @@ func TestTxContextWait(t *testing.T) {
waitForFree(t, db, 5*time.Second, 0)
}
+// TestTxContextWaitNoDiscard is the same as TestTxContextWait, but should not discard
+// the final connection.
+func TestTxContextWaitNoDiscard(t *testing.T) {
+ db := newTestDB(t, "people")
+ defer closeDB(t, db)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond)
+ defer cancel()
+
+ tx, err := db.BeginTx(ctx, nil)
+ if err != nil {
+ // Guard against the context being canceled before BeginTx completes.
+ if err == context.DeadlineExceeded {
+ t.Skip("tx context canceled prior to first use")
+ }
+ t.Fatal(err)
+ }
+
+ // This will trigger the *fakeConn.Prepare method which will take time
+ // performing the query. The ctxDriverPrepare func will check the context
+ // after this and close the rows and return an error.
+ _, err = tx.QueryContext(ctx, "WAIT|1s|SELECT|people|age,name|")
+ if err != context.DeadlineExceeded {
+ t.Fatalf("expected QueryContext to error with context deadline exceeded but returned %v", err)
+ }
+
+ waitForFree(t, db, 5*time.Second, 1)
+}
+
// TestUnsupportedOptions checks that the database fails when a driver that
// doesn't implement ConnBeginTx is used with non-default options and an
// un-cancellable context.
@@ -788,6 +823,24 @@ func TestQueryRow(t *testing.T) {
}
}
+func TestRowErr(t *testing.T) {
+ db := newTestDB(t, "people")
+
+ err := db.QueryRowContext(context.Background(), "SELECT|people|bdate|age=?", 3).Err()
+ if err != nil {
+ t.Errorf("Unexpected err = %v; want %v", err, nil)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ err = db.QueryRowContext(ctx, "SELECT|people|bdate|age=?", 3).Err()
+ exp := "context canceled"
+ if err == nil || !strings.Contains(err.Error(), exp) {
+ t.Errorf("Expected err = %v; got %v", exp, err)
+ }
+}
+
func TestTxRollbackCommitErr(t *testing.T) {
db := newTestDB(t, "people")
defer closeDB(t, db)
@@ -1525,6 +1578,37 @@ func TestConnTx(t *testing.T) {
}
}
+// TestConnIsValid verifies that a database connection that should be discarded,
+// is actually discarded and does not re-enter the connection pool.
+// If the IsValid method from *fakeConn is removed, this test will fail.
+func TestConnIsValid(t *testing.T) {
+ db := newTestDB(t, "people")
+ defer closeDB(t, db)
+
+ db.SetMaxOpenConns(1)
+
+ ctx := context.Background()
+
+ c, err := db.Conn(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = c.Raw(func(raw interface{}) error {
+ dc := raw.(*fakeConn)
+ dc.stickyBad = true
+ return nil
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ c.Close()
+
+ if len(db.freeConn) > 0 && db.freeConn[0].ci.(*fakeConn).stickyBad {
+ t.Fatal("bad connection returned to pool; expected bad connection to be discarded")
+ }
+}
+
// Tests fix for issue 2542, that we release a lock when querying on
// a closed connection.
func TestIssue2542Deadlock(t *testing.T) {
@@ -2658,6 +2742,163 @@ func TestManyErrBadConn(t *testing.T) {
}
}
+// Issue 34775: Ensure that a Tx cannot commit after a rollback.
+func TestTxCannotCommitAfterRollback(t *testing.T) {
+ db := newTestDB(t, "tx_status")
+ defer closeDB(t, db)
+
+ // First check query reporting is correct.
+ var txStatus string
+ err := db.QueryRow("SELECT|tx_status|tx_status|").Scan(&txStatus)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if g, w := txStatus, "autocommit"; g != w {
+ t.Fatalf("tx_status=%q, wanted %q", g, w)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ tx, err := db.BeginTx(ctx, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Ignore dirty session for this test.
+ // A failing test should trigger the dirty session flag as well,
+ // but that isn't exactly what this should test for.
+ tx.txi.(*fakeTx).c.skipDirtySession = true
+
+ defer tx.Rollback()
+
+ err = tx.QueryRow("SELECT|tx_status|tx_status|").Scan(&txStatus)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if g, w := txStatus, "transaction"; g != w {
+ t.Fatalf("tx_status=%q, wanted %q", g, w)
+ }
+
+ // 1. Begin a transaction.
+ // 2. (A) Start a query, (B) begin Tx rollback through a ctx cancel.
+ // 3. Check if 2.A has committed in Tx (pass) or outside of Tx (fail).
+ sendQuery := make(chan struct{})
+ // The Tx status is returned through the row results, ensure
+ // that the rows results are not cancelled.
+ bypassRowsAwaitDone = true
+ hookTxGrabConn = func() {
+ cancel()
+ <-sendQuery
+ }
+ rollbackHook = func() {
+ close(sendQuery)
+ }
+ defer func() {
+ hookTxGrabConn = nil
+ rollbackHook = nil
+ bypassRowsAwaitDone = false
+ }()
+
+ err = tx.QueryRow("SELECT|tx_status|tx_status|").Scan(&txStatus)
+ if err != nil {
+ // A failure here would be expected if skipDirtySession was not set to true above.
+ t.Fatal(err)
+ }
+ if g, w := txStatus, "transaction"; g != w {
+ t.Fatalf("tx_status=%q, wanted %q", g, w)
+ }
+}
+
+// Issue32530 encounters an issue where a connection may
+// expire right after it comes out of a used connection pool
+// even when a new connection is requested.
+func TestConnExpiresFreshOutOfPool(t *testing.T) {
+ execCases := []struct {
+ expired bool
+ badReset bool
+ }{
+ {false, false},
+ {true, false},
+ {false, true},
+ }
+
+ t0 := time.Unix(1000000, 0)
+ offset := time.Duration(0)
+ offsetMu := sync.RWMutex{}
+
+ nowFunc = func() time.Time {
+ offsetMu.RLock()
+ defer offsetMu.RUnlock()
+ return t0.Add(offset)
+ }
+ defer func() { nowFunc = time.Now }()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ db := newTestDB(t, "magicquery")
+ defer closeDB(t, db)
+
+ db.SetMaxOpenConns(1)
+
+ for _, ec := range execCases {
+ ec := ec
+ name := fmt.Sprintf("expired=%t,badReset=%t", ec.expired, ec.badReset)
+ t.Run(name, func(t *testing.T) {
+ db.clearAllConns(t)
+
+ db.SetMaxIdleConns(1)
+ db.SetConnMaxLifetime(10 * time.Second)
+
+ conn, err := db.conn(ctx, alwaysNewConn)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ afterPutConn := make(chan struct{})
+ waitingForConn := make(chan struct{})
+
+ go func() {
+ conn, err := db.conn(ctx, alwaysNewConn)
+ if err != nil {
+ t.Fatal(err)
+ }
+ db.putConn(conn, err, false)
+ close(afterPutConn)
+ }()
+ go func() {
+ for {
+ db.mu.Lock()
+ ct := len(db.connRequests)
+ db.mu.Unlock()
+ if ct > 0 {
+ close(waitingForConn)
+ return
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+ }()
+
+ <-waitingForConn
+
+ offsetMu.Lock()
+ if ec.expired {
+ offset = 11 * time.Second
+ } else {
+ offset = time.Duration(0)
+ }
+ offsetMu.Unlock()
+
+ conn.ci.(*fakeConn).stickyBad = ec.badReset
+
+ db.putConn(conn, err, true)
+
+ <-afterPutConn
+ })
+ }
+}
+
// TestIssue20575 ensures the Rows from query does not block
// closing a transaction. Ensure Rows is closed while closing a trasaction.
func TestIssue20575(t *testing.T) {
@@ -3593,6 +3834,61 @@ func TestStatsMaxIdleClosedTen(t *testing.T) {
}
}
+func TestMaxIdleTime(t *testing.T) {
+ list := []struct {
+ wantMaxIdleTime time.Duration
+ wantIdleClosed int64
+ timeOffset time.Duration
+ }{
+ {time.Nanosecond, 1, 10 * time.Millisecond},
+ {time.Hour, 0, 10 * time.Millisecond},
+ }
+ baseTime := time.Unix(0, 0)
+ defer func() {
+ nowFunc = time.Now
+ }()
+ for _, item := range list {
+ nowFunc = func() time.Time {
+ return baseTime
+ }
+ t.Run(fmt.Sprintf("%v", item.wantMaxIdleTime), func(t *testing.T) {
+ db := newTestDB(t, "people")
+ defer closeDB(t, db)
+
+ db.SetMaxOpenConns(1)
+ db.SetMaxIdleConns(1)
+ db.SetConnMaxIdleTime(item.wantMaxIdleTime)
+ db.SetConnMaxLifetime(0)
+
+ preMaxIdleClosed := db.Stats().MaxIdleTimeClosed
+
+ if err := db.Ping(); err != nil {
+ t.Fatal(err)
+ }
+
+ nowFunc = func() time.Time {
+ return baseTime.Add(item.timeOffset)
+ }
+
+ db.mu.Lock()
+ closing := db.connectionCleanerRunLocked()
+ db.mu.Unlock()
+ for _, c := range closing {
+ c.Close()
+ }
+ if g, w := int64(len(closing)), item.wantIdleClosed; g != w {
+ t.Errorf("got: %d; want %d closed conns", g, w)
+ }
+
+ st := db.Stats()
+ maxIdleClosed := st.MaxIdleTimeClosed - preMaxIdleClosed
+ if g, w := maxIdleClosed, item.wantIdleClosed; g != w {
+ t.Errorf(" got: %d; want %d max idle closed conns", g, w)
+ }
+ })
+ }
+}
+
type nvcDriver struct {
fakeDriver
skipNamedValueCheck bool