[MLton] CML
Matthew Fluet
fluet@cs.cornell.edu
Mon, 3 May 2004 20:37:28 -0400 (EDT)
> One thing I never understood about CML: why is it that a single object
> represents a bi-directional channel instead of having a separate read and
> write channel. The advantage of the latter is that it makes it much easier
> (and obvious) for the GC to eliminate threads that can have no effect. As an
> example, if the write end of a channel is lost (GC'd) then any thread
> blocking on a read from the channel is garbage. Because of the synchronous
> aspect of channels, the same is true of the reverse situation.
>
> There are clearly times when you need a bi-directional connection, but that
> can trivially be done by creating two channels and packaging the write end of
> one and the read end of the other one.
>
> I assume that unless you are lucky, with the CML method, I will get threads
> blocked for ever and never collected quite frequently, right?
Yes, I believe it is true that you can end up with multiple threads
blocked on a read of a channel and never collected, but it is not a
trivial situation to get into. When a thread blocks on a channel (read or
write), it is added to the appropriate queue in the channel. If the
channel gets GCed, then all the blocked threads will be GCed as well.
(That is, everything will be GCed in a deadlock situation.) So, the
situation you describe requires the channel to be kept live by some
active thread that never writes to the channel or blocks reading from it.
Even if you separate the read and write ports of a channel, you need some
shared structure to communicate the values from the writes to the reads.
The only way to get the behavior you describe would seem to require weak
pointers and/or finalizers.
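To make the shared-structure point concrete, here is a minimal
single-threaded sketch in plain SML (Basis only). It is a hypothetical
model, not CML: the names in_chan, out_chan, IN, OUT, write, and read are
made up for illustration, and the two ends share a buffered queue instead
of performing CML's synchronous rendezvous. Both ends wrap the same
record, so as long as either end is reachable, the queue is too.

```sml
(* Hypothetical split channel: both ends wrap the same shared record. *)
type 'a shared = {buf : 'a list ref}
datatype 'a in_chan = IN of 'a shared
datatype 'a out_chan = OUT of 'a shared

(* Build one shared queue and hand out a read end and a write end. *)
fun channel () : 'a in_chan * 'a out_chan =
  let val s = {buf = ref []} in (IN s, OUT s) end

(* Write end: append to the shared buffer (buffered, not synchronous). *)
fun write (OUT {buf}, x) = buf := !buf @ [x]

(* Read end: remove and return the oldest value, if any. *)
fun read (IN {buf}) =
  case !buf of
      [] => NONE
    | x :: rest => (buf := rest; SOME x)

(* Values written on the out end come back, in order, on the in end. *)
val (inc, outc) : int in_chan * int out_chan = channel ()
val () = write (outc, 1)
val () = write (outc, 2)
val first = read inc
val second = read inc
val third = read inc
```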
Right now, channels are represented as:
  datatype 'a chan =
     CHAN of {prio : int ref,
              inQ : (trans_id * 'a S.thread) Q.t,
              outQ : (trans_id * 'a S.thread S.thread) Q.t}
so, if the channel remains live, then so does everything on the inQ and
outQ. I could imagine:
  type 'a inQ = (trans_id * 'a S.thread) Q.t
  type 'a outQ = (trans_id * 'a S.thread S.thread) Q.t
  datatype 'a in_chan = INCHAN of {prio : int ref,
                                   inQ : 'a inQ,
                                   outQ : 'a outQ Weak.t}
  datatype 'a out_chan = OUTCHAN of {prio : int ref,
                                     inQ : 'a inQ Weak.t,
                                     outQ : 'a outQ}
And creating a channel would do something like:
  fun channel () =
     let
        val prio = ref 1
        val inQ = Q.new ()
        val outQ = Q.new ()
     in
        (INCHAN {prio = prio, inQ = inQ, outQ = Weak.new outQ},
         OUTCHAN {prio = prio, inQ = Weak.new inQ, outQ = outQ})
     end
Now, when sending on an out_chan, if the inQ is gone (implying that the
in_chan was GCed), then we know that no reachable thread will ever read
from the channel again; so, the sending thread can reset the outQ (making
all the enqueued sending threads that are waiting for a reader
unreachable) and block without enqueueing itself.
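The three cases such a send would distinguish can be modeled without
threads or weak pointers. This is a hypothetical sketch, not CML's send:
readers and writers are plain integer ids, queues are list refs, and the
result of Weak.get on the inQ is passed in directly as an option, with
NONE standing for "the in_chan has been GCed".

```sml
(* Possible outcomes of a send on the hypothetical out_chan model. *)
datatype outcome =
    Matched of int   (* a reader was waiting: rendezvous with it *)
  | Enqueued         (* no reader yet: the sender waits on outQ *)
  | Abandoned        (* no reader can ever appear: outQ was cleared and
                        the sender blocks without enqueueing itself *)

(* inQ is the (modeled) result of Weak.get on the in-queue. *)
fun send (inQ : int list ref option, outQ : int list ref, me : int) : outcome =
  case inQ of
      NONE => (outQ := []; Abandoned)
    | SOME readers =>
        (case !readers of
             r :: rest => (readers := rest; Matched r)
           | [] => (outQ := !outQ @ [me]; Enqueued))

val outQ = ref [1, 2]                     (* writers 1 and 2 already blocked *)
val r1 = send (SOME (ref [7]), outQ, 3)   (* reader 7 is waiting *)
val r2 = send (SOME (ref []), outQ, 4)    (* no reader: writer 4 enqueues *)
val r3 = send (NONE, outQ, 5)             (* in_chan gone: outQ is dropped *)
```

The last call is the interesting one: clearing outQ is what makes the
previously blocked writers unreachable.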
However, this solution will only kill off the blocked writers if some
thread actually attempts to do a send after all the readers have
disappeared. The out_chan might be kept alive, but never sent to, so the
writers will remain alive.
So, it seems like we really need finalizers.
  datatype 'a chan =
     CHAN of {prio : int ref, inQ : 'a inQ, outQ : 'a outQ}
  type 'a in_chan = 'a chan Finalizable.t option ref
  type 'a out_chan = 'a chan Finalizable.t option ref
And creating a channel would do something like:
  fun channel () =
     let
        val prio = ref 1
        val inQ = Q.new ()
        val outQ = Q.new ()
        val chan = CHAN {prio = prio, inQ = inQ, outQ = outQ}
        (* Each end is a ref to its own finalizable handle on chan. *)
        val incf = Finalizable.new chan
        val inc = ref (SOME incf)
        val incw = Weak.new inc
        val outcf = Finalizable.new chan
        val outc = ref (SOME outcf)
        val outcw = Weak.new outc
        (* Once incf is unreachable (the in_chan was collected),
           disable the out_chan if it is still around. *)
        val () = Finalizable.addFinalizer
                 (incf, fn _ =>
                  case Weak.get outcw of
                     NONE => ()
                   | SOME outc => outc := NONE)
        (* Symmetrically, once outcf is unreachable, disable the in_chan. *)
        val () = Finalizable.addFinalizer
                 (outcf, fn _ =>
                  case Weak.get incw of
                     NONE => ()
                   | SOME inc => inc := NONE)
     in
        (inc, outc)
     end
Now, when an in_chan is GCed, it will trigger the finalizer attached to
incf, which will bang NONE into the out_chan, killing all the blocked
writers.
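The cross-wiring of the two finalizers can be sketched in plain SML
(Basis only). This is a hypothetical model, not MLton's Finalizable or
Weak: "this end was collected" is simulated by calling that end's
finalizer by hand, and the weak pointer to the opposite end is modeled as
an ordinary ref, so it shows only the data flow, not reachability.

```sml
(* Hypothetical model: each end is an option ref to the shared chan,
   and each end's "finalizer" clears the other end. *)
fun channelModel c =
  let
    val inc = ref (SOME c)    (* stands in for the in_chan *)
    val outc = ref (SOME c)   (* stands in for the out_chan *)
    (* in_chan's finalizer: bang NONE into the out_chan. *)
    fun finalizeIn () = outc := NONE
    (* out_chan's finalizer: bang NONE into the in_chan. *)
    fun finalizeOut () = inc := NONE
  in
    (inc, outc, finalizeIn, finalizeOut)
  end

(* Simulate the GC collecting the in_chan. *)
val (inc, outc, finalizeIn, _) = channelModel "chan"
val () = finalizeIn ()
```

After this, a later send on the out_chan would find NONE and could treat
the channel as dead, which is the effect described above.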
In any case, though, weak pointers and finalizers are a lot more
complicated to implement than the simple (albeit potentially wasteful)
bidirectional channels. I can understand why CML defaults to these.
> One other question: with the current implementation, just how fast is
> something like ping-pong (I.e., two threads just sending unit messages to
> each other as fast as they can until some count is reached)?
Here's the core of a simple program:
  (* pong: repeatedly receive a unit message on ch. *)
  fun pong ch =
     let
        fun loop () =
           let
              val () = recv ch
           in
              loop ()
           end
        val _ = spawn (fn () => loop ())
     in
        ()
     end

  (* ping: send a unit message for i = 0 .. n, then shut down CML. *)
  fun ping ch n =
     let
        fun loop i =
           if i > n then RunCML.shutdown OS.Process.success
           else let
                   val () = send (ch, ())
                in
                   loop (i + 1)
                end
        val _ = spawn (fn () => loop 0)
     in
        ()
     end
Running it, taking n from the command line, I get:
[fluet@cfs39 tests 56]% ./ping-pong 100000
Time start: 1083629751.794
Time end: 1083629751.878
Time diff: 83ms
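For scale, the per-message cost implied by that run can be worked out
directly. Note that pong only receives and ping only sends, so each
message is a single send/recv rendezvous rather than a round trip, and
ping's loop sends for i = 0 .. n, i.e. n + 1 messages. The 83 ms figure is
taken from the output above.

```sml
(* Per-message cost implied by the run above: n + 1 rendezvous in 83 ms. *)
val n = 100000
val messages = n + 1
val elapsedMs = 83.0
val nsPerMessage = elapsedMs * 1.0e6 / Real.fromInt messages
val () =
  print ("about " ^ Real.fmt (StringCvt.FIX (SOME 1)) nsPerMessage
         ^ " ns per send/recv\n")
```

That works out to roughly 0.83 microseconds per rendezvous on that
machine.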