Synchronisation des processus
C'est dans le cadre de processus partageant une même zone mémoire
que le mot << concurrence >> prend tout son sens : les divers processus
impliqués sont en concurrence pour l'accès à cette unique
ressource qu'est la mémoire2. Au problème du partage de
ressource vient s'ajouter celui du manque de maîtrise de
l'alternance et du temps d'exécution des processus concurrents.
Le système qui gère l'ensemble des processus, peut à tout moment
interrompre un calcul en cours. Ainsi
lorsque deux processus coopèrent, ils doivent pouvoir garantir
l'intégrité des manipulations de certaines données
partagées. Pour celà, un processus doit pouvoir rester maître de
ces données tant qu'il n'a pas achevé un calcul ou toute autre
opération (par exemple, une acquisition sur un périphérique).
Pour garantir l'exclusivité d'accès aux données
à un seul processus, on met en oeuvre un mécanisme dit d'exclusion mutuelle.
Section critique et exclusion mutuelle
Les mécanismes d'exclusion mutuelle sont implantés à l'aide de
données particulières appelées verrous. Les opérations
sur les verrous sont limitées à leur création, leur pose et leur
libération. Un verrou est la plus petite donnée partagée par un
ensemble de processus concurrents. Sa manipulation est toujours
exclusive. À la notion d'exclusivité de manipulation d'un verrou
s'ajoute celle d'exclusivité de détention : seul le processus
qui a pris un verrou peut le libérer ; si d'autres processus
veulent à leur tour disposer de ce verrou, ils doivent en attendre
la libération par le processus détenteur.
Le module Mutex est utiliser pour créer des verrous
entre processus en relation d'exclusion mutuelle
sur une zone mémoire.
Module Mutex
Les fonctions du module Mutex implantent la gestion des
verrous. Nous allons illustrer leur utilisation par deux petits
exemples classiques de concurrence.
Une fois un verrou créé, un thread peut le prendre et le relâcher.
# Mutex.create
;;
- : unit -> Mutex.t = <fun>
# Mutex.lock
;;
- : Mutex.t -> unit = <fun>
# Mutex.unlock
;;
- : Mutex.t -> unit = <fun>
La fonction lock tente de prendre un verrou et y parvient si
il est libre. Dans le cas contraire, le processus est suspendu tant
que ce verrou n'a pas été relâché. Un seul processus ne peut
tenir un verrou dans le même temps.
Il existe une variante à la prise de verrou qui n'est pas bloquante.
# Mutex.try_lock;;
- : Mutex.t -> bool = <fun>
Si le verrou est déjà pris, la fonction retourne false. Dans
l'autre cas, la fonction prend le verrou et retourne true.
Les philosophes orientaux
Cette petite histoire, due à Dijkstra, illustre un pur problème de
partage de ressources.
Cinq philosophes orientaux partagent leur temps entre l'étude et la
venue au réfectoire pour manger un bol de riz. La salle affectée à
la sustentation des philosophes ne comprend qu'une seule table ronde sur
laquelle sont disposés un grand plat de riz (toujours plein) cinq
bols et cinq baguettes.
Figure 4.1 : La table des philosophes orientaux
<<<<<<< PC.tex
Comme on le voit sur cette figure, un philosophe qui prend les deux
baguettes autour de son bol empêche ses voisins d'en
faire autant. Dès qu'il dépose l'une de ses baguettes, son voisin,
affamé, peut s'en emparer. Le cas échéant, ce dernier devra
attendre que l'autre baguette soit disponible.
Pour simplifier les choses, on supposera que chaque philosophe a des habitudes
et vient
toujours à la même place. On modélise les cinq baguettes par
autant de verrous stockés dans un vecteur b. Ainsi, un seul
philosophe à la fois peut détenir une baguette. Manger et
méditer sont simulés par une suspension des processus. On
introduit un petit temps de réflexion entre la prise et le dépôt
de chacune des deux baguettes.
# let
b
=
let
b0
=
Array.create
5
(Mutex.create())
in
for
i=
1
to
4
do
b0.
(i)
<-
Mutex.create()
done;
b0
;;
val b : Mutex.t array = [|<abstr>; <abstr>; <abstr>; <abstr>; <abstr>|]
# =======
Comme
on
le
voit
sur
la
figure
\
ref{fig-
philosophes},
un
philosophe
qui
prend
les
deux
baguettes
autour
de
son
bol
empê
che
ses
voisins
imm\
'ediats
d'en
faire
autant.
Dè
s
qu'il
dé
pose
l'une
de
ses
baguettes,
son
voisin,
affamé
,
peut
s'en
emparer.
Le
cas
é
ché
ant,
ce
dernier
devra
attendre
que
l'autre
baguette
soit
disponible.
%
Pour
simplifier
les
choses,
on
supposera
que
chaque
philosophe
a
des
habitudes
%
et
vient
toujours
à
la
mê
me
place.
On
modé
lise
les
philosophes
par
cinq
processus
et
les
cinq
baguettes
par
autant
de
verrous.
Un
philosophe
qui
s'empare
d'une
baguette
est
un
processus
qui
prend
un
verrou.
Ainsi,
un
seul
philosophe
à
la
fois
peut
dé
tenir
une
baguette
donner.
La
mé
ditation
et
la
mastication
sont
simulé
es
par
une
suspension
d'un
processus.
On
introduit
un
petit
delai
entre
la
prise
et
le
dé
p\
^
ot
d'une
baguette.
\
begin{caml}
let
b
=
let
b0
=
Array.create
5
(Mutex.create())
in
Array.map
(fun
_
->
Mutex.create
())
b0
;
b0
;;
Characters 0-7:
Syntax error
# >>>>>>>
1
.
2
3
let
mediter
t
=
Thread.delay
(Random.float
t)
let
manger
=
mediter
;;
Characters 0-7:
Syntax error
# let
philosophe
i
=
let
ii
=
(i+
1
)
mod
5
in
while
true
do
mediter
3
.
;
Mutex.lock
b.
(i);
Printf.printf
"Le philosophe (%d) prend sa baguette gauche"
i
;
Printf.printf
" et médite encore un peu\n"
;
mediter
0
.
2
;
Mutex.lock
b.
(ii);
Printf.printf
"Le philosophe (%d) prend sa baguette droite\n"
i;
manger
0
.
5
;
Mutex.unlock
b.
(i);
Printf.printf
"Le philosophe (%d) repose sa baguette gauche"
i;
Printf.printf
" et commence déjà à méditer\n"
;
mediter
0
.
1
5
;
Mutex.unlock
b.
(ii);
Printf.printf
"Le philosophe (%d) repose sa baguette droite\n"
i
done
;;
Characters 69-76:
Unbound value mediter
On pourra tester ce petit programme en exécutant :
for
i=
0
to
4
do
ignore
(Thread.create
philosophe
i)
done
;
while
true
do
Thread.delay
5
.
done
;;
On suspend, dans la boucle infinie while, le processus
principal de façon à augmenter les chances des processus
philosophes d'entrer en action.
On utilise dans la boucle d'activité des
philosophes des délais d'attente entre certaines opérations. Ces
délais, dont la durée est choisie aléatoirement, dans une
certaine limite, on pour but de créer un peu de disparité dans
l'exécution parallèle des processus.
Problèmes de la solution naïve
Il peut arriver une chose terrible à nos philosophes : ils arrivent
tous en même temps et se saisissent de leur baguette gauche. Dans ce
cas nous sommes dans une situation dite d'inter-blocage. Plus
aucun philosophe ne pourra manger ! Nous sommes dans un cas de famine.
Pour éviter cela, les philosophes peuvent reposer une baguette
s'il n'arrivent pas à prendre la deuxième.
Cela est fort courtois, mais permet
à deux philosophes
de se liguer contre un troisième pour l'empêcher de se sustenter
en ne relâchant les baguettes que si l'autre voisin les a prises.
Il existe diverses solutions à ce
problème. L'une d'elle fait l'objet de l'exercice page X.
Producteurs et consommateurs I
Le couple producteurs-consommateurs est un exemple
classique de la programmation concurrente. Un groupe de processus,
les producteurs,
est chargé d'emmagasiner des données dans une file d'attente ; un second
groupe, les consommateurs, est chargé de les déstocker. Chaque
intervenant exclut les autres.
Nous implantons ce schéma en utilisant une file d'attente partagée
entre les producteurs et les consommateurs. Pour garantir le bon
fonctionnement de l'ensemble, la file d'attente est manipulée en
exclusion mutuelle afin de garantir l'intégrité des opérations
d'ajout et de retrait.
f est la file d'attente partagée, et m le verrou
d'exclusion mutuelle.
# let
f
=
Queue.create
()
and
m
=
Mutex.create
()
;;
val f : '_a Queue.t = <abstr>
val m : Mutex.t = <abstr>
Nous divisons l'activité d'un producteur en deux parties : créer
un produit (fonction produire) et stocker un produit
(fonction stocker). Seule l'opération de stockage a besoin
du verrou.
# let
produire
i
p
d
=
incr
p
;
Thread.delay
d
;
Printf.printf
"Le producteur (%d) a produit %d\n"
i
!
p
;
flush
stdout
;;
val produire : int -> int ref -> float -> unit = <fun>
# let
stocker
i
p
=
Mutex.lock
m
;
Queue.add
(i,!
p)
f
;
Printf.printf
"Le producteur (%d) a ajouté son %d-ieme produit\n"
i
!
p
;
flush
stdout
;
Mutex.unlock
m
;;
val stocker : int -> int ref -> unit = <fun>
Le code du producteur est une boucle sans fin de création et de
stockage. Nous introduisons un délai aléatoire d'attente à la
fin de chaque itération afin d'obtenir un effet de
désynchronisation dans l'exécution.
# let
producteur
i
=
let
p
=
ref
0
and
d
=
Random.float
2
.
in
while
true
do
produire
i
p
d
;
stocker
i
p
;
Thread.delay
(Random.float
2
.
5
)
done
;;
val producteur : int -> unit = <fun>
La seule opération du consommateur est le retrait d'un élément de
la file prenant garde à l'existence effective d'un produit.
# let
consommateur
i
=
while
true
do
Mutex.lock
m
;
(
try
let
ip,
p
=
Queue.take
f
in
Printf.printf
"Le consommateur(%d) "
i
;
Printf.printf
"a retiré le produit (%d,%d)\n"
ip
p
;
flush
stdout
;
with
Queue.
Empty
->
Printf.printf
"Le consommateur(%d) "
i
;
print_string
"est reparti les mains vides\n"
)
;
Mutex.unlock
m
;
Thread.delay
(Random.float
2
.
5
)
done
;;
val consommateur : int -> unit = <fun>
Le programme de test suivant crée quatre producteurs et quatre consommateurs.
for
i
=
0
to
3
do
ignore
(Thread.create
producteur
i);
ignore
(Thread.create
consommateur
i)
done
;
while
true
do
Thread.delay
5
.
done
;;
Attente et synchronisation
La relation d'exclusion mutuelle n'est pas assez fine pour
décrire la synchronisation entre threads.
Il n'est pas rare que le travail d'un processus dépende
de la réalisation d'une action par un autre processus, modifiant par là une
certaine condition.
Il est alors souhaitable que les processus puissent
communiquer le fait que cette condition ait pu changer, indiquant
aux processus en attente de la tester de nouveau.
Les différents prrcessus sont alors en relation d'exclusion mutuelle avec communication.
Dans l'exemple précédent, un consommateur, plutôt que repartir les
mains vides pourrait attendre qu'un producteur vienne réapprovisionner
le stock. Ce dernier pourra signaler au consommateur en attente qu'il
y a quelque chose à emporter.
Le modèle de l'attente sur condition d'une prise de verrou est connu sous
le nom de sémaphore.
Sémaphores
Un sémaphore est une variable
entière s ne pouvant prendre que des valeurs positives (ou nulles).
Une fois s initialisé, les seules opérations admises
sont : wait(s) et signal(s), notées respectivement P(s) et V(s).
Elles sont définies ainsi, s correspond au nombre de ressources d'un type donné.
-
wait(s) : si s > 0 alors s := s -1, sinon l'exécution du
processus ayant appelé wait(s) est suspendue.
- signal(s) : si un processus a été suspendu lors d'une exécution antérieure
d'un wait(s) alors le réveiller, sinon s := s + 1.
Un sémaphore ne prenant que les valeurs 0 ou 1 est appelé
sémaphore binaire.
Module Condition
Les fonctions du module Condition implantent des primitives
de mise en sommeil et de réveil de processus sur un signal. Un
signal, dans ce cadre, est une variable partagée par l'ensemble des
processus. Son type est abstrait et les fonctions de manipulation
sont :
-
create
- : unit -> Condition.t qui crée
un nouveau signal.
- signal
- : Condition.t -> unit qui
réveille l'un des processus en attente du signal.
- broadcast
- : Condition.t -> unit qui
réveille l'ensemble des processus en attente du signal.
- wait
- : Condition.t -> Mutex.t -> unit
qui suspend le processus appelant sur le signal passé en premier
argument. Le second argument est un verrou utilisé pour protéger
la manipulation du signal. Il est libéré, puis repositionné à
chaque exécution de la fonction.
Producteurs et consommateurs (2)
Nous reprenons l'exemple des producteurs et consommateurs en
utilisant le mécanisme des variable de condition pour mettre en
attente un consommateur arrivant alors que le magasin est vide.
# let
c
=
Condition.create
()
;;
val c : Condition.t = <abstr>
# let
stocker2
i
p
=
Mutex.lock
m
;
Queue.add
(i,!
p)
f
;
Printf.printf
"Le producteur (%d) a ajoute son %d-ème produit\n"
i
!
p
;
flush
stdout
;
Condition.signal
c
;
Mutex.unlock
m
;;
val stocker2 : int -> int ref -> unit = <fun>
# let
producteur2
i
=
let
p
=
ref
0
in
let
d
=
Random.float
2
.
in
while
true
do
produire
i
p
d;
stocker2
i
p;
Thread.delay
(Random.float
2
.
5
)
done
;;
val producteur2 : int -> unit = <fun>
L'activité du consommateur se fait alors en deux temps :
attendre qu'un produit soit disponible puis emporter le produit. Le
verrou est pris quand l'attente prend fin et il est rendu dès
que le consommateur a emporté son produit. L'attente se fait sur
la variable c.
# let
attendre2
i
=
Mutex.lock
m
;
while
Queue.length
f
=
0
do
Printf.printf
"Le consommateur (%d) attend\n"
i
;
Condition.wait
c
m
done
;;
val attendre2 : int -> unit = <fun>
# let
emporter2
i
=
let
ip,
p
=
Queue.take
f
in
Printf.printf
"Le consommateur (%d) "
i
;
Printf.printf
"emporte le produit (%d, %d)\n"
ip
p
;
flush
stdout
;
Mutex.unlock
m
;;
val emporter2 : int -> unit = <fun>
# let
consommateur2
i
=
while
true
do
attendre2
i;
emporter2
i;
Thread.delay
(Random.float
2
.
5
)
done
;;
val consommateur2 : int -> unit = <fun>
Notons qu'il n'est plus besoin de vérifier l'existence effective d'un
produit puisqu'un consommateur doit attendre l'existence d'un produit
dans la file d'attente pour que sa suspension prenne fin. Vu que la
fin de son attente correspond à la prise du verrou, il ne court pas
le risque de se faire dérober le nouveau produit avant de l'avoir
emporté.
Lecteurs et écrivain
Voici un autre exemple classique de processus concurrents dans lequel
les agents n'ont pas le même comportement vis-à-vis de la donnée
partagée.
Un écrivain et des lecteurs opèrent sur une donnée
partagée. L'action du premier est susceptible de rendre
momentanément les données incohérentes alors que les seconds
n'ont qu'une action passive. La difficulté vient du fait que nous ne
souhaitons pas interdire à plusieurs lecteurs de consulter la
donnée simultanément.
Une solution à ce problème est de gérer un compteur n
dénombrant les lecteurs en train d'accéder aux
données. L'écriture ne sera acceptable que si le nombre de
lecteurs est nul.
# let
data
=
ref
0
;;
val data : int ref = {contents=0}
# let
n
=
ref
0
;;
val n : int ref = {contents=0}
# let
m
=
Mutex.create
()
;;
val m : Mutex.t = <abstr>
# let
c
=
Condition.create
()
;;
val c : Condition.t = <abstr>
La donnée est symbolisée par un entier valant 0 lorsqu'elle est
cohérente.
Les opérations sur le compteur sont protégées par son verrou.
# let
cpt_incr
()
=
Mutex.lock
m
;
incr
n
;
Mutex.unlock
m
;;
val cpt_incr : unit -> unit = <fun>
# let
cpt_decr
()
=
Mutex.lock
m
;
decr
n
;
Mutex.unlock
m
;;
val cpt_decr : unit -> unit = <fun>
# let
cpt_signal
()
=
Mutex.lock
m
;
if
!
n=
0
then
Condition.signal
c
;
Mutex.unlock
m
;;
val cpt_signal : unit -> unit = <fun>
Les lecteurs mettent à jour le compteur et n'émettent le signal
indiquant qu'ils ont libéré la donnée que si plus aucun lecteur
n'est présent.
# let
lire
i
=
cpt_incr
()
;
Printf.printf
"Le lecteur (%d) lit (donnée=%d)\n"
i
!
data
;
Thread.delay
(Random.float
1
.
5
)
;
Printf.printf
"Le lecteur (%d) a fini sa lecture\n"
i
;
cpt_decr
()
;
cpt_signal
()
;;
val lire : int -> unit = <fun>
# let
lecteur
i
=
while
true
do
lire
i
;
Thread.delay
(Random.float
1
.
5
)
done
;;
val lecteur : int -> unit = <fun>
L'écrivain tente de bloquer le compteur pour interdire aux lecteurs
d'accéder à la donnée partagée. Mais il ne peut le faire que
si le compteur est nul, sinon l'écrivain attend le signal lui
indiquant que c'est le cas.
# let
ecrire
()
=
Mutex.lock
m
;
while
!
n<>
0
do
Condition.wait
c
m
done
;
print_string
"L'écrivain écrit\n"
;
flush
stdout
;
data
:=
1
;
Thread.delay
(Random.float
1
.
)
;
data
:=
0
;
Mutex.unlock
mn
;;
Characters 206-208:
Unbound value mn
# let
ecrivain
()
=
while
true
do
ecrire
()
;
Thread.delay
(Random.float
1
.
5
)
done
;;
Characters 36-42:
Unbound value ecrire
On crée pour tester ces fonctions un écrivain et six lecteurs.
ignore
(Thread.create
ecrivain
());
for
i=
0
to
5
do
ignore(Thread.create
lecteur
i)
done;
while
true
do
Thread.delay
5
.
done
;;
Cette solution garantit que l'écrivain et les lecteurs n'ont pas accès
aux données en même temps. Mais rien ne garantit que l'écrivain
puisse un jour remplir son office, nous sommes là encore confronter
à un cas de famine.