(* Reader/writer; a classic concurrency pattern (concurrent readers and
 * one exclusive writer, CRXW). There is mutual exclusion between a
 * single writer and any of many readers, but readers can operate at
 * the same time (because they do not change any shared state).
 *
 * This is accomplished with a shared variable n which counts the
 * number of readers currently active. Each reader momentarily acquires
 * the mutex to increment the count, then does their work, then
 * momentarily acquires the mutex to decrement the count.
 *
 * The writer needs to wait until there are no readers. This is
 * achieved using a condition variable to signal when no readers are
 * active. The writer waits for the condition to be true. The readers
 * signal the condition if, when they finish, there are no readers
 * active.
 *
 * Guarding shared state with a mutex and waiting on a condition
 * variable until a predicate holds is the monitor-style idiom; a
 * counting semaphore can be built in the same way.
 *
 * Condition.wait operates by simultaneously unlocking the specified
 * mutex and waiting (sleeping) for the specified condition to be
 * signalled by some other thread. Since the mutex is unlocked other
 * threads can do work, including using Condition.signal to signal the
 * condition being waited for. The mutex is reacquired before
 * Condition.wait returns, so must be explicitly unlocked afterwards.
 *)

(* The shared datum. It is 0 whenever it is consistent; the writer
 * sets it to 1 while it is in the middle of an update. *)
let data = ref 0

(* Number of currently active readers, n. *)
let n = ref 0

(* Mutex for protecting counter, n. To modify data need to lock the
 * mutex and have n=0. *)
let m = Mutex.create ()

(* Condition variable for signaling that no readers are active. *)
let c = Condition.create ()

(* Register one more active reader. Blocks while the writer holds m. *)
let reader_incr () =
  Mutex.lock m;
  incr n;
  Mutex.unlock m

(* Unregister one active reader. *)
let reader_decr () =
  Mutex.lock m;
  decr n;
  Mutex.unlock m

(* Signal the condition c if no reader is active at this moment. *)
let reader_signal_when_empty () =
  Mutex.lock m;
  if !n = 0 then Condition.signal c;
  Mutex.unlock m

(* One read performed by reader number i: announce the value seen,
 * pretend to work for a while, then announce completion.
 *
 * The reader is counted in n for the whole duration. The count is
 * given back (and the condition signalled if appropriate) even when
 * the body raises, since otherwise n would stay positive forever and
 * the writer could never proceed. *)
let read i =
  (* Flush after every message so that output from this thread is
   * visible immediately, as it is for the writer. *)
  let say msg = print_string msg; flush stdout in
  let leave () = reader_decr (); reader_signal_when_empty () in
  reader_incr ();
  (try
     say ("Reader " ^ string_of_int i ^ " read data "
          ^ string_of_int !data ^ "\n");
     Thread.delay (Random.float 1.5); (* Simulates reader doing read work *)
     say ("Reader " ^ string_of_int i ^ " has finished reading\n")
   with e -> leave (); raise e);
  leave ()

(* Body of a reader thread: read forever, pausing between reads. *)
let reader i =
  while true do
    read i;
    Thread.delay (Random.float 1.5) (* Simulates reader doing other work *)
  done

(* In order to write, the writer waits for the condition of no
 * readers currently active (n=0). Note that readers can easily starve out
 * the writer, this is a drawback of reader/writer locks.
 *
 * To simulate the writer having a time when the data is in an
 * inconsistent state, the data value is set to 1 while the writer has
 * the lock and is reset to 0 when the writer is done "working". Readers
 * should thus only see value 0 and never 1.
 *
 * The mutex m is held for the whole write, so a reader that arrives
 * meanwhile blocks inside reader_incr. If the body raises, data is
 * still reset to 0 and m is still unlocked, so that the readers are
 * not left blocked on a mutex that will never be released. *)
let write () =
  Mutex.lock m;
  while !n <> 0 do
    Condition.wait c m
  done;
  (* This is a critical section: mutex m is locked here, either from
   * Mutex.lock above or reacquired by Condition.wait. *)
  let finish () = data := 0; Mutex.unlock m in
  (try
     print_string "The writer is writing\n";
     flush stdout;
     data := 1;
     Thread.delay (Random.float 1.)
   with e -> finish (); raise e);
  finish ()

(* Body of the writer thread: write forever, pausing between writes. *)
let writer () =
  while true do
    write ();
    Thread.delay (Random.float 1.5)
  done

(* Test with one writer and 3 readers (numbered 0, 1 and 2). The for
 * loop bounds are inclusive. The threads run forever; this function
 * returns as soon as they have been started. *)
let test_reader_writer () =
  ignore (Thread.create writer ());
  for i = 0 to 2 do
    ignore (Thread.create reader i)
  done
(* SYNCHRONOUS MESSAGES - OCaml also has a synchronous communication
 * mechanism provided by the Event module. These implement the
 * channels defined by Reppy, 1999. *)
let c1 = Event.new_channel ()
let c2 = Event.new_channel ()

(* Print msg on stdout and flush straight away. The messages below are
 * printed from background threads; without the flush they can stay in
 * the stdout buffer and show up long after the step they describe. *)
let print_flushed msg =
  print_string msg;
  flush stdout

(* Receive from the specified channel. Print out the stages. *)
let r (ch) =
  let idr = string_of_int (Thread.id (Thread.self ())) in
  print_flushed ("Before rec in " ^ idr ^ "\n");
  (* Event.receive only builds the event; nothing is received yet. *)
  let e = Event.receive ch in
  print_flushed ("During rec in " ^ idr ^ "\n");
  (* Event.sync blocks until a sender synchronizes on the channel. *)
  let v = Event.sync e in
  print_flushed ("Recd " ^ v ^ " in " ^ idr ^ "\n")

(* Receive from one of two specified channels. Print out the stages. *)
let r2 (ch_a, ch_b) =
  let idr = string_of_int (Thread.id (Thread.self ())) in
  print_flushed ("Before rec in " ^ idr ^ "\n");
  let e1 = Event.receive ch_a
  and e2 = Event.receive ch_b in
  print_flushed ("During rec in " ^ idr ^ "\n ");
  (* Event.select synchronizes on exactly one of the listed events. *)
  let v = Event.select [e1; e2] in
  print_flushed ("Recd " ^ v ^ " in " ^ idr ^ "\n")

(* Send the given message on the specified channel. Print out the stages. *)
let s (ch, msg) =
  let ids = string_of_int (Thread.id (Thread.self ())) in
  print_flushed ("Start of sender " ^ ids ^ "\n");
  (* Blocks until some receiver synchronizes on the same channel. *)
  Event.sync (Event.send ch msg);
  print_flushed ("End of sender " ^ ids ^ "\n")

(* Only one of the receiver threads gets the data and finishes, the
 * other is left waiting to synchronize until the second send after the
 * sleep. *)
let test_channels () =
  ignore (Thread.create r (c1));
  ignore (Thread.create r (c1));
  ignore (Thread.create s (c1, "hello1"));
  Thread.delay 2.0;
  ignore (Thread.create s (c1, "after delay"))

(* Only one of the two sender threads has its message read, the other
 * is left waiting to synchronize until the second read after the
 * sleep. *)
let test_channels2 () =
  ignore (Thread.create r2 (c1, c2));
  ignore (Thread.create s (c2, "hello2"));
  ignore (Thread.create s (c1, "hello1"));
  Thread.delay 2.0;
  ignore (Thread.create r2 (c1, c2))