Concurrent and Multicore Programming in Haskell

Concurrency and Parallelism

Haskell threads

Semi-explicit parallelism

-- | Adds @x + y@, @g x@ and @h y@; none of the three summands depends on
-- another one.
f x y = base + g x + h y
  where
    base = x + y

Sparks (1)

Prelude Control.Parallel> :t par
par :: a -> b -> b
a `par` b

Sparks (2)

Prelude Control.Parallel> :t pseq 
pseq :: a -> b -> b
a `pseq` b
f `par` e `pseq` f + e
$ time ./B +RTS -N2
100000020000000
0.84s user 0.02s system 190% cpu 0.478 total

Compiling and running

ghc --make -threaded -rtsopts file.hs
./exec arg +RTS -N2 -RTS arg

seq and pseq

Prelude> :t seq
seq :: a -> b -> b
a `seq` b
b `seq` a `seq` b

How much to evaluate? (1)

import Debug.Trace

-- | Returns its argument unchanged, emitting @called for <value>@ through
-- 'trace' when the result is demanded.
martor x = trace announcement x
  where
    announcement = "called for " ++ show x

-- | The list @[1..n]@ with every element passed through 'martor'; emits
-- @generating list@ through 'trace' when the list is first demanded.
list n = trace "generating list" (map martor (enumFromTo 1 n))

-- Shows when the traced list is actually evaluated; the resulting output is
-- listed on the following slide.
main = do
  -- A fresh list, built and printed in a single expression.
  print $ trace "computing" $ list 2
  -- `l` is bound once here and printed twice below.
  let l = trace "new list" $ list 2
  print $ trace "once" l
  print $ trace "twice" l
  -- Shadows the previous `l`; `seq` evaluates the inner list only to WHNF
  -- before handing it back.
  let l = trace "with seq" $ let l = list 2 in l `seq` l
  print $ trace "seq" l

How much to evaluate? (2)

Main> main
computing
generating list
[called for 1
1,called for 2
2]
once
new list
generating list
[called for 1
1,called for 2
2]
twice
[1,2]
seq
with seq
generating list
[called for 1
1,called for 2
2]

NF, HNF, WHNF

42
(2, "hello")
\x -> (x + 1)
1 + 2
(1 + 1, 2 + 2)
\x -> 2 + 2
'h' : ("e" ++ "llo")
(\x -> x + 1) 2
"he" ++ "llo"

Strictness (eager evaluation to WHNF)

($!) :: (a -> b) -> a -> b

Remember

-- Plain function application: f is applied to x as given.
f $ x = f x

So

f $! g x

is a strict version of

f $ g x

More strictness

f x !y !z t = ...

-- Both fields carry a strictness annotation (!), so constructing a value
-- with :+ evaluates both components.
data Complex a = !a :+ !a

Sequential sort

import System.Environment

-- | Quicksort on lists: the head is the pivot, elements smaller than it are
-- sorted to its left, all others (including equal ones) to its right.
mySort [] = []
mySort (pivot:rest) = smaller ++ (pivot : larger)
  where
    smaller = mySort (filter (< pivot) rest)
    larger = mySort (filter (>= pivot) rest)

-- | Reads the list size N from the first command-line argument, sorts the
-- descending list @[N, N-1 .. 0]@ and prints the length of the result.
--
-- A missing or non-numeric argument is reported as a usage error rather than
-- crashing on a failed pattern match or a 'read' parse error.
main :: IO ()
main = do
  args <- getArgs
  case args of
    (arg:_)
      | Just n <- parseSize arg -> print $ length $ mySort [n, n-1 .. 0]
    _ -> do
      prog <- getProgName
      ioError $ userError $ "usage: " ++ prog ++ " N"
  where
    -- Accepts exactly one integer; surrounding whitespace is tolerated, the
    -- same way 'read' tolerates it.
    parseSize :: String -> Maybe Integer
    parseSize s =
      case [v | (v, rest) <- reads s, ("", "") <- lex rest] of
        [v] -> Just v
        _ -> Nothing
$ time ./4 5000 +RTS -N2
5001

real    0m17.506s
user    0m10.619s
sys     0m2.863s

Parallel sort

import Control.Parallel
import System.Environment

-- | Quicksort that sparks the upper partition with 'par' and uses 'pseq' to
-- evaluate the lower partition before the two halves are appended.
mySort [] = []
mySort (pivot:rest) = above `par` (below `pseq` (below ++ (pivot : above)))
  where
    below = mySort (filter (< pivot) rest)
    above = mySort (filter (>= pivot) rest)

-- | Reads the list size N from the first command-line argument, sorts the
-- descending list @[N, N-1 .. 0]@ and prints the length of the result.
--
-- A missing or non-numeric argument is reported as a usage error rather than
-- crashing on a failed pattern match or a 'read' parse error.
main :: IO ()
main = do
  args <- getArgs
  case args of
    (arg:_)
      | Just n <- parseSize arg -> print $ length $ mySort [n, n-1 .. 0]
    _ -> do
      prog <- getProgName
      ioError $ userError $ "usage: " ++ prog ++ " N"
  where
    -- Accepts exactly one integer; surrounding whitespace is tolerated, the
    -- same way 'read' tolerates it.
    parseSize :: String -> Maybe Integer
    parseSize s =
      case [v | (v, rest) <- reads s, ("", "") <- lex rest] of
        [v] -> Just v
        _ -> Nothing
$ time ./2 5000 +RTS -N2
5001

real    0m7.559s
user    0m5.926s
sys     0m1.613s

Measuring performance

$ ghc --make -threaded -eventlog -rtsopts 2.hs
$ ./2 5000 +RTS -ls -N2
$ threadscope 2.eventlog

Measuring performance

Performance results

instance NFData ... where
  rnf = ...

Explicit Parallelism or Concurrency

Prelude Control.Concurrent> :t forkIO
forkIO :: IO () -> IO ThreadId
Prelude Control.Concurrent> :t forkOS
forkOS :: IO () -> IO ThreadId
import Control.Concurrent

-- | Prints @Thread <x>@ on its own line.
thread x = putStrLn ("Thread " ++ show x)

-- | Forks a single thread and returns at once. Nothing makes main wait, so
-- the message may or may not appear (both outcomes occur in the sample runs).
main = forkIO (thread 42)
$ ./3 +RTS -N2
Thread 42
$ ./3 +RTS -N2
$

More Threads

import Control.Concurrent

-- | Prints @Thread <x>@ on its own line.
thread x = putStrLn ("Thread " ++ show x)

-- | Forks six printing threads one after another and returns without waiting
-- for any of them; the sample run shows only part of the messages.
main =
  forkIO (thread 1)
    >> forkIO (thread 2)
    >> forkIO (thread 3)
    >> forkIO (thread 4)
    >> forkIO (thread 5)
    >> forkIO (thread 6)
$ ./5 +RTS -N2
Thread 1
Thread 3
Thread 5

Even More Threads

import Control.Concurrent
import Control.Monad

-- | Prints @Thread <x>@ on its own line.
thread x = putStrLn ("Thread " ++ show x)

-- | Forks one printing thread per number in @[1..6]@ and returns the thread
-- ids without waiting; which messages appear differs between the sample runs.
main = forM [1 .. 6] $ \n -> forkIO (thread n)
$ ./6 +RTS -N2
Thread 1
$ ./6 +RTS -N2
Thread 1
Thread 3
Thread 5
$ ./6 +RTS -N2
Thread 1
Thread 6
$ ./6 +RTS -N2
Thread 2

Main thread

Simple Communication Between Threads

newMVar :: a -> IO (MVar a)
newEmptyMVar :: IO (MVar a)
putMVar :: MVar a -> a -> IO ()
takeMVar :: MVar a -> IO a
tryPutMVar :: MVar a -> a -> IO Bool
tryTakeMVar :: MVar a -> IO (Maybe a)

Synchronizing Threads

import Control.Concurrent

-- | Prints @Thread <x>@ and then fills @v@ to signal that it has finished.
thread :: Int -> MVar () -> IO ()
thread x v = putStrLn ("Thread " ++ show x) >> putMVar v ()

-- | Forks one worker and blocks on its MVar until the worker has signalled
-- completion.
main = do
  done <- newEmptyMVar
  _ <- forkIO (thread 42 done)
  takeMVar done

Synchronizing Threads

import Control.Concurrent
import Control.Monad

-- | Prints @Thread <x>@ and then fills @v@ to signal that it has finished.
thread :: Int -> MVar () -> IO ()
thread x v = putStrLn ("Thread " ++ show x) >> putMVar v ()

-- | Starts six workers, each with its own completion MVar, then waits for
-- every one of them in turn.
main = do
  vars <- replicateM 6 newEmptyMVar
  zipWithM_ (\n done -> forkIO (thread n done)) [1 .. 6] vars
  mapM_ takeMVar vars

Advanced Communication Between Threads

newChan :: IO (Chan a)
dupChan :: Chan a -> IO (Chan a)
readChan :: Chan a -> IO a
writeChan :: Chan a -> a -> IO ()

Chan Example

import Control.Concurrent

-- | Prints its id, sleeps for 100000 microseconds, reads two numbers from
-- the channel and stores their sum in the MVar.
thread :: Int -> Chan Int -> MVar Int -> IO ()
thread x c v = do
  putStrLn ("Thread " ++ show x)
  threadDelay 100000
  first <- readChan c
  second <- readChan c
  putMVar v (first + second)

-- | Forks a worker, sends it 40 and 2 over the channel, then waits for the
-- sum to appear in the MVar and prints it.
main = do
  result <- newEmptyMVar
  numbers <- newChan
  _ <- forkIO (thread 42 numbers result)
  mapM_ (writeChan numbers) [40, 2]
  readMVar result >>= print

Broadcast Example

import Control.Concurrent
import Control.Monad

-- | Waits for one value on its channel, reports it together with the thread
-- id, then fills @v@ to signal completion.
thread :: Int -> Chan Int -> MVar () -> IO ()
thread x c v =
  readChan c >>= \s ->
    putStrLn ("Thread " ++ show x ++ " read " ++ show s) >> putMVar v ()

-- | Broadcast demo: every worker reads from its own duplicate of @c@, so a
-- single write to @c@ reaches all of them.
--
-- The worker count is stated once, and ids, channels and MVars are paired
-- with 'zip3' instead of being looked up by index with the partial '!!'.
main :: IO ()
main = do
  c <- newChan
  let workers = 6
  vars <- replicateM workers newEmptyMVar
  chans <- replicateM workers (dupChan c)
  -- Worker ids start at 0, as before.
  mapM_ (\(x, ch, v) -> forkIO (thread x ch v)) (zip3 [0 ..] chans vars)
  threadDelay 1000000
  writeChan c 42
  -- Wait until every worker has reported.
  mapM_ takeMVar vars

Problems :: Deadlocks

import Control.Concurrent

-- Adds 1 to both MVars, updating `inn` from inside the update of `out`,
-- then prints "done" and signals completion through `v`.
thread out inn v = do
  -- modifyMVar_ takes `out` and only puts it back once the callback returns
  -- (see the Control.Concurrent.MVar documentation).
  modifyMVar_ out $ \x -> do
    -- Let other threads run while `out` is still taken.
    yield
    -- Needs `inn`; waits if another thread currently holds it.
    modifyMVar_ inn $ \y -> return (y + 1)
    return (x + 1)
  putStrLn "done"
  putMVar v ()

-- | Runs two workers over the same pair of MVars, each taking them in the
-- opposite order, and waits for both completion signals.
main = do
  a <- newMVar 1
  b <- newMVar 2
  doneAB <- newEmptyMVar
  doneBA <- newEmptyMVar
  _ <- forkIO (thread a b doneAB)
  _ <- forkIO (thread b a doneBA)
  takeMVar doneAB
  takeMVar doneBA

Other Problems

Software Transactional Memory

atomically :: STM a -> IO a
readTVar :: TVar a -> STM a
writeTVar :: TVar a -> a -> STM ()
-- | Subtracts @amount@ from the balance stored in the account; no check is
-- made that the balance stays non-negative.
withdraw :: Account -> Int -> STM ()
withdraw acc amount =
  readTVar acc >>= \balance -> writeTVar acc (balance - amount)

-- | Adds @amount@ to the account by withdrawing the negated amount.
deposit :: Account -> Int -> STM ()
deposit acc amount = withdraw acc (negate amount)

STM - Bank account

import Control.Concurrent
import Control.Concurrent.STM

-- An account is a transactional variable holding its balance as an Int.
type Account = TVar Int

-- | Creates a new account with a balance of 0.
--
-- The signature pins the result to 'Account', so the counter type no longer
-- depends on how the value happens to be used elsewhere.
newAccount :: IO Account
newAccount = newTVarIO 0

...

-- | Withdraws @amount@, retrying the transaction while the amount is
-- positive and larger than the current balance. Non-positive amounts always
-- go through.
limitedWithdraw :: Account -> Int -> STM ()
limitedWithdraw acc amount = do
  balance <- readTVar acc
  if amount > 0 && amount > balance
    then retry
    else writeTVar acc (balance - amount)

-- | Worker that deposits 10 into the account in a single transaction.
t1 acct = atomically (deposit acct 10)
-- | Worker that withdraws 6 from the account in a single transaction, via
-- 'limitedWithdraw'.
t2 acct = atomically (limitedWithdraw acct 6)

-- | Starts two depositing and three withdrawing workers on one fresh account,
-- then sleeps for one second; it does not otherwise wait for the workers.
main = do
  acct <- newAccount
  mapM_ forkIO [t1 acct, t2 acct, t2 acct, t2 acct, t1 acct]
  threadDelay 1000000

Data Parallel Haskell (1)

Data Parallel Haskell (2)

-- | Sum of the squares of all elements of a parallel array.
sumsq :: [: Float :] -> Float
sumsq a = sumP [: x*x | x <- a :]

Data Parallel Haskell (3)

-- A vector is a parallel array of Floats.
type Vector = [: Float :]
-- A matrix is a parallel array of vectors (nested parallel arrays).
type Matrix = [: Vector :]

-- | Matrix-vector product: applies @vecMul@ to every element @r@ of the
-- matrix together with @v@ and collects the results in a parallel array.
matMul :: Matrix -> Vector -> Vector
matMul m v = [: vecMul r v | r <- m :]

Data Parallel Haskell (4)

Conclusions

End