Difference between revisions of "Master/slave processes"
m (release) |
m (review) |
||
(2 intermediate revisions by the same user not shown) | |||
Line 17: | Line 17: | ||
A master and slave process can communicate with each other using remote procedure calls and notifications. All communication is serialized on pipes that are established between the master and the slave. | A master and slave process can communicate with each other using remote procedure calls and notifications. All communication is serialized on pipes that are established between the master and the slave. | ||
The communication is (quite) transparent to the programs, in the master process the slave is represented by a proxy object; calls and notifications from the master to the slave is performed by invoking methods on the proxy object. Likewise the master is represented by a proxy object in the slave and calls and notifications from slave to master | The communication is (quite) transparent to the programs, in the master process the slave is represented by a proxy object; calls and notifications from the master to the slave is performed by invoking methods on the proxy object. Likewise the master is represented by a proxy object in the slave and calls and notifications from slave to master are made on the proxy object. | ||
All methods are asynchronous (using | All methods are asynchronous (using an <vp>executionContext_pool</vp> as execution context), so {{lang|Suspending Predicates|suspending predicates}} must be applied. And synchronization to non-threadsafe structures must be used. <vp>postAction</vp> will have to be used to switch execution context to the GUI thread for GUI operations (and in many applications for safe access to program state). | ||
The master/slave process concept is illustrated by | The master/slave process concept is illustrated by a demo project. In the demo project the master is a builder similar to the '''vipBuilder'''. This master spawns a number of compiler/slave processes. | ||
The demo is academic in the sense that nothing is really build; the "compilers" merely simulate that they do something. | The demo is academic in the sense that nothing is really build; the "compilers" merely simulate that they do something. | ||
Notice that the demo program uses the name space <vp>compiler_ms</vp> for interfaces and classes used in the process interface. For simplicity the name space declarations are not included in this article. | |||
The compiler/slave has this interface: | The compiler/slave has this interface: | ||
Line 29: | Line 31: | ||
<vip> | <vip> | ||
interface slave | interface slave | ||
open core | open core | ||
[pfc\masterSlaveProcess::slave] | |||
domains | domains | ||
Line 35: | Line 38: | ||
predicates | predicates | ||
compile : (string Filename | compile : (string Filename) -> compile_rsp CompileResult suspending. | ||
predicates | predicates | ||
setSettings : (string Settings). | setSettings : (string Settings) -> unit suspending. | ||
end interface slave | end interface slave | ||
Line 45: | Line 48: | ||
Notice that in this academic example the interface name is <vp>slave</vp>. In a real program it would be more natural to call it e.g. <vp>compiler_slave</vp>. | Notice that in this academic example the interface name is <vp>slave</vp>. In a real program it would be more natural to call it e.g. <vp>compiler_slave</vp>. | ||
We will consider the details of this interface below, currently the interesting thing is that it defines | We will consider the details of this interface below, currently the interesting thing is that it defines procedures <vp>compile</vp> and <vp>setSettings</vp>. Notice that they are {{lang|Suspending Predicates|suspending predicates}}. | ||
When the master spawn this slave it will | When the master spawn this slave it will call the <vp>setSettings</vp> to inform the slave about settings (i.e. corresponding to the command line flags , etc of a regular compiler). And then it will call <vp>compile</vp> on a file and when that operation completes it will (potentially) call <vp>compile</vp> again on another file. The master will continue this until there are no more files to compile and then it will terminate the slave. | ||
Actually, the | Actually, the master will spawn a number of slaves, so that several files can be compiled in parallel. | ||
The master has this interface towards the slave: | |||
<vip> | <vip> | ||
interface master | interface master | ||
[pfc\masterSlaveProcess::master] | |||
domains | domains | ||
Line 65: | Line 69: | ||
</vip> | </vip> | ||
This interface only contains a single notification <vp>compilerMessage</vp> through which the compiler can notify the master with messages from the compilation, while the compilation takes place. | This interface only contains a single notification <vp>compilerMessage</vp> through which the compiler can notify the master with messages from the compilation, while the compilation takes place. Notice that this predicate is not a suspending predicate, this is because notifications completely asynchronous in the sense that no result is returned to the slave and the slave does not wait for the notification to be handled. Be aware that additional notifications and final results can arrive before other notifications has been handled. This may lead to race conditions, and if they are not acceptable you should use a suspending predicate/function instead. | ||
There are restrictions on these interfaces: | There are restrictions on these interfaces: | ||
* They can contain one-input-one-output | * They can contain one-input-one-output {{lang|Suspending Predicates|suspending function}} | ||
* They can contain one-input | * They can contain one-input notification predicates | ||
* They cannot contain any other kind of predicates | * They cannot contain any other kind of predicates | ||
* The input and output of the procedures and notifications must be serializable | * The input and output of the procedures and notifications must be serializable | ||
Line 88: | Line 92: | ||
* The serialization/deserialization, communication and dispatching of data/messages | * The serialization/deserialization, communication and dispatching of data/messages | ||
The implementation of the proxy classes are quite simple | The implementation of the proxy classes are quite simple as described in the next sections. | ||
=== slaveProcess === | === slaveProcess === | ||
Line 103: | Line 107: | ||
</vip> | </vip> | ||
It supports the <vp>slave</vp> interface (which is the main proxy facility). It also supports <vp>useExe</vp> which is the PFC standard interface for subprocesses and finally it also contains the predicate <vp>terminate_toRemote</vp> which is used internally to terminate the slave process. | |||
It supports the <vp>slave</vp> interface (which is the main proxy facility). It also supports <vp>useExe</vp> which is the PFC standard interface for subprocesses and finally is | |||
The class declaration of the <vp>slaveProcess</vp> looks like this: | The class declaration of the <vp>slaveProcess</vp> looks like this: | ||
Line 112: | Line 115: | ||
constructors | constructors | ||
new : (master Master). | new : (string SlaveProgram, master Master, string CmdLine). | ||
end class slaveProcess | end class slaveProcess | ||
Line 126: | Line 129: | ||
clauses | clauses | ||
new(Master) :- | new(SlaveProgram, Master, CmdLine) :- | ||
slaveProcessSupport::new( | slaveProcessSupport::new(SlaveProgram, CmdLine), | ||
register_notify("compilerMessage", Master:compilerMessage). | register_notify("compilerMessage", Master:compilerMessage). | ||
clauses | clauses | ||
compile(Filename | compile(Filename) = call_toRemote(predicate_name(), Filename). | ||
clauses | clauses | ||
setSettings(Settings) | setSettings(Settings) = call_toRemote(predicate_name(), Settings). | ||
end implement slaveProcess | end implement slaveProcess | ||
</vip> | </vip> | ||
The implementation inherits from <vp>slaveProcessSupport</vp> which is initialized with a "path" to the | The implementation inherits from <vp>slaveProcessSupport</vp> which is initialized with a "path" to the <vp>SlaveProgram</vp>. | ||
The implementation also | The implementation must also register all calls and notifications of the <vp>master</vp> interface. In this case it is just the <vp>compilerMessage</vp> notification. | ||
Finally, the implementation contains a proxy method for each of the calls and notifications in the <vp>slave</vp> interface. | Finally, the implementation contains a proxy method for each of the calls and notifications in the <vp>slave</vp> interface. | ||
Line 170: | Line 171: | ||
constructors | constructors | ||
new : (slave Slave). | new : (slave Slave, string MsPipe). | ||
end class masterProxy | end class masterProxy | ||
</vip> | </vip> | ||
The constructor takes the implementation of the <vp>slave</vp> interface as argument. The implementation of the <vp>masterProxy</vp> looks like this: | The constructor takes the implementation of the <vp>slave</vp> interface as argument and the name of a <vp>MsPipe</vp> to use for communication. The implementation of the <vp>masterProxy</vp> looks like this: | ||
<vip> | <vip> | ||
Line 182: | Line 183: | ||
clauses | clauses | ||
new(Slave) :- | new(Slave, MsPipe) :- | ||
register_call("compile", Slave:compile), | register_call("compile", Slave:compile), | ||
register_call("setSettings", Slave:setSettings), | |||
masterProxySupport::new(MsPipe). | |||
clauses | clauses | ||
compilerMessage(Message) :- | compilerMessage(Message) :- | ||
notify_wait_toRemote(predicate_name(), Message). | |||
end implement masterProxy | end implement masterProxy | ||
Line 198: | Line 200: | ||
The master program (i.e. the builder) is a console program that | The master program (i.e. the builder) is a console program that | ||
* compiles a set of files | * compiles a set of files | ||
* using a number of slave-compilers and | * using a number of slave-compilers and | ||
* writes the messages from the compilers to the console. | * writes the messages from the compilers to the console. | ||
Finally after all compilation is completed it will write some statistics to the console. | Finally after all compilation is completed it will write some statistics to the console. | ||
Line 211: | Line 215: | ||
predicates | predicates | ||
compileFiles : (string Settings, setM{string} Files, positive ProcessCount) -> pfc\asynchronous\future{string} Result. | compileFiles : (string SlaveProgram, string Settings, setM{string} Files, positive ProcessCount, boolean Debug) | ||
-> pfc\asynchronous\future{string} Result. | |||
end class compiler | end class compiler | ||
Line 218: | Line 223: | ||
As you can see the <vp>compiler</vp> class can actually (privately) create objects of type <vp>compiler</vp>. Each of these objects represents a compiler subprocess, but the management of these are kept privately in the implementation of the <vp>compiler</vp> class. | As you can see the <vp>compiler</vp> class can actually (privately) create objects of type <vp>compiler</vp>. Each of these objects represents a compiler subprocess, but the management of these are kept privately in the implementation of the <vp>compiler</vp> class. | ||
The compiler interface declares a <vp> | The <vp>compiler</vp> interface declares a <vp>compileFiles2</vp> predicate: | ||
<vip> | |||
interface compiler | |||
predicates | |||
compileFiles2 : (string Settings, setM{string} Files) -> pfc\asynchronous\future{compiler_ms\slave::compile_rsp} Result. | |||
end interface compiler | |||
</vip> | |||
As you can see it uses a <vp>future</vp>, which is an important part of submitting suspending predicates for execution. The implementation looks like this: | |||
<vip> | <vip> | ||
implement compiler | implement compiler | ||
supports master | supports master | ||
open compiler_ms\, slave, pfc\asynchronous\ | open core, compiler_ms\, slave, pfc\asynchronous\ | ||
class facts | |||
cs : criticalSection := criticalSection::new(1000) [constant, immediate]. | |||
clauses | clauses | ||
compileFiles(SlaveProgram, Settings, Files, ProcessCount, Debug) = R :- | |||
ProcessCount2 = math::min(list::length(Files:asList), ProcessCount), | |||
CL = | |||
[ new(SlaveProgram, D) || | |||
C = std::cIterate(ProcessCount2), | |||
D = Debug ** toBoolean(C = 0) | |||
], | |||
RL = [ C:compileFiles2(Settings, Files) || C in CL ], | |||
FC = future::fold(RL, plus, compilerResult_zero), | |||
R = FC:futureExtension::map(asString). | |||
facts | facts | ||
Line 245: | Line 260: | ||
constructors | constructors | ||
new : (string | new : (string SlaveProgram, boolean Debug). | ||
clauses | clauses | ||
new( | new(SlaveProgram, Debug) :- | ||
slave := slaveProcess::new(This), | CmdLine = if true = Debug then "-debug" else "" end if, | ||
slave := slaveProcess::new(SlaveProgram, This, CmdLine), | |||
slave:setNativeCreationFlags(multiThread_native::idle_priority_class), | slave:setNativeCreationFlags(multiThread_native::idle_priority_class), | ||
slave:run(), | slave:run(). | ||
slave:setSettings(Settings). | |||
clauses | |||
compileFiles2(Settings, Files) = | |||
future::submit( | |||
{ = R :- | |||
setSettings(Settings), | |||
R = doCompileFiles(Files, compilerResult_zero) | |||
}). | |||
predicates | |||
setSettings : (string Settings) suspending. | |||
clauses | |||
setSettings(Settings) :- | |||
_ = slave:setSettings(Settings). | |||
predicates | |||
doCompileFiles : (setM{string} Files, compile_rsp Acc) -> compile_rsp OnCompileResult suspending. | |||
clauses | clauses | ||
doCompileFiles(Files, Acc | doCompileFiles(Files, Acc) = CompileResult :- | ||
if File = Files:tryRemoveFirst() then | if File = Files:tryRemoveFirst() then | ||
slave:compile(File, | CR = slave:compile(File), | ||
cs:synchronize({ :- stdio::writef(" %s %s\n", File, CR:asString()) }), | |||
CompileResult = doCompileFiles(Files, Acc:plus(CR)) | |||
else | else | ||
slave:terminate_toRemote(), | slave:terminate_toRemote(), | ||
CompileResult = Acc | |||
end if. | end if. | ||
Line 284: | Line 312: | ||
clauses | clauses | ||
compilerMessage(compilerMessage_ntf(File, Line, Message)) :- | compilerMessage(compilerMessage_ntf(File, Line, Message)) :- | ||
stdio::writef(" %s(%d): %s\n", File, Line, Message). | cs:synchronize({ :- stdio::writef(" %s(%d): %s\n", File, Line, Message) }). | ||
end implement compiler | end implement compiler | ||
</vip> | </vip> | ||
The asynchronous method in the master/slave concept | The asynchronous method in the master/slave concept is based on suspending predicates. It is outside the scope of this article to go fully into details about asynchronous programming, suspending predicates and <vp>future</vp>s/<vp>promise</vp>s. | ||
<vp> | Here we want the main thread to wait for the result of a number of compilers it has started, and for such combined waiting and result summing, we will use <vp>future</vp>s. | ||
The main predicate (<vp>compileFiles</vp>) will start a number of <vp>slave</vp> programs, and submit compilation "job" (<vp>compileFiles2</vp>) for each of these <vp>slave</vp>s. The submission of such a job is represented by a <vp>future</vp> which will return the result some time in the "future". The results of all these <vp>future</vp>s are folded into a single future, which in turn is "mapped" to a string. | |||
<vip> | <vip> | ||
clauses | clauses | ||
compileFiles(SlaveProgram, Settings, Files, ProcessCount, Debug) = R :- | |||
ProcessCount2 = math::min(list::length(Files:asList), ProcessCount), | |||
CL = | |||
[ new(SlaveProgram, D) || | |||
C = std::cIterate(ProcessCount2), | |||
D = Debug ** toBoolean(C = 0) | |||
], | |||
RL = [ C:compileFiles2(Settings, Files) || C in CL ], | |||
FC = future::fold(RL, plus, compilerResult_zero), | |||
R = FC:futureExtension::map(asString). | |||
</vip> | </vip> | ||
As you can see simple processing is a bit more complicated when futures are involved. But predicates like <vp>future::fold</vp> deals with a number of things: First of all it combines the results, but it also deals with asynchronous (as well as synchronous) waiting on the individual results, and in case some of the involved sub jobs completes with an exception it will also handle cancellation of the remaining sub jobs. | |||
The | |||
The predicate <vp>compileFiles2</vp> submits a compilation "job" to be executed in the default <vp>executionContext</vp>; which in this case will be <vp>executionContext_pool::defaultPool</vp>: | |||
<vip> | <vip> | ||
clauses | clauses | ||
compileFiles2(Settings, Files) = | |||
future::submit( | |||
{ = R :- | |||
setSettings(Settings), | |||
R = doCompileFiles(Files, compilerResult_zero) | |||
}). | |||
</vip> | </vip> | ||
The result is a <vp>future</vp>. | |||
The <vp>doCompileFiles</vp> contains the heart loop of the compilation. Each started compiler will be controlled with a <vp>doCompileFiles</vp> loop: | The suspending predicate <vp>doCompileFiles</vp> contains the heart loop of the compilation. Each started compiler will be controlled with a <vp>doCompileFiles</vp> loop: | ||
<vip> | <vip> | ||
predicates | |||
doCompileFiles : (setM{string} Files, compile_rsp Acc) -> compile_rsp OnCompileResult suspending. | |||
clauses | clauses | ||
doCompileFiles(Files, Acc | doCompileFiles(Files, Acc) = CompileResult :- | ||
if File = Files:tryRemoveFirst() then | if File = Files:tryRemoveFirst() then | ||
slave:compile(File, | CR = slave:compile(File), | ||
cs:synchronize({ :- stdio::writef(" %s %s\n", File, CR:asString()) }), | |||
CompileResult = doCompileFiles(Files, Acc:plus(CR)) | |||
else | else | ||
slave:terminate_toRemote(), | slave:terminate_toRemote(), | ||
CompileResult = Acc | |||
end if. | end if. | ||
</vip> | </vip> | ||
The loop will try to remove a <vp>File</vp> to compile from the <vp>Files</vp> to compile. Since several loops will do this in parallel it is important that <vp>tryRemoveFirst</vp> is thread safe. In this example this is ensured by using a <vp>setM_redBlack_cas</vp> (i.e. a ''compare and swap'' red black set; see the <vp>main::run</vp> implementation). | |||
If a <vp>File</vp> can be removed from the set the <vp>slave</vp> is asked to <vp>compile</vp> it (asynchronous using a <vp>continuation</vp>). When | If a <vp>File</vp> can be removed from the set the <vp>slave</vp> is asked to <vp>compile</vp> it (asynchronous using a <vp>continuation</vp>). When the compilation is completed, we will write the result to stdio and loop to <vp>doCompileFiles</vp> again. | ||
This will continue until <vp>Files</vp> becomes empty in which case the <vp>slave</vp> is informed to terminate (<vp>slave:terminate_toRemote()</vp>), and the accumulated result is returned. | This will continue until <vp>Files</vp> becomes empty in which case the <vp>slave</vp> is informed to terminate (<vp>slave:terminate_toRemote()</vp>), and the accumulated result is returned. | ||
Line 347: | Line 379: | ||
<vip> | <vip> | ||
clauses | |||
compilerMessage(compilerMessage_ntf(File, Line, Message)) :- | compilerMessage(compilerMessage_ntf(File, Line, Message)) :- | ||
stdio::writef(" %s(%d): %s\n", File, Line, Message). | cs:synchronize({ :- stdio::writef(" %s(%d): %s\n", File, Line, Message) }). | ||
</vip> | </vip> | ||
Line 356: | Line 388: | ||
=== The slave program === | === The slave program === | ||
The slave program is also a console program. But it is very important to notice that the slave does not have a console window attached and that the console | The slave program is also a console program. But it is very important to notice that the slave does not have a console window attached and that the console can therefore '''''not''''' be used for input/output. | ||
In this slave program everything interesting is packed into the implementation of <vp>theCompiler</vp>: | In this slave program everything interesting is packed into the implementation of <vp>theCompiler</vp>: | ||
<vip> | <vip> | ||
implement theCompiler | implement theCompiler inherits masterProxy | ||
supports slave | supports slave | ||
open compiler_ms\, master, pfc\asynchronous\ | open core, compiler_ms\, master, pfc\asynchronous\ | ||
facts | facts | ||
settings : string := "". | settings : string := "". | ||
clauses | clauses | ||
run() :- | run(MsPipe) :- | ||
new():wait(). | new(MsPipe):wait(). | ||
clauses | clauses | ||
wait() :- | wait() :- | ||
_ = | _ = fromLoop:futureExtension::get(). | ||
constructors | constructors | ||
new : (). | new : (string MsPipe). | ||
clauses | clauses | ||
new() :- | new(MsPipe) :- | ||
masterProxy::new(This, MsPipe). | |||
clauses | clauses | ||
compile(Filename | compile(Filename) = compile_rsp(Lines, Errors:value, D:get()) :- | ||
T1 = time::now(), | |||
Errors = varM_integer::new(), | |||
Lines = 20 + math::random(3000), | |||
foreach I = std::cIterate(Lines) + 1 and not(executionContext::isCanceled()) do | |||
foreach _ = std::cIterate(math::random(10000)) do | |||
end foreach, | |||
R = math::random(), | |||
if R < 1 / 2000 then | |||
Errors:inc(), | |||
compilerMessage(compilerMessage_ntf(Filename, I, settings)) | |||
end if | |||
end foreach, | |||
D = duration::new(T1, time::now()). | |||
clauses | clauses | ||
setSettings(Settings) :- | setSettings(Settings) = unit :- | ||
settings := Settings. | settings := Settings. | ||
Line 411: | Line 439: | ||
<vp>theCompiler</vp> implements the <vp>slave</vp> interface which it pass as argument to the constructor of the <vp>masterProxy</vp> object, thereby the communication and message loops are established. | <vp>theCompiler</vp> implements the <vp>slave</vp> interface which it pass as argument to the constructor of the <vp>masterProxy</vp> object, thereby the communication and message loops are established. | ||
Recall that <vp>compile</vp> and <vp>setSettings</vp> are suspending predicates. | |||
<vp>compile</vp> performs a simple dummy simulation a compiler, by using CPU, returning messages and a final result. | |||
The simulation "compiles" a random number of lines each with a certain probability to issue a compilerMessage which is notified back to the <vp>master</vp> (i.e. through the proxy object). | |||
The <vp> | The compilation also test for termination request from the master (i.e. <vp>not(executionContext::isCanceled())</vp>), this test would actually only be relevant if the compilation was terminated in the middle of compilation which this example will never do. But when compiling files from an IDE it is custom to have a "stop compilation" facility. This facility could wait all compilers to ask for the next file to compile and terminate them there, but it could also send a terminate request to the slave, in which case the fromLoop would complete. | ||
In this program the main thread will simply <vp>wait</vp>s for the termination of the <vp>fromLoop</vp>. | |||
[[Category:Tutorials]] | [[Category:Tutorials]] |
Latest revision as of 12:32, 15 October 2024
The PFC master/slave process concept can be used for a number of reasons:
- To make computations in parallel
- To isolate computations from each other
- To manage independent subprocesses
Master/slave concepts
Master/slave processes are organized hierarchically:
- A master process can have several slave processes
- Each slave process has exactly one master process.
- A slave process can be master process of other slave processes.
The master process spawns and owns the slave processes.
A master and slave process can communicate with each other using remote procedure calls and notifications. All communication is serialized on pipes that are established between the master and the slave.
The communication is (quite) transparent to the programs, in the master process the slave is represented by a proxy object; calls and notifications from the master to the slave is performed by invoking methods on the proxy object. Likewise the master is represented by a proxy object in the slave and calls and notifications from slave to master are made on the proxy object.
All methods are asynchronous (using an executionContext_pool as execution context), so suspending predicates must be applied. And synchronization to non-threadsafe structures must be used. postAction will have to be used to switch execution context to the GUI thread for GUI operations (and in many applications for safe access to program state).
The master/slave process concept is illustrated by a demo project. In the demo project the master is a builder similar to the vipBuilder. This master spawns a number of compiler/slave processes.
The demo is academic in the sense that nothing is really build; the "compilers" merely simulate that they do something.
Notice that the demo program uses the name space compiler_ms for interfaces and classes used in the process interface. For simplicity the name space declarations are not included in this article.
The compiler/slave has this interface:
interface slave open core [pfc\masterSlaveProcess::slave] domains compile_rsp = compile_rsp(integer LineCount, integer Errors, durationValue CompilationTime). predicates compile : (string Filename) -> compile_rsp CompileResult suspending. predicates setSettings : (string Settings) -> unit suspending. end interface slave
Notice that in this academic example the interface name is slave. In a real program it would be more natural to call it e.g. compiler_slave.
We will consider the details of this interface below, currently the interesting thing is that it defines procedures compile and setSettings. Notice that they are suspending predicates.
When the master spawn this slave it will call the setSettings to inform the slave about settings (i.e. corresponding to the command line flags , etc of a regular compiler). And then it will call compile on a file and when that operation completes it will (potentially) call compile again on another file. The master will continue this until there are no more files to compile and then it will terminate the slave.
Actually, the master will spawn a number of slaves, so that several files can be compiled in parallel.
The master has this interface towards the slave:
interface master [pfc\masterSlaveProcess::master] domains compilerMessage_ntf = compilerMessage_ntf(string File, integer Line, string Message). predicates compilerMessage : (compilerMessage_ntf Message). end interface master
This interface only contains a single notification compilerMessage through which the compiler can notify the master with messages from the compilation, while the compilation takes place. Notice that this predicate is not a suspending predicate, this is because notifications completely asynchronous in the sense that no result is returned to the slave and the slave does not wait for the notification to be handled. Be aware that additional notifications and final results can arrive before other notifications has been handled. This may lead to race conditions, and if they are not acceptable you should use a suspending predicate/function instead.
There are restrictions on these interfaces:
- They can contain one-input-one-output suspending function
- They can contain one-input notification predicates
- They cannot contain any other kind of predicates
- The input and output of the procedures and notifications must be serializable
These two interfaces describes the master/slave process interface.
To make the system complete the following things must be implemented:
- A proxy class must be implemented for each of these interfaces
- The slave must implement the slave interface
- The master must implement the master interface as a context to the slave proxy
The following is also necessary, but that is handled by PFC classes:
- The subprocess creation
- The creation of communication pipes
- The serialization/deserialization, communication and dispatching of data/messages
The implementation of the proxy classes are quite simple as described in the next sections.
slaveProcess
For the master process we will need a proxy representing the slave process, the interface of this proxy looks like this:
interface slaveProcess supports slave, useExe predicates from pfc\masterSlaveProcessSupport terminate_toRemote end interface slaveProcess
It supports the slave interface (which is the main proxy facility). It also supports useExe which is the PFC standard interface for subprocesses and finally it also contains the predicate terminate_toRemote which is used internally to terminate the slave process.
The class declaration of the slaveProcess looks like this:
class slaveProcess : slaveProcess constructors new : (string SlaveProgram, master Master, string CmdLine). end class slaveProcess
The Master is the implementation of the master interface that the slave will communicate with. I.e. it is the master's implementation of the master interface.
The implementation of the slaveProcess looks like this:
implement slaveProcess inherits slaveProcessSupport open pfc\ clauses new(SlaveProgram, Master, CmdLine) :- slaveProcessSupport::new(SlaveProgram, CmdLine), register_notify("compilerMessage", Master:compilerMessage). clauses compile(Filename) = call_toRemote(predicate_name(), Filename). clauses setSettings(Settings) = call_toRemote(predicate_name(), Settings). end implement slaveProcess
The implementation inherits from slaveProcessSupport which is initialized with a "path" to the SlaveProgram.
The implementation must also register all calls and notifications of the master interface. In this case it is just the compilerMessage notification.
Finally, the implementation contains a proxy method for each of the calls and notifications in the slave interface.
You will notice that registration as well as proxy methods is trivial code.
masterProxy
For the slave process we will need a proxy representing the master interface provided by the master. The interface of this proxy looks like this:
interface masterProxy supports master properties from pfc\masterSlaveProcessSupport fromLoop end interface masterProxy
It simply supports the master interface and provides a fromLoop predicate that must be invoked to receive communication from the master. We will return to this predicate below.
The class declaration of the masterProxy looks like this:
class masterProxy : masterProxy constructors new : (slave Slave, string MsPipe). end class masterProxy
The constructor takes the implementation of the slave interface as argument and the name of a MsPipe to use for communication. The implementation of the masterProxy looks like this:
implement masterProxy inherits masterProxySupport open pfc\ clauses new(Slave, MsPipe) :- register_call("compile", Slave:compile), register_call("setSettings", Slave:setSettings), masterProxySupport::new(MsPipe). clauses compilerMessage(Message) :- notify_wait_toRemote(predicate_name(), Message). end implement masterProxy
It inherits from masterProxySupport and contains registration of all slave calls and notifications, and proxy implantations of all master calls and notifications.
The master program
The master program (i.e. the builder) is a console program that
- compiles a set of files
- using a number of slave-compilers and
- writes the messages from the compilers to the console.
Finally after all compilation is completed it will write some statistics to the console.
This is done by the compiler::compileFiles predicate
class compiler : compiler open core [noDefaultConstructor] predicates compileFiles : (string SlaveProgram, string Settings, setM{string} Files, positive ProcessCount, boolean Debug) -> pfc\asynchronous\future{string} Result. end class compiler
As you can see the compiler class can actually (privately) create objects of type compiler. Each of these objects represents a compiler subprocess, but the management of these are kept privately in the implementation of the compiler class.
The compiler interface declares a compileFiles2 predicate:
interface compiler predicates compileFiles2 : (string Settings, setM{string} Files) -> pfc\asynchronous\future{compiler_ms\slave::compile_rsp} Result. end interface compiler
As you can see it uses a future, which is an important part of submitting suspending predicates for execution. The implementation looks like this:
implement compiler supports master open core, compiler_ms\, slave, pfc\asynchronous\ class facts cs : criticalSection := criticalSection::new(1000) [constant, immediate]. clauses compileFiles(SlaveProgram, Settings, Files, ProcessCount, Debug) = R :- ProcessCount2 = math::min(list::length(Files:asList), ProcessCount), CL = [ new(SlaveProgram, D) || C = std::cIterate(ProcessCount2), D = Debug ** toBoolean(C = 0) ], RL = [ C:compileFiles2(Settings, Files) || C in CL ], FC = future::fold(RL, plus, compilerResult_zero), R = FC:futureExtension::map(asString). facts slave : slaveProcess. constructors new : (string SlaveProgram, boolean Debug). clauses new(SlaveProgram, Debug) :- CmdLine = if true = Debug then "-debug" else "" end if, slave := slaveProcess::new(SlaveProgram, This, CmdLine), slave:setNativeCreationFlags(multiThread_native::idle_priority_class), slave:run(). clauses compileFiles2(Settings, Files) = future::submit( { = R :- setSettings(Settings), R = doCompileFiles(Files, compilerResult_zero) }). predicates setSettings : (string Settings) suspending. clauses setSettings(Settings) :- _ = slave:setSettings(Settings). predicates doCompileFiles : (setM{string} Files, compile_rsp Acc) -> compile_rsp OnCompileResult suspending. clauses doCompileFiles(Files, Acc) = CompileResult :- if File = Files:tryRemoveFirst() then CR = slave:compile(File), cs:synchronize({ :- stdio::writef(" %s %s\n", File, CR:asString()) }), CompileResult = doCompileFiles(Files, Acc:plus(CR)) else slave:terminate_toRemote(), CompileResult = Acc end if. constants compilerResult_zero : compile_rsp = compile_rsp(0, 0, 0). class predicates asString : (compile_rsp CompileResponse [this]) -> string. clauses asString(compile_rsp(LineCount, Errors, CompilationTime)) = string::format("% lines in %p % erros", LineCount, duration::new(CompilationTime), Errors). class predicates plus : (compile_rsp A [this], compile_rsp B) -> compile_rsp C. clauses plus(compile_rsp(AL, BL, CL), compile_rsp(AR, BR, CR)) = compile_rsp(AL + AR, BL + BR, CL + CR). % master clauses compilerMessage(compilerMessage_ntf(File, Line, Message)) :- cs:synchronize({ :- stdio::writef(" %s(%d): %s\n", File, Line, Message) }). end implement compiler
The asynchronous method in the master/slave concept is based on suspending predicates. It is outside the scope of this article to go fully into details about asynchronous programming, suspending predicates and futures/promises. Here we want the main thread to wait for the result of a number of compilers it has started, and for such combined waiting and result summing, we will use futures.
The main predicate (compileFiles) will start a number of slave programs, and submit compilation "job" (compileFiles2) for each of these slaves. The submission of such a job is represented by a future which will return the result some time in the "future". The results of all these futures are folded into a single future, which in turn is "mapped" to a string.
clauses compileFiles(SlaveProgram, Settings, Files, ProcessCount, Debug) = R :- ProcessCount2 = math::min(list::length(Files:asList), ProcessCount), CL = [ new(SlaveProgram, D) || C = std::cIterate(ProcessCount2), D = Debug ** toBoolean(C = 0) ], RL = [ C:compileFiles2(Settings, Files) || C in CL ], FC = future::fold(RL, plus, compilerResult_zero), R = FC:futureExtension::map(asString).
As you can see simple processing is a bit more complicated when futures are involved. But predicates like future::fold deals with a number of things: First of all it combines the results, but it also deals with asynchronous (as well as synchronous) waiting on the individual results, and in case some of the involved sub jobs completes with an exception it will also handle cancellation of the remaining sub jobs.
The predicate compileFiles2 submits a compilation "job" to be executed in the default executionContext; which in this case will be executionContext_pool::defaultPool:
clauses compileFiles2(Settings, Files) = future::submit( { = R :- setSettings(Settings), R = doCompileFiles(Files, compilerResult_zero) }).
The result is a future.
The suspending predicate doCompileFiles contains the heart loop of the compilation. Each started compiler will be controlled with a doCompileFiles loop:
predicates doCompileFiles : (setM{string} Files, compile_rsp Acc) -> compile_rsp OnCompileResult suspending. clauses doCompileFiles(Files, Acc) = CompileResult :- if File = Files:tryRemoveFirst() then CR = slave:compile(File), cs:synchronize({ :- stdio::writef(" %s %s\n", File, CR:asString()) }), CompileResult = doCompileFiles(Files, Acc:plus(CR)) else slave:terminate_toRemote(), CompileResult = Acc end if.
The loop will try to remove a File to compile from the Files to compile. Since several loops will do this in parallel it is important that tryRemoveFirst is thread safe. In this example this is ensured by using a setM_redBlack_cas (i.e. a compare and swap red black set; see the main::run implementation). If a File can be removed from the set the slave is asked to compile it (asynchronous using a continuation). When the compilation is completed, we will write the result to stdio and loop to doCompileFiles again. This will continue until Files becomes empty in which case the slave is informed to terminate (slave:terminate_toRemote()), and the accumulated result is returned. The master also supports the compilerMessage notification:
clauses compilerMessage(compilerMessage_ntf(File, Line, Message)) :- cs:synchronize({ :- stdio::writef(" %s(%d): %s\n", File, Line, Message) }).
In our case it simply writes the messages to the standard output stream.
The slave program
The slave program is also a console program. But it is very important to notice that the slave does not have a console window attached and that the console can therefore not be used for input/output.
In this slave program everything interesting is packed into the implementation of theCompiler:
implement theCompiler inherits masterProxy supports slave open core, compiler_ms\, master, pfc\asynchronous\ facts settings : string := "". clauses run(MsPipe) :- new(MsPipe):wait(). clauses wait() :- _ = fromLoop:futureExtension::get(). constructors new : (string MsPipe). clauses new(MsPipe) :- masterProxy::new(This, MsPipe). clauses compile(Filename) = compile_rsp(Lines, Errors:value, D:get()) :- T1 = time::now(), Errors = varM_integer::new(), Lines = 20 + math::random(3000), foreach I = std::cIterate(Lines) + 1 and not(executionContext::isCanceled()) do foreach _ = std::cIterate(math::random(10000)) do end foreach, R = math::random(), if R < 1 / 2000 then Errors:inc(), compilerMessage(compilerMessage_ntf(Filename, I, settings)) end if end foreach, D = duration::new(T1, time::now()). clauses setSettings(Settings) = unit :- settings := Settings. end implement theCompiler
theCompiler implements the slave interface which it pass as argument to the constructor of the masterProxy object, thereby the communication and message loops are established.
Recall that compile and setSettings are suspending predicates.
compile performs a simple dummy simulation a compiler, by using CPU, returning messages and a final result. The simulation "compiles" a random number of lines each with a certain probability to issue a compilerMessage which is notified back to the master (i.e. through the proxy object).
The compilation also test for termination request from the master (i.e. not(executionContext::isCanceled())), this test would actually only be relevant if the compilation was terminated in the middle of compilation which this example will never do. But when compiling files from an IDE it is custom to have a "stop compilation" facility. This facility could wait all compilers to ask for the next file to compile and terminate them there, but it could also send a terminate request to the slave, in which case the fromLoop would complete.
In this program the main thread will simply waits for the termination of the fromLoop.