aboutsummaryrefslogtreecommitdiff
path: root/libgo/go/netchan/export.go
blob: e91e777e306ee1ac58435ce3d8f6e73940fad5b9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

/*
	The netchan package implements type-safe networked channels:
	it allows the two ends of a channel to appear on different
	computers connected by a network.  It does this by transporting
	data sent to a channel on one machine so it can be recovered
	by a receive of a channel of the same type on the other.

	An exporter publishes a set of channels by name.  An importer
	connects to the exporting machine and imports the channels
	by name. After importing the channels, the two machines can
	use the channels in the usual way.

	Networked channels are not synchronized; they always behave
	as if they are buffered channels of at least one element.
*/
package netchan

// BUG: can't use range clause to receive when using ImportNValues to limit the count.

import (
	"log"
	"io"
	"net"
	"os"
	"reflect"
	"strconv"
	"sync"
)

// Export

// expLog is a logging convenience function.  The first argument must be a string.
func expLog(args ...interface{}) {
	args[0] = "netchan export: " + args[0].(string)
	log.Print(args...)
}

// An Exporter allows a set of channels to be published on a single
// network port.  A single machine may have multiple Exporters
// but they must use different ports.
type Exporter struct {
	*clientSet
}

type expClient struct {
	*encDec
	exp     *Exporter
	chans   map[int]*netChan // channels in use by client
	mu      sync.Mutex       // protects remaining fields
	errored bool             // client has been sent an error
	seqNum  int64            // sequences messages sent to client; has value of highest sent
	ackNum  int64            // highest sequence number acknowledged
	seqLock sync.Mutex       // guarantees messages are in sequence, only locked under mu
}

func newClient(exp *Exporter, conn io.ReadWriter) *expClient {
	client := new(expClient)
	client.exp = exp
	client.encDec = newEncDec(conn)
	client.seqNum = 0
	client.ackNum = 0
	client.chans = make(map[int]*netChan)
	return client
}

func (client *expClient) sendError(hdr *header, err string) {
	error := &error{err}
	expLog("sending error to client:", error.Error)
	client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
	client.mu.Lock()
	client.errored = true
	client.mu.Unlock()
}

func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan {
	exp := client.exp
	exp.mu.Lock()
	ech, ok := exp.names[name]
	exp.mu.Unlock()
	if !ok {
		client.sendError(hdr, "no such channel: "+name)
		return nil
	}
	if ech.dir != dir {
		client.sendError(hdr, "wrong direction for channel: "+name)
		return nil
	}
	nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count)
	client.chans[hdr.Id] = nch
	return nch
}

func (client *expClient) getChan(hdr *header, dir Dir) *netChan {
	nch := client.chans[hdr.Id]
	if nch == nil {
		return nil
	}
	if nch.dir != dir {
		client.sendError(hdr, "wrong direction for channel: "+nch.name)
	}
	return nch
}

// The function run manages sends and receives for a single client.  For each
// (client Recv) request, this will launch a serveRecv goroutine to deliver
// the data for that channel, while (client Send) requests are handled as
// data arrives from the client.
func (client *expClient) run() {
	hdr := new(header)
	hdrValue := reflect.NewValue(hdr)
	req := new(request)
	reqValue := reflect.NewValue(req)
	error := new(error)
	for {
		*hdr = header{}
		if err := client.decode(hdrValue); err != nil {
			if err != os.EOF {
				expLog("error decoding client header:", err)
			}
			break
		}
		switch hdr.PayloadType {
		case payRequest:
			*req = request{}
			if err := client.decode(reqValue); err != nil {
				expLog("error decoding client request:", err)
				break
			}
			if req.Size < 1 {
				panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values")
			}
			switch req.Dir {
			case Recv:
				// look up channel before calling serveRecv to
				// avoid a lock around client.chans.
				if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil {
					go client.serveRecv(nch, *hdr, req.Count)
				}
			case Send:
				client.newChan(hdr, Recv, req.Name, req.Size, req.Count)
				// The actual sends will have payload type payData.
				// TODO: manage the count?
			default:
				error.Error = "request: can't handle channel direction"
				expLog(error.Error, req.Dir)
				client.encode(hdr, payError, error)
			}
		case payData:
			client.serveSend(*hdr)
		case payClosed:
			client.serveClosed(*hdr)
		case payAck:
			client.mu.Lock()
			if client.ackNum != hdr.SeqNum-1 {
				// Since the sequence number is incremented and the message is sent
				// in a single instance of locking client.mu, the messages are guaranteed
				// to be sent in order.  Therefore receipt of acknowledgement N means
				// all messages <=N have been seen by the recipient.  We check anyway.
				expLog("sequence out of order:", client.ackNum, hdr.SeqNum)
			}
			if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count. 
				client.ackNum = hdr.SeqNum
			}
			client.mu.Unlock()
		case payAckSend:
			if nch := client.getChan(hdr, Send); nch != nil {
				nch.acked()
			}
		default:
			log.Fatal("netchan export: unknown payload type", hdr.PayloadType)
		}
	}
	client.exp.delClient(client)
}

// Send all the data on a single channel to a client asking for a Recv.
// The header is passed by value to avoid issues of overwriting.
func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) {
	for {
		val, ok := nch.recv()
		if !ok {
			if err := client.encode(&hdr, payClosed, nil); err != nil {
				expLog("error encoding server closed message:", err)
			}
			break
		}
		// We hold the lock during transmission to guarantee messages are
		// sent in sequence number order.  Also, we increment first so the
		// value of client.SeqNum is the value of the highest used sequence
		// number, not one beyond.
		client.mu.Lock()
		client.seqNum++
		hdr.SeqNum = client.seqNum
		client.seqLock.Lock() // guarantee ordering of messages
		client.mu.Unlock()
		err := client.encode(&hdr, payData, val.Interface())
		client.seqLock.Unlock()
		if err != nil {
			expLog("error encoding client response:", err)
			client.sendError(&hdr, err.String())
			break
		}
		// Negative count means run forever.
		if count >= 0 {
			if count--; count <= 0 {
				break
			}
		}
	}
}

// Receive and deliver locally one item from a client asking for a Send
// The header is passed by value to avoid issues of overwriting.
func (client *expClient) serveSend(hdr header) {
	nch := client.getChan(&hdr, Recv)
	if nch == nil {
		return
	}
	// Create a new value for each received item.
	val := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem())
	if err := client.decode(val); err != nil {
		expLog("value decode:", err, "; type ", nch.ch.Type())
		return
	}
	nch.send(val)
}

// Report that client has closed the channel that is sending to us.
// The header is passed by value to avoid issues of overwriting.
func (client *expClient) serveClosed(hdr header) {
	nch := client.getChan(&hdr, Recv)
	if nch == nil {
		return
	}
	nch.close()
}

func (client *expClient) unackedCount() int64 {
	client.mu.Lock()
	n := client.seqNum - client.ackNum
	client.mu.Unlock()
	return n
}

func (client *expClient) seq() int64 {
	client.mu.Lock()
	n := client.seqNum
	client.mu.Unlock()
	return n
}

func (client *expClient) ack() int64 {
	client.mu.Lock()
	n := client.seqNum
	client.mu.Unlock()
	return n
}

// Serve waits for incoming connections on the listener
// and serves the Exporter's channels on each.
// It blocks until the listener is closed.
func (exp *Exporter) Serve(listener net.Listener) {
	for {
		conn, err := listener.Accept()
		if err != nil {
			expLog("listen:", err)
			break
		}
		go exp.ServeConn(conn)
	}
}

// ServeConn exports the Exporter's channels on conn.
// It blocks until the connection is terminated.
func (exp *Exporter) ServeConn(conn io.ReadWriter) {
	exp.addClient(conn).run()
}

// NewExporter creates a new Exporter that exports a set of channels.
func NewExporter() *Exporter {
	e := &Exporter{
		clientSet: &clientSet{
			names:   make(map[string]*chanDir),
			clients: make(map[unackedCounter]bool),
		},
	}
	return e
}

// ListenAndServe exports the exporter's channels through the
// given network and local address defined as in net.Listen.
func (exp *Exporter) ListenAndServe(network, localaddr string) os.Error {
	listener, err := net.Listen(network, localaddr)
	if err != nil {
		return err
	}
	go exp.Serve(listener)
	return nil
}

// addClient creates a new expClient and records its existence
func (exp *Exporter) addClient(conn io.ReadWriter) *expClient {
	client := newClient(exp, conn)
	exp.mu.Lock()
	exp.clients[client] = true
	exp.mu.Unlock()
	return client
}

// delClient forgets the client existed
func (exp *Exporter) delClient(client *expClient) {
	exp.mu.Lock()
	exp.clients[client] = false, false
	exp.mu.Unlock()
}

// Drain waits until all messages sent from this exporter/importer, including
// those not yet sent to any client and possibly including those sent while
// Drain was executing, have been received by the importer.  In short, it
// waits until all the exporter's messages have been received by a client.
// If the timeout (measured in nanoseconds) is positive and Drain takes
// longer than that to complete, an error is returned.
func (exp *Exporter) Drain(timeout int64) os.Error {
	// This wrapper function is here so the method's comment will appear in godoc.
	return exp.clientSet.drain(timeout)
}

// Sync waits until all clients of the exporter have received the messages
// that were sent at the time Sync was invoked.  Unlike Drain, it does not
// wait for messages sent while it is running or messages that have not been
// dispatched to any client.  If the timeout (measured in nanoseconds) is
// positive and Sync takes longer than that to complete, an error is
// returned.
func (exp *Exporter) Sync(timeout int64) os.Error {
	// This wrapper function is here so the method's comment will appear in godoc.
	return exp.clientSet.sync(timeout)
}

func checkChan(chT interface{}, dir Dir) (*reflect.ChanValue, os.Error) {
	chanType, ok := reflect.Typeof(chT).(*reflect.ChanType)
	if !ok {
		return nil, os.ErrorString("not a channel")
	}
	if dir != Send && dir != Recv {
		return nil, os.ErrorString("unknown channel direction")
	}
	switch chanType.Dir() {
	case reflect.BothDir:
	case reflect.SendDir:
		if dir != Recv {
			return nil, os.ErrorString("to import/export with Send, must provide <-chan")
		}
	case reflect.RecvDir:
		if dir != Send {
			return nil, os.ErrorString("to import/export with Recv, must provide chan<-")
		}
	}
	return reflect.NewValue(chT).(*reflect.ChanValue), nil
}

// Export exports a channel of a given type and specified direction.  The
// channel to be exported is provided in the call and may be of arbitrary
// channel type.
// Despite the literal signature, the effective signature is
//	Export(name string, chT chan T, dir Dir)
func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error {
	ch, err := checkChan(chT, dir)
	if err != nil {
		return err
	}
	exp.mu.Lock()
	defer exp.mu.Unlock()
	_, present := exp.names[name]
	if present {
		return os.ErrorString("channel name already being exported:" + name)
	}
	exp.names[name] = &chanDir{ch, dir}
	return nil
}

// Hangup disassociates the named channel from the Exporter and closes
// the channel.  Messages in flight for the channel may be dropped.
func (exp *Exporter) Hangup(name string) os.Error {
	exp.mu.Lock()
	chDir, ok := exp.names[name]
	if ok {
		exp.names[name] = nil, false
	}
	// TODO drop all instances of channel from client sets
	exp.mu.Unlock()
	if !ok {
		return os.ErrorString("netchan export: hangup: no such channel: " + name)
	}
	chDir.ch.Close()
	return nil
}