// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Semaphore implementation exposed to Go.
// Intended use is to provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.
//
// See Mullender and Cox, ``Semaphores in Plan 9,''
// http://swtch.com/semaphore.pdf

package sync
#include "runtime.h"
#include "chan.h"
#include "arch.h"

typedef struct SemaWaiter SemaWaiter;
struct SemaWaiter
{
	uint32 volatile*	addr;
	G*	g;
	int64	releasetime;
	int32	nrelease;	// -1 for acquire
	SemaWaiter*	prev;
	SemaWaiter*	next;
};

typedef struct SemaRoot SemaRoot;
struct SemaRoot
{
	Lock;
	SemaWaiter*	head;
	SemaWaiter*	tail;
	// Number of waiters. Read w/o the lock.
	uint32 volatile	nwait;
};

// Prime to not correlate with any user patterns.
#define SEMTABLESZ 251

struct semtable
{
	SemaRoot;
	uint8 pad[CacheLineSize-sizeof(SemaRoot)];
};
static struct semtable semtable[SEMTABLESZ];

static SemaRoot*
semroot(uint32 volatile *addr)
{
	return &semtable[((uintptr)addr >> 3) % SEMTABLESZ];
}

static void
semqueue(SemaRoot *root, uint32 volatile *addr, SemaWaiter *s)
{
	s->g = runtime_g();
	s->addr = addr;
	s->next = nil;
	s->prev = root->tail;
	if(root->tail)
		root->tail->next = s;
	else
		root->head = s;
	root->tail = s;
}

static void
semdequeue(SemaRoot *root, SemaWaiter *s)
{
	if(s->next)
		s->next->prev = s->prev;
	else
		root->tail = s->prev;
	if(s->prev)
		s->prev->next = s->next;
	else
		root->head = s->next;
	s->prev = nil;
	s->next = nil;
}

static int32
cansemacquire(uint32 volatile *addr)
{
	uint32 v;

	while((v = runtime_atomicload(addr)) > 0)
		if(runtime_cas(addr, v, v-1))
			return 1;
	return 0;
}

static void
readyWithTime(SudoG* s, int traceskip __attribute__ ((unused)))
{
	if (s->releasetime != 0) {
		s->releasetime = runtime_cputicks();
	}
	runtime_ready(s->g);
}

void
runtime_semacquire(uint32 volatile *addr, bool profile)
{
	SemaWaiter s;	// Needs to be allocated on stack, otherwise garbage collector could deallocate it
	SemaRoot *root;
	int64 t0;

	// Easy case.
	if(cansemacquire(addr))
		return;

	// Harder case:
	//	increment waiter count
	//	try cansemacquire one more time, return if succeeded
	//	enqueue itself as a waiter
	//	sleep
	//	(waiter descriptor is dequeued by signaler)
	root = semroot(addr);
	t0 = 0;
	s.releasetime = 0;
	if(profile && runtime_blockprofilerate > 0) {
		t0 = runtime_cputicks();
		s.releasetime = -1;
	}
	for(;;) {
		runtime_lock(root);
		// Add ourselves to nwait to disable the "easy case" in semrelease.
		runtime_xadd(&root->nwait, 1);
		// Check cansemacquire to avoid a missed wakeup.
		if(cansemacquire(addr)) {
			runtime_xadd(&root->nwait, -1);
			runtime_unlock(root);
			return;
		}
		// Any semrelease after the cansemacquire knows we're waiting
		// (we set nwait above), so go to sleep.
		semqueue(root, addr, &s);
		runtime_parkunlock(root, "semacquire");
		if(cansemacquire(addr)) {
			if(t0)
				runtime_blockevent(s.releasetime - t0, 3);
			return;
		}
	}
}
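// As an illustration of the intended use, here is a sketch of the
// classic mutex from the Mullender/Cox paper referenced above, built on
// the acquire/release pair: the semaphore carries only the sleep/wakeup
// pairing, while the mutex state itself lives in a separate counter.
// This sketch is not compiled into the runtime; ToyMutex, toy_lock and
// toy_unlock are hypothetical names, and only runtime_xadd,
// runtime_semacquire and runtime_semrelease refer to functions that
// actually exist here.
//
//	typedef struct ToyMutex ToyMutex;
//	struct ToyMutex
//	{
//		uint32 volatile	key;	// 0 = unlocked; >0 counts lockers
//		uint32 volatile	sema;	// wakeups for contended lockers
//	};
//
//	static void
//	toy_lock(ToyMutex *m)
//	{
//		// The first incrementer owns the lock; everyone else sleeps.
//		if(runtime_xadd(&m->key, 1) == 1)
//			return;
//		runtime_semacquire(&m->sema, false);
//	}
//
//	static void
//	toy_unlock(ToyMutex *m)
//	{
//		// If the count drops to zero, there were no waiters.
//		if(runtime_xadd(&m->key, -1) == 0)
//			return;
//		// Otherwise hand the lock to one sleeper. The wakeup is
//		// never lost even if it races ahead of the sleep.
//		runtime_semrelease(&m->sema);
//	}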
void
runtime_semrelease(uint32 volatile *addr)
{
	SemaWaiter *s;
	SemaRoot *root;

	root = semroot(addr);
	runtime_xadd(addr, 1);

	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup
	// (see loop in semacquire).
	if(runtime_atomicload(&root->nwait) == 0)
		return;

	// Harder case: search for a waiter and wake it.
	runtime_lock(root);
	if(runtime_atomicload(&root->nwait) == 0) {
		// The count is already consumed by another goroutine,
		// so no need to wake up another goroutine.
		runtime_unlock(root);
		return;
	}
	for(s = root->head; s; s = s->next) {
		if(s->addr == addr) {
			runtime_xadd(&root->nwait, -1);
			semdequeue(root, s);
			break;
		}
	}
	runtime_unlock(root);
	if(s) {
		if(s->releasetime)
			s->releasetime = runtime_cputicks();
		runtime_ready(s->g);
	}
}

// TODO(dvyukov): move to netpoll.goc once it's used by all OSes.
void net_runtime_Semacquire(uint32 *addr)
  __asm__ (GOSYM_PREFIX "net.runtime_Semacquire");

void net_runtime_Semacquire(uint32 *addr)
{
	runtime_semacquire(addr, true);
}

void net_runtime_Semrelease(uint32 *addr)
  __asm__ (GOSYM_PREFIX "net.runtime_Semrelease");

void net_runtime_Semrelease(uint32 *addr)
{
	runtime_semrelease(addr);
}

func runtime_Semacquire(addr *uint32) {
	runtime_semacquire(addr, true);
}

func runtime_Semrelease(addr *uint32) {
	runtime_semrelease(addr);
}

typedef struct SyncSema SyncSema;
struct SyncSema
{
	Lock;
	SemaWaiter*	head;
	SemaWaiter*	tail;
};

func runtime_Syncsemcheck(size uintptr) {
	if(size != sizeof(SyncSema)) {
		runtime_printf("bad SyncSema size: sync:%D runtime:%D\n", (int64)size, (int64)sizeof(SyncSema));
		runtime_throw("bad SyncSema size");
	}
}

// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s.
func runtime_Syncsemacquire(s *SyncSema) {
	SemaWaiter w, *wake;
	int64 t0;

	w.g = runtime_g();
	w.nrelease = -1;
	w.next = nil;
	w.releasetime = 0;
	t0 = 0;
	if(runtime_blockprofilerate > 0) {
		t0 = runtime_cputicks();
		w.releasetime = -1;
	}

	runtime_lock(s);
	if(s->head && s->head->nrelease > 0) {
		// Have a pending release; consume it.
		wake = nil;
		s->head->nrelease--;
		if(s->head->nrelease == 0) {
			wake = s->head;
			s->head = wake->next;
			if(s->head == nil)
				s->tail = nil;
		}
		runtime_unlock(s);
		if(wake)
			runtime_ready(wake->g);
	} else {
		// Enqueue itself.
		if(s->tail == nil)
			s->head = &w;
		else
			s->tail->next = &w;
		s->tail = &w;
		runtime_parkunlock(s, "semacquire");
		if(t0)
			runtime_blockevent(w.releasetime - t0, 2);
	}
}

// Syncsemrelease waits for n pairing Syncsemacquire calls on the same semaphore s.
func runtime_Syncsemrelease(s *SyncSema, n uint32) {
	SemaWaiter w, *wake;

	w.g = runtime_g();
	w.nrelease = (int32)n;
	w.next = nil;
	w.releasetime = 0;

	runtime_lock(s);
	while(w.nrelease > 0 && s->head && s->head->nrelease < 0) {
		// Have a pending acquire; satisfy it.
		wake = s->head;
		s->head = wake->next;
		if(s->head == nil)
			s->tail = nil;
		if(wake->releasetime)
			wake->releasetime = runtime_cputicks();
		runtime_ready(wake->g);
		w.nrelease--;
	}
	if(w.nrelease > 0) {
		// Enqueue itself.
		if(s->tail == nil)
			s->head = &w;
		else
			s->tail->next = &w;
		s->tail = &w;
		runtime_parkunlock(s, "semarelease");
	} else
		runtime_unlock(s);
}
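// A sketch of the pairing semantics above, not compiled into the
// runtime: every Syncsemacquire blocks until it is matched by one unit
// of a Syncsemrelease, and a Syncsemrelease blocks until all n of its
// units have been handed to acquirers. The variable name is
// hypothetical; only the two runtime_Syncsem* entry points are real.
//
//	static SyncSema handoff;	// a zeroed SyncSema is ready to use
//
//	// Two goroutines each park until paired:
//	runtime_Syncsemacquire(&handoff);
//
//	// A third goroutine pairs with both, and only then returns:
//	runtime_Syncsemrelease(&handoff, 2);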
// notifyList is a ticket-based notification list used to implement sync.Cond.
//
// It must be kept in sync with the sync package.
typedef struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	uint32 wait;

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	uint32 notify;

	// List of parked waiters.
	Lock lock;
	SudoG* head;
	SudoG* tail;
} notifyList;

// less checks if a < b, treating a and b as running counts that may wrap
// around the 32-bit range, whose "unwrapped" difference is always less
// than 2^31.
static bool less(uint32 a, uint32 b) {
	return (int32)(a-b) < 0;
}

// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func runtime_notifyListAdd(l *notifyList) (r uint32) {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	r = runtime_xadd(&l->wait, 1) - 1;
}

// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func runtime_notifyListWait(l *notifyList, t uint32) {
	SudoG s;
	int64 t0;

	runtime_lock(&l->lock);

	// Return right away if this ticket has already been notified.
	if (less(t, l->notify)) {
		runtime_unlock(&l->lock);
		return;
	}

	// Enqueue itself.
	runtime_memclr(&s, sizeof(s));
	s.g = runtime_g();
	s.ticket = t;
	s.releasetime = 0;
	t0 = 0;
	if (runtime_blockprofilerate > 0) {
		t0 = runtime_cputicks();
		s.releasetime = -1;
	}
	if (l->tail == nil) {
		l->head = &s;
	} else {
		l->tail->link = &s;
	}
	l->tail = &s;
	runtime_parkunlock(&l->lock, "semacquire");
	if (t0 != 0) {
		runtime_blockevent(s.releasetime-t0, 2);
	}
}

// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func runtime_notifyListNotifyAll(l *notifyList) {
	SudoG *s;

	// Fast path: if there have been no new waiters since the last
	// notification, we don't need to acquire the lock.
	if (runtime_atomicload(&l->wait) == runtime_atomicload(&l->notify)) {
		return;
	}

	// Pull the list out into a local variable; waiters will be readied
	// outside the lock.
	runtime_lock(&l->lock);
	s = l->head;
	l->head = nil;
	l->tail = nil;

	// Update the next ticket to be notified. We can set it to the current
	// value of wait because any previous waiters are already in the list
	// or will notice that they have already been notified when trying to
	// add themselves to the list.
	runtime_atomicstore(&l->notify, runtime_atomicload(&l->wait));
	runtime_unlock(&l->lock);

	// Go through the local list and ready all waiters.
	while (s != nil) {
		SudoG* next = s->link;
		s->link = nil;
		readyWithTime(s, 4);
		s = next;
	}
}
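// The ticket protocol is easiest to see from the caller's side. A
// sketch of what sync.Cond.Wait and sync.Cond.Signal do with these
// entry points (schematic, not compiled into the runtime; "user lock"
// stands for the caller's own mutex, and notifyListNotifyOne is
// defined just below):
//
//	// Wait side:
//	t = runtime_notifyListAdd(l);	// take a ticket *before* unlocking
//	// ... release the user lock ...
//	runtime_notifyListWait(l, t);	// parks only if ticket t is still unnotified
//	// ... reacquire the user lock ...
//
//	// Signal side:
//	runtime_notifyListNotifyOne(l);	// advances notify past exactly one ticket
//
// Because the ticket is taken before the user lock is released, a
// signal that races between Add and Wait simply advances notify past t,
// and Wait returns immediately instead of sleeping forever.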
// notifyListNotifyOne notifies one entry in the list.
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func runtime_notifyListNotifyOne(l *notifyList) {
	uint32 t;
	SudoG *p;
	SudoG *s;

	// Fast path: if there have been no new waiters since the last
	// notification, we don't need to acquire the lock at all.
	if (runtime_atomicload(&l->wait) == runtime_atomicload(&l->notify)) {
		return;
	}

	runtime_lock(&l->lock);

	// Re-check under the lock if we need to do anything.
	t = l->notify;
	if (t == runtime_atomicload(&l->wait)) {
		runtime_unlock(&l->lock);
		return;
	}

	// Update the next notify ticket number, and try to find the G that
	// needs to be notified. If it hasn't made it to the list yet, we won't
	// find it, but it won't park itself once it sees the new notify number.
	runtime_atomicstore(&l->notify, t+1);
	for (p = nil, s = l->head; s != nil; p = s, s = s->link) {
		if (s->ticket == t) {
			SudoG *n = s->link;
			if (p != nil) {
				p->link = n;
			} else {
				l->head = n;
			}
			if (n == nil) {
				l->tail = p;
			}
			runtime_unlock(&l->lock);
			s->link = nil;
			readyWithTime(s, 4);
			return;
		}
	}
	runtime_unlock(&l->lock);
}

//go:linkname notifyListCheck sync.runtime_notifyListCheck
func runtime_notifyListCheck(sz uintptr) {
	if (sz != sizeof(notifyList)) {
		runtime_printf("runtime: bad notifyList size\n");
		runtime_throw("bad notifyList size");
	}
}
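// A worked example of the wraparound arithmetic that less() relies on,
// as a sketch (not compiled into the runtime). The subtraction is done
// in uint32 and the result reinterpreted as int32, so the ordering of
// tickets survives a wrap of the counters:
//
//	less(2, 3)           // 2-3 = 0xFFFFFFFF; (int32)-1 < 0  -> true
//	less(0xFFFFFFFF, 1)  // a issued just before the counters wrapped,
//	                     // b just after: a-b = 0xFFFFFFFE;
//	                     // (int32)-2 < 0 -> true, so a still sorts first
//	less(1, 0xFFFFFFFF)  // 1-0xFFFFFFFF = 2; (int32)2 >= 0 -> false
//
// A plain a < b would get the wrapped cases backwards; the int32 trick
// stays correct as long as the unwrapped distance is below 2^31, which
// the notifyList comment above guarantees.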