Master/slave processes

From wiki.visual-prolog.com

(Redirected from Master/Slave processes)

The PFC master/slave process concept can be used for a number of reasons:

  • To make computations in parallel
  • To isolate computations from each other
  • To manage independent subprocesses

Master/slave concepts

Master/slave processes are organized hierarchically:

  • A master process can have several slave processes
  • Each slave process has exactly one master process.
  • A slave process can be master process of other slave processes.

The master process spawns and owns the slave processes.

A master and slave process can communicate with each other using remote procedure calls and notifications. All communication is serialized on pipes that are established between the master and the slave.

The communication is (quite) transparent to the programs, in the master process the slave is represented by a proxy object; calls and notifications from the master to the slave is performed by invoking methods on the proxy object. Likewise the master is represented by a proxy object in the slave and calls and notifications from slave to master are made on the proxy object.

All methods are asynchronous (using an executionContext_pool as execution context), so suspending predicates must be applied. And synchronization to non-threadsafe structures must be used. postAction will have to be used to switch execution context to the GUI thread for GUI operations (and in many applications for safe access to program state).

The master/slave process concept is illustrated by an demo project. In the demo project the master is a builder similar to the vipBuilder. This master spawns a number of compiler/slave processes.

The demo is academic in the sense that nothing is really build; the "compilers" merely simulate that they do something.

Notice that the demo program uses the name space compiler_ms for interfaces and classes used in the process interface. Fir simplicity the name space declarations are not included in this article.

The compiler/slave has this interface:

interface slave
    open core
    [pfc\masterSlaveProcess::slave]
 
domains
    compile_rsp = compile_rsp(integer LineCount, integer Errors, durationValue CompilationTime).
 
predicates
    compile : (string Filename) -> compile_rsp CompileResult suspending.
 
predicates
    setSettings : (string Settings) -> unit suspending.
 
end interface slave

Notice that in this academic example the interface name is slave. In a real program it would be more natural to call it e.g. compiler_slave.

We will consider the details of this interface below, currently the interesting thing is that it defines procedures compile and setSettings. Notice that they are suspending predicates.

When the master spawn this slave it will call the setSettings to inform the slave about settings (i.e. corresponding to the command line flags , etc of a regular compiler). And then it will call compile on a file and when that operation completes it will (potentially) call compile again on another file. The master will continue this until there are no more files to compile and then it will terminate the slave.

Actually, the master will spawn a number of slaves, so that several files can be compiled in parallel.

Likewise the master has this interface (towards the slave):

interface master
    [pfc\masterSlaveProcess::master]
 
domains
    compilerMessage_ntf = compilerMessage_ntf(string File, integer Line, string Message).
 
predicates
    compilerMessage : (compilerMessage_ntf Message).
 
end interface master

This interface only contains a single notification compilerMessage through which the compiler can notify the master with messages from the compilation, while the compilation takes place. Notice that this predicate is not a suspending predicate, this is because notifications completely asynchronous in the sense that no result is returned to the slave and the slave does not wait for the notification to be handled. Be aware that additional notifications and final results can arrive before other notifications has been handled. This may lead to race conditions, and if they are not acceptable you should use a suspending predicate/function instead.

There are restrictions on these interfaces:

  • They can contain one-input-one-output suspending function
  • They can contain one-input notification predicates
  • They cannot contain any other kind of predicates
  • The input and output of the procedures and notifications must be serializable

These two interfaces describes the master/slave process interface.

To make the system complete the following things must be implemented:

  • A proxy class must be implemented for each of these interfaces
  • The slave must implement the slave interface
  • The master must implement the master interface as a context to the slave proxy

The following is also necessary, but that is handled by PFC classes:

  • The subprocess creation
  • The creation of communication pipes
  • The serialization/deserialization, communication and dispatching of data/messages

The implementation of the proxy classes are quite simple as described in the next sections.

slaveProcess

For the master process we will need a proxy representing the slave process, the interface of this proxy looks like this:

interface slaveProcess supports slave, useExe
 
predicates from pfc\masterSlaveProcessSupport
    terminate_toRemote
 
end interface slaveProcess

It supports the slave interface (which is the main proxy facility). It also supports useExe which is the PFC standard interface for subprocesses and finally is has a standard extra predicate to request the termination of the subprocess. Finally, it also contains the predicate terminate_toRemote which is used internally to terminate the slave process.

The class declaration of the slaveProcess looks like this:

class slaveProcess : slaveProcess
 
constructors
    new : (string SlaveProgram, master Master, string CmdLine).
 
end class slaveProcess

The Master is the implementation of the master interface that the slave will communicate with. I.e. it is the master's implementation of the master interface.

The implementation of the slaveProcess looks like this:

implement slaveProcess inherits slaveProcessSupport
    open pfc\
 
clauses
    new(SlaveProgram, Master, CmdLine) :-
        slaveProcessSupport::new(SlaveProgram, CmdLine),
        register_notify("compilerMessage", Master:compilerMessage).
 
clauses
    compile(Filename) = call_toRemote(predicate_name(), Filename).
 
clauses
    setSettings(Settings) = call_toRemote(predicate_name(), Settings).
 
end implement slaveProcess

The implementation inherits from slaveProcessSupport which is initialized with a "path" to the SlaveProgram.

The implementation must also register all calls and notifications of the master interface. In this case it is just the compilerMessage notification.

Finally, the implementation contains a proxy method for each of the calls and notifications in the slave interface.

You will notice that registration as well as proxy methods is trivial code.

masterProxy

For the slave process we will need a proxy representing the master interface provided by the master. The interface of this proxy looks like this:

interface masterProxy supports master
 
properties from pfc\masterSlaveProcessSupport
    fromLoop
 
end interface masterProxy

It simply supports the master interface and provides a fromLoop predicate that must be invoked to receive communication from the master. We will return to this predicate below.

The class declaration of the masterProxy looks like this:

class masterProxy : masterProxy
 
constructors
    new : (slave Slave, string MsPipe).
 
end class masterProxy

The constructor takes the implementation of the slave interface as argument and the name of a MsPipe to use for communication. The implementation of the masterProxy looks like this:

implement masterProxy inherits masterProxySupport
    open pfc\
 
clauses
    new(Slave, MsPipe) :-
        register_call("compile", Slave:compile),
        register_call("setSettings", Slave:setSettings),
        masterProxySupport::new(MsPipe).
 
clauses
    compilerMessage(Message) :-
        notify_wait_toRemote(predicate_name(), Message).
 
end implement masterProxy

It inherits from masterProxySupport and contains registration of all slave calls and notifications, and proxy implantations of all master calls and notifications.

The master program

The master program (i.e. the builder) is a console program that

  • compiles a set of files
  • using a number of slave-compilers and
  • writes the messages from the compilers to the console.

Finally after all compilation is completed it will write some statistics to the console.

This is done by the compiler::compileFiles predicate

class compiler : compiler
    open core
    [noDefaultConstructor]
 
predicates
    compileFiles : (string SlaveProgram, string Settings, setM{string} Files, positive ProcessCount, boolean Debug)
        -> pfc\asynchronous\future{string} Result.
 
end class compiler

As you can see the compiler class can actually (privately) create objects of type compiler. Each of these objects represents a compiler subprocess, but the management of these are kept privately in the implementation of the compiler class.

The compiler interface declares a compileFiles2 predicate:

interface compiler
 
predicates
    compileFiles2 : (string Settings, setM{string} Files) -> pfc\asynchronous\future{compiler_ms\slave::compile_rsp} Result.
 
end interface compiler

As you can see it uses a future, which is an important part of submitting suspending predicates for execution. The implementation looks like this:

implement compiler
    supports master
    open core, compiler_ms\, slave, pfc\asynchronous\
 
class facts
    cs : criticalSection := criticalSection::new(1000) [constant, immediate].
 
clauses
    compileFiles(SlaveProgram, Settings, Files, ProcessCount, Debug) = R :-
        ProcessCount2 = math::min(list::length(Files:asList), ProcessCount),
        CL =
            [ new(SlaveProgram, D) ||
                C = std::cIterate(ProcessCount2),
                D = Debug ** toBoolean(C = 0)
            ],
        RL = [ C:compileFiles2(Settings, Files) || C in CL ],
        FC = future::fold(RL, plus, compilerResult_zero),
        R = FC:futureExtension::map(asString).
 
facts
    slave : slaveProcess.
 
constructors
    new : (string SlaveProgram, boolean Debug).
clauses
    new(SlaveProgram, Debug) :-
        CmdLine = if true = Debug then "-debug" else "" end if,
        slave := slaveProcess::new(SlaveProgram, This, CmdLine),
        slave:setNativeCreationFlags(multiThread_native::idle_priority_class),
        slave:run().
 
clauses
    compileFiles2(Settings, Files) =
        future::submit(
            {  = R :-
                setSettings(Settings),
                R = doCompileFiles(Files, compilerResult_zero)
            }).
 
predicates
    setSettings : (string Settings) suspending.
clauses
    setSettings(Settings) :-
        _ = slave:setSettings(Settings).
 
predicates
    doCompileFiles : (setM{string} Files, compile_rsp Acc) -> compile_rsp OnCompileResult suspending.
clauses
    doCompileFiles(Files, Acc) = CompileResult :-
        if File = Files:tryRemoveFirst() then
            CR = slave:compile(File),
            cs:synchronize({  :- stdio::writef("    %s %s\n", File, CR:asString()) }),
            CompileResult = doCompileFiles(Files, Acc:plus(CR))
        else
            slave:terminate_toRemote(),
            CompileResult = Acc
        end if.
 
constants
    compilerResult_zero : compile_rsp = compile_rsp(0, 0, 0).
 
class predicates
    asString : (compile_rsp CompileResponse [this]) -> string.
clauses
    asString(compile_rsp(LineCount, Errors, CompilationTime)) =
        string::format("% lines in %p % erros", LineCount, duration::new(CompilationTime), Errors).
 
class predicates
    plus : (compile_rsp A [this], compile_rsp B) -> compile_rsp C.
clauses
    plus(compile_rsp(AL, BL, CL), compile_rsp(AR, BR, CR)) = compile_rsp(AL + AR, BL + BR, CL + CR).
 
% master
clauses
    compilerMessage(compilerMessage_ntf(File, Line, Message)) :-
        cs:synchronize({  :- stdio::writef("        %s(%d): %s\n", File, Line, Message) }).
 
end implement compiler

The asynchronous method in the master/slave concept is based on suspending predicates. It is outside the scope of this article to go fully into details about asynchronous programming, suspending predicates and futures/promises. Here we want the main thread to wait for the result of a number of compilers it has started, and for such combined waiting and result summing, we will use futures.

The main predicate (compileFiles) will start a number of slave programs, and submit compilation "job" (compileFiles2) for each of these slaves. The submission of such a job is represented by a future which will return the result some time in the "future". The results of all these futures are folded into a single future, which in turn is "mapped" to a string.

clauses
    compileFiles(SlaveProgram, Settings, Files, ProcessCount, Debug) = R :-
        ProcessCount2 = math::min(list::length(Files:asList), ProcessCount),
        CL =
            [ new(SlaveProgram, D) ||
                C = std::cIterate(ProcessCount2),
                D = Debug ** toBoolean(C = 0)
            ],
        RL = [ C:compileFiles2(Settings, Files) || C in CL ],
        FC = future::fold(RL, plus, compilerResult_zero),
        R = FC:futureExtension::map(asString).

As you can see simple processing is a bit more complicated when futures are involved. But predicates like future::fold deals with a number of things: First of all it combines the results, but it also deals with asynchronous (as well as synchronous) waiting on the individual results, and in case some of the involved sub jobs completes with an exception it will also handle cancellation of the remaining sub jobs.


The predicate compileFiles2 submits a compilation "job" to be executed in the default executionContext; which in this case will be executionContext_pool::defaultPool:

clauses
    compileFiles2(Settings, Files) =
        future::submit(
            {  = R :-
                setSettings(Settings),
                R = doCompileFiles(Files, compilerResult_zero)
            }).

The result is a future.

The suspending predicate doCompileFiles contains the heart loop of the compilation. Each started compiler will be controlled with a doCompileFiles loop:

predicates
    doCompileFiles : (setM{string} Files, compile_rsp Acc) -> compile_rsp OnCompileResult suspending.
clauses
    doCompileFiles(Files, Acc) = CompileResult :-
        if File = Files:tryRemoveFirst() then
            CR = slave:compile(File),
            cs:synchronize({  :- stdio::writef("    %s %s\n", File, CR:asString()) }),
            CompileResult = doCompileFiles(Files, Acc:plus(CR))
        else
            slave:terminate_toRemote(),
            CompileResult = Acc
        end if.
The loop will try to remove a File to compile from the Files to compile.  Since several loops will do this in parallel it is important that tryRemoveFirst is thread safe.  In this example this is ensured by using a setM_redBlack_cas  (i.e. a compare and swap red black set; see the main::run implementation).

If a File can be removed from the set the slave is asked to compile it (asynchronous using a continuation).  When the compilation is completed, we will write the result to stdio and loop to doCompileFiles again.

This will continue until Files becomes empty in which case the slave is informed to terminate (slave:terminate_toRemote()), and the accumulated result is returned.

The master also supports the compilerMessage notification:

clauses
    compilerMessage(compilerMessage_ntf(File, Line, Message)) :-
        cs:synchronize({  :- stdio::writef("        %s(%d): %s\n", File, Line, Message) }).

In our case it simply writes the messages to the standard output stream.

The slave program

The slave program is also a console program. But it is very important to notice that the slave does not have a console window attached and that the console can therefore not be used for input/output.

In this slave program everything interesting is packed into the implementation of theCompiler:

implement theCompiler inherits masterProxy
    supports slave
    open core, compiler_ms\, master, pfc\asynchronous\
 
facts
    settings : string := "".
 
clauses
    run(MsPipe) :-
        new(MsPipe):wait().
 
clauses
    wait() :-
        _ = fromLoop:futureExtension::get().
 
constructors
    new : (string MsPipe).
clauses
    new(MsPipe) :-
        masterProxy::new(This, MsPipe).
 
clauses
    compile(Filename) = compile_rsp(Lines, Errors:value, D:get()) :-
        T1 = time::now(),
        Errors = varM_integer::new(),
        Lines = 20 + math::random(3000),
        foreach I = std::cIterate(Lines) + 1 and not(executionContext::isCanceled()) do
            foreach _ = std::cIterate(math::random(10000)) do
            end foreach,
            R = math::random(),
            if R < 1 / 2000 then
                Errors:inc(),
                compilerMessage(compilerMessage_ntf(Filename, I, settings))
            end if
        end foreach,
        D = duration::new(T1, time::now()).
 
clauses
    setSettings(Settings) = unit :-
        settings := Settings.
 
end implement theCompiler

theCompiler implements the slave interface which it pass as argument to the constructor of the masterProxy object, thereby the communication and message loops are established.

Recall that compile and setSettings are suspending predicates.

compile performs a simple dummy simulation a compiler, by using CPU, returning messages and a final result. The simulation "compiles" a random number of lines each with a certain probability to issue a compilerMessage which is notified back to the master (i.e. through the proxy object).

The compilation also test for termination request from the master (i.e. not(executionContext::isCanceled())), this test would actually only be relevant if the compilation was terminated in the middle of compilation which this example will never do. But when compiling files from an IDE it is custom to have a "stop compilation" facility. This facility could wait all compilers to ask for the next file to compile and terminate them there, but it could also send a terminate request to the slave, in which case the fromLoop would complete.

In this program the main thread will simply waits for the termination of the fromLoop.