Links

Links are built on top of Julia's Channels and serve as communication primitives for Julia Tasks. A Link consists of a Channel and a Buffer whose mode is Cyclic (see Buffer Modes for information on buffer modes). Every item sent through a Link is passed through its channel and also written to its buffer, so all the data flowing through a Link is recorded.
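The idea of pairing a channel with a cyclic record can be sketched with Base alone. This is only an illustration, not Jusdl's implementation: the type RecordedChannel, the function send!, and the choice to record a value before handing it to the channel are all assumptions made for the example.

```julia
# Hypothetical stand-in for a Link: an unbuffered Channel plus a fixed-length
# cyclic record of everything sent through it. Not Jusdl's implementation.
mutable struct RecordedChannel{T}
    channel::Channel{T}
    record::Vector{T}
    index::Int          # position of the next write into `record`
end

RecordedChannel(::Type{T}, ln::Int=64) where {T} =
    RecordedChannel{T}(Channel{T}(0), zeros(T, ln), 1)

# Record the value first (an assumption), then hand it to the channel.
# put! on an unbuffered channel blocks until another task takes the value.
function send!(rc::RecordedChannel, val)
    rc.record[rc.index] = val
    rc.index = mod1(rc.index + 1, length(rc.record))
    put!(rc.channel, val)
end

rc = RecordedChannel(Float64, 3)
taker = @async [take!(rc.channel) for _ in 1:4]   # consumer on the other end
foreach(v -> send!(rc, v), 1.0:4.0)
received = fetch(taker)
```

Here received holds all four values in order, while rc.record keeps only the last three writes because the fourth write wraps around to the first slot.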

The construction of a Link is very simple: just specify its buffer length and element type.

Jusdl.Connections.Link - Type
Link(dtype::Type{T}, ln::Int=64) where {T}

Constructs a Link with element type T and buffer length ln. The buffer element type is T and mode is Cyclic.

Link(ln::Int=64)

Constructs a Link with element type Float64 and buffer length ln. The buffer element type is Float64 and mode is Cyclic.

Example

julia> l = Link(Int, 5)
Link(state:open, eltype:Int64, hasmaster:false, numslaves:0, isreadable:false, iswritable:false)

julia> l = Link(Bool)
Link(state:open, eltype:Bool, hasmaster:false, numslaves:0, isreadable:false, iswritable:false)

Links can be connected to each other so that data can flow from one link to another. If data flows from link l1 to link l2, then l1 is said to drive l2; l1 is called the master and l2 the slave. A Link can have more than one slave but only one master. When a Link is initialized, it has neither a master nor slaves.

Jusdl.Connections.connect - Function
connect(master::Link, slave::Link)

Connects master to slave. When connected, any element that is put into master is also put into slave.

connect(master::AbstractVector{<:Link}, slave::AbstractVector{<:Link})

Connects each link in master to each link in slave one by one.

Example

julia> l1, l2 = Link(), Link();

julia> connect(l1, l2)

julia> l2.master[] == l1
true

Similarly, Links can be disconnected with disconnect.

Warning

Note that the order of arguments matters when links are connected. connect(l1, l2) connects l1 and l2 such that l1 drives l2, i.e., data flows from l1 to l2. In other words, l1 is the master link and l2 is the slave link. However, the order of arguments does not matter when links are disconnected. disconnect(l1, l2) does the same thing as disconnect(l2, l1), i.e., it just breaks the connection between l1 and l2.
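The asymmetry between connecting and disconnecting can be illustrated with a small bookkeeping sketch. The Node type and the functions link! and unlink! below are hypothetical names invented for this example; they only mimic the "one master, many slaves" rule and do not move any data.

```julia
# Hypothetical node type used only to illustrate the bookkeeping;
# Jusdl's Link stores more than this.
mutable struct Node
    name::String
    master::Union{Node, Nothing}
    slaves::Vector{Node}
end
Node(name) = Node(name, nothing, Node[])

# Directed: `master` drives `slave`. A node may have only one master.
function link!(master::Node, slave::Node)
    slave.master === nothing || error("$(slave.name) already has a master")
    slave.master = master
    push!(master.slaves, slave)
    return nothing
end

# Undirected: work out which of the two is the master, then break the edge.
function unlink!(a::Node, b::Node)
    master, slave = b.master === a ? (a, b) : (b, a)
    slave.master === master || error("nodes are not connected")
    slave.master = nothing
    filter!(s -> s !== slave, master.slaves)
    return nothing
end

n1, n2, n3 = Node("n1"), Node("n2"), Node("n3")
link!(n1, n2); link!(n1, n3)   # one master, two slaves
unlink!(n2, n1)                # argument order does not matter here
```

After these calls n2 has no master again, while n3 is still driven by n1.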

Data can be read from and written to a Link only if an active task is bound to it. A Link can be thought of as a pipe. To write data into one end of a Link, a task that reads the written data from the other end must be bound to the Link. Similarly, to read data from one end of a Link, a task that writes data into the other end must be bound to the Link. Reading from and writing to a Link are carried out with the take! and put! functions. For more clarity, let us see some examples.

Let us first construct a Link:


julia> l = Link(5)
Link(state:open, eltype:Float64, hasmaster:false, numslaves:0, isreadable:false, iswritable:false)

l is a Link with a buffer length of 5 and an element type of Float64. Note that l is open, but it is not yet ready for reading or writing data. To write data, we must bind a task that reads the written data.

julia> function reader(link::Link)  # Define job.
           while true
               val = take!(link)
               val === NaN && break  # Poison-pill the tasks to terminate safely.
           end
       end
reader (generic function with 1 method)

julia> t = @async reader(l)
Task (runnable) @0x00007f70295ba710

The reader is defined so that data written to one end of l is read until the value NaN arrives. Now we have a runnable task t, which means l is ready for data writing.

julia> put!(l, 1.)
1.0

julia> put!(l, 2.)
2.0

To terminate the task, we must write NaN to l.

julia> put!(l, NaN)  # Terminate the task
NaN

julia> t   # Show that the `t` is terminated.
Task (done) @0x00007f70295ba710

Note that the data that has flowed through l is written to its buffer.

julia> l.buffer.data
5-element Array{Float64,1}:
   1.0
   2.0
 NaN
   0.0
   0.0

As long as the task bound to l is runnable, data can be written to l. That is, the amount of data that can be written to l is not limited by its buffer length. Beware, however, that the buffer of a Link is Cyclic: when the buffer is full, its oldest data is overwritten.

julia> l = Link(5)
Link(state:open, eltype:Float64, hasmaster:false, numslaves:0, isreadable:false, iswritable:false)

julia> t = @async reader(l)
Task (runnable) @0x00007f70295c7d00

julia> for item in 1. : 10.
           put!(l, item)
           @show l.buffer.data
       end
l.buffer.data = [1.0, 0.0, 0.0, 0.0, 0.0]
l.buffer.data = [1.0, 2.0, 0.0, 0.0, 0.0]
l.buffer.data = [1.0, 2.0, 3.0, 0.0, 0.0]
l.buffer.data = [1.0, 2.0, 3.0, 4.0, 0.0]
l.buffer.data = [1.0, 2.0, 3.0, 4.0, 5.0]
l.buffer.data = [6.0, 2.0, 3.0, 4.0, 5.0]
l.buffer.data = [6.0, 7.0, 3.0, 4.0, 5.0]
l.buffer.data = [6.0, 7.0, 8.0, 4.0, 5.0]
l.buffer.data = [6.0, 7.0, 8.0, 9.0, 5.0]
l.buffer.data = [6.0, 7.0, 8.0, 9.0, 10.0]
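The wrap-around seen in the output above follows from simple index arithmetic. The sketch below reproduces it with a plain vector; slot and fill_cyclic are hypothetical helpers written for this illustration and are not part of Jusdl.

```julia
# Slot that the n-th written item (1-based) lands in, for a buffer of length ln.
slot(n, ln) = mod1(n, ln)

# Write `items` one by one into a zero-initialized buffer of length ln,
# wrapping around to the first slot once the end is reached.
function fill_cyclic(items, ln)
    data = zeros(Float64, ln)
    for (n, item) in enumerate(items)
        data[slot(n, ln)] = item
    end
    return data
end

fill_cyclic(1.0:7.0, 5)   # same state as after the seventh put! above
```

With a buffer of length 5, the sixth item lands in slot 1 and the seventh in slot 2, which gives [6.0, 7.0, 3.0, 4.0, 5.0].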

Reading data from l is very similar. Again, a runnable task is bound to l:


julia> l = Link(5)
Link(state:open, eltype:Float64, hasmaster:false, numslaves:0, isreadable:false, iswritable:false)

julia> function writer(link::Link, vals)
           for val in vals
               put!(link, val)
           end
       end
writer (generic function with 1 method)

julia> t = @async writer(l, 1.:5.)
Task (runnable) @0x00007f70295e1120

julia> bind(l, t)
Channel{Float64}(sz_max:0,sz_curr:1)

julia> take!(l)
1.0

julia> take!(l)
2.0

Data can be read from l as long as t is active. To read all the remaining data at once, collect can be used.

julia> t
Task (runnable) @0x00007f70295e1120

julia> collect(l)
3-element Array{Float64,1}:
 3.0
 4.0
 5.0

julia> t  # Show that `t` is terminated.
Task (done) @0x00007f70295e1120
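Since Links are built on Channels, the same read pattern can be tried with a plain Base Channel. The following is a minimal sketch under that assumption; it uses only Base functions (Channel, bind, take!, collect) and does not record anything in a buffer as a Link would.

```julia
ch = Channel{Float64}(0)               # unbuffered, like a pipe

writer_task = @async for v in 1.0:5.0  # writes five values, then finishes
    put!(ch, v)
end
bind(ch, writer_task)                  # close `ch` when the writer is done

first_two = [take!(ch), take!(ch)]     # read two values one at a time
rest = collect(ch)                     # drain until the channel closes
```

Because the channel is bound to the writer task, collect returns as soon as the writer finishes and the channel is closed.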

Full API

Base.put! - Method
put!(link::Link, val)

Puts val to link. val is handed over to the channel of link and is also written into the buffer of link.

Warning

link must be writable to put val. That is, a runnable task that takes items from the link must be bound to link.

Example

julia> l = Link();

julia> t  = @async while true 
       item = take!(l)
       item === NaN && break 
       println("Took " * string(item))
       end;

julia> bind(l, t);

julia> put!(l, 1.)
Took 1.0
1.0

julia> put!(l, 2.)
Took 2.0
2.0

julia> put!(l, NaN)
NaN

Base.take! - Method
take!(link::Link)

Takes an element from link.

Warning

link must be readable to take a value. That is, a runnable task that puts items into the link must be bound to link.

Example

julia> l = Link(5);

julia> t = @async for item in 1. : 5.
       put!(l, item)
       end;

julia> bind(l, t);

julia> take!(l)
1.0

julia> take!(l)
2.0

Base.isopen - Method
isopen(link::Link)

Returns true if link is open. A link is open if its channel is open.

Base.isreadable - Method
isreadable(link::Link)

Returns true if link is readable. When link is readable, data can be read from link with the take! function.

Base.iswritable - Method
iswritable(link::Link)

Returns true if link is writable. When link is writable, data can be written into link with the put! function.

Jusdl.Connections.findflow - Method
findflow(link1::Link, link2::Link)

Returns a tuple of (masterlink, slavelink) where masterlink is the link that drives the other and slavelink is the link that is driven by the other.

Example

julia> ls = [Link() for i = 1 : 2];

julia> connect(ls[1], ls[2])

julia> findflow(ls[2], ls[1]) .== (ls[1], ls[2])
(true, true)

Jusdl.Connections.insert - Method
insert(master::Link, slave::Link, new::Link)

Inserts the new link between the master link and slave link. The master is connected to new, and new is connected to slave.

Example

julia> ls = [Link() for i = 1 : 3];  

julia> connect(ls[1], ls[2]) 

julia> insert(ls[1], ls[2], ls[3])

julia> isconnected(ls[1], ls[2])
false

julia> isconnected(ls[1], ls[3]) && isconnected(ls[3], ls[2])
true
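The rewiring that insert performs can be pictured with a plain dictionary that maps each node to the node driving it. This is only a sketch of the bookkeeping; insert_between!, drives, and the Dict representation are assumptions made for illustration, not how Jusdl stores connections.

```julia
# master_of[x] == y means "y drives x"; a hypothetical stand-in for Link fields.
function insert_between!(master_of::Dict{Symbol,Symbol}, master, slave, new)
    get(master_of, slave, nothing) == master || error("$master does not drive $slave")
    master_of[new] = master    # master now drives new
    master_of[slave] = new     # new now drives slave
    return master_of
end

# True if `a` directly drives `b`.
drives(master_of, a, b) = get(master_of, b, nothing) == a

flow = Dict(:l2 => :l1)                 # l1 drives l2
insert_between!(flow, :l1, :l2, :l3)    # l1 -> l3 -> l2
```

After the call, :l1 no longer drives :l2 directly; the path goes through :l3 instead.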

Jusdl.Connections.release - Method
release(link::Link)

Releases all the slave links of link. That is, all the slave links of link are disconnected.

Example

julia> ls = [Link() for i = 1 : 5];

julia> foreach(l -> connect(ls[1], l), ls[2:5])

julia> map(l -> isconnected(ls[1], l), ls[2:5])
4-element Array{Bool,1}:
 1
 1
 1
 1

julia> release(ls[1])  # Release all the slaves.

julia> map(l -> isconnected(ls[1], l), ls[2:5])
4-element Array{Bool,1}:
 0
 0
 0
 0

Base.bind - Method
bind(link::Link, task::Task)

Binds task to link. When task is done, link is closed.

Base.collect - Method
collect(link::Link)

Collects all the available data on the link.

Warning

To collect all available data from link, a task must be bound to it.

Example

julia> l = Link();  # Construct a link.

julia> t = @async for item in 1 : 5  # Construct a task
       put!(l, item)
       end;

julia> bind(l, t);  # Bind it to the link.

julia> take!(l)  # Take element from link.
1.0

julia> take!(l)  # Take again ...
2.0

julia> collect(l)  # Collect remaining data.
3-element Array{Float64,1}:
 3.0
 4.0
 5.0

Jusdl.Connections.launch - Method
launch(link::Link)

Constructs a taker task and binds it to link. The taker task reads the data and prints an info message until missing is read from the link.

Jusdl.Connections.launch - Method
launch(link::Link, valrange)

Constructs a putter task and binds it to link. The putter task puts the data in valrange into link.
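A taker task of the kind described for launch(link) can be sketched with a plain Base Channel. The helper spawn_taker and the seen vector are hypothetical and exist only for this illustration; the actual launch implementation may differ.

```julia
# Hypothetical helper: spawn a taker that logs each value and stops
# as soon as `missing` is read from the channel.
function spawn_taker(ch::Channel, seen::Vector)
    @async while true
        val = take!(ch)
        val === missing && break
        push!(seen, val)
        @info "Took" val
    end
end

mch = Channel{Union{Missing, Float64}}(0)
seen = Float64[]
taker = spawn_taker(mch, seen)

put!(mch, 1.0); put!(mch, 2.0)
put!(mch, missing)    # sentinel: ends the taker loop
wait(taker)
```

Using missing as the sentinel requires the channel's element type to admit it, hence the Union{Missing, Float64} element type in this sketch.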
