Here are some notes on MLton’s implementation of ConcurrentML.

Concurrent ML was originally implemented for SML/NJ. It was ported to MLton in the summer of 2004. The main difference between the implementations is that SML/NJ uses continuations to implement CML threads, while MLton uses its underlying thread package. Presently, MLton’s threads are a little more heavyweight than SML/NJ’s continuations, but it’s pretty clear that there is some fat there that could be trimmed.

The implementation of CML in SML/NJ is built upon the first-class continuations of the SMLofNJ.Cont module.

type 'a cont
val callcc: ('a cont -> 'a) -> 'a
val isolate: ('a -> unit) -> 'a cont
val throw: 'a cont -> 'a -> 'b

The implementation of CML in MLton is built upon the first-class threads of the MLtonThread module.

type 'a t
val new: ('a -> unit) -> 'a t
val prepare: 'a t * 'a -> Runnable.t
val switch: ('a t -> Runnable.t) -> 'a

The port is relatively straightforward, because CML always throws to a continuation at most once. Hence, an "abstract" implementation of CML could be built upon first-class one-shot continuations, which map equally well to SML/NJ’s continuations and MLton’s threads.

The "essence" of the port is to transform:

callcc (fn k => ... throw k' v')


switch (fn t => ... prepare (t', v'))

which suffices for the vast majority of the CML implementation.

There was only one complicated transformation: blocking multiple base events. In SML/NJ CML, the representation of base events is given by:

datatype 'a event_status
  = ENABLED of {prio: int, doFn: unit -> 'a}
  | BLOCKED of {
        transId: trans_id ref,
        cleanUp: unit -> unit,
        next: unit -> unit
      } -> 'a
type 'a base_evt = unit -> 'a event_status

When synchronizing on a set of base events, which are all blocked, we must invoke each BLOCKED function with the same transId and cleanUp (the transId is (checked and) set to CANCEL by the cleanUp function, which is invoked by the first enabled event; this "fizzles" every other event in the synchronization group that later becomes enabled). However, each BLOCKED function is implemented by a callcc, so that when the event is enabled, it throws back to the point of synchronization. Hence, the next function (which doesn’t return) is invoked by the BLOCKED function to escape the callcc and continue in the thread performing the synchronization. In SML/NJ this is implemented as follows:

fun ext ([], blockFns) = callcc (fn k => let
      val throw = throw k
      val (transId, setFlg) = mkFlg()
      fun log [] = S.atomicDispatch ()
        | log (blockFn:: r) =
            throw (blockFn {
                transId = transId,
                cleanUp = setFlg,
                next = fn () => log r
        log blockFns; error "[log]"

(Note that S.atomicDispatch invokes the continuation of the next continuation on the ready queue.) This doesn’t map well to the MLton thread model. Although it follows the

callcc (fn k => ... throw k v)

model, the fact that blockFn will also attempt to do

callcc (fn k' => ... next ())

means that the naive transformation will result in nested switch-es.

We need to think a little more about what this code is trying to do. Essentially, each blockFn wants to capture this continuation, hold on to it until the event is enabled, and continue with next; when the event is enabled, before invoking the continuation and returning to the synchronization point, the cleanUp and other event specific operations are performed.

To accomplish the same effect in the MLton thread implementation, we have the following:

datatype 'a status =
   ENABLED of {prio: int, doitFn: unit -> 'a}
 | BLOCKED of {transId: trans_id,
               cleanUp: unit -> unit,
               next: unit -> rdy_thread} -> 'a

type 'a base = unit -> 'a status

fun ext ([], blockFns): 'a =
     (fn (t: 'a S.thread) =>
         val (transId, cleanUp) = TransID.mkFlg ()
         fun log blockFns: S.rdy_thread =
            case blockFns of
               [] => ()
             | blockFn::blockFns =>
                  (S.prep o
                  (fn _ => fn () =>
                      val () = S.atomicBegin ()
                      val x = blockFn {transId = transId,
                                       cleanUp = cleanUp,
                                       next = fn () => log blockFns}
                   in S.switch(fn _ => S.prepVal (t, x))
         log blockFns

To avoid the nested switch-es, I run the blockFn in it’s own thread, whose only purpose is to return to the synchronization point. This corresponds to the throw (blockFn {...}) in the SML/NJ implementation. I’m worried that this implementation might be a little expensive, starting a new thread for each blocked event (when there are only multiple blocked events in a synchronization group). But, I don’t see another way of implementing this behavior in the MLton thread model.

Note that another way of thinking about what is going on is to consider each blockFn as prepending a different set of actions to the thread t. It might be possible to give a MLton.Thread.unsafePrepend.

fun unsafePrepend (T r: 'a t, f: 'b -> 'a): 'b t =
      val t =
         case !r of
            Dead => raise Fail "prepend to a Dead thread"
          | New g => New (g o f)
          | Paused (g, t) => Paused (fn h => g (f o h), t)
   in (* r := Dead; *)
      T (ref t)

I have commented out the r := Dead, which would allow multiple prepends to the same thread (i.e., not destroying the original thread in the process). Of course, only one of the threads could be run: if the original thread were in the Paused state, then multiple threads would share the underlying runtime/primitive thread. Now, this matches the "one-shot" nature of CML continuations/threads, but I’m not comfortable with extending MLton.Thread with such an unsafe operation.

Other than this complication with blocking multiple base events, the port was quite routine. (As a very pleasant surprise, the CML implementation in SML/NJ doesn’t use any SML/NJ-isms.) There is a slight difference in the way in which critical sections are handled in SML/NJ and MLton; since MLton.Thread.switch always leaves a critical section, it is sometimes necessary to add additional atomicBegin-s/atomicEnd-s to ensure that we remain in a critical section after a thread switch.

While looking at virtually every file in the core CML implementation, I took the liberty of simplifying things where it seemed possible; in terms of style, the implementation is about half-way between Reppy’s original and MLton’s.

Some changes of note:

  • util/ contains all pertinent data-structures: (functional and imperative) queues, (functional) priority queues. Hence, it should be easier to switch in more efficient or real-time implementations.

  • core-cml/scheduler.sml: in both implementations, this is where most of the interesting action takes place. I’ve made the connection between MLton.Thread.t-s and ThreadId.thread_id-s more abstract than it is in the SML/NJ implementation, and encapsulated all of the MLton.Thread operations in this module.

  • eliminated all of the "by hand" inlining

Future Extensions

The CML documentation says the following:

CML.joinEvt: thread_id -> unit event
  • joinEvt tid

    creates an event value for synchronizing on the termination of the thread with the ID tid. There are three ways that a thread may terminate: the function that was passed to spawn (or spawnc) may return; it may call the exit function, or it may have an uncaught exception. Note that joinEvt does not distinguish between these cases; it also does not become enabled if the named thread deadlocks (even if it is garbage collected).

I believe that the MLton.Finalizable might be able to relax that last restriction. Upon the creation of a 'a Scheduler.thread, we could attach a finalizer to the underlying 'a MLton.Thread.t that enables the joinEvt (in the associated ThreadID.thread_id) when the 'a MLton.Thread.t becomes unreachable.

I don’t know why CML doesn’t have

CML.kill: thread_id -> unit

which has a fairly simple implementation — setting a kill flag in the thread_id and adjusting the scheduler to discard any killed threads that it takes off the ready queue. The fairness of the scheduler ensures that a killed thread will eventually be discarded. The semantics are little murky for blocked threads that are killed, though. For example, consider a thread blocked on SyncVar.mTake mv and a thread blocked on SyncVar.mGet mv. If the first thread is killed while blocked, and a third thread does SyncVar.mPut (mv, x), then we might expect that we’ll enable the second thread, and never the first. But, when only the ready queue is able to discard killed threads, then the SyncVar.mPut could enable the first thread (putting it on the ready queue, from which it will be discarded) and leave the second thread blocked. We could solve this by adjusting the TransID.trans_id types and the "cleaner" functions to look for both canceled transactions and transactions on killed threads.

John Reppy says that MarlowEtAl01 and FlattFindler04 explain why CML.kill would be a bad idea.

Between CML.timeOutEvt and CML.kill, one could give an efficient solution to the recent post about terminating a function that doesn’t complete in a given time.

  fun timeOut (f: unit -> 'a, t: Time.time): 'a option =
       val iv = SyncVar.iVar ()
       val tid = CML.spawn (fn () => SyncVar.iPut (iv, f ()))
       [CML.wrap (CML.timeOutEvt t, fn () => (CML.kill tid; NONE)),
        CML.wrap (SyncVar.iGetEvt iv, fn x => SOME x)]

Space Safety

There are some CML related posts on the MLton mailing list:

that discuss concerns that SML/NJ’s implementation is not space efficient, because multi-shot continuations can be held indefinitely on event queues. MLton is better off because of the one-shot nature — when an event enables a thread, all other copies of the thread waiting in other event queues get turned into dead threads (of zero size).