Recitation 16
Concurrency: Condition Variables and Message Passing

Condition Module

For this module we refer to OCaml's condition module documentation.

The reader writer concurrency pattern

(taken from lec16.ml)
(* Reader/writer; a classic concurrency pattern (concurrent readers and
 * one exclusive writer, CRXW).  There is mutual exclusion between a
 * single writer and any of many readers, but readers can operate at
 * the same time (because they do not change any shared state).
 *
 * This is accomplished with a shared variable n which counts the
 * number of readers currently active. Each reader momentarily acquires
 * the mutex to increment the count, then does their work, then
 * momentarily acquires the mutex to decrement the count.
 *
 * The writer needs to wait until there are no readers.  This is
 * achieved using a condition variable to signal when no readers are
 * active.  The writer waits for the condition to be true.  The readers
 * signal the condition if when they finish there are no readers
 * active.
 *
 * Such waiting on a condition before taking a mutex is known as a
 * semaphore.
 *
 * Condition.wait operates by simultaneously unlocking the specified
 * mutex and waiting (sleeping) for the specified condition to be
 * signalled by some other thread.  Since the mutex is unlocked other
 * threads can do work, including using Condition.signal to signal the
 * condition being waited for.  The mutex is reacquired before
 * Condition.wait returns, so must be explicitly unlocked afterwards.
 *
 *)

(* Shared value that the readers print and the writer updates.  Starts
 * at 0. *)
let data : int ref = ref 0

(* n: how many readers are active right now.  Starts at 0. *)

let n : int ref = ref 0

(* m: the mutex guarding the reader counter n.  Data may only be
 * modified while m is locked and n=0. *)

let m : Mutex.t = Mutex.create ()

(* c: the condition variable that readers signal once no readers are
 * active. *)

let c : Condition.t = Condition.create ()

(* Registers one more active reader: adds 1 to the counter n while
 * holding mutex m. *)
let reader_incr () =
  Mutex.lock m;
  n := !n + 1;
  Mutex.unlock m

(* Deregisters one active reader: subtracts 1 from the counter n while
 * holding mutex m. *)
let reader_decr () =
  Mutex.lock m;
  n := !n - 1;
  Mutex.unlock m

(* With mutex m held, signals condition c if the reader counter n is
 * zero; otherwise does nothing.  The mutex is released in both
 * cases. *)
let reader_signal_when_empty () =
  Mutex.lock m;
  begin match !n with
  | 0 -> Condition.signal c
  | _ -> ()
  end;
  Mutex.unlock m

(* One simulated read by reader number i.
 *
 * The reader registers itself in the counter n, prints the value of
 * data it sees, sleeps for a random time of up to 1.5 seconds to stand
 * in for read work, prints that it has finished, deregisters, and
 * finally signals the condition if no readers remain.
 *
 * data itself is read without holding the mutex; the writer is kept
 * out because it waits for n=0 before writing. *)
let read i =
  let who = "Reader " ^ string_of_int i in
  reader_incr ();
  (* Build the whole line first so it goes out in one print_string. *)
  print_string (Printf.sprintf "%s read data %d\n" who !data);
  Thread.delay (Random.float 1.5);
  print_string (who ^ " has finished reading\n");
  reader_decr ();
  reader_signal_when_empty ()

(* Body of reader thread number i: alternates forever between one read
 * and a random pause of up to 1.5 seconds that stands in for other
 * work.  Never returns. *)
let reader i =
  (* Tail call, so the stack does not grow; the unit annotation keeps
   * the function's type at int -> unit. *)
  let rec loop () : unit =
    read i;
    Thread.delay (Random.float 1.5);
    loop ()
  in
  loop ()

(* In order to write, the writer waits for the condition of no
 * readers currently active (n=0).  Note that readers can easily starve out
 * the writer; this is a drawback of reader/writer locks.
 *
 * To simulate the writer having a time when the data is in an
 * inconsistent state, the data value is set to 1 while the writer has
 * the lock and is reset to 0 when the writer is done "working".  Readers
 * should thus only see value 0 and never 1.
 *)

(* One simulated write.
 *
 * Locks mutex m and waits on condition c until the reader counter n is
 * zero.  Condition.wait releases m while sleeping and holds it again
 * when it returns, so the check of n is always made with m locked.
 * The mutex then stays locked for the whole write, which keeps new
 * readers blocked in reader_incr until the write is over. *)
let write () =
  let rec await_no_readers () =
    if !n <> 0 then begin
      Condition.wait c m;
      (* Re-check after every wake-up: n may be non-zero again. *)
      await_no_readers ()
    end
  in
  Mutex.lock m;
  await_no_readers ();
  (* Critical section: m is locked and n=0. *)
  print_string "The writer is writing\n";
  flush stdout;
  data := 1;
  Thread.delay (Random.float 1.);
  data := 0;
  Mutex.unlock m

(* Body of the writer thread: alternates forever between one write and
 * a random pause of up to 1.5 seconds.  Never returns. *)
let writer () =
  let rec loop () : unit =
    write ();
    Thread.delay (Random.float 1.5);
    loop ()
  in
  loop ()

(* Test with one writer and 3 readers  *)

(* Starts the writer thread and then the reader threads, and returns
 * without waiting for them; the started threads loop forever.
 * Readers are numbered 0, 1 and 2, giving the three readers this test
 * is meant to run. *)
let test_reader_writer () =
  ignore (Thread.create writer ());
  for i = 0 to 2 do
    ignore (Thread.create reader i)
  done

Event Module

For this module we refer to OCaml's event module documentation.

An example using events

(taken from lec16.ml)
(* SYNCHRONOUS MESSAGES - OCaml also has a synchronous communication
 * mechanism provided by the Event module.  These implement the
 * channels defined by Reppy, 1999.  *)

(* Two channels shared by the tests below.  Both carry strings. *)
let c1 : string Event.channel = Event.new_channel ()
let c2 : string Event.channel = Event.new_channel ()

(* Receives one string from channel c and prints each stage, tagged
 * with the id of the calling thread: before the receive event is
 * built, after it is built but before synchronizing on it, and once
 * the message has arrived.  Blocks in Event.sync until a sender
 * synchronizes on the same channel. *)

let r (c) =
  let tid = string_of_int (Thread.id (Thread.self ())) in
  let say stage = print_string (stage ^ " in " ^ tid ^ "\n") in
  say "Before rec";
  (* Building the event does not communicate yet; Event.sync does. *)
  let pending = Event.receive c in
  say "During rec";
  let msg = Event.sync pending in
  say ("Recd " ^ msg)

(* Receives one string from either of two channels and prints each
 * stage, tagged with the id of the calling thread.  Blocks until a
 * sender synchronizes on one of the two channels.  The parameters c1
 * and c2 shadow the top-level channels of the same names inside this
 * function. *)

let r2 (c1,c2) =
  let tid = string_of_int (Thread.id (Thread.self ())) in
  print_string ("Before rec in " ^ tid ^ "\n");
  let alternatives = List.map Event.receive [c1; c2] in
  print_string ("During rec in " ^ tid ^ "\n ");
  (* Synchronizing on the choice is what Event.select does. *)
  let msg = Event.sync (Event.choose alternatives) in
  print_string ("Recd " ^ msg ^ " in " ^ tid ^ "\n")

(* Sends message m on channel c and prints a line, tagged with the id
 * of the calling thread, before starting and after the send has
 * completed.  Blocks in Event.sync until a receiver synchronizes on
 * the same channel.  The parameter m is the message here, not the
 * top-level mutex of the same name. *)

let s (c,m) =
  let tid = string_of_int (Thread.id (Thread.self ())) in
  let announce phase = print_string (phase ^ " of sender " ^ tid ^ "\n") in
  announce "Start";
  Event.sync (Event.send c m);
  announce "End"

(* Only one of the receiver threads gets the data and finishes, the
 * other is left waiting to synchronize until the second send after the
 * sleep. *)

(* Starts two receiver threads and one sender thread on c1, sleeps for
 * two seconds, then starts a second sender on c1.  Returns without
 * waiting for any of the threads. *)
let test_channels () =
  let start_receiver () = ignore (Thread.create r c1) in
  let start_sender text = ignore (Thread.create s (c1, text)) in
  start_receiver ();
  start_receiver ();
  start_sender "hello1";
  Thread.delay 2.0;
  start_sender "after delay"

(* Only one of the two sender threads has its message read, the other
 * is left waiting to synchronize until the second read after the
 * sleep. *)

(* Starts one receiver thread listening on both c1 and c2, then one
 * sender thread on c2 and one on c1, sleeps for two seconds, and then
 * starts a second receiver on both channels.  Returns without waiting
 * for any of the threads. *)
let test_channels2 () =
  let both = (c1, c2) in
  let _first_receiver : Thread.t = Thread.create r2 both in
  let _sender_on_c2 : Thread.t = Thread.create s (c2, "hello2") in
  let _sender_on_c1 : Thread.t = Thread.create s (c1, "hello1") in
  Thread.delay 2.0;
  let _second_receiver : Thread.t = Thread.create r2 both in
  ()