Introduction
The select statement is a key component in building concurrent Crystal applications based on channels and fibers. It lets us wait on multiple channel operations and act as soon as one of them completes.
In this article, we’ll look at the main variants of the construct and discuss five use cases.
I’ll assume you’re familiar with the basics of Communicating Sequential Processes. If you need a primer or a refresher, you can check out this presentation I recorded a while ago, or read through the official Crystal concurrency guide.
Sending and receiving on multiple channels
You can use the select statement to process the first value coming from two or more channels. If we wrap the select in a loop, we can also use it to gracefully terminate a fiber that consumes values from a channel.
#1. Graceful termination
In the snippet below, the echo fiber loops over values coming from two channels: values and terminate.
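spawn(name: "echo") do
  loop do
    select
    when v = values.receive
      puts v
    when terminate.receive?
      break
    end
  end
end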
Notice the trailing question mark on terminate.receive? (line 6). This captures the closure of the channel as a nil value.
Whether the terminate channel is closed or receives a value, the loop will break and the fiber will terminate.
A concurrent fiber can politely ask for echo to terminate at any time with
terminate.close
Tip. Channel#close is non-blocking, meaning that the invoking fiber won’t wait for the channel closure to be acknowledged by any receiving fiber. If you need a guarantee that the receiving fiber has completed its work, then you need to define a join point between the two fibers. In the example below, the echo fiber notifies main of its termination by closing a done channel that both fibers hold a reference to (see Figure 1).
spawn(name: "echo") do
  loop do
    select
    when v = values.receive
      puts v
    when terminate.receive?
      break
    end
  end
  done.close
end

# main fiber
terminate.close
done.receive?
You can check out the full code here and try it for yourself 💻
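If you'd rather paste something straight into a file, here is a minimal self-contained sketch of the same idea. The channel element types and the received array are my own assumptions for illustration; the original snippet doesn't show how values, terminate and done are declared.

```crystal
# Assumed declarations: the article does not show these.
values    = Channel(Int32).new
terminate = Channel(Nil).new
done      = Channel(Nil).new
received  = [] of Int32 # hypothetical: records what echo has printed

spawn(name: "echo") do
  loop do
    select
    when v = values.receive
      puts v
      received << v
    when terminate.receive?
      # Fires on a sent value or, as here, on channel closure (nil).
      break
    end
  end
  done.close # join point: tell main that echo has finished
end

# main fiber
3.times { |i| values.send i } # unbuffered: each value is handed to echo
terminate.close               # non-blocking request to stop
done.receive?                 # blocks until echo closes `done`
puts "echo terminated after #{received.size} values"
```

Because values is unbuffered, every send only returns once echo has taken the value, so nothing is lost when terminate is closed right after the last send.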
We’ve seen how we can use select to receive on multiple channels. Now let’s make a case for mixing send and receive in a select block.
#2. Retrieving values from a stateful fiber
In the example below - full code here - we run a stateful fiber that aggregates values coming from a channel. Thanks to the select statement, the fiber can send the current cumulative sum to a concurrent fiber over the result channel, on demand.
spawn(name: "sum calculator") do
  sum = 0.0
  loop do
    select
    when v = values.receive
      sum += v
    when result.send sum
    end
  end
end
All it takes for another fiber to query the current aggregated value is the following.
sum = result.receive
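To try this out end to end, here is a small runnable sketch. The Float64 channel declarations and the sample inputs are assumptions of mine, chosen so that the sum is exact in floating point.

```crystal
# Assumed declarations: the article does not show these.
values = Channel(Float64).new
result = Channel(Float64).new

spawn(name: "sum calculator") do
  sum = 0.0
  loop do
    select
    when v = values.receive
      sum += v
    when result.send(sum)
      # Nothing to do: the current sum has been handed to the reader.
    end
  end
end

# main fiber: feed a few values, then query the running total
[1.5, 2.5, 4.0].each { |v| values.send v }
total = result.receive
puts total
```

The send clause is rebuilt on every loop iteration, so a reader always gets the sum as it stands after the last value the fiber finished adding.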
Challenge. Can you combine the two approaches shown above to gracefully terminate a stateful fiber?
Receiving and sending with timeout
Since Crystal 0.33, select supports a timeout action. This allows us to run custom logic when a timeout is triggered.
A recurring use case is the one where you’d like to retrieve some fresh data from a third-party API, but you’d rather fall back to some cached data if the call takes too long. In other words, you’re happy to sacrifice data freshness for user experience.
#3. Timeout on async calls
Imagine you want to give users the latest available stock market information by calling a third-party API, provided that the API response comes in a timely fashion.
Here is a function simulating an asynchronous call to an external API.
def get_stock_price_async(sym : Symbol) : Channel(Float64)
  Channel(Float64).new.tap { |ch|
    spawn do
      sleep rand
      ch.send(rand)
    end
  }
end
And here is the corresponding synchronous function with timeout.
def get_stock_price(sym : Symbol, max_wait : Time::Span)
  select
  when v = get_stock_price_async(sym).receive
    Cache[sym] = v
  when timeout max_wait
    Cache[sym]?
  end
end
Now our users won’t ever have to wait longer than the set time span - unless we have a cache-miss, but let’s face it, they would have dropped off anyway.
get_stock_price(:tsla, 0.5.seconds)
You can play with the code above here 📉
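For a deterministic run, here is a sketch where the simulated latency is passed in explicitly rather than drawn at random. The names fetch_price_async and price_or_cached, the delay parameter, the fixed price of 42.0 and the Cache definition are all hypothetical stand-ins of mine. I've also given the channel a capacity of 1 so that the spawned fiber can complete its send even when the caller has already timed out and walked away.

```crystal
# Hypothetical in-memory cache; the article does not show its definition.
Cache = {} of Symbol => Float64

# Simulated API call. `delay` is an illustrative stand-in for `sleep rand`.
def fetch_price_async(delay : Time::Span) : Channel(Float64)
  # Capacity 1: the fiber can finish even if nobody receives the value.
  Channel(Float64).new(1).tap do |ch|
    spawn do
      sleep delay
      ch.send 42.0
    end
  end
end

def price_or_cached(sym : Symbol, delay : Time::Span, max_wait : Time::Span) : Float64?
  select
  when v = fetch_price_async(delay).receive
    Cache[sym] = v # fresh value: store it and return it
  when timeout(max_wait)
    Cache[sym]? # too slow: fall back to the cache (nil on a miss)
  end
end

fast = price_or_cached(:tsla, 10.milliseconds, 200.milliseconds) # fresh value
slow = price_or_cached(:tsla, 500.milliseconds, 50.milliseconds) # cached value
miss = price_or_cached(:aapl, 500.milliseconds, 50.milliseconds) # cache miss
p fast, slow, miss
```

The first call populates the cache, the second one times out and is served from it, and the third one times out with nothing cached, so it returns nil.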
Timeouts are also really useful when dealing with back-pressure in a pipeline, which leads us to…
#4. Easing back-pressure
When sending values downstream in a data pipeline, a stage might block on the send operation for a non-negligible amount of time. This is an unequivocal symptom that the downstream stages are struggling with the workload. Ignoring this could be fatal: our entire pipeline could grind to a halt, leaving us no other option than a restart. Timeouts give us the opportunity to act upon such - hopefully transient - data congestion.
In the word_size stage below, we assume that it’s OK to drop messages whenever the downstream stage is taking too long to accept the value being sent. This should keep the pipeline responsive at all times and let it self-heal as soon as the workload becomes manageable again.
# The stage emits word sizes, so the output channel carries Int32 values.
def word_size(input : Channel(String), max_wait : Time::Span) : Channel(Int32)
  Channel(Int32).new.tap { |ch|
    spawn(name: "word_size") do
      loop do
        word = input.receive
        select
        when ch.send word.size
        when timeout max_wait
          log "downstream channel full. Dropping #{word}."
        end
      end
    end
  }
end
To see this code in action, we’ll start the pipeline…
random_word = -> { "a" * rand(10) }
output = word_size(producer("word_gen", &random_word), 0.7.seconds)
… And simulate a slow consumer.
loop do
  sleep 3 * rand
  size = output.receive
  log "received word size #{size}"
end
You can find a working version of the code here. The output will look similar to the following.
word_size: successfully processed aaaaaaa
main: received word size 7
word_size: timed out while processing aaa
main: received word size 4
word_size: successfully processed aaaa
word_size: timed out while processing aaaaa
Tip. Notice how it is not guaranteed that word_size will log before main. This is a good reminder that, when evaluating the correctness of your concurrent code, you should make no assumptions about the order in which fibers will run.
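The snippets above rely on two helpers, log and producer, that aren't shown. Here is one way the whole pipeline could be wired together. Treat both helpers as my own guesses at their shape, not as the original implementation. The log messages are modelled on the sample output, and the consumer is bounded to three reads so that the program terminates.

```crystal
# Hypothetical helper: prefixes a message with the current fiber's name.
def log(msg : String)
  puts "#{Fiber.current.name}: #{msg}"
end

# Hypothetical helper: a fiber that keeps sending generated values.
def producer(name : String, &generator : -> String) : Channel(String)
  Channel(String).new.tap do |ch|
    spawn(name: name) do
      loop { ch.send generator.call }
    end
  end
end

def word_size(input : Channel(String), max_wait : Time::Span) : Channel(Int32)
  Channel(Int32).new.tap do |ch|
    spawn(name: "word_size") do
      loop do
        word = input.receive
        select
        when ch.send(word.size)
          log "successfully processed #{word}"
        when timeout(max_wait)
          log "timed out while processing #{word}"
        end
      end
    end
  end
end

random_word = -> { "a" * rand(10) }
output = word_size(producer("word_gen", &random_word), 20.milliseconds)

# Slow consumer: three reads, each preceded by a pause longer than max_wait.
sizes = [] of Int32
3.times do
  sleep 100.milliseconds
  sizes << output.receive
  log "received word size #{sizes.last}"
end
```

With these timings you should see several "timed out" lines between consecutive reads, since the stage gives up on each word after 20 milliseconds while the consumer only shows up every 100.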
The examples we’ve seen so far have one thing in common: the select clauses are blocking for the calling fiber.
This is often desirable, but there are cases where you are fine with a receive / send action being skipped if it can’t proceed right away, for instance because there is no room left in the target channel. Enter the else clause.
Non-blocking channel operations
Adding an else clause to a select lets us define some behaviour to be triggered whenever every other clause is blocked. This is often seen in loop / select blocks, when it makes sense for the fiber to run some background job - or chunks of it - while waiting for other clauses to unblock.
Another use case for the else clause has to do with non-essential / optional channel operations.
#5. Heartbeats and diagnostic messages
A technique to monitor fibers' health status is to have them send heartbeat messages to a monitoring fiber via a dedicated channel. Such messages give us insight into how our system is functioning, but are not essential for its execution - just like logging and tracing. Let’s see how an else clause can express these semantics.
Suppose we want a worker fiber to send a heartbeat to a global Diagnostic channel - with capacity zero, for simplicity - when it gets spawned.
spawn(name: "worker") do
  select
  when Diagnostic.send Heartbeat.new
  else
    log "skipped a heartbeat"
  end
end
Thanks to the else clause, we’ve achieved the following behaviour.
- If the monitoring fiber is enabled and ready to receive, then the heartbeat will be sent.
- If the monitoring fiber is enabled but busy, then the heartbeat will be dropped.
- If the monitoring fiber is disabled, then the heartbeat will be dropped.
The invariant is: the worker fiber will not block to perform a non-essential operation.
You can tweak the code and experiment with different scenarios here.
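Here is a compact sketch that exercises two of the scenarios listed above. The Heartbeat record with its fiber_name field, the try_heartbeat helper and the seen channel are illustrative assumptions of mine. The short sleep is only there to give the monitor a chance to park on receive before the second attempt.

```crystal
# Hypothetical message type and global channel; the article names them
# but does not show their definitions.
record Heartbeat, fiber_name : String
Diagnostic = Channel(Heartbeat).new # capacity zero, as in the article

# Returns true if the heartbeat was handed over, false if it was skipped.
def try_heartbeat(name : String) : Bool
  select
  when Diagnostic.send(Heartbeat.new(name))
    true
  else
    false
  end
end

# Scenario 1: no monitoring fiber is receiving, so the send is skipped.
skipped = !try_heartbeat("worker-1")

# Scenario 2: a monitor is parked on `receive`, so the send goes through.
seen = Channel(String).new(1)
spawn(name: "monitor") { seen.send Diagnostic.receive.fiber_name }
sleep 10.milliseconds # give the monitor time to reach `receive`
delivered = try_heartbeat("worker-2")

puts "skipped=#{skipped} delivered=#{delivered} monitor saw #{seen.receive}"
```

In both scenarios try_heartbeat returns immediately, which is exactly the invariant we were after.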
Challenge. Does it make sense to have both an else clause and a timeout clause in a select block?
Things we didn’t talk about
- As of the latest Crystal version (0.34.0), the clause order in a select statement matters. In particular, clauses at the top take precedence over the following ones. This is in contrast to the behaviour of the select statement in golang, where a “uniform pseudo-random selection” takes place on the ready-to-proceed clauses.
- If all you need is a way to select over a set of receive operations, then Channel.receive_first might be the right tool for the job - see docs.
- A select block does not handle channel’s closure for you. A Channel::ClosedError exception will be raised if a receive / send action is invoked on a closed channel.
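The last two points can be seen in a few lines. This is only a sketch: the helper names strict_receive and loose_receive are mine, and the buffered channels are there just to keep the example single-fibered.

```crystal
# Hypothetical helper: a select with a plain `receive` clause.
def strict_receive(ch : Channel(Int32)) : String
  select
  when v = ch.receive
    "received #{v}"
  end
rescue Channel::ClosedError
  "raised Channel::ClosedError"
end

# Hypothetical helper: the same select, using `receive?` instead.
def loose_receive(ch : Channel(Int32)) : String
  select
  when v = ch.receive?
    v.nil? ? "closed" : "received #{v}"
  end
end

a = Channel(Int32).new(1)
b = Channel(Int32).new(1)
b.send 2

# Takes the first value available on any of the given channels.
first = Channel.receive_first(a, b)

a.close
strict = strict_receive(a) # `receive` on a closed channel raises
loose  = loose_receive(a)  # `receive?` reports the closure as nil
puts first, strict, loose
```

If a fiber is expected to outlive the channels it reads from, the receive? form is usually the one you want inside a select.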
This concludes our tour of Crystal’s select use cases.
If you’re hungry for more, you can go through these golang examples - courtesy of gobyexample.com - and translate them to Crystal 📚
Thanks for reading, I hope you found this useful. You can share your experiences with fibers and channels in the comments section below.
If you’d like to stay in touch, then subscribe or follow me on Twitter.