[MLton-user] simple, portable, asynchronous programming in SML
Stephen Weeks
sweeks@sweeks.com
Wed, 19 Jul 2006 15:08:14 -0700
Lately I've been thinking about asynchronous programming in SML
(e.g. network or GUI programming), where computations occur in
response to external events that can occur at any time, possibly after
some long delay (e.g. waiting for input from a socket or for keyboard
input). An obvious answer to the question of how to write such code
is to use something like Concurrent ML (CML), which has first-class
events and preemptive threads. This allows one to write code in a
natural style, where a thread blocks until an event becomes enabled
and other threads can continue in the interim.
Unfortunately, there are several problems with using CML. First and
foremost, programming with preemptive threads is significantly more
challenging than single-threaded programming. Second, while MLton has
some support for CML, the basis library has not yet been updated to be
thread safe. Doing this would be difficult, and would be further
complicated by the desire to not impose a performance hit on
single-threaded code. Third, CML is not implemented with portable SML
code -- its implementation requires (at least) primitives for grabbing
the current thread (or continuation). These primitives are available
in MLton and SML/NJ but not in other SML implementations.
It would be nice to have a portable SML library for asynchronous
programming that lets one keep the simple reasoning of single-threaded
programming. In addition to simplifying user code,
single-threadedness would enable the use of existing library code
unchanged, including the MLton basis library. What follows is an
explanation of such a library.
This is my first write-up of this idea and code, so there are surely
errors and places for improvement. Please send your thoughts. My
hope in sending this note is to get good feedback, and then to polish
the code and explanation, adding them to the wiki and possibly the
MLton library. The code is under the MLton license, so feel free to
try it out in your own projects.
If you are familiar with CML, the basic idea is derived from the
following equation:
CML = Events + Threads + Preemption
The idea is to directly implement just the "event" part of CML in
portable SML, leaving threads and preemption as (non-portable) add-ons.
Almost all of the CML abstractions (channel, ivar, mvar, mailbox,
multicast) make perfect sense without threads and preemption, and can
easily be implemented in portable SML. As a separable decision, if
the SML implementation supports threads (or continuations), one can
trivially use them with events (the key operation being Cml.sync).
Further, as a separable decision, if the SML implementation supports
preemption (e.g. via a timer signal), one can choose to use that. But
a big benefit of this approach is to avoid preemption, and to live
with just events, or possibly events plus non-preemptive threads.
Here is the signature I have in mind for events.
--------------------------------------------------------------------------------
signature EVENT =
   sig
      type 'a t
      val always: 'a -> 'a t
      val choose: 'a t list -> 'a t
      val channel: unit -> {get: 'a t, put: 'a -> unit t}
      val every: 'a t * ('a -> unit) -> unit
      val ivar: unit -> {get: 'a t, put: 'a -> unit}
      val mailbox: unit -> {get: 'a t, put: 'a -> unit}
      val multicast: unit -> {get: unit -> 'a t, put: 'a -> unit}
      val mvar: unit -> {get: 'a t, put: 'a -> unit}
      val never: unit -> 'a t
      val runHandlers: unit -> unit
      val when: 'a t * ('a -> unit) -> unit
      val wrap: 'a t * ('a -> 'b) -> 'b t
   end
--------------------------------------------------------------------------------
If you're familiar with CML, this should look somewhat, although not
completely, familiar. In addition to throwing out parts of CML, I've
also MLton-ized the signature and cleaned things up. In any case, I
will explain from the ground up.
An event of type 'a t is a first-class object that represents the
possibility of communicating a value of type 'a from one part of the
program to another. At any point in time, an event is either enabled
or disabled, depending on whether it is able to supply a value.
Events naturally express asynchronicity. The simplest possible
example might be a timeout event that becomes enabled at some future
time (and remains enabled thereafter). As another example, one might
have an event that represents the bytes coming over a socket. The
event is enabled when bytes are available, and disabled if not. As
another example, one might have an event that represents keyboard
input, and becomes enabled with the key value of each keypress.
The key operation on events that allows one to consume the value
supplied by an event is "when".
  val when: 'a t * ('a -> unit) -> unit
If "e" is an event of type 'a t, and h: 'a -> unit, then
  when (e, h)
says that the next time "e" becomes enabled with value "a", the
handler "h" consumes "a", i.e. "h a" runs. "h" is referred to as a
"handler", since it is handling a value supplied by "e". When does "h
a" actually run? Since there are no threads or preemption, it doesn't
run immediately or in its own thread. Rather, the handler is
"scheduled" to run at some later date. This simply stores the handler
in some hidden state of the event module. It is up to clients of the
event module to call "runHandlers"
  val runHandlers: unit -> unit
to run all scheduled handlers. The expectation is not that user code
is littered with calls to runHandlers. Rather, the expectation is
that the code that interfaces with the low-level asynchronous system
stuff (select, GUI callbacks, etc.) calls runHandlers whenever system
events enable new handlers. At that point, a single call to
runHandlers suffices to propagate all the effects of the low-level
event throughout the SML world.
The benefit of this approach to handlers is preservation of the
non-preemptive programming model and the immensely easier reasoning
about programs that goes along with it. Code that installs handlers,
i.e. calls "when", doesn't have to worry about the handlers running
while it is running and interfering with its state. Handlers
themselves are assured to run to completion without interruption from
other handlers. There is no unexpected context switching due to
preemption or due to blocking. Code doesn't block -- it just installs
handlers.
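A minimal, self-contained sketch of this scheduling discipline may
help. It is not the event module itself: the names Sketch, schedule,
log and say are made up for illustration, and a plain list stands in
for the handler queue. The point it shows is only that scheduling a
handler stores it, and nothing runs until runHandlers is called.

```sml
(* Simplified model: a global FIFO of pending handler thunks.
 * Sketch, schedule, log and say are illustrative names only. *)
structure Sketch =
   struct
      val pending: (unit -> unit) list ref = ref []

      (* Store a handler application to run later; nothing runs here. *)
      fun schedule (h: 'a -> unit, a: 'a): unit =
         pending := !pending @ [fn () => h a]

      (* Run scheduled handlers in FIFO order until none remain.
       * Handlers scheduled by a running handler are picked up by the
       * same call. *)
      fun runHandlers (): unit =
         case !pending of
            [] => ()
          | h :: rest => (pending := rest; h (); runHandlers ())
   end

val log: string list ref = ref []
fun say s = log := s :: !log

(* The handler is stored, not run, so "installed" is logged first. *)
val () = Sketch.schedule (fn n => say ("handled " ^ Int.toString n), 1)
val () = say "installed"
val () = Sketch.runHandlers ()
val trace = rev (!log)
```

In this sketch the installing code finishes its own work before the
handler touches any state, which is the property the paragraph above
relies on.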
The basic operation that allows two parts of the program to
communicate using events is a FIFO channel.
  val channel: unit -> {get: 'a t, put: 'a -> unit t}
If one constructs a channel with
  val {get, put} = channel ()
then "put" and "get" correspond to the two sides, input and output, of
the channel. "get" is an event that is enabled whenever some other
part of the program is putting a value on the other side of the
channel, while "put a" is enabled whenever some other part of the
program is getting a value from the channel. For example, suppose
that one does the following:
  when (put a, h1)
  when (get, h2)
  when (get, h3)
Then, "h1" is waiting for "put a" to be enabled, i.e. waiting for some
other part of the program to "get" from the channel. Fortunately,
both "h2" and "h3" are waiting for a put. Since both sides of the
channel are enabled, a communication happens. Whether h2 or h3
receives "a" is unspecified; supposing it is h2, then the handler "h1
()" will run, as will "h2 a". The term for this kind of communication
is "synchronous rendezvous" because both sides of the communication
wait until the other is enabled, and then both are simultaneously
committed to the communication occurring. In this example, "h3" would
still be waiting for another value to be put on the channel.
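The rendezvous in this example can be sketched with a much simpler,
self-contained model. This is an assumption-laden illustration, not
the implementation appended to this message: the channel is fixed to
int, whenPut and whenGet are hypothetical stand-ins for
"when (put a, h)" and "when (get, h)", and waiting parties are served
oldest first (the text above leaves the choice unspecified).

```sml
(* Scheduled handler thunks, run later by runHandlers. *)
val scheduled: (unit -> unit) list ref = ref []
fun schedule th = scheduled := !scheduled @ [th]
fun runHandlers () =
   case !scheduled of
      [] => ()
    | th :: rest => (scheduled := rest; th (); runHandlers ())

(* Parties waiting for a partner, oldest first. *)
val putters: (int * (unit -> unit)) list ref = ref []
val getters: (int -> unit) list ref = ref []

(* Stand-in for "when (put a, h)": rendezvous with a waiting getter if
 * there is one, otherwise wait. *)
fun whenPut (a, h) =
   case !getters of
      [] => putters := !putters @ [(a, h)]
    | g :: rest => (getters := rest; schedule h; schedule (fn () => g a))

(* Stand-in for "when (get, h)": rendezvous with a waiting putter if
 * there is one, otherwise wait. *)
fun whenGet h =
   case !putters of
      [] => getters := !getters @ [h]
    | (a, p) :: rest =>
         (putters := rest; schedule p; schedule (fn () => h a))

val log: string list ref = ref []
fun say s = log := s :: !log

val () = whenPut (7, fn () => say "h1")
val () = whenGet (fn a => say ("h2 " ^ Int.toString a))
val () = whenGet (fn a => say ("h3 " ^ Int.toString a))
val () = runHandlers ()
val trace = rev (!log)
val stillWaiting = length (!getters)
```

Both sides of the communication are scheduled together at the moment
the second party arrives, and the third handler stays in the getters
list until another put shows up.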
In addition to channels, there are several event combinators for
creating basic events and building more complicated events from
simpler ones.
  val always: 'a -> 'a t
  val never: unit -> 'a t
  val choose: 'a t list -> 'a t
  val wrap: 'a t * ('a -> 'b) -> 'b t
"always a" returns an event that is always enabled with value "a".
"never ()" returns an event that is never enabled. If "e1", ...,
"en" are events, then
  choose [e1, ..., en]
is a new event that is enabled whenever any of its constituent events
ei is enabled. "choose" implements the idea of selective communication.
If "e" is an event of type 'a t and f: 'a -> 'b, then
  wrap (e, f)
is enabled with value "f a" whenever "e" is enabled with value "a".
The computation "f a" only occurs, however, if and when the event is
committed to.
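To make the enabled/disabled reading of these combinators concrete,
here is a small polling-only model. It is a sketch under stated
assumptions, not the module's representation: an event is just a
function that reports the value it is currently enabled with, handler
registration is left out entirely, polling stands in for committing,
and "choose" picks the first enabled constituent (an arbitrary choice
made for the sketch).

```sml
(* Polling-only model: NONE means disabled, SOME a means enabled with a. *)
datatype 'a event = E of unit -> 'a option

fun poll (E p) = p ()
fun always a = E (fn () => SOME a)
fun never () = E (fn () => NONE)

(* Enabled as soon as any constituent is enabled. *)
fun choose es =
   E (fn () =>
      let
         fun loop [] = NONE
           | loop (e :: es) =
                (case poll e of
                    NONE => loop es
                  | SOME a => SOME a)
      in
         loop es
      end)

(* f runs only when the wrapped event is polled and found enabled. *)
fun wrap (e, f) = E (fn () => Option.map f (poll e))

val calls = ref 0
val e = wrap (always 20, fn n => (calls := !calls + 1; n + 1))
val callsBeforePoll = !calls
val chosen = poll (choose [never (), e, always 0])
val callsAfterPoll = !calls
```

Building the wrapped event does not run the wrapping function; it runs
once, when the choice is actually polled.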
For events that repeatedly fire, a useful construct is "every"
  val every: 'a t * ('a -> unit) -> unit
The idea is that "every (e, h)" waits until "e" is enabled with value
"a", at which point it handles "a" with "h", and then repeats, waiting
for "e". That is, "every (e, h)" is equivalent to the following
infinite expression E:
  E = when (e, fn a => (h a; E))
"every" is easily defined using recursion and "when".
  fun every (e, h) =
     let
        fun loop () = when (e, fn a => (h a; loop ()))
     in
        loop ()
     end
Finally, there are several other ways to communicate using events (all
of these appear in CML): mailboxes, ivars, mvars, and multicast
channels.
A mailbox is exactly like a channel except that "put" is buffered.
  val mailbox: unit -> {get: 'a t, put: 'a -> unit}
For a mailbox, "get" is exactly the same as for channel -- it is
enabled when some value is in the mailbox. The difference between
channel and mailbox is in the type of "put", which returns "unit t"
for channels and "unit" for mailboxes. For a mailbox, put immediately
commits to putting a value, whereas for a channel, put returns an
event that will be enabled only when the getter is there. Mailboxes
are easily implemented in terms of channels and "when".
  fun mailbox () =
     let
        val {get, put} = channel ()
     in
        {get = get,
         put = fn a => when (put a, ignore)}
     end
An ivar is a write-once variable. Once it is written to, subsequent
writes fail and all reads succeed.
  val ivar: unit -> {get: 'a t, put: 'a -> unit}
An ivar is easily implemented in terms of a channel. The idea is that
"put a" starts a loop that repeatedly puts "a" on the channel.
  fun ivar () =
     let
        val {get, put} = channel ()
        val isSet = ref false
        val put =
           fn a =>
           if !isSet then
              die "ivar"
           else
              (isSet := true;
               every (put a, ignore))
     in
        {get = get,
         put = put}
     end
An mvar is a cell that is either empty or full; when full, it holds a
single value. It is like an "'a option ref".
  val mvar: unit -> {get: 'a t, put: 'a -> unit}
"put" stores a value in the cell. It is an error to call put if the
cell is full. "get" is enabled with value "a" when the cell is full,
holding value "a". Handling a "get" removes the value from the cell.
A multicast is a FIFO channel to which one writes a sequence of
messages that any number of readers can see. It is like a mailbox in
terms of put, but where each reader gets his own private output
channel to read all the messages.
  val multicast: unit -> {get: unit -> 'a t, put: 'a -> unit}
If one constructs a multicast channel with
  val {get, put} = multicast ()
then "put" adds another value to the sequence of messages. A call to
"get" at time t returns an event that will receive (in order) every
value put after time t.
That covers all of the event module. I've appended complete code
implementing it, less than 400 lines, to the end of this message. The
code also includes a space-safe version of mutable queues, where enque
is curried so that one can hold on to the back of the queue without
holding on to the entire queue.
The main CML operation that is not implemented is "sync", which blocks
a thread on an event until the event becomes enabled.
  val sync: 'a t -> 'a
It is impossible to implement sync without support from the
compiler/runtime. A simple way such support might be provided is via
a pause function.
  val pause: (('a -> unit) -> unit) -> 'a
The idea is that "pause f" pauses the currently running thread and
passes to "f" a function "g" such that "g a" will re-enable the thread
to continue with value "a". With pause, sync can be easily
implemented.
  fun sync e = pause (fn k => when (e, k))
One could add sync like this, and program with non-preemptive threads
without changing the model much. The main problem with sync is that
whether a function blocks or not is no longer reflected in the type
system. Without sync, the only equivalent of blocking is for a
function to return an event, which is then reflected in its type. The
problem with blocking is that it allows other threads/handlers to run,
and so complicates reasoning about a piece of code. So, either one
must trust the documentation for a function when it says it doesn't
block, or one must assume that it does block and take into account all
possible interactions with other threads/handlers. Thus, I think it
is much better to program without sync if possible.
If the compiler/runtime supports timer signals, it is possible to use
them to add preemption to the above, but that seems like a bad idea to
me. The simplicity of non-preemptive reasoning is gone, and the
implementation below is almost certainly broken, as it is not thread
safe.
Code follows.
--------------------------------------------------------------------------------
signature EVENT =
   sig
      type 'a t
      val always: 'a -> 'a t
      val choose: 'a t list -> 'a t
      val channel: unit -> {get: 'a t, put: 'a -> unit t}
      val every: 'a t * ('a -> unit) -> unit
      val ivar: unit -> {get: 'a t, put: 'a -> unit}
      val mailbox: unit -> {get: 'a t, put: 'a -> unit}
      val multicast: unit -> {get: unit -> 'a t, put: 'a -> unit}
      val mvar: unit -> {get: 'a t, put: 'a -> unit}
      val never: unit -> 'a t
      val runHandlers: unit -> unit
      val when: 'a t * ('a -> unit) -> unit
      val wrap: 'a t * ('a -> 'b) -> 'b t
   end
structure List =
   struct
      fun foreach (l, f) = List.app f l
   end
structure Util =
   struct
      fun const c _ = c
      fun die s = raise Fail s
      fun pass a f = f a
      fun recur (a, f) =
         let
            fun loop a = f (a, loop)
         in
            loop a
         end
   end
structure Queue:>
   sig
      type 'a t
      val deque: 'a t -> 'a option
      val enque: 'a t -> 'a -> unit
      val make: unit -> 'a t
   end =
   struct
      structure Node =
         struct
            datatype 'a t = T of ('a * 'a t) option ref
            fun make () = T (ref NONE)
         end
      datatype 'a t = T of {back: 'a Node.t ref,
                            front: 'a Node.t ref}
      fun make () =
         let
            val n = Node.make ()
         in
            T {back = ref n, front = ref n}
         end
      fun enque (T {back, ...}) = fn a =>
         let
            val Node.T r = !back
            val n = Node.make ()
         in
            r := SOME (a, n);
            back := n
         end
      fun deque (T {front, ...}) =
         let
            val Node.T r = !front
         in
            case !r of
               NONE => NONE
             | SOME (a, n) => (front := n; SOME a)
         end
   end
structure Event:> EVENT =
struct
   open Util
   val handlers: (unit -> unit) Queue.t = Queue.make ()
   val scheduleHandler: (unit -> unit) -> unit = Queue.enque handlers
   fun runHandlers () =
      recur ((), fn ((), loop) =>
             case Queue.deque handlers of
                NONE => ()
              | SOME h => (h (); loop ()))
   structure Handler:
      sig
         type 'a t
         val hasBeenScheduled: 'a t -> bool
         val make: ('a -> unit) -> 'a t
         val precompose: 'a t * ('b -> 'a) -> 'b t
         (* It is an error to call Handler.schedule h if Handler.hasBeenScheduled h.
          *)
         val schedule: 'a t -> 'a -> unit
      end =
      struct
         datatype 'a t = T of {handler: 'a -> unit,
                               hasBeenScheduled: bool ref}
         fun make f =
            T {handler = f,
               hasBeenScheduled = ref false}
         fun hasBeenScheduled (T {hasBeenScheduled = h, ...}) = !h
         fun schedule (T {handler, hasBeenScheduled}) =
            if !hasBeenScheduled then
               die "Handler.schedule of handler that hasBeenScheduled"
            else
               let
                  val () = hasBeenScheduled := true
               in
                  fn a => scheduleHandler (fn () => handler a)
               end
         fun precompose (T {handler, hasBeenScheduled}, f) =
            T {handler = handler o f,
               hasBeenScheduled = hasBeenScheduled}
      end
   structure Handlers:
      sig
         type ('a, 'b) t
         val add: ('a, 'b) t -> 'b -> 'a Handler.t -> unit
         val make: unit -> ('a, 'b) t
         val scheduleAll: ('a, unit) t -> 'a -> unit
         val scheduleOne: ('a, 'b) t -> (('a -> unit) * 'b) option
      end =
      struct
         datatype ('a, 'b) t = T of {extra: 'b,
                                     handler: 'a Handler.t} Queue.t
         fun make () = T (Queue.make ())
         fun add (T q) =
            let
               val enque = Queue.enque q
            in
               fn e => fn h => enque {extra = e, handler = h}
            end
         fun scheduleAll (T q) =
            let
               val hs =
                  recur
                  ([], fn (hs, loop) =>
                   case Queue.deque q of
                      NONE => hs
                    | SOME {handler = h, ...} =>
                         loop (if Handler.hasBeenScheduled h then
                                  hs
                               else
                                  Handler.schedule h :: hs))
            in
               fn a => List.foreach (hs, pass a)
            end
         fun scheduleOne (T q) =
            recur ((), fn ((), loop) =>
                   case Queue.deque q of
                      NONE => NONE
                    | SOME {extra, handler} =>
                         if Handler.hasBeenScheduled handler then
                            loop ()
                         else
                            SOME (Handler.schedule handler, extra))
      end
   structure PollResult =
      struct
         datatype 'a t =
            Enabled of 'a
            (* Invariant: We never pass a handler that hasBeenScheduled to a
             * NotEnabled.
             *)
          | NotEnabled of 'a Handler.t -> unit
      end
   datatype z = datatype PollResult.t
   datatype 'a t = T of unit -> 'a PollResult.t
   fun always a = T (const (Enabled a))
   fun never () = T (const (NotEnabled ignore))
   fun poll (T p) =
      case p () of
         Enabled a => SOME a
       | NotEnabled _ => NONE
   fun addHandler (T p, h) =
      if Handler.hasBeenScheduled h then
         ()
      else
         case p () of
            Enabled a => Handler.schedule h a
          | NotEnabled f => f h
   fun when (d, g) = addHandler (d, Handler.make g)
   fun every (e, f) =
      recur ((), fn ((), loop) => when (e, fn a => (f a; loop ())))
   fun wrap (T p, g) =
      T (fn () =>
         NotEnabled
         (fn h =>
          let
             val h = Handler.precompose (h, g)
          in
             case p () of
                Enabled a =>
                   Handler.schedule (Handler.precompose (h, const a)) ()
              | NotEnabled f =>
                   f h
          end))
   fun ivar () =
      let
         val getters = Handlers.make ()
         val r = ref (NotEnabled (Handlers.add getters ()))
         val get = T (fn () => !r)
         fun put a =
            case !r of
               NotEnabled _ =>
                  (r := Enabled a;
                   Handlers.scheduleAll getters a)
             | _ => die "ivar put"
      in
         {get = get, put = put}
      end
   fun mvar () =
      let
         val getters = Handlers.make ()
         val add = Handlers.add getters ()
         val r = ref NONE
         fun put a =
            if isSome (!r) then
               die "mvar put"
            else
               case Handlers.scheduleOne getters of
                  NONE => r := SOME a
                | SOME (h, ()) => h a
         val get =
            T (fn () =>
               case !r of
                  NONE => NotEnabled add
                | SOME a => (r := NONE; Enabled a))
      in
         {get = get, put = put}
      end
   fun 'a channel () =
      let
         val getters: ('a, unit) Handlers.t = Handlers.make ()
         val putters: (unit, 'a) Handlers.t = Handlers.make ()
         val add = Handlers.add getters ()
         val get =
            T (fn () =>
               case Handlers.scheduleOne putters of
                  NONE => NotEnabled add
                | SOME (h, a) => (h (); Enabled a))
         val add = Handlers.add putters
         fun put a =
            T (fn () =>
               case Handlers.scheduleOne getters of
                  NONE => NotEnabled (add a)
                | SOME (h, ()) => (h a; Enabled ()))
      in
         {get = get, put = put}
      end
   fun mailbox () =
      let
         val {get, put} = channel ()
      in
         {get = get,
          put = fn a => when (put a, ignore)}
      end
   fun 'a multicast () =
      let
         datatype 'a n = T of 'a * 'a n t
         local
            val {get, put} = ivar ()
         in
            val getters: 'a n t ref = ref get
            val putter: ('a n -> unit) ref = ref put
         end
         fun put a =
            let
               val {get = g, put = p} = ivar ()
               val () = !putter (T (a, g))
               val () = getters := g
               val () = putter := p
            in
               ()
            end
         fun get () =
            let
               val getters = !getters
               val {get, put} = channel ()
               val () =
                  recur (getters, fn (g, loop) =>
                         when (g, fn T (a, g) =>
                               when (put a, fn () => loop g)))
            in
               get
            end
      in
         {get = get, put = put}
      end
   fun choose es =
      T (fn () =>
         recur
         ((es, []), fn ((es, fs), loop) =>
          case es of
             [] =>
                NotEnabled
                (fn h =>
                 recur (fs, fn (fs, loop) =>
                        case fs of
                           [] => ()
                         | f :: fs =>
                              (f h;
                               if Handler.hasBeenScheduled h then
                                  ()
                               else
                                  loop fs)))
           | T p :: es =>
                case p () of
                   Enabled a => Enabled a
                 | NotEnabled f => loop (es, f :: fs)))
end