Recitation 16
Concurrency: Condition Variables and Message Passing
Condition Module
For this module we refer to OCaml's condition module documentation.
The reader writer concurrency pattern
(taken from lec16.ml)
(* Reader/writer; a classic concurrency pattern (concurrent readers and
* one exclusive writer, CRXW). There is mutual exclusion between a
* single writer and any of many readers, but readers can operate at
* the same time (because they do not change any shared state).
*
* This is accomplished with a shared variable n which counts the
* number of readers currently active. Each reader momentarily acquires
* the mutex to increment the count, then does their work, then
* momentarily acquires the mutex to decrement the count.
*
* The writer needs to wait until there are no readers. This is
* achieved using a condition variable to signal when no readers are
* active. The writer waits for the condition to be true. Each reader
* signals the condition if, when it finishes, there are no readers
* active.
*
* Such waiting on a condition before taking a mutex is known as a
* semaphore.
*
* Condition.wait operates by simultaneously unlocking the specified
* mutex and waiting (sleeping) for the specified condition to be
* signalled by some other thread. Since the mutex is unlocked other
* threads can do work, including using Condition.signal to signal the
* condition being waited for. The mutex is reacquired before
* Condition.wait returns, so must be explicitly unlocked afterwards.
*
*)
(* Shared value: printed by readers, temporarily changed by the writer. *)
let data : int ref = ref 0

(* n: how many readers are active right now. *)
let n : int ref = ref 0

(* m guards the counter n. Modifying data requires holding m while
 * n = 0. *)
let m : Mutex.t = Mutex.create ()

(* c is the condition variable signalled when no readers are active. *)
let c : Condition.t = Condition.create ()
(* Record one more active reader: bump the counter n while holding
 * mutex m. *)
let reader_incr () =
  Mutex.lock m;
  n := !n + 1;
  Mutex.unlock m
(* Record one fewer active reader: lower the counter n while holding
 * mutex m. *)
let reader_decr () =
  Mutex.lock m;
  n := !n - 1;
  Mutex.unlock m
(* While holding mutex m, signal condition c if the reader count n is
 * zero; otherwise do nothing. *)
let reader_signal_when_empty () =
  Mutex.lock m;
  let no_readers = (!n = 0) in
  if no_readers then Condition.signal c;
  Mutex.unlock m
(* One read by reader number i: register as active, print the current
 * value of data, sleep for a random time to simulate read work, print
 * a completion line, unregister, and finally signal the condition if
 * no readers remain. *)
let read i =
  let who = "Reader " ^ string_of_int i in
  reader_incr ();
  print_string (who ^ " read data " ^ string_of_int !data ^ "\n");
  (* Simulates reader doing read work *)
  Thread.delay (Random.float 1.5);
  print_string (who ^ " has finished reading\n");
  reader_decr ();
  reader_signal_when_empty ()
(* Body of reader thread i: alternate forever between one read and a
 * random pause. Never returns. *)
let rec reader i : unit =
  read i;
  (* Simulates reader doing other work *)
  Thread.delay (Random.float 1.5);
  reader i
(* Perform one write. The writer locks m and then waits on condition c
 * until no readers are active (n=0). Readers can easily starve out
 * the writer; this is a drawback of reader/writer locks.
 *
 * To simulate a period during which the data is in an inconsistent
 * state, data is set to 1 while the writer holds the lock and reset
 * to 0 once the writer is done "working". Readers should therefore
 * only ever see the value 0 and never 1.
 *)
let write () =
  Mutex.lock m;
  (* Re-check the reader count after every wake-up. *)
  let rec await_no_readers () =
    if !n <> 0 then begin
      Condition.wait c m;
      await_no_readers ()
    end
  in
  await_no_readers ();
  (* Critical section: mutex m is held here, either from the initial
   * lock or reacquired by Condition.wait. *)
  print_string "The writer is writing\n";
  flush stdout;
  data := 1;
  Thread.delay (Random.float 1.);
  data := 0;
  Mutex.unlock m
(* Body of the writer thread: alternate forever between one write and
 * a random pause. Never returns. *)
let rec writer () : unit =
  write ();
  Thread.delay (Random.float 1.5);
  writer ()
(* Test with one writer and 3 readers (reader ids 0, 1 and 2).
 *
 * OCaml for-loop bounds are inclusive at both ends, so the previous
 * "0 to 3" started four reader threads rather than the documented
 * three. The spawned threads loop forever; this function returns as
 * soon as they have been created. *)
let test_reader_writer () =
  ignore (Thread.create writer ());
  for i = 0 to 2 do
    ignore (Thread.create reader i)
  done
Event Module
For this module we refer to OCaml's event module documentation.
An example using events
(taken from lec16.ml)
(* SYNCHRONOUS MESSAGES - OCaml also has a synchronous communication
* mechanism provided by the Event module. These implement the
* channels defined by Reppy, 1999. *)
(* Two synchronous channels shared by the examples below. The element
 * type is written out as string (the receivers concatenate received
 * values into their output), so these top-level bindings no longer
 * have a weakly polymorphic type that is only fixed by later uses. *)
let c1 : string Event.channel = Event.new_channel ()
let c2 : string Event.channel = Event.new_channel ()
(* Receive one string from channel c. Prints a line tagged with the
 * current thread's id before the receive event is built, after it is
 * built, and once the value has actually arrived. *)
let r (c) =
  let tid = Thread.self () |> Thread.id |> string_of_int in
  let say prefix = print_string (prefix ^ tid ^ "\n") in
  say "Before rec in ";
  let pending = Event.receive c in
  say "During rec in ";
  (* Blocks until a sender synchronizes on the same channel. *)
  let msg = Event.sync pending in
  say ("Recd " ^ msg ^ " in ")
(* Receive one string from whichever of the two channels c1, c2 can
 * deliver. Prints a line tagged with the current thread's id at each
 * stage. *)
let r2 (c1,c2) =
  let tid = Thread.self () |> Thread.id |> string_of_int in
  print_string ("Before rec in " ^ tid ^ "\n");
  let from_first = Event.receive c1 in
  let from_second = Event.receive c2 in
  print_string ("During rec in " ^ tid ^ "\n ");
  (* Synchronize on the choice of both receives; exactly one fires. *)
  let msg = Event.sync (Event.choose [from_first; from_second]) in
  print_string ("Recd " ^ msg ^ " in " ^ tid ^ "\n")
(* Send message m on channel c. Prints a line tagged with the current
 * thread's id when starting and again after the send has synchronized
 * with a receiver. *)
let s (c,m) =
  let tid = Thread.self () |> Thread.id |> string_of_int in
  print_string ("Start of sender " ^ tid ^ "\n");
  (* Blocks until some receiver synchronizes on c. *)
  Event.sync (Event.send c m);
  print_string ("End of sender " ^ tid ^ "\n")
(* Two receivers and one sender on c1, then a second sender two
 * seconds later. Only one of the receiver threads gets the first
 * message and finishes; the other is left waiting to synchronize
 * until the second send after the sleep. *)
let test_channels() =
  let spawn f arg = ignore (Thread.create f arg) in
  spawn r c1;
  spawn r c1;
  spawn s (c1, "hello1");
  Thread.delay 2.0;
  spawn s (c1, "after delay")
(* One two-channel receiver and two senders (one per channel), then a
 * second receiver two seconds later. Only one of the two sender
 * threads has its message read at first; the other is left waiting to
 * synchronize until the second read after the sleep. *)
let test_channels2() =
  let spawn f arg = ignore (Thread.create f arg) in
  spawn r2 (c1, c2);
  spawn s (c2, "hello2");
  spawn s (c1, "hello1");
  Thread.delay 2.0;
  spawn r2 (c1, c2)