diff options
Diffstat (limited to 'libgo/go/database/sql/sql.go')
-rw-r--r-- | libgo/go/database/sql/sql.go | 258 |
1 files changed, 208 insertions, 50 deletions
diff --git a/libgo/go/database/sql/sql.go b/libgo/go/database/sql/sql.go index aaa4ea2..d8e7cb7 100644 --- a/libgo/go/database/sql/sql.go +++ b/libgo/go/database/sql/sql.go @@ -21,13 +21,17 @@ import ( "sort" "sync" "sync/atomic" + "time" ) var ( - driversMu sync.Mutex + driversMu sync.RWMutex drivers = make(map[string]driver.Driver) ) +// nowFunc returns the current time; it's overridden in tests. +var nowFunc = time.Now + // Register makes a database driver available by the provided name. // If Register is called twice with the same name or if driver is nil, // it panics. @@ -52,8 +56,8 @@ func unregisterAllDrivers() { // Drivers returns a sorted list of the names of the registered drivers. func Drivers() []string { - driversMu.Lock() - defer driversMu.Unlock() + driversMu.RLock() + defer driversMu.RUnlock() var list []string for name := range drivers { list = append(list, name) @@ -185,8 +189,7 @@ func (n NullBool) Value() (driver.Value, error) { type Scanner interface { // Scan assigns a value from a database driver. // - // The src value will be of one of the following restricted - // set of types: + // The src value will be of one of the following types: // // int64 // float64 @@ -229,19 +232,20 @@ type DB struct { mu sync.Mutex // protects following fields freeConn []*driverConn connRequests []chan connRequest - numOpen int - pendingOpens int + numOpen int // number of opened and pending open connections // Used to signal the need for new connections // a goroutine running connectionOpener() reads on this chan and // maybeOpenNewConnections sends on the chan (one send per needed connection) // It is closed during db.Close(). The close tells the connectionOpener // goroutine to exit. - openerCh chan struct{} - closed bool - dep map[finalCloser]depSet - lastPut map[*driverConn]string // stacktrace of last conn's put; debug only - maxIdle int // zero means defaultMaxIdleConns; negative means 0 - maxOpen int // <= 0 means unlimited + openerCh chan struct{} + closed bool + dep map[finalCloser]depSet + lastPut map[*driverConn]string // stacktrace of last conn's put; debug only + maxIdle int // zero means defaultMaxIdleConns; negative means 0 + maxOpen int // <= 0 means unlimited + maxLifetime time.Duration // maximum amount of time a connection may be reused + cleanerCh chan struct{} } // connReuseStrategy determines how (*DB).conn returns database connections. @@ -261,7 +265,8 @@ const ( // interfaces returned via that Conn, such as calls on Tx, Stmt, // Result, Rows) type driverConn struct { - db *DB + db *DB + createdAt time.Time sync.Mutex // guards following ci driver.Conn @@ -285,6 +290,13 @@ func (dc *driverConn) removeOpenStmt(si driver.Stmt) { delete(dc.openStmt, si) } +func (dc *driverConn) expired(timeout time.Duration) bool { + if timeout <= 0 { + return false + } + return dc.createdAt.Add(timeout).Before(nowFunc()) +} + func (dc *driverConn) prepareLocked(query string) (driver.Stmt, error) { si, err := dc.ci.Prepare(query) if err == nil { @@ -441,7 +453,7 @@ func (db *DB) removeDepLocked(x finalCloser, dep interface{}) func() error { } } -// This is the size of the connectionOpener request chan (dn.openerCh). +// This is the size of the connectionOpener request chan (DB.openerCh). // This value should be larger than the maximum typical value // used for db.maxOpen. If maxOpen is significantly larger than // connectionRequestQueueSize then it is possible for ALL calls into the *DB @@ -466,9 +478,9 @@ var connectionRequestQueueSize = 1000000 // function should be called just once. It is rarely necessary to // close a DB. func Open(driverName, dataSourceName string) (*DB, error) { - driversMu.Lock() + driversMu.RLock() driveri, ok := drivers[driverName] - driversMu.Unlock() + driversMu.RUnlock() if !ok { return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName) } @@ -507,6 +519,9 @@ func (db *DB) Close() error { return nil } close(db.openerCh) + if db.cleanerCh != nil { + close(db.cleanerCh) + } var err error fns := make([]func() error, 0, len(db.freeConn)) for _, dc := range db.freeConn { @@ -595,6 +610,84 @@ func (db *DB) SetMaxOpenConns(n int) { } } +// SetConnMaxLifetime sets the maximum amount of time a connection may be reused. +// +// Expired connections may be closed lazily before reuse. +// +// If d <= 0, connections are reused forever. +func (db *DB) SetConnMaxLifetime(d time.Duration) { + if d < 0 { + d = 0 + } + db.mu.Lock() + // wake cleaner up when lifetime is shortened. + if d > 0 && d < db.maxLifetime && db.cleanerCh != nil { + select { + case db.cleanerCh <- struct{}{}: + default: + } + } + db.maxLifetime = d + db.startCleanerLocked() + db.mu.Unlock() +} + +// startCleanerLocked starts connectionCleaner if needed. +func (db *DB) startCleanerLocked() { + if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil { + db.cleanerCh = make(chan struct{}, 1) + go db.connectionCleaner(db.maxLifetime) + } +} + +func (db *DB) connectionCleaner(d time.Duration) { + const minInterval = time.Second + + if d < minInterval { + d = minInterval + } + t := time.NewTimer(d) + + for { + select { + case <-t.C: + case <-db.cleanerCh: // maxLifetime was changed or db was closed. + } + + db.mu.Lock() + d = db.maxLifetime + if db.closed || db.numOpen == 0 || d <= 0 { + db.cleanerCh = nil + db.mu.Unlock() + return + } + + expiredSince := nowFunc().Add(-d) + var closing []*driverConn + for i := 0; i < len(db.freeConn); i++ { + c := db.freeConn[i] + if c.createdAt.Before(expiredSince) { + closing = append(closing, c) + last := len(db.freeConn) - 1 + db.freeConn[i] = db.freeConn[last] + db.freeConn[last] = nil + db.freeConn = db.freeConn[:last] + i-- + } + } + db.mu.Unlock() + + for _, c := range closing { + c.Close() + } + + if d < minInterval { + d = minInterval + } + t.Reset(d) + } +} + // DBStats contains database statistics. type DBStats struct { // OpenConnections is the number of open connections to the database. @@ -615,15 +708,15 @@ func (db *DB) Stats() DBStats { // If there are connRequests and the connection limit hasn't been reached, // then tell the connectionOpener to open new connections. func (db *DB) maybeOpenNewConnections() { - numRequests := len(db.connRequests) - db.pendingOpens + numRequests := len(db.connRequests) if db.maxOpen > 0 { - numCanOpen := db.maxOpen - (db.numOpen + db.pendingOpens) + numCanOpen := db.maxOpen - db.numOpen if numRequests > numCanOpen { numRequests = numCanOpen } } for numRequests > 0 { - db.pendingOpens++ + db.numOpen++ // optimistically numRequests-- db.openerCh <- struct{}{} } @@ -638,6 +731,9 @@ func (db *DB) connectionOpener() { // Open one new connection func (db *DB) openNewConnection() { + // maybeOpenNewConnctions has already executed db.numOpen++ before it sent + // on db.openerCh. This function must execute db.numOpen-- if the + // connection fails or is closed before returning. ci, err := db.driver.Open(db.dsn) db.mu.Lock() defer db.mu.Unlock() @@ -645,21 +741,24 @@ func (db *DB) openNewConnection() { if err == nil { ci.Close() } + db.numOpen-- return } - db.pendingOpens-- if err != nil { + db.numOpen-- db.putConnDBLocked(nil, err) + db.maybeOpenNewConnections() return } dc := &driverConn{ - db: db, - ci: ci, + db: db, + createdAt: nowFunc(), + ci: ci, } if db.putConnDBLocked(dc, err) { db.addDepLocked(dc, dc) - db.numOpen++ } else { + db.numOpen-- ci.Close() } } @@ -681,6 +780,7 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) { db.mu.Unlock() return nil, errDBClosed } + lifetime := db.maxLifetime // Prefer a free connection, if possible. numFree := len(db.freeConn) @@ -690,6 +790,10 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) { db.freeConn = db.freeConn[:numFree-1] conn.inUse = true db.mu.Unlock() + if conn.expired(lifetime) { + conn.Close() + return nil, driver.ErrBadConn + } return conn, nil } @@ -701,7 +805,14 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) { req := make(chan connRequest, 1) db.connRequests = append(db.connRequests, req) db.mu.Unlock() - ret := <-req + ret, ok := <-req + if !ok { + return nil, errDBClosed + } + if ret.err == nil && ret.conn.expired(lifetime) { + ret.conn.Close() + return nil, driver.ErrBadConn + } return ret.conn, ret.err } @@ -711,13 +822,15 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) { if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism + db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } db.mu.Lock() dc := &driverConn{ - db: db, - ci: ci, + db: db, + createdAt: nowFunc(), + ci: ci, } db.addDepLocked(dc, dc) dc.inUse = true @@ -827,6 +940,7 @@ func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { return true } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) { db.freeConn = append(db.freeConn, dc) + db.startCleanerLocked() return true } return false @@ -1022,7 +1136,7 @@ func (db *DB) queryConn(dc *driverConn, releaseConn func(error), query string, a } // QueryRow executes a query that is expected to return at most one row. -// QueryRow always return a non-nil value. Errors are deferred until +// QueryRow always returns a non-nil value. Errors are deferred until // Row's Scan method is called. func (db *DB) QueryRow(query string, args ...interface{}) *Row { rows, err := db.Query(query, args...) @@ -1103,12 +1217,12 @@ type Tx struct { var ErrTxDone = errors.New("sql: Transaction has already been committed or rolled back") -func (tx *Tx) close() { +func (tx *Tx) close(err error) { if tx.done { panic("double close") // internal error } tx.done = true - tx.db.putConn(tx.dc, nil) + tx.db.putConn(tx.dc, err) tx.dc = nil tx.txi = nil } @@ -1134,13 +1248,13 @@ func (tx *Tx) Commit() error { if tx.done { return ErrTxDone } - defer tx.close() tx.dc.Lock() err := tx.txi.Commit() tx.dc.Unlock() if err != driver.ErrBadConn { tx.closePrepared() } + tx.close(err) return err } @@ -1149,13 +1263,13 @@ func (tx *Tx) Rollback() error { if tx.done { return ErrTxDone } - defer tx.close() tx.dc.Lock() err := tx.txi.Rollback() tx.dc.Unlock() if err != driver.ErrBadConn { tx.closePrepared() } + tx.close(err) return err } @@ -1296,7 +1410,7 @@ func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error) { } // QueryRow executes a query that is expected to return at most one row. -// QueryRow always return a non-nil value. Errors are deferred until +// QueryRow always returns a non-nil value. Errors are deferred until // Row's Scan method is called. func (tx *Tx) QueryRow(query string, args ...interface{}) *Row { rows, err := tx.Query(query, args...) @@ -1362,10 +1476,14 @@ func (s *Stmt) Exec(args ...interface{}) (Result, error) { return nil, driver.ErrBadConn } -func resultFromStatement(ds driverStmt, args ...interface{}) (Result, error) { +func driverNumInput(ds driverStmt) int { ds.Lock() - want := ds.si.NumInput() - ds.Unlock() + defer ds.Unlock() // in case NumInput panics + return ds.si.NumInput() +} + +func resultFromStatement(ds driverStmt, args ...interface{}) (Result, error) { + want := driverNumInput(ds) // -1 means the driver doesn't know how to count the number of // placeholders, so we won't sanity check input here and instead let the @@ -1380,8 +1498,8 @@ func resultFromStatement(ds driverStmt, args ...interface{}) (Result, error) { } ds.Lock() + defer ds.Unlock() resi, err := ds.si.Exec(dargs) - ds.Unlock() if err != nil { return nil, err } @@ -1576,9 +1694,9 @@ func (s *Stmt) Close() error { s.closed = true if s.tx != nil { - s.txsi.Close() + err := s.txsi.Close() s.mu.Unlock() - return nil + return err } s.mu.Unlock() @@ -1667,17 +1785,56 @@ func (rs *Rows) Columns() ([]string, error) { } // Scan copies the columns in the current row into the values pointed -// at by dest. +// at by dest. The number of values in dest must be the same as the +// number of columns in Rows. +// +// Scan converts columns read from the database into the following +// common Go types and special types provided by the sql package: +// +// *string +// *[]byte +// *int, *int8, *int16, *int32, *int64 +// *uint, *uint8, *uint16, *uint32, *uint64 +// *bool +// *float32, *float64 +// *interface{} +// *RawBytes +// any type implementing Scanner (see Scanner docs) +// +// In the most simple case, if the type of the value from the source +// column is an integer, bool or string type T and dest is of type *T, +// Scan simply assigns the value through the pointer. // -// If an argument has type *[]byte, Scan saves in that argument a copy -// of the corresponding data. The copy is owned by the caller and can -// be modified and held indefinitely. The copy can be avoided by using -// an argument of type *RawBytes instead; see the documentation for -// RawBytes for restrictions on its use. +// Scan also converts between string and numeric types, as long as no +// information would be lost. While Scan stringifies all numbers +// scanned from numeric database columns into *string, scans into +// numeric types are checked for overflow. For example, a float64 with +// value 300 or a string with value "300" can scan into a uint16, but +// not into a uint8, though float64(255) or "255" can scan into a +// uint8. One exception is that scans of some float64 numbers to +// strings may lose information when stringifying. In general, scan +// floating point columns into *float64. +// +// If a dest argument has type *[]byte, Scan saves in that argument a +// copy of the corresponding data. The copy is owned by the caller and +// can be modified and held indefinitely. The copy can be avoided by +// using an argument of type *RawBytes instead; see the documentation +// for RawBytes for restrictions on its use. // // If an argument has type *interface{}, Scan copies the value -// provided by the underlying driver without conversion. If the value -// is of type []byte, a copy is made and the caller owns the result. +// provided by the underlying driver without conversion. When scanning +// from a source value of type []byte to *interface{}, a copy of the +// slice is made and the caller owns the result. +// +// Source values of type time.Time may be scanned into values of type +// *time.Time, *interface{}, *string, or *[]byte. When converting to +// the latter two, time.Format3339Nano is used. +// +// Source values of type bool may be scanned into types *bool, +// *interface{}, *string, *[]byte, or *RawBytes. +// +// For scanning into *bool, the source may be true, false, 1, 0, or +// string inputs parseable by strconv.ParseBool. func (rs *Rows) Scan(dest ...interface{}) error { if rs.closed { return errors.New("sql: Rows are closed") @@ -1726,8 +1883,9 @@ type Row struct { } // Scan copies the columns from the matched row into the values -// pointed at by dest. If more than one row matches the query, -// Scan uses the first row and discards the rest. If no row matches +// pointed at by dest. See the documentation on Rows.Scan for details. +// If more than one row matches the query, +// Scan uses the first row and discards the rest. If no row matches // the query, Scan returns ErrNoRows. func (r *Row) Scan(dest ...interface{}) error { if r.err != nil { @@ -1812,6 +1970,6 @@ func stack() string { // withLock runs while holding lk. func withLock(lk sync.Locker, fn func()) { lk.Lock() + defer lk.Unlock() // in case fn panics fn() - lk.Unlock() } |