implementation module iTasks._Framework.TaskServer

import StdFile, StdBool, StdInt, StdClass, StdList, StdMisc, StdArray, StdTuple, StdOrdList
import Data.Maybe, Data.Functor, Data.Error, System.Time, Text
from Data.Map import :: Map, :: Size
import qualified Data.List as DL
import qualified Data.Map as DM
import qualified iTasks._Framework.SDS as SDS
import TCPChannelClass, TCPChannels, TCPEvent, TCPStringChannels, TCPDef, tcp

import iTasks._Framework.IWorld
import iTasks._Framework.Task
import iTasks._Framework.TaskEval

//Helper type that holds the mainloop instances during a select call.
//In these mainloop instances the unique listeners and read channels
//have been temporarily removed (toSelectSet builds these values,
//fromSelectSet turns them back into regular instances).
:: *IOTaskInstanceDuringSelect
    //A listener instance without its listener
    = ListenerInstanceDS !ListenerInstanceOpts
    //A connection instance without its read channel; the send channel is kept
    | ConnectionInstanceDS !ConnectionInstanceOpts !*TCP_SChannel
    //A background task; it carries no channels
    | BackgroundInstanceDS !BackgroundTask

//Entry point of the task server: set up the initial I/O tasks (see init)
//and then keep running the main loop (see loop).
serve :: !Int !ConnectionTask ![BackgroundTask] (*IWorld -> (!Maybe Timeout,!*IWorld)) *IWorld -> *IWorld
serve port ct bt determineTimeout iworld
    # iworld = init port ct bt iworld
    = loop determineTimeout iworld

//Open the main listener on `port` and install the initial I/O task list:
//the listener (registered as TaskId 0 0) followed by the background tasks.
//Aborts the program when the port cannot be opened.
init :: !Int !ConnectionTask ![BackgroundTask] !*IWorld -> *IWorld
init port ct bt iworld=:{IWorld|world}
    # (opened, mbListener, world) = openTCP_Listener port world
    | not opened
        = abort ("Error: port "+++ toString port +++ " already in use.\n")
    # mainTask     = TaskId 0 0
    # listenerOpts = {ListenerInstanceOpts|taskId=mainTask, nextConnectionId=0, port=port, connectionTask=ct, removeOnClose = True}
    # mainListener = ListenerInstance listenerOpts (fromJust mbListener)
    # backgrounds  = map BackgroundInstance bt
    //The listener starts out active, without any connection states
    # initialStates = 'DM'.fromList [(mainTask, IOActive 'DM'.newMap)]
    = {iworld & ioTasks = {done=[],todo=[mainListener:backgrounds]}, ioStates = initialStates,  world = world}

//The main loop: select on all channels, process every I/O task once and
//repeat until the shutdown flag of the IWorld is set.
loop :: !(*IWorld -> (!Maybe Timeout,!*IWorld)) !*IWorld -> *IWorld
loop determineTimeout iworld
    //Let the caller decide how long the select is allowed to take
    # (mbTimeout,iworld=:{IWorld|ioTasks={todo},world}) = determineTimeout iworld
    //Check which mainloop tasks have data available
    # (pending,events,world) = select mbTimeout todo world
    //Process the select result, starting with an empty done list
    # iworld =:{shutdown,ioTasks={done}} = process 0 events {iworld & ioTasks = {done=[],todo=pending}, world = world}
    //process conses onto the done list, so reverse it to get the next todo list
    # iworld = {iworld & ioTasks={todo = reverse done,done=[]}}
    //Keep going until a shutdown was requested
    | not shutdown = loop determineTimeout iworld
    = halt iworld

//Run a select over all listeners and read channels of the given instances.
//Returns the instances (with their channels put back) and the select
//results, indexed by position in the returned instance list.
select :: (Maybe Timeout) *[IOTaskInstance] *World -> (!*[IOTaskInstance],![(Int,SelectResult)],!*World)
select mbTimeout mlInstances world
    //Temporarily take the unique listeners and read channels out of the instances
    # (listeners,rChannels,stripped) = toSelectSet mlInstances
    # selectSet = TCP_Pair (TCP_Listeners listeners) (TCP_RChannels rChannels)
    # (events,(TCP_Pair (TCP_Listeners listeners) (TCP_RChannels rChannels)),_,world)
        = selectChannel_MT mbTimeout selectSet TCP_Void world
    //Put the channels back and translate the select indices
    # (restored,events) = fromSelectSet listeners rChannels stripped events
    = (restored,events,world)

//Split the instances into the listeners, the read channels and the
//remaining 'during select' instances. All three lists keep the relative
//order of the input list.
toSelectSet :: !*[IOTaskInstance] -> *(!*[*TCP_Listener],!*[*TCP_RChannel],!*[*IOTaskInstanceDuringSelect])
toSelectSet [] = ([],[],[])
toSelectSet [ListenerInstance opts l:rest]
    # (ls,rs,ds) = toSelectSet rest
    = ([l:ls],rs,[ListenerInstanceDS opts:ds])
toSelectSet [ConnectionInstance opts {rChannel,sChannel}:rest]
    # (ls,rs,ds) = toSelectSet rest
    = (ls,[rChannel:rs],[ConnectionInstanceDS opts sChannel:ds])
toSelectSet [BackgroundInstance bt:rest]
    # (ls,rs,ds) = toSelectSet rest
    = (ls,rs,[BackgroundInstanceDS bt:ds])

/* Restore the list of main loop instances by putting the listeners and
    read channels back into them.
    In the same pass the indices in the select result are rewritten: the
    select call numbers the listeners first (0..) and the read channels after
    them, while the result of this function uses the position of the instance
    in the main loop instance list.
*/
fromSelectSet :: !*[*TCP_Listener] !*[*TCP_RChannel] !*[*IOTaskInstanceDuringSelect] ![(!Int,!SelectResult)] -> *(![*IOTaskInstance],![(!Int,!SelectResult)])
fromSelectSet ls rs is chList
    # (listenerCount,ls) = countAndKeep ls
    //The single pass below only inspects the first pending event, so the events must be sorted
    # ordered = sortBy (\(a,_) (b,_) -> a < b) chList
    = restore 0 listenerCount 0 0 ls rs ordered is
where
    //pos   : position of the current instance in the instance list
    //seenL : number of listeners restored so far (= select index of the next listener)
    //seenR : number of read channels restored so far
    restore pos listenerCount seenL seenR ls rs _ [] = ([],[])
    //Listeners
    restore pos listenerCount seenL seenR [l:ls] rs events [ListenerInstanceDS opts:rest]
        # (hit,events)      = claim seenL pos events
        # (restored,events) = restore (pos+1) listenerCount (seenL+1) seenR ls rs events rest
        = ([ListenerInstance opts l:restored],hit ++ events)
    //Receivers, their select index is offset by the number of listeners
    restore pos listenerCount seenL seenR ls [rChannel:rs] events [ConnectionInstanceDS opts sChannel:rest]
        # (hit,events)      = claim (listenerCount + seenR) pos events
        # (restored,events) = restore (pos+1) listenerCount seenL (seenR+1) ls rs events rest
        = ([ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}:restored],hit ++ events)
    //Background tasks never have a select result
    restore pos listenerCount seenL seenR ls rs events [BackgroundInstanceDS bt:rest]
        # (restored,events) = restore (pos+1) listenerCount seenL seenR ls rs events rest
        = ([BackgroundInstance bt:restored],events)

    //Take the first pending event if it belongs to select index `wanted`
    //and relabel it with the instance position `pos`
    claim wanted pos events=:[(c,what):more]
        | c == wanted = ([(pos,what)],more)
        | otherwise   = ([],events)
    claim _ _ [] = ([],[])

    //Length of a unique list, the list itself is rebuilt and returned
    countAndKeep [] = (0,[])
    countAndKeep [x:xs]
        # (n,xs) = countAndKeep xs
        = (n + 1,[x:xs])

//TODO: Use share notification to trigger task re-evaluation based on io events
/* Process the instances in the todo list of the IWorld one by one.
    i      : position of the instance at the head of the todo list, this is
             the index that is used in the select result
    chList : the (sorted) select result that has not been consumed yet

    Instances that stay alive are consed onto the done list (so the done
    list ends up in reverse order), instances that are finished have their
    channels closed and are dropped. Newly accepted connections are appended
    to the todo list and are therefore processed in the same pass.
*/
process :: !Int [(!Int,!SelectResult)] !*IWorld -> *IWorld
//Nothing left to do
process i chList iworld=:{ioTasks={done,todo=[]}} = iworld
//Listeners: accept a new connection if the select reported one
process i chList iworld=:{ioTasks={done,todo=[ListenerInstance lopts listener:todo]},ioStates,world}
    # (TaskId instanceNo _) = lopts.ListenerInstanceOpts.taskId
    = case 'DM'.get lopts.ListenerInstanceOpts.taskId ioStates of
        //Active listener:
        Just (IOActive conStates)
            # (mbSelect,chList) = checkSelect i chList
            | mbSelect =:(Just _)
                # (tReport, mbNewConn, listener, world)   = receive_MT (Just 0) listener world
                | tReport == TR_Success
                    # (ip,{rChannel,sChannel}) = fromJust mbNewConn
                    # (ConnectionTask handlers sds) = lopts.ListenerInstanceOpts.connectionTask
                    # (mbr,iworld) = 'SDS'.read sds {iworld & ioTasks={done=done,todo=todo},world=world}
                    | mbr =:(Error _)
                        # iworld=:{ioTasks={done,todo},world} = queueUrgentRefresh [instanceNo] ["IO Exception for instance "<+++instanceNo] iworld
                        # ioStates = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOException (snd (fromError mbr))) ioStates
                        //The connection that was just accepted will never be handled: close it instead of leaking it
                        # world = closeRChannel rChannel world
                        # world = closeChannel sChannel world
                        //The listener is dropped as well
                        # world = closeRChannel listener world
                        = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
                    # (mbConState,mbw,out,close,iworld) = handlers.ConnectionHandlersIWorld.onConnect (toString ip) (fromOk mbr) iworld
                    # iworld = queueUrgentRefresh [instanceNo] ["New TCP connection for instance "<+++instanceNo] iworld
                    # iworld=:{ioTasks={done,todo},world}  = writeShareIfNeeded sds mbw iworld
                    //NOTE(review): from here on the ioStates bound in the function head are written back,
                    //changes the handlers may have made to iworld.ioStates look overwritten - confirm
                    | mbConState =:(Error _)
                        # ioStates = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOException (fromError mbConState)) ioStates
                        //No connection instance is created for the accepted connection: close it instead of leaking it
                        # world = closeRChannel rChannel world
                        # world = closeChannel sChannel world
                        = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, ioStates = ioStates, world=world}
                    # conStates = 'DM'.put lopts.ListenerInstanceOpts.nextConnectionId (fromOk mbConState,close) conStates
                    //Send the data produced by the onConnect handler
                    # (sChannel,world) = case out of
                        []          = (sChannel,world)
                        data        = foldl (\(s,w) d -> send (toByteSeq d) s w) (sChannel,world) data
                    | close
                    //Close the connection immediately
                        # world = closeRChannel rChannel world
                        # world = closeChannel sChannel world
                        //Remove the connection state if configured in the connection listener options
                        # conStates = if lopts.ListenerInstanceOpts.removeOnClose
                            ('DM'.del lopts.ListenerInstanceOpts.nextConnectionId conStates)
                            conStates
                        # ioStates  = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOActive conStates) ioStates
                        = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, ioStates = ioStates, world=world}
                    | otherwise
                    //Persist the connection
                        # copts = {ConnectionInstanceOpts|taskId = lopts.ListenerInstanceOpts.taskId
                                  ,connectionId = lopts.ListenerInstanceOpts.nextConnectionId
                                  ,remoteHost = ip, connectionTask = lopts.ListenerInstanceOpts.connectionTask
                                  ,removeOnClose = lopts.ListenerInstanceOpts.removeOnClose}
                        //Appended at the end, so the new connection is processed later in this same pass
                        # todo = todo ++ [ConnectionInstance copts {rChannel=rChannel,sChannel=sChannel}]
                        # lopts = {lopts & nextConnectionId = lopts.nextConnectionId + 1}
                        # ioStates  = 'DM'.put lopts.ListenerInstanceOpts.taskId (IOActive conStates) ioStates
                        = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, ioStates = ioStates, world=world}
                //We did not properly accept a connection
                | otherwise
                    = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, world=world}
            //Nothing to do
            | otherwise
                = process (i+1) chList {iworld & ioTasks={done=[ListenerInstance lopts listener:done],todo=todo}, world=world}
        //Destroyed listener:
        Just (IODestroyed conStates)
            # world = closeRChannel listener world
            //If there are no connections belonging to this listener we can clean up, if there are the last connection will cleanup
            # ioStates = if ('DM'.mapSize conStates == 0) ('DM'.del lopts.ListenerInstanceOpts.taskId ioStates) ioStates
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
        //There was an exception or the state has already been removed
        _
            # world = closeRChannel listener world
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}

//Connections: run the onDisconnect or whileConnected handler
process i chList iworld=:{ioTasks={done,todo=[ConnectionInstance opts {rChannel,sChannel}:todo]},ioStates,world}
    # (TaskId instanceNo _) = opts.ConnectionInstanceOpts.taskId
    = case 'DM'.get opts.ConnectionInstanceOpts.taskId ioStates of
        Just (IOActive conStates)
            # (ConnectionTask handlers sds) = opts.ConnectionInstanceOpts.connectionTask
            # (mbSelect,chList) = checkSelect i chList

            # mbConState = 'DM'.get opts.ConnectionInstanceOpts.connectionId conStates
            | mbConState =: Nothing
                //Set exception, close connection and continue
                # ioStates = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException "Missing connection state") ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
            # conState = fst (fromJust mbConState)
            //Read sds
            # (mbr,iworld=:{ioTasks={done,todo},world}) = 'SDS'.read sds {iworld & ioTasks={done=done,todo=todo},world=world}
            | mbr =:(Error _)
                # ioStates = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException (snd (fromError mbr))) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
            //Check if disconnected
            | mbSelect =:(Just SR_Disconnected) || mbSelect=:(Just SR_EOM)
                //Call disconnect function
                # (conState,mbw,iworld) = handlers.ConnectionHandlersIWorld.onDisconnect conState (fromOk mbr) {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
                # iworld = queueUrgentRefresh [instanceNo] ["TCP connection disconnected for "<+++instanceNo] iworld
                # iworld=:{world,ioStates} = writeShareIfNeeded sds mbw iworld
                //Keep the final connection state (marked as closed) or record the exception
                # ioStates = case conState of
                    Ok state
                        = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOActive ('DM'.put opts.ConnectionInstanceOpts.connectionId (state,True) conStates)) ioStates
                    Error e
                        = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException e) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioStates = ioStates, world=world}
            //Read channel data
            # (data,rChannel,world) = case mbSelect of
                Just SR_Available
                    # (data,rChannel,world) = receive rChannel world
                    = (Just (toString data),rChannel,world)
                _
                    = (Nothing,rChannel,world)
            //Call whileConnected function
            # (mbConState,mbw,out,close,iworld)
                = handlers.ConnectionHandlersIWorld.whileConnected data conState (fromOk mbr) {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
            //Queue refresh when there was new data or when the connection was closed
            # iworld = if (isNothing data) iworld (queueUrgentRefresh [instanceNo] ["New TCP data for "<+++instanceNo] iworld)
            # iworld = if close (queueUrgentRefresh [instanceNo] ["TCP connection closed for "<+++instanceNo] iworld) iworld
            //Write share
            # iworld=:{ioTasks={todo,done},ioStates,world} = writeShareIfNeeded sds mbw iworld
            | mbConState =:(Error _)
                # ioStates = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOException (fromError mbConState)) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
            # conStates = 'DM'.put opts.ConnectionInstanceOpts.connectionId (fromOk mbConState,close) conStates
            //Send data if produced
            # (sChannel,world) = case out of
                []          = (sChannel,world)
                data        = foldl (\(s,w) d -> send (toByteSeq d) s w) (sChannel,world) data
            | close
                //Remove the connection state if configured in the connection listener options
                # conStates = if opts.ConnectionInstanceOpts.removeOnClose
                    ('DM'.del opts.ConnectionInstanceOpts.connectionId conStates)
                    conStates
                # ioStates  = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOActive conStates) ioStates
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = process (i+1) chList {iworld & ioTasks={done=done,todo=todo},ioStates=ioStates,world=world}
            | otherwise
                //Persist connection
                # ioStates  = 'DM'.put opts.ConnectionInstanceOpts.taskId (IOActive conStates) ioStates
                = process (i+1) chList {iworld & ioTasks={done=[ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}:done],todo=todo},ioStates=ioStates,world=world}
        Just (IODestroyed conStates)
            # world = closeRChannel rChannel world
            # world = closeChannel sChannel world
            //Remove the state for this connection
            # conStates = 'DM'.del opts.ConnectionInstanceOpts.connectionId conStates
            //If this is the last connection for this task, we can clean up.
            # ioStates = if ('DM'.mapSize conStates == 0) ('DM'.del opts.ConnectionInstanceOpts.taskId ioStates) ioStates
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}
        _
            //No state, just close
            # world = closeRChannel rChannel world
            # world = closeChannel sChannel world
            = process (i+1) chList {iworld & ioTasks={done=done,todo=todo}, ioStates = ioStates, world=world}

//Background tasks: evaluate on every pass and always keep the instance
process i chList iworld=:{ioTasks={done,todo=[BackgroundInstance bt=:(BackgroundTask eval):todo]}}
    # iworld=:{ioTasks={done,todo}} = eval {iworld & ioTasks = {done=done,todo=todo}}
    = process (i+1) chList {iworld & ioTasks={done=[BackgroundInstance bt:done],todo=todo}}
//Anything else is moved to the done list untouched
process i chList iworld=:{ioTasks={done,todo=[t:todo]}}
    = process (i+1) chList {iworld & ioTasks={done=[t:done],todo=todo}}

//Write a value to the share if there is one (Just), do nothing otherwise.
//The result of the write, including a possible error, is discarded.
writeShareIfNeeded sds mbw iworld = case mbw of
    Nothing
        = iworld
    Just w
        # (_,iworld) = 'SDS'.write w sds iworld //TODO: Deal with exceptions at this level
        = iworld

//Open a listener on `port` for the given task and append it to the todo
//list of I/O tasks. The task is registered as active without connection
//states. Returns an exception when the port cannot be opened.
addListener :: !TaskId !Int !Bool !ConnectionTask !*IWorld -> (!MaybeError TaskException (),!*IWorld)
addListener taskId port removeOnClose connectionTask iworld=:{ioTasks={todo,done}, ioStates, world}
    //Open listener
    # (opened, mbListener, world) = openTCP_Listener port world
    | opened
        # listenerOpts = {ListenerInstanceOpts|taskId = taskId, nextConnectionId = 0, port = port, connectionTask= connectionTask, removeOnClose = removeOnClose}
        # newListener  = ListenerInstance listenerOpts (fromJust mbListener)
        # ioStates     = 'DM'.put taskId (IOActive 'DM'.newMap) ioStates
        = (Ok (),{iworld & ioTasks = {done=done,todo=todo ++ [newListener]}, ioStates = ioStates, world = world})
    | otherwise
        = (Error (exception ("Error: port "+++ toString port +++ " already in use.")), {iworld & ioTasks = {done=done,todo=todo},world = world})

/* Open an outgoing connection to host:port for the given task.
    After connecting, the share of the connection task is read and the
    onConnect handler is evaluated. The output of the handler is sent and its
    share update (if any) is written, the same way process does this for
    accepted connections. The connection is then either closed immediately
    (when the handler asks for it) or appended to the todo list with
    connection id 0.

    Returns an exception when the host cannot be resolved, the connection
    cannot be made or the share cannot be read.
*/
addConnection :: !TaskId !String !Int !ConnectionTask !*IWorld -> (!MaybeError TaskException (),!*IWorld)
addConnection taskId=:(TaskId instanceNo _) host port connectionTask iworld=:{ioTasks={done,todo},ioStates,world}
    # (mbIP,world) = lookupIPAddress host world
    | mbIP =: Nothing
        = (Error (exception ("Failed to connect to host "+++ host)), {iworld & ioTasks = {done=done,todo=todo}, world = world})
    # (_,mbConn,world) = connectTCP_MT Nothing (fromJust mbIP,port) world
    = case mbConn of
        Nothing
            = (Error (exception ("Failed to connect to host "+++ host)), {iworld & ioTasks = {done=done,todo=todo}, world = world})
        Just {DuplexChannel|rChannel,sChannel}
            # ip                                = fromJust mbIP
            # (ConnectionTask handlers sds)     = connectionTask
            // Read share
            # (mbr,iworld) = 'SDS'.read sds {iworld & ioTasks = {done=done,todo=todo}, ioStates = ioStates, world = world}
            | mbr =: (Error _)
                // The connection is already open at this point: close it instead of leaking it
                # iworld=:{IWorld|world} = iworld
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = (liftError mbr, {iworld & world = world})
            // Evaluate onConnect handler
            # (mbl,mbw,out,close,iworld) = handlers.ConnectionHandlersIWorld.onConnect (toString ip) (fromOk mbr) iworld
            // Write the share if the handler produced a new value
            # iworld=:{IWorld|ioTasks={done,todo},ioStates,world} = writeShareIfNeeded sds mbw iworld
            // Write possible output
            # (sChannel,world) = case out of
                []          = (sChannel,world)
                data        = foldl (\(s,w) d -> send (toByteSeq d) s w) (sChannel,world) data
            //Close connection, or add to queue
            | close
                # world = closeRChannel rChannel world
                # world = closeChannel sChannel world
                = (Ok (), {iworld & ioTasks = {done=done,todo=todo}, ioStates = ioStates, world = world})
            | otherwise
                # opts = {ConnectionInstanceOpts|taskId = taskId, connectionId = 0, remoteHost = ip, connectionTask = connectionTask, removeOnClose = False}
                # todo = todo ++ [ConnectionInstance opts {rChannel=rChannel,sChannel=sChannel}]
                // A failed onConnect is stored as an exception state; process closes
                // the channels of instances whose task is not active or destroyed
                # ioStates = case mbl of
                    Ok l        = 'DM'.put taskId (IOActive ('DM'.fromList [(0,(l,False))])) ioStates
                    Error e     = 'DM'.put taskId (IOException e) ioStates
                = (Ok (),{iworld & ioTasks = {done=done,todo=todo}, ioStates = ioStates, world = world})

//Check if the first select result belongs to instance `i`.
//If so, the result is returned and removed from the list; otherwise the
//list is returned unchanged. Only the head of the list is inspected.
checkSelect :: !Int ![(!Int,!SelectResult)] -> (!Maybe SelectResult,![(!Int,!SelectResult)])
checkSelect i [(who,what):ws]
    | i == who  = (Just what,ws)
    | otherwise = (Nothing,[(who,what):ws])
checkSelect _ [] = (Nothing,[])

//Shut down: close the channels of all instances in the todo list and drop
//the instances. Only the todo list is traversed, the done list is kept as is.
halt :: !*IWorld -> *IWorld
halt iworld=:{ioTasks={todo=[],done}} = iworld
halt iworld=:{ioTasks={todo=[ListenerInstance _ listener:todo],done},world}
    # world = closeRChannel listener world
    //Store the world back, otherwise the result of the close is thrown away
    = halt {iworld & ioTasks = {todo=todo,done=done}, world = world}
halt iworld=:{ioTasks={todo=[ConnectionInstance _ {rChannel,sChannel}:todo],done},world}
    # world = closeRChannel rChannel world
    # world = closeChannel sChannel world
    //Store the world back, otherwise the result of the closes is thrown away
    = halt {iworld & ioTasks = {todo=todo,done=done}, world = world}
//Background tasks have no channels to close
halt iworld=:{ioTasks={todo=[BackgroundInstance _ :todo],done}}
    = halt {iworld & ioTasks= {todo=todo,done=done}}