[MLton] CML

Matthew Fluet fluet@cs.cornell.edu
Mon, 3 May 2004 20:37:28 -0400 (EDT)

> One  thing  I  never  understood  about  CML:  why is it that a single object
> represents a bi-directional channel instead of having  a  separate  read  and
> write  channel.   The advantage of the latter is that it makes it much easier
> (and obvious) for the GC to eliminate threads that can have no effect.  As an
> example,  if  the  write  end  of  a  channel  is lost (GC'd) then any thread
> blocking on a read from the channel is garbage.  Because of  the  synchronous
> aspect of channels, the same is true of the reverse situation.
> There  are  clearly times when you need a bi-directional connection, but that
> can trivially be done by creating two channels and packaging the write end of
> one and the read end of the other one.
> I  assume  that unless you are lucky, with the CML method, I will get threads
> blocked for ever and never collected quite frequently, right?

Yes, I believe it is true that you can end up with multiple threads
blocked on a read of a channel and never collected, but it is not a
trivial situation. When a thread blocks on a channel (read or write), it
is added to the appropriate queue in the channel.  If the channel gets
GCed, then all the blocked threads will be GCed as well.  (That is,
everything will be GCed in a deadlock situation.)  So, the situation you
describe requires the channel to be kept live by some active thread that
never writes to the channel or blocks reading from it.
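To make the queueing concrete, here is a hypothetical toy model (not CML's actual implementation; the ToyChan name and its functions are made up for illustration) in which a "blocked thread" is represented by the continuation it would resume with. A blocked reader lives only in the channel's inQ, so it stays reachable exactly as long as the channel does.

```sml
(* Hypothetical toy model, not CML's implementation: a blocked thread is
   represented by the continuation it would resume with. *)
structure ToyChan =
struct
   (* FIFO queue as a list ref; O(n) enqueue is fine for a sketch. *)
   type 'a queue = 'a list ref
   fun enq (q : 'a queue, x : 'a) = q := !q @ [x]
   fun deq (q : 'a queue) : 'a option =
      case !q of
         [] => NONE
       | x :: rest => (q := rest; SOME x)

   (* A blocked reader waits for a value; a blocked writer holds its value
      and the continuation to resume once a reader takes it. *)
   datatype 'a chan =
      CHAN of {inQ : ('a -> unit) queue,
               outQ : ('a * (unit -> unit)) queue}

   fun channel () = CHAN {inQ = ref [], outQ = ref []}

   (* recv: rendezvous with a waiting writer, or "block" by enqueuing k. *)
   fun recv (CHAN {inQ, outQ}, k) =
      case deq outQ of
         SOME (v, resumeWriter) => (resumeWriter (); k v)
       | NONE => enq (inQ, k)

   (* send: rendezvous with a waiting reader, or "block" by enqueuing. *)
   fun send (CHAN {inQ, outQ}, v, k) =
      case deq inQ of
         SOME resumeReader => (resumeReader v; k ())
       | NONE => enq (outQ, (v, k))

   fun blockedReaders (CHAN {inQ, ...}) = length (!inQ)
   fun blockedWriters (CHAN {outQ, ...}) = length (!outQ)
end
```

In this model the reader's continuation, and everything it captures, is reachable only through the channel's inQ; if nothing else references the channel, the whole structure is garbage, which is the deadlock case described above.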

Even if you separate the read and write ports of a channel, you need some
shared structure to communicate the values from the writes to the reads.
The only way to get the behavior you describe would seem to require weak
pointers and/or finalizers.
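A hypothetical sketch of that point, with separate in_port/out_port types (names made up here) over one shared record. It buffers values asynchronously for simplicity, unlike CML's synchronous channels, but the structural issue is the same: both ports hold the same shared record, so keeping either port alive keeps the buffer alive.

```sml
(* Hypothetical split-port channel: two abstract port types that share one
   underlying buffer. Asynchronous for simplicity, unlike CML channels. *)
structure SplitChan :>
   sig
      type 'a in_port
      type 'a out_port
      val channel : unit -> 'a in_port * 'a out_port
      val send : 'a out_port * 'a -> unit
      val poll : 'a in_port -> 'a option
   end =
struct
   (* The shared structure both ends point at. *)
   type 'a shared = {buf : 'a list ref}
   datatype 'a in_port = IN of 'a shared
   datatype 'a out_port = OUT of 'a shared

   fun channel () =
      let val s = {buf = ref []} in (IN s, OUT s) end

   fun send (OUT {buf}, v) = buf := !buf @ [v]

   (* Non-blocking read: take the oldest value, if any. *)
   fun poll (IN {buf}) =
      case !buf of
         [] => NONE
       | v :: rest => (buf := rest; SOME v)
end
```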

Right now, channels are represented as:
      datatype 'a chan =
         CHAN of {prio : int ref,
                  inQ  : (trans_id * 'a S.thread) Q.t,
                  outQ : (trans_id * 'a S.thread S.thread) Q.t}
so, if the channel remains live, then so does everything on the inQ and
outQ.  I could imagine:

   type 'a inQ = (trans_id * 'a S.thread) Q.t
   type 'a outQ = (trans_id * 'a S.thread S.thread) Q.t
   datatype 'a in_chan = INCHAN of {prio : int ref,
                                    inQ  : 'a inQ,
                                    outQ : 'a outQ Weak.t}
   datatype 'a out_chan = OUTCHAN of {prio : int ref,
                                      inQ  : 'a inQ Weak.t,
                                      outQ : 'a outQ}
And creating a channel would do something like:
   fun channel () =
      let
         val prio = ref 1
         val inQ = Q.new ()
         val outQ = Q.new ()
      in
         (* Each end owns its own queue strongly, the other one weakly. *)
         (INCHAN {prio = prio, inQ = inQ, outQ = Weak.new outQ},
          OUTCHAN {prio = prio, inQ = Weak.new inQ, outQ = outQ})
      end

Now, when sending on an out_chan, if the inQ is gone (implying that the
in_chan was GCed), then we know that no reachable thread will ever read
from the channel again; so, the sending thread can reset the outQ (making
all the enqueued sending threads that are waiting for a reader
unreachable) and block without enqueuing itself.
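The send logic just described can be sketched without a real GC. In this hypothetical version the post's Weak.new/Weak.get are assumed to behave like MLton.Weak, and the collector clearing the weak pointer is simulated with an explicit FakeWeak.clear; queues are simplified to a list of reader continuations and a list of pending writer values.

```sml
(* Stand-in for a weak pointer; "the GC cleared it" is simulated by clear. *)
structure FakeWeak =
struct
   type 'a t = 'a option ref
   fun new (x : 'a) : 'a t = ref (SOME x)
   fun get (w : 'a t) : 'a option = !w
   fun clear (w : 'a t) : unit = w := NONE
end

structure OutChanSketch =
struct
   (* Simplified queues: readers are continuations, writers just values. *)
   datatype 'a out_chan =
      OUTCHAN of {inQ : ('a -> unit) list ref FakeWeak.t,
                  outQ : 'a list ref}

   datatype outcome = Delivered | Enqueued | DeadChannel

   fun send (OUTCHAN {inQ, outQ}, v) =
      case FakeWeak.get inQ of
         NONE =>
            (* The in_chan (sole strong owner of inQ) is gone, so no reader
               can ever arrive: drop all queued writers and "block forever"
               without enqueuing this one. *)
            (outQ := []; DeadChannel)
       | SOME readers =>
            (case !readers of
                reader :: rest => (readers := rest; reader v; Delivered)
              | [] => (outQ := !outQ @ [v]; Enqueued))
end
```

Note that the queued writers are only dropped inside send, which is exactly the limitation discussed next.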

However, this solution will only kill off the blocked writers if some
thread actually attempts to do a send after all the readers have
disappeared.  The out_chan might be kept alive, but never sent to, so the
writers will remain alive.

So, it seems like we really need finalizers.

  datatype 'a chan =
     CHAN of {prio: int ref, inQ: 'a inQ, outQ: 'a outQ}
  type 'a in_chan = 'a chan Finalizable.t option ref
  type 'a out_chan = 'a chan Finalizable.t option ref

And creating a channel would do something like:
   fun channel () =
      let
         val prio = ref 1
         val inQ = Q.new ()
         val outQ = Q.new ()
         val chan = CHAN {prio = prio, inQ = inQ, outQ = outQ}

         (* Each end gets its own finalizable handle on the shared chan. *)
         val incf = Finalizable.new chan
         val inc = ref (SOME incf)
         val incw = Weak.new inc
         val outcf = Finalizable.new chan
         val outc = ref (SOME outcf)
         val outcw = Weak.new outc

         (* When the in_chan's handle dies, cut the out_chan off from the
            shared chan (and so from its queue of blocked writers). *)
         val () = Finalizable.addFinalizer
                  (incf, fn _ =>
                   case Weak.get outcw of
                      NONE => ()
                    | SOME outc => outc := NONE)
         (* Symmetrically, when the out_chan's handle dies. *)
         val () = Finalizable.addFinalizer
                  (outcf, fn _ =>
                   case Weak.get incw of
                      NONE => ()
                    | SOME inc => inc := NONE)
      in
         (inc, outc)
      end

Now, when an in_chan is GCed, it will trigger incf's finalizer, which
will bang NONE into the out_chan, killing all the blocked writers.
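The effect of that finalizer can be simulated without a collector. In this hypothetical sketch, channel returns the action the in_chan's finalizer would perform alongside both ends, and the caller runs it by hand in place of the GC; under MLton that action would instead be registered via the Finalizable.addFinalizer call above. Blocked writers are modeled as pending values.

```sml
(* Simulation of the finalizer-based design; names are illustrative. *)
structure FinalizerSketch =
struct
   (* Shared channel state; blocked writers are modeled as pending values. *)
   datatype 'a chan = CHAN of {blockedWriters : 'a list ref}

   (* As in the post: each end is a mutable option pointing at the chan. *)
   type 'a in_chan = 'a chan option ref
   type 'a out_chan = 'a chan option ref

   (* The third component is what the in_chan's finalizer would do. *)
   fun channel () : 'a in_chan * 'a out_chan * (unit -> unit) =
      let
         val c = CHAN {blockedWriters = ref []}
         val inc = ref (SOME c)
         val outc = ref (SOME c)
      in
         (inc, outc, fn () => outc := NONE)
      end

   (* With no reader in this toy, a live channel enqueues the writer; a
      dead one (NONE) means no reader can ever arrive. *)
   fun send (outc : 'a out_chan, v : 'a) : bool =
      case !outc of
         NONE => false
       | SOME (CHAN {blockedWriters}) =>
            (blockedWriters := !blockedWriters @ [v]; true)

   fun pendingWriters (outc : 'a out_chan) : int =
      case !outc of
         NONE => 0
       | SOME (CHAN {blockedWriters}) => length (!blockedWriters)
end
```

Unlike the weak-pointer version, the writers are cut off as soon as the finalizer runs, whether or not anyone sends again.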

In any case, though, weak pointers and finalizers are a lot more
complicated to implement than the simple (albeit potentially wasteful)
bidirectional channels.  I can understand why CML defaults to these.

> One other question:  with  the  current  implementation,  just  how  fast  is
> something  like  ping-pong  (I.e.,  two threads just sending unit messages to
> each other as fast as they can until some count is reached)?

Here's the core of a simple program:

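   fun pong ch =
      let
         (* Receive forever; each recv rendezvouses with one send. *)
         fun loop () =
            let
               val () = recv ch
            in
               loop ()
            end
         val _ = spawn (fn () => loop ())
      in
         ()
      end

   fun ping ch n =
      let
         (* Send n + 1 unit messages, then shut down the CML runtime. *)
         fun loop i =
            if i > n
               then RunCML.shutdown OS.Process.success
            else (send (ch, ()); loop (i + 1))
         val _ = spawn (fn () => loop 0)
      in
         ()
      end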

Running it, taking n from the command line, I get:

[fluet@cfs39 tests 56]% ./ping-pong 100000
Time start: 1083629751.794
Time end:   1083629751.878
Time diff:  83ms
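As a rough per-message figure from that run: the ping loop performs n + 1 = 100001 sends, each of which rendezvouses with one recv in pong, so

```latex
\frac{83\ \mathrm{ms}}{100001\ \text{send/recv pairs}} \approx 0.83\ \mu\mathrm{s}\ \text{per pair}
```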