Master/slave processes
The PFC master/slave process concept can be used for a number of reasons:
- To make computations in parallel
- To isolate computations from each other
- To manage independent subprocesses
Master/slave concepts
Master/slave processes are organized hierarchically:
- A master process can have several slave processes
- Each slave process has exactly one master process
- A slave process can itself be the master of other slave processes
The master process spawns and owns the slave processes.
A master and slave process can communicate with each other using remote procedure calls and notifications. All communication is serialized on pipes that are established between the master and the slave.
The communication is (quite) transparent to the programs. In the master process the slave is represented by a proxy object; calls and notifications from the master to the slave are performed by invoking methods on the proxy object. Likewise, the master is represented by a proxy object in the slave, and calls and notifications from slave to master are made on that proxy object.
All methods are asynchronous (using an executionContext_pool as execution context), so suspending predicates must be used, and access to structures that are not thread-safe must be synchronized. postAction must be used to switch execution context to the GUI thread for GUI operations (and, in many applications, for safe access to program state).
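The proxy idea can be illustrated outside PFC. The following is a minimal Python sketch, not PFC code: two in-memory queues stand in for the pipes, JSON stands in for the serialization, a thread stands in for the slave process, and all names (Slave, SlaveProxy, slave_loop, demo) are hypothetical. Unlike the PFC proxies, the call here simply blocks instead of suspending.

```python
import json
import queue
import threading


class Slave:
    """Stand-in for the real work done in the slave process."""

    def compile(self, filename):
        return {"file": filename, "lines": 42, "errors": 0}


def slave_loop(slave, requests, responses):
    """Receive serialized calls, dispatch them by name, send back the result."""
    handlers = {"compile": slave.compile}  # registration table
    while True:
        message = json.loads(requests.get())
        if message["name"] == "terminate":
            break
        result = handlers[message["name"]](message["arg"])
        responses.put(json.dumps(result))


class SlaveProxy:
    """What the master sees: ordinary methods that hide the transport."""

    def __init__(self, requests, responses):
        self._requests = requests
        self._responses = responses

    def _call(self, name, arg):
        # Serialize the call, send it, and wait for the serialized answer.
        self._requests.put(json.dumps({"name": name, "arg": arg}))
        return json.loads(self._responses.get())

    def compile(self, filename):
        return self._call("compile", filename)

    def terminate(self):
        self._requests.put(json.dumps({"name": "terminate", "arg": None}))


def demo():
    requests, responses = queue.Queue(), queue.Queue()  # stand-ins for pipes
    worker = threading.Thread(target=slave_loop, args=(Slave(), requests, responses))
    worker.start()
    proxy = SlaveProxy(requests, responses)
    result = proxy.compile("a.pro")
    proxy.terminate()
    worker.join()
    return result
```

The caller of proxy.compile never sees the queues or the JSON; that is the sense in which the communication is transparent.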
The master/slave process concept is illustrated by a demo project. In the demo project the master is a builder similar to the vipBuilder. This master spawns a number of compiler/slave processes.
The demo is academic in the sense that nothing is really built; the "compilers" merely simulate that they do something.
Notice that the demo program uses the namespace compiler_ms for interfaces and classes used in the process interface. For simplicity the namespace declarations are not included in this article.
The compiler/slave has this interface:
interface slave
    open core
    [pfc\masterSlaveProcess::slave]

domains
    compile_rsp = compile_rsp(integer LineCount, integer Errors, durationValue CompilationTime).

predicates
    compile : (string Filename) -> compile_rsp CompileResult suspending.

predicates
    setSettings : (string Settings) -> unit suspending.

end interface slave
Notice that in this academic example the interface name is slave. In a real program it would be more natural to call it e.g. compiler_slave.
We will consider the details of this interface below. For now, the interesting thing is that it defines the procedures compile and setSettings. Notice that they are suspending predicates.
When the master spawns this slave, it calls setSettings to inform the slave about the settings (corresponding to the command line flags etc. of a regular compiler). It then calls compile on a file, and when that operation completes it (potentially) calls compile again on another file. The master continues until there are no more files to compile and then terminates the slave.
Actually, the master will spawn a number of slaves, so that several files can be compiled in parallel.
The master has this interface towards the slave:
interface master
    [pfc\masterSlaveProcess::master]

domains
    compilerMessage_ntf = compilerMessage_ntf(string File, integer Line, string Message).

predicates
    compilerMessage : (compilerMessage_ntf Message).

end interface master
This interface contains only a single notification, compilerMessage, through which the compiler can send messages to the master while the compilation takes place. Notice that this predicate is not a suspending predicate. This is because notifications are completely asynchronous in the sense that no result is returned to the slave and the slave does not wait for the notification to be handled. Be aware that additional notifications and final results can arrive before other notifications have been handled. This may lead to race conditions; if they are not acceptable, you should use a suspending predicate/function instead.
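The ordering hazard can be reproduced in a few lines. This is a hedged Python asyncio sketch, not PFC code; compile_file, notify and main are hypothetical names, and loop.call_soon stands in for "send a notification without waiting for it to be handled".

```python
import asyncio


async def compile_file(notify):
    # Notifications are fire-and-forget: they are scheduled, not awaited.
    notify("warning 1")
    notify("warning 2")
    return "done"


async def main():
    log = []
    loop = asyncio.get_running_loop()

    def notify(message):
        # The handler runs later, whenever the event loop gets to it.
        loop.call_soon(log.append, f"handled {message}")

    result = await compile_file(notify)
    log.append(f"result {result}")  # the result is seen first
    await asyncio.sleep(0)          # give the pending handlers a chance to run
    return log
```

Running asyncio.run(main()) yields a log in which the result entry precedes both handled entries. If the handling must be finished before the caller proceeds, the notification has to become a call that is awaited, which corresponds to using a suspending predicate/function.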
There are restrictions on these interfaces:
- They can contain one-input-one-output suspending functions
- They can contain one-input notification predicates
- They cannot contain any other kind of predicates
- The input and output of the procedures and notifications must be serializable
These two interfaces describe the master/slave process interface.
To make the system complete the following things must be implemented:
- A proxy class must be implemented for each of these interfaces
- The slave must implement the slave interface
- The master must implement the master interface as a context to the slave proxy
The following is also necessary, but that is handled by PFC classes:
- The subprocess creation
- The creation of communication pipes
- The serialization/deserialization, communication and dispatching of data/messages
The implementation of the proxy classes is quite simple, as described in the next sections.
slaveProcess
For the master process we will need a proxy representing the slave process, the interface of this proxy looks like this:
interface slaveProcess
    supports slave, useExe

predicates from pfc\masterSlaveProcessSupport
    terminate_toRemote

end interface slaveProcess
It supports the slave interface (which is the main proxy facility). It also supports useExe, which is the PFC standard interface for subprocesses, and finally it contains the predicate terminate_toRemote, which is used internally to terminate the slave process.
The class declaration of the slaveProcess looks like this:
class slaveProcess : slaveProcess

constructors
    new : (string SlaveProgram, master Master, string CmdLine).

end class slaveProcess
Master is the master's implementation of the master interface, i.e. the object that the slave will communicate with.
The implementation of the slaveProcess looks like this:
implement slaveProcess
    inherits slaveProcessSupport
    open pfc\

clauses
    new(SlaveProgram, Master, CmdLine) :-
        slaveProcessSupport::new(SlaveProgram, CmdLine),
        register_notify("compilerMessage", Master:compilerMessage).

clauses
    compile(Filename) = call_toRemote(predicate_name(), Filename).

clauses
    setSettings(Settings) = call_toRemote(predicate_name(), Settings).

end implement slaveProcess
The implementation inherits from slaveProcessSupport which is initialized with a "path" to the SlaveProgram.
The implementation must also register all calls and notifications of the master interface. In this case it is just the compilerMessage notification.
Finally, the implementation contains a proxy method for each of the calls and notifications in the slave interface.
You will notice that the registration as well as the proxy methods are trivial code.
masterProxy
For the slave process we will need a proxy representing the master interface provided by the master. The interface of this proxy looks like this:
interface masterProxy
    supports master

properties from pfc\masterSlaveProcessSupport
    fromLoop

end interface masterProxy
It simply supports the master interface and provides fromLoop (declared as a property), which the slave must use to receive communication from the master. We will return to it below.
The class declaration of the masterProxy looks like this:
class masterProxy : masterProxy

constructors
    new : (slave Slave, string MsPipe).

end class masterProxy
The constructor takes the implementation of the slave interface as argument and the name of a MsPipe to use for communication. The implementation of the masterProxy looks like this:
implement masterProxy
    inherits masterProxySupport
    open pfc\

clauses
    new(Slave, MsPipe) :-
        register_call("compile", Slave:compile),
        register_call("setSettings", Slave:setSettings),
        masterProxySupport::new(MsPipe).

clauses
    compilerMessage(Message) :-
        notify_wait_toRemote(predicate_name(), Message).

end implement masterProxy
It inherits from masterProxySupport and contains registration of all slave calls and notifications, and proxy implementations of all master calls and notifications.
The master program
The master program (i.e. the builder) is a console program that
- compiles a set of files
- using a number of slave-compilers and
- writes the messages from the compilers to the console.
Finally, after all compilation is completed, it will write some statistics to the console.
This is done by the compiler::compileFiles predicate:
class compiler : compiler
    open core
    [noDefaultConstructor]

predicates
    compileFiles : (string SlaveProgram, string Settings, setM{string} Files, positive ProcessCount, boolean Debug) -> pfc\asynchronous\future{string} Result.

end class compiler
As you can see, the compiler class can actually (privately) create objects of type compiler. Each of these objects represents a compiler subprocess, but the management of them is kept private in the implementation of the compiler class.
The compiler interface declares a compileFiles2 predicate:
interface compiler

predicates
    compileFiles2 : (string Settings, setM{string} Files) -> pfc\asynchronous\future{compiler_ms\slave::compile_rsp} Result.

end interface compiler
As you can see it uses a future, which is an important part of submitting suspending predicates for execution. The implementation looks like this:
implement compiler
    supports master
    open core, compiler_ms\, slave, pfc\asynchronous\

class facts
    cs : criticalSection := criticalSection::new(1000) [constant, immediate].

clauses
    compileFiles(SlaveProgram, Settings, Files, ProcessCount, Debug) = R :-
        ProcessCount2 = math::min(list::length(Files:asList), ProcessCount),
        CL = [ new(SlaveProgram, D) || C = std::cIterate(ProcessCount2), D = Debug ** toBoolean(C = 0) ],
        RL = [ C:compileFiles2(Settings, Files) || C in CL ],
        FC = future::fold(RL, plus, compilerResult_zero),
        R = FC:futureExtension::map(asString).

facts
    slave : slaveProcess.

constructors
    new : (string SlaveProgram, boolean Debug).

clauses
    new(SlaveProgram, Debug) :-
        CmdLine = if true = Debug then "-debug" else "" end if,
        slave := slaveProcess::new(SlaveProgram, This, CmdLine),
        slave:setNativeCreationFlags(multiThread_native::idle_priority_class),
        slave:run().

clauses
    compileFiles2(Settings, Files) =
        future::submit(
            { = R :-
                setSettings(Settings),
                R = doCompileFiles(Files, compilerResult_zero)
            }).

predicates
    setSettings : (string Settings) suspending.
clauses
    setSettings(Settings) :-
        _ = slave:setSettings(Settings).

predicates
    doCompileFiles : (setM{string} Files, compile_rsp Acc) -> compile_rsp OnCompileResult suspending.
clauses
    doCompileFiles(Files, Acc) = CompileResult :-
        if File = Files:tryRemoveFirst() then
            CR = slave:compile(File),
            cs:synchronize({ :- stdio::writef(" %s %s\n", File, CR:asString()) }),
            CompileResult = doCompileFiles(Files, Acc:plus(CR))
        else
            slave:terminate_toRemote(),
            CompileResult = Acc
        end if.

constants
    compilerResult_zero : compile_rsp = compile_rsp(0, 0, 0).

class predicates
    asString : (compile_rsp CompileResponse [this]) -> string.
clauses
    asString(compile_rsp(LineCount, Errors, CompilationTime)) =
        string::format("% lines in %p % errors", LineCount, duration::new(CompilationTime), Errors).

class predicates
    plus : (compile_rsp A [this], compile_rsp B) -> compile_rsp C.
clauses
    plus(compile_rsp(AL, BL, CL), compile_rsp(AR, BR, CR)) = compile_rsp(AL + AR, BL + BR, CL + CR).

% master
clauses
    compilerMessage(compilerMessage_ntf(File, Line, Message)) :-
        cs:synchronize({ :- stdio::writef(" %s(%d): %s\n", File, Line, Message) }).

end implement compiler
The asynchronous method in the master/slave concept is based on suspending predicates. It is outside the scope of this article to go into full detail about asynchronous programming, suspending predicates and futures/promises. Here we want the main thread to wait for the results of a number of compilers it has started, and for such combined waiting and result summing we will use futures.
The main predicate (compileFiles) will start a number of slave programs and submit a compilation "job" (compileFiles2) for each of these slaves. The submission of such a job is represented by a future, which will return the result some time in the "future". The results of all these futures are folded into a single future, which in turn is "mapped" to a string.
clauses
    compileFiles(SlaveProgram, Settings, Files, ProcessCount, Debug) = R :-
        ProcessCount2 = math::min(list::length(Files:asList), ProcessCount),
        CL = [ new(SlaveProgram, D) || C = std::cIterate(ProcessCount2), D = Debug ** toBoolean(C = 0) ],
        RL = [ C:compileFiles2(Settings, Files) || C in CL ],
        FC = future::fold(RL, plus, compilerResult_zero),
        R = FC:futureExtension::map(asString).
As you can see, simple processing is a bit more complicated when futures are involved. But predicates like future::fold deal with a number of things: first of all they combine the results, but they also deal with asynchronous (as well as synchronous) waiting on the individual results, and in case some of the involved sub jobs complete with an exception they also handle cancellation of the remaining sub jobs.
The predicate compileFiles2 submits a compilation "job" to be executed in the default executionContext, which in this case will be executionContext_pool::defaultPool:
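The fold-then-map pattern is not specific to PFC. The following is a simplified Python sketch with concurrent.futures, under stated assumptions: results are plain (lines, errors, seconds) tuples, the names fold, plus, as_string and compile_all are hypothetical, and unlike future::fold this fold blocks while waiting instead of returning a new future.

```python
from concurrent.futures import ThreadPoolExecutor

ZERO = (0, 0, 0.0)  # (line count, errors, compilation time in seconds)


def plus(a, b):
    """Combine two results component by component."""
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])


def fold(futures, combine, zero):
    """Wait for every future and combine the results into one value."""
    acc = zero
    try:
        for future in futures:
            acc = combine(acc, future.result())  # blocks until ready
    except Exception:
        for future in futures:
            future.cancel()  # only stops jobs that have not started yet
        raise
    return acc


def as_string(total):
    lines, errors, seconds = total
    return f"{lines} lines in {seconds:.1f}s, {errors} errors"


def compile_all(jobs, workers=2):
    """Submit every job to a pool, fold the results, map them to a string."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(job) for job in jobs]
        return as_string(fold(futures, plus, ZERO))
```

For example, compile_all([lambda: (10, 1, 0.5), lambda: (20, 0, 1.0)]) returns "30 lines in 1.5s, 1 errors".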
clauses
    compileFiles2(Settings, Files) =
        future::submit(
            { = R :-
                setSettings(Settings),
                R = doCompileFiles(Files, compilerResult_zero)
            }).
The result is a future.
The suspending predicate doCompileFiles contains the core loop of the compilation. Each started compiler is controlled by its own doCompileFiles loop:
predicates
    doCompileFiles : (setM{string} Files, compile_rsp Acc) -> compile_rsp OnCompileResult suspending.
clauses
    doCompileFiles(Files, Acc) = CompileResult :-
        if File = Files:tryRemoveFirst() then
            CR = slave:compile(File),
            cs:synchronize({ :- stdio::writef(" %s %s\n", File, CR:asString()) }),
            CompileResult = doCompileFiles(Files, Acc:plus(CR))
        else
            slave:terminate_toRemote(),
            CompileResult = Acc
        end if.
The loop tries to remove a File from the set of Files to compile. Since several loops do this in parallel, it is important that tryRemoveFirst is thread-safe. In this example this is ensured by using a setM_redBlack_cas (i.e. a compare-and-swap red-black set; see the main::run implementation). If a File can be removed from the set, the slave is asked to compile it (asynchronously, using a continuation). When the compilation is completed, we write the result to stdio and loop to doCompileFiles again. This continues until Files becomes empty, in which case the slave is told to terminate (slave:terminate_toRemote()) and the accumulated result is returned. The master also supports the compilerMessage notification:
clauses
    compilerMessage(compilerMessage_ntf(File, Line, Message)) :-
        cs:synchronize({ :- stdio::writef(" %s(%d): %s\n", File, Line, Message) }).
In our case it simply writes the messages to the standard output stream.
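The structure of the master (several loops draining one shared set of files, with synchronized output) can be sketched in Python as follows. This is an illustration under assumptions, not the demo's code: a lock-protected list stands in for the compare-and-swap set, fake_compile is a hypothetical compiler that reports the filename length as the line count, and a list guarded by a lock stands in for the synchronized console output.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SharedFiles:
    """Thread-safe work set; a lock stands in for the CAS-based set."""

    def __init__(self, files):
        self._files = sorted(files)
        self._lock = threading.Lock()

    def try_remove_first(self):
        # Returns a file, or None when there is nothing left to compile.
        with self._lock:
            return self._files.pop(0) if self._files else None


def fake_compile(filename):
    """Hypothetical compiler: pretends the file has len(filename) lines."""
    return len(filename)


def worker_loop(files, log, log_lock):
    """One loop per compiler: take a file, compile it, repeat until empty."""
    total = 0
    while (filename := files.try_remove_first()) is not None:
        lines = fake_compile(filename)
        with log_lock:  # serialize access to the shared output
            log.append(filename)
        total += lines
    return total


def build(filenames, workers=3):
    files, log, log_lock = SharedFiles(filenames), [], threading.Lock()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(worker_loop, files, log, log_lock) for _ in range(workers)]
        total = sum(future.result() for future in futures)
    return total, sorted(log)
```

Because removal from the shared set is atomic, every file is compiled exactly once no matter how the loops interleave; which loop compiles which file is not determined.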
The slave program
The slave program is also a console program. But it is very important to notice that the slave does not have a console window attached and that the console can therefore not be used for input/output.
In this slave program everything interesting is packed into the implementation of theCompiler:
implement theCompiler
    inherits masterProxy
    supports slave
    open core, compiler_ms\, master, pfc\asynchronous\

facts
    settings : string := "".

clauses
    run(MsPipe) :-
        new(MsPipe):wait().

clauses
    wait() :-
        _ = fromLoop:futureExtension::get().

constructors
    new : (string MsPipe).

clauses
    new(MsPipe) :-
        masterProxy::new(This, MsPipe).

clauses
    compile(Filename) = compile_rsp(Lines, Errors:value, D:get()) :-
        T1 = time::now(),
        Errors = varM_integer::new(),
        Lines = 20 + math::random(3000),
        foreach I = std::cIterate(Lines) + 1 and not(executionContext::isCanceled()) do
            foreach _ = std::cIterate(math::random(10000)) do
            end foreach,
            R = math::random(),
            if R < 1 / 2000 then
                Errors:inc(),
                compilerMessage(compilerMessage_ntf(Filename, I, settings))
            end if
        end foreach,
        D = duration::new(T1, time::now()).

clauses
    setSettings(Settings) = unit :-
        settings := Settings.

end implement theCompiler
theCompiler implements the slave interface and passes itself as argument to the constructor of the masterProxy object; thereby the communication and message loops are established.
Recall that compile and setSettings are suspending predicates.
compile performs a simple dummy simulation of a compiler by using CPU, returning messages and a final result. The simulation "compiles" a random number of lines, each with a certain probability of issuing a compilerMessage, which is notified back to the master (i.e. through the proxy object).
The compilation also tests for a termination request from the master (i.e. not(executionContext::isCanceled())). This test would only be relevant if the compilation were terminated in the middle of a compilation, which this example never does. But when compiling files from an IDE it is customary to have a "stop compilation" facility. This facility could wait for all compilers to ask for the next file to compile and terminate them there, but it could also send a terminate request to the slave, in which case the fromLoop would complete.
In this program the main thread simply waits for the termination of the fromLoop.
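The cancellation test follows the usual cooperative pattern: the long-running loop polls a flag once per unit of work and stops early when it is set. The following is a minimal Python sketch of that pattern, assuming a threading.Event as the flag; compile_lines and the on_line hook are hypothetical names used only for illustration.

```python
import threading


def compile_lines(line_count, cancel, on_line=None):
    """Process lines one at a time, checking for cancellation before each."""
    compiled = 0
    for line in range(1, line_count + 1):
        if cancel.is_set():  # counterpart of the isCanceled test
            break
        if on_line is not None:
            on_line(line)  # e.g. report a message for this line
        compiled += 1
    return compiled
```

With an unset event, compile_lines(5, threading.Event()) processes all 5 lines. If the hook sets the event while line 3 is being processed, the loop finishes that line and stops before line 4, so the call returns 3.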