Skip to content

Introducing core.asyncλ︎

TODO::work in progress, sorryλ︎

Pull requests are welcome

The core.async library provides a way to do concurrent programming using channels (eg. queues).

It minimises the need to use complex concurrent constructs and worry less about thread management.

core.async is written in Clojure and can be used with both Clojure and ClojureScript.

Hint:: core.async resourcesλ︎


Hint::Communicating Sequential Processesλ︎

Communicating Sequential Processes (CSP) is a formalism for describing concurrent systems pioneered by C. A. R. Hoare in 1978. It is a concurrency model based on message passing and synchronization through channels

core.async on ClojureScriptλ︎

core.async is very widely used within ClojureScript applications and many libraries are built on top of it.

It’s a good example of the syntactic abstractions that can be achieved by transforming code with ClojureScript macros.

JavaScript is single-threaded so you do not get the benefit of safe communication between threads, as there is only one.

Conceptsλ︎

Concepts - channel, put, take

Channelsλ︎

A channel is a queue with one or more publishers and one or more consumers. Producers put data onto the queue, consumers take data from the queue.

As data in Clojure is immutable, channels provide a safe way to communicate between threads.

Chanel sizeλ︎

Channels do not include a buffer by default, they use a producer (put!) and consumer (take!) to transfer a value through the channel. A maximum of 1024 put! functions can be queued onto a single channel.

Specify a channel using (chan) or a channel with a fixed buffer using (chan 12).

Processesλ︎

Processes are independently running pieces of code that use channels for communication and coordination.

Calling put! and take! inside a process will stop that process until the operation completes.

Processes are launched using the go macro and puts and takes use the <! and >! placeholders. The go macro rewrites your code to use callbacks but inside go everything looks like synchronous code, which makes understanding it straightforward:

In ClojureScript, stopping a process doesn’t block the single-threaded JavaScript environment, instead, the process will be resumed at a later time when it is no longer blocked.

Important functionsλ︎

chanλ︎

The chan function creates a new channel.

You can give a name to a channel using the def function, eg. (def my-channel (chan))

A single channel can take a maximum of 1024 put requests. Once it has reached the maximum, then it is considered full.

A buffer of a fixed size can be specified when defining a new channel: (def channel-with-fixed-buffer (chan 12)). This buffer increases the number of puts that can be sent to the channel. A dropping or sliding buffer can be used to discard messages added when the buffer is full.

A channel can also include a transducer, to manipulate the value on the channel eg. adding a timestamp (chan 32 (map (Date. now))) eg. filtering messages (chan 12 (map even?))

Channels can be explicitly closed using (close channel-name) or by adding a timeout that closes the channel after the specified number of milliseconds (timeout 6000).

put!λ︎

The put! function puts a value (message) on the channel.

You can put messages on the channel even if nothing is listening (no waiting take! functions).

Evaluating put! will always add a message on to the channel as long as the channel is open and not full.

take!λ︎

The take! function will take a single message from the queue.

If there are no messages on the queue when you evaluate take!, then the function will wait to execute as soon as something is put on the channel

The take! function needs an argument that is the channel and a function that will receive any message taken from a channel.

timeλ︎

This is a macro that executes an expression and prints the time it takes

Criterium is an excellent library for performance testing your code

goλ︎

Referred to as a go block, the go function creates a lightweight process, not bound to threads. Thousands of go blocks can be created efficiently and they can all have their own channel.

blocking and non-blockingλ︎

core.async offers two ways to write to and read from channels: blocking and non-blocking. A blocking write blocks the thread until the channel has space to be written to (the buffer size of a channel is configurable), a blocking read blocks a thread until a value becomes available on the queue to be read.

More interesting, and the only type supported in ClojureScript, are asynchronous channel reads and writes to channels, which are only allowed in "go blocks". Go blocks are written in a synchronous style, and internally converted to a state machine that executes them asynchronously.

Consider the following core.async-based code:

(let [ch (chan)]
  (go (while true
        (let [v (<! ch)]
          (println "Read: " v))))
  (go (>! ch "hi")
      (<! (timeout 5000))
      (>! ch "there")))

In this example, let introduces a new local variable ch, which is a new channel. Within the let's scope two go blocks are defined, the first is an eternal loop that reads (<!) a new value from channel ch into variable v. It then prints "Read: " followed by the read value to the standard out. The second go block writes (>!) two values to channel ch: "hi", it then waits 5 seconds and then writes "there" to the channel. Waiting for 5 seconds is implemented by reading from a timeout channel, which is a channel that closes itself (returns nil) after a set timeout. When running this code in the Clojure REPL (for instance), it will return instantly. It will then print "Read: hi", and 5 seconds later it will print "Read: there".

Hint::λ︎

In JavaScript you cannot do blocking loops like this: the browser will freeze up for 5 seconds. The "magic" of core.async is that internally it converts the body of each go block into a state machine and turns the synchronous-looking channel reads and writes into asynchronous calls.