implementation module iTasks._Framework.TaskServer

import StdFile, StdBool, StdInt, StdClass, StdList, StdMisc, StdArray, StdTuple, StdOrdList
import Data.Maybe, Data.Functor, Data.Error, System.Time, Text
from Data.Map import :: Map, :: Size
import qualified Data.List as DL
import qualified Data.Map as DM
import qualified iTasks._Framework.SDS as SDS
import TCPChannelClass, TCPChannels, TCPEvent, TCPStringChannels, TCPDef, tcp

import iTasks._Framework.IWorld
import iTasks._Framework.Task
import iTasks._Framework.TaskEval

//Helper type that holds the mainloop instances during a select call.
//In these mainloop instances the unique listeners and read channels
//have been temporarily removed: toSelectSet strips them out so they can be
//handed to the select call, fromSelectSet puts them back afterwards.
:: *IOTaskInstanceDuringSelect
    = ListenerInstanceDS !ListenerInstanceOpts                    //Listener instance without its TCP listener
    | ConnectionInstanceDS !ConnectionInstanceOpts !*TCP_SChannel //Connection instance that kept only its send channel
    | BackgroundInstanceDS !BackgroundTask                        //Background task, carried along unchanged

//Entry point of the task server: set up the initial io tasks (a listener on
//the given port plus the background tasks) and then run the main loop until
//it terminates.
serve :: !Int !ConnectionTask ![BackgroundTask] (*IWorld -> (!Maybe Timeout,!*IWorld)) *IWorld -> *IWorld
serve port ct bt determineTimeout iworld
    # iworld = init port ct bt iworld
    = loop determineTimeout iworld

//Open the main listener and build the initial io task list and io states.
//Aborts the program when the port cannot be opened.
init :: !Int !ConnectionTask ![BackgroundTask] !*IWorld -> *IWorld
init port ct bt iworld=:{IWorld|world}
    # (opened, mbListener, world) = openTCP_Listener port world
    | not opened = abort ("Error: port "+++ toString port +++ " already in use.\n")
    //The main listener is registered under the fixed task id `TaskId 0 0`
    # mainTaskId     = TaskId 0 0
    # listenerOpts   = {ListenerInstanceOpts|taskId=mainTaskId, nextConnectionId=0, port=port, connectionTask=ct, removeOnClose = True}
    # mainListener   = ListenerInstance listenerOpts (fromJust mbListener)
    //The listener goes first, followed by all background tasks
    # initialTasks   = {done=[],todo=[mainListener:map BackgroundInstance bt]}
    # initialStates  = 'DM'.fromList [(mainTaskId, IOActive 'DM'.newMap)]
    = {iworld & ioTasks = initialTasks, ioStates = initialStates, world = world}

//Main loop: wait for io (at most as long as determineTimeout allows), let
//every io task process the result, and repeat until shutdown is set.
loop :: !(*IWorld -> (!Maybe Timeout,!*IWorld)) !*IWorld -> *IWorld
loop determineTimeout iworld
    # (timeout,iworld=:{IWorld|ioTasks={todo},world}) = determineTimeout iworld
    //Find out which io tasks have data available
    # (pending,ready,world) = select timeout todo world
    //Let every io task handle the select result, starting at index 0
    # iworld = {iworld & ioTasks = {done=[],todo=pending}, world = world}
    # iworld=:{shutdown,ioTasks={done}} = process 0 ready iworld
    //The done list was built by prepending, so reverse it to get the next todo list
    # iworld = {iworld & ioTasks = {todo = reverse done, done = []}}
    //Stop when shutdown was requested, otherwise go around again
    = if shutdown (halt iworld) (loop determineTimeout iworld)

//Wait (bounded by the optional timeout) until a listener or receive channel
//of one of the io task instances reports activity. Returns the unchanged
//instances together with the select results, indexed by instance position.
select :: (Maybe Timeout) *[IOTaskInstance] *World -> (!*[IOTaskInstance],![(Int,SelectResult)],!*World)
select mbTimeout mlInstances world
    //Take the unique listeners and receive channels out of the instances
    # (ls,rs,stripped) = toSelectSet mlInstances
    # channels         = TCP_Pair (TCP_Listeners ls) (TCP_RChannels rs)
    # (ready,(TCP_Pair (TCP_Listeners ls) (TCP_RChannels rs)),_,world)
        = selectChannel_MT mbTimeout channels TCP_Void world
    //Put the channels back and translate the indices of the select result
    # (restored,ready) = fromSelectSet ls rs stripped ready
    = (restored, ready, world)

//Split the io task instances into the listeners, the receive channels and the
//remaining instance data, all in the original relative order.
toSelectSet :: !*[IOTaskInstance] -> *(!*[*TCP_Listener],!*[*TCP_RChannel],!*[*IOTaskInstanceDuringSelect])
toSelectSet [] = ([],[],[])
toSelectSet [ListenerInstance opts listener:rest]
    # (listeners,receivers,stripped) = toSelectSet rest
    = ([listener:listeners],receivers,[ListenerInstanceDS opts:stripped])
toSelectSet [ConnectionInstance opts {rChannel,sChannel}:rest]
    //Only the receive channel takes part in the select; the send channel stays with the instance
    # (listeners,receivers,stripped) = toSelectSet rest
    = (listeners,[rChannel:receivers],[ConnectionInstanceDS opts sChannel:stripped])
toSelectSet [BackgroundInstance bt:rest]
    # (listeners,receivers,stripped) = toSelectSet rest
    = (listeners,receivers,[BackgroundInstanceDS bt:stripped])

/* Restore the list of main loop instances.
    In the same pass also update the indices in the select result to match the
    correct indices of the main loop instance list.

    The indices in the incoming select result are interpreted as numbering all
    listeners first (0 .. numListeners-1), followed by the receive channels
    (numListeners ..). The indices in the returned select result are positions
    in the restored instance list.
*/
fromSelectSet :: !*[*TCP_Listener] !*[*TCP_RChannel] !*[*IOTaskInstanceDuringSelect] ![(!Int,!SelectResult)] -> *(![*IOTaskInstance],![(!Int,!SelectResult)])
fromSelectSet ls rs is chList
    # (numListeners,ls) = ulength ls
    # sortedChList      = sortBy (\(x,_) (y,_) -> (x < y)) chList //The single-pass algorithm expects a sorted select result
    = fromSelectSet` 0 numListeners 0 0 ls rs sortedChList is
where
    //Arguments: i                = position of the current instance in the restored list
    //           numListeners     = total number of listeners in the select set
    //           numSeenListeners = number of listeners restored so far
    //           numSeenReceivers = number of receive channels restored so far
    //followed by the remaining listeners, receive channels, (sorted) select results and instances.

    //No instances left: any remaining select results are dropped
    fromSelectSet` i numListeners numSeenListeners numSeenReceivers ls rs _ [] = ([],[])
    //Listener, no select results left
    fromSelectSet` i numListeners numSeenListeners numSeenReceivers [l:ls] rs [] [ListenerInstanceDS opts:is]
        # (is,_) = fromSelectSet` (i+1) numListeners (numSeenListeners+1) numSeenReceivers ls rs [] is
        = ([ListenerInstance opts l:is],[])
    //Listener, with select results left
    fromSelectSet` i numListeners numSeenListeners numSeenReceivers [l:ls] rs [(c,what):ch] [ListenerInstanceDS opts:is]
        | c == numSeenListeners //Check select result
            # (is,ch) = fromSelectSet` (i+1) numListeners (numSeenListeners+1) numSeenReceivers ls rs ch is
            = ([ListenerInstance opts l:is],[(i,what):ch])
        | otherwise
            # (is,ch) = fromSelectSet` (i+1) numListeners (numSeenListeners+1) numSeenReceivers ls rs [(c,what):ch] is
            = ([ListenerInstance opts l:is],ch)
    //Connection, no select results left
    fromSelectSet` i numListeners numSeenListeners numSeenReceivers ls [rChannel:rs] [] [ConnectionInstanceDS opts sChannel:is]
        # (is,_) = fromSelectSet` (i+1) numListeners numSeenListeners (numSeenReceivers+1) ls rs [] is
        = ([ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}:is],[])
    //Connection, with select results left
    fromSelectSet` i numListeners numSeenListeners numSeenReceivers ls [rChannel:rs] [(c,what):ch] [ConnectionInstanceDS opts sChannel:is]
        | c == numListeners + numSeenReceivers //Receive channels are numbered after all listeners
            # (is,ch) = fromSelectSet` (i+1) numListeners numSeenListeners (numSeenReceivers+1) ls rs ch is
            = ([ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}:is],[(i,what):ch])
        | otherwise
            # (is,ch) = fromSelectSet` (i+1) numListeners numSeenListeners (numSeenReceivers+1) ls rs [(c,what):ch] is
            = ([ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}:is],ch)
    //Background tasks take up a position in the instance list, but never have a select result
    fromSelectSet` i numListeners numSeenListeners numSeenReceivers ls rs ch [BackgroundInstanceDS bt:is]
        # (is,ch) = fromSelectSet` (i+1) numListeners numSeenListeners numSeenReceivers ls rs ch is
        = ([BackgroundInstance bt:is],ch)

    //Length of a list of unique elements; the list is rebuilt and returned so it can still be used
    ulength [] = (0,[])
    ulength [x:xs]
        # (n,xs) = ulength xs
        = (n + 1,[x:xs])

//Process the io task instances in the todo list one by one, moving the
//surviving instances to the done list. The Int argument is the position of
//the instance at the head of the todo list, used to look up its select result.
//TODO: Use share notification to trigger task re-evaluation based on io events
process :: !Int [(!Int,!SelectResult)] !*IWorld -> *IWorld
//Nothing left to do
process _ _ iworld=:{ioTasks={todo=[]}} = iworld
//Listener instance: accept a pending connection when the select result flagged
//this listener, or close the listener when its io state is destroyed, failed or gone.
process i chList iworld=:{ioTasks={done,todo=[ListenerInstance lopts listener:todo]},ioStates,world}
    # (TaskId instanceNo _) = lopts.ListenerInstanceOpts.taskId
    = case 'DM'.get lopts.ListenerInstanceOpts.taskId ioStates of
        //Active listener:
        Just (IOActive conStates)
            # (mbSelect,chList) = checkSelect i chList
            | mbSelect =:(Just _)
                # (tReport, mbNewConn, listener, world)   = receive_MT (Just 0) listener world
                | tReport == TR_Success
                    # (ip,{rChannel,sChannel}) = fromJust mbNewConn
                    # (ConnectionTask handlers sds) = lopts.ListenerInstanceOpts.connectionTask
                    # (mbr,iworld) = 'SDS'.read sds {iworld & ioTasks={done=done,todo=todo},world=world}
                    | mbr =:(Error _)
                        # iworld=:{ioTasks={done,todo},world} = queueUrgentRefresh [instanceNo] ["IO Exception for instance "<+++instanceNo] iworld
                        # ioStates = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOException (snd (fromError mbr))) ioStates
                        //The just accepted connection is not going to be used: close it instead of leaking it
                        # world = closeRChannel rChannel world
                        # world = closeChannel sChannel world
                        //The listener is closed and not put back in the done list
                        # world = closeRChannel listener world
                        = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
                    # (mbConState,mbw,out,close,iworld) = handlers.ConnectionHandlersIWorld.onConnect (toString ip) (fromOk mbr) iworld
                    # iworld = queueUrgentRefresh [instanceNo] ["New TCP connection for instance "<+++instanceNo] iworld
                    # iworld=:{ioTasks={done,todo},world}  = writeShareIfNeeded sds mbw iworld
                    | mbConState =:(Error _)
                        # ioStates = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOException (fromError mbConState)) ioStates
                        //No connection instance is created for the accepted connection: close it instead of leaking it
                        # world = closeRChannel rChannel world
                        # world = closeChannel sChannel world
                        = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, ioStates = ioStates, world=world}
                    # conStates = 'DM'.put lopts.ListenerInstanceOpts.nextConnectionId (fromOk mbConState,close) conStates
                    //Send data if the onConnect handler produced any
                    # (sChannel,world) = case out of
                        []          = (sChannel,world)
                        data        = foldl (\(s,w) d -> send (toByteSeq d) s w) (sChannel,world) data
                    | close
                    //Close the connection immediately
                        # world = closeRChannel rChannel world
                        # world = closeChannel sChannel world
                        //Remove the connection state if configured in the connection listener options
                        # conStates = if lopts.ListenerInstanceOpts.removeOnClose
                            ('DM'.del lopts.ListenerInstanceOpts.nextConnectionId conStates)
                            conStates
                        # ioStates  = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOActive conStates) ioStates
                        = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, ioStates = ioStates, world=world}
                    | otherwise
                    //Persist the connection
                        # copts = {ConnectionInstanceOpts|taskId = lopts.ListenerInstanceOpts.taskId
                                  ,connectionId = lopts.ListenerInstanceOpts.nextConnectionId
                                  ,remoteHost = ip, connectionTask = lopts.ListenerInstanceOpts.connectionTask
                                  ,removeOnClose = lopts.ListenerInstanceOpts.removeOnClose}
                        //The new connection is appended to the todo list, so it is processed later in this same pass
                        # todo = todo ++ [ConnectionInstance copts {rChannel=rChannel,sChannel=sChannel}]
                        # lopts = {lopts & nextConnectionId = lopts.nextConnectionId + 1}
                        # ioStates  = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOActive conStates) ioStates
                        = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, ioStates = ioStates, world=world}
                //We did not properly accept a connection
                | otherwise
                    = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, world=world}
            //Nothing to do
            | otherwise
                = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, world=world}
        //Destroyed listener:
        Just (IODestroyed conStates)
            # world = closeRChannel listener world
            //If there are no connections belonging to this listener we can clean up, if there are the last connection will cleanup
            # ioStates = if ('DM'.mapSize conStates == 0) ('DM'.del lopts.ListenerInstanceOpts.taskId ioStates) ioStates
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
        //There was an exception or the state has already been removed
        _
            # world = closeRChannel listener world
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
//Connection instance: handle disconnects and incoming data for an active
//connection, or close the connection when its io state is destroyed, failed or gone.
process i chList iworld=:{ioTasks={done,todo=[ConnectionInstance opts {rChannel,sChannel}:todo]},ioStates,world}
    # (TaskId instanceNo _) = opts.ConnectionInstanceOpts.taskId
    = case 'DM'.get opts.ConnectionInstanceOpts.taskId ioStates of
        Just (IOActive conStates)
            # (ConnectionTask handlers sds) = opts.ConnectionInstanceOpts.connectionTask
            # (mbSelect,chList) = checkSelect i chList

            # mbConState = 'DM'.get opts.ConnectionInstanceOpts.connectionId conStates
            | mbConState =: Nothing
                //Set exception, close connection and continue
                # ioStates = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException "Missing connection state") ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
            # conState = fst (fromJust mbConState)
            //Read sds
            # (mbr,iworld=:{ioTasks={done,todo},world}) = 'SDS'.read sds {iworld & ioTasks={done=done,todo=todo},world=world}
            | mbr =:(Error _)
                # ioStates = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException (snd (fromError mbr))) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
            //Check if disconnected
            | mbSelect =:(Just SR_Disconnected) || mbSelect=:(Just SR_EOM)
                //Call disconnect function
                # (conState,mbw,iworld) = handlers.ConnectionHandlersIWorld.onDisconnect conState (fromOk mbr) {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
                # iworld = queueUrgentRefresh [instanceNo] ["TCP connection disconnected for "<+++instanceNo] iworld
                # iworld=:{world,ioStates} = writeShareIfNeeded sds mbw iworld
                //Keep the final connection state (marked as closed), or register the exception
                # ioStates = case conState of
                    Ok state
                        = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOActive ('DM'.put opts.ConnectionInstanceOpts.connectionId (state,True) conStates)) ioStates
                    Error e
                        = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException e) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                //The instance is not added to the done list, so it is dropped from the main loop
                = process (i+1) chList {iworld & ioStates = ioStates, world=world}
            //Read channel data
            # (data,rChannel,world) = case mbSelect of
                Just SR_Available
                    # (data,rChannel,world) = receive rChannel world
                    = (Just (toString data),rChannel,world)
                _
                    = (Nothing,rChannel,world)
            //Call whileConnected function
            # (mbConState,mbw,out,close,iworld)
                = handlers.ConnectionHandlersIWorld.whileConnected data conState (fromOk mbr) {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
            //Queue refresh when there was new data or when the connection was closed
            # iworld = if (isNothing data) iworld (queueUrgentRefresh [instanceNo] ["New TCP data for "<+++instanceNo] iworld)
            # iworld = if close (queueUrgentRefresh [instanceNo] ["TCP connection closed for "<+++instanceNo] iworld) iworld
            //Write share
            # iworld=:{ioTasks={todo,done},ioStates,world} = writeShareIfNeeded sds mbw iworld
            | mbConState =:(Error _)
                # ioStates = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException (fromError mbConState)) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
            # conStates = 'DM'.put opts.ConnectionInstanceOpts.connectionId (fromOk mbConState,close) conStates
            //Send data if produced
            # (sChannel,world) = case out of
                []          = (sChannel,world)
                data        = foldl (\(s,w) d -> send (toByteSeq d) s w) (sChannel,world) data
            | close
                //Remove the connection state if configured in the connection listener options
                # conStates = if opts.ConnectionInstanceOpts.removeOnClose
                    ('DM'.del opts.ConnectionInstanceOpts.connectionId conStates)
                    conStates
                # ioStates  = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOActive conStates) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
            | otherwise
                //Persist connection
                # ioStates  = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOActive conStates) ioStates
                = process (i+1) chList {iworld & ioTasks={done=[ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}:done],todo=todo},ioStates=ioStates,world=world}
        Just (IODestroyed conStates)
            # world = closeRChannel rChannel world
            # world = closeChannel sChannel world
            //Remove the state for this connection
            # conStates = 'DM'.del opts.ConnectionInstanceOpts.connectionId conStates
            //If this is the last connection for this task, we can clean up.
            # ioStates = if ('DM'.mapSize conStates == 0) ('DM'.del opts.ConnectionInstanceOpts.taskId ioStates) ioStates
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
        //No state, just close
        _
            # world = closeRChannel rChannel world
            # world = closeChannel sChannel world
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
//Background task: run its evaluation function once and keep the task for the next pass
process i chList iworld=:{ioTasks={done,todo=[BackgroundInstance bt=:(BackgroundTask eval):todo]}}
    //The task is taken off the todo list before it is evaluated
    # iworld                       = {iworld & ioTasks = {done=done,todo=todo}}
    # iworld=:{ioTasks={done,todo}} = eval iworld
    # done                         = [BackgroundInstance bt:done]
    = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}}
//Any other instance is moved to the done list untouched
process i chList iworld=:{ioTasks={done,todo=[t:todo]}}
    # done = [t:done]
    = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}}

//Write a value to the share, but only when a handler actually produced one.
//The result of the write is ignored.
writeShareIfNeeded sds mbWrite iworld = case mbWrite of
    Nothing
        = iworld
    Just w
        # (_,iworld) = 'SDS'.write w sds iworld //TODO: Deal with exceptions at this level
        = iworld

//Open a TCP listener on the given port for a task and register it as a new io
//task with an empty set of connection states. Returns an exception when the
//port could not be opened.
addListener :: !TaskId !Int !Bool !ConnectionTask !*IWorld -> (!MaybeError TaskException (),!*IWorld)
addListener taskId port removeOnClose connectionTask iworld=:{ioTasks={todo,done}, ioStates, world}
    # (opened, mbListener, world) = openTCP_Listener port world
    | opened
        # listenerOpts = {ListenerInstanceOpts|taskId = taskId, nextConnectionId = 0, port = port, connectionTask= connectionTask, removeOnClose = removeOnClose}
        //The new listener is appended at the end of the todo list
        # newListener  = ListenerInstance listenerOpts (fromJust mbListener)
        # ioTasks      = {done=done,todo=todo ++ [newListener]}
        # ioStates     = 'DM'.put taskId (IOActive 'DM'.newMap) ioStates
        = (Ok (),{iworld & ioTasks = ioTasks, ioStates = ioStates, world = world})
    | otherwise
        //Opening failed: leave the io tasks as they were
        # failure = exception ("Error: port "+++ toString port +++ " already in use.")
        = (Error failure, {iworld & ioTasks = {done=done,todo=todo},world = world})

//Open an outgoing TCP connection for a task. The onConnect handler of the
//connection task is evaluated immediately; unless it asks to close the
//connection right away, the connection is registered as a new io task.
//Returns an exception when the host cannot be resolved, the connection
//cannot be established, or the share cannot be read.
addConnection :: !TaskId !String !Int !ConnectionTask !*IWorld -> (!MaybeError TaskException (),!*IWorld)
addConnection taskId=:(TaskId instanceNo _) host port connectionTask iworld=:{ioTasks={done,todo},ioStates,world}
    # (mbIP,world) = lookupIPAddress host world
    | mbIP =: Nothing
        = (Error (exception ("Failed to connect to host "+++ host)), {iworld & ioTasks = {done=done,todo=todo}, world = world})
    # (tReport,mbConn,world) = connectTCP_MT Nothing (fromJust mbIP,port) world
    = case mbConn of
        Nothing
            = (Error (exception ("Failed to connect to host "+++ host)), {iworld & ioTasks = {done=done,todo=todo}, world = world})
        Just {DuplexChannel|rChannel,sChannel}
            # ip                                = fromJust mbIP
            # (ConnectionTask handlers sds)     = connectionTask
            // Read share
            # (mbr,iworld) = 'SDS'.read sds {iworld & ioTasks = {done=done,todo=todo}, ioStates = ioStates, world = world}
            | mbr =: (Error _)
                // NOTE(review): the freshly opened channels are not closed on this path - confirm whether they should be
                = (liftError mbr, iworld)
            // Evaluate onConnect handler
            # (mbl,mbw,out,close,iworld=:{IWorld|ioTasks={done,todo},ioStates,world}) = handlers.ConnectionHandlersIWorld.onConnect (toString ip) (fromOk mbr) iworld
            // Write possible output
            # (sChannel,world) = case out of
                []          = (sChannel,world)
                data        = foldl (\(s,w) d -> send (toByteSeq d) s w) (sChannel,world) data
            //Close connection, or add to queue
            | close
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = (Ok (), {iworld & ioTasks = {done=done,todo=todo}, ioStates = ioStates, world = world})
            | otherwise
                //An outgoing connection always uses connection id 0 and keeps its state on close
                # opts = {ConnectionInstanceOpts|taskId = taskId, connectionId = 0, remoteHost = ip, connectionTask = connectionTask, removeOnClose = False}
                # todo = todo ++ [ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}]
                //Register the initial connection state, or the exception raised by onConnect
                # ioStates = case mbl of
                    Ok l        = 'DM'.put taskId (IOActive ('DM'.fromList [(0,(l,False))])) ioStates
                    Error e     = 'DM'.put taskId (IOException e) ioStates
                = (Ok (),{iworld & ioTasks = {done=done,todo=todo}, ioStates = ioStates, world = world})

//Look up the select result for the instance at position i. Only the head of
//the select result list is inspected; when it belongs to instance i it is
//taken off the list, otherwise the list is returned unchanged.
checkSelect :: !Int ![(!Int,!SelectResult)] -> (!Maybe SelectResult,![(!Int,!SelectResult)])
checkSelect i [(who,what):rest]
    | i == who = (Just what,rest)
checkSelect _ chList = (Nothing,chList)

//Shut down: close the listeners and connections of all io tasks in the todo
//list and drop the background tasks.
halt :: !*IWorld -> *IWorld
halt iworld=:{ioTasks={todo=[],done}} = iworld
halt iworld=:{ioTasks={todo=[ListenerInstance _ listener:todo],done},world}
    # world = closeRChannel listener world
    //The updated world has to be stored back, otherwise the close is never demanded
    = halt {iworld & ioTasks = {todo=todo,done=done}, world = world}
halt iworld=:{ioTasks={todo=[ConnectionInstance _ {rChannel,sChannel}:todo],done},world}
    # world = closeRChannel rChannel world
    # world = closeChannel sChannel world
    //The updated world has to be stored back, otherwise the closes are never demanded
    = halt {iworld & ioTasks = {todo=todo,done=done}, world = world}
halt iworld=:{ioTasks={todo=[BackgroundInstance _ :todo],done}}
    //Background tasks hold no channels, they are simply dropped
    = halt {iworld & ioTasks= {todo=todo,done=done}}