Communication synchrone
Le module Event de la bibliothèque de threads implante
une communication de valeurs quelconques entre deux processus
via des canaux de communication particuliers. La communication effective
de la valeur est synchronisée au moyen d'événements d'émission ou de
réception.
Ce modèle de communications synchronisées sur des événements permet de
transférer sur des canaux typés des valeurs du langage, y compris
des fermetures, des objets ou des événements.
Il est décrit dans la thèse de J. Reppy ([Rep92]).
Synchronisation sur événements de communication
Les événements de communication engendrés correspondent
à l'envoi et la réception d'une valeur sur un canal :
-
send c v envoie une valeur v sur le canal c
- receive c réceptionne une valeur sur la canal c
Pour qu'ils réalisent l'action physique qui leur est associée, deux
événements doivent être synchronisés. Pour cela on introduit une
opération de synchronisation (sync) sur les événements.
L'émission et la réception d'une valeur ne sont effectives que lorsque
les deux processus communicants sont en phase de synchronisation. Si
un seul processus cherche à se synchroniser, l'opération est bloquante
et attend que le deuxième processus effectue sa synchronisation. Cela
implique qu'un émetteur cherchant à synchroniser l'envoi d'une valeur
(sync(send c v)) peut se retrouver bloquer en attente d'une
synchronisation du récepteur (sync(receive c)).
Valeurs transmises
Les canaux de communication par où transitent les valeurs échangées
sont typés : seules les valeurs du type paramètre du canal y sont
acceptées. Rien n'empêche de créer de nombreux canaux par type de
valeurs à communiquer. Comme cette communication s'effectue entre
processus légers Objective CAML, n'importe quelle valeur du langage peut
être envoyée sur un canal de son type. Cela est valable pour les
fermetures, les objets et aussi des événements pour une demande de
synchronisation déportée.
Module Event
Les valeurs encapsulées dans les événements de communication
transitent par des canaux de communication du type abstrait 'a
channel. La fonction de création d'un canal est :
# Event.new_channel
;;
- : unit -> 'a Event.channel = <fun>
Les événements d'émission et de réception sont créés par un appel aux fonctions :
# Event.send
;;
- : 'a Event.channel -> 'a -> unit Event.event = <fun>
# Event.receive
;;
- : 'a Event.channel -> 'a Event.event = <fun>
On peut considérer ces deux fonctions comme des constructeurs
du type abstrait 'a event. L'événement construit à partir de l'émission
ne conserve pas une information de type de la valeur à transmettre
(type unit Event.event). Par contre l'événement de réception
en tient compte pour récupérer la valeur
pendant une synchronisation. Elles ne sont pas bloquantes dans la mesure
où la transmission d'une valeur n'est réalisée qu'au moment de la synchronisation
de deux processus par la fonction :
# Event.sync
;;
- : 'a Event.event -> 'a = <fun>
Cette fonction peut être est bloquante pour l'émetteur et le récepteur.
Il en existe une
version non bloquante :
# Event.poll
;;
- : 'a Event.event -> 'a option = <fun>
Cette fonction vérifie qu'un autre processus est en attente
de synchronisation. Si c'est le cas elle réalise les transmissions,
et retourne la valeur Some v si v est la valeur associée à l'événement,
et None sinon.
Le message reçu, extrait par la fonction sync peut être
le résultat d'un processus plus ou moins compliqué mettant en
oeuvre d'autres échanges de messages.
Exemple de synchronisation
On définit trois processus légers, le premier t1 envoie une chaîne
de caractères sur le canal c (fonction g)
partagé par tous les processus. Les deux autres t2 et t3
attendent une valeur sur ce même canal.
Voici les fonctions exécutées par les différents processus :
# let
c
=
Event.new_channel
();;
val c : '_a Event.channel = <abstr>
# let
f
()
=
let
ids
=
string_of_int
(Thread.id
(Thread.self
()))
in
print_string
("-------- avant -------"
^
ids)
;
print_newline()
;
let
e
=
Event.receive
c
in
print_string
("-------- pendant -------"
^
ids)
;
print_newline()
;
let
v
=
Event.sync
e
in
print_string
(v
^
" "
^
ids
^
" "
)
;
print_string
("-------- après -------"
^
ids)
;
print_newline()
;;
val f : unit -> unit = <fun>
# let
g
()
=
let
ids
=
string_of_int
(Thread.id
(Thread.self
()))
in
print_string
("Début de "
^
ids
^
"\n"
);
let
e2
=
Event.send
c
"hello"
in
Event.sync
e2
;
print_string
("Fin de "
^
ids)
;
print_newline
()
;;
val g : unit -> unit = <fun>
Les trois processus sont alors créés et exécutés :
# let
t1,
t2,
t3
=
Thread.create
f
(),
Thread.create
f
(),
Thread.create
g
();;
Début de 5
-------- avant -------6
-------- pendant -------6
hello 6 -------- après -------6
-------- avant -------7
-------- pendant -------7
val t1 : Thread.t = <abstr>
val t2 : Thread.t = <abstr>
val t3 : Thread.t = <abstr>
# Thread.delay
1
.
0
;;
Fin de 5
- : unit = <unknown constructor>
On s'aperçoit que l'émission peut être bloquante. La trace de fin de t1
s'affiche après les traces de synchronisation de t2 et t3
Seul un des deux processus t1 et t2 est vraiment terminé,
comme le montre les appels suivants :
# Thread.kill
t1;;
- : unit = ()
# Thread.kill
t2;;
Uncaught exception: Failure("Thread.kill: killed thread")