Processus concurrents
L'écriture d'une application composée de plusieurs processus concurrents
fait perdre la propriété de déterminisme des programmes séquentiels.
Pour des processus partageant une même zone mémoire, le résultat du programme
suivant ne peut pas être déduit de sa lecture.
lex x = ref 1 ;; |
processus P |
processus Q |
x := ! x + 1 ;; |
x := ! x * 2 ;; |
|
À la fin de l'exécution de P et Q, la référence x peut
valoir 3 ou 4, selon l'ordre de calcul de chaque processus.
Il en est de même pour la terminaison de l'application. Comme l'état
mémoire dépend du déroulement de chaque processus parallèle, une
application peut ne pas terminer pour une certaine exécution et se
terminer dans une autre. Pour apporter un certain contrôle à
l'exécution, les processus doivent se synchroniser.
Pour des processus utilisant des mémoires distinctes, mais communiquant entre eux, leur interaction dépend du type de communication.
On introduit pour l'exemple
suivant deux primitives de communication send qui envoie une valeur
en indiquant le destinataire, et receive qui reçoit une valeur d'un processus.
Soient les deux processus communicants P et Q suivants :
processus P |
processus Q |
lex x = ref 1 ;; |
lex y = ref 1 ;; |
send(Q,! x); |
y := ! y + 3 ; |
x := ! x * 2 ; |
y := ! y + receive(P); |
send(Q,! x); |
send(P,! y); |
x := ! x + receive(Q); |
y := ! y + receive(P); |
Dans le cas d'une communication évanescente, le processus Q peut
rater les émissions de P. On retombe dans le non-déterminisme du
modèle précédent.
Pour une communication asynchrone, le médium du canal de communication
conserve les différentes valeurs transmises. Seule la réception est bloquante.
Le processus P peut être en attente sur Q, bien que ce dernier n'a
pas encore lu les deux envois de P. Ce qui ne l'empêche pas d'émettre.
Pour une communication synchrone, l'émission est elle aussi bloquante.
Dans notre exemple le send(Q,!
x); de P attend que Q
soit en réception (receive(P);).
Une fois l'information transmise,
les deux processus continuent leur chemin. Malheureusement, dans notre exemple,
P et Q se retrouvent sur une instruction d'émission bloquante, et
le programme n'avancera plus.
On peut classer les applications concurrentes en cinq catégories
suivant que les unités de programmes les composant sont :
-
sans relation ;
- avec relation mais sans synchronisation ;
- avec relation d'exclusion mutuelle ;
- avec relation d'exclusion mutuelle et communication ;
- avec relation, sans exclusion mutuelle et avec communication synchrone.
La difficulté de réalisation vient principalement des dernières
catégories. Nous allons à présent voir comment résoudre ces
difficultés en utilisant les bibliothèques Objective CAML.
Compilation avec processus légers
La bibliothèque sur les threads d'Objective CAML est découpée en cinq modules dont les quatre premiers
définissent
chaqu'un des types abstraits :
-
module Thread : création et exécution de thread
(type Thread.t);
- module Mutex : création, pose et enlèvement de verrous
(type Mutex.t);
- module Condition : création de conditions (signaux), attente et réveil sur condition
(type COndition.t);
- module Event : création de canaux de communications
(type 'a Event.channel), des valeurs y transitant
(type 'a Event.event), et des fonctions de communication.
- module ThreadUnix : redéfinition des fonctions
d'entrées-sorties du module Unix
pour qu'elles ne soient pas bloquantes.
Cette bibliothèque ne fait pas partie de la bibliothèque d'exécution
d'Objective CAML. Son
utilisation nécessite soit de construire un nouveau toplevel, soit de compiler ses programmes
avec l'option -custom de la manière suivante :
$ ocamlc -thread -custom threads.cma fichiers.ml -cclib -lthreads
$ ocamlmktop -thread -custom -o threadtop thread.cma -cclib -lthreads
Par défaut la bibliothèque de threads n'est pas utilisable avec le compilateur natif, sauf dans le cas où le système d'exploitation implante des threads système conformes à la
norme POSIX 10031.
On compile alors ses exécutables en ajoutant les bibliothèques C
unix.a et pthread.a :
Unix ainsi :
ocamlc -thread -custom threads.cma fichiers.ml -cclib -lthreads \
-cclib -lunix -cclib -lpthread
ocamltop -thread -custom threads.cma fichiers.ml -cclib -lthreads \
-cclib -lunix -cclib -lpthread
ocamlcopt -thread threads.cmxa fichiers.ml -cclib -lthreads \
-cclib -lunix -cclib -lpthread
Module Thread
Le module Thread d'Objective CAML contient les primitives de
création et de gestion des processus légers. Nous n'en ferons pas
une présentation exhaustive, en particulier les opérations d'entrées-sorties
sur les descripteurs de fichiers Unix ot été décrites au chapitre
précédent.
La création d'un processus léger se fait par appel à :
# Thread.create
;;
- : ('a -> 'b) -> 'a -> Thread.t = <fun>
Le premier argument, de type ('a -> 'b) correspond à la
fonction exécutée par le processus créé ; le second argument, de
type 'a, est l'argument attendu par la fonction
exécutée ; le résultat de l'appel est le descripteur associé
au processus. Le processus ainsi créé est détruit automatiquement
lorsque la fonction associée termine.
Connaissant son descripteur, on peut demander l'exécution d'un
processus et en attendre la fin en utilisant la fonction
join.
# let
f_proc1
()
=
for
i=
0
to
1
0
do
Printf.printf
"(%d)"
i;
flush
stdout
done;
print_newline()
;;
val f_proc1 : unit -> unit = <fun>
# let
t1
=
Thread.create
f_proc1
()
;;
val t1 : Thread.t = <abstr>
# Thread.join
t1
;;
(0)(1)(2)(3)(4)(5)(6)(7)(8)(9)(10)
- : unit = <unknown constructor>
Warning
Le résultat de l'exécution d'un processus n'est pas récupéré par
le processus père, mais perdu quand le processus fils se termine. |
On peut également interrompre brutalement le déroulement d'un
processus dont on connaît le descripteur par la fonction
kill. Créons, par exemple, un processus pour l'interrompre
immédiatement :
# let
n
=
ref
0
;;
val n : int ref = {contents=0}
# let
f_proc1
()
=
while
true
do
incr
n
done
;;
val f_proc1 : unit -> unit = <fun>
# let
go
()
=
n
:=
0
;
let
t1
=
Thread.create
f_proc1
()
in
Thread.kill
t1
;
Printf.printf
"n = %d\n"
!
n
;;
val go : unit -> unit = <fun>
# go
()
;;
n = 0
- : unit = ()
Un processus peut mettre fin à son activité par la fonction :
# Thread.exit
;;
- : unit -> unit = <fun>
Il peut suspendre son activité pendant un temps donné par appel à :
# Thread.delay
;;
- : float -> unit = <fun>
L'argument indique le nombre de secondes d'attente. Nous reprenons
l'exemple précédent en lui ajoutant des temporisations.
Nous créons un
premier processus t1 dont la fonction associée crée à
son tour un processus t2, attend un délai de d
secondes et met fin à t2 et affiche le contenu de n.
# let
f_proc2
d
=
n
:=
0
;
let
t2
=
Thread.create
f_proc1
()
in
Thread.delay
d
;
Thread.kill
t2
;;
val f_proc2 : float -> unit = <fun>
# let
t1
=
Thread.create
f_proc2
0
.
2
5
in
Thread.join
t1
;
Printf.printf
"n = %d\n"
!
n
;;
n = 3854419
- : unit = ()
Maintenant que nous savons créer des processus, voyons comment nous
pouvons les faire coopérer.