Précédent Index Suivant

Synchronisation des processus

C'est dans le cadre de processus partageant une même zone mémoire que le mot << concurrence >> prend tout son sens : les divers processus impliqués sont en concurrence pour l'accès à cette unique ressource qu'est la mémoire2. Au problème du partage de ressource vient s'ajouter celui du manque de maîtrise de l'alternance et du temps d'exécution des processus concurrents.

Le système qui gère l'ensemble des processus, peut à tout moment interrompre un calcul en cours. Ainsi lorsque deux processus coopèrent, ils doivent pouvoir garantir l'intégrité des manipulations de certaines données partagées. Pour celà, un processus doit pouvoir rester maître de ces données tant qu'il n'a pas achevé un calcul ou toute autre opération (par exemple, une acquisition sur un périphérique). Pour garantir l'exclusivité d'accès aux données à un seul processus, on met en oeuvre un mécanisme dit d'exclusion mutuelle.

Section critique et exclusion mutuelle

Les mécanismes d'exclusion mutuelle sont implantés à l'aide de données particulières appelées verrous. Les opérations sur les verrous sont limitées à leur création, leur pose et leur libération. Un verrou est la plus petite donnée partagée par un ensemble de processus concurrents. Sa manipulation est toujours exclusive. À la notion d'exclusivité de manipulation d'un verrou s'ajoute celle d'exclusivité de détention : seul le processus qui a pris un verrou peut le libérer ; si d'autres processus veulent à leur tour disposer de ce verrou, ils doivent en attendre la libération par le processus détenteur.

Le module Mutex est utiliser pour créer des verrous entre processus en relation d'exclusion mutuelle sur une zone mémoire.

Module Mutex

Les fonctions du module Mutex implantent la gestion des verrous. Nous allons illustrer leur utilisation par deux petits exemples classiques de concurrence.

Une fois un verrou créé, un thread peut le prendre et le relâcher.

# Mutex.create ;;
- : unit -> Mutex.t = <fun>
# Mutex.lock ;;
- : Mutex.t -> unit = <fun>
# Mutex.unlock ;;
- : Mutex.t -> unit = <fun>
La fonction lock tente de prendre un verrou et y parvient si il est libre. Dans le cas contraire, le processus est suspendu tant que ce verrou n'a pas été relâché. Un seul processus ne peut tenir un verrou dans le même temps.

Il existe une variante à la prise de verrou qui n'est pas bloquante.

# Mutex.try_lock;;
- : Mutex.t -> bool = <fun>
Si le verrou est déjà pris, la fonction retourne false. Dans l'autre cas, la fonction prend le verrou et retourne true.

Les philosophes orientaux

Cette petite histoire, due à Dijkstra, illustre un pur problème de partage de ressources.

Cinq philosophes orientaux partagent leur temps entre l'étude et la venue au réfectoire pour manger un bol de riz. La salle affectée à la sustentation des philosophes ne comprend qu'une seule table ronde sur laquelle sont disposés un grand plat de riz (toujours plein) cinq bols et cinq baguettes.



Figure 4.1 : La table des philosophes orientaux


<<<<<<< PC.tex Comme on le voit sur cette figure, un philosophe qui prend les deux baguettes autour de son bol empêche ses voisins d'en faire autant. Dès qu'il dépose l'une de ses baguettes, son voisin, affamé, peut s'en emparer. Le cas échéant, ce dernier devra attendre que l'autre baguette soit disponible.

Pour simplifier les choses, on supposera que chaque philosophe a des habitudes et vient toujours à la même place. On modélise les cinq baguettes par autant de verrous stockés dans un vecteur b. Ainsi, un seul philosophe à la fois peut détenir une baguette. Manger et méditer sont simulés par une suspension des processus. On introduit un petit temps de réflexion entre la prise et le dépôt de chacune des deux baguettes.

# let b =
let b0 = Array.create 5 (Mutex.create()) in
for i=1 to 4 do b0.(i) <- Mutex.create() done;
b0 ;;
val b : Mutex.t array = [|<abstr>; <abstr>; <abstr>; <abstr>; <abstr>|]
# =======
Comme on le voit sur la figure \ref{fig-philosophes}, un philosophe
qui prend les deux baguettes autour de son bol empêche ses voisins
imm\'ediats d'en faire autant. Dès qu'il dépose l'une de ses
baguettes, son voisin, affamé, peut s'en emparer. Le cas échéant, ce
dernier devra attendre que l'autre baguette soit disponible.

%Pour simplifier les choses, on supposera que chaque philosophe a des habitudes
%et vient toujours à la même place.
On modélise les philosophes par cinq processus et les cinq baguettes
par autant de verrous. Un philosophe qui s'empare d'une baguette est
un processus qui prend un verrou. Ainsi, un seul philosophe à la fois
peut détenir une baguette donner. La méditation et la mastication sont
simulées par une suspension d'un processus. On introduit un petit
delai entre la prise et le dép\^ot d'une baguette.
\begin{caml}
let b = let b0 = Array.create 5 (Mutex.create())
in Array.map (fun _ -> Mutex.create ()) b0 ; b0 ;;
Characters 0-7:
Syntax error
# >>>>>>> 1.23
let mediter t = Thread.delay (Random.float t)
let manger = mediter ;;
Characters 0-7:
Syntax error

# let philosophe i =
let ii = (i+1) mod 5
in while true do
mediter 3. ;
Mutex.lock b.(i);
Printf.printf "Le philosophe (%d) prend sa baguette gauche" i ;
Printf.printf " et médite encore un peu\n";
mediter 0.2;
Mutex.lock b.(ii);
Printf.printf "Le philosophe (%d) prend sa baguette droite\n" i;
manger 0.5;
Mutex.unlock b.(i);
Printf.printf "Le philosophe (%d) repose sa baguette gauche" i;
Printf.printf " et commence déjà à méditer\n";
mediter 0.15;
Mutex.unlock b.(ii);
Printf.printf "Le philosophe (%d) repose sa baguette droite\n" i
done ;;
Characters 69-76:
Unbound value mediter
On pourra tester ce petit programme en exécutant :

for i=0 to 4 do ignore (Thread.create philosophe i) done ;
while true do Thread.delay 5. done ;;
On suspend, dans la boucle infinie while, le processus principal de façon à augmenter les chances des processus philosophes d'entrer en action.

On utilise dans la boucle d'activité des philosophes des délais d'attente entre certaines opérations. Ces délais, dont la durée est choisie aléatoirement, dans une certaine limite, on pour but de créer un peu de disparité dans l'exécution parallèle des processus.

Problèmes de la solution naïve
Il peut arriver une chose terrible à nos philosophes : ils arrivent tous en même temps et se saisissent de leur baguette gauche. Dans ce cas nous sommes dans une situation dite d'inter-blocage. Plus aucun philosophe ne pourra manger ! Nous sommes dans un cas de famine.

Pour éviter cela, les philosophes peuvent reposer une baguette s'il n'arrivent pas à prendre la deuxième. Cela est fort courtois, mais permet à deux philosophes de se liguer contre un troisième pour l'empêcher de se sustenter en ne relâchant les baguettes que si l'autre voisin les a prises. Il existe diverses solutions à ce problème. L'une d'elle fait l'objet de l'exercice page X.

Producteurs et consommateurs I

Le couple producteurs-consommateurs est un exemple classique de la programmation concurrente. Un groupe de processus, les producteurs, est chargé d'emmagasiner des données dans une file d'attente ; un second groupe, les consommateurs, est chargé de les déstocker. Chaque intervenant exclut les autres.

Nous implantons ce schéma en utilisant une file d'attente partagée entre les producteurs et les consommateurs. Pour garantir le bon fonctionnement de l'ensemble, la file d'attente est manipulée en exclusion mutuelle afin de garantir l'intégrité des opérations d'ajout et de retrait.

f est la file d'attente partagée, et m  le verrou d'exclusion mutuelle.

# let f = Queue.create () and m = Mutex.create () ;;
val f : '_a Queue.t = <abstr>
val m : Mutex.t = <abstr>
Nous divisons l'activité d'un producteur en deux parties : créer un produit (fonction produire) et stocker un produit (fonction stocker). Seule l'opération de stockage a besoin du verrou.

# let produire i p d =
incr p ;
Thread.delay d ;
Printf.printf "Le producteur (%d) a produit %d\n" i !p ;
flush stdout ;;
val produire : int -> int ref -> float -> unit = <fun>

# let stocker i p =
Mutex.lock m ;
Queue.add (i,!p) f ;
Printf.printf "Le producteur (%d) a ajouté son %d-ieme produit\n" i !p ;
flush stdout ;
Mutex.unlock m ;;
val stocker : int -> int ref -> unit = <fun>
Le code du producteur est une boucle sans fin de création et de stockage. Nous introduisons un délai aléatoire d'attente à la fin de chaque itération afin d'obtenir un effet de désynchronisation dans l'exécution.

# let producteur i =
let p = ref 0 and d = Random.float 2.
in while true do
produire i p d ;
stocker i p ;
Thread.delay (Random.float 2.5)
done ;;
val producteur : int -> unit = <fun>
La seule opération du consommateur est le retrait d'un élément de la file prenant garde à l'existence effective d'un produit.

# let consommateur i =
while true do
Mutex.lock m ;
( try
let ip, p = Queue.take f
in Printf.printf "Le consommateur(%d) " i ;
Printf.printf "a retiré le produit (%d,%d)\n" ip p ;
flush stdout ;
with
Queue.Empty ->
Printf.printf "Le consommateur(%d) " i ;
print_string "est reparti les mains vides\n" ) ;
Mutex.unlock m ;
Thread.delay (Random.float 2.5)
done ;;
val consommateur : int -> unit = <fun>


Le programme de test suivant crée quatre producteurs et quatre consommateurs.

for i = 0 to 3 do
ignore (Thread.create producteur i);
ignore (Thread.create consommateur i)
done ;
while true do Thread.delay 5. done ;;


Attente et synchronisation

La relation d'exclusion mutuelle n'est pas assez fine pour décrire la synchronisation entre threads. Il n'est pas rare que le travail d'un processus dépende de la réalisation d'une action par un autre processus, modifiant par là une certaine condition. Il est alors souhaitable que les processus puissent communiquer le fait que cette condition ait pu changer, indiquant aux processus en attente de la tester de nouveau. Les différents prrcessus sont alors en relation d'exclusion mutuelle avec communication.

Dans l'exemple précédent, un consommateur, plutôt que repartir les mains vides pourrait attendre qu'un producteur vienne réapprovisionner le stock. Ce dernier pourra signaler au consommateur en attente qu'il y a quelque chose à emporter.

Le modèle de l'attente sur condition d'une prise de verrou est connu sous le nom de sémaphore.

Sémaphores
Un sémaphore est une variable entière s ne pouvant prendre que des valeurs positives (ou nulles). Une fois s initialisé, les seules opérations admises sont : wait(s) et signal(s), notées respectivement P(s) et V(s). Elles sont définies ainsi, s correspond au nombre de ressources d'un type donné. Un sémaphore ne prenant que les valeurs 0 ou 1 est appelé sémaphore binaire.

Module Condition

Les fonctions du module Condition implantent des primitives de mise en sommeil et de réveil de processus sur un signal. Un signal, dans ce cadre, est une variable partagée par l'ensemble des processus. Son type est abstrait et les fonctions de manipulation sont :
create
: unit -> Condition.t qui crée un nouveau signal.
signal
: Condition.t -> unit qui réveille l'un des processus en attente du signal.
broadcast
: Condition.t -> unit qui réveille l'ensemble des processus en attente du signal.
wait
: Condition.t -> Mutex.t -> unit qui suspend le processus appelant sur le signal passé en premier argument. Le second argument est un verrou utilisé pour protéger la manipulation du signal. Il est libéré, puis repositionné à chaque exécution de la fonction.

Producteurs et consommateurs (2)

Nous reprenons l'exemple des producteurs et consommateurs en utilisant le mécanisme des variable de condition pour mettre en attente un consommateur arrivant alors que le magasin est vide.

# let c = Condition.create () ;;
val c : Condition.t = <abstr>
# let stocker2 i p =
Mutex.lock m ;
Queue.add (i,!p) f ;
Printf.printf "Le producteur (%d) a ajoute son %d-ème produit\n" i !p ;
flush stdout ;
Condition.signal c ;
Mutex.unlock m ;;
val stocker2 : int -> int ref -> unit = <fun>
# let producteur2 i =
let p = ref 0 in
let d = Random.float 2.
in while true do
produire i p d;
stocker2 i p;
Thread.delay (Random.float 2.5)
done ;;
val producteur2 : int -> unit = <fun>
L'activité du consommateur se fait alors en deux temps : attendre qu'un produit soit disponible puis emporter le produit. Le verrou est pris quand l'attente prend fin et il est rendu dès que le consommateur a emporté son produit. L'attente se fait sur la variable c.

# let attendre2 i =
Mutex.lock m ;
while Queue.length f = 0 do
Printf.printf "Le consommateur (%d) attend\n" i ;
Condition.wait c m
done ;;
val attendre2 : int -> unit = <fun>
# let emporter2 i =
let ip, p = Queue.take f in
Printf.printf "Le consommateur (%d) " i ;
Printf.printf "emporte le produit (%d, %d)\n" ip p ;
flush stdout ;
Mutex.unlock m ;;
val emporter2 : int -> unit = <fun>
# let consommateur2 i =
while true do
attendre2 i;
emporter2 i;
Thread.delay (Random.float 2.5)
done ;;
val consommateur2 : int -> unit = <fun>
Notons qu'il n'est plus besoin de vérifier l'existence effective d'un produit puisqu'un consommateur doit attendre l'existence d'un produit dans la file d'attente pour que sa suspension prenne fin. Vu que la fin de son attente correspond à la prise du verrou, il ne court pas le risque de se faire dérober le nouveau produit avant de l'avoir emporté.

Lecteurs et écrivain

Voici un autre exemple classique de processus concurrents dans lequel les agents n'ont pas le même comportement vis-à-vis de la donnée partagée.

Un écrivain et des lecteurs opèrent sur une donnée partagée. L'action du premier est susceptible de rendre momentanément les données incohérentes alors que les seconds n'ont qu'une action passive. La difficulté vient du fait que nous ne souhaitons pas interdire à plusieurs lecteurs de consulter la donnée simultanément.

Une solution à ce problème est de gérer un compteur n dénombrant les lecteurs en train d'accéder aux données. L'écriture ne sera acceptable que si le nombre de lecteurs est nul.

# let data = ref 0 ;;
val data : int ref = {contents=0}
# let n = ref 0 ;;
val n : int ref = {contents=0}
# let m = Mutex.create () ;;
val m : Mutex.t = <abstr>
# let c = Condition.create () ;;
val c : Condition.t = <abstr>
La donnée est symbolisée par un entier valant 0 lorsqu'elle est cohérente.

Les opérations sur le compteur sont protégées par son verrou.

# let cpt_incr () = Mutex.lock m ; incr n ; Mutex.unlock m ;;
val cpt_incr : unit -> unit = <fun>
# let cpt_decr () = Mutex.lock m ; decr n ; Mutex.unlock m ;;
val cpt_decr : unit -> unit = <fun>
# let cpt_signal () = Mutex.lock m ;
if !n=0 then Condition.signal c ;
Mutex.unlock m ;;
val cpt_signal : unit -> unit = <fun>


Les lecteurs mettent à jour le compteur et n'émettent le signal indiquant qu'ils ont libéré la donnée que si plus aucun lecteur n'est présent.

# let lire i =
cpt_incr () ;
Printf.printf "Le lecteur (%d) lit (donnée=%d)\n" i !data ;
Thread.delay (Random.float 1.5) ;
Printf.printf "Le lecteur (%d) a fini sa lecture\n" i ;
cpt_decr () ;
cpt_signal () ;;
val lire : int -> unit = <fun>

# let lecteur i = while true do lire i ; Thread.delay (Random.float 1.5) done ;;
val lecteur : int -> unit = <fun>


L'écrivain tente de bloquer le compteur pour interdire aux lecteurs d'accéder à la donnée partagée. Mais il ne peut le faire que si le compteur est nul, sinon l'écrivain attend le signal lui indiquant que c'est le cas.

# let ecrire () =
Mutex.lock m ;
while !n<>0 do Condition.wait c m done ;
print_string "L'écrivain écrit\n" ; flush stdout ;
data := 1 ; Thread.delay (Random.float 1.) ; data := 0 ;
Mutex.unlock mn ;;
Characters 206-208:
Unbound value mn

# let ecrivain () =
while true do ecrire () ; Thread.delay (Random.float 1.5) done ;;
Characters 36-42:
Unbound value ecrire


On crée pour tester ces fonctions un écrivain et six lecteurs.

ignore (Thread.create ecrivain ());
for i=0 to 5 do ignore(Thread.create lecteur i) done;
while true do Thread.delay 5. done ;;


Cette solution garantit que l'écrivain et les lecteurs n'ont pas accès aux données en même temps. Mais rien ne garantit que l'écrivain puisse un jour remplir son office, nous sommes là encore confronter à un cas de famine.


Précédent Index Suivant