class Channel(T)
- Channel(T)
- Reference
- Object
Overview
A Channel
enables concurrent communication between fibers.
They allow communicating data between fibers without sharing memory and without having to worry about locks, semaphores or other special structures.
channel = Channel(Int32).new
spawn do
channel.send(0)
channel.send(1)
end
channel.receive # => 0
channel.receive # => 1
NOTE Although a Channel(Nil)
or any other nilable types like Channel(Int32?)
are valid
they are discouraged since from certain methods or constructs it receiving a nil
as data
will be indistinguishable from a closed channel.
Defined in:
concur/channel.crInstance Method Summary
-
#batch(size : Int32, interval : Time::Span, name = nil, buffer_size = 0) : Channel(Enumerable(T))
Returns a channel that receives values in batches either every size values received or every interval, if a batch has not been sent within the last interval.
-
#broadcast(out_ports = 2, name = nil, buffer_size = 0) : Array(Channel(T))
Returns an array of out_ports channels which receive each value received by
self
. -
#each(name = nil, &block : T -> _)
Runs the given block for each value received from
self
. -
#flat_map(workers = 1, name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do
end, &block : T -> Enumerable(V)) : Channel(V) forall V
Returns a channel that receives each value of the enumerables produced applying block to values received by
self
. -
#listen(&block : T -> _)
Receives values from
self
and processes them via block. -
#map(initial_state : S, name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do
end, &block : S, T -> Tuple(S, V)) forall S, V
Returns a channel that receives values from
self
transformed via block and based on the provided initial_state. -
#map(workers = 1, name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do
end, &block : T -> V) : Channel(V) forall V
Returns a channel that receives values from
self
transformed via block. -
#merge(other : Channel(J), name = nil, buffer_size = 0) : Channel(T | J) forall J
Returns a channel that receives values from both self and other, as soon as they are available.
-
#partition(name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do
end, &predicate : T -> Bool) : Tuple(Channel(T), Channel(T))
Returns a tuple of two channels, one receiving values that satisfy the given predicate and one receiving the remaining ones.
-
#rate_limit(items_per_sec : Float64, max_burst : Int32 = 1, name = nil, buffer_size = 0)
Returns a channel that receives at most items_per_sec items per second from the caller - or max_burst items, if no elements were received in a while.
-
#reject(name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do
end, &predicate : T -> Bool) : Channel(T)
Returns a channel which receives values received by
self
that do not satisfy the given predicate. -
#scan(acc : U, name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do
end, &block : U, T -> U) : Channel(U) forall U
Returns a channel that receives each successive accumulated state produced by block, based on the initial state acc and on the values received by
self
. -
#select(name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do
end, &predicate : T -> Bool) : Channel(T)
Returns a channel which receives values received by
self
that satisfy the given predicate. -
#take(max_items : Int32)
Returns an enumerable of values received from
self
containing at most max_items elements. -
#zip(other : Channel(U), name = nil, buffer_size = 0, on_error = ->(tu : ::Tuple(T, U), ex : Exception) do
end, &block : T, U -> V) : Channel(V) forall U, V
Returns a channel that receives tuples of values coming from
self
and other transformed via block.
Instance Method Detail
Returns a channel that receives values in batches either every size values received or every interval, if a batch has not been sent within the last interval.
The returned channels will close once self
is closed and empty.
Returns an array of out_ports channels which receive each value
received by self
.
The returned channels will close once self
is closed and empty.
If one of the returned channels is closed, then #broadcast
will
close every other channel.
NOTE The rate at which values are received by each channel is limited by the slowest consumer. Values are sent to channels in the order they were returned.
Runs the given block for each value received from self
.
The execution of the block takes place on a separate fiber.
The fiber running the block will stop once self
is closed and empty.
NOTE If exceptions are not handled within block, then any exception raised within the block will crash the spawned fiber.
Returns a channel that receives each value of the enumerables produced
applying block to values received by self
.
A workers parameter can be supplied to make the computation of the block concurrent.
Note that, for workers > 1, no order guarantees are made - see #map
for an example.
The returned channel is closed once self
is closed and empty, and all
the outstanding runs of block are completed.
See #map
for details on the exception handling strategy.
Receives values from self
and processes them via block.
If no exceptions are raised while evaluating block, then the
statement returns once self
is closed and empty.
NOTE This method runs on the current fiber.
NOTE If exceptions are not handled within block, then any exception raised within the block will crash the calling fiber.
Returns a channel that receives values from self
transformed via block and based
on the provided initial_state.
NOTE block is a function that takes the current state and a value received
from self
and returns a tuple composed of the next state to be passed to the block
and the next value to be received by the returned channel.
The returned channel is closed once self
is closed and empty, and any outstanding
run of block is completed.
See #map
for details on the exception handling strategy.
Returns a channel that receives values from self
transformed via block.
A workers parameter can be supplied to make the computation of the block concurrent. Note that, for workers > 1, no order guarantees are made.
The returned channel will close once self
is closed and empty, and all
the outstanding runs of block are completed.
Any exception raised while evaluating block will be passed to the optional callback on_error, together with the value that triggered the error.
Example:
source([1,2,3])
.map(workers: 2) { |v| sleep rand; v**2 } # => [4, 1, 9]
Returns a channel that receives values from both self and other, as soon as they are available.
The returned channel is closed as soon as both the input channels are closed and empty - but will continue to operate while at least one of them is open.
NOTE If both channels have values ready to be received, then one will be selected at random.
Returns a tuple of two channels, one receiving values that satisfy the given predicate and one receiving the remaining ones.
The returned channels will close once self
is closed and empty.
See #select
for details on the exception handling strategy.
Returns a channel that receives at most items_per_sec items per second from the caller - or max_burst items, if no elements were received in a while.
The returned channel is closed once self
is closed and empty and a new value
can be received, based on the rate limiting parameters.
Refer to the documentation of RateLimiter for more details.
Returns a channel which receives values received by self
that do not satisfy the given predicate.
The returned channels will close once self
is closed and empty.
See #select
for details on the exception handling strategy.
Returns a channel that receives each successive accumulated state produced by block,
based on the initial state acc and on the values received by self
.
The returned channel is closed once self
is closed and empty, and any outstanding
run of block is completed.
See #map
for details on the exception handling strategy.
Returns a channel which receives values received by self
that satisfy the given predicate.
The returned channels will close once self
is closed and empty.
Any exception raised while evaluating predicate will be passed to the optional callback on_error, together with the value that triggered the error.
Returns an enumerable of values received from self
containing at most max_items elements.
If self
is closed in the process, then the returned enumerable might include fewer elements.
Returns a channel that receives tuples of values coming from self
and other
transformed via block.
The returned channel is closed once either self
or other
are closed and empty,
and any outstanding run of block is completed.
See #map
for details on the exception handling strategy.