Master/slave processes
Visual Prolog 11 preliminary documentation. This article contains preliminary documentation for the upcoming release.
The PFC master/slave process concept can be used for a number of reasons:
- To make computations in parallel
- To isolate computations from each other
- To manage independent subprocesses
Master/slave concepts
Master/slave processes are organized hierarchically:
- A master process can have several slave processes.
- Each slave process has exactly one master process.
- A slave process can be master process of other slave processes.
The master process spawns and owns the slave processes.
A master and slave process can communicate with each other using remote procedure calls and notifications. All communication is serialized on pipes that are established between the master and the slave.
The communication is (quite) transparent to the programs. In the master process the slave is represented by a proxy object; calls and notifications from the master to the slave are performed by invoking methods on the proxy object. Likewise, the master is represented by a proxy object in the slave, and calls and notifications from slave to master are made on that proxy object.
All methods are asynchronous (using a threadpool as execution context), so an asynchronous coding style must be applied, and access to non-threadsafe structures must be synchronized. postAction will have to be used to switch execution context to the GUI thread for GUI operations (and in many applications for safe access to program state).
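The idea of handing work from pool threads to a single owning thread can be sketched outside Visual Prolog. The following Python fragment is only a language-neutral illustration: `post_action`, `actions` and `state` are hypothetical names, and a plain queue stands in for the GUI message loop. It is not the PFC postAction implementation.

```python
import queue
import threading

actions = queue.Queue()   # actions waiting for the owner thread
state = []                # touched only by the owner thread


def post_action(action):
    """Hypothetical stand-in for postAction: hand work to the owner thread."""
    actions.put(action)


def worker(n):
    # Runs on a pool thread; it never touches `state` directly.
    post_action(lambda: state.append(n * n))


threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The owner thread drains the queue and runs every posted action itself.
while not actions.empty():
    actions.get()()

print(sorted(state))  # [0, 1, 4, 9]
```

Because only the owner thread mutates `state`, no lock is needed around it; the queue is the only shared structure, and it is thread safe.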
The master/slave process concept is illustrated by a demo project. In the demo project the master is a builder similar to the IDE and vipBuilder; this master spawns a number of compiler/slave processes.
The demo is academic in the sense that nothing is really built; the "compilers" merely simulate that they do something.
The compiler/slave has this interface:
    interface slave
        open core, pfc\asynchronous\

    domains
        compile_rsp = compile_rsp(integer LineCount, integer Errors, durationValue CompilationTime).

    predicates
        compile : (string Filename, continuation{compile_rsp} OnCompileResult).

    predicates
        setSettings : (string Settings).

    end interface slave
Notice that in this academic example the interface name is slave. In a real program it would be more natural to call it e.g. compiler_slave.
We will consider the details of this interface below. For now, the interesting thing is that it defines a procedure compile and a notification setSettings.
When the master spawns this slave it will "notify" the slave about settings (i.e. corresponding to the command line flags, etc. of a regular compiler). Then it will call compile on a file, and when that operation completes it will (potentially) call compile again on another file. The master will continue this until there are no more files to compile, and then it will terminate the slave.
Actually, the builder will spawn a number of slaves, so that several files can be compiled in parallel.
Likewise the master has this interface (towards the slave):
    interface master

    domains
        compilerMessage_ntf = compilerMessage_ntf(string File, integer Line, string Message).

    predicates
        compilerMessage : (compilerMessage_ntf Message).

    end interface master
This interface only contains a single notification compilerMessage through which the compiler can notify the master with messages from the compilation, while the compilation takes place.
There are restrictions on these interfaces:
- They can contain one-input-one-output procedures (in continuation style)
- They can contain one-input notifications
- They cannot contain any other kind of predicates
- The input and output of the procedures and notifications must be serializable
These two interfaces describe the master/slave process interface.
To make the system complete the following things must be implemented:
- A proxy class must be implemented for each of these interfaces
- The slave must implement the slave interface
- The master must implement the master interface as a context to the slave proxy
The following is also necessary, but that is handled by PFC classes:
- The subprocess creation
- The creation of communication pipes
- The serialization/deserialization, communication and dispatching of data/messages
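To give a feel for what such a serialization and dispatching layer does, here is a minimal Python sketch. It is a language-neutral illustration under stated assumptions, not the PFC implementation: the `Proxy` and `Dispatcher` classes, the JSON line format and the in-memory buffer standing in for a pipe are all hypothetical.

```python
import io
import json


class Dispatcher:
    """Receiving side: maps message names to registered handlers."""

    def __init__(self):
        self._handlers = {}

    def register(self, name, handler):
        self._handlers[name] = handler

    def dispatch_line(self, line):
        message = json.loads(line)  # deserialize one message
        return self._handlers[message["name"]](message["arg"])


class Proxy:
    """Sending side: turns a method call into one serialized line."""

    def __init__(self, pipe):
        self._pipe = pipe

    def notify(self, name, arg):
        self._pipe.write(json.dumps({"name": name, "arg": arg}) + "\n")


# An in-memory text buffer stands in for the pipe between two processes.
pipe = io.StringIO()
received = []

dispatcher = Dispatcher()
dispatcher.register("setSettings", received.append)

proxy = Proxy(pipe)
proxy.notify("setSettings", "/debug")

pipe.seek(0)
for line in pipe:
    dispatcher.dispatch_line(line)

print(received)  # ['/debug']
```

The sender only needs the message name and one serializable argument, which mirrors the one-input restriction on notifications described above.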
The implementation of the proxy classes is quite simple:
slaveProcess
For the master process we will need a proxy representing the slave process, the interface of this proxy looks like this:
    interface slaveProcess
        supports slave, useExe

    predicates from pfc\masterSlaveProcessSupport
        terminate_toRemote

    end interface slaveProcess
It supports the slave interface (which is the main proxy facility). It also supports useExe, which is the PFC standard interface for subprocesses, and finally it has a standard extra predicate to request the termination of the subprocess.
The class declaration of the slaveProcess looks like this:
    class slaveProcess : slaveProcess

    constructors
        new : (master Master).

    end class slaveProcess
Master is the master's implementation of the master interface, i.e. the object that the slave will communicate with.
The implementation of the slaveProcess looks like this:
    implement slaveProcess inherits slaveProcessSupport
        open pfc\

    clauses
        new(Master) :-
            slaveProcessSupport::new("slave.exe"),
            register_notify("compilerMessage", Master:compilerMessage).

    clauses
        compile(Filename, OnCompileResult) :-
            call_toRemote(predicate_name(), Filename, OnCompileResult).

    clauses
        setSettings(Settings) :-
            notify_toRemote(predicate_name(), Settings).

    end implement slaveProcess
The implementation inherits from slaveProcessSupport which is initialized with a "path" to the slave program (which in our case is in the same directory as the master and has the name "slave.exe"). If needed we can also supply a command line for the program in this initialization call.
The implementation also registers all calls and notifications of the master interface. In this case it is just the compilerMessage notification.
Finally, the implementation contains a proxy method for each of the calls and notifications in the slave interface.
You will notice that the registrations as well as the proxy methods are trivial code.
masterProxy
For the slave process we will need a proxy representing the master interface provided by the master. The interface of this proxy looks like this:
    interface masterProxy
        supports master

    properties from pfc\masterSlaveProcessSupport
        fromLoop

    end interface masterProxy
It simply supports the master interface and provides a fromLoop property that must be used to receive communication from the master. We will return to this property below.
The class declaration of the masterProxy looks like this:
    class masterProxy : masterProxy

    constructors
        new : (slave Slave).

    end class masterProxy
The constructor takes the implementation of the slave interface as argument. The implementation of the masterProxy looks like this:
    implement masterProxy inherits masterProxySupport
        open pfc\

    clauses
        new(Slave) :-
            register_call("compile", Slave:compile),
            register_notify("setSettings", Slave:setSettings).

    clauses
        compilerMessage(Message) :-
            notify_toRemote(predicate_name(), Message).

    end implement masterProxy
It inherits from masterProxySupport and contains registration of all slave calls and notifications, and proxy implementations of all master calls and notifications.
The master program
The master program (i.e. the builder) is a console program that
- compiles a set of files
- using a number of slave-compilers and
- writes the messages from the compilers to the console.
Finally after all compilation is completed it will write some statistics to the console.
This is done by the compiler::compileFiles predicate:
    class compiler : compiler
        open core
        [noDefaultConstructor]

    predicates
        compileFiles : (string Settings, setM{string} Files, positive ProcessCount) -> pfc\asynchronous\future{string} Result.

    end class compiler
As you can see, the compiler class can actually (privately) create objects of type compiler. Each of these objects represents a compiler subprocess, but the management of these objects is kept private in the implementation of the compiler class.
The compiler interface declares a doCompileFiles predicate and will not be shown here. Instead we will look at the implementation:
    implement compiler
        supports master
        open compiler_ms\, slave, pfc\asynchronous\, continuationExtension

    clauses
        compileFiles(Settings, Files, ProcessCount) = R :-
            CL = [ compileFiles2(Settings, Files) || _ = std::cIterate(ProcessCount) ],
            FC = future::fold(CL, compilerResult_zero, plus),
            R = FC:map(asString).

    class predicates
        compileFiles2 : (string Settings, setM{string} Files) -> future{compile_rsp}.
    clauses
        compileFiles2(Settings, Files) =
            promise::newContinuation(
                { (Cont) :-
                    Compiler = compiler::new(Settings),
                    Compiler:doCompileFiles(Files, compilerResult_zero, Cont)
                }).

    facts
        slave : slaveProcess.

    constructors
        new : (string Settings).
    clauses
        new(Settings) :-
            slave := slaveProcess::new(This),
            slave:setNativeCreationFlags(multiThread_native::idle_priority_class),
            slave:run(),
            slave:setSettings(Settings).

    clauses
        doCompileFiles(Files, Acc, OnCompileResult) :-
            if File = Files:tryRemoveFirst() then
                slave:compile(File,
                    OnCompileResult:mkContinue(
                        { (CR) :-
                            stdio::writef(" %s %s\n", File, CR:asString()),
                            doCompileFiles(Files, Acc:plus(CR), OnCompileResult)
                        }))
            else
                slave:terminate_toRemote(),
                OnCompileResult:success(Acc)
            end if.

    constants
        compilerResult_zero : compile_rsp = compile_rsp(0, 0, 0).

    class predicates
        asString : (compile_rsp CompileResponse [this]) -> string.
    clauses
        asString(compile_rsp(LineCount, Errors, CompilationTime)) =
            string::format("% lines in %p % errors", LineCount, duration::new(CompilationTime), Errors).

    class predicates
        plus : (compile_rsp A [this], compile_rsp B) -> compile_rsp C.
    clauses
        plus(compile_rsp(AL, BL, CL), compile_rsp(AR, BR, CR)) = compile_rsp(AL + AR, BL + BR, CL + CR).

    % master
    clauses
        compilerMessage(compilerMessage_ntf(File, Line, Message)) :-
            stdio::writef(" %s(%d): %s\n", File, Line, Message).

    end implement compiler
The asynchronous methods in the master/slave concept are based on continuations. It is outside the scope of this article to go into details about asynchronous programming, promises/futures and continuations. Continuations are more lightweight and efficient than promises/futures, but also offer less functionality. Here we want the main thread to wait for the results of the compilers it has started, and for such combined waiting and result summing we will use promises. So the predicate compileFiles2:
    class predicates
        compileFiles2 : (string Settings, setM{string} Files) -> future{compile_rsp}.
    clauses
        compileFiles2(Settings, Files) =
            promise::newContinuation(
                { (Cont) :-
                    Compiler = compiler::new(Settings),
                    Compiler:doCompileFiles(Files, compilerResult_zero, Cont)
                }).
Starts a "compiler" and returns the result as a future. promise::newContinuation is a predicate that can "lift" a continuation-based routine into a future.
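The "lifting" idea is not specific to Visual Prolog. As a rough, language-neutral illustration, the following Python sketch turns a callback-style routine into a future. The names `compile_async` and `lift` are hypothetical, and the "result" is just the length of the file name; this is not how promise::newContinuation is implemented.

```python
from concurrent.futures import Future
import threading


def compile_async(filename, on_result):
    """Hypothetical continuation-style routine: reports through a callback."""
    threading.Thread(target=lambda: on_result(len(filename))).start()


def lift(start):
    """Turn 'start(continuation)' into a Future that holds the result."""
    future = Future()
    start(future.set_result)   # the continuation completes the future
    return future


result = lift(lambda cont: compile_async("main.pro", cont))
print(result.result(timeout=5))  # 8
```

The caller of `lift` gets something it can wait on or combine with other futures, while the routine itself stays callback based.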
The main entry predicate compileFiles
    clauses
        compileFiles(Settings, Files, ProcessCount) = R :-
            CL = [ compileFiles2(Settings, Files) || _ = std::cIterate(ProcessCount) ],
            FC = future::fold(CL, compilerResult_zero, plus),
            R = FC:map(asString).
Starts ProcessCount compilers and uses future::fold and map to wait for their termination and sum up the results.
The doCompileFiles predicate contains the core loop of the compilation. Each started compiler will be controlled by a doCompileFiles loop:
    clauses
        doCompileFiles(Files, Acc, OnCompileResult) :-
            if File = Files:tryRemoveFirst() then
                slave:compile(File,
                    OnCompileResult:mkContinue(
                        { (CR) :-
                            stdio::writef(" %s %s\n", File, CR:asString()),
                            doCompileFiles(Files, Acc:plus(CR), OnCompileResult)
                        }))
            else
                slave:terminate_toRemote(),
                OnCompileResult:success(Acc)
            end if.
Basically, the loop will try to obtain and remove a File to compile from the Files to compile. Since several loops will do this in parallel, it is important that tryRemoveFirst is thread safe. In this example this is ensured by using a setM_redBlack_cas (i.e. a compare-and-swap red-black set; see the main::run implementation).
If a File can be removed from the set, the slave is asked to compile it (asynchronously, using a continuation). When the compilation is completed, we write the result to stdio and loop to doCompileFiles again. This continues until Files becomes empty, in which case the slave is told to terminate (slave:terminate_toRemote()) and the accumulated result is returned.
The master also supports the compilerMessage notification:
    clauses
        compilerMessage(compilerMessage_ntf(File, Line, Message)) :-
            stdio::writef(" %s(%d): %s\n", File, Line, Message).
In our case it simply writes the messages to the standard output stream.
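The overall control flow of this section (several loops draining one shared, thread-safe set of files, each accumulating its own result, followed by a fold over the per-loop results) can be sketched in Python as follows. This is a language-neutral illustration under stated assumptions: a lock-protected set replaces the compare-and-swap set, `fake_compile` is a hypothetical stand-in for the remote compile call, and the loops are synchronous rather than continuation based.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

files = {"a.pro", "b.pro", "c.pro", "d.pro", "e.pro"}
files_lock = threading.Lock()


def try_remove_first():
    """Thread-safe removal; returns None when the set is empty."""
    with files_lock:
        return files.pop() if files else None


def fake_compile(filename):
    """Hypothetical stand-in for slave:compile; returns (lines, errors)."""
    return (len(filename) * 10, 0)


def compile_loop():
    """One loop per simulated slave, accumulating its own totals."""
    lines, errors = 0, 0
    while (name := try_remove_first()) is not None:
        file_lines, file_errors = fake_compile(name)
        lines, errors = lines + file_lines, errors + file_errors
    return (lines, errors)


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(compile_loop) for _ in range(3)]
    partials = [f.result() for f in futures]

# Fold the per-loop results into one total, starting from a zero value.
total = (0, 0)
for lines, errors in partials:
    total = (total[0] + lines, total[1] + errors)

print(total)  # (250, 0)
```

How the files are distributed over the three loops varies from run to run, but the folded total does not, because every file is removed from the set exactly once.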
The slave program
The slave program is also a console program. But it is very important to notice that the slave does not have a console window attached and that the console input/output streams are used for communication with the master and can therefore not be used for other input/output.
In this slave program everything interesting is packed into the implementation of theCompiler:
    implement theCompiler
        supports slave
        open compiler_ms\, master, pfc\asynchronous\, continuationExtension

    facts
        master : masterProxy [constant].
        settings : string := "".

    clauses
        run() :-
            new():wait().

    clauses
        wait() :-
            _ = master:fromLoop:get().

    constructors
        new : ().
    clauses
        new() :-
            master := masterProxy::new(This).

    clauses
        compile(Filename, OnCompileResult) :-
            OnCompileResult:completeWith(
                { = compile_rsp(Lines, Errors:value, D:get()) :-
                    T1 = time::now(),
                    Errors = varM_integer::new(),
                    Lines = 20 + math::random(3000),
                    foreach I = std::cIterate(Lines) + 1 and not(master:fromLoop:isCompleted()) do
                        foreach _ = std::cIterate(math::random(10000)) do
                        end foreach,
                        R = math::random(),
                        if R < 1 / 2000 then
                            Errors:inc(),
                            master:compilerMessage(compilerMessage_ntf(Filename, I, settings))
                        end if
                    end foreach,
                    D = duration::new(T1, time::now())
                }).

    clauses
        setSettings(Settings) :-
            settings := Settings.

    end implement theCompiler
theCompiler implements the slave interface, which it passes as argument to the constructor of the masterProxy object; thereby the communication and message loops are established.
In this program the main thread will simply wait for the termination of the fromLoop.
The master can notify the slave about settings using the setSettings notification. You should be aware that the entries in a slave are unsynchronized and can be invoked in parallel by the master. The master can make calls sequential by making the next call when the current one completes, but the master has no means of knowing when a notification has been "completed".
The compile call completes the OnCompileResult continuation with the result of a little function that simulates a compilation. The simulation "compiles" a random number of lines, each with a certain probability of issuing a compilerMessage, which is notified back to the master (i.e. through the proxy object).
The compilation also tests for a termination request from the master (i.e. master:fromLoop:isCompleted()). This test would only be relevant if the compilation were terminated in the middle of a file, which this example never does. But when compiling files from an IDE it is customary to have a "stop compilation" facility. This facility could wait for all compilers to ask for the next file to compile and terminate them there, but it could also send a terminate request to the slave, in which case the fromLoop would complete.
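This kind of cooperative stop check can be illustrated with a small Python sketch. It is a language-neutral illustration only: `stop_requested` is a hypothetical flag playing the role of a completed fromLoop, and the `stop_after` parameter merely simulates a stop request arriving in the middle of a file.

```python
import threading

stop_requested = threading.Event()   # plays the role of a completed fromLoop


def compile_lines(line_count, stop_after=None):
    """Process lines until done or until a stop request is observed."""
    processed = 0
    for line in range(1, line_count + 1):
        if stop_requested.is_set():
            break                      # abandon the rest of the file
        processed += 1
        if line == stop_after:
            stop_requested.set()       # simulate a stop arriving mid-file
    return processed


print(compile_lines(100))                 # 100
print(compile_lines(100, stop_after=10))  # 10
```

The check costs one flag read per line, so the work loop can react to a stop request promptly without any blocking communication.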