Skip to content

Commit

Permalink
Remove some unhelpful polymorphism in Eval. Getting concrete.
Browse files Browse the repository at this point in the history
  • Loading branch information
dougalm committed Dec 11, 2023
1 parent d7eaa5d commit 4d627b2
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 106 deletions.
157 changes: 76 additions & 81 deletions src/lib/Live/Eval.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
{-# LANGUAGE UndecidableInstances #-}

module Live.Eval (
watchAndEvalFile, EvalServer, EvalUpdate, CellsState, CellsUpdate, fmapCellsUpdate,
NodeList (..), NodeListUpdate (..), subscribeIO, nodeListAsUpdate) where
watchAndEvalFile, EvalServer, CellsState, CellsUpdate,
NodeList (..), NodeListUpdate (..), subscribeIO, cellsStateAsUpdate) where

import Control.Concurrent
import Control.Monad
Expand All @@ -28,31 +28,27 @@ import Types.Source
import TopLevel
import ConcreteSyntax
import MonadUtil
import RenderHtml

-- === Top-level interface ===

type EvalServer = StateServer EvalState EvalUpdate
type EvalState = CellsState SourceBlock Outputs
type EvalUpdate = CellsUpdate SourceBlock Outputs
type EvalServer = StateServer CellsState CellsUpdate

-- `watchAndEvalFile` returns the channel by which a client may
-- subscribe by sending a write-only view of its input channel.
watchAndEvalFile :: FilePath -> EvalConfig -> TopStateEx -> IO EvalServer
watchAndEvalFile fname opts env = do
watcher <- launchFileWatcher fname
parser <- launchCellParser watcher \source -> uModuleSourceBlocks $ parseUModule Main source
launchDagEvaluator parser env (sourceBlockEvalFun opts)
launchDagEvaluator opts parser env

sourceBlockEvalFun :: EvalConfig -> Mailbox Outputs -> TopStateEx -> SourceBlock -> IO TopStateEx
sourceBlockEvalFun cfg resultChan env block = do
let cfg' = cfg { cfgLogAction = send resultChan }
evalSourceBlockIO cfg' env block

fmapCellsUpdate :: CellsUpdate i o -> (NodeId -> i -> i') -> (NodeId -> o -> o') -> CellsUpdate i' o'
fmapCellsUpdate (NodeListUpdate t m) fi fo = NodeListUpdate t m' where
m' = mapUpdateMapWithKey m
(\k (CellState i s o) -> CellState (fi k i) s (fo k o))
(\k (CellUpdate s o) -> CellUpdate s (fo k o))
cellsStateAsUpdate :: CellsState -> CellsUpdate
cellsStateAsUpdate = nodeListAsUpdate

-- === DAG diff state ===

Expand All @@ -65,7 +61,7 @@ type NodeId = Int
data NodeList a = NodeList
{ orderedNodes :: [NodeId]
, nodeMap :: M.Map NodeId a }
deriving (Show, Generic)
deriving (Show, Generic, Functor)

data NodeListUpdate s d = NodeListUpdate
{ orderedNodesUpdate :: TailUpdate NodeId
Expand Down Expand Up @@ -118,18 +114,18 @@ computeNodeListUpdate nodes newVals = do
-- This coarsely parses the full file into blocks and forms a DAG (for now a
-- trivial one assuming all top-to-bottom dependencies) of the results.

type CellParser a = StateServer (Dag a) (DagUpdate a)
type CellParser = StateServer (Dag SourceBlock) (DagUpdate SourceBlock)

data CellParserMsg a =
Subscribe_CP (SubscribeMsg (Dag a) (DagUpdate a))
data CellParserMsg =
Subscribe_CP (SubscribeMsg (Dag SourceBlock) (DagUpdate SourceBlock))
| Update_CP (Overwrite Text)
deriving (Show)

launchCellParser :: (Eq a, MonadIO m) => FileWatcher -> (Text -> [a]) -> m (CellParser a)
launchCellParser :: MonadIO m => FileWatcher -> (Text -> [SourceBlock]) -> m CellParser
launchCellParser fileWatcher parseCells =
sliceMailbox Subscribe_CP <$> launchActor (cellParserImpl fileWatcher parseCells)

cellParserImpl :: Eq a => FileWatcher -> (Text -> [a]) -> ActorM (CellParserMsg a) ()
cellParserImpl :: FileWatcher -> (Text -> [SourceBlock]) -> ActorM CellParserMsg ()
cellParserImpl fileWatcher parseCells = runFreshNameT do
Overwritable initContents <- subscribe Update_CP fileWatcher
initNodeList <- buildNodeList $ fmap Unchanging $ parseCells initContents
Expand All @@ -147,27 +143,26 @@ cellParserImpl fileWatcher parseCells = runFreshNameT do
-- This is where we track the state of evaluation and decide what we needs to be
-- run and what needs to be killed.

type Evaluator i o = StateServer (CellsState i o) (CellsUpdate i o)
newtype EvaluatorM s i o a =
type Evaluator = StateServer CellsState CellsUpdate
newtype EvaluatorM a =
EvaluatorM { runEvaluatorM' ::
IncServerT (CellsState i o) (CellsUpdate i o)
(StateT (EvaluatorState s i o)
(ActorM (EvaluatorMsg s i o))) a }
deriving (Functor, Applicative, Monad, MonadIO,
Actor (EvaluatorMsg s i o))
deriving instance Monoid o => IncServer (CellsState i o) (CellsUpdate i o) (EvaluatorM s i o)

instance Monoid o => Semigroup (CellUpdate o) where
IncServerT CellsState CellsUpdate
(StateT EvaluatorState
(ActorM EvaluatorMsg)) a }
deriving (Functor, Applicative, Monad, MonadIO, Actor (EvaluatorMsg))
deriving instance IncServer CellsState CellsUpdate EvaluatorM

instance Semigroup CellUpdate where
CellUpdate s o <> CellUpdate s' o' = CellUpdate (s<>s') (o<>o')

instance Monoid o => Monoid (CellUpdate o) where
instance Monoid CellUpdate where
mempty = CellUpdate mempty mempty

instance Monoid o => IncState (CellState i o) (CellUpdate o) where
instance IncState CellState CellUpdate where
applyDiff (CellState source status result) (CellUpdate status' result') =
CellState source (fromOverwritable (applyDiff (Overwritable status) status')) (result <> result')

instance Monoid o => DefuncState (EvaluatorMUpdate s i o) (EvaluatorM s i o) where
instance DefuncState EvaluatorMUpdate EvaluatorM where
update = \case
UpdateDagEU dag -> EvaluatorM $ update dag
UpdateCurJob status -> EvaluatorM $ lift $ modify \s -> s { curRunningJob = status }
Expand All @@ -178,74 +173,73 @@ instance Monoid o => DefuncState (EvaluatorMUpdate s i o) (EvaluatorM s i o) whe
UpdateCellState nodeId cellUpdate -> update $ UpdateDagEU $ NodeListUpdate mempty $
MapUpdate $ M.singleton nodeId $ Update cellUpdate

instance Monoid o => LabelReader (EvaluatorMLabel s i o) (EvaluatorM s i o) where
instance LabelReader EvaluatorMLabel EvaluatorM where
getl l = case l of
NodeListEM -> EvaluatorM $ orderedNodes <$> getl It
NodeInfo nodeId -> EvaluatorM $ M.lookup nodeId <$> nodeMap <$> getl It
PrevEnvs -> EvaluatorM $ lift $ prevEnvs <$> get
CurRunningJob -> EvaluatorM $ lift $ curRunningJob <$> get
EvalFun -> EvaluatorM $ lift $ evalFun <$> get
EvalCfg -> EvaluatorM $ lift $ evaluatorCfg <$> get

data EvaluatorMUpdate s i o =
UpdateDagEU (NodeListUpdate (CellState i o) (CellUpdate o))
| UpdateCellState NodeId (CellUpdate o)
data EvaluatorMUpdate =
UpdateDagEU (NodeListUpdate CellState CellUpdate)
| UpdateCellState NodeId CellUpdate
| UpdateCurJob CurJobStatus
| UpdateEnvs [s]
| AppendEnv s

data EvaluatorMLabel s i o a where
NodeListEM :: EvaluatorMLabel s i o [NodeId]
NodeInfo :: NodeId -> EvaluatorMLabel s i o (Maybe (CellState i o))
PrevEnvs :: EvaluatorMLabel s i o [s]
CurRunningJob :: EvaluatorMLabel s i o (CurJobStatus)
EvalFun :: EvaluatorMLabel s i o (EvalFun s i o)

-- `s` is the persistent state (i.e. TopEnvEx the environment)
-- `i` is the type of input cell (e.g. SourceBlock)
-- `o` is the (monoidal) type of updates, e.g. `Result`
type EvalFun s i o = Mailbox o -> s -> i -> IO s
| UpdateEnvs [TopStateEx]
| AppendEnv TopStateEx

data EvaluatorMLabel a where
NodeListEM :: EvaluatorMLabel [NodeId]
NodeInfo :: NodeId -> EvaluatorMLabel (Maybe CellState)
PrevEnvs :: EvaluatorMLabel [TopStateEx]
CurRunningJob :: EvaluatorMLabel (CurJobStatus)
EvalCfg :: EvaluatorMLabel EvalConfig

-- It's redundant to have both NodeId and TheadId but it defends against
-- possible GHC reuse of ThreadId (I don't know if that can actually happen)
type JobId = (ThreadId, NodeId)
type CurJobStatus = Maybe (JobId, CellIndex)

data EvaluatorState s i o = EvaluatorState
{ prevEnvs :: [s]
, evalFun :: EvalFun s i o
data EvaluatorState = EvaluatorState
{ evaluatorCfg :: EvalConfig
, prevEnvs :: [TopStateEx]
, curRunningJob :: CurJobStatus }

data CellStatus = Waiting | Running | Complete deriving (Show, Generic)

data CellState i o = CellState i CellStatus o deriving (Show, Generic)
data CellUpdate o = CellUpdate (Overwrite CellStatus) o deriving (Show, Generic)
data CellState = CellState SourceBlockWithId CellStatus Outputs
deriving (Show, Generic)

type Show3 s i o = (Show s, Show i, Show o)
data CellUpdate = CellUpdate (Overwrite CellStatus) Outputs deriving (Show, Generic)

type CellsState i o = NodeList (CellState i o)
type CellsUpdate i o = NodeListUpdate (CellState i o) (CellUpdate o)
type CellsState = NodeList CellState
type CellsUpdate = NodeListUpdate CellState CellUpdate

type CellIndex = Int -- index in the list of cells, not the NodeId

data JobUpdate o s = PartialJobUpdate o | JobComplete s deriving (Show)
data JobUpdate =
PartialJobUpdate Outputs
| JobComplete TopStateEx
deriving (Show)

data EvaluatorMsg s i o =
SourceUpdate (DagUpdate i)
| JobUpdate JobId (JobUpdate o s)
| Subscribe_E (SubscribeMsg (CellsState i o) (CellsUpdate i o))
data EvaluatorMsg =
SourceUpdate (DagUpdate SourceBlock)
| JobUpdate JobId JobUpdate
| Subscribe_E (SubscribeMsg CellsState CellsUpdate)
deriving (Show)

initEvaluatorState :: s -> EvalFun s i o -> EvaluatorState s i o
initEvaluatorState s evalCell = EvaluatorState [s] evalCell Nothing
initEvaluatorState :: EvalConfig -> TopStateEx -> EvaluatorState
initEvaluatorState cfg s = EvaluatorState cfg [s] Nothing

launchDagEvaluator :: (Show3 s i o, Monoid o, MonadIO m) => CellParser i -> s -> EvalFun s i o -> m (Evaluator i o)
launchDagEvaluator cellParser env evalCell = do
launchDagEvaluator :: MonadIO m => EvalConfig -> CellParser -> TopStateEx -> m Evaluator
launchDagEvaluator cfg cellParser env = do
mailbox <- launchActor do
let s = initEvaluatorState env evalCell
let s = initEvaluatorState cfg env
void $ flip runStateT s $ runIncServerT emptyNodeList $ runEvaluatorM' $
dagEvaluatorImpl cellParser
return $ sliceMailbox Subscribe_E mailbox

dagEvaluatorImpl :: (Show3 s i o, Monoid o) => CellParser i -> EvaluatorM s i o ()
dagEvaluatorImpl :: CellParser -> EvaluatorM ()
dagEvaluatorImpl cellParser = do
initDag <- subscribe SourceUpdate cellParser
processDagUpdate (nodeListAsUpdate initDag) >> flushDiffs
Expand All @@ -259,7 +253,7 @@ dagEvaluatorImpl cellParser = do
processJobUpdate jobId jobUpdate
flushDiffs

processJobUpdate :: (Show3 s i o, Monoid o) => JobId -> JobUpdate o s -> EvaluatorM s i o ()
processJobUpdate :: JobId -> JobUpdate -> EvaluatorM ()
processJobUpdate jobId jobUpdate = do
getl CurRunningJob >>= \case
Just (jobId', _) -> when (jobId == jobId') do
Expand All @@ -274,12 +268,12 @@ processJobUpdate jobId jobUpdate = do
PartialJobUpdate result -> update $ UpdateCellState nodeId $ CellUpdate NoChange result
Nothing -> return () -- this job is a zombie

nextCellIndex :: Monoid o => EvaluatorM s i o Int
nextCellIndex :: EvaluatorM Int
nextCellIndex = do
envs <- getl PrevEnvs
return $ length envs - 1

launchNextJob :: (Show3 s i o, Monoid o) => EvaluatorM s i o ()
launchNextJob :: EvaluatorM ()
launchNextJob = do
cellIndex <- nextCellIndex
nodeList <- getl NodeListEM
Expand All @@ -288,34 +282,35 @@ launchNextJob = do
let nodeId = nodeList !! cellIndex
launchJob cellIndex nodeId curEnv

launchJob :: (Show3 s i o, Monoid o) => CellIndex -> NodeId -> s -> EvaluatorM s i o ()
launchJob :: CellIndex -> NodeId -> TopStateEx -> EvaluatorM ()
launchJob cellIndex nodeId env = do
jobAction <- getl EvalFun
cfg <- getl EvalCfg
let jobAction = sourceBlockEvalFun cfg
CellState source _ _ <- fromJust <$> getl (NodeInfo nodeId)
mailbox <- selfMailbox id
update $ UpdateCellState nodeId $ CellUpdate (OverwriteWith Running) mempty
threadId <- liftIO $ forkIO do
threadId <- myThreadId
let jobId = (threadId, nodeId)
let resultsMailbox = sliceMailbox (JobUpdate jobId . PartialJobUpdate) mailbox
finalEnv <- jobAction resultsMailbox env source
finalEnv <- jobAction resultsMailbox env $ sourceBlockWithoutId source
send mailbox $ JobUpdate jobId $ JobComplete finalEnv
let jobId = (threadId, nodeId)
update $ UpdateCurJob (Just (jobId, cellIndex))

computeNumValidCells :: Monoid o => TailUpdate NodeId -> EvaluatorM s i o Int
computeNumValidCells :: TailUpdate NodeId -> EvaluatorM Int
computeNumValidCells tailUpdate = do
let nDropped = numDropped tailUpdate
nTotal <- length <$> getl NodeListEM
return $ nTotal - nDropped

processDagUpdate :: (Show3 s i o, Monoid o) => DagUpdate i -> EvaluatorM s i o ()
processDagUpdate :: DagUpdate SourceBlock -> EvaluatorM ()
processDagUpdate (NodeListUpdate tailUpdate mapUpdate) = do
nValid <- computeNumValidCells tailUpdate
envs <- getl PrevEnvs
update $ UpdateEnvs $ take (nValid + 1) envs
update $ UpdateDagEU $ NodeListUpdate tailUpdate $ mapUpdateMapWithKey mapUpdate
(\_ (Unchanging i) -> CellState i Waiting mempty)
(\cellId (Unchanging i) -> CellState (SourceBlockWithId cellId i) Waiting mempty)
(\_ () -> mempty)
getl CurRunningJob >>= \case
Nothing -> launchNextJob
Expand All @@ -327,9 +322,9 @@ processDagUpdate (NodeListUpdate tailUpdate mapUpdate) = do
launchNextJob
| otherwise -> return () -- Current job is fine. Let it continue.

-- === instances ===
-- === ToJSON ===

instance ToJSON CellStatus
instance (ToJSON i, ToJSON o) => ToJSON (CellState i o)
instance ToJSON o => ToJSON (CellUpdate o)
instance ToJSON CellState where
instance ToJSON CellStatus
instance ToJSON CellUpdate
instance (ToJSON s, ToJSON d) => ToJSON (NodeListUpdate s d)
27 changes: 5 additions & 22 deletions src/lib/Live/Web.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,15 @@ import qualified Data.ByteString as BS
-- import Paths_dex (getDataFileName)

import Live.Eval
import RenderHtml
import IncState
import Actor
import TopLevel
import Types.Source

runWeb :: FilePath -> EvalConfig -> TopStateEx -> IO ()
runWeb fname opts env = do
resultsChan <- watchAndEvalFile fname opts env >>= renderResults
resultsChan <- watchAndEvalFile fname opts env
putStrLn "Streaming output to http://localhost:8000/"
run 8000 $ serveResults resultsChan

serveResults :: RenderedResultsServer -> Application
serveResults :: EvalServer -> Application
serveResults resultsSubscribe request respond = do
print (pathInfo request)
case pathInfo request of
Expand All @@ -52,14 +48,11 @@ serveResults resultsSubscribe request respond = do
-- fname <- getDataFileName dataFname
respond $ responseFile status200 [("Content-Type", ctype)] fname Nothing

type RenderedResultsServer = StateServer (MonoidState RenderedResults) RenderedResults
type RenderedResults = CellsUpdate RenderedSourceBlock RenderedOutputs

resultStream :: RenderedResultsServer -> StreamingBody
resultStream :: EvalServer -> StreamingBody
resultStream resultsServer write flush = do
sendUpdate ("start"::String)
(MonoidState initResult, resultsChan) <- subscribeIO resultsServer
sendUpdate initResult
(initResult, resultsChan) <- subscribeIO resultsServer
sendUpdate $ cellsStateAsUpdate initResult
forever $ readChan resultsChan >>= sendUpdate
where
sendUpdate :: ToJSON a => a -> IO ()
Expand All @@ -68,13 +61,3 @@ resultStream resultsServer write flush = do
encodePacket :: ToJSON a => a -> BS.ByteString
encodePacket = toStrict . wrap . encode
where wrap s = "data:" <> s <> "\n\n"

renderResults :: EvalServer -> IO RenderedResultsServer
renderResults evalServer = launchIncFunctionEvaluator evalServer
(\x -> (MonoidState $ renderEvalUpdate $ nodeListAsUpdate x, ()))
(\_ () dx -> (renderEvalUpdate dx, ()))

renderEvalUpdate :: CellsUpdate SourceBlock Outputs -> CellsUpdate RenderedSourceBlock RenderedOutputs
renderEvalUpdate cellsUpdate = fmapCellsUpdate cellsUpdate
(\k b -> renderSourceBlock k b)
(\_ r -> renderOutputs r)
Loading

0 comments on commit 4d627b2

Please sign in to comment.