(* Interface to MLton's user-level (not OS-level) thread implementation. *)
signature MLTON_THREAD =
sig
(* Atomic states, as returned by atomicState. *)
(* NOTE(review): the int carried by Atomic is presumably the nesting depth of
 * critical sections -- confirm against the implementation. *)
structure AtomicState:
sig
datatype t = NonAtomic | Atomic of int
end
(* atomically f runs f in a critical section. *)
val atomically: (unit -> 'a) -> 'a
(* Begins a critical section. *)
val atomicBegin: unit -> unit
(* Ends a critical section. *)
val atomicEnd: unit -> unit
(* Returns the current atomic state. *)
val atomicState: unit -> AtomicState.t
(* Threads that can be resumed. *)
structure Runnable:
sig
type t
end
(* A paused thread that expects a value of type 'a. *)
type 'a t
(* Like switch, but assumes an atomic calling context; an implicit atomicEnd
 * is performed upon switching back to the current thread. *)
val atomicSwitch: ('a t -> Runnable.t) -> 'a
(* new f creates a thread that, when run, applies f to the value given to the
 * thread. f must terminate by switching to another thread or by exiting the
 * process. *)
val new: ('a -> unit) -> 'a t
(* prepend (t, f) creates a thread (destroying t) that first applies f to the
 * value given to the thread and then continues with t. Constant time. *)
val prepend: 'a t * ('b -> 'a) -> 'b t
(* prepare (t, v) creates a runnable thread (destroying t) that will evaluate
 * t on v. *)
val prepare: 'a t * 'a -> Runnable.t
(* switch f applies f to the current thread to obtain a runnable thread and
 * then starts running it. f runs atomically and must not itself switch. *)
val switch: ('a t -> Runnable.t) -> 'a
end
MLton.Thread provides access to MLton’s user-level thread
implementation (i.e. not OS-level threads). Threads are lightweight
data structures that represent a paused computation. Runnable threads
are threads that will begin or continue computing when switch-ed to.
MLton.Thread does not include a default scheduling mechanism, but it
can be used to implement both preemptive and non-preemptive threads.
-
`type AtomicState.t`: the type of atomic states.
-
`atomically f`: runs `f` in a critical section.
-
`atomicBegin ()`: begins a critical section.
-
`atomicEnd ()`: ends a critical section.
-
`atomicState ()`: returns the current atomic state.
-
`type Runnable.t`: the type of threads that can be resumed.
-
`type 'a t`: the type of threads that expect a value of type `'a`.
-
`atomicSwitch f`: like `switch`, but assumes an atomic calling context. Upon `switch`-ing back to the current thread, an implicit `atomicEnd` is performed.
-
`new f`: creates a new thread that, when run, applies `f` to the value given to the thread. `f` must terminate by `switch`-ing to another thread or exiting the process.
-
`prepend (t, f)`: creates a new thread (destroying `t` in the process) that first applies `f` to the value given to the thread and then continues with `t`. This is a constant time operation.
-
`prepare (t, v)`: prepares a new runnable thread (destroying `t` in the process) that will evaluate `t` on `v`.
-
`switch f`: applies `f` to the current thread to get `rt`, and then starts running thread `rt`. It is an error for `f` to perform another `switch`. `f` is guaranteed to run atomically.
Example of non-preemptive threads
(* Mutable FIFO queue built from two lists: elements are consed onto `back`
 * and removed from `front`; `back` is reversed into `front` when `front`
 * runs dry. *)
structure Queue:
   sig
      type 'a t
      val new: unit -> 'a t
      val enque: 'a t * 'a -> unit
      val deque: 'a t -> 'a option
   end =
   struct
      datatype 'a t = T of {front: 'a list ref, back: 'a list ref}

      (* A fresh, empty queue. *)
      fun new () = T {front = ref [], back = ref []}

      (* Adds x at the tail of the queue. *)
      fun enque (T {back, ...}, x) = back := x :: !back

      (* Removes and returns the oldest element, or NONE if the queue is empty. *)
      fun deque (T {front, back}) =
         case (!front, !back) of
            (x :: rest, _) => (front := rest; SOME x)
          | ([], []) => NONE
          | ([], pending) =>
               (* `pending` is newest-first; reversing restores arrival order. *)
               (case rev pending of
                   [] => raise Fail "deque"
                 | x :: rest => (back := []; front := rest; SOME x))
   end
(* Cooperative (non-preemptive) round-robin scheduler built on MLton.Thread.
 * Threads give up the processor only by calling yield or exit. *)
structure Thread:
   sig
      val exit: unit -> 'a
      val run: unit -> unit
      val spawn: (unit -> unit) -> unit
      val yield: unit -> unit
   end =
   struct
      open MLton
      open Thread

      (* The thread that called run; next falls back to it once the ready
       * queue is empty. *)
      val topLevel: Thread.Runnable.t option ref = ref NONE

      local
         val threads: Thread.Runnable.t Queue.t = Queue.new ()
      in
         (* Appends a runnable thread to the ready queue. *)
         fun ready (t: Thread.Runnable.t) : unit = Queue.enque (threads, t)

         (* Picks the thread to run next: the oldest ready thread, or the
          * top-level thread when none is ready. *)
         fun next () : Thread.Runnable.t =
            case Queue.deque threads of
               SOME t => t
             | NONE => valOf (!topLevel)
      end

      (* Abandons the current thread and resumes the next one. *)
      fun 'a exit (): 'a = switch (fn _ => next ())

      (* Wraps f as a runnable thread that exits when f returns or raises. *)
      fun new (f: unit -> unit): Thread.Runnable.t =
         let
            fun body (): unit = ((f () handle _ => exit ()); exit ())
         in
            Thread.prepare (Thread.new body, ())
         end

      (* Enqueues t and returns the thread that should run now. *)
      fun schedule t = (ready t; next ())

      (* Moves the current thread to the back of the ready queue and resumes
       * the next ready thread. *)
      fun yield (): unit =
         switch (fn current => schedule (Thread.prepare (current, ())))

      (* Creates a thread running f and marks it ready; it does not start
       * until the scheduler switches to it. *)
      fun spawn f = ready (new f)

      (* Runs the scheduler: records the caller as the top-level thread,
       * switches to the next ready thread, and clears topLevel once control
       * comes back. *)
      fun run (): unit =
         let
            fun start caller =
               (topLevel := SOME (Thread.prepare (caller, ()))
                ; next ())
         in
            switch start
            ; topLevel := NONE
         end
   end
(* Prints n, n-1, ..., 1 (one per line), yielding to the scheduler after
 * each number. *)
fun loop 0 = ()
  | loop n =
       (print (Int.toString n ^ "\n")
        ; Thread.yield ()
        ; loop (n - 1))

(* Spawns a `loop` thread for n, n-2, n-4, ...; stops when the counter
 * reaches exactly 0. *)
fun loop' 0 = ()
  | loop' n = (Thread.spawn (fn () => loop n); loop' (n - 2))

val _ = Thread.spawn (fn () => loop' 10)
val _ = Thread.run ()
val _ = print "success\n"
Example of preemptive threads
(* Mutable FIFO queue built from two lists: elements are consed onto `back`
 * and removed from `front`; `back` is reversed into `front` when `front`
 * runs dry. *)
structure Queue:
   sig
      type 'a t
      val new: unit -> 'a t
      val enque: 'a t * 'a -> unit
      val deque: 'a t -> 'a option
   end =
   struct
      datatype 'a t = T of {front: 'a list ref, back: 'a list ref}

      (* A fresh, empty queue. *)
      fun new () = T {front = ref [], back = ref []}

      (* Adds x at the tail of the queue. *)
      fun enque (T {back, ...}, x) = back := x :: !back

      (* Removes and returns the oldest element, or NONE if the queue is empty. *)
      fun deque (T {front, back}) =
         case (!front, !back) of
            (x :: rest, _) => (front := rest; SOME x)
          | ([], []) => NONE
          | ([], pending) =>
               (* `pending` is newest-first; reversing restores arrival order. *)
               (case rev pending of
                   [] => raise Fail "deque"
                 | x :: rest => (back := []; front := rest; SOME x))
   end
(* Preemptive round-robin scheduler built on MLton.Thread, MLton.Signal and
 * MLton.Itimer. A real-time interval timer delivers SIGALRM periodically, and
 * the signal handler reschedules the interrupted thread. *)
structure Thread:
   sig
      val exit: unit -> 'a
      val run: unit -> unit
      val spawn: (unit -> unit) -> unit
      val yield: unit -> unit
   end =
   struct
      open Posix.Signal
      open MLton
      open Itimer Signal Thread

      (* The thread that called run; next falls back to it once the ready
       * queue is empty. *)
      val topLevel: Thread.Runnable.t option ref = ref NONE

      local
         val threads: Thread.Runnable.t Queue.t = Queue.new ()
      in
         (* Appends a runnable thread to the ready queue. *)
         fun ready (t: Thread.Runnable.t) : unit =
            Queue.enque (threads, t)

         (* Picks the thread to run next: the oldest ready thread, or the
          * top-level thread when none is ready. *)
         fun next () : Thread.Runnable.t =
            case Queue.deque threads of
               NONE => valOf (!topLevel)
             | SOME t => t
      end

      (* Abandons the current thread and resumes the next one. *)
      fun 'a exit (): 'a = switch (fn _ => next ())

      (* Wraps f as a runnable thread that exits when f returns or raises. *)
      fun new (f: unit -> unit): Thread.Runnable.t =
         Thread.prepare
         (Thread.new (fn () => ((f () handle _ => exit ())
                                ; exit ())),
          ())

      (* Enqueues t and returns the thread that should run now. It is also
       * installed as the SIGALRM handler in run. *)
      fun schedule t = (ready t; next ())

      (* Voluntarily moves the current thread to the back of the ready queue. *)
      fun yield (): unit = switch (fn t => schedule (Thread.prepare (t, ())))

      (* Creates a thread running f and marks it ready. *)
      val spawn = ready o new

      (* Arms the real-time interval timer with initial value and interval
       * both set to t. *)
      fun setItimer t =
         Itimer.set (Itimer.Real,
                     {value = t,
                      interval = t})

      (* Runs the scheduler. The caller is saved as the top-level thread, and
       * control switches to a bootstrap thread that installs `schedule` as
       * the SIGALRM handler and starts a 20 ms timer. When control returns
       * here, the timer is reset to zero and SIGALRM is set to be ignored. *)
      fun run (): unit =
         (switch (fn t =>
                  (topLevel := SOME (Thread.prepare (t, ()))
                   ; new (fn () => (setHandler (alrm, Handler.handler schedule)
                                    ; setItimer (Time.fromMilliseconds 20)))))
          ; setItimer Time.zeroTime
            (* Was `ignore alrm`: MLton.Signal no longer exports
             * `ignore: t -> unit`, so that call resolved to the Basis
             * `ignore` and left the scheduler's handler installed. *)
          ; setHandler (alrm, Handler.ignore)
          ; topLevel := NONE)
   end
(* Busy-waits by counting n down to 0. *)
fun delay 0 = ()
  | delay n = delay (n - 1)

(* Performs n rounds of busy work and never yields voluntarily. *)
fun loop 0 = ()
  | loop n = (delay 500000; loop (n - 1))

(* Spawns a `loop` thread for each of n, n-1, ..., 1. *)
fun loop' 0 = ()
  | loop' n = (Thread.spawn (fn () => loop n); loop' (n - 1))

val _ = Thread.spawn (fn () => loop' 10)
val _ = Thread.run ()
val _ = print "success\n"