Master/slave processes

From wiki.visual-prolog.com

The PFC master/slave process concept can be used for a number of reasons:

  • To make computations in parallel
  • To isolate computations from each other
  • To manage independent subprocesses

Master/slave concepts

Master/slave processes are organized hierarchically:

  • A master process can have several slave processes.
  • Each slave process has exactly one master process.
  • A slave process can itself be the master of other slave processes.

The master process spawns and owns the slave processes.

A master and slave process can communicate with each other using remote procedure calls and notifications. All communication is serialized on pipes that are established between the master and the slave.

The communication is (quite) transparent to the programs. In the master process the slave is represented by a proxy object; calls and notifications from the master to the slave are performed by invoking methods on the proxy object. Likewise, the master is represented by a proxy object in the slave, and calls and notifications from slave to master are made on that proxy object.

All methods are asynchronous (using a threadpool as execution context), so an asynchronous coding style must be applied, and access to non-thread-safe structures must be synchronized. postAction must be used to switch execution context to the GUI thread for GUI operations (and, in many applications, for safe access to program state).

The master/slave process concept is illustrated by a demo project. In the demo project the master is a builder similar to the IDE and vipBuilder; this master spawns a number of compiler/slave processes.

The demo is academic in the sense that nothing is really built; the "compilers" merely simulate that they do something.

The compiler/slave has this interface:

interface slave
    open core, pfc\asynchronous\
 
domains
    compile_rsp = compile_rsp(integer LineCount, integer Errors, durationValue CompilationTime).
 
predicates
    compile : (string Filename, continuation{compile_rsp} OnCompileResult).
 
predicates
    setSettings : (string Settings).
 
end interface slave

Notice that in this academic example the interface name is slave. In a real program it would be more natural to call it e.g. compiler_slave.

We will consider the details of this interface below. For now, the interesting thing is that it defines a procedure compile and a notification setSettings.

When the master spawns this slave it will "notify" the slave about settings (i.e. corresponding to the command line flags etc. of a regular compiler). Then it will call compile on a file, and when that operation completes it will (potentially) call compile again on another file. The master will continue this until there are no more files to compile, and then it will terminate the slave.

Actually, the builder will spawn a number of slaves, so that several files can be compiled in parallel.

Likewise the master has this interface (towards the slave):

interface master
 
domains
    compilerMessage_ntf = compilerMessage_ntf(string File, integer Line, string Message).
 
predicates
    compilerMessage : (compilerMessage_ntf Message).
 
end interface master

This interface contains only a single notification, compilerMessage, through which the compiler can send messages to the master while the compilation takes place.

There are restrictions on these interfaces:

  • They can contain one-input-one-output procedures (in continuation style)
  • They can contain one-input notifications
  • They cannot contain any other kind of predicates
  • The input and output of the procedures and notifications must be serializable

These two interfaces describe the master/slave process interface.
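To illustrate the one-input-one-output restriction, here is a minimal sketch of how an operation that conceptually needs several arguments and several results can still fit: the values are packed into functor domains, as the demo does with compile_rsp and compilerMessage_ntf. The interface and every name in it (searcher, find_req, find_rsp, find, cancel) are hypothetical and not part of the demo project, and the domains are assumed to be serializable.

```
interface searcher
    open core, pfc\asynchronous\

domains
    % several inputs packed into one serializable value (hypothetical)
    find_req = find_req(string Filename, string Pattern, boolean CaseSensitive).
    % several outputs packed into one serializable value (hypothetical)
    find_rsp = find_rsp(integer MatchCount, integer FirstLine).

predicates
    % procedure: exactly one input and one continuation for the single output
    find : (find_req Request, continuation{find_rsp} OnFindResult).

predicates
    % notification: exactly one input and no result
    cancel : (string Reason).

end interface searcher
```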

To make the system complete the following things must be implemented:

  • A proxy class must be implemented for each of these interfaces
  • The slave must implement the slave interface
  • The master must implement the master interface as a context to the slave proxy

The following is also necessary, but that is handled by PFC classes:

  • The subprocess creation
  • The creation of communication pipes
  • The serialization/deserialization, communication and dispatching of data/messages

The implementation of the proxy classes is quite simple:

slaveProcess

For the master process we need a proxy representing the slave process. The interface of this proxy looks like this:

interface slaveProcess supports slave, useExe
 
predicates from pfc\masterSlaveProcessSupport
    terminate_toRemote
 
end interface slaveProcess


It supports the slave interface (which is the main proxy facility). It also supports useExe, which is the PFC standard interface for subprocesses, and finally it has a standard extra predicate to request the termination of the subprocess.

The class declaration of the slaveProcess looks like this:

class slaveProcess : slaveProcess
 
constructors
    new : (master Master).
 
end class slaveProcess

Master is the master's implementation of the master interface, i.e. the object that the slave will communicate with.

The implementation of the slaveProcess looks like this:

implement slaveProcess inherits slaveProcessSupport
    open pfc\
 
clauses
    new(Master) :-
        slaveProcessSupport::new("slave.exe"),
        register_notify("compilerMessage", Master:compilerMessage).
 
clauses
    compile(Filename, OnCompileResult) :-
        call_toRemote(predicate_name(), Filename, OnCompileResult).
 
clauses
    setSettings(Settings) :-
        notify_toRemote(predicate_name(), Settings).
 
end implement slaveProcess

The implementation inherits from slaveProcessSupport which is initialized with a "path" to the slave program (which in our case is in the same directory as the master and has the name "slave.exe"). If needed we can also supply a command line for the program in this initialization call.

The implementation also registers all calls and notifications of the master interface. In this case it is just the compilerMessage notification.

Finally, the implementation contains a proxy method for each of the calls and notifications in the slave interface.

You will notice that the registrations as well as the proxy methods are trivial code.

masterProxy

For the slave process we will need a proxy representing the master interface provided by the master. The interface of this proxy looks like this:

interface masterProxy supports master
 
properties from pfc\masterSlaveProcessSupport
    fromLoop
 
end interface masterProxy

It simply supports the master interface and provides a fromLoop property that must be used to receive communication from the master. We will return to this property below.

The class declaration of the masterProxy looks like this:

class masterProxy : masterProxy
 
constructors
    new : (slave Slave).
 
end class masterProxy

The constructor takes the implementation of the slave interface as argument. The implementation of the masterProxy looks like this:

implement masterProxy inherits masterProxySupport
    open pfc\
 
clauses
    new(Slave) :-
        register_call("compile", Slave:compile),
        register_notify("setSettings", Slave:setSettings).
 
clauses
    compilerMessage(Message) :-
        notify_toRemote(predicate_name(), Message).
 
end implement masterProxy

It inherits from masterProxySupport and contains registrations of all slave calls and notifications, and proxy implementations of all master calls and notifications.
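Taken together, the two proxy classes mean that adding a procedure to the slave interface touches three places. The sketch below follows the pattern used for compile; getVersion, its Component argument and its string result are hypothetical and not part of the demo project.

```
% 1. interface slave: declare the procedure (hypothetical addition)
predicates
    getVersion : (string Component, continuation{string} OnVersion).

% 2. implement slaveProcess (master side): forward the call to the slave
clauses
    getVersion(Component, OnVersion) :-
        call_toRemote(predicate_name(), Component, OnVersion).

% 3. implement masterProxy (slave side): register the call for dispatching
clauses
    new(Slave) :-
        register_call("compile", Slave:compile),
        register_call("getVersion", Slave:getVersion),
        register_notify("setSettings", Slave:setSettings).
```

The class that implements the slave interface must of course also provide clauses for getVersion.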

The master program

The master program (i.e. the builder) is a console program that

  • compiles a set of files
  • using a number of slave-compilers and
  • writes the messages from the compilers to the console.

Finally after all compilation is completed it will write some statistics to the console.

This is done by the compiler::compileFiles predicate:

class compiler : compiler
    open core
    [noDefaultConstructor]
 
predicates
    compileFiles : (string Settings, setM{string} Files, positive ProcessCount) -> pfc\asynchronous\future{string} Result.
 
end class compiler

As you can see, the compiler class has no public constructor: objects of type compiler can only be created (privately) inside the class implementation. Each of these objects represents a compiler subprocess, but the management of them is kept private in the implementation of the compiler class.

The compiler interface declares a doCompileFiles predicate and will not be shown here. Instead we will look at the implementation:

implement compiler
    supports master
    open compiler_ms\, slave, pfc\asynchronous\, continuationExtension
 
clauses
    compileFiles(Settings, Files, ProcessCount) = R :-
        CL = [ compileFiles2(Settings, Files) || _ = std::cIterate(ProcessCount) ],
        FC = future::fold(CL, compilerResult_zero, plus),
        R = FC:map(asString).
 
class predicates
    compileFiles2 : (string Settings, setM{string} Files) -> future{compile_rsp}.
clauses
    compileFiles2(Settings, Files) =
        promise::newContinuation(
            { (Cont) :-
                Compiler = compiler::new(Settings),
                Compiler:doCompileFiles(Files, compilerResult_zero, Cont)
            }).
 
facts
    slave : slaveProcess.
 
constructors
    new : (string Settings).
clauses
    new(Settings) :-
        slave := slaveProcess::new(This),
        slave:setNativeCreationFlags(multiThread_native::idle_priority_class),
        slave:run(),
        slave:setSettings(Settings).
 
clauses
    doCompileFiles(Files, Acc, OnCompileResult) :-
        if File = Files:tryRemoveFirst() then
            slave:compile(File,
                OnCompileResult:mkContinue(
                    { (CR) :-
                        stdio::writef("    %s %s\n", File, CR:asString()),
                        doCompileFiles(Files, Acc:plus(CR), OnCompileResult)
                    }))
        else
            slave:terminate_toRemote(),
            OnCompileResult:success(Acc)
        end if.
 
constants
    compilerResult_zero : compile_rsp = compile_rsp(0, 0, 0).
 
class predicates
    asString : (compile_rsp CompileResponse [this]) -> string.
clauses
    asString(compile_rsp(LineCount, Errors, CompilationTime)) =
        string::format("% lines in %p % errors", LineCount, duration::new(CompilationTime), Errors).
 
class predicates
    plus : (compile_rsp A [this], compile_rsp B) -> compile_rsp C.
clauses
    plus(compile_rsp(AL, BL, CL), compile_rsp(AR, BR, CR)) = compile_rsp(AL + AR, BL + BR, CL + CR).
 
% master
clauses
    compilerMessage(compilerMessage_ntf(File, Line, Message)) :-
        stdio::writef("        %s(%d): %s\n", File, Line, Message).
 
end implement compiler

The asynchronous methods in the master/slave concept are based on continuations. It is outside the scope of this article to go into details about asynchronous programming, promises/futures and continuations. Continuations are more lightweight and efficient than promises/futures, but also offer less functionality. Here we want the main thread to wait for the results of a number of compilers it has started, and for such combined waiting and result summing we will use promises. So the predicate compileFiles2:

class predicates
    compileFiles2 : (string Settings, setM{string} Files) -> future{compile_rsp}.
clauses
    compileFiles2(Settings, Files) =
        promise::newContinuation(
            { (Cont) :-
                Compiler = compiler::new(Settings),
                Compiler:doCompileFiles(Files, compilerResult_zero, Cont)
            }).

starts a "compiler" and returns the result as a future. promise::newContinuation is a predicate that can "lift" a continuation-based routine into a future.

The main entry predicate compileFiles:

clauses
    compileFiles(Settings, Files, ProcessCount) = R :-
        CL = [ compileFiles2(Settings, Files) || _ = std::cIterate(ProcessCount) ],
        FC = future::fold(CL, compilerResult_zero, plus),
        R = FC:map(asString).

starts ProcessCount compilers and uses future::fold and map to wait for their termination and to sum up the results.

doCompileFiles contains the central loop of the compilation. Each started compiler is controlled by a doCompileFiles loop:

clauses
    doCompileFiles(Files, Acc, OnCompileResult) :-
        if File = Files:tryRemoveFirst() then
            slave:compile(File,
                OnCompileResult:mkContinue(
                    { (CR) :-
                        stdio::writef("    %s %s\n", File, CR:asString()),
                        doCompileFiles(Files, Acc:plus(CR), OnCompileResult)
                    }))
        else
            slave:terminate_toRemote(),
            OnCompileResult:success(Acc)
        end if.


Basically, the loop tries to remove a File from the set of Files that remain to be compiled. Since several loops do this in parallel, it is important that tryRemoveFirst is thread-safe. In this example this is ensured by using a setM_redBlack_cas (i.e. a compare-and-swap red-black set; see the main::run implementation).
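A minimal sketch of how main::run could set this up is shown below. It is not the demo's actual code: the file names, the settings string and the process count are made up, and the constructor setM_redBlack_cas::new and the insertList predicate are assumed to follow the usual PFC collection conventions.

```
clauses
    run() :-
        % thread-safe set shared by all doCompileFiles loops (assumed constructor)
        Files = setM_redBlack_cas::new(),
        Files:insertList(["a.pro", "b.pro", "c.pro", "d.pro"]),
        % start 2 slave compilers; the result is a future{string}
        Result = compiler::compileFiles("/debug", Files, 2),
        % block the main thread until all compilers are done
        stdio::write(Result:get(), "\n").
```

Because every loop removes files from the same set, a faster slave simply ends up compiling more files; no explicit work distribution is needed.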

If a File can be removed from the set, the slave is asked to compile it (asynchronously, using a continuation). When the compilation is completed, we write the result to stdio and loop to doCompileFiles again.

This continues until Files becomes empty, in which case the slave is told to terminate (slave:terminate_toRemote()) and the accumulated result is returned.

The master also supports the compilerMessage notification:

clauses
    compilerMessage(compilerMessage_ntf(File, Line, Message)) :-
        stdio::writef("        %s(%d): %s\n", File, Line, Message).

In our case it simply writes the messages to the standard output stream.
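In a GUI master the handler could not touch the GUI directly, because notifications arrive on a threadpool thread. The following is a minimal sketch of the postAction switch mentioned earlier, assuming a hypothetical form object messageForm with a hypothetical predicate addMessage (neither is part of the demo project):

```
facts
    % hypothetical GUI form, assumed to be initialized in the constructor
    messageForm : messageForm.

clauses
    compilerMessage(compilerMessage_ntf(File, Line, Message)) :-
        Text = string::format("%s(%d): %s", File, Line, Message),
        % we are on a threadpool thread here: only schedule the GUI update
        messageForm:postAction({  :- messageForm:addMessage(Text) }).
```

The text is formatted before the switch, so that only the actual GUI update runs on the GUI thread.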

The slave program

The slave program is also a console program. But it is very important to notice that the slave does not have a console window attached and that the console input/output streams are used for communication with the master and can therefore not be used for other input/output.

In this slave program everything interesting is packed into the implementation of theCompiler:

implement theCompiler
    supports slave
    open compiler_ms\, master, pfc\asynchronous\, continuationExtension
 
facts
    master : masterProxy [constant].
    settings : string := "".
 
clauses
    run() :-
        new():wait().
 
clauses
    wait() :-
        _ = master:fromLoop:get().
 
constructors
    new : ().
clauses
    new() :-
        master := masterProxy::new(This).
 
clauses
    compile(Filename, OnCompileResult) :-
        OnCompileResult:completeWith(
            {  = compile_rsp(Lines, Errors:value, D:get()) :-
                T1 = time::now(),
                Errors = varM_integer::new(),
                Lines = 20 + math::random(3000),
                foreach I = std::cIterate(Lines) + 1 and not(master:fromLoop:isCompleted()) do
                    foreach _ = std::cIterate(math::random(10000)) do
                    end foreach,
                    R = math::random(),
                    if R < 1 / 2000 then
                        Errors:inc(),
                        master:compilerMessage(compilerMessage_ntf(Filename, I, settings))
                    end if
                end foreach,
                D = duration::new(T1, time::now())
            }).
 
clauses
    setSettings(Settings) :-
        settings := Settings.
 
end implement theCompiler

theCompiler implements the slave interface and passes itself as argument to the constructor of the masterProxy object; thereby the communication and message loops are established.

In this program the main thread will simply wait for the termination of the fromLoop.

The master can notify the slave about settings using the setSettings notification. You should be aware that the entries in a slave are unsynchronized and can be invoked in parallel by the master. The master can make calls sequential by making the next call when the current one completes, but the master has no means of knowing when a notification has been "completed".
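If the master must know that the settings have been applied before it continues, one option that stays within the interface restrictions is to expose the settings as a procedure rather than a notification, so that the continuation serves as an acknowledgement. The following is only a sketch: applySettings and its string acknowledgement are hypothetical and not part of the demo project.

```
% interface slave: hypothetical procedure replacing the setSettings notification
predicates
    applySettings : (string Settings, continuation{string} OnApplied).

% implement theCompiler: store the settings, then acknowledge
clauses
    applySettings(Settings, OnApplied) :-
        settings := Settings,
        OnApplied:success("applied").
```

The proxy classes would need a matching call_toRemote method and register_call registration, following the pattern used for compile, and the master would issue its first compile call only from the continuation it passes as OnApplied.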

The compile call completes the OnCompileResult continuation with the result of a little function that simulates a compilation. The simulation "compiles" a random number of lines, each with a certain probability of issuing a compilerMessage, which is notified back to the master (i.e. through the proxy object).

The compilation also tests for a termination request from the master (i.e. master:fromLoop:isCompleted()). This test is only relevant if the compilation is terminated in the middle of compiling a file, which this example never does. But when compiling files from an IDE it is customary to have a "stop compilation" facility. This facility could wait for all compilers to ask for the next file to compile and terminate them there, but it could also send a terminate request to the slave, in which case the fromLoop would complete.