(* Interface to MLton's user-level (not OS-level) thread implementation. *)
signature MLTON_THREAD =
   sig
      (* The type of atomic states. *)
      structure AtomicState:
         sig
            datatype t = NonAtomic | Atomic of int
         end

      (* atomically f runs f in a critical section. *)
      val atomically: (unit -> 'a) -> 'a
      (* Begin / end a critical section. *)
      val atomicBegin: unit -> unit
      val atomicEnd: unit -> unit
      (* Returns the current atomic state. *)
      val atomicState: unit -> AtomicState.t

      (* Threads that can be resumed. *)
      structure Runnable:
         sig
            type t
         end

      (* A paused thread that expects a value of type 'a. *)
      type 'a t

      (* Like switch, but assumes an atomic calling context; an implicit
       * atomicEnd is performed upon switching back to the current thread.
       *)
      val atomicSwitch: ('a t -> Runnable.t) -> 'a
      (* new f creates a thread that, when run, applies f to the value given
       * to the thread.  f must terminate by switching to another thread or
       * exiting the process.
       *)
      val new: ('a -> unit) -> 'a t
      (* prepend (t, f) creates a thread (destroying t) that first applies f
       * to the value given to the thread and then continues with t.
       *)
      val prepend: 'a t * ('b -> 'a) -> 'b t
      (* prepare (t, v) prepares a runnable thread (destroying t) that will
       * evaluate t on v.
       *)
      val prepare: 'a t * 'a -> Runnable.t
      (* switch f applies f to the current thread to get a runnable thread
       * and then starts running it.  f runs atomically and must not perform
       * another switch.
       *)
      val switch: ('a t -> Runnable.t) -> 'a
   end
MLton.Thread provides access to MLton's user-level thread implementation (i.e. not OS-level threads). Threads are lightweight data structures that represent a paused computation. Runnable threads are threads that will begin or continue computing when switched to. MLton.Thread does not include a default scheduling mechanism, but it can be used to implement both preemptive and non-preemptive threads.
-
type AtomicState.t
the type of atomic states. -
atomically f
runs f in a critical section. -
atomicBegin ()
begins a critical section. -
atomicEnd ()
ends a critical section. -
atomicState ()
returns the current atomic state. -
type Runnable.t
the type of threads that can be resumed. -
type 'a t
the type of threads that expect a value of type 'a. -
atomicSwitch f
like switch, but assumes an atomic calling context. Upon switching back to the current thread, an implicit atomicEnd is performed. -
new f
creates a new thread that, when run, applies f to the value given to the thread. f must terminate by switching to another thread or exiting the process. -
prepend (t, f)
creates a new thread (destroying t in the process) that first applies f to the value given to the thread and then continues with t. This is a constant time operation. -
prepare (t, v)
prepares a new runnable thread (destroying t in the process) that will evaluate t on v. -
switch f
applies f to the current thread to get rt, and then starts running thread rt. It is an error for f to perform another switch. f is guaranteed to run atomically.
Example of non-preemptive threads
(* A mutable FIFO queue built from two lists: items are pushed onto `back`
 * and popped from `front`; `back` is reversed into `front` when `front`
 * runs dry.
 *)
structure Queue:
   sig
      type 'a t

      val new: unit -> 'a t
      val enque: 'a t * 'a -> unit
      val deque: 'a t -> 'a option
   end =
   struct
      datatype 'a t = T of {front: 'a list ref, back: 'a list ref}

      (* Creates an empty queue. *)
      fun new() = T{front = ref [], back = ref []}

      (* Adds x at the tail of the queue. *)
      fun enque(T{back, ...}, x) = back := x :: !back

      (* Removes and returns the oldest element, or NONE if the queue is
       * empty.
       *)
      fun deque(T{front, back}) =
         case !front of
            [] => (case !back of
                      [] => NONE
                    | l => let val l = rev l
                           in case l of
                              (* Not reachable: l is the reverse of a
                               * non-empty list.
                               *)
                              [] => raise Fail "deque"
                            | x :: l => (back := []; front := l; SOME x)
                           end)
          | x :: l => (front := l; SOME x)
   end

(* A cooperative (non-preemptive) round-robin scheduler on top of
 * MLton.Thread.  Threads run until they call yield or exit.
 *)
structure Thread:
   sig
      val exit: unit -> 'a
      val run: unit -> unit
      val spawn: (unit -> unit) -> unit
      val yield: unit -> unit
   end =
   struct
      open MLton
      open Thread

      (* The thread that called run; resumed once the ready queue is empty. *)
      val topLevel: Thread.Runnable.t option ref = ref NONE

      local
         val threads: Thread.Runnable.t Queue.t = Queue.new()
      in
         (* Appends t to the ready queue. *)
         fun ready (t: Thread.Runnable.t) : unit =
            Queue.enque(threads, t)
         (* Picks the next thread to run: the head of the ready queue, or
          * the stored top-level thread when the queue is empty.  valOf
          * fails if no top-level thread has been stored (i.e. outside run).
          *)
         fun next () : Thread.Runnable.t =
            case Queue.deque threads of
               NONE => valOf(!topLevel)
             | SOME t => t
      end

      (* Abandons the current thread and switches to the next one. *)
      fun 'a exit(): 'a = switch(fn _ => next())

      (* Wraps f as a runnable thread.  Whether f returns or raises, the
       * thread finishes by calling exit, as MLton.Thread.new requires.
       *)
      fun new(f: unit -> unit): Thread.Runnable.t =
         Thread.prepare
         (Thread.new (fn () => ((f() handle _ => exit())
                                ; exit())),
          ())

      (* Queues t and returns the next thread to run. *)
      fun schedule t = (ready t; next())

      (* Puts the current thread at the back of the ready queue and runs
       * the next one.
       *)
      fun yield(): unit = switch(fn t => schedule (Thread.prepare (t, ())))

      (* Creates a thread running f and adds it to the ready queue. *)
      val spawn = ready o new

      (* Runs queued threads until the ready queue is empty, then returns
       * to the caller and clears topLevel.
       *)
      fun run(): unit =
         (switch(fn t =>
                 (topLevel := SOME (Thread.prepare (t, ()))
                  ; next()))
          ; topLevel := NONE)
   end

(* Prints n, n-1, ..., 1, yielding to the other threads after each line. *)
val rec loop =
   fn 0 => ()
    | n => (print(concat[Int.toString n, "\n"])
            ; Thread.yield()
            ; loop(n - 1))

(* Spawns one `loop k` thread for each k from n down to 1.  The step must
 * be 1: the only base case is 0, so a larger step would skip it for some
 * starting values and never terminate normally.
 *)
val rec loop' =
   fn 0 => ()
    | n => (Thread.spawn(fn () => loop n); loop'(n - 1))

val _ = Thread.spawn(fn () => loop' 10)

val _ = Thread.run()

val _ = print "success\n"
Example of preemptive threads
(* A mutable FIFO queue kept as two lists: new items are consed onto `back`,
 * old items are taken from `front`, and `back` is reversed into `front`
 * whenever `front` is exhausted.
 *)
structure Queue:
   sig
      type 'a t

      val new: unit -> 'a t
      val enque: 'a t * 'a -> unit
      val deque: 'a t -> 'a option
   end =
   struct
      datatype 'a t = T of {front: 'a list ref, back: 'a list ref}

      (* An empty queue. *)
      fun new () = T {front = ref [], back = ref []}

      (* Appends item at the tail. *)
      fun enque (T {back, ...}, item) = back := item :: !back

      (* Pops the oldest item, or yields NONE when both lists are empty. *)
      fun deque (T {front, back}) =
         case (!front, !back) of
            (head :: rest, _) => (front := rest; SOME head)
          | ([], []) => NONE
          | ([], pending) =>
               (case rev pending of
                   (* Not reachable: pending is non-empty here. *)
                   [] => raise Fail "deque"
                 | head :: rest => (back := []; front := rest; SOME head))
   end

(* A preemptive round-robin scheduler on top of MLton.Thread: an interval
 * timer delivers alrm, and the signal handler reschedules the interrupted
 * thread.
 *)
structure Thread:
   sig
      val exit: unit -> 'a
      val run: unit -> unit
      val spawn: (unit -> unit) -> unit
      val yield: unit -> unit
   end =
   struct
      open Posix.Signal
      open MLton
      open Itimer Signal Thread

      (* The thread that called run; resumed when nothing else is ready. *)
      val topLevel: Thread.Runnable.t option ref = ref NONE

      local
         val threads: Thread.Runnable.t Queue.t = Queue.new ()
      in
         (* Puts rt at the back of the ready queue. *)
         fun ready rt = Queue.enque (threads, rt)

         (* The head of the ready queue, or the stored top-level thread
          * when the queue is empty.
          *)
         fun next () =
            case Queue.deque threads of
               SOME rt => rt
             | NONE => valOf (!topLevel)
      end

      (* Drops the current thread and switches to the next one. *)
      fun 'a exit (): 'a = switch (fn _ => next ())

      (* Packages f as a runnable thread whose body always ends in exit,
       * whether f returns normally or raises.
       *)
      fun new (f: unit -> unit): Thread.Runnable.t =
         let
            fun body () = ((f () handle _ => exit ()); exit ())
         in
            Thread.prepare (Thread.new body, ())
         end

      (* Queues rt and picks the next thread; also used as the alrm
       * handler.
       *)
      fun schedule rt = (ready rt; next ())

      (* Requeues the current thread and runs the next one. *)
      fun yield (): unit =
         switch (fn current => schedule (Thread.prepare (current, ())))

      (* Creates a thread for f and queues it. *)
      fun spawn f = ready (new f)

      (* Sets the real-time interval timer to period for both the initial
       * value and the repeat interval.
       *)
      fun setItimer period =
         Itimer.set (Itimer.Real, {value = period, interval = period})

      (* Stores the caller as the top-level thread and switches to a fresh
       * thread that installs the alrm handler and starts a 20 ms timer.
       * When control comes back, the timer is zeroed and topLevel is
       * cleared.
       * NOTE(review): which `ignore` is applied to alrm depends on the
       * opened structures -- confirm against the MLton version in use.
       *)
      fun run (): unit =
         let
            fun startTimer () =
               (setHandler (alrm, Handler.handler schedule)
                ; setItimer (Time.fromMilliseconds 20))
            fun boot current =
               (topLevel := SOME (Thread.prepare (current, ()))
                ; new startTimer)
         in
            switch boot
            ; setItimer Time.zeroTime
            ; ignore alrm
            ; topLevel := NONE
         end
   end

(* Busy-waits by counting n down to 0. *)
fun delay 0 = ()
  | delay n = delay (n - 1)

(* Performs n busy-wait rounds; never yields voluntarily. *)
fun loop 0 = ()
  | loop n = (delay 500000; loop (n - 1))

(* Spawns one `loop k` thread for each k from n down to 1. *)
fun loop' 0 = ()
  | loop' n = (Thread.spawn (fn () => loop n); loop' (n - 1))

val _ = Thread.spawn (fn () => loop' 10)

val _ = Thread.run ()

val _ = print "success\n"