WebSocket futures

From wiki.visual-prolog.com


The webSocket library for use in a web server is asynchronous, and here I will "touch the surface" of asynchronous programming using futures.

A webSocket service has a single onAttach_async predicate:

interface webSocketService
    open core, pfc\asynchronous\
 
predicates
    onAttach_async : (httpRequest Request, webSocket WebSocket) -> future{unit}.
    % @short This predicate is invoked when a client created a webSocket connection to this service.
    % The service is registered in a requestQueue_webSocket.
    % @end
 
end interface webSocketService

You create a request queue from your service:

requestQueue_webSocket::new(threadpool, demoWebSocketService::new())

You attach this request queue to a urlGroup, just as you do with the other kinds of services.

The overall scenario is the following:

  • Front end code creates a webSocket connection to your service
  • If successful your onAttach_async predicate will be called with the request that made the connection and with the created "server side end" of the web socket.
  • The request can be used to provide additional information about the connection (who, why, what about, …)
  • The front end and your service can then receive and send messages on the web socket, until one of the ends closes the web socket.

Front-end code can look like this:

function wsEcho() {
    const ws = new WebSocket(wsUrl + 'echo')
    const id = getId()
    ws.onopen = evt => {
        logP('webSocket connected ' + id)
        ws.send('Hello Visual Prolog User! (echo)' + id)
    }
    ws.onclose = evt => logP('webSocket disconnected ' + id)
    ws.onerror = evt => logP('<span style="color: red">Error ' + id + ':</span> ' + evt.data)
    ws.onmessage = evt => {
        logP('<span style="color: blue">Response ' + id + ': ' + evt.data + '</span>')
        ws.close()
    }
}

The front-end code has callbacks attached to deal with the four events open, message, error and close. I will not go into details about the code: I am certain that any front-end programmer will use some kind of library software on top of the web sockets anyway.

Notice however that a web socket URL looks like this:

ws://<server>/<path>

In the webServiceDemo example the last part of the URL is used to distinguish between different webSocket services/behaviors:

clauses
    onAttach_async(Request, WebSocket) = onAttach2_async(filename::getLastDirectory(Request:path, _), WebSocket).
 
predicates
    onAttach2_async : (string Last, webSocket WebSocket) -> future{unit}.
clauses
    onAttach2_async("echo", WebSocket) = echoLoop_async(WebSocket) :-
        !.
 
    onAttach2_async("async", WebSocket) = async_async(WebSocket) :-
        !.
 
    onAttach2_async("jsonAsync", WebSocket) = jsonAsync_async(WebSocket) :-
        !.
 
    onAttach2_async("ball", WebSocket) = ball_async(WebSocket) :-
        !.
 
    onAttach2_async(Service, WebSocket) = 
        WebSocket:writeMessage_async(webSocket::close(210, string::format("Unknown service %", Service))).
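The dispatch idea itself is independent of Visual Prolog. As a rough illustration only, here is the same "last part of the URL selects the service" logic sketched in JavaScript. The names lastPathSegment, knownServices and selectService are invented for this sketch and are not part of the webServiceDemo example; only the four service names are taken from the clauses above.

```javascript
// Return the last non-empty path segment of a ws:// or wss:// URL.
function lastPathSegment(url) {
    const segments = new URL(url).pathname.split('/').filter(s => s !== '');
    return segments.length > 0 ? segments[segments.length - 1] : '';
}

// Service names taken from the clauses above (the Set itself is hypothetical).
const knownServices = new Set(['echo', 'async', 'jsonAsync', 'ball']);

// Pick the service for a URL, or describe it as unknown.
function selectService(url) {
    const last = lastPathSegment(url);
    return knownServices.has(last) ? last : 'Unknown service ' + last;
}
```

A call like selectService('ws://localhost/demo/echo') picks the echo service, while any other last segment ends in the "Unknown service" branch, which corresponds to the final clause that closes the web socket.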

You have of course noticed that the predicate names end in _async and that they are functions which return a future{unit}.

This is because they are asynchronous predicates based on our promise/future library.

Let us first look at some (illegal) pseudo code (inspired by the asynchronous features of C# and JavaScript (ES2017)):

class predicates
    echoLoop_async : (webSocket WebSocket) asynchronous.
clauses
    echoLoop_async(WebSocket) :-
        Message = await WebSocket:readMessage_async(),
        if webSocket::close(_, _) = Message then
        else
            await WebSocket:writeMessage_async(Message),
            echoLoop_async(WebSocket)
        end if.

If you delete the words asynchronous and await, the code looks like a completely normal (synchronous) loop. The await indicates that we await the completion of an asynchronous computation before we continue. However, we do not actually wait at that point; instead we suspend the computation until the asynchronous computation completes, and then we resume the computation from the await point.
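For comparison, the pseudo code above is legal in JavaScript, which is one of the languages that inspired it. The following is only a sketch: makeFakeSocket is a hypothetical in-memory stand-in for a web socket (it is not the browser WebSocket API and not the Visual Prolog webSocket object), and the message shape with a type field is an assumption made for the example.

```javascript
// Hypothetical stand-in for a web socket: hands out queued messages
// and records everything that is written to it.
function makeFakeSocket(incoming) {
    const queue = [...incoming];
    const written = [];
    return {
        written,
        // Resolves with the next message, or a close message when the queue is empty.
        readMessage: () =>
            Promise.resolve(queue.length > 0 ? queue.shift() : { type: 'close' }),
        // Records the message and resolves when the "write" is done.
        writeMessage: msg => {
            written.push(msg);
            return Promise.resolve();
        },
    };
}

// Echo every message back until a close message arrives.
async function echoLoop(ws) {
    const message = await ws.readMessage();
    if (message.type === 'close') {
        return;
    }
    await ws.writeMessage(message);
    return echoLoop(ws);
}
```

Calling echoLoop(makeFakeSocket([...])) returns a promise immediately; the messages are echoed as the individual reads and writes complete.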

await resembles a vpi::processEvents in the sense that it is an explicit point where other computations can take place.

The actual running of this code is however quite different, and has more resemblance to our nondeterm predicates. When we call the asynchronous predicate the execution will continue normally until we meet an await; at that point it will return to the caller. Later, when the awaited operation completes, we will jump back into the code (just like we jump back into nondeterm code on failure) and continue the execution until we meet another await or finish. But unlike nondeterm predicates, an asynchronous predicate only returns to its caller the first time; the later jumps back into the code never return to the original call point.

I.e. to the caller an asynchronous predicate completes/returns when the first await is met. The other parts of the code are run when the relevant asynchronous computations complete.
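This behavior can be observed directly in JavaScript, where async and await do exist. The sketch below is illustrative only; step, work, caller and the log array are invented names, and setTimeout is used merely to make each step complete on a later turn of the event loop.

```javascript
const log = [];

// Hypothetical asynchronous step that completes on a later turn of the event loop.
function step(name) {
    return new Promise(resolve =>
        setTimeout(() => {
            log.push('completed ' + name);
            resolve();
        }, 0));
}

async function work() {
    log.push('work: start');
    await step('first');                 // the caller gets control back here
    log.push('work: after first await');
    await step('second');
    log.push('work: done');
}

function caller() {
    const result = work();               // runs until the first await, then returns
    log.push('caller: work returned');
    return result;                       // represents the entire computation
}
```

Right after caller() has been called the log only contains "work: start" and "caller: work returned"; the remaining entries are added later, when the awaited steps complete.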

Presumably the caller of an asynchronous predicate is interested in the final result of the computation and has little interest in the "random" state after the first await. Therefore the caller actually receives a future{} object that represents the result of the entire asynchronous computation.

Unfortunately, asynchronous and await are not implemented in our language, so you will have to "translate" the code yourself. Hence you will need to know more about futures.

A future represents the result of an asynchronous computation that may not complete until some point in the future. As such it has three major states:

  • it will be pending until the computation is completed (or cancelled)
  • it will be completed once the computation has finished. The completion can happen with:
    • a success indicating that the computation went well
    • or an error indicating that something went wrong (an exception was raised and not caught)
  • it can also be cancelled meaning that the computation was cancelled before it completed

There are only two possible state transitions:

  • A pending future can complete changing to the completed state
  • A pending future can cancel changing to the cancelled state
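The states and transitions above can be modelled in a few lines. The JavaScript class below is purely an illustration of the state rules; FutureState and its members are invented names and say nothing about how the Visual Prolog future class is implemented.

```javascript
// Illustrative model of the three states: pending, completed, cancelled.
class FutureState {
    constructor() {
        this.state = 'pending';
        this.outcome = undefined;    // set when completed
    }

    // pending -> completed; outcome is { kind: 'success', value } or { kind: 'error', error }.
    // Returns false (and changes nothing) if the future is no longer pending.
    complete(outcome) {
        if (this.state !== 'pending') return false;
        this.state = 'completed';
        this.outcome = outcome;
        return true;
    }

    // pending -> cancelled. Returns false if the future is no longer pending.
    cancel() {
        if (this.state !== 'pending') return false;
        this.state = 'cancelled';
        return true;
    }
}
```

Once a FutureState has left the pending state, both complete and cancel are rejected, which reflects that there are only the two transitions listed above.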

There are many ways to use futures. Here we will focus on how to "translate" asynchronous predicates into ordinary code using the thenDo operation on futures.

thenDo is used to handle the complete state transition as a kind of event listening (it will also automatically handle the cancel state transition). It is however more complex than normal event listening, because the result of the event handling is itself an asynchronous operation.

An asynchronous function that returns a value of type Type translates into a function that returns future{Type}.

If the predicate does not return anything, then it is treated as if it is a function that returns core::unit, and thus translates into a function that returns future{unit}. core::unit is a domain that only contains the value core::unit.

So

class predicates
    echoLoop_async : (webSocket WebSocket) asynchronous.

translates into

class predicates
    echoLoop_async : (webSocket WebSocket) -> future{unit}.

Likewise the readMessage_async function in:

        Message = await WebSocket:readMessage_async(),

is actually a function that returns future{message Message}:

predicates
    readMessage_async : () -> future{message Message}.

The await will translate into a thenDo call on the returned future; it is called with a continuation that receives the Message we await and returns a future F1 that represents the remaining operation.

clauses
    echoLoop_async(WebSocket) =
        WebSocket:readMessage_async():thenDo(
            { (Message) = F1 :-
<<
                if webSocket::close(_, _) = Message then
                else
                    await WebSocket:writeMessage_async(Message),
                    echoLoop_async(WebSocket)
                end if
>>
            }).

In the then-part of the if-statement we must return a future{unit} that represents the completion of the entire operation. At that point the entire operation is completed, so we create a future{unit} that is already completed. This is done with future::newUnit():

clauses
    echoLoop_async(WebSocket) =
        WebSocket:readMessage_async():thenDo(
            { (Message) = F1 :-
                if webSocket::close(_, _) = Message then
                    F1 = future::newUnit()
                else
<<
                    await WebSocket:writeMessage_async(Message),
                    echoLoop_async(WebSocket)
>>
                end if
            }).

In the else-part we have a (future{unit}) await on writeMessage_async. This translates into a thenDo-call with a continuation that receives the (core::unit) result of the writeMessage_async call and must return a future F2 that represents the remaining operation:

clauses
    echoLoop_async(WebSocket) =
        WebSocket:readMessage_async():thenDo(
            { (Message) = F1 :-
                if webSocket::close(_, _) = Message then
                    F1 = future::newUnit()
                else
                    F1 =
                        WebSocket:writeMessage_async(Message):thenDo(
                            { = F2 :-
<<
                                echoLoop_async(WebSocket)
>>
                            })
                end if
            }).

The future that represents the remaining echoLoop_async call is simply the future that this call returns:

clauses
    echoLoop_async(WebSocket) =
        WebSocket:readMessage_async():thenDo(
            { (Message) = F1 :-
                if webSocket::close(_, _) = Message then
                    F1 = future::newUnit()
                else
                    F1 =
                        WebSocket:writeMessage_async(Message):thenDo(
                            { = F2 :-
                                F2 = echoLoop_async(WebSocket)
                            })
                end if
            }).

As you can see, each await creates a nested thenDo. So things that are N awaits into the future are thenDo-ed N levels deep.

Future variables like F1 and F2 are only used in one nesting level and therefore they can have the same name:

clauses
    echoLoop_async(WebSocket) =
        WebSocket:readMessage_async():thenDo(
            { (Message) = F :-
                if webSocket::close(_, _) = Message then
                    F = future::newUnit()
                else
                    F =
                        WebSocket:writeMessage_async(Message):thenDo(
                            { = F :-
                                F = echoLoop_async(WebSocket)
                            })
                end if
            }).

Furthermore, they can often be eliminated by shifting to expressions instead of statements:

clauses
    echoLoop_async(WebSocket) =
        WebSocket:readMessage_async():thenDo(
            { (Message) = 
                if webSocket::close(_, _) = Message then
                    future::newUnit()
                else
                    WebSocket:writeMessage_async(Message):thenDo(
                        { = 
                            echoLoop_async(WebSocket)
                        })
                end if
            }).
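The same translation can be tried out in JavaScript, where then on a promise plays roughly the role that thenDo plays on a future. This is a sketch under assumptions: makeFakeSocket is a hypothetical in-memory stand-in for a web socket, the type field on messages is invented for the example, and Promise.resolve() is used as the counterpart of an already completed future{unit}.

```javascript
// Hypothetical stand-in for a web socket: hands out queued messages
// and records everything that is written to it.
function makeFakeSocket(incoming) {
    const queue = [...incoming];
    const written = [];
    return {
        written,
        readMessage: () =>
            Promise.resolve(queue.length > 0 ? queue.shift() : { type: 'close' }),
        writeMessage: msg => {
            written.push(msg);
            return Promise.resolve();
        },
    };
}

// Echo loop written with nested then calls, one nesting level per "await".
function echoLoop(ws) {
    return ws.readMessage().then(message => {
        if (message.type === 'close') {
            // Nothing remains to be done: return an already completed promise.
            return Promise.resolve();
        }
        return ws.writeMessage(message).then(() =>
            // The remaining work is simply the promise of the recursive call.
            echoLoop(ws));
    });
}
```

The structure mirrors the final Visual Prolog version: an if-expression inside the first continuation, and the recursive call as the result of the innermost continuation.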

More about futures (and promises)

Chaining

Notice that several JavaScript texts use chaining instead of deeper nesting (e.g. https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Using_promises).

Chaining code looks like this (very often formatted like here):

clauses
    chaining_async() =
        something1_async()
        :thenDo({ (R1) = something2_async(R1) })
        :thenDo({ (R2) = something3_async(R2) })
        :thenDo({ (R3) = something4_async(R3) }).

It looks nice because you simply have the operations one after another (embedded in some thenDo-noise, but anyway).

Such code does work and it looks relatively nice, so why all the nesting?

Well if you compare to a similar nesting structure:

clauses
    nesting_async() =
        something1_async():thenDo(
            { (R1) =
                something2_async(R1):thenDo(
                    { (R2) =
                        something3_async(R1, R2):thenDo(
                            { (R3) =
                                something4_async(R1, R2, R3)
                            })
                    })
            }).

You will notice the important difference that the nested version has access to earlier results.
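The scoping difference is easy to see with JavaScript promises as well. In this sketch something1, something2 and something3 are hypothetical steps that just do arithmetic, so that the two results can be compared.

```javascript
// Hypothetical asynchronous steps; each resolves with a number.
const something1 = () => Promise.resolve(1);
const something2 = r1 => Promise.resolve(r1 + 10);
const something3 = (r1, r2) => Promise.resolve(r1 + r2 + 100);

// Chaining: each callback only sees the immediately preceding result.
function chaining() {
    return something1()
        .then(r1 => something2(r1))
        .then(r2 => something3(0, r2));  // r1 is out of scope here, 0 stands in for it
}

// Nesting: inner callbacks are inside the scope of the outer ones.
function nesting() {
    return something1().then(r1 =>
        something2(r1).then(r2 =>
            something3(r1, r2)));        // both r1 and r2 are available
}
```

In the chained version the third step cannot refer to r1 at all, so a placeholder has to be passed; in the nested version r1 is still in scope.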

In nesting code it is also quite simple to deal with branching code:

clauses
    nesting_async() =
        something1_async():thenDo(
            { (R1) =
                if test(R1) then
                    somethingTrue_async(R1)
                else
                    somethingFalse1_async(R1):thenDo(
                        { (R2) =
                            somethingFalse2_async(R1, R2)
                        })
                end if
            }).

Chaining will only handle "sequential" actions.

So chaining is actually rather limited, and you might as well get used to the nesting method.

(The asynchronous programming constructions in C# and JavaScript correspond to nesting code, not chaining code.)
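To illustrate that correspondence, here is a JavaScript sketch of the branching example written twice: once with nested then calls and once with async/await. All function names and the values they produce are invented for the illustration.

```javascript
// Hypothetical asynchronous steps and test.
const something1 = x => Promise.resolve(x);
const isEven = r1 => r1 % 2 === 0;
const somethingTrue = r1 => Promise.resolve('even:' + r1);
const somethingFalse1 = r1 => Promise.resolve(r1 + 1);
const somethingFalse2 = (r1, r2) => Promise.resolve('odd:' + r1 + ',' + r2);

// Nested version: branching inside the first continuation.
function nested(x) {
    return something1(x).then(r1 => {
        if (isEven(r1)) {
            return somethingTrue(r1);
        }
        return somethingFalse1(r1).then(r2 => somethingFalse2(r1, r2));
    });
}

// async/await version: same branching, and r1 stays in scope after each await.
async function awaited(x) {
    const r1 = await something1(x);
    if (isEven(r1)) {
        return somethingTrue(r1);
    }
    const r2 = await somethingFalse1(r1);
    return somethingFalse2(r1, r2);
}
```

Both functions give the same result for the same input, and in both of them the code after an asynchronous step can branch and can use all earlier results.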