aboutsummaryrefslogtreecommitdiff
path: root/libgo/go/database/sql/sql.go
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/database/sql/sql.go')
-rw-r--r--libgo/go/database/sql/sql.go258
1 files changed, 208 insertions, 50 deletions
diff --git a/libgo/go/database/sql/sql.go b/libgo/go/database/sql/sql.go
index aaa4ea2..d8e7cb7 100644
--- a/libgo/go/database/sql/sql.go
+++ b/libgo/go/database/sql/sql.go
@@ -21,13 +21,17 @@ import (
"sort"
"sync"
"sync/atomic"
+ "time"
)
var (
- driversMu sync.Mutex
+ driversMu sync.RWMutex
drivers = make(map[string]driver.Driver)
)
+// nowFunc returns the current time; it's overridden in tests.
+var nowFunc = time.Now
+
// Register makes a database driver available by the provided name.
// If Register is called twice with the same name or if driver is nil,
// it panics.
@@ -52,8 +56,8 @@ func unregisterAllDrivers() {
// Drivers returns a sorted list of the names of the registered drivers.
func Drivers() []string {
- driversMu.Lock()
- defer driversMu.Unlock()
+ driversMu.RLock()
+ defer driversMu.RUnlock()
var list []string
for name := range drivers {
list = append(list, name)
@@ -185,8 +189,7 @@ func (n NullBool) Value() (driver.Value, error) {
type Scanner interface {
// Scan assigns a value from a database driver.
//
- // The src value will be of one of the following restricted
- // set of types:
+ // The src value will be of one of the following types:
//
// int64
// float64
@@ -229,19 +232,20 @@ type DB struct {
mu sync.Mutex // protects following fields
freeConn []*driverConn
connRequests []chan connRequest
- numOpen int
- pendingOpens int
+ numOpen int // number of opened and pending open connections
// Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection)
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
- openerCh chan struct{}
- closed bool
- dep map[finalCloser]depSet
- lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
- maxIdle int // zero means defaultMaxIdleConns; negative means 0
- maxOpen int // <= 0 means unlimited
+ openerCh chan struct{}
+ closed bool
+ dep map[finalCloser]depSet
+ lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
+ maxIdle int // zero means defaultMaxIdleConns; negative means 0
+ maxOpen int // <= 0 means unlimited
+ maxLifetime time.Duration // maximum amount of time a connection may be reused
+ cleanerCh chan struct{}
}
// connReuseStrategy determines how (*DB).conn returns database connections.
@@ -261,7 +265,8 @@ const (
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
- db *DB
+ db *DB
+ createdAt time.Time
sync.Mutex // guards following
ci driver.Conn
@@ -285,6 +290,13 @@ func (dc *driverConn) removeOpenStmt(si driver.Stmt) {
delete(dc.openStmt, si)
}
+func (dc *driverConn) expired(timeout time.Duration) bool {
+ if timeout <= 0 {
+ return false
+ }
+ return dc.createdAt.Add(timeout).Before(nowFunc())
+}
+
func (dc *driverConn) prepareLocked(query string) (driver.Stmt, error) {
si, err := dc.ci.Prepare(query)
if err == nil {
@@ -441,7 +453,7 @@ func (db *DB) removeDepLocked(x finalCloser, dep interface{}) func() error {
}
}
-// This is the size of the connectionOpener request chan (dn.openerCh).
+// This is the size of the connectionOpener request chan (DB.openerCh).
// This value should be larger than the maximum typical value
// used for db.maxOpen. If maxOpen is significantly larger than
// connectionRequestQueueSize then it is possible for ALL calls into the *DB
@@ -466,9 +478,9 @@ var connectionRequestQueueSize = 1000000
// function should be called just once. It is rarely necessary to
// close a DB.
func Open(driverName, dataSourceName string) (*DB, error) {
- driversMu.Lock()
+ driversMu.RLock()
driveri, ok := drivers[driverName]
- driversMu.Unlock()
+ driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}
@@ -507,6 +519,9 @@ func (db *DB) Close() error {
return nil
}
close(db.openerCh)
+ if db.cleanerCh != nil {
+ close(db.cleanerCh)
+ }
var err error
fns := make([]func() error, 0, len(db.freeConn))
for _, dc := range db.freeConn {
@@ -595,6 +610,84 @@ func (db *DB) SetMaxOpenConns(n int) {
}
}
+// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
+//
+// Expired connections may be closed lazily before reuse.
+//
+// If d <= 0, connections are reused forever.
+func (db *DB) SetConnMaxLifetime(d time.Duration) {
+ if d < 0 {
+ d = 0
+ }
+ db.mu.Lock()
+ // wake cleaner up when lifetime is shortened.
+ if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
+ select {
+ case db.cleanerCh <- struct{}{}:
+ default:
+ }
+ }
+ db.maxLifetime = d
+ db.startCleanerLocked()
+ db.mu.Unlock()
+}
+
+// startCleanerLocked starts connectionCleaner if needed.
+func (db *DB) startCleanerLocked() {
+ if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
+ db.cleanerCh = make(chan struct{}, 1)
+ go db.connectionCleaner(db.maxLifetime)
+ }
+}
+
+func (db *DB) connectionCleaner(d time.Duration) {
+ const minInterval = time.Second
+
+ if d < minInterval {
+ d = minInterval
+ }
+ t := time.NewTimer(d)
+
+ for {
+ select {
+ case <-t.C:
+ case <-db.cleanerCh: // maxLifetime was changed or db was closed.
+ }
+
+ db.mu.Lock()
+ d = db.maxLifetime
+ if db.closed || db.numOpen == 0 || d <= 0 {
+ db.cleanerCh = nil
+ db.mu.Unlock()
+ return
+ }
+
+ expiredSince := nowFunc().Add(-d)
+ var closing []*driverConn
+ for i := 0; i < len(db.freeConn); i++ {
+ c := db.freeConn[i]
+ if c.createdAt.Before(expiredSince) {
+ closing = append(closing, c)
+ last := len(db.freeConn) - 1
+ db.freeConn[i] = db.freeConn[last]
+ db.freeConn[last] = nil
+ db.freeConn = db.freeConn[:last]
+ i--
+ }
+ }
+ db.mu.Unlock()
+
+ for _, c := range closing {
+ c.Close()
+ }
+
+ if d < minInterval {
+ d = minInterval
+ }
+ t.Reset(d)
+ }
+}
+
// DBStats contains database statistics.
type DBStats struct {
// OpenConnections is the number of open connections to the database.
@@ -615,15 +708,15 @@ func (db *DB) Stats() DBStats {
// If there are connRequests and the connection limit hasn't been reached,
// then tell the connectionOpener to open new connections.
func (db *DB) maybeOpenNewConnections() {
- numRequests := len(db.connRequests) - db.pendingOpens
+ numRequests := len(db.connRequests)
if db.maxOpen > 0 {
- numCanOpen := db.maxOpen - (db.numOpen + db.pendingOpens)
+ numCanOpen := db.maxOpen - db.numOpen
if numRequests > numCanOpen {
numRequests = numCanOpen
}
}
for numRequests > 0 {
- db.pendingOpens++
+ db.numOpen++ // optimistically
numRequests--
db.openerCh <- struct{}{}
}
@@ -638,6 +731,9 @@ func (db *DB) connectionOpener() {
// Open one new connection
func (db *DB) openNewConnection() {
+ // maybeOpenNewConnctions has already executed db.numOpen++ before it sent
+ // on db.openerCh. This function must execute db.numOpen-- if the
+ // connection fails or is closed before returning.
ci, err := db.driver.Open(db.dsn)
db.mu.Lock()
defer db.mu.Unlock()
@@ -645,21 +741,24 @@ func (db *DB) openNewConnection() {
if err == nil {
ci.Close()
}
+ db.numOpen--
return
}
- db.pendingOpens--
if err != nil {
+ db.numOpen--
db.putConnDBLocked(nil, err)
+ db.maybeOpenNewConnections()
return
}
dc := &driverConn{
- db: db,
- ci: ci,
+ db: db,
+ createdAt: nowFunc(),
+ ci: ci,
}
if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc)
- db.numOpen++
} else {
+ db.numOpen--
ci.Close()
}
}
@@ -681,6 +780,7 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
db.mu.Unlock()
return nil, errDBClosed
}
+ lifetime := db.maxLifetime
// Prefer a free connection, if possible.
numFree := len(db.freeConn)
@@ -690,6 +790,10 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
+ if conn.expired(lifetime) {
+ conn.Close()
+ return nil, driver.ErrBadConn
+ }
return conn, nil
}
@@ -701,7 +805,14 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
req := make(chan connRequest, 1)
db.connRequests = append(db.connRequests, req)
db.mu.Unlock()
- ret := <-req
+ ret, ok := <-req
+ if !ok {
+ return nil, errDBClosed
+ }
+ if ret.err == nil && ret.conn.expired(lifetime) {
+ ret.conn.Close()
+ return nil, driver.ErrBadConn
+ }
return ret.conn, ret.err
}
@@ -711,13 +822,15 @@ func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
+ db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
- db: db,
- ci: ci,
+ db: db,
+ createdAt: nowFunc(),
+ ci: ci,
}
db.addDepLocked(dc, dc)
dc.inUse = true
@@ -827,6 +940,7 @@ func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
return true
} else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
+ db.startCleanerLocked()
return true
}
return false
@@ -1022,7 +1136,7 @@ func (db *DB) queryConn(dc *driverConn, releaseConn func(error), query string, a
}
// QueryRow executes a query that is expected to return at most one row.
-// QueryRow always return a non-nil value. Errors are deferred until
+// QueryRow always returns a non-nil value. Errors are deferred until
// Row's Scan method is called.
func (db *DB) QueryRow(query string, args ...interface{}) *Row {
rows, err := db.Query(query, args...)
@@ -1103,12 +1217,12 @@ type Tx struct {
var ErrTxDone = errors.New("sql: Transaction has already been committed or rolled back")
-func (tx *Tx) close() {
+func (tx *Tx) close(err error) {
if tx.done {
panic("double close") // internal error
}
tx.done = true
- tx.db.putConn(tx.dc, nil)
+ tx.db.putConn(tx.dc, err)
tx.dc = nil
tx.txi = nil
}
@@ -1134,13 +1248,13 @@ func (tx *Tx) Commit() error {
if tx.done {
return ErrTxDone
}
- defer tx.close()
tx.dc.Lock()
err := tx.txi.Commit()
tx.dc.Unlock()
if err != driver.ErrBadConn {
tx.closePrepared()
}
+ tx.close(err)
return err
}
@@ -1149,13 +1263,13 @@ func (tx *Tx) Rollback() error {
if tx.done {
return ErrTxDone
}
- defer tx.close()
tx.dc.Lock()
err := tx.txi.Rollback()
tx.dc.Unlock()
if err != driver.ErrBadConn {
tx.closePrepared()
}
+ tx.close(err)
return err
}
@@ -1296,7 +1410,7 @@ func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error) {
}
// QueryRow executes a query that is expected to return at most one row.
-// QueryRow always return a non-nil value. Errors are deferred until
+// QueryRow always returns a non-nil value. Errors are deferred until
// Row's Scan method is called.
func (tx *Tx) QueryRow(query string, args ...interface{}) *Row {
rows, err := tx.Query(query, args...)
@@ -1362,10 +1476,14 @@ func (s *Stmt) Exec(args ...interface{}) (Result, error) {
return nil, driver.ErrBadConn
}
-func resultFromStatement(ds driverStmt, args ...interface{}) (Result, error) {
+func driverNumInput(ds driverStmt) int {
ds.Lock()
- want := ds.si.NumInput()
- ds.Unlock()
+ defer ds.Unlock() // in case NumInput panics
+ return ds.si.NumInput()
+}
+
+func resultFromStatement(ds driverStmt, args ...interface{}) (Result, error) {
+ want := driverNumInput(ds)
// -1 means the driver doesn't know how to count the number of
// placeholders, so we won't sanity check input here and instead let the
@@ -1380,8 +1498,8 @@ func resultFromStatement(ds driverStmt, args ...interface{}) (Result, error) {
}
ds.Lock()
+ defer ds.Unlock()
resi, err := ds.si.Exec(dargs)
- ds.Unlock()
if err != nil {
return nil, err
}
@@ -1576,9 +1694,9 @@ func (s *Stmt) Close() error {
s.closed = true
if s.tx != nil {
- s.txsi.Close()
+ err := s.txsi.Close()
s.mu.Unlock()
- return nil
+ return err
}
s.mu.Unlock()
@@ -1667,17 +1785,56 @@ func (rs *Rows) Columns() ([]string, error) {
}
// Scan copies the columns in the current row into the values pointed
-// at by dest.
+// at by dest. The number of values in dest must be the same as the
+// number of columns in Rows.
+//
+// Scan converts columns read from the database into the following
+// common Go types and special types provided by the sql package:
+//
+// *string
+// *[]byte
+// *int, *int8, *int16, *int32, *int64
+// *uint, *uint8, *uint16, *uint32, *uint64
+// *bool
+// *float32, *float64
+// *interface{}
+// *RawBytes
+// any type implementing Scanner (see Scanner docs)
+//
+// In the most simple case, if the type of the value from the source
+// column is an integer, bool or string type T and dest is of type *T,
+// Scan simply assigns the value through the pointer.
//
-// If an argument has type *[]byte, Scan saves in that argument a copy
-// of the corresponding data. The copy is owned by the caller and can
-// be modified and held indefinitely. The copy can be avoided by using
-// an argument of type *RawBytes instead; see the documentation for
-// RawBytes for restrictions on its use.
+// Scan also converts between string and numeric types, as long as no
+// information would be lost. While Scan stringifies all numbers
+// scanned from numeric database columns into *string, scans into
+// numeric types are checked for overflow. For example, a float64 with
+// value 300 or a string with value "300" can scan into a uint16, but
+// not into a uint8, though float64(255) or "255" can scan into a
+// uint8. One exception is that scans of some float64 numbers to
+// strings may lose information when stringifying. In general, scan
+// floating point columns into *float64.
+//
+// If a dest argument has type *[]byte, Scan saves in that argument a
+// copy of the corresponding data. The copy is owned by the caller and
+// can be modified and held indefinitely. The copy can be avoided by
+// using an argument of type *RawBytes instead; see the documentation
+// for RawBytes for restrictions on its use.
//
// If an argument has type *interface{}, Scan copies the value
-// provided by the underlying driver without conversion. If the value
-// is of type []byte, a copy is made and the caller owns the result.
+// provided by the underlying driver without conversion. When scanning
+// from a source value of type []byte to *interface{}, a copy of the
+// slice is made and the caller owns the result.
+//
+// Source values of type time.Time may be scanned into values of type
+// *time.Time, *interface{}, *string, or *[]byte. When converting to
+// the latter two, time.Format3339Nano is used.
+//
+// Source values of type bool may be scanned into types *bool,
+// *interface{}, *string, *[]byte, or *RawBytes.
+//
+// For scanning into *bool, the source may be true, false, 1, 0, or
+// string inputs parseable by strconv.ParseBool.
func (rs *Rows) Scan(dest ...interface{}) error {
if rs.closed {
return errors.New("sql: Rows are closed")
@@ -1726,8 +1883,9 @@ type Row struct {
}
// Scan copies the columns from the matched row into the values
-// pointed at by dest. If more than one row matches the query,
-// Scan uses the first row and discards the rest. If no row matches
+// pointed at by dest. See the documentation on Rows.Scan for details.
+// If more than one row matches the query,
+// Scan uses the first row and discards the rest. If no row matches
// the query, Scan returns ErrNoRows.
func (r *Row) Scan(dest ...interface{}) error {
if r.err != nil {
@@ -1812,6 +1970,6 @@ func stack() string {
// withLock runs while holding lk.
func withLock(lk sync.Locker, fn func()) {
lk.Lock()
+ defer lk.Unlock() // in case fn panics
fn()
- lk.Unlock()
}