The dataflow model of computation is a programming language abstraction exploited for pipelined parallelism over infinite streams, and task parallel execution over data-independent tasks. There are numerous Haskell libraries for dataflow-style programming, some that focus on stream processing and others that focus on task parallel scheduling on (shared memory) multicore CPUs and on (distributed memory) clusters and HPCs. Here are a few examples:
| Memory | Finite & single assignment | Streams |
|---|---|---|
| Shared | monad-par, lvish | conduit, pipes, iteratee |
This post concentrates on dataflow task parallelism with Haskell and Erlang. The Haskell below uses the monad-par library to construct the dataflow task graph on the right, executed in parallel with
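```haskell
import Control.Monad.Par

-- Placeholder task bodies so the example compiles; substitute real work here.
f :: Par Int
f = return 1

g, h :: Int -> Par Int
g x = return (x + 1)
h x = return (x * 2)

j, k :: Int -> Int -> Par Int
j x y = return (x + y)
k x y = return (x * y)

main :: IO ()
main = print $ runPar $ do
  a <- spawn f                                         -- a = f
  b <- spawn (get a >>= g)                             -- b depends on a
  c <- spawn (get a >>= h)                             -- c depends on a
  d <- spawn (get b >>= \x -> get c >>= \y -> j x y)   -- d depends on b and c
  e <- spawn (get b >>= \x -> get d >>= \y -> k x y)   -- e depends on b and d
  get e
```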
This simple parallel programming abstraction can be thought of as synchronous dataflow using rendezvous futures (R. Halstead, 1985). A future can be thought of as a placeholder for a value: it starts out empty and is filled with a real value once that value becomes known, i.e. once its corresponding spawned task has been evaluated.
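To make the placeholder idea concrete, here is a sketch of a future built by hand from monad-par's lower-level `IVar` operations `new`, `fork`, `put` and `get`. The name `spawnFuture` is hypothetical; it is meant to behave like `spawn`, but it is an illustration, not the library's own definition.

```haskell
import Control.DeepSeq (NFData)
import Control.Monad.Par

-- Hypothetical hand-rolled future: create an empty IVar, fork a task that
-- fills it, and hand the (possibly still empty) IVar back straight away.
spawnFuture :: NFData a => Par a -> Par (IVar a)
spawnFuture task = do
  iv <- new                  -- empty placeholder
  fork (task >>= put iv)     -- the forked task writes its result exactly once
  return iv                  -- returned without waiting for the task

main :: IO ()
main = print $ runPar $ do
  fut <- spawnFuture (return (sum [1 .. 100 :: Int]))
  get fut                    -- blocks until the placeholder has been filled
```

Because an `IVar` can only be written once, every reader of `fut` sees the same value, which is what keeps this style of dataflow deterministic.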
Now let's compare some functional libraries that provide futures, using three properties. The contenders are monad-par (Marlow et al., 2011), Erlang Remote Procedure Call (RPC) (Cesarini, 2009), the Cloud Haskell Platform (CH-P) (on GitHub), HdpH (Maier, 2014) and HdpH-RS (Stewart, 2015). The properties are: does the runtime system…
| Library | Load balancing | Fault tolerant | Distributed memory |
|---|---|---|---|
The monad-par library is designed for shared-memory execution with GHC on a single node, with load balancing between processor cores. Erlang's RPC library supports distribution across multiple nodes, but the Erlang VM does not automatically balance load across nodes. Fault tolerance only works if spawned processes are explicitly linked or monitored, hence (✔), and recovery is left to the programmer. Unsurprisingly, CH-P behaves the same as Erlang RPC, but it requires additional Template Haskell code for explicit closure creation. HdpH and HdpH-RS are designed for distributed memory, and load balancing is done by hierarchical work stealing: between sparkpools across nodes, and between threadpools across cores on each node.
The following table shows the primitives for creating futures.
The monad-par and HdpH(-RS) `spawn` function returns an empty I-structure, called an `IVar`, which represents an empty future to be filled once the spawned task has finished executing, and `spawnAt` in HdpH(-RS) supports explicit node placement. The Erlang RPC function `rpc:async_call/4` eagerly places future tasks on a given node, returning a key. The functions `rpc:nb_yield/2` are used to read values from keys.
Once futures have been created asynchronously, there needs to be some way to make a synchronous, blocking request for a future's value once it has been filled.
In monad-par and HdpH(-RS), a blocking wait on a future is `get`. In Erlang RPC it is `rpc:yield/1`, and in CH-P it is `wait`. The non-blocking version in HdpH and HdpH-RS is `tryGet`, in Erlang RPC it is `rpc:nb_yield/1`, and in CH-P it is
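The difference between a blocking and a non-blocking read can be sketched without any of these libraries, using only `MVar`s from GHC's `base`. The `Future` type and the `spawnIO`, `await` and `poll` functions below are hypothetical names chosen for illustration; they mimic the shape of the primitives above but are not part of monad-par, HdpH, Erlang RPC or CH-P.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar, tryReadMVar)

-- Hypothetical minimal future: an MVar that is written exactly once.
newtype Future a = Future (MVar a)

-- Run an action in a new thread and return its (initially empty) future.
-- Note: if the action throws, the future is never filled.
spawnIO :: IO a -> IO (Future a)
spawnIO act = do
  mv <- newEmptyMVar
  _ <- forkIO (act >>= putMVar mv)
  return (Future mv)

-- Blocking read: waits until the future has been filled.
await :: Future a -> IO a
await (Future mv) = readMVar mv

-- Non-blocking read: Nothing if the value is not available yet.
poll :: Future a -> IO (Maybe a)
poll (Future mv) = tryReadMVar mv

main :: IO ()
main = do
  fut <- spawnIO (return $! sum [1 .. 1000000 :: Int])
  early <- poll fut   -- Nothing or Just, depending on thread scheduling
  print early
  v <- await fut      -- always the final value
  print v
```

`readMVar` leaves the value in place, so a future can be read any number of times after it has been filled, just like an `IVar`.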
This is Fibonacci with the RPC library in Erlang using
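```erlang
-module(fib_rpc).
-export([fib/1, random_node/0]).

%% Compute Fibonacci with futures
fib(0) -> 0;
fib(1) -> 1;
fib(X) ->
    %% Spawn fib(X-1) as a future on a randomly chosen node
    %% (fib_rpc must be loaded on every node)
    Key = rpc:async_call(random_node(), fib_rpc, fib, [X-1]),
    Y = fib(X-2),          % computed locally in the meantime
    Z = rpc:yield(Key),    % block until the future has been filled
    Y + Z.

%% Select a random node (possibly our own)
random_node() ->
    I = rand:uniform(length(nodes()) + 1),
    Nodes = nodes() ++ [node()],
    lists:nth(I, Nodes).
```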
This is Fibonacci with the monad-par library in Haskell using
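```haskell
import Control.Monad.Par

fib :: Int -> Par Int
fib x | x < 2 = return x
fib x = do
  v <- spawn (fib (x-1))   -- future for fib (x-1)
  y <- fib (x-2)           -- computed by the current task in the meantime
  z <- get v               -- blocking read of the future
  return (y + z)

main :: IO ()
main = print (runPar (fib 20))
```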
This is Fibonacci with the HdpH(-RS) library in Haskell using
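```haskell
fib :: Int -> Par Integer
fib x | x < 2 = return (fromIntegral x)      -- convert the Int argument to an Integer result
fib x = do
  v <- spawn $(mkClosure [| fibRemote x |])  -- future for fib (x-1), may run on another node
  y <- fib (x-2)                             -- computed locally in the meantime
  clo_x <- get v                             -- blocking read; yields a Closure
  force $ unClosure clo_x + y

fibRemote :: Int -> Par (Closure Integer)
fibRemote n = fib (n-1) >>= force . toClosure
```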
This is Fibonacci with the CH-P library in Haskell using
```haskell
remotableDecl [
  [d| fib :: ([NodeId], Int) -> Process Integer
      fib (_, 0) = return 0
      fib (_, 1) = return 1
      fib (nids, n) = do
        -- pick a random node from the list
        node <- liftIO (randomIO >>= \ix -> return (nids !! (ix `mod` length nids)))
        -- spawn fib (n-2) as a remote future on that node
        let tsk = remoteTask ($(functionTDict 'fib)) node ($(mkClosure 'fib) (nids, n-2))
        future <- asyncSTM tsk
        y <- fib (nids, n-1)           -- computed locally in the meantime
        (AsyncDone z) <- wait future   -- blocking read of the future
        return $ y + z
    |]
  ]
```
Dataflow can be used for pipelined parallelism, which is often exploited on fine-grained concurrent embedded hardware such as FPGAs. The dataflow programming style can also be used for task parallel scheduling on CPUs. There are a number of Haskell libraries for shared-memory parallel dataflow programming, including monad-par for deterministic and lvish for quasi-deterministic parallelism. Others support distributed-memory parallel dataflow programming, including the HdpH DSLs and the Cloud Haskell Platform library, which closely resembles Erlang's Remote Procedure Call functionality. These libraries do not share a common notation for creating and operating on futures, and this post gives their correspondence.
If the reader is still interested, more detail is given in Appendix 2 of my thesis, from page 203.