class Channel(T)

Overview

A Channel enables concurrent communication between fibers.

Channels allow fibers to exchange data without sharing memory and without having to worry about locks, semaphores or other special structures.

channel = Channel(Int32).new

spawn do
  channel.send(0)
  channel.send(1)
end

channel.receive # => 0
channel.receive # => 1

NOTE Although a Channel(Nil) or a channel of any other nilable type, such as Channel(Int32?), is valid, such channels are discouraged: with certain methods or constructs, receiving nil as data is indistinguishable from receiving from a closed channel.
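
The ambiguity described in the note can be reproduced with the standard library alone. The following is a minimal sketch; the variable names are illustrative.

```crystal
# Standard library only; `ch`, `first_value` and `second_value` are illustrative names.
ch = Channel(Int32?).new(1) # capacity 1, so the send below does not block

ch.send(nil) # a legitimate nil payload
ch.close

first_value = ch.receive?  # the buffered nil that was actually sent
second_value = ch.receive? # nil again, this time because the channel is closed and empty

# The receiver cannot tell the two cases apart.
puts first_value.nil? && second_value.nil?
```

Using a non-nilable element type avoids the problem, because `receive?` then returns nil only when the channel is closed.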

Defined in:

concur/channel.cr

Instance Method Summary

Instance Method Detail

def batch(size : Int32, interval : Time::Span, name = nil, buffer_size = 0) : Channel(Enumerable(T))

Returns a channel that receives values from self in batches. A batch is sent either once size values have been received or once interval has elapsed without a batch being sent.

The returned channel will close once self is closed and empty.
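
A possible usage sketch for #batch follows. It assumes the shard is loaded with `require "concur"` (inferred from the file path above, not confirmed by this page); the variable names are illustrative.

```crystal
require "concur" # assumption: the shard that provides concur/channel.cr

numbers = Channel(Int32).new
spawn do
  (1..4).each { |n| numbers.send(n) }
  numbers.close
end

# Ask for a batch every 2 values, or after 1 second without a batch.
batches = numbers.batch(size: 2, interval: 1.second)

collected = [] of Array(Int32)
while batch = batches.receive?
  collected << batch.to_a
end

puts collected.flatten.inspect
```

How the values are split across batches depends on timing, so the sketch only prints the flattened result.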


def broadcast(out_ports = 2, name = nil, buffer_size = 0) : Array(Channel(T))

Returns an array of out_ports channels, each of which receives every value received by self.

The returned channels will close once self is closed and empty.

If one of the returned channels is closed, then #broadcast will close every other channel.

NOTE The rate at which values are received by each channel is limited by the slowest consumer. Each value is sent to the channels in the order in which they appear in the returned array.
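
Because the slowest consumer sets the pace, every returned channel needs its own consumer. A minimal sketch, assuming `require "concur"` loads the shard; all variable names are illustrative.

```crystal
require "concur" # assumption: the shard that provides concur/channel.cr

numbers = Channel(Int32).new
spawn do
  (1..3).each { |n| numbers.send(n) }
  numbers.close
end

left, right = numbers.broadcast # out_ports defaults to 2

seen_left = [] of Int32
seen_right = [] of Int32
done = Channel(Nil).new

# Drain `right` on its own fiber so that it never stalls `left`.
spawn do
  right.listen { |n| seen_right << n }
  done.send(nil)
end

left.listen { |n| seen_left << n } # blocks until `left` is closed and empty
done.receive                       # wait for the other consumer to finish

puts seen_left.inspect
puts seen_right.inspect
```

Consuming only one of the two channels would leave the other one full and, with the default buffer_size of 0, block the broadcast.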


def each(name = nil, &block : T -> _)

Runs the given block for each value received from self. The execution of the block takes place on a separate fiber.

The fiber running the block will stop once self is closed and empty.

NOTE If exceptions are not handled within block, then any exception raised within the block will crash the spawned fiber.


def flat_map(workers = 1, name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do end, &block : T -> Enumerable(V)) : Channel(V) forall V

Returns a channel that receives each value of the enumerables produced by applying block to the values received by self.

A workers parameter can be supplied to make the computation of the block concurrent. Note that, for workers > 1, no order guarantees are made - see #map for an example.

The returned channel is closed once self is closed and empty, and all the outstanding runs of block are completed.

See #map for details on the exception handling strategy.
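
A short sketch of #flat_map with the default single worker. It assumes `require "concur"` loads the shard; the variable names are illustrative.

```crystal
require "concur" # assumption: the shard that provides concur/channel.cr

words = Channel(String).new
spawn do
  ["ab", "cd"].each { |w| words.send(w) }
  words.close
end

# Each word yields an Array(Char); the returned channel receives the chars one by one.
letters = words.flat_map { |w| w.chars }

received = [] of Char
while c = letters.receive?
  received << c
end

puts received.inspect
```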


def listen(&block : T -> _)

Receives values from self and processes them via block.

If no exceptions are raised while evaluating block, then the method returns once self is closed and empty.

NOTE This method runs on the current fiber.

NOTE If exceptions are not handled within block, then any exception raised within the block will crash the calling fiber.


def map(initial_state : S, name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do end, &block : S, T -> Tuple(S, V)) forall S, V

Returns a channel that receives values from self transformed via block and based on the provided initial_state.

NOTE block is a function that takes the current state and a value received from self and returns a tuple composed of the next state to be passed to the block and the next value to be received by the returned channel.

The returned channel is closed once self is closed and empty, and any outstanding run of block is completed.

See the #map overload that takes workers for details on the exception handling strategy.
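
As an illustration of the state-passing block, the sketch below numbers incoming lines. It assumes `require "concur"` loads the shard, and it passes initial_state as a named argument to make the intended overload explicit; the variable names are illustrative.

```crystal
require "concur" # assumption: the shard that provides concur/channel.cr

lines = Channel(String).new
spawn do
  ["foo", "bar", "baz"].each { |l| lines.send(l) }
  lines.close
end

# The state is the next line number; the block returns {next_state, value_to_emit}.
numbered = lines.map(initial_state: 1) do |index, line|
  {index + 1, "#{index}: #{line}"}
end

output = [] of String
while entry = numbered.receive?
  output << entry
end

puts output.inspect
```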


def map(workers = 1, name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do end, &block : T -> V) : Channel(V) forall V

Returns a channel that receives values from self transformed via block.

A workers parameter can be supplied to make the computation of the block concurrent. Note that, for workers > 1, no order guarantees are made.

The returned channel will close once self is closed and empty, and all the outstanding runs of block are completed.

Any exception raised while evaluating block will be passed to the optional callback on_error, together with the value that triggered the error.

Example:

source([1, 2, 3])
  .map(workers: 2) { |v| sleep rand.seconds; v**2 } # receives e.g. 4, 1, 9 (order not guaranteed)

def merge(other : Channel(J), name = nil, buffer_size = 0) : Channel(T | J) forall J

Returns a channel that receives values from both self and other, as soon as they are available.

The returned channel is closed as soon as both input channels are closed and empty, but will continue to operate while at least one of them is open.

NOTE If both channels have values ready to be received, then one will be selected at random.
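
A sketch of merging two channels with different element types. It assumes `require "concur"` loads the shard; the variable names are illustrative.

```crystal
require "concur" # assumption: the shard that provides concur/channel.cr

ints = Channel(Int32).new
strs = Channel(String).new

spawn do
  [1, 2].each { |n| ints.send(n) }
  ints.close
end
spawn do
  ["a", "b"].each { |s| strs.send(s) }
  strs.close
end

merged = ints.merge(strs) # Channel(Int32 | String)

received = [] of Int32 | String
while value = merged.receive?
  received << value
end

puts received.size
```

The relative order of integers and strings in `received` is not fixed, which is why the sketch prints only the number of values.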


def partition(name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do end, &predicate : T -> Bool) : Tuple(Channel(T), Channel(T))

Returns a tuple of two channels, one receiving values that satisfy the given predicate and one receiving the remaining ones.

The returned channels will close once self is closed and empty.

See #select for details on the exception handling strategy.
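
A usage sketch for #partition. It assumes `require "concur"` loads the shard and that the tuple is ordered as (matching, non-matching), as the description suggests; the variable names are illustrative.

```crystal
require "concur" # assumption: the shard that provides concur/channel.cr

numbers = Channel(Int32).new
spawn do
  (1..4).each { |n| numbers.send(n) }
  numbers.close
end

# Assumption: the first channel receives the values that satisfy the predicate.
evens, odds = numbers.partition(&.even?)

seen_evens = [] of Int32
seen_odds = [] of Int32
done = Channel(Nil).new

# Both outputs need a consumer, otherwise the unconsumed one blocks the other.
spawn do
  odds.listen { |n| seen_odds << n }
  done.send(nil)
end

evens.listen { |n| seen_evens << n }
done.receive

puts seen_evens.inspect
puts seen_odds.inspect
```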


def rate_limit(items_per_sec : Float64, max_burst : Int32 = 1, name = nil, buffer_size = 0)

Returns a channel that receives at most items_per_sec items per second from self, or up to max_burst items at once if no items have been received for a while.

The returned channel is closed once self is closed and empty and the rate-limiting parameters allow a new value to be received.

Refer to the documentation of RateLimiter for more details.


def reject(name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do end, &predicate : T -> Bool) : Channel(T)

Returns a channel which receives values received by self that do not satisfy the given predicate.

The returned channel will close once self is closed and empty.

See #select for details on the exception handling strategy.


def scan(acc : U, name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do end, &block : U, T -> U) : Channel(U) forall U

Returns a channel that receives each successive accumulated state produced by block, based on the initial state acc and on the values received by self.

The returned channel is closed once self is closed and empty, and any outstanding run of block is completed.

See #map for details on the exception handling strategy.
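
A running-total sketch for #scan. It assumes `require "concur"` loads the shard; the variable names are illustrative.

```crystal
require "concur" # assumption: the shard that provides concur/channel.cr

numbers = Channel(Int32).new
spawn do
  (1..4).each { |n| numbers.send(n) }
  numbers.close
end

# The accumulator starts at 0; each received value is added to it.
totals = numbers.scan(0) { |sum, n| sum + n }

states = [] of Int32
while total = totals.receive?
  states << total
end

puts states.last # the final accumulated state
```

This page does not say whether the initial acc itself is emitted, so the sketch relies only on the final state.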


def select(name = nil, buffer_size = 0, on_error = ->(t : T, ex : Exception) do end, &predicate : T -> Bool) : Channel(T)

Returns a channel which receives values received by self that satisfy the given predicate.

The returned channel will close once self is closed and empty.

Any exception raised while evaluating predicate will be passed to the optional callback on_error, together with the value that triggered the error.
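
The on_error callback can be sketched as follows. The example assumes `require "concur"` loads the shard and that a value whose predicate raises is not forwarded; `failed` and `report` are illustrative names.

```crystal
require "concur" # assumption: the shard that provides concur/channel.cr

numbers = Channel(Int32).new
spawn do
  (1..4).each { |n| numbers.send(n) }
  numbers.close
end

failed = [] of Int32
# Same shape as the default callback: takes the value and the exception, returns nil.
report = ->(n : Int32, ex : Exception) { failed << n; nil }

evens = numbers.select(on_error: report) do |n|
  raise ArgumentError.new("unsupported value") if n == 3
  n.even?
end

passed = [] of Int32
while n = evens.receive?
  passed << n
end

puts passed.inspect
puts failed.inspect
```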


def take(max_items : Int32)

Returns an enumerable of values received from self containing at most max_items elements.

If self is closed in the process, then the returned enumerable might include fewer elements.
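
A sketch of #take on a channel that holds more values than requested. It assumes `require "concur"` loads the shard; the variable names are illustrative.

```crystal
require "concur" # assumption: the shard that provides concur/channel.cr

numbers = Channel(Int32).new
spawn do
  (1..5).each { |n| numbers.send(n) }
  numbers.close
end

# Only the first three values are consumed; to_a materializes the enumerable.
first_three = numbers.take(3).to_a

puts first_three.inspect
```

The producer fiber stays blocked on its fourth send, which is harmless here because the program exits right after printing.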


def zip(other : Channel(U), name = nil, buffer_size = 0, on_error = ->(tu : ::Tuple(T, U), ex : Exception) do end, &block : T, U -> V) : Channel(V) forall U, V

Returns a channel that receives the results of applying block to pairs of values coming from self and other.

The returned channel is closed once either self or other is closed and empty, and any outstanding run of block is completed.

See #map for details on the exception handling strategy.
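
A pairwise-sum sketch for #zip. It assumes `require "concur"` loads the shard; the variable names are illustrative.

```crystal
require "concur" # assumption: the shard that provides concur/channel.cr

units = Channel(Int32).new
tens = Channel(Int32).new

spawn do
  [1, 2, 3].each { |n| units.send(n) }
  units.close
end
spawn do
  [10, 20, 30].each { |n| tens.send(n) }
  tens.close
end

# The block receives one value from each channel and returns the combined value.
sums = units.zip(tens) { |a, b| a + b }

results = [] of Int32
while s = sums.receive?
  results << s
end

puts results.inspect
```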

