Diffstat (limited to 'libgo/runtime/netpoll.goc')
-rw-r--r--  libgo/runtime/netpoll.goc | 139
1 files changed, 92 insertions, 47 deletions
diff --git a/libgo/runtime/netpoll.goc b/libgo/runtime/netpoll.goc
index 0270573..15dd58c 100644
--- a/libgo/runtime/netpoll.goc
+++ b/libgo/runtime/netpoll.goc
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// +build darwin dragonfly freebsd linux netbsd openbsd windows
+// +build darwin dragonfly freebsd linux netbsd openbsd solaris windows
 
 package net
 
@@ -24,21 +24,45 @@ package net
 // An implementation must call the following function to denote that the pd is ready.
 // void runtime_netpollready(G **gpp, PollDesc *pd, int32 mode);
 
+// PollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
+// goroutines respectively. The semaphore can be in the following states:
+// READY - io readiness notification is pending;
+//         a goroutine consumes the notification by changing the state to nil.
+// WAIT - a goroutine prepares to park on the semaphore, but not yet parked;
+//        the goroutine commits to park by changing the state to G pointer,
+//        or, alternatively, concurrent io notification changes the state to READY,
+//        or, alternatively, concurrent timeout/close changes the state to nil.
+// G pointer - the goroutine is blocked on the semaphore;
+//             io notification or timeout/close changes the state to READY or nil respectively
+//             and unparks the goroutine.
+// nil - nothing of the above.
 #define READY ((G*)1)
+#define WAIT  ((G*)2)
+
+enum
+{
+	PollBlockSize	= 4*1024,
+};
 
 struct PollDesc
 {
 	PollDesc* link;	// in pollcache, protected by pollcache.Lock
+
+	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
+	// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
+	// pollReset, pollWait, pollWaitCanceled and runtime_netpollready (IO rediness notification)
+	// proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
+	// in a lock-free way by all operations.
 	Lock;		// protectes the following fields
 	uintptr	fd;
 	bool	closing;
 	uintptr	seq;	// protects from stale timers and ready notifications
-	G*	rg;	// G waiting for read or READY (binary semaphore)
+	G*	rg;	// READY, WAIT, G waiting for read or nil
 	Timer	rt;	// read deadline timer (set if rt.fv != nil)
 	int64	rd;	// read deadline
-	G*	wg;	// the same for writes
-	Timer	wt;
-	int64	wd;
+	G*	wg;	// READY, WAIT, G waiting for write or nil
+	Timer	wt;	// write deadline timer
+	int64	wd;	// write deadline
 };
 
 static struct
@@ -52,7 +76,7 @@ static struct
 	// seq is incremented when deadlines are changed or descriptor is reused.
 } pollcache;
 
-static bool	netpollblock(PollDesc*, int32);
+static bool	netpollblock(PollDesc*, int32, bool);
 static G*	netpollunblock(PollDesc*, int32, bool);
 static void	deadline(int64, Eface);
 static void	readDeadline(int64, Eface);
@@ -102,7 +126,6 @@ func runtime_pollClose(pd *PollDesc) {
 }
 
 func runtime_pollReset(pd *PollDesc, mode int) (err int) {
-	runtime_lock(pd);
 	err = checkerr(pd, mode);
 	if(err)
 		goto ret;
@@ -111,14 +134,15 @@ func runtime_pollReset(pd *PollDesc, mode int) (err int) {
 	else if(mode == 'w')
 		pd->wg = nil;
 ret:
-	runtime_unlock(pd);
 }
 
 func runtime_pollWait(pd *PollDesc, mode int) (err int) {
-	runtime_lock(pd);
 	err = checkerr(pd, mode);
 	if(err == 0) {
-		while(!netpollblock(pd, mode)) {
+		// As for now only Solaris uses level-triggered IO.
+		if(Solaris)
+			runtime_netpollarm(pd->fd, mode);
+		while(!netpollblock(pd, mode, false)) {
 			err = checkerr(pd, mode);
 			if(err != 0)
 				break;
@@ -127,15 +151,13 @@ func runtime_pollWait(pd *PollDesc, mode int) (err int) {
 			// Pretend it has not happened and retry.
 		}
 	}
-	runtime_unlock(pd);
 }
 
 func runtime_pollWaitCanceled(pd *PollDesc, mode int) {
-	runtime_lock(pd);
-	// wait for ioready, ignore closing or timeouts.
-	while(!netpollblock(pd, mode))
+	// This function is used only on windows after a failed attempt to cancel
+	// a pending async IO operation. Wait for ioready, ignore closing or timeouts.
+	while(!netpollblock(pd, mode, true))
 		;
-	runtime_unlock(pd);
 }
 
 func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
@@ -190,7 +212,7 @@ func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
 	}
 	// If we set the new deadline in the past, unblock currently pending IO if any.
 	rg = nil;
-	wg = nil;
+	runtime_atomicstorep(&wg, nil);  // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
 	if(pd->rd < 0)
 		rg = netpollunblock(pd, 'r', false);
 	if(pd->wd < 0)
@@ -210,6 +232,7 @@ func runtime_pollUnblock(pd *PollDesc) {
 		runtime_throw("runtime_pollUnblock: already closing");
 	pd->closing = true;
 	pd->seq++;
+	runtime_atomicstorep(&rg, nil);  // full memory barrier between store to closing and read of rg/wg in netpollunblock
 	rg = netpollunblock(pd, 'r', false);
 	wg = netpollunblock(pd, 'w', false);
 	if(pd->rt.fv) {
@@ -240,12 +263,10 @@ runtime_netpollready(G **gpp, PollDesc *pd, int32 mode)
 	G *rg, *wg;
 
 	rg = wg = nil;
-	runtime_lock(pd);
 	if(mode == 'r' || mode == 'r'+'w')
 		rg = netpollunblock(pd, 'r', true);
 	if(mode == 'w' || mode == 'r'+'w')
 		wg = netpollunblock(pd, 'w', true);
-	runtime_unlock(pd);
 	if(rg) {
 		rg->schedlink = *gpp;
 		*gpp = rg;
@@ -266,51 +287,75 @@ checkerr(PollDesc *pd, int32 mode)
 	return 0;
 }
 
+static bool
+blockcommit(G *gp, G **gpp)
+{
+	return runtime_casp(gpp, WAIT, gp);
+}
+
 // returns true if IO is ready, or false if timedout or closed
+// waitio - wait only for completed IO, ignore errors
 static bool
-netpollblock(PollDesc *pd, int32 mode)
+netpollblock(PollDesc *pd, int32 mode, bool waitio)
 {
-	G **gpp;
+	G **gpp, *old;
 
 	gpp = &pd->rg;
 	if(mode == 'w')
 		gpp = &pd->wg;
-	if(*gpp == READY) {
-		*gpp = nil;
-		return true;
+
+	// set the gpp semaphore to WAIT
+	for(;;) {
+		old = *gpp;
+		if(old == READY) {
+			*gpp = nil;
+			return true;
+		}
+		if(old != nil)
+			runtime_throw("netpollblock: double wait");
+		if(runtime_casp(gpp, nil, WAIT))
+			break;
 	}
-	if(*gpp != nil)
-		runtime_throw("netpollblock: double wait");
-	*gpp = runtime_g();
-	runtime_park(runtime_unlock, &pd->Lock, "IO wait");
-	runtime_lock(pd);
-	if(runtime_g()->param)
-		return true;
-	return false;
+
+	// need to recheck error states after setting gpp to WAIT
+	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
+	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
+	if(waitio || checkerr(pd, mode) == 0)
+		runtime_park((bool(*)(G*, void*))blockcommit, gpp, "IO wait");
+	// be careful to not lose concurrent READY notification
+	old = runtime_xchgp(gpp, nil);
+	if(old > WAIT)
+		runtime_throw("netpollblock: corrupted state");
+	return old == READY;
 }
 
 static G*
 netpollunblock(PollDesc *pd, int32 mode, bool ioready)
 {
-	G **gpp, *old;
+	G **gpp, *old, *new;
 
 	gpp = &pd->rg;
 	if(mode == 'w')
 		gpp = &pd->wg;
-	if(*gpp == READY)
-		return nil;
-	if(*gpp == nil) {
-		// Only set READY for ioready. runtime_pollWait
-		// will check for timeout/cancel before waiting.
+
+	for(;;) {
+		old = *gpp;
+		if(old == READY)
+			return nil;
+		if(old == nil && !ioready) {
+			// Only set READY for ioready. runtime_pollWait
+			// will check for timeout/cancel before waiting.
+			return nil;
+		}
+		new = nil;
 		if(ioready)
-			*gpp = READY;
-		return nil;
+			new = READY;
+		if(runtime_casp(gpp, old, new))
+			break;
 	}
-	old = *gpp;
-	// pass unblock reason onto blocked g
-	old->param = (void*)(uintptr)ioready;
-	*gpp = nil;
-	return old;
+	if(old > WAIT)
+		return old;	// must be G*
+	return nil;
 }
 
 static void
@@ -336,14 +381,14 @@ deadlineimpl(int64 now, Eface arg, bool read, bool write)
 		if(pd->rd <= 0 || pd->rt.fv == nil)
 			runtime_throw("deadlineimpl: inconsistent read deadline");
 		pd->rd = -1;
-		pd->rt.fv = nil;
+		runtime_atomicstorep(&pd->rt.fv, nil);  // full memory barrier between store to rd and load of rg in netpollunblock
 		rg = netpollunblock(pd, 'r', false);
 	}
 	if(write) {
 		if(pd->wd <= 0 || (pd->wt.fv == nil && !read))
 			runtime_throw("deadlineimpl: inconsistent write deadline");
 		pd->wd = -1;
-		pd->wt.fv = nil;
+		runtime_atomicstorep(&pd->wt.fv, nil);  // full memory barrier between store to wd and load of wg in netpollunblock
 		wg = netpollunblock(pd, 'w', false);
 	}
 	runtime_unlock(pd);
@@ -379,7 +424,7 @@ allocPollDesc(void)
 
 	runtime_lock(&pollcache);
 	if(pollcache.first == nil) {
-		n = PageSize/sizeof(*pd);
+		n = PollBlockSize/sizeof(*pd);
 		if(n == 0)
 			n = 1;
 		// Must be in non-GC memory because can be referenced
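
The core of the patch is the four-state binary semaphore (nil / READY / WAIT / G pointer) that netpollblock and netpollunblock now drive with compare-and-swap instead of pd's lock. Below is a minimal user-space sketch of that state machine, not the runtime's code: it assumes illustrative names (semaphore, block, notify, waiter), replaces the G pointer with a *waiter whose channel stands in for runtime_park/unpark, and uses sync/atomic where the C code uses runtime_casp/runtime_xchgp.

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// Sentinel states, analogous to READY ((G*)1) and WAIT ((G*)2) in netpoll.goc.
var (
	ready = unsafe.Pointer(new(byte)) // io readiness notification is pending
	wait  = unsafe.Pointer(new(byte)) // a waiter is about to park
)

// waiter stands in for the parked G; the channel delivers the wakeup.
type waiter struct{ unpark chan struct{} }

// semaphore holds nil, ready, wait, or a *waiter, like pd->rg / pd->wg.
type semaphore struct{ state unsafe.Pointer }

// block mirrors netpollblock: consume a pending READY, otherwise advertise
// WAIT, commit to parking (blockcommit), and report why we were woken.
func (s *semaphore) block() (ioready bool) {
	for {
		old := atomic.LoadPointer(&s.state)
		if old == ready {
			if atomic.CompareAndSwapPointer(&s.state, ready, nil) {
				return true // consumed a pending notification
			}
			continue
		}
		if old != nil {
			panic("double wait")
		}
		if atomic.CompareAndSwapPointer(&s.state, nil, wait) {
			break
		}
	}
	w := &waiter{unpark: make(chan struct{})}
	// Commit to parking: WAIT -> waiter pointer. If this CAS fails, a
	// concurrent notify already resolved the state and we skip parking.
	if atomic.CompareAndSwapPointer(&s.state, wait, unsafe.Pointer(w)) {
		<-w.unpark
	}
	// Pick up and clear the final state, as netpollblock does with runtime_xchgp.
	return atomic.SwapPointer(&s.state, nil) == ready
}

// notify mirrors netpollunblock: leave a READY token for a future waiter on
// io readiness, or wake a parked waiter (timeout/close passes ioready=false).
func (s *semaphore) notify(ioready bool) {
	for {
		old := atomic.LoadPointer(&s.state)
		if old == ready {
			return // notification already pending
		}
		if old == nil && !ioready {
			return // only io readiness may leave a READY token behind
		}
		var next unsafe.Pointer
		if ioready {
			next = ready
		}
		if atomic.CompareAndSwapPointer(&s.state, old, next) {
			if old != nil && old != wait {
				close((*waiter)(old).unpark) // unpark; block() rereads the state
			}
			return
		}
	}
}

func main() {
	var s semaphore
	go s.notify(true) // pretend the poller reported readiness
	fmt.Println("io ready:", s.block())
}

The sketch does not model the patch's runtime_atomicstorep barriers: in the C code those full memory barriers order the stores to closing/rd/wd before the loads of rg/wg in netpollunblock, so that netpollblock's post-WAIT checkerr recheck cannot miss a concurrent close or deadline.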