Here are some notes on MLton’s implementation of Concurrent ML.
Concurrent ML was originally implemented for SML/NJ. It was ported to MLton in the summer of 2004. The main difference between the implementations is that SML/NJ uses continuations to implement CML threads, while MLton uses its underlying thread package. Presently, MLton’s threads are a little more heavyweight than SML/NJ’s continuations, but it’s pretty clear that there is some fat there that could be trimmed.
The implementation of CML in SML/NJ is built upon the first-class continuations of the SMLofNJ.Cont module:
type 'a cont
val callcc: ('a cont -> 'a) -> 'a
val isolate: ('a -> unit) -> 'a cont
val throw: 'a cont -> 'a -> 'b
The implementation of CML in MLton is built upon the first-class threads of the MLton.Thread module:
type 'a t
val new: ('a -> unit) -> 'a t
val prepare: 'a t * 'a -> Runnable.t
val switch: ('a t -> Runnable.t) -> 'a
The port is relatively straightforward, because CML always throws to a continuation at most once. Hence, an "abstract" implementation of CML could be built upon first-class one-shot continuations, which map equally well to SML/NJ’s continuations and MLton’s threads.
The "essence" of the port is to transform:
callcc (fn k => ... throw k' v')
to
switch (fn t => ... prepare (t', v'))
which suffices for the vast majority of the CML implementation.
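For concreteness, here is a hedged sketch (not code from either implementation) of a blocking primitive written both ways; enqueue, dispatch, and nextReady are hypothetical scheduler hooks introduced only for illustration:

```sml
(* SML/NJ style: capture the current continuation, hand it to the
 * scheduler, and dispatch some other ready continuation.  An enabler
 * later performs  throw k v  on the saved continuation, exactly once.
 * dispatch never returns normally. *)
fun blockNJ (enqueue: 'a SMLofNJ.Cont.cont -> unit): 'a =
   SMLofNJ.Cont.callcc (fn k => (enqueue k; dispatch ()))

(* MLton style: switch away, handing off the current thread.  An enabler
 * later resumes it via  MLton.Thread.prepare (t, v).  nextReady returns
 * the next runnable thread for switch to transfer control to. *)
fun blockMLton (enqueue: 'a MLton.Thread.t -> unit): 'a =
   MLton.Thread.switch (fn t => (enqueue t; nextReady ()))
```

In both versions the saved continuation/thread is resumed at most once, which is what makes the two primitives interchangeable here.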
There was only one complicated transformation: blocking multiple base events. In SML/NJ CML, the representation of base events is given by:
datatype 'a event_status
  = ENABLED of {prio: int, doFn: unit -> 'a}
  | BLOCKED of {transId: trans_id ref,
                cleanUp: unit -> unit,
                next: unit -> unit} -> 'a
type 'a base_evt = unit -> 'a event_status
When synchronizing on a set of base events, which are all blocked, we must invoke each BLOCKED function with the same transId and cleanUp (the transId is (checked and) set to CANCEL by the cleanUp function, which is invoked by the first enabled event; this "fizzles" every other event in the synchronization group that later becomes enabled). However, each BLOCKED function is implemented by a callcc, so that when the event is enabled, it throws back to the point of synchronization. Hence, the next function (which doesn’t return) is invoked by the BLOCKED function to escape the callcc and continue in the thread performing the synchronization. In SML/NJ this is implemented as follows:
fun ext ([], blockFns) = callcc (fn k => let
      val throw = throw k
      val (transId, setFlg) = mkFlg ()
      fun log [] = S.atomicDispatch ()
        | log (blockFn :: r) =
            throw (blockFn {transId = transId,
                            cleanUp = setFlg,
                            next = fn () => log r})
      in
         log blockFns; error "[log]"
      end)
(Note that S.atomicDispatch invokes the next continuation on the ready queue.) This doesn’t map well to the MLton thread model. Although it follows the
callcc (fn k => ... throw k v)
model, the fact that blockFn will also attempt to do
callcc (fn k' => ... next ())
means that the naive transformation will result in nested switch-es.
We need to think a little more about what this code is trying to do. Essentially, each blockFn wants to capture this continuation, hold on to it until the event is enabled, and continue with next; when the event is enabled, before invoking the continuation and returning to the synchronization point, the cleanUp and other event-specific operations are performed.
To accomplish the same effect in the MLton thread implementation, we have the following:
datatype 'a status =
   ENABLED of {prio: int, doitFn: unit -> 'a}
 | BLOCKED of {transId: trans_id,
               cleanUp: unit -> unit,
               next: unit -> rdy_thread} -> 'a
type 'a base = unit -> 'a status
fun ext ([], blockFns): 'a =
   S.atomicSwitch
   (fn (t: 'a S.thread) =>
    let
       val (transId, cleanUp) = TransID.mkFlg ()
       fun log blockFns : S.rdy_thread =
          case blockFns of
             [] => (S.atomicEnd (); S.next ())
           | blockFn :: blockFns =>
                let
                   val () = S.atomicBegin ()
                in
                   (S.prep o S.new)
                   (fn _ => fn () =>
                    let
                       val () = S.atomicBegin ()
                       val x = blockFn {transId = transId,
                                        cleanUp = cleanUp,
                                        next = fn () => log blockFns}
                    in S.switch (fn _ => S.prepVal (t, x))
                    end)
                end
    in
       log blockFns
    end)
To avoid the nested switch-es, I run the blockFn in its own thread, whose only purpose is to return to the synchronization point. This corresponds to the throw (blockFn {…}) in the SML/NJ implementation. I’m worried that this implementation might be a little expensive, starting a new thread for each blocked event (though this arises only when there are multiple blocked events in a synchronization group). But I don’t see another way of implementing this behavior in the MLton thread model.
Note that another way of thinking about what is going on is to consider each blockFn as prepending a different set of actions to the thread t. It might be possible to provide an MLton.Thread.unsafePrepend:
fun unsafePrepend (T r: 'a t, f: 'b -> 'a): 'b t =
   let
      val t =
         case !r of
            Dead => raise Fail "prepend to a Dead thread"
          | New g => New (g o f)
          | Paused (g, t) => Paused (fn h => g (f o h), t)
   in (* r := Dead; *)
      T (ref t)
   end
I have commented out the r := Dead, which would allow multiple prepends to the same thread (i.e., not destroying the original thread in the process). Of course, only one of the threads could be run: if the original thread were in the Paused state, then multiple threads would share the underlying runtime/primitive thread. Now, this matches the "one-shot" nature of CML continuations/threads, but I’m not comfortable with extending MLton.Thread with such an unsafe operation.
Other than this complication with blocking multiple base events, the port was quite routine. (As a very pleasant surprise, the CML implementation in SML/NJ doesn’t use any SML/NJ-isms.) There is a slight difference in the way in which critical sections are handled in SML/NJ and MLton; since MLton.Thread.switch always leaves a critical section, it is sometimes necessary to add additional atomicBegin-s/atomicEnd-s to ensure that we remain in a critical section after a thread switch.
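For example (a sketch under assumed names, not code from the port), a blocked operation that must resume inside a critical section has to re-enter one explicitly; enqueue is a hypothetical scheduler hook:

```sml
(* Sketch: the switch exits the critical section on the way out, so the
 * resumed side is no longer atomic and must re-enter before touching
 * shared scheduler state. *)
fun blockAtomically (enqueue: unit MLton.Thread.t -> MLton.Thread.Runnable.t): unit =
   let
      val () = MLton.Thread.atomicBegin ()       (* enter critical section *)
      val () = MLton.Thread.atomicSwitch enqueue (* switch away; this leaves
                                                  * the critical section *)
      (* When another thread resumes us, we are outside any critical
       * section, so re-enter it here. *)
      val () = MLton.Thread.atomicBegin ()
   in
      ()
   end
```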
While looking at virtually every file in the core CML implementation, I took the liberty of simplifying things where it seemed possible; in terms of style, the implementation is about half-way between Reppy’s original and MLton’s.
Some changes of note:
- util/ contains all pertinent data structures: (functional and imperative) queues, (functional) priority queues. Hence, it should be easier to switch in more efficient or real-time implementations.
- core-cml/scheduler.sml: in both implementations, this is where most of the interesting action takes place. I’ve made the connection between MLton.Thread.t-s and ThreadId.thread_id-s more abstract than it is in the SML/NJ implementation, and encapsulated all of the MLton.Thread operations in this module.
- eliminated all of the "by hand" inlining
Future Extensions
The CML documentation says the following:
CML.joinEvt: thread_id -> unit event
joinEvt tid creates an event value for synchronizing on the termination of the thread with the ID tid. There are three ways that a thread may terminate: the function that was passed to spawn (or spawnc) may return; it may call the exit function; or it may have an uncaught exception. Note that joinEvt does not distinguish between these cases; it also does not become enabled if the named thread deadlocks (even if it is garbage collected).
I believe that MLton.Finalizable might be able to relax that last restriction. Upon the creation of an 'a Scheduler.thread, we could attach a finalizer to the underlying 'a MLton.Thread.t that enables the joinEvt (in the associated ThreadID.thread_id) when the 'a MLton.Thread.t becomes unreachable.
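A hedged sketch of what that might look like; the representation of Scheduler.thread and the ThreadID.enableJoin helper are assumptions made for illustration, though MLton.Finalizable.new and addFinalizer are the real MLton.Finalizable operations:

```sml
(* Wrap the underlying thread in a finalizable value and fire the join
 * event once the collector proves the thread unreachable. *)
fun newThread (f: unit -> unit, tid: ThreadID.thread_id) =
   let
      val t = MLton.Finalizable.new (MLton.Thread.new f)
      (* The finalizer runs some time after t becomes unreachable,
       * e.g. after a deadlocked thread is garbage collected. *)
      val () = MLton.Finalizable.addFinalizer
                  (t, fn _ => ThreadID.enableJoin tid)
   in
      t
   end
```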
I don’t know why CML doesn’t have
CML.kill: thread_id -> unit
which has a fairly simple implementation: set a kill flag in the thread_id and adjust the scheduler to discard any killed threads that it takes off the ready queue. The fairness of the scheduler ensures that a killed thread will eventually be discarded. The semantics are a little murky for blocked threads that are killed, though. For example, consider a thread blocked on SyncVar.mTake mv and a thread blocked on SyncVar.mGet mv. If the first thread is killed while blocked, and a third thread does SyncVar.mPut (mv, x), then we might expect that we’ll enable the second thread, and never the first. But when only the ready queue is able to discard killed threads, the SyncVar.mPut could enable the first thread (putting it on the ready queue, from which it will be discarded) and leave the second thread blocked. We could solve this by adjusting the TransID.trans_id type and the "cleaner" functions to look for both canceled transactions and transactions on killed threads.
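The simple scheme might be sketched as follows; the killed flag in thread_id and the scheduler’s dequeue loop are assumptions for illustration, not the actual CML structures:

```sml
(* Killing a thread just sets its flag. *)
fun kill (TID {killed, ...}: thread_id): unit =
   killed := true

(* In the scheduler: fairness guarantees every ready thread is
 * eventually dequeued, so every killed thread is eventually dropped. *)
fun next (): S.rdy_thread =
   let
      val (TID {killed, ...}, t) = Queue.deque readyQ
   in
      if !killed then next () else t
   end
```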
John Reppy says that MarlowEtAl01 and FlattFindler04 explain why CML.kill would be a bad idea.
Between CML.timeOutEvt and CML.kill, one could give an efficient solution to the recent comp.lang.ml post about terminating a function that doesn’t complete in a given time.
fun timeOut (f: unit -> 'a, t: Time.time): 'a option =
   let
      val iv = SyncVar.iVar ()
      val tid = CML.spawn (fn () => SyncVar.iPut (iv, f ()))
   in
      CML.select
      [CML.wrap (CML.timeOutEvt t, fn () => (CML.kill tid; NONE)),
       CML.wrap (SyncVar.iGetEvt iv, fn x => SOME x)]
   end
Space Safety
There are some CML-related posts on the MLton mailing list that discuss concerns that SML/NJ’s implementation is not space efficient, because multi-shot continuations can be held indefinitely on event queues. MLton is better off because of the one-shot nature of its threads: when an event enables a thread, all other copies of the thread waiting in other event queues get turned into dead threads (of zero size).