{-# LANGUAGE CPP, DeriveDataTypeable, NoImplicitPrelude #-}

#if __GLASGOW_HASKELL__ >= 704
{-# LANGUAGE Safe #-}
#endif

-------------------------------------------------------------------------------
-- |
-- Module     : Control.Concurrent.Broadcast
-- Copyright  : (c) 2010-2011 Bas van Dijk & Roel van Dijk
-- License    : BSD3 (see the file LICENSE)
-- Maintainer : Bas van Dijk <v.dijk.bas@gmail.com>
--            , Roel van Dijk <vandijk.roel@gmail.com>
--
-- A 'Broadcast' is a mechanism for communication between threads. Multiple
-- @'listen'ers@ wait until a broadcaster @'broadcast's@ a value. The listeners
-- block until the value is received. When the broadcaster broadcasts a value
-- all listeners are woken.
--
-- All functions are /exception safe/. Throwing asynchronous exceptions will not
-- compromise the internal state of a broadcast.
--
-- This module is designed to be imported qualified. We suggest importing it
-- like:
--
-- @
-- import           Control.Concurrent.Broadcast              ( Broadcast )
-- import qualified Control.Concurrent.Broadcast as Broadcast ( ... )
-- @
-------------------------------------------------------------------------------

module Control.Concurrent.Broadcast
  ( Broadcast

    -- * Creating broadcasts
  , new
  , newBroadcasting

    -- * Listening to broadcasts
  , listen
  , tryListen
  , listenTimeout

    -- * Broadcasting
  , broadcast
  , signal
  , silence
  ) where


-------------------------------------------------------------------------------
-- Imports
-------------------------------------------------------------------------------

-- from base:
import Control.Monad              ( return, when )
import Control.Concurrent.MVar    ( MVar, newMVar, newEmptyMVar
                                  , takeMVar, putMVar, readMVar, modifyMVar_
                                  )
import Control.Exception          ( onException )
import Data.Eq                    ( Eq )
import Data.Either                ( Either(Left ,Right), either )
import Data.Function              ( ($), (.), const )
import Data.Functor               ( fmap, (<$>) )
import Data.Foldable              ( for_ )
import Data.List                  ( delete, length )
import Data.Maybe                 ( Maybe(Nothing, Just), isNothing )
import Data.Ord                   ( max )
import Data.Typeable              ( Typeable )
import Prelude                    ( Integer, seq )
import System.IO                  ( IO )

#if __GLASGOW_HASKELL__ < 700
import Prelude                    ( fromInteger )
import Control.Monad              ( (>>=), (>>), fail )
import Data.Ord                   ( Ord )
#endif

-- from unbounded-delays:
import Control.Concurrent.Timeout ( timeout )

-- from concurrent-extra (this package):
import Utils                      ( purelyModifyMVar, mask_ )



-------------------------------------------------------------------------------
-- Broadcast
-------------------------------------------------------------------------------

{-|
A broadcast is in one of two possible states:

* \"Silent\": @'listen'ing@ to the broadcast will block until a value is
@'broadcast'ed@.

* \"Broadcasting @x@\": @'listen'ing@ to the broadcast will return the value @x@
without blocking.
-}
newtype Broadcast a = Broadcast {forall a. Broadcast a -> MVar (Either [MVar a] a)
unBroadcast :: MVar (Either [MVar a] a)}
    deriving (Broadcast a -> Broadcast a -> Bool
forall a. Broadcast a -> Broadcast a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Broadcast a -> Broadcast a -> Bool
$c/= :: forall a. Broadcast a -> Broadcast a -> Bool
== :: Broadcast a -> Broadcast a -> Bool
$c== :: forall a. Broadcast a -> Broadcast a -> Bool
Eq, Typeable)

-- | @new@ creates a broadcast in the \"silent\" state.
new :: IO (Broadcast a)
new :: forall a. IO (Broadcast a)
new = forall a. MVar (Either [MVar a] a) -> Broadcast a
Broadcast forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (MVar a)
newMVar (forall a b. a -> Either a b
Left [])

-- | @newBroadcasting x@ creates a broadcast in the \"broadcasting @x@\" state.
newBroadcasting :: a -> IO (Broadcast a)
newBroadcasting :: forall a. a -> IO (Broadcast a)
newBroadcasting a
x = forall a. MVar (Either [MVar a] a) -> Broadcast a
Broadcast forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (MVar a)
newMVar (forall a b. b -> Either a b
Right a
x)

{-|
Listen to a broadcast.

* If the broadcast is \"broadcasting @x@\", @listen@ will return @x@
immediately.

* If the broadcast is \"silent\", @listen@ will block until another thread
@'broadcast's@ a value to the broadcast.
-}
listen :: Broadcast a -> IO a
listen :: forall a. Broadcast a -> IO a
listen (Broadcast MVar (Either [MVar a] a)
mv) = forall a. IO a -> IO a
mask_ forall a b. (a -> b) -> a -> b
$ do
  Either [MVar a] a
mx <- forall a. MVar a -> IO a
takeMVar MVar (Either [MVar a] a)
mv
  case Either [MVar a] a
mx of
    Left [MVar a]
ls -> do MVar a
l <- forall a. IO (MVar a)
newEmptyMVar
                  forall a. MVar a -> a -> IO ()
putMVar MVar (Either [MVar a] a)
mv forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ MVar a
lforall a. a -> [a] -> [a]
:[MVar a]
ls
                  forall a. MVar a -> IO a
takeMVar MVar a
l
    Right a
x -> do forall a. MVar a -> a -> IO ()
putMVar MVar (Either [MVar a] a)
mv Either [MVar a] a
mx
                  forall (m :: * -> *) a. Monad m => a -> m a
return a
x

{-|
Try to listen to a broadcast; non blocking.

* If the broadcast is \"broadcasting @x@\", @tryListen@ will return 'Just' @x@
immediately.

* If the broadcast is \"silent\", @tryListen@ returns 'Nothing' immediately.
-}
tryListen :: Broadcast a -> IO (Maybe a)
tryListen :: forall a. Broadcast a -> IO (Maybe a)
tryListen = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (forall a b. a -> b -> a
const forall a. Maybe a
Nothing) forall a. a -> Maybe a
Just) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. MVar a -> IO a
readMVar forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Broadcast a -> MVar (Either [MVar a] a)
unBroadcast

{-|
Listen to a broadcast if it is available within a given amount of time.

Like 'listen', but with a timeout. A return value of 'Nothing' indicates a
timeout occurred.

The timeout is specified in microseconds.

If the broadcast is \"silent\" and a timeout of 0 &#x3bc;s is specified the
function returns 'Nothing' without blocking.

Negative timeouts are treated the same as a timeout of 0 &#x3bc;s.
-}
listenTimeout :: Broadcast a -> Integer -> IO (Maybe a)
listenTimeout :: forall a. Broadcast a -> Integer -> IO (Maybe a)
listenTimeout (Broadcast MVar (Either [MVar a] a)
mv) Integer
time = forall a. IO a -> IO a
mask_ forall a b. (a -> b) -> a -> b
$ do
  Either [MVar a] a
mx <- forall a. MVar a -> IO a
takeMVar MVar (Either [MVar a] a)
mv
  case Either [MVar a] a
mx of
    Left [MVar a]
ls -> do MVar a
l <- forall a. IO (MVar a)
newEmptyMVar
                  forall a. MVar a -> a -> IO ()
putMVar MVar (Either [MVar a] a)
mv forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ MVar a
lforall a. a -> [a] -> [a]
:[MVar a]
ls
                  Maybe a
my <- forall α. Integer -> IO α -> IO (Maybe α)
timeout (forall a. Ord a => a -> a -> a
max Integer
time Integer
0) (forall a. MVar a -> IO a
takeMVar MVar a
l)
                         forall a b. IO a -> IO b -> IO a
`onException` MVar a -> IO ()
deleteReader MVar a
l
                  forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Maybe a -> Bool
isNothing Maybe a
my) (MVar a -> IO ()
deleteReader MVar a
l)
                  forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
my
    Right a
x -> do forall a. MVar a -> a -> IO ()
putMVar MVar (Either [MVar a] a)
mv Either [MVar a] a
mx
                  forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
x
    where
      deleteReader :: MVar a -> IO ()
deleteReader MVar a
l = do Either [MVar a] a
mx <- forall a. MVar a -> IO a
takeMVar MVar (Either [MVar a] a)
mv
                          case Either [MVar a] a
mx of
                            Left [MVar a]
ls -> let ls' :: [MVar a]
ls' = forall a. Eq a => a -> [a] -> [a]
delete MVar a
l [MVar a]
ls
                                       in forall (t :: * -> *) a. Foldable t => t a -> Int
length [MVar a]
ls' seq :: forall a b. a -> b -> b
`seq` forall a. MVar a -> a -> IO ()
putMVar MVar (Either [MVar a] a)
mv (forall a b. a -> Either a b
Left [MVar a]
ls')
                            Right a
_ -> forall a. MVar a -> a -> IO ()
putMVar MVar (Either [MVar a] a)
mv Either [MVar a] a
mx

{-|
Broadcast a value.

@broadcast b x@ changes the state of the broadcast @b@ to \"broadcasting @x@\".

If the broadcast was \"silent\" all threads that are @'listen'ing@ to the
broadcast will be woken.
-}
broadcast :: Broadcast a -> a -> IO ()

{-|
Broadcast a value before becoming \"silent\".

The state of the broadcast is changed to \"silent\" after all threads that are
@'listen'ing@ to the broadcast are woken and resume with the signalled value.

The semantics of signal are equivalent to the following definition:

@
  signal b x = 'block' $ 'broadcast' b x >> 'silence' b
@
-}
signal :: Broadcast a -> a -> IO ()

broadcast :: forall a. Broadcast a -> a -> IO ()
broadcast Broadcast a
b a
x = forall a. Either [MVar a] a -> Broadcast a -> a -> IO ()
broadcastThen (forall a b. b -> Either a b
Right a
x) Broadcast a
b a
x
signal :: forall a. Broadcast a -> a -> IO ()
signal    Broadcast a
b a
x = forall a. Either [MVar a] a -> Broadcast a -> a -> IO ()
broadcastThen (forall a b. a -> Either a b
Left []) Broadcast a
b a
x

-- | Internally used function that performs the actual broadcast in 'broadcast'
-- and 'signal' then changes to the given final state.
broadcastThen :: Either [MVar a] a -> Broadcast a -> a -> IO ()
broadcastThen :: forall a. Either [MVar a] a -> Broadcast a -> a -> IO ()
broadcastThen Either [MVar a] a
finalState (Broadcast MVar (Either [MVar a] a)
mv) a
x =
    forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (Either [MVar a] a)
mv forall a b. (a -> b) -> a -> b
$ \Either [MVar a] a
mx -> do
      case Either [MVar a] a
mx of
        Left [MVar a]
ls -> do forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [MVar a]
ls (forall a. MVar a -> a -> IO ()
`putMVar` a
x)
                      forall (m :: * -> *) a. Monad m => a -> m a
return Either [MVar a] a
finalState
        Right a
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return Either [MVar a] a
finalState

-- | Set a broadcast to the \"silent\" state.
silence :: Broadcast a -> IO ()
silence :: forall a. Broadcast a -> IO ()
silence (Broadcast MVar (Either [MVar a] a)
mv) = forall a. MVar a -> (a -> a) -> IO ()
purelyModifyMVar MVar (Either [MVar a] a)
mv forall a b. (a -> b) -> a -> b
$ forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ forall a b. a -> b -> a
const forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left []