prodcons.mlton
Stephen Weeks
MLton@sourcelight.com
Mon, 9 Jul 2001 11:10:53 -0700
Here is a prodcons.mlton that uses preemptive threads. The old one used
non-preemptive threads. This test exposes a bug in mlton-20000906 that has
since been fixed in 20010706.
----------------------------------------
(* Translated from prodcons.ocaml. *)
(* Functor Z: the whole benchmark is parameterised over a structure S that
 * must provide Primitive.Stdio.print and an MLton structure containing the
 * Itimer, Thread and Signal sub-structures specified below. *)
functor Z (S: sig
structure Primitive:
sig
structure Stdio:
sig
val print: string -> unit
end
end
structure MLton:
sig
(* Interval timers: set takes the timer kind plus a {value, interval} pair. *)
structure Itimer:
sig
datatype which =
Prof
| Real
| Virtual
val set: which * {value: Time.time,
interval: Time.time} -> unit
end
(* Thread primitives: 'a t is a thread that is resumed with a value of
 * type 'a.  switch passes the current thread to its argument, which
 * returns the thread to transfer to together with the value to hand it. *)
structure Thread:
sig
type 'a t
val atomicBegin: unit -> unit
val atomicEnd: unit -> unit
val new: ('a -> unit) -> 'a t
val switch: ('a t -> 'b t * 'b) -> 'a
end
(* Signals: a Handler.Handler maps a unit thread to the unit thread that
 * should run next. *)
structure Signal:
sig
type signal
val alrm: signal
structure Handler:
sig
datatype t =
Default
| Ignore
| Handler of unit Thread.t -> unit Thread.t
val get: signal -> t
val set: signal * t -> unit
val simple: (unit -> unit) -> t
end
end
end
end) =
struct
(* Bring Primitive and MLton from the functor argument into scope. *)
open S
(* for (start, stop, f) applies f to every integer from start up to and
 * including stop, in increasing order.  Nothing happens when start > stop. *)
fun for (start, stop, f) =
   let
      fun go k = if k <= stop then (f k; go (k + 1)) else ()
   in
      go start
   end
(* Tracing stub: ignores its argument and returns unit, so every trace call
 * in this file is a no-op.  The real output routine, Primitive.Stdio.print,
 * is left disabled. *)
val print = fn _ => ()
(* Mutable FIFO queue built from two list refs: elements are pushed onto
 * `back` and popped from `front`; when `front` runs dry the reversed
 * contents of `back` become the new `front`. *)
structure Queue:
   sig
      type 'a t
      val new: unit -> 'a t
      val enque: 'a t * 'a -> unit
      val deque: 'a t -> 'a option
   end =
   struct
      datatype 'a t = T of {front: 'a list ref, back: 'a list ref}

      (* A fresh queue with nothing in either list. *)
      fun new () = T {front = ref [], back = ref []}

      (* Add an element at the tail of the queue. *)
      fun enque (T {back, ...}, elem) = back := elem :: !back

      (* Remove and return the oldest element, or NONE if the queue is empty. *)
      fun deque (T {front, back}) =
         case (!front, !back) of
            (x :: rest, _) => (front := rest; SOME x)
          | ([], []) => NONE
          | ([], pending) =>
               (case rev pending of
                   (* cannot happen: pending is non-empty *)
                   [] => raise Fail "deque"
                 | x :: rest => (back := []; front := rest; SOME x))
   end
(* User-level thread library built on the MLton.Thread primitives: a ready
 * queue, a scheduler driven by the alrm signal, and Mutex / Condition
 * structures.  Statement order around atomicBegin / atomicEnd / switch is
 * significant; do not reorder. *)
structure Thread:
sig
val exit: unit -> 'a
val run: unit -> unit
val spawn: (unit -> unit) -> unit
val yield: unit -> unit
structure Mutex:
sig
type t
val new: unit -> t
val lock: t * string -> unit
val unlock: t -> unit
end
structure Condition:
sig
type t
val new: unit -> t
val signal: t -> unit
val wait: t * Mutex.t -> unit
end
end =
struct
(* Within this body `Thread` still refers to MLton.Thread (made visible by
 * `open MLton`), because the structure being defined is not bound yet. *)
open MLton
open Itimer Signal Thread
(* The thread that called run; next () falls back to it when the ready
 * queue is empty.  SOME only while run is in progress. *)
val topLevel: unit Thread.t option ref = ref NONE
local
(* Ready queue of runnable threads, private to ready and next. *)
val threads: unit Thread.t Queue.t = Queue.new ()
in
(* Append a thread to the ready queue. *)
fun ready (t): unit = Queue.enque (threads, t)
(* Take the oldest ready thread, or the saved top-level thread when none is
 * ready.  valOf fails if no top-level thread has been saved. *)
fun next () =
case Queue.deque threads of
NONE => (print "switching to toplevel\n"
; valOf (!topLevel))
| SOME t => t
end
(* Abandon the current thread: the switch callback ignores the handle of
 * the running thread and transfers to next (). *)
fun 'a exit (): 'a = switch (fn _ => (next (), ()))
(* Wrap f in a primitive thread that calls exit once f returns or raises any
 * exception.  Shadows the MLton.Thread.new brought in by the opens above. *)
fun new (f: unit -> unit): unit Thread.t =
Thread.new (fn () => ((f () handle _ => exit ())
; exit ()))
(* Put t on the ready queue and return the thread to run next.  Used by
 * yield and installed by run as the alrm signal handler. *)
fun schedule t =
(print "scheduling\n"
; ready t
; next ())
(* Voluntarily give up the processor: requeue the current thread and switch
 * to whatever schedule picks. *)
fun yield (): unit = switch (fn t => (schedule t, ()))
(* Create a thread for a function and place it on the ready queue. *)
val spawn = ready o new
(* Set the Real interval timer with both value and interval equal to t. *)
fun setItimer t =
Itimer.set (Itimer.Real,
{value = t,
interval = t})
(* Run the ready threads.  Saves the caller as topLevel and switches to a
 * bootstrap thread that installs schedule as the alrm handler and sets the
 * timer to 20 milliseconds; that thread then exits into the ready queue via
 * the wrapper in new.  Once control comes back here the timer is set to
 * zeroTime (presumably disarming it -- confirm), alrm is set to Ignore and
 * topLevel is cleared. *)
fun run (): unit =
(switch (fn t =>
(topLevel := SOME t
; (new (fn () => (Handler.set (alrm, Handler.Handler schedule)
; setItimer (Time.fromMilliseconds 20))),
())))
; setItimer Time.zeroTime
; Handler.set (alrm, Handler.Ignore)
; topLevel := NONE)
structure Mutex =
struct
(* A lock flag plus the queue of threads blocked waiting for the lock. *)
datatype t = T of {locked: bool ref,
waiting: unit Thread.t Queue.t}
fun new () =
T {locked = ref false,
waiting = Queue.new ()}
(* Acquire the mutex; name is used only in the trace message.  Each attempt
 * starts with atomicBegin.  If the mutex is held, the thread calls
 * atomicEnd, enqueues itself on `waiting` and switches away, all inside the
 * switch callback, then retries from the top when resumed.  Otherwise it
 * sets the flag and calls atomicEnd. *)
fun lock (T {locked, waiting, ...}, name) =
let
fun loop () =
(print (concat [name, " lock looping\n"])
; Thread.atomicBegin ()
; if !locked
then (print "mutex is locked\n"
; switch (fn t =>
(Thread.atomicEnd ()
; Queue.enque (waiting, t)
; (next (), ())))
; loop ())
else (print "mutex is not locked\n"
; locked := true
; Thread.atomicEnd ()))
in loop ()
end
(* Clear the lock flag and move at most one waiting thread to the ready
 * queue.  Does not itself call atomicBegin / atomicEnd; callers are unlock
 * (which wraps it) and Condition.wait (which calls it inside switch). *)
fun safeUnlock (T {locked, waiting, ...}) =
(locked := false
; (case Queue.deque waiting of
NONE => ()
| SOME t => (print "unlock found waiting thread\n"
; ready t)))
(* Release the mutex, bracketing safeUnlock with atomicBegin / atomicEnd. *)
fun unlock (m: t) =
(print "unlock atomicBegin\n"
; Thread.atomicBegin ()
; safeUnlock m
; Thread.atomicEnd ())
end
structure Condition =
struct
(* A condition variable is just the queue of threads waiting on it. *)
datatype t = T of {waiting: unit Thread.t Queue.t}
fun new () = T {waiting = Queue.new ()}
(* Release m, enqueue the current thread on the condition and switch away,
 * all inside one switch callback; on resumption re-acquire m under the
 * name "wait".
 * NOTE(review): no explicit atomicBegin here -- presumably relies on the
 * switch callback not being preempted; confirm against MLton.Thread.switch. *)
fun wait (T {waiting, ...}, m) =
(switch (fn t =>
(Mutex.safeUnlock m
; print "wait unlocked mutex\n"
; Queue.enque (waiting, t)
; (next (), ())))
; Mutex.lock (m, "wait"))
(* Move one waiting thread, if any, to the ready queue; otherwise no-op. *)
fun signal (T {waiting, ...}) =
case Queue.deque waiting of
NONE => ()
| SOME t => ready t
end
end
(* Short names for the synchronisation structures of the thread library. *)
structure Mutex = Thread.Mutex
structure Condition = Thread.Condition

(* One-slot buffer shared by producer and consumer: `data` holds the item,
 * `count` is 1 while the slot is full and 0 while it is empty. *)
val count: int ref = ref 0
val data: int ref = ref 0

(* Running totals reported by main. *)
val produced: int ref = ref 0
val consumed: int ref = ref 0

(* The mutex and the single condition variable used by both sides. *)
val m: Mutex.t = Mutex.new ()
val c: Condition.t = Condition.new ()
(* producer n hands the integers 1..n to the consumer one at a time through
 * the shared slot.  For each item it takes the mutex, waits on the condition
 * while the slot is full, stores the item, signals, and releases the mutex. *)
fun producer n =
   let
      fun produce item =
         let
            val () = print (concat ["producer acquiring lock ", Int.toString item, "\n"])
            val () = Mutex.lock (m, "producer")
            val () = print "producer acquired lock\n"
            (* Slot still full: sleep until the consumer empties it. *)
            val () = while !count = 1 do Condition.wait (c, m)
            val () = print "producer passed condition\n"
            val () = data := item
            val () = count := 1
            val () = Condition.signal c
            val () = print "producer releasing lock\n"
            val () = Mutex.unlock m
            val () = print "producer released lock\n"
         in
            produced := !produced + 1
         end
   in
      for (1, n, produce)
   end
(* consumer n repeatedly takes an item out of the shared slot until the item
 * it last received equals n.  For each item it takes the mutex, waits on the
 * condition while the slot is empty, reads the item, marks the slot empty,
 * signals, and releases the mutex. *)
fun consumer n =
   let
      (* `last` is the most recently received item; it starts at 0. *)
      fun loop last =
         if last = n
            then ()
         else
            let
               val () = print (concat ["consumer acquiring lock ", Int.toString last, "\n"])
               val () = Mutex.lock (m, "consumer")
               val () = print "consumer acquired lock\n"
               (* Slot still empty: sleep until the producer fills it. *)
               val () = while !count = 0 do Condition.wait (c, m)
               val received = !data
               val () = count := 0
               val () = Condition.signal c
               val () = print "consumer releasing lock\n"
               val () = Mutex.unlock m
               val () = print "consumer released lock\n"
               val () = consumed := !consumed + 1
            in
               loop received
            end
   in
      loop 0
   end
(* Parse a decimal integer from s using Int.fromString, falling back to 0
 * when no integer can be read. *)
fun atoi s = getOpt (Int.fromString s, 0)
(* Write every string of the list to standard output, in order, followed by
 * a single newline. *)
fun printl strs = (app TextIO.print strs; TextIO.print "\n")
(* Entry point.  The first command-line argument is the number of items to
 * exchange ("1" is used when there are no arguments).  Spawns the producer
 * and consumer threads, runs the scheduler, sleeps for one second and then
 * prints the produced and consumed totals.  `name` is not used. *)
fun main (name, args) =
   let
      val n =
         case args of
            [] => atoi "1"
          | first :: _ => atoi first
   in
      Thread.spawn (fn () => producer n)
      ; Thread.spawn (fn () => consumer n)
      ; Thread.run ()
      ; ignore (Posix.Process.sleep (Time.fromSeconds 1))
      ; printl [Int.toString (!produced), " ", Int.toString (!consumed)]
   end
end
(* Instantiate the functor with the top-level Primitive and MLton structures
 * (neither is defined in this file). *)
structure Z = Z (structure Primitive = Primitive
                 structure MLton = MLton)

(* Run the benchmark with the program name and command-line arguments. *)
val () = Z.main (CommandLine.name (), CommandLine.arguments ())