Programming with distributed functional futures

Posted on September 7, 2015

The dataflow model of computation is a programming language abstraction exploited for pipelined parallelism over infinite streams, and task parallel execution over data-independent tasks. There are numerous Haskell libraries for dataflow-style programming, some that focus on stream processing and others that focus on task parallel scheduling on (shared memory) multicore CPUs and on (distributed memory) clusters and HPCs. Here are a few examples:

Dataflow values
Memory Finite & single assignmentStreams
Shared monad-par, lvish conduit, pipes, iteratee
Distributed CloudHaskell-Platform, HdpH

Dataflow task parallelism with futures

This post concentrates on dataflow task parallelism with Haskell and Erlang. The Haskell below uses the monad-par library to construct the dataflow task graph on the right, executed in parallel with runPar:

import Control.Monad.Par

f   :: Par Int
g,h :: Int -> Par Int
j,k :: Int -> Int -> Par Int

main = print $ runPar $ do
         a <- spawn f
         b <- spawn (get a >>= g)
         c <- spawn (get a >>= h)
         d <- spawn (get b >>= \x -> get c >>= \y -> j x y)
         e <- spawn (get b >>= \x -> get d >>= \y -> k x y)
         get e

This simple parallel programming abstraction can be thought of as synchronous dataflow using rendevouz futures (R. Halstead, 1985). A future can be thought of as placeholder for a value that is set to contain a real value once that value becomes known, by evaluating its corresponding spawned task.

Now to compare some functional libraries with futures using three properties to measure them. The contenders are monad-par (Marlow et al, 2011), Erlang Remote Procedure Call (RPC) (Cesarini, 2009), the CloudHaskell Platform (CH-P) (on github), HdpH (Maier, 2014) and HdpH-RS (Stewart, 2015). Those properties are: does the runtime system...

  1. balances load between processors?
  2. support distributed memory deployment?
  3. is the scheduler resilient to failure?
Library Load Balancing Fault tolerant Distributed memory
monad-par n/a
Erlang RPC (✔)
CH-P (✔)

The monad-par library is designed for shared-memory execution with GHC on one node, and load balancing between processor cores. Erlang's RPC library supports multiple node distribution, but the Erlang VM does not support automatic load balancing. Fault tolerance only works if spawned processes are explicitly linked or monitored, hence (✔), leaving programmatic recovery to the user. CH-P is unsurprisingly the same as Erlang RPC, but does require additional Template Haskell code for explicit closure creation. HdpH and HdpH-RS are designed for distributed memory, and load balancing is done with hierarchical work stealing --- between sparkpools across nodes and between threadpools across cores on each node.

Creating futures

The following table shows the primitives for creating futures.

Placement monad-par HdpH(-RS) Erlang CH-P
Local only spawn spawn async_call/4 async
Lazily disitributed spawn
Eagerly distributed spawnAt async_call/4 asyncSTM

The monad-par and HdpH(-RS) spawn function returns an empty IStructure, called an IVar which represents an empty future, to be filled once the execution of the spawn task is complete, and spawnAt in HdpH(-RS) supports explicit node placement. The Erlang RPC function rpc:async_call/4 eagerly places future tasks, returning a key. The functions rpc:yield/1, rpc:nb_yield/1 and rpc:nb_yield/2 are used to read values from keys.

Synchronising with futures

Once futures are created ascynhronously, there needs to be some way of making a synchronous blocking request to get data from a future once it has been filled.

Operation monad-par HdpH(-RS) Erlang CH-P
Blocking read get get yield/1 wait
Non-blocking read tryGet nb_yield/1 poll
Timeout read nb_yield/2 waitTimeout
Check fullness probe check

In monad-par and HdpH(-RS), a blocking wait on a future is get. In Erlang RPC it is rpc:yield/4, and in CH-P it is wait. The non-blocking version in HdpH and HdpH-RS is tryGet, in Erlang RPC it is rpc:nb_yield/4 and in CH-P it is waitTimeout.

Dataflow by example: Fibonacci

This is Fibonacci with the RPC library in Erlang using rpc:async_call.


%% Compute Fibonacci with futures
fib(0) -> 0;
fib(1) -> 1;
fib(X) ->
  Key = rpc:async_call(random_node(),fib_rpc,fib,[X-1]),
  Y = fib(X-2),
  Z = rpc:yield(Key),
  Y + Z.

%% Select random node (maybe our own)
random_node() ->
  I = random:uniform(length(nodes()) + 1),
  Nodes = nodes() ++ [node()],

This is Fibonacci with the monad-par library in Haskell using spawn.

fib :: Int -> Par Int
fib x | x < 2 = return x
fib x = do
  v <- spawn (fib (x-1))
  y <- fib (x-2)
  x <- get v
  return (x+y)

This is Fibonacci with the HdpH(-RS) library in Haskell using spawn.

fib :: Int -> Par Integer
fib x | x < 2 = return x
fib x = do
  v <- spawn $(mkClosure [| fibRemote (x) |])
  y <- fib (x-2)
  clo_x <- get v
  force $ unClosure clo_x + y

fibRemote :: Int -> Par (Closure Integer)
fibRemote n = fib (n-1) >>= force . toClosure

This is Fibonacci CH-P library in Haskell using asyncSTM.

remotableDecl [
  [d| fib :: ([NodeId],Int) -> Process Integer ;
      fib (_,0) = return 0
      fib (_,1) = return 1
      fib (nids,n) = do
        node <- liftIO (randomIO >>= \ix -> return (nids !! (ix `mod` length nids)))
        let tsk = remoteTask ($(functionTDict 'fib)) node ($(mkClosure 'fib) (nids,n-2))
        future <- asyncSTM tsk
        y <- fib (nids,n-1)
        (AsyncDone z) <- wait future
        return $ y + z


Dataflow can be used for pipelined parallelism, which is often exploited on fine grained concurrent embedded hardware such as FPGAs. The dataflow programming style can also be used for task parallel scheduling on CPUs. There are a number of Haskell libraries for shared-memory parallel dataflow programming, including monad-par for deterministic and lvish for quasi-deterministic parallelism. There are others that support distributed-memory parallel dataflow programming, including the HdpH DSLs, and the Cloud Haskell Platform library, which closely resembles the Erlang Remote Procedure Call functionality. They don't share a common notation for creation of and operations on futures, and this post gives their correspondence.

If the reader is still interested, more detail is given in Appendix 2 of my thesis, from page 203.