[MLton] cvs commit: MAIL: Initial port of CML from SML/NJ to MLton.
Matthew Fluet
fluet@mlton.org
Sat, 1 May 2004 13:16:52 -0700
fluet 04/05/01 13:16:51
Added: lib/cml README TODO cml.cm
lib/cml/cml-lib cml-lib.cm multicast.sig multicast.sml
result.sig result.sml simple-rpc.sig simple-rpc.sml
trace-cml.cm trace-cml.sig trace-cml.sml
lib/cml/core-cml channel.sig channel.sml cml.sig cml.sml
core-cml.cm cvar.sig cvar.sml event.sig event.sml
mailbox.sig mailbox.sml rep-types.sml run-cml.sig
run-cml.sml running.sml scheduler-hooks.sig
scheduler-hooks.sml scheduler.sig scheduler.sml
sync-var.sig sync-var.sml thread-id.sig
thread-id.sml thread.sig thread.sml timeout.sig
timeout.sml trans-id.sig trans-id.sml version.sig
version.sml
lib/cml/tests exit.cm exit.sml primes-multicast.cm
primes-multicast.sml primes.cm primes.sml
run-main.sml timeout.cm timeout.sml
lib/cml/util assert.sig assert.sml critical.sig critical.sml
debug.sig debug.sml fun-priority-queue.fun
fun-priority-queue.sig fun-queue.sig fun-queue.sml
imp-queue.sig imp-queue.sml local-assert.fun
local-debug.fun timeit.sig timeit.sml util.cm
Log:
MAIL: Initial port of CML from SML/NJ to MLton.
See cml/README for more details. (Which ought to be the first file in
this commit message.)
Revision Changes Path
1.1 mlton/lib/cml/README
Index: README
===================================================================
This is an initial port of CML from SML/NJ to MLton.
The implementation of CML in SML/NJ is built upon the first-class
continuations of the SMLofNJ.Cont module:
type 'a cont
val callcc : ('a cont -> 'a) -> 'a
val throw : 'a cont -> 'a -> 'b
val isolate : ('a -> unit) -> 'a cont
The implementation of CML in MLton is built upon the first-class
threads of the MLton.Thread module:
type 'a t
val new : ('a -> unit) -> 'a t
val switch : ('a t -> 'b t * 'b) -> 'a
The port is relatively straightforward, because CML throws to a
continuation at most once. Hence, an "abstract" implementation of
CML could be built upon first-class one-shot continuations, which map
equally well to SML/NJ's continuations and MLton's threads.
The "essence" of the port is to transform:
... callcc (fn k => ... throw k' v') ...
to
... switch (fn t => ... (t', v')) ...
which suffices for the vast majority of the CML implementation.
There was only one complicated transformation: blocking multiple base
events. In SML/NJ CML, the representation of base events is given by:
datatype 'a event_status
= ENABLED of {prio : int, doFn : unit -> 'a}
| BLOCKED of {
transId : trans_id ref,
cleanUp : unit -> unit,
next : unit -> unit
} -> 'a
type 'a base_evt = unit -> 'a event_status
When synchronizing on a set of base events, which are all blocked, we
must invoke each BLOCKED function with the same transId and cleanUp
(the transId is (checked and) set to CANCEL by the cleanUp function,
which is invoked by the first enabled event; this "fizzles" every
other event in the synchronization group that later becomes enabled).
However, each BLOCKED function is implemented by a callcc, so that
when the event is enabled, it throws back to the point of
synchronization. Hence, the next function (which doesn't return) is
invoked by the BLOCKED function to escape the callcc and continue in
the thread performing the synchronization. In SML/NJ this is
implemented as follows:
fun ext ([], blockFns) = callcc (fn k => let
val throw = throw k
val (transId, setFlg) = mkFlg()
fun log [] = S.atomicDispatch ()
| log (blockFn :: r) =
throw (blockFn {
transId = transId,
cleanUp = setFlg,
next = fn () => log r
})
in
log blockFns; error "[log]"
end)
(Note that S.atomicDispatch invokes the continuation of the next
continuation on the ready queue.) This doesn't map well to the MLton
thread model. Although it follows the
... callcc (fn k => ... throw k v) ...
model, the fact that blockFn will also attempt to do
... callcc (fn k' => ... next ()) ...
the naive transformation will result in nested switch-es.
We need to think a little more about what this code is trying to do.
Essentially, each blockFn wants to capture this continuation, hold on
to it until the event is enabled, and continue with next; when the
event is enabled, before invoking the continuation and returning to
the synchronization point, the cleanUp and other event specific
operations are performed.
To accomplish the same effect in the MLton thread implemenation, we
have the following:
datatype 'a status =
ENABLED of {prio : int, doitFn : unit -> 'a}
| BLOCKED of {transId : trans_id,
cleanUp : unit -> unit,
next : unit -> rdy_thread} -> 'a
type 'a base = unit -> 'a status
and
fun ext ([], blockFns) : 'a =
S.atomicSwitch
(fn (t : 'a S.thread) =>
let
val (transId, cleanUp) = TransID.mkFlg ()
fun log blockFns : S.rdy_thread =
case blockFns of
[] => S.next ()
| blockFn::blockFns =>
S.new
(fn _ => fn () =>
let
val () = S.atomicBegin ()
val x = blockFn {transId = transId,
cleanUp = cleanUp,
next = fn () => log blockFns}
in S.switch(fn _ => (t, x))
end)
in
(log blockFns, ())
end)
To avoid the nested switch-es, I run the blockFn in it's own thread,
whose only purpose is to return to the synchronization point. This
corresponds to the throw (blockFn {...}) in the SML/NJ
implementation. I'm worried that this implementation might be a
little expensive, starting a new thread for each blocked event (when
there are only multiple blocked events in a synchronization group).
But, I don't see another way of implementing this behavior in the
MLton thread model.
Note that another way of thinking about what is going on is to
consider each blockFn as prepending a different set of actions to the
thread t. It might be possible to give a MLton.Thread.unsafePrepend:
fun unsafePrepend (T r: 'a t, f: 'b -> 'a): 'b t =
let
val t =
case !r of
Dead => raise Fail "prepend to a Dead thread"
| New g => New (g o f)
| Paused (g, t) => Paused (fn h => g (f o h), t)
in (* r := Dead; *)
T (ref t)
end
I have commented out the r := Dead, which would allow multiple
prepends to the same thread (i.e., not destroying the original thread
in the process). Of course, only one of the threads could be run: if
the original thread were in the Paused state, then multiple threads
would share the underlying runtime/primitive thread. Now, this
matches the "one-shot" nature of CML continuations/threads, but I'm
not comfortable with extending the MLton.Thread module with such an
unsafe operation.
Other than this complication with blocking multiple base events, the
port was quite routine. (As a very pleasant surprise, the CML
implementation in SML/NJ doesn't use any SML/NJ-isms.) There is a
slight difference in the way in which critical sections are handled in
SML/NJ and MLton; since MLton.Thread.switch _always_ leaves a critical
section, it is sometimes necessary to add additional atomicBegin/End-s
to ensure that we remain in a critical section after a thread switch.
While looking at virtually every file in the core-CML implementation,
I took the liberty of simplifying things where it seemed possible; in
terms of style, the implementation is about half-way between Reppy's
original and MLton's.
Some changes of note:
* util/ contains all pertinant data-structures: (functional and
imperative) queues, (functional) priority queues. Hence, it
should be easier to switch in more efficient or real-time
implementations.
* core-cml/scheduler.sml: in both implementations, this is where most
of the interesting action takes place. I've made the connection
between MLton.Thread.t-s and ThreadId.thread_id-s more abstract
than it is in the SML/NJ implemenation, and encapsulated all of
the MLton.Thread operations in this module.
* eliminated all of the "by hand" inlining
All of the core CML functionality is present:
signature CML; structure CML : CML
signature CVAR; structure CVar : CVAR
signature MAILBOX; structure Mailbox : MAILBOX
signature MULTICAST; structure Multicast : MULTICAST
signature SIMPLE_RPC; structure SimpleRPC : SIMPLE_RPC
A minimal RunCML : RUN_CML structure is present:
signature RUN_CML =
sig
val isRunning : unit -> bool
val doit : (unit -> unit) * Time.time option -> OS.Process.status
val shutdown : OS.Process.status -> 'a
end
This does not include all of the cleanup and logging operations of the
SML/NJ RunCML structure. However, it does include the CML.timeOutEvt
and CML.atTimeEvt functions, and a preemptive scheduler that knows to
sleep when there are no ready threads and some threads blocked on time
events.
None of the Standard ML Basis Library has been made either
MLton.Thread or CML safe. Much of the IO and OS structures have event
based equivalents, which should be implemented.
For now, I've extended the CML signature with
val print : string -> unit
which executes in a critical region. For now, this is the "right
thing" for interfacing with the Basis Library and the underlying
system calls. (Using Posix.Error.SysCall, it is sufficient to execute
Basis Library code in a critical section; things that get interrupted
will automatically restart with signals blocked.) It also has the
nice property that one can write a program with open CML whose only IO
(while executing RunCML.doit) is print, and it will compile under both
MLton and SML/NJ.
Some thoughts on future extensions. The CML documentation says the
following:
CML.joinEvt: thread_id -> unit event
joinEvt tid
creates an event value for synchronizing on the termination of
the thread with the ID tid.
There are three ways that a thread may terminate: the
function that was passed to spawn (or spawnc) may return; it
may call the exit function, or it may have an uncaught
exception.
Note that joinEvt does not distinguish between these cases;
it also does not become enabled if the named thread
deadlocks (even if it is garbage collected).
I believe that the MLton.Finalizable might be able to relax that last
restriction. Upon the creation of a 'a Scheduler.thread, we could
attatch a finalizer to the underlying 'a MLton.Thread.t that enables
the joinEvt (in the associated ThreadID.thread_id) when the 'a
MLton.Thread.t becomes unreachable.
I don't know why CML doesn't have
CML.kill: thread_id -> unit
which has a fairly simple implementation -- setting a kill flag in the
thread_id and adjusting the scheduler to discard any killed threads
that it takes off the ready queue. The fairness of the scheduler
ensures that a killed thread will eventually be discarded. The
semantics are little murky for blocked threads that are killed,
though. For example, consider a thread blocked on SyncVar.mTake mv
and a thread blocked on SyncVar.mGet mv. If the first thread is
killed while blocked, and a third thread does SyncVar.mPut (mv, x),
then we might expect that we'll enable the second thread, and never
the first. But, when only the ready queue is able to discard killed
threads, then the SyncVar.mPut could enable the first thread (putting
it on the ready queue, from which it will be discarded) and leave the
second thread blocked. We could solve this by adjusting the
TransID.trans_id types and the "cleaner" functions to look for both
cancelled transactions and transactions on killed threads.
Between CML.timeOutEvt and CML.kill, one could give an efficient
solution to the recent comp.lang.ml post about terminating a function
that doesn't complete in a given time:
fun timeOut (f : unit -> 'a, t : Time.time) : 'a option =
let
val iv = SyncVar.iVar ()
val tid = CML.spawn (fn () => SyncVar.iPut (iv, f ()))
in
CML.select
[CML.wrap (CML.timeOutEvt t, fn () => (CML.kill tid; NONE)),
CML.wrap (SyncVar.iGetEvt iv, fn x => SOME x)]
end
1.1 mlton/lib/cml/TODO
Index: TODO
===================================================================
* Implement Okasaki real-time data-structures.
+ mailbox.sml -- needs functional queue
+ scheduler.sml -- needs imperative queue
+ timeout.sml -- needs functional priority queue
1.1 mlton/lib/cml/cml.cm
Index: cml.cm
===================================================================
Group is
core-cml/core-cml.cm
cml-lib/cml-lib.cm
1.1 mlton/lib/cml/cml-lib/cml-lib.cm
Index: cml-lib.cm
===================================================================
Group is
multicast.sig
multicast.sml
simple-rpc.sig
simple-rpc.sml
1.1 mlton/lib/cml/cml-lib/multicast.sig
Index: multicast.sig
===================================================================
(* multicast.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* multicast-sig.sml
*
* COPYRIGHT (c) 1990 by John H. Reppy. See COPYRIGHT file for details.
*
* Asynchronous multicast (one-to-many) channels.
*)
signature MULTICAST =
sig
type 'a mchan
type 'a port
type 'a event = 'a CML.event
(* create a new multicast channel *)
val mChannel : unit -> 'a mchan
(* create a new output port on a channel *)
val port : 'a mchan -> 'a port
(* create a new output port on a channel that has the same state as the
* given port. I.e., the stream of messages seen on the two ports will
* be the same.
* NOTE: if two (or more) independent threads are reading from the
* same port, then the copy operation may not be accurate.
*)
val copy : 'a port -> 'a port
(* receive a message from a port *)
val recv : 'a port -> 'a
val recvEvt : 'a port -> 'a event
(* send a message to all of the ports of a channel *)
val multicast : ('a mchan * 'a) -> unit
end
1.1 mlton/lib/cml/cml-lib/multicast.sml
Index: multicast.sml
===================================================================
(* multicast.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* multicast.sml
*
* COPYRIGHT (c) 1994 AT&T Bell Laboratories.
*
* Asynchronous multicast (one-to-many) channels. This implementation
* is based on a condition variable implementation of multicast channels.
* See Chapter 5 of "Concurrent Programming in ML" for details.
*)
structure Multicast : MULTICAST =
struct
structure SV = SyncVar
type 'a event = 'a CML.event
datatype 'a request =
Message of 'a
| NewPort
datatype 'a mc_state = MCState of ('a * 'a mc_state SV.ivar)
datatype 'a port =
Port of (('a * 'a mc_state SV.ivar) CML.chan * 'a mc_state SV.ivar SV.mvar)
datatype 'a mchan =
MChan of ('a request CML.chan * 'a port CML.chan)
fun mkPort cv =
let
val outCh = CML.channel()
val stateVar = SV.mVarInit cv
fun tee cv =
let
val (MCState(v, nextCV)) = SV.iGet cv
in
CML.send (outCh, (v, nextCV))
; tee nextCV
end
val _ = CML.spawn (fn () => tee cv)
in
Port(outCh, stateVar)
end
fun mChannel () =
let
val reqCh = CML.channel()
and replyCh = CML.channel()
fun server cv =
case (CML.recv reqCh) of
NewPort =>
(CML.send (replyCh, mkPort cv)
; server cv)
| (Message m) =>
let
val nextCV = SV.iVar()
in
SV.iPut (cv, MCState(m, nextCV))
; server nextCV
end
val _ = CML.spawn (fn () => server (SV.iVar()))
in
MChan(reqCh, replyCh)
end
fun multicast (MChan(ch, _), m) = CML.send (ch, Message m)
fun port (MChan(reqCh, replyCh)) =
(CML.send (reqCh, NewPort)
; CML.recv replyCh)
fun copy (Port(_, stateV)) = mkPort(SV.mGet stateV)
fun recvMsg stateV (v, nextCV) =
let val _ = SV.mSwap (stateV, nextCV)
in v
end
fun recv (Port(ch, stateV)) = recvMsg stateV (CML.recv ch)
fun recvEvt (Port(ch, stateV)) = CML.wrap(CML.recvEvt ch, recvMsg stateV)
end
1.1 mlton/lib/cml/cml-lib/result.sig
Index: result.sig
===================================================================
(* result.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* result.sml
*
* COPYRIGHT (c) 1996 AT&T Research.
*
*)
signature RESULT =
sig
type 'a result
val result : unit -> 'a result
val put : ('a result * 'a) -> unit
val putExn : ('a result * exn) -> unit
val get : 'a result -> 'a
val getEvt : 'a result -> 'a CML.event
end
1.1 mlton/lib/cml/cml-lib/result.sml
Index: result.sml
===================================================================
(* result.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* result.sml
*
* COPYRIGHT (c) 1996 AT&T Research.
*
*)
structure Result :> RESULT =
struct
structure SV = SyncVar
datatype 'a result_val = EXN of exn | RES of 'a
type 'a result = 'a result_val SV.ivar
fun result () = SV.iVar()
fun put (iv, v) = SV.iPut(iv, RES v)
fun putExn (iv, ex) = SV.iPut(iv, EXN ex)
fun wrap (RES v) = v
| wrap (EXN ex) = raise ex
fun get iv = wrap(SV.iGet iv)
fun getEvt iv = CML.wrap(SV.iGetEvt iv, wrap)
end
1.1 mlton/lib/cml/cml-lib/simple-rpc.sig
Index: simple-rpc.sig
===================================================================
(* simple-rpc.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* simple-rpc-sig.sml
*
* COPYRIGHT (c) 1997 AT&T Labs Research.
*
* Generators for simple RPC protocols.
*)
signature SIMPLE_RPC =
sig
type 'a event = 'a CML.event
val mkRPC : ('a -> 'b) ->
{call : 'a -> 'b,
entryEvt : unit event}
val mkRPC_In : (('a * 'c) -> 'b) ->
{call : 'a -> 'b,
entryEvt : 'c -> unit event}
val mkRPC_Out : ('a -> ('b * 'c)) ->
{call : 'a -> 'b,
entryEvt : 'c event}
val mkRPC_InOut : (('a * 'c) -> ('b * 'd)) ->
{call : 'a -> 'b,
entryEvt : 'c -> 'd event}
end
1.1 mlton/lib/cml/cml-lib/simple-rpc.sml
Index: simple-rpc.sml
===================================================================
(* simple-rpc.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* simple-rpc.sml
*
* COPYRIGHT (c) 1997 AT&T Labs Research.
*
* Generators for simple RPC protocols.
*)
structure SimpleRPC : SIMPLE_RPC =
struct
type 'a event = 'a CML.event
fun call reqMB arg =
let val replV = SyncVar.iVar()
in
Mailbox.send(reqMB, (arg, replV))
; SyncVar.iGet replV
end
fun mkRPC f =
let
val reqMB = Mailbox.mailbox()
val entryEvt =
CML.wrap
(Mailbox.recvEvt reqMB, fn (arg, replV) =>
SyncVar.iPut(replV, f arg))
in
{call = call reqMB, entryEvt = entryEvt}
end
fun mkRPC_In f =
let
val reqMB = Mailbox.mailbox()
val reqEvt = Mailbox.recvEvt reqMB
fun entryEvt state =
CML.wrap
(reqEvt, fn (arg, replV) =>
SyncVar.iPut(replV, f(arg, state)))
in
{call = call reqMB, entryEvt = entryEvt}
end
fun mkRPC_Out f =
let
val reqMB = Mailbox.mailbox()
val reqEvt = Mailbox.recvEvt reqMB
val entryEvt =
CML.wrap
(reqEvt, fn (arg, replV) =>
let val (res, state') = f arg
in SyncVar.iPut(replV, res); state'
end)
in
{call = call reqMB, entryEvt = entryEvt}
end
fun mkRPC_InOut f =
let
val reqMB = Mailbox.mailbox()
val reqEvt = Mailbox.recvEvt reqMB
fun entryEvt state =
CML.wrap
(reqEvt, fn (arg, replV) =>
let val (res, state') = f(arg, state)
in SyncVar.iPut(replV, res); state'
end)
in
{call = call reqMB, entryEvt = entryEvt}
end
end
1.1 mlton/lib/cml/cml-lib/trace-cml.cm
Index: trace-cml.cm
===================================================================
(* trace-cml.cm
*
* COPYRIGHT (c) 1996 AT&T Research.
*
* The TraceCML library module needs access to CML internals, so we package
* it up into a sub-group.
*)
Group (cm-descr/trace-cml.cm)
(* Notice that the "owner" specification above gets ignored by the old
* CM. Under the old CM clients use _this_ file to refer to the
* trace library, under the new CM clients use the description file
* in cm-descr/trace-cml.cm.
* This is done to avoid a file-naming conflict for smlnj-lib.cm.
* The conflict is caused by the old CM's path-search mechanism and
* does not occur under the new CM. *)
signature TRACE_CML
structure TraceCML
is
#if defined (NEW_CM)
$/smlnj-lib.cm
$cml/basis.cm
$cml/core-cml.cm
$cml/cml.cm
#else
smlnj-lib.cm
../src/basis.cm
../src/core-cml.cm
../src/cml.cm
#endif
trace-cml-sig.sml
trace-cml.sml
1.1 mlton/lib/cml/cml-lib/trace-cml.sig
Index: trace-cml.sig
===================================================================
(* trace-cml-sig.sml
*
* COPYRIGHT (c) 1992 AT&T Bell Laboratories
*
* This module provides rudimentary debugging support in the form of mechanisms
* to control debugging output, and to monitor thread termination. This
* version of this module is adapted from Cliff Krumvieda's utility for tracing
* CML programs. It provides three facilities: trace modules, for controlling
* debugging output; thread watching, for detecting thread termination; and
* a mechanism for reporting uncaught exceptions on a per thread basis.
*)
signature TRACE_CML =
sig
(** Trace modules **
*
* The basic idea is that one defines a heirarchy of ``trace
* modules,'' which provide valves for debugging output.
*)
type trace_module
(* where to direct trace output to *)
datatype trace_to
= TraceToOut
| TraceToErr
| TraceToNull
| TraceToFile of string
| TraceToStream of TextIO.outstream
val setTraceFile : trace_to -> unit
(* Direct the destination of trace output. Note: TraceToStream
* can only be specified as a destination if CML is running.
*)
val traceRoot : trace_module
(* the root module of the trace hierarchy *)
exception NoSuchModule
val traceModule : (trace_module * string) -> trace_module
val nameOf : trace_module -> string
(* return the name of the module *)
val moduleOf : string -> trace_module
(* return the module specified by the given string, or raise
* NoSuchModule if none exists.
*)
val traceOn : trace_module -> unit
(* turn tracing on for a module and its descendents *)
val traceOff : trace_module -> unit
(* turn tracing off for a module and its descendents *)
val traceOnly : trace_module -> unit
(* turn tracing on for a module (but not for its descendents) *)
val amTracing : trace_module -> bool
(* return true if this module is being traced *)
val status : trace_module -> (trace_module * bool) list
(* return a list of the registered modules dominated by the given
* module, and their status.
*)
val trace : (trace_module * (unit -> string list)) -> unit
(* conditionally generate tracing output *)
(** Thread watching **)
val watcher : trace_module
(* controls printing of thread watching messages; the module's name
* is "/ThreadWatcher/"
*)
val watch : (string * CML.thread_id) -> unit
(* watch the given thread for unexpected termination *)
val unwatch : CML.thread_id -> unit
(* stop watching the named thread *)
(** Uncaught exception handling **)
val setUncaughtFn : ((CML.thread_id * exn) -> unit) -> unit
(* this sets the default uncaught exception action. *)
val setHandleFn : ((CML.thread_id * exn) -> bool) -> unit
(* add an additional uncaught exception action. If the action returns
* true, then no further action is taken. This can be used to handle
* application specific exceptions.
*)
val resetUncaughtFn : unit -> unit
(* this resets the default uncaught exception action to the system default,
* and removes any layered actions.
*)
end; (* TRACE_CML *)
1.1 mlton/lib/cml/cml-lib/trace-cml.sml
Index: trace-cml.sml
===================================================================
(* trace-cml.sml
*
* COPYRIGHT (c) 1992 AT&T Bell Laboratories
*
* This module provides rudimentary debugging support in the form of mechanisms
* to control debugging output, and to monitor thread termination. This
* version of this module is adapted from Cliff Krumvieda's utility for tracing
* CML programs. It provides three facilities: trace modules, for controlling
* debugging output; thread watching, for detecting thread termination; and
* a mechanism for reporting uncaught exceptions on a per thread basis.
*)
structure TraceCML : TRACE_CML =
struct
structure SV = SyncVar
(* where to direct trace output to *)
datatype trace_to
= TraceToOut
| TraceToErr
| TraceToNull
| TraceToFile of string
| TraceToStream of TextIO.outstream
exception NoSuchModule
(** Trace Modules **)
datatype trace_module = TM of {
full_name : string,
label : string,
tracing : bool ref,
children : trace_module list ref
}
val traceRoot = TM{
full_name = "/",
label = "",
tracing = ref false,
children = ref []
}
fun forAll f = let
fun for (tm as TM{children, ...}) = (f tm; forChildren(!children))
and forChildren [] = ()
| forChildren (tm::r) = (for tm; forChildren r)
in
for
end
structure SS = Substring
fun findTraceModule name = let
fun eq ss (TM{label, ...}) = (SS.compare(SS.all label, ss) = EQUAL)
fun find ([], tm) = SOME tm
| find (arc::rest, tm as TM{label, children, ...}) = let
val eqArc = eq arc
fun findChild [] = NONE
| findChild (c::r) =
if (eqArc c) then find(rest, c) else findChild r
in
findChild (!children)
end
in
find (
SS.tokens (fn #"/" => true | _ => false) (SS.all name),
traceRoot)
end
fun traceModule' (TM parent, name) = let
fun checkChildren [] = let
val tm = TM{
full_name = (#full_name parent ^ name),
label = name,
tracing = ref(!(#tracing parent)),
children = ref []
}
in
(#children parent) := tm :: !(#children parent);
tm
end
| checkChildren((tm as TM{label, ...})::r) =
if (label = name) then tm else checkChildren r
in
checkChildren (! (#children parent))
end
(* return the name of the module *)
fun nameOf (TM{full_name, ...}) = full_name
(* return the module specified by the given string *)
fun moduleOf' name = (case findTraceModule name
of NONE => raise NoSuchModule
| (SOME tm) => tm
(* end case *))
(* turn tracing on for a module and its descendents *)
val traceOn' = forAll (fn (TM{tracing, ...}) => tracing := true)
(* turn tracing off for a module and its descendents *)
val traceOff' = forAll (fn (TM{tracing, ...}) => tracing := false)
(* turn tracing on for a module (but not for its descendents) *)
fun traceOnly' (TM{tracing, ...}) = tracing := true
(* return true if this module is being traced *)
fun amTracing (TM{tracing, ...}) = !tracing
(* return a list of the registered modules dominated by the given
* module, and their status.
*)
fun status' root = let
fun list (tm as TM{tracing, children, ...}, l) =
listChildren (!children, (tm, !tracing)::l)
and listChildren ([], l) = l
| listChildren (c::r, l) = listChildren(r, list(c, l))
in
rev (list (root, []))
end
(** Trace printing **)
val traceDst = ref TraceToOut
val traceCleanup = ref (fn () => ())
fun setTraceFile' t = traceDst := t
(** NOTE: there are bookkeeping bugs, when changing the trace destination
** from TraceToStream to something else (where the original destination
** was TraceToFile).
**)
fun tracePrint s = let
fun output strm = (TextIO.output(strm, s); TextIO.flushOut strm)
in
case !traceDst
of TraceToOut => output TextIO.stdOut
| TraceToErr => output TextIO.stdErr
| TraceToNull => ()
| (TraceToFile fname) => let
val dst = let
val strm = TextIO.openOut fname
in
traceCleanup := (fn () => TextIO.closeOut strm);
TraceToStream strm
end handle _ => (
Debug.sayDebug(concat[
"TraceCML: unable to open \"", fname,
"\", redirecting to stdout"
]);
TraceToOut)
in
setTraceFile' dst;
tracePrint s
end
| (TraceToStream strm) => output strm
(* end case *)
end
(** Trace server **)
val traceCh : (unit -> string list) CML.chan = CML.channel()
val traceUpdateCh : (unit -> unit) CML.chan = CML.channel()
fun traceServer () = let
val evt = [
CML.wrap(CML.recvEvt traceCh, fn f => tracePrint(concat(f()))),
CML.wrap(CML.recvEvt traceUpdateCh, fn f => f())
]
fun loop () = (CML.select evt; loop())
in
loop()
end (* traceServer *)
fun tracerStart () = (CML.spawn traceServer; ())
fun tracerStop () = ((!traceCleanup)(); traceCleanup := (fn () => ()))
val _ = (
RunCML.logChannel ("TraceCML:trace", traceCh);
RunCML.logChannel ("TraceCML:trace-update", traceUpdateCh);
RunCML.logServer ("TraceCML:trace-server", tracerStart, tracerStop))
local
fun carefully f = if RunCML.isRunning()
then CML.send(traceUpdateCh, f)
else f()
fun carefully' f = if RunCML.isRunning()
then let
val reply = SV.iVar()
in
CML.send (traceUpdateCh, fn () => (SV.iPut(reply, f())));
SV.iGet reply
end
else f()
in
fun traceModule arg = carefully' (fn () => traceModule' arg)
fun moduleOf name = carefully' (fn () => moduleOf' name)
fun traceOn tm = carefully (fn () => traceOn' tm)
fun traceOff tm = carefully (fn () => traceOff' tm)
fun traceOnly tm = carefully (fn () => traceOnly' tm)
fun setTraceFile f = carefully (fn () => setTraceFile' f)
fun status root = carefully' (fn () => status' root)
end (* local *)
fun trace (TM{tracing, ...}, prFn) =
if (RunCML.isRunning() andalso (!tracing))
then CML.send(traceCh, prFn)
else ()
(** Thread watching **)
(* controls printing of thread watching messages *)
val watcher = traceModule (traceRoot, "ThreadWatcher")
val _ = traceOn watcher
datatype watcher_msg
= WATCH of (CML.thread_id * unit CML.chan)
| UNWATCH of (CML.thread_id * unit SV.ivar)
val watcherMb : watcher_msg Mailbox.mbox = Mailbox.mailbox ()
(* stop watching the named thread *)
fun unwatch tid = let
val ackV = SV.iVar()
in
Mailbox.send(watcherMb, UNWATCH(tid, ackV));
SV.iGet ackV
end
(* watch the given thread for unexpected termination *)
fun watch (name, tid) = let
val unwatchCh = CML.channel()
fun handleTermination () = (
trace (watcher, fn () => [
"WARNING! Watched thread ", name, CML.tidToString tid,
" has died.\n"
]);
unwatch tid)
fun watcherThread () = (
Mailbox.send (watcherMb, WATCH(tid, unwatchCh));
CML.select [
CML.recvEvt unwatchCh,
CML.wrap (CML.joinEvt tid, handleTermination)
])
in
CML.spawn (watcherThread); ()
end
structure TidTbl = HashTableFn (
struct
type hash_key = CML.thread_id
val hashVal = CML.hashTid
val sameKey = CML.sameTid
end)
(* the watcher server *)
fun startWatcher () = let
val tbl = TidTbl.mkTable (32, Fail "startWatcher")
fun loop () = (case (Mailbox.recv watcherMb)
of (WATCH arg) => TidTbl.insert tbl arg
| (UNWATCH(tid, ack)) => (
(* notify the watcher that the thread is no longer being
* watched, and then acknowledge the unwatch command.
*)
CML.send(TidTbl.remove tbl tid, ())
handle _ => ();
(* acknowledge that the thread has been removed *)
SV.iPut(ack, ()))
(* end case *);
loop ())
in
CML.spawn loop; ()
end
val _ = (
RunCML.logMailbox ("TraceCML:watcherMb", watcherMb);
RunCML.logServer ("TraceCML:watcher-server", startWatcher, fn () => ()))
(** Uncaught exception handling **)
fun defaultHandlerFn (tid, ex) = let
val raisedAt = (case (SMLofNJ.exnHistory ex)
of [] => ["\n"]
| l => [" raised at ", List.last l, "\n"]
(* end case *))
in
Debug.sayDebug (concat ([
CML.tidToString tid, " uncaught exception ",
exnName ex, " [", exnMessage ex, "]"
] @ raisedAt))
end
val defaultHandler = ref defaultHandlerFn
val handlers = ref ([] : ((CML.thread_id * exn) -> bool) list)
(* this sets the default uncaught exception action. *)
fun setUncaughtFn' action = defaultHandler := action
(* add an additional uncaught exception action. If the action returns
* true, then no further action is taken. This can be used to handle
* handle application specific exceptions.
*)
fun setHandleFn' action = handlers := action :: !handlers
(* this resets the default uncaught exception action to the system default,
* and removes any layered actions.
*)
fun resetUncaughtFn' () = (defaultHandler := defaultHandlerFn; handlers := [])
val exnUpdateCh : (unit -> unit) CML.chan = CML.channel()
fun exnServerStartup () = let
val errCh = Mailbox.mailbox()
(* this function is installed as the default handler for threads;
* it sends the thread ID and uncaught exception to the ExnServer.
*)
fun threadHandler exn = Mailbox.send(errCh, (CML.getTid(), exn))
(* invoke the hsndler actions on the uncaught exception *)
fun handleExn arg = let
val hdlrList = !handlers and dfltHndlr = !defaultHandler
fun loop [] = dfltHndlr arg
| loop (hdlr::r) = if (hdlr arg) then () else loop r
in
CML.spawn (fn () => ((loop hdlrList) handle _ => (dfltHndlr arg)));
()
end
val event = [
CML.wrap (CML.recvEvt exnUpdateCh, fn f => f()),
CML.wrap (Mailbox.recvEvt errCh, handleExn)
]
fun server () = (CML.select event; server())
in
Thread.defaultExnHandler := threadHandler;
CML.spawn server; ()
end
val _ = (
RunCML.logChannel ("TraceCML:exnUpdateCh", exnUpdateCh);
RunCML.logServer ("TraceCML", exnServerStartup, fn () => ()))
local
fun carefully f = if RunCML.isRunning() then CML.send(exnUpdateCh, f) else f()
in
fun setUncaughtFn arg = carefully (fn () => setUncaughtFn' arg)
fun setHandleFn arg = carefully (fn () => setHandleFn' arg)
fun resetUncaughtFn arg = carefully (fn () => resetUncaughtFn' arg)
end (* local *)
end; (* TraceCML *)
1.1 mlton/lib/cml/core-cml/channel.sig
Index: channel.sig
===================================================================
(* channel.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* channel-sig.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* The representation of synchronous channels.
*)
signature CHANNEL =
sig
type 'a chan
val channel : unit -> 'a chan
val sameChannel : ('a chan * 'a chan) -> bool
val send : ('a chan * 'a) -> unit
val recv : 'a chan -> 'a
val sendEvt : ('a chan * 'a) -> unit Event.event
val recvEvt : 'a chan -> 'a Event.event
val sendPoll : ('a chan * 'a) -> bool
val recvPoll : 'a chan -> 'a option
end
signature CHANNEL_EXTRA =
sig
include CHANNEL
val resetChan : 'a chan -> unit
end
1.1 mlton/lib/cml/core-cml/channel.sml
Index: channel.sml
===================================================================
(* channel.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* channel.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* The representation of synchronous channels.
*
* To ensure that we always leave the atomic region exactly once, we
* require that the blocking operation be responsible for leaving the
* atomic region (in the event case, it must also execute the clean-up
* action). The doitFn always transfers control to the blocked thread
* without leaving the atomic region. Note that the send (and sendEvt)
* blockFns run using the receiver's thread ID.
*)
structure Channel : CHANNEL_EXTRA =
struct
structure Assert = LocalAssert(val assert = true)
structure Debug = LocalDebug(val debug = false)
structure Q = ImpQueue
structure S = Scheduler
structure E = Event
fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
fun debug' msg = debug (fn () => msg)
datatype trans_id = datatype TransID.trans_id
datatype trans_id_state = datatype TransID.trans_id_state
datatype 'a chan =
CHAN of {prio : int ref,
inQ : (trans_id * 'a S.thread) Q.t,
outQ : (trans_id * 'a S.thread S.thread) Q.t}
fun resetChan (CHAN {prio, inQ, outQ}) =
(prio := 1
; Q.reset inQ
; Q.reset outQ)
fun channel () = CHAN {prio = ref 1, inQ = Q.new (), outQ = Q.new ()}
(* sameChannel : ('a chan * 'a chan) -> bool *)
fun sameChannel (CHAN {prio = prio1, ...}, CHAN {prio = prio2, ...}) =
prio1 = prio2
(* bump a priority value by one, returning the old value *)
fun bumpPriority (p as ref n) = (p := n+1; n)
(* functions to clean channel input and output queues *)
local
fun cleaner (TXID txst, _) =
case !txst of CANCEL => true | _ => false
in
fun cleanAndChk (prio, q) : int =
(Q.clean (q, cleaner)
; if Q.empty q
then 0
else bumpPriority prio)
fun cleanAndDeque q =
Q.cleanAndDeque (q, cleaner)
fun enqueAndClean (q, item) =
Q.enqueAndClean (q, item, cleaner)
end
fun send (CHAN {prio, inQ, outQ}, msg) =
let
val () = Assert.assertNonAtomic' "Channel.send"
val () = debug' "send(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.send(1)"
val () = S.atomicBegin ()
val () = debug' "send(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.send(2)", SOME 1)
val () =
case cleanAndDeque inQ of
SOME (rtxid, rt) =>
let
val () = debug' "send(3.1.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.send(3.1.1)", SOME 1)
val () =
S.readyAndSwitch
(fn () =>
(prio := 1
; TransID.force rtxid
; (rt, msg)))
val () = debug' "send(3.1.2)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.send(3.1.2)"
in
()
end
| NONE =>
let
val () = debug' "send(3.2.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.send(3.2.1)", SOME 1)
val rt =
S.atomicSwitchToNext
(fn st => Q.enque (outQ, (TransID.mkTxId (), st)))
val () = debug' "send(3.2.2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.send(3.2.2)", SOME 1)
val () = S.atomicReadyAndSwitch (fn () => (rt, msg))
val () = debug' "send(3.2.3)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.send(3.2.2)"
in
()
end
val () = debug' "send(4)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.send(4)"
in
()
end
fun sendEvt (CHAN {prio, inQ, outQ}, msg) =
let
fun doitFn () =
let
val () = Assert.assertAtomic' ("Channel.sendEvt.doitFn", NONE)
val (rtxid, rt) = valOf (Q.deque inQ)
val () = debug' "sendEvt(3.1.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.sendEvt(3.1.1)", SOME 1)
val () =
S.readyAndSwitch
(fn () =>
(prio := 1
; TransID.force rtxid
; (rt, msg)))
val () = debug' "sendEvt(3.1.2)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.sendEvt(3.1.2)"
in
()
end
fun blockFn {transId, cleanUp, next} =
let
val () = Assert.assertAtomic' ("Channel.sendEvt.blockFn", NONE)
val () = debug' "sendEvt(3.2.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.sendEvt(3.2.1)", SOME 1)
val rt =
S.atomicSwitch
(fn st =>
(enqueAndClean (outQ, (transId, st))
; (next (), ())))
val () = debug' "sendEvt(3.2.2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.sendEvt(3.2.2)", SOME 1)
val () = cleanUp ()
val () = S.atomicReadyAndSwitch (fn () => (rt, msg))
val () = debug' "sendEvt(3.2.3)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.sendEvt(3.2.2)"
in
()
end
fun pollFn () =
let
val () = Assert.assertAtomic' ("Channel.sendEvt.pollFn", NONE)
val () = debug' "sendEvt(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.sendEvt(2)", SOME 1)
in
case cleanAndChk (prio, inQ) of
0 => E.blocked blockFn
| prio => E.enabled {prio = prio, doitFn = doitFn}
end
in
E.bevt pollFn
end
fun sendPoll (CHAN {prio, inQ, ...}, msg) =
let
val () = Assert.assertNonAtomic' "Channel.sendPoll"
val () = debug' "sendPoll(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.sendPoll(1)"
val () = S.atomicBegin ()
val () = debug' "sendPoll(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.sendPoll(1)", SOME 1)
val b =
case cleanAndDeque inQ of
SOME (rtxid, rt) =>
let
val () = debug' "sendPoll(3.1.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.sendPoll(3.1.1)", SOME 1)
val () =
S.readyAndSwitch
(fn () =>
(prio := 1
; TransID.force rtxid
; (rt, msg)))
val b = true
val () = debug' "sendPoll(3.1.2)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.sendPoll(3.1.2)"
in
b
end
| NONE =>
let
val () = debug' "sendPoll(3.2.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.sendPoll(3.2.1)", SOME 1)
val b = false
val () = debug' "sendPoll(3.2.2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.sendPoll(3.2.2)", SOME 1)
val () = S.atomicEnd ()
val () = debug' "sendPoll(3.2.3)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.sendPoll(3.2.2)"
in
b
end
val () = debug' "sendPoll(4)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.sendPoll(4)"
in
b
end
fun recv (CHAN {prio, inQ, outQ}) =
let
val () = Assert.assertNonAtomic' "Channel.recv"
val () = debug' "recv(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.recv(1)"
val () = S.atomicBegin ()
val () = debug' "recv(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recv(2)", SOME 1)
val msg =
case cleanAndDeque outQ of
SOME (stxid, st) =>
let
val () = debug' "recv(3.1.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recv(3.1.1)", SOME 1)
val msg =
S.switch
(fn rt =>
(prio := 1
; TransID.force stxid
; (st, rt)))
val () = debug' "recv(3.1.2)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.recv(3.1.1)"
in
msg
end
| NONE =>
let
val () = debug' "recv(3.2.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recv(3.2.1)", SOME 1)
val msg =
S.atomicSwitchToNext
(fn rt => enqueAndClean (inQ, (TransID.mkTxId (), rt)))
val () = debug' "recv(3.2.2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recv(3.2.2)", SOME 1)
val () = S.atomicEnd ()
val () = debug' "recv(3.2.3)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.recv(3.2.3)"
in
msg
end
val () = debug' "recv(4)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.recv(4)"
in
msg
end
fun recvEvt (CHAN {prio, inQ, outQ}) =
let
fun doitFn () =
let
val () = Assert.assertAtomic' ("Channel.recvEvt.doitFn", NONE)
val (stxid, st) = valOf (Q.deque outQ)
val () = debug' "recvEvt(3.1.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recvEvt(3.1.1)", SOME 1)
val msg =
S.switch
(fn rt =>
(prio := 1
; TransID.force stxid
; (st, rt)))
val () = debug' "recvEvt(3.1.2)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.recvEvt(3.1.1)"
in
msg
end
fun blockFn {transId, cleanUp, next} =
let
val () = Assert.assertAtomic' ("Channel.recvEvt.blockFn", NONE)
val () = debug' "recvEvt(3.2.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recvEvt(3.2.1)", SOME 1)
val msg =
S.atomicSwitch
(fn rt =>
(enqueAndClean (inQ, (transId, rt))
; (next (), ())))
val () = debug' "recvEvt(3.2.2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recvEvt(3.2.2)", SOME 1)
val () = cleanUp ()
val () = S.atomicEnd ()
val () = debug' "recvEvt(3.2.3)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.recvEvt(3.2.3)"
in
msg
end
fun pollFn () =
let
val () = Assert.assertAtomic' ("Channel.recvEvt.pollFn", NONE)
val () = debug' "recvEvt(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recvEvt(2)", SOME 1)
in
case cleanAndChk (prio, outQ) of
0 => E.blocked blockFn
| prio => E.enabled {prio = prio, doitFn = doitFn}
end
in
E.bevt pollFn
end
fun recvPoll (CHAN {prio, outQ, ...}) =
let
val () = Assert.assertNonAtomic' "Channel.recvPoll"
val () = debug' "recvPoll(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.recvPoll(1)"
val () = S.atomicBegin ()
val () = debug' "recvPoll(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recvPoll(2)", SOME 1)
val msg =
case cleanAndDeque outQ of
SOME (stxid, st) =>
let
val () = debug' "recvPoll(3.1.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recvPoll(3.1.1)", SOME 1)
val msg =
S.switch
(fn rt =>
(prio := 1
; TransID.force stxid
; (st, rt)))
val msg = SOME msg
val () = debug' "recvPoll(3.1.2)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.recvPoll(3.1.1)"
in
msg
end
| NONE =>
let
val () = debug' "recv(3.2.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recv(3.2.1)", SOME 1)
val msg = NONE
val () = debug' "recvPoll(3.2.2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Channel.recvPoll(3.2.2)", SOME 1)
val () = S.atomicEnd ()
val () = debug' "recvPoll(3.2.3)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.recvPoll(3.2.3)"
in
msg
end
val () = debug' "recvPoll(4)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.recvPoll(4)"
in
msg
end
end
1.1 mlton/lib/cml/core-cml/cml.sig
Index: cml.sig
===================================================================
(* cml.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* cml-sig.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* The interface to the core CML features.
*)
signature CML =
sig
include VERSION
include THREAD
include CHANNEL
include EVENT
include TIME_OUT
val print : string -> unit
end
1.1 mlton/lib/cml/core-cml/cml.sml
Index: cml.sml
===================================================================
(* cml.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* cml.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
structure CML : CML =
struct
open Version
open Thread
open Channel
open Event
open TimeOut
fun print s =
(Scheduler.atomicBegin ();
TextIO.print s;
Scheduler.atomicEnd ())
end
1.1 mlton/lib/cml/core-cml/core-cml.cm
Index: core-cml.cm
===================================================================
Group is
../util/util.cm
rep-types.sml
running.sml
trans-id.sig
trans-id.sml
cvar.sig
cvar.sml
thread-id.sig
thread-id.sml
scheduler-hooks.sig
scheduler-hooks.sml
scheduler.sig
scheduler.sml
event.sig
event.sml
thread.sig
thread.sml
channel.sig
channel.sml
timeout.sig
timeout.sml
version.sig
version.sml
cml.sig
cml.sml
mailbox.sig
mailbox.sml
sync-var.sig
sync-var.sml
run-cml.sig
run-cml.sml
1.1 mlton/lib/cml/core-cml/cvar.sig
Index: cvar.sig
===================================================================
(* cvar.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* ???
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
signature CVAR =
sig
datatype cvar = datatype RepTypes.cvar
datatype cvar_state = datatype RepTypes.cvar_state
val new : unit -> cvar
end
1.1 mlton/lib/cml/core-cml/cvar.sml
Index: cvar.sml
===================================================================
(* cvar.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* ???
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
structure CVar : CVAR =
struct
structure R = RepTypes
(* Condition variables are essentially unit valued ivars, and
* are used for various internal synchronization conditions
* (e.g., nack events, I/O synchronization, and thread termination).
*)
datatype cvar = datatype R.cvar
datatype cvar_state = datatype R.cvar_state
fun new () = CVAR (ref (CVAR_unset []))
end
1.1 mlton/lib/cml/core-cml/event.sig
Index: event.sig
===================================================================
(* event.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* events-sig.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* The representation of event values and the event combinators.
*)
signature EVENT =
sig
type 'a event
val never : 'a event
val alwaysEvt : 'a -> 'a event
val wrap : ('a event * ('a -> 'b)) -> 'b event
val wrapHandler : ('a event * (exn -> 'a)) -> 'a event
val guard : (unit -> 'a event) -> 'a event
val withNack : (unit event -> 'a event) -> 'a event
val choose : 'a event list -> 'a event
val sync : 'a event -> 'a
val select : 'a event list -> 'a
end
signature EVENT_EXTRA =
sig
include EVENT
type 'a status
val enabled : {prio : int, doitFn : unit -> 'a} -> 'a status
val blocked : ({transId : TransID.trans_id,
cleanUp : unit -> unit,
next : unit -> Scheduler.rdy_thread} -> 'a) -> 'a status
val bevt : (unit -> 'a status) -> 'a event
val atomicCVarSet : CVar.cvar -> unit
val cvarGetEvt : CVar.cvar -> unit event
end
1.1 mlton/lib/cml/core-cml/event.sml
Index: event.sml
===================================================================
(* event.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* event.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* The representation of event values and the event combinators.
*
* Some important requirements on the implementation of base event values:
*
* 1) The pollFn, doitFn, and blockFn are always called from inside
* atomic regions.
*
* 2) The pollFn returns an integer priority: this is 0 when not enabled,
* ~1 for fixed priority, and a positive value for dynamic priority.
* The standard scheme is to associate a counter with the underlying
* synchronization object, and to increase it by one for each
* synchronization attempt.
*
* 3) The blockFn is responsible for exiting the atomic region; the doitFns
* should NOT leave the atomic region.
*
* 4) The blockFn is responsible for executing the "cleanUp" action
* prior to leaving the atomic region.
*)
structure Event : EVENT_EXTRA =
struct
structure Assert = LocalAssert(val assert = true)
structure Debug = LocalDebug(val debug = false)
structure S = Scheduler
fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
fun debug' msg = debug (fn () => msg)
datatype trans_id = datatype TransID.trans_id
datatype trans_id_state = datatype TransID.trans_id_state
datatype cvar = datatype CVar.cvar
datatype cvar_state = datatype CVar.cvar_state
datatype status = datatype RepTypes.status
val enabled = ENABLED
val blocked = BLOCKED
type 'a base = 'a RepTypes.base
datatype event = datatype RepTypes.event
val bevt = fn pollFn => BEVT [pollFn]
datatype 'a group =
BASE of 'a base list
| GRP of 'a group list
| NACK of cvar * 'a group
(** Condition variables. Because these variables are set inside atomic
** regions, we have to use different conventions for clean-up, etc.
** Instead of requiring the blockFn continuation to call the cleanUp
** action and to leave the atomic region, we call the cleanUp function
** when setting the condition variable (in atomicCVarSet), and have the
** invariant that the blockFn continuation is dispatched outside the
** atomic region.
**)
(* set a condition variable;
* we assume that this function is always executed in an atomic region.
*)
fun atomicCVarSet (CVAR state) : unit =
let
val () = Assert.assertAtomic' ("Event.atomicCVarSet", NONE)
val () = debug' "atomicCVarSet" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.atomicCVarSet", SOME 1)
in
case !state of
CVAR_unset waiting =>
let
fun add waiting : unit =
case waiting of
[] => ()
| ({transId = TXID txst, cleanUp, thread}::waiting) =>
(case !txst of
CANCEL => ()
| TRANS =>
(txst := CANCEL
; cleanUp ()
; S.ready thread)
; add waiting)
in
state := CVAR_set 1
; add waiting
end
| _ => raise Fail "atomicCVarSet"
end
(* the event constructor for waiting on a cvar.
*)
fun cvarGetEvt (CVAR state) : unit event =
let
fun doitFn () =
let
val () = Assert.assertAtomic' ("Event.cvarGetEvt.doitFn", NONE)
val () = debug' "cvarGetEvt(3.1.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.cvarGetEvt(3.1.1)", SOME 1)
val () = state := CVAR_set 1
val () = S.atomicEnd ()
val () = debug' "cvarGetEvt(3.1.2)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.cvarGetEvt(3.1.2)"
in
()
end
fun blockFn {transId, cleanUp, next} =
let
val () = Assert.assertAtomic' ("Event.cvarGetEvt.blockFn", NONE)
val () = debug' "cvarGetEvt(3.2.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.cvarGetEvt(3.2.1)", SOME 1)
val () =
S.atomicSwitch
(fn t =>
let
val item = {transId = transId,
cleanUp = cleanUp,
thread = t}
val waiting =
case !state of
CVAR_unset waiting => waiting
| _ => raise Fail "cvarGetEvt:blockFn"
in
state := CVAR_unset (item::waiting)
; (next (), ())
end)
val () = debug' "cvarGetEvt(3.2.2)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.cvarGetEvt(3.2.2)"
in
()
end
fun pollFn () =
let
val () = Assert.assertAtomic' ("Event.cvarGetEvt.pollFn", NONE)
val () = debug' "cvarGetEvt(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.cvarGetEvt(2)", SOME 1)
in
case !state of
CVAR_set n =>
(state := CVAR_set (n + 1)
; enabled {prio = n, doitFn = doitFn})
| _ => blocked blockFn
end
in
bevt pollFn
end
(* event combinators *)
val never : 'a event =
BEVT []
fun alwaysEvt (v : 'a) : 'a event =
let
fun doitFn () =
let
val () = Assert.assertAtomic' ("Event.alwaysEvt.doitFn", NONE)
val () = debug' "alwaysEvt(3.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.alwaysEvt(3.1)", SOME 1)
val () = S.atomicEnd ()
val () = debug' "alwaysEvt(3.2)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.alwaysEvt(3.2)"
in
v
end
fun pollFn () =
let
val () = Assert.assertAtomic' ("Event.alwaysEvt.pollFn", NONE)
val () = debug' "alwaysEvt(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.alwaysEvt(2)", SOME 1)
in
enabled {prio = ~1, doitFn = doitFn}
end
in
bevt pollFn
end
fun wrap (evt : 'a event, wfn : 'a -> 'b) : 'b event =
let
fun wrapF f x = wfn (f x)
fun wrapBaseEvt pollFn () =
case pollFn () of
ENABLED {prio, doitFn} =>
ENABLED {prio = prio, doitFn = wrapF doitFn}
| BLOCKED blockFn =>
BLOCKED (wrapF blockFn)
fun wrap' evt =
case evt of
BEVT bevts =>
BEVT(List.map wrapBaseEvt bevts)
| CHOOSE evts =>
CHOOSE(List.map wrap' evts)
| GUARD g =>
GUARD(fn () => wrap (g (), wfn))
| WNACK f =>
WNACK(fn evt => wrap (f evt, wfn))
in
wrap' evt
end
fun wrapHandler (evt : 'a event, hfn : exn -> 'a) : 'a event =
let
fun wrapF f x = (f x) handle exn => hfn exn
fun wrapBaseEvt pollFn () =
case pollFn () of
ENABLED {prio, doitFn} =>
ENABLED {prio = prio, doitFn = wrapF doitFn}
| BLOCKED blockFn =>
BLOCKED (wrapF blockFn)
fun wrap' evt =
case evt of
BEVT bevts =>
BEVT(List.map wrapBaseEvt bevts)
| CHOOSE evts =>
CHOOSE(List.map wrap' evts)
| GUARD g =>
GUARD(fn () => wrapHandler (g (), hfn))
| WNACK f =>
WNACK(fn evt => wrapHandler (f evt, hfn))
in
wrap' evt
end
val guard = GUARD
val withNack = WNACK
fun choose (evts : 'a event list) : 'a event =
let
val () = Assert.assertNonAtomic' "Event.choose"
val () = debug' "choose(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.choose(1)"
fun gatherBEvts (evts, bevts') =
case (evts, bevts') of
([], bevts') => BEVT bevts'
| ((BEVT bevts)::evts, bevts') => gatherBEvts (evts, bevts @ bevts')
| (evts, []) => gather (evts, [])
| (evts, bevts') => gather (evts, [BEVT bevts'])
and gather (evts, evts') =
case (evts, evts') of
([], [evt']) => evt'
| ([], evts') => CHOOSE evts'
| ((CHOOSE cevts)::evts, evts') =>
gather (evts, cevts @ evts')
| ((BEVT [])::evts, evts') =>
gather (evts, evts')
| ((BEVT bevts)::evts, (BEVT bevts')::evts') =>
gather (evts, BEVT (bevts @ bevts')::evts')
| (evt::evts, evts') =>
gather (evts, evt::evts')
val evt = gatherBEvts (List.rev evts, [])
in
evt
end
local
val cnt = ref 0
fun random i =
let val j = !cnt
in
if j = 1000000 then cnt := 0 else cnt := j + 1
; Int.rem (j, i)
end
in
fun selectDoitFn (doitFns : {prio : int, doitFn : 'a} list) : 'a =
let
val () = Assert.assertAtomic' ("Event.selectDoitFn", NONE)
val () = debug' "selectDoitFn(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.selectDoitFn(2)", SOME 1)
in
case doitFns of
[{doitFn, ...}] => doitFn
| doitFns =>
let
fun select (doitFns, maxP,
doitFnsMaxP, numMaxP,
doitFnsFixed, numFixed) =
case doitFns of
[] => (case (doitFnsMaxP, doitFnsFixed) of
([doitFn], []) => doitFn
| ([], [doitFn]) => doitFn
| (doitFnsMaxP, doitFnsFixed) =>
let
val bias = 2
val num = numFixed + bias * numMaxP
val k = random num
in
if k < numFixed
then List.nth (doitFnsFixed, k)
else List.nth (doitFnsMaxP,
Int.mod(k - numFixed, numMaxP))
end)
| {prio, doitFn}::doitFns =>
if prio = ~1
then select(doitFns, maxP,
doitFnsMaxP, numMaxP,
doitFn::doitFnsFixed, numFixed + 1)
else if prio > maxP
then select(doitFns, prio,
[doitFn], 1,
doitFnsFixed, numFixed)
else if prio = maxP
then select(doitFns, maxP,
doitFn::doitFnsMaxP, numMaxP + 1,
doitFnsFixed, numFixed)
else select(doitFns, maxP,
doitFnsMaxP, numMaxP,
doitFnsFixed, numFixed)
in
select (doitFns, 0, [], 0, [], 0)
end
end
end
fun syncOnBEvt (pollFn : 'a base) : 'a =
let
val () = Assert.assertNonAtomic' "Event.syncOnBEvt"
val () = debug' "syncOnBEvt(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.syncOnBEvt(1)"
val () = S.atomicBegin ()
val () = debug' "syncOnBEvt(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.syncOnBEvt(2)", SOME 1)
val x =
case pollFn () of
ENABLED {doitFn, ...} => doitFn ()
| BLOCKED blockFn =>
let val (transId, cleanUp) = TransID.mkFlg ()
in blockFn {transId = transId,
cleanUp = cleanUp,
next = S.next}
end
val () = debug' "syncOnBEvt(4)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.syncOnBEvt(4)"
in
x
end
(* this function handles the case of synchronizing on a list of
* base events (w/o any negative acknowledgements). It also handles
* the case of syncrhonizing on NEVER.
*)
fun syncOnBEvts (bevts : 'a base list) : 'a =
let
val () = Assert.assertNonAtomic' "Event.syncOnBEvts"
val () = debug' "syncOnBEvts(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.syncOnBEvts(1)"
fun ext (bevts, blockFns) : 'a =
let
val () = debug' "syncOnBEvts(2).ext" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).ext", SOME 1)
in
case bevts of
[] =>
let
val () = debug' "syncOnBEvts(2).ext([])" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).ext([])", SOME 1)
in
S.atomicSwitch
(fn (t : 'a S.thread) =>
let
val (transId, cleanUp) = TransID.mkFlg ()
fun log blockFns : S.rdy_thread =
let
val () = debug' "syncOnBEvts(2).ext([]).log" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).ext([]).log", SOME 1)
in
case blockFns of
[] => S.next ()
| blockFn::blockFns =>
S.new
(fn _ => fn () =>
let
val () = S.atomicBegin ()
val x = blockFn {transId = transId,
cleanUp = cleanUp,
next = fn () => log blockFns}
in S.switch(fn _ => (t, x))
end)
end
in
(log blockFns, ())
end)
end
| pollFn::bevts =>
(case pollFn () of
ENABLED doitFn => extRdy (bevts, [doitFn])
| BLOCKED blockFn => ext (bevts, blockFn::blockFns))
end
and extRdy (bevts, doitFns) : 'a =
let
val () = debug' "syncOnBEvts(2).extRdy" (* Atomic 1*)
val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).extRdy", SOME 1)
in
case bevts of
[] =>
let val doitFn = selectDoitFn doitFns
in doitFn ()
end
| pollFn::bevts =>
(case pollFn () of
ENABLED doitFn => extRdy (bevts, doitFn::doitFns)
| _ => extRdy (bevts, doitFns))
end
val x =
case bevts of
[] => S.switchToNext (fn _ => ())
| [bevt] => syncOnBEvt bevt
| bevts => (S.atomicBegin (); ext (bevts, []))
val () = debug' "syncOnBEvts(4)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.syncOnBEvts(4)"
in
x
end
(* walk the event group tree, collecting the base events (with associated
* ack flags), and a list of flag sets. A flag set is a (cvar * ack flag list)
* pairs, where the flags are those associated with the events covered by the
* nack cvar.
*)
type ack_flg = bool ref
type ack_flgs = ack_flg list
type 'a back = 'a base * ack_flg
type 'a backs = 'a back list
type flg_set = cvar * ack_flg list
type flg_sets = flg_set list
fun collect (gevt : 'a group) : 'a backs * flg_sets =
let
fun gatherWrapped (gevt : 'a group, backs : 'a backs, flgSets : flg_sets) :
'a backs * flg_sets =
let
fun gather (gevt : 'a group, backs : 'a backs,
ackFlgs : ack_flgs, flgSets : flg_sets) :
'a backs * ack_flgs * flg_sets =
case gevt of
BASE bevts =>
let
fun append (bevts, backs, ackFlgs) =
case bevts of
[] => (backs, ackFlgs)
| bevt::bevts =>
let val ackFlg = ref false
in append (bevts, (bevt, ackFlg)::backs, ackFlg::ackFlgs)
end
val (backs', ackFlgs') = append (bevts, backs, ackFlgs)
in
(backs', ackFlgs', flgSets)
end
| GRP gevt =>
let
fun f (gevt', (backs', ackFlgs', flgSets')) =
gather (gevt', backs', ackFlgs', flgSets')
in List.foldl f (backs, ackFlgs, flgSets) gevt
end
| NACK (cvar, gevt) =>
let
val (backs', ackFlgs', flgSets') =
gather (gevt, backs, [], flgSets)
in
(backs', ackFlgs' @ ackFlgs, (cvar, ackFlgs')::flgSets')
end
val (backs, _, flgSets) = gather (gevt, backs, [], flgSets)
in
(backs, flgSets)
end
in
case gevt of
GRP _ =>
let
val ackFlg = ref false
fun gather (gevt : 'a group, backs : 'a backs, flgSets : flg_sets) :
'a backs * flg_sets =
case gevt of
BASE bevts =>
let
fun append (bevts, backs) =
case bevts of
[] => backs
| bevt::bevts => append (bevts, (bevt, ackFlg)::backs)
in
(append (bevts, backs), flgSets)
end
| GRP gevt =>
let
fun f (gevt', (backs', flgSets')) =
gather(gevt', backs', flgSets')
in List.foldl f (backs, flgSets) gevt
end
| NACK _ =>
gatherWrapped (gevt, backs, flgSets)
in
gather (gevt, [], [])
end
| gevt => gatherWrapped (gevt, [], [])
end
(* this function handles the more complicated case of synchronization
* on groups of events where negative acknowledgements are involved.
*)
fun syncOnGrp (gevt : 'a group) : 'a =
let
val () = Assert.assertNonAtomic' "Event.syncOnGrp"
val () = debug' "syncOnGrp(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.syncOnGrp(1)"
val (backs, flgSets) = collect gevt
fun chkCVars () =
let
val () = debug' "syncOnGrp.chkCVars" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.syncOnGrp.chkCVars", SOME 1)
(* chkCVar checks the flags of a flag set.
* If they are all false, then the corresponding cvar
* is set to signal the negative ack.
*)
fun chkCVar (cvar, flgs) =
if List.exists ! flgs
then ()
else atomicCVarSet cvar
in
List.app chkCVar flgSets
end
fun ext (backs, blockFns) : 'a =
let
val () = debug' "syncOnGrp(2).ext" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.syncOnGrp(2).ext", SOME 1)
in
case backs of
[] =>
let
val () = debug' "syncOnGrp(2).ext([])" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.syncOnGrp(2).ext([])", SOME 1)
in
S.atomicSwitch
(fn (t : 'a S.thread) =>
let
val (transId, cleanUp) = TransID.mkFlg ()
val cleanUp = fn flg => fn () =>
(cleanUp ()
; flg := true
; chkCVars ())
fun log blockFns : S.rdy_thread =
let
val () = debug' "syncOnGrp(2).ext([]).log" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Event.syncOnGrp(2).ext([]).log", SOME 1)
in
case blockFns of
[] => S.next ()
| (blockFn,ackFlg)::blockFns =>
S.new
(fn _ => fn () =>
let
val () = S.atomicBegin ()
val x = blockFn {transId = transId,
cleanUp = cleanUp ackFlg,
next = fn () => log blockFns}
in S.switch(fn _ => (t, x))
end)
end
in
(log blockFns, ())
end)
end
| (pollFn,ackFlg)::backs =>
(case pollFn () of
ENABLED {prio, doitFn} =>
extRdy (backs, [{prio = prio,doitFn = (doitFn, ackFlg)}])
| BLOCKED blockFn => ext (backs, (blockFn,ackFlg)::blockFns))
end
and extRdy (backs, doitFns) : 'a =
let
val () = debug' "syncOnGrp.extRdy(2)" (* Atomic 1*)
val () = Assert.assertAtomic' ("Event.syncOnGrp.extRdy(2)", SOME 1)
in
case backs of
[] => let
val (doitFn, flg) = selectDoitFn doitFns
in
flg := true
; chkCVars ()
; doitFn ()
end
| (pollFn,ackFlg)::backs =>
(case pollFn () of
ENABLED {prio, doitFn} =>
extRdy (backs, {prio = prio, doitFn = (doitFn, ackFlg)}::doitFns)
| _ => extRdy (backs, doitFns))
end
val x =
case backs of
[(bevt, _)] => syncOnBEvt bevt
| _ => (S.atomicBegin (); ext (backs, []))
val () = debug' "syncOnGrp(4)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.syncOnGrp(4)"
in
x
end
local
(* force the evaluation of any guards in an event collection,
* returning an event group.
*)
fun forceBL (evts : 'a event list, bevts : 'a base list) : 'a group =
case evts of
[] => BASE bevts
| evt::evts =>
(case force evt of
BASE bevts' => forceBL (evts, bevts' @ bevts)
| GRP gevts => forceGL (evts, if List.null bevts then gevts else gevts @ [BASE bevts])
| gevt => forceGL (evts, if List.null bevts then [gevt] else [gevt, BASE bevts]))
and forceGL (evts : 'a event list, gevts : 'a group list) : 'a group =
case (evts, gevts) of
([], [gevt]) => gevt
| ([], gevts) => GRP gevts
| (evt::evts, gevts) =>
(case (force evt, gevts) of
(BASE [], gevts) =>
forceGL (evts, gevts)
| (BASE bevts', (BASE bevts)::gevts) =>
forceGL (evts, BASE (bevts' @ bevts)::gevts)
| (GRP gevts', gevts) =>
forceGL (evts, gevts' @ gevts)
| (gevt, gevts) =>
forceGL (evts, gevt::gevts))
and force (evt : 'a event) : 'a group =
let
val gevt =
case evt of
BEVT bevts => BASE bevts
| CHOOSE evts => forceBL (evts, [])
| GUARD g => force (g ())
| WNACK f =>
let val cvar = CVar.new ()
in NACK(cvar, force (f (cvarGetEvt cvar)))
end
in
gevt
end
in
fun sync evt =
let
val () = Assert.assertNonAtomic' "Event.sync"
val () = debug' "sync(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.sync(1)"
val x =
case force evt of
BASE bevts => syncOnBEvts bevts
| gevt => syncOnGrp gevt
val () = debug' "sync(4)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.sync(4)"
in
x
end
fun select (evts : 'a event list) : 'a =
let
val () = Assert.assertNonAtomic' "Event.select"
val () = debug' "select(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.select(1)"
val x =
case forceBL (evts, []) of
BASE bevts => syncOnBEvts bevts
| gevt => syncOnGrp gevt
val () = debug' "select(4)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Event.select(4)"
in
x
end
end
end
1.1 mlton/lib/cml/core-cml/mailbox.sig
Index: mailbox.sig
===================================================================
(* mailbox.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* mailbox-sig.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* Asynchronous channels (called mailboxes).
*)
signature MAILBOX =
sig
type 'a mbox
val mailbox : unit -> 'a mbox
val sameMailbox : ('a mbox * 'a mbox) -> bool
val send : ('a mbox * 'a) -> unit
val recv : 'a mbox -> 'a
val recvEvt : 'a mbox -> 'a CML.event
val recvPoll : 'a mbox -> 'a option
end
signature MAILBOX_EXTRA =
sig
include MAILBOX
val resetMbox : 'a mbox -> unit
end
1.1 mlton/lib/cml/core-cml/mailbox.sml
Index: mailbox.sml
===================================================================
(* mailbox.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* mailbox.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* Asynchronous channels (called mailboxes).
*)
structure Mailbox : MAILBOX_EXTRA =
struct
structure Assert = LocalAssert(val assert = true)
structure Debug = LocalDebug(val debug = false)
structure Q = FunQueue
structure S = Scheduler
structure E = Event
fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
fun debug' msg = debug (fn () => msg)
datatype trans_id = datatype TransID.trans_id
datatype trans_id_state = datatype TransID.trans_id_state
(* the state of a mailbox. The queue of the NONEMPTY constructor should
* never be empty (use EMPTY instead).
*)
datatype 'a state =
EMPTY of (TransID.trans_id * 'a S.thread) Q.t
| NONEMPTY of (int * 'a Q.t)
datatype 'a mbox = MB of 'a state ref
fun resetMbox (MB state) = state := EMPTY (Q.new ())
fun mailbox () = MB (ref (EMPTY (Q.new ())))
fun sameMailbox (MB s1, MB s2) = (s1 = s2)
local
fun cleaner (TXID txst, _) =
case !txst of CANCEL => true | _ => false
in
fun cleanAndDeque q =
Q.cleanAndDeque (q, cleaner)
end
fun send (MB state, x) =
let
val () = Assert.assertNonAtomic' "Mailbox.send"
val () = debug' "send(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Mailbox.send(1)"
val () = S.atomicBegin ()
val () = debug' "send(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Mailbox.send(2)", SOME 1)
val () =
case !state of
EMPTY q => (case (cleanAndDeque q) of
(NONE, _) =>
(let val q = Q.new ()
in state := NONEMPTY (1, Q.enque (q, x))
end
; S.atomicEnd())
| (SOME (transId', t'), q') =>
S.atomicReadyAndSwitch
(fn () =>
(state := EMPTY q'
; TransID.force transId'
; (t', x))))
| NONEMPTY (p, q) =>
(* we force a context switch here to prevent
* a producer from outrunning a consumer.
*)
S.atomicReadyAndSwitchToNext
(fn () => state := NONEMPTY (p, Q.enque (q, x)))
val () = debug' "send(4)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.send(4)"
in
()
end
fun getMsg (state, q) =
let
val (msg, q') =
case Q.deque q of
SOME (msg, q') => (msg, q')
| NONE => raise Fail "Mailbox:getMsg"
val () = if Q.empty q'
then state := EMPTY (Q.new ())
else state := NONEMPTY (1, q')
val () = S.atomicEnd ()
in
msg
end
fun recv (MB state) =
let
val () = Assert.assertNonAtomic' "Mailbox.recv"
val () = debug' "recv(1)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Mailbox.recv(1)"
val () = S.atomicBegin ()
val () = debug' "recv(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Mailbox.recv(2)", SOME 1)
val msg =
case !state of
EMPTY q =>
let
val msg =
S.atomicSwitchToNext
(fn t => state := EMPTY (Q.enque (q, (TransID.mkTxId (), t))))
in
S.atomicEnd()
; msg
end
| NONEMPTY (_, q) => getMsg (state, q)
val () = debug' "recv(4)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Channel.recv(4)"
in
msg
end
fun recvEvt (MB state) =
let
fun blockFn {transId, cleanUp, next} =
let
val q =
case !state of
EMPTY q => q
| _ => raise Fail "Mailbox:recvEvt:blockFn"
val msg =
S.atomicSwitch
(fn t => (state := EMPTY (Q.enque (q, (transId, t)))
; (next (), ())))
in
cleanUp()
; S.atomicEnd()
; msg
end
fun pollFn () =
case !state of
EMPTY _ => E.blocked blockFn
| NONEMPTY (prio, q) =>
(state := NONEMPTY (prio + 1, q)
; E.enabled {prio = prio,
doitFn = fn () => getMsg (state, q)})
in
E.bevt pollFn
end
fun recvPoll (MB state) =
(S.atomicBegin()
; case !state of
EMPTY _ => (S.atomicEnd(); NONE)
| NONEMPTY (_, q) => SOME (getMsg (state, q)))
end
1.1 mlton/lib/cml/core-cml/rep-types.sml
Index: rep-types.sml
===================================================================
(* rep-types.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* rep-types.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* These are the concrete representations of the various CML types.
* These types are abstract (or not even visible) outside this library.
*)
structure RepTypes =
struct
(** transaction IDs -- see trans-id.sml *)
datatype trans_id = TXID of trans_id_state ref
and trans_id_state =
CANCEL
| TRANS
(** condition variables --- see cvar.sml and events.sml *)
datatype cvar = CVAR of cvar_state ref
and cvar_state =
CVAR_unset of {transId : trans_id,
cleanUp : unit -> unit,
thread : rdy_thread} list
| CVAR_set of int
(** thread IDs --- see thread-id.sml and threads.sml **)
and thread_id =
TID of {
(* an unique ID *)
id : int,
(* true, if there is a pending alert on this thread *)
alert : bool ref,
(* set this whenever this thread does some concurrency operation. *)
done_comm : bool ref,
(* root-level exception handler hook *)
exnHandler : (exn -> unit) ref,
(* holds thread-local properties *)
props : exn list ref,
(* the cvar that becomes set when the thread dies *)
dead : cvar
}
(** threads --- see scheduler.sml and threads.sml **)
and 'a thread = THRD of thread_id * 'a MLton.Thread.t
withtype rdy_thread = unit thread
(** events --- see events.sml **)
datatype 'a status =
ENABLED of {prio : int, doitFn : unit -> 'a}
| BLOCKED of {transId : trans_id,
cleanUp : unit -> unit,
next : unit -> rdy_thread} -> 'a
type 'a base = unit -> 'a status
datatype 'a event =
BEVT of 'a base list
| CHOOSE of 'a event list
| GUARD of unit -> 'a event
| WNACK of unit event -> 'a event
end
1.1 mlton/lib/cml/core-cml/run-cml.sig
Index: run-cml.sig
===================================================================
(* run-cml.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* ???
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
signature RUN_CML =
sig
val isRunning : unit -> bool
val doit : (unit -> unit) * Time.time option -> OS.Process.status
val shutdown : OS.Process.status -> 'a
end
1.1 mlton/lib/cml/core-cml/run-cml.sml
Index: run-cml.sml
===================================================================
(* run-cml.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* run-cml-fn.sml
*
* COPYRIGHT (c) 1996 AT&T Research.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
structure RunCML : RUN_CML =
struct
structure Assert = LocalAssert(val assert = true)
structure Debug = LocalDebug(val debug = false)
structure R = Running
structure S = Scheduler
structure SH = SchedulerHooks
structure TID = ThreadID
structure TO = TimeOut
fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
fun debug' msg = debug (fn () => msg)
local
structure Signal = MLton.Signal
structure Itimer = MLton.Itimer
fun getAlrmHandler () =
Signal.getHandler Posix.Signal.alrm
fun setAlrmHandler h =
Signal.setHandler (Posix.Signal.alrm, h)
fun setItimer t =
Itimer.set (Itimer.Real, {value = t, interval = t})
in
fun prepareAlrmHandler tq =
let
val origAlrmHandler = getAlrmHandler ()
val tq =
case tq of
SOME tq => tq
| NONE => Time.fromMilliseconds 20
in
(fn alrmHandler =>
(setAlrmHandler (Signal.Handler.handler (S.unwrap alrmHandler))
; setItimer tq),
fn () =>
(setItimer Time.zeroTime
; setAlrmHandler origAlrmHandler))
end
end
fun isRunning () = !R.isRunning
fun reset running =
(S.reset running
; SH.reset ()
; TID.reset ()
; TO.reset ())
fun alrmHandler thrd =
let
val () = Assert.assertAtomic' ("RunCML.alrmHandler", NONE)
val () = debug' "alrmHandler" (* Atomic 1 *)
val () = Assert.assertAtomic' ("RunCML.alrmHandler", SOME 1)
val () = S.preempt thrd
val () = ignore (TO.preempt ())
in
S.next ()
end
(* Note that SH.pauseHook is only invoked by S.next
* when there are no threads on the ready queue;
* Furthermore, note that alrmHandler always
* enqueues the preepted thread (via S.preempt).
* Hence, the ready queue is never empty
* at the S.next in alrmHandler. Therefore,
* pauseHook is never run within alrmHandler.
*)
fun pauseHook () =
let
val () = Assert.assertAtomic' ("RunCML.pauseHook", NONE)
val () = debug' "pauseHook" (* Atomic 1 *)
val () = Assert.assertAtomic' ("RunCML.pauseHook", SOME 1)
val to = TO.preempt ()
in
case to of
NONE =>
(* no waiting threads *)
S.prepend(!SH.shutdownHook, fn () => (true, OS.Process.failure))
| SOME NONE =>
(* enqueued a waiting thread *)
S.next ()
| SOME (SOME t) =>
(* a waiting thread will be ready in t time *)
(if Time.toSeconds t <= 0
then ()
else S.doMasked (fn () => OS.Process.sleep t)
; pauseHook ())
end
fun doit (initialProc: unit -> unit,
tq: Time.time option) =
let
val () =
if isRunning ()
then raise Fail "CML is running"
else ()
val (installAlrmHandler, restoreAlrmHandler) = prepareAlrmHandler tq
val (cleanUp, status) =
S.switchToNext
(fn thrd =>
let
val () = R.isRunning := true
val () = reset true
val () = SH.shutdownHook := S.prepend (thrd, fn arg => (S.atomicBegin (); arg))
val () = SH.pauseHook := pauseHook
val () = installAlrmHandler alrmHandler
val () = ignore (Thread.spawn initialProc)
in
()
end)
val () = restoreAlrmHandler ()
val () = reset false
val () = R.isRunning := false
val () = S.atomicEnd ()
in
status
end
fun shutdown status =
if isRunning ()
then S.switch (fn _ => (!SH.shutdownHook, (true, status)))
else raise Fail "CML is not running"
end
1.1 mlton/lib/cml/core-cml/running.sml
Index: running.sml
===================================================================
(* running.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* running.sml
*
* COPYRIGHT (c) 1997 Bell Labs, Lucent Technologies.
*
* A flag to tell us if CML is running. This gets set and cleared in the
* RunCMLFn functor, but other modules need to test it.
*)
structure Running =
struct
val isRunning = ref false
end
1.1 mlton/lib/cml/core-cml/scheduler-hooks.sig
Index: scheduler-hooks.sig
===================================================================
(* scheduler-hooks.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* scheduler.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
signature SCHEDULER_HOOKS =
sig
type 'a thread = 'a RepTypes.thread
type rdy_thread = RepTypes.rdy_thread
(* this hook gets invoked when the scheduler has nothing else to do;
* it is invoked in an atomic region
*)
val pauseHook : (unit -> rdy_thread) ref
(* this hook points to a thread that gets invoked when
* the system is otherwise deadlocked. It takes two arguments:
* the first is a boolean flag that says weather to do clean-up,
* and the second is the exit status.
*)
val shutdownHook : (bool * OS.Process.status) thread ref
val reset : unit -> unit
end
1.1 mlton/lib/cml/core-cml/scheduler-hooks.sml
Index: scheduler-hooks.sml
===================================================================
(* scheduler-hooks.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* scheduler.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
structure SchedulerHooks: SCHEDULER_HOOKS =
struct
datatype thread = datatype RepTypes.thread
type rdy_thread = RepTypes.rdy_thread
val pauseHookDefault : unit -> rdy_thread =
fn _ => raise Fail "SchedulerHooks.pauseHook"
val pauseHook = ref pauseHookDefault
val shutdownHookDefault : (bool * OS.Process.status) thread =
THRD (ThreadID.bogus "shutdownHook", MLton.Thread.new (fn _ =>
raise Fail "SchedulerHooks.shutdownHook"))
val shutdownHook = ref shutdownHookDefault
fun reset () =
(pauseHook := pauseHookDefault
; shutdownHook := shutdownHookDefault)
end
1.1 mlton/lib/cml/core-cml/scheduler.sig
Index: scheduler.sig
===================================================================
(* scheduler.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* scheduler.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
signature SCHEDULER =
sig
include CRITICAL
type thread_id = ThreadID.thread_id
type 'a thread = 'a RepTypes.thread
type rdy_thread = RepTypes.rdy_thread
val getThreadId : 'a thread -> thread_id
val getCurThreadId : unit -> thread_id
val tidMsg : unit -> string
val ready : rdy_thread -> unit
val next : unit -> rdy_thread
val switch : ('a thread -> 'b thread * 'b) -> 'a
val atomicSwitch : ('a thread -> 'b thread * 'b) -> 'a
val switchToNext : ('a thread -> unit) -> 'a
val atomicSwitchToNext : ('a thread -> unit) -> 'a
val readyAndSwitch : (unit -> 'b thread * 'b) -> unit
val atomicReadyAndSwitch : (unit -> 'b thread * 'b) -> unit
val readyAndSwitchToNext : (unit -> unit) -> unit
val atomicReadyAndSwitchToNext : (unit -> unit) -> unit
val new : (thread_id -> 'a -> unit) -> 'a thread
val prepend :'a thread * ('b -> 'a) -> 'b thread
val unwrap : (rdy_thread -> rdy_thread) -> (unit MLton.Thread.t -> unit MLton.Thread.t)
val reset : bool -> unit
val preempt : rdy_thread -> unit
end
1.1 mlton/lib/cml/core-cml/scheduler.sml
Index: scheduler.sml
===================================================================
(* scheduler.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* scheduler.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* This module implements the scheduling queues and preemption
* mechanisms.
*)
structure Scheduler : SCHEDULER =
struct
structure Assert = LocalAssert(val assert = true)
structure GlobalDebug = Debug
structure Debug = LocalDebug(val debug = false)
open Critical
structure Q = ImpQueue
structure T = MLton.Thread
structure TID = ThreadID
structure SH = SchedulerHooks
type thread_id = ThreadID.thread_id
datatype thread = datatype RepTypes.thread
type rdy_thread = unit thread
(* the dummy thread Id; this is used when an ID is needed to get
* the types right
*)
val dummyTid = TID.bogus "dummy"
(* the error thread. This thread is used to trap attempts to run CML
* without proper initialization (i.e., via RunCML). This thread is
* enqueued by reset.
*)
val errorTid = TID.bogus "error"
val errorThrd : rdy_thread =
THRD (errorTid, T.new (fn () =>
(GlobalDebug.sayDebug
([fn () => "CML"], fn () => "**** Use RunCML.doit to run CML ****")
; raise Fail "CML not initialized")))
local
val curTid : thread_id ref = ref dummyTid
in
fun getThreadId (THRD (tid, _)) = tid
fun getCurThreadId () =
let
val tid = !curTid
in
tid
end
fun setCurThreadId tid =
let
val () = Assert.assertAtomic' ("Scheduler.setCurThreadId", NONE)
in
curTid := tid
end
end
fun tidMsg () = TID.tidToString (getCurThreadId ())
fun debug msg = Debug.sayDebug ([atomicMsg, tidMsg], msg)
fun debug' msg = debug (fn () => msg)
local
val time = ref Time.zeroTime
in
val atomicBegin = fn () =>
let
val () = atomicBegin ()
(*
val () =
case MLton.Thread.atomicState () of
MLton.Thread.AtomicState.Atomic 1 => time := Time.now ()
| _ => ()
*)
in
()
end
val atomicEnd = fn () =>
let
(*
val () =
case MLton.Thread.atomicState () of
MLton.Thread.AtomicState.Atomic 1 =>
let
val diff = Time.-(Time.now(), !time)
in
GlobalDebug.sayDebug
([], fn () =>
concat [LargeInt.toString (Time.toMilliseconds diff), "ms"])
end
| _ => ()
*)
val () = atomicEnd ()
in
()
end
end
(* The thread ready queues:
* rdyQ1 is the primary queue and rdyQ2 is the secondary queue.
*)
val rdyQ1 : rdy_thread Q.t = Q.new ()
and rdyQ2 : rdy_thread Q.t = Q.new ()
(* enqueue a thread in the primary queue *)
fun enque1 thrd =
(Assert.assertAtomic' ("Scheduler.enque1", NONE)
; Q.enque (rdyQ1, thrd))
(* enqueue a thread in the secondary queue *)
fun enque2 thrd =
(Assert.assertAtomic' ("Scheduler.enque2", NONE)
; Q.enque (rdyQ2, thrd))
(* dequeue a thread from the primary queue *)
fun deque1 () =
(Assert.assertAtomic' ("Scheduler.deque1", NONE)
; case Q.deque rdyQ1 of
NONE => deque2 ()
| SOME thrd => SOME thrd)
(* dequeue a thread from the secondary queue *)
and deque2 () =
(Assert.assertAtomic' ("Scheduler.deque2", NONE)
; case Q.deque rdyQ2 of
NONE => NONE
| SOME thrd => SOME thrd)
(* promote a thread from the secondary queue to the primary queue *)
fun promote () =
(Assert.assertAtomic' ("Scheduler.promote", NONE)
; case deque2 () of
NONE => ()
| SOME thrd => enque1 thrd)
fun next () =
let
val () = Assert.assertAtomic' ("Scheduler.next", NONE)
val thrd =
case deque1 () of
NONE => !SH.pauseHook ()
| SOME thrd => thrd
in
thrd
end
fun ready thrd =
let
val () = Assert.assertAtomic' ("Scheduler.ready", NONE)
val () = enque1 thrd
in
()
end
local
fun atomicSwitchAux msg f =
(Assert.assertAtomic (fn () => "Scheduler." ^ msg, NONE)
; T.atomicSwitch (fn t =>
let
val tid = getCurThreadId ()
val () = TID.mark tid
val (THRD (tid',t'), x') = f (THRD (tid, t))
val () = setCurThreadId tid'
in
(t', x')
end))
in
fun atomicSwitch f =
atomicSwitchAux "atomicSwitch" f
fun switch f =
(atomicBegin (); atomicSwitch f)
fun atomicSwitchToNext f =
atomicSwitchAux "atomicSwitchToNext" (fn thrd => (f thrd; (next (), ())))
fun switchToNext f =
(atomicBegin (); atomicSwitchToNext f)
fun atomicReadyAndSwitch f =
atomicSwitchAux "atomicReadyAndSwitch" (fn thrd => (ready thrd; f ()))
fun readyAndSwitch f =
(atomicBegin (); atomicReadyAndSwitch f)
fun atomicReadyAndSwitchToNext f =
atomicSwitchAux "atomicReadyAndSwitchToNext" (fn thrd => (ready thrd; f (); (next (), ())))
fun readyAndSwitchToNext f =
(atomicBegin (); atomicReadyAndSwitchToNext f)
end
fun new (f : thread_id -> 'a -> unit) : 'a thread =
let
val () = Assert.assertAtomic' ("Scheduler.new", NONE)
val tid = TID.new ()
val t = T.new (f tid)
in
THRD (tid, t)
end
fun prepend (thrd : 'a thread, f : 'b -> 'a) : 'b thread =
let
val () = Assert.assertAtomic' ("Scheduler.prepend", NONE)
val THRD (tid, t) = thrd
val t = T.prepend (t, f)
in
THRD (tid, t)
end
fun unwrap (f : rdy_thread -> rdy_thread) (t: unit T.t) : unit T.t =
let
val () = Assert.assertAtomic' ("Scheduler.unwrap", NONE)
val tid = getCurThreadId ()
val THRD (tid', t') = f (THRD (tid, t))
val () = setCurThreadId tid'
in
t'
end
(* reset various pieces of state *)
fun reset running =
(atomicBegin ()
; setCurThreadId dummyTid
; Q.reset rdyQ1; Q.reset rdyQ2
; if not running then ready errorThrd else ()
; atomicEnd ())
(* what to do at a preemption (with the current thread) *)
fun preempt (thrd as THRD (tid, _)) =
let
val () = Assert.assertAtomic' ("Scheduler.preempt", NONE)
val () = debug' "Scheduler.preempt" (* Atomic 1 *)
val () = Assert.assertAtomic' ("Scheduler.preempt", SOME 1)
val () =
if TID.isMarked tid
then (TID.unmark tid
; promote ()
; enque1 thrd)
else enque2 thrd
in
()
end
val _ = reset false
end
1.1 mlton/lib/cml/core-cml/sync-var.sig
Index: sync-var.sig
===================================================================
(* sync-var.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* sync-var-sig.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* The implementation of Id-style synchronizing memory cells (I-structures
* and M-structures).
*)
signature SYNC_VAR =
sig
type 'a ivar (* I-structure variable *)
type 'a mvar (* M-structure variable *)
exception Put (* raised on put operations to full cells *)
val iVar : unit -> 'a ivar
val iPut : ('a ivar * 'a) -> unit
val iGet : 'a ivar -> 'a
val iGetEvt : 'a ivar -> 'a CML.event
val iGetPoll : 'a ivar -> 'a option
val sameIVar : ('a ivar * 'a ivar) -> bool
val mVar : unit -> 'a mvar
val mVarInit : 'a -> 'a mvar
val mPut : ('a mvar * 'a) -> unit
val mTake : 'a mvar -> 'a
val mTakeEvt : 'a mvar -> 'a CML.event
val mTakePoll : 'a mvar -> 'a option
val mGet : 'a mvar -> 'a
val mGetEvt : 'a mvar -> 'a CML.event
val mGetPoll : 'a mvar -> 'a option
val mSwap : ('a mvar * 'a) -> 'a
val mSwapEvt : ('a mvar * 'a) -> 'a CML.event
val sameMVar : ('a mvar * 'a mvar) -> bool
end
1.1 mlton/lib/cml/core-cml/sync-var.sml
Index: sync-var.sml
===================================================================
(* sync-var.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* sync-var.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* The implementation of Id-style synchronizing memory cells.
*)
structure SyncVar : SYNC_VAR =
struct
structure Assert = LocalAssert(val assert = true)
structure Debug = LocalDebug(val debug = false)
structure Q = ImpQueue
structure S = Scheduler
structure E = Event
fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
datatype trans_id = datatype TransID.trans_id
datatype trans_id_state = datatype TransID.trans_id_state
(* the underlying representation of both ivars and mvars is the same. *)
datatype 'a cell =
CELL of {prio : int ref,
readQ : (trans_id * 'a S.thread) Q.t,
value : 'a option ref}
type 'a ivar = 'a cell
type 'a mvar = 'a cell
exception Put
fun newCell () = CELL {prio = ref 0, readQ = Q.new(), value = ref NONE}
(* sameCell : ('a cell * 'a cell) -> bool *)
fun sameCell (CELL {prio = prio1, ...}, CELL {prio = prio2, ...}) =
prio1 = prio2
(* bump a priority value by one, returning the old value *)
fun bumpPriority (p as ref n) = (p := n+1; n)
(* functions to clean channel input and output queues *)
local
fun cleaner (TXID txst, _) =
case !txst of CANCEL => true | _ => false
in
fun cleanAndDeque q =
Q.cleanAndDeque (q, cleaner)
fun enqueAndClean (q, item) =
Q.enqueAndClean (q, item, cleaner)
end
(* When a thread is resumed after being blocked on an iGet or mGet operation,
* there may be other threads also blocked on the variable. This function
* is used to propagate the message to all of the threads that are blocked
* on the variable (or until one of them takes the value in the mvar case).
* It must be called from an atomic region; when the readQ is finally empty,
* we leave the atomic region. We must use "cleanAndDeque" to get items
* from the readQ in the unlikely event that a single thread executes a
* choice of multiple gets on the same variable.
*)
fun relayMsg (readQ, msg) =
case (cleanAndDeque readQ) of
NONE => S.atomicEnd()
| SOME (txid, t) =>
S.readyAndSwitch
(fn () =>
(TransID.force txid
; (t, msg)))
(** G-variables **)
(* Generalized synchronized variables,
* to factor out the common operations.
*)
fun gPut (name, CELL {prio, readQ, value}, x) =
let
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name])
val () = debug (fn () => concat [name, "(1)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(1)"])
val () = S.atomicBegin()
val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1)
val () =
case !value of
NONE =>
let
val () = debug (fn () => concat [name, "(3.1.1)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.1.1)"], SOME 1)
val () = value := SOME x
val () =
case cleanAndDeque readQ of
NONE => S.atomicEnd ()
| SOME (rtxid, rt) =>
S.readyAndSwitch
(fn () =>
(prio := 1
; TransID.force rtxid
; (rt, x)))
val () = debug (fn () => concat [name, "(3.1.2)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.1.2)"])
in
()
end
| SOME _ =>
let
val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
val () = S.atomicEnd ()
val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"])
in
raise Put
end
val () = debug (fn () => concat [name, "(4)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(4)"])
in
()
end
(* Swap the current contents of the cell with a new value;
* it is guaranteed to be atomic.
*)
fun gSwap (name, doSwap, CELL {prio, readQ, value}) =
let
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, ""])
val () = debug (fn () => concat [name, "(1)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(1)"])
val () = S.atomicBegin()
val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1)
val msg =
case !value of
NONE =>
let
val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
val msg =
S.atomicSwitchToNext
(fn rt => enqueAndClean (readQ, (TransID.mkTxId (), rt)))
val () = debug (fn () => concat [name, "(3.2.2)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"], SOME 1)
val () = doSwap value
val () = relayMsg (readQ, msg)
val () = debug (fn () => concat [name, "(3.2.3)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.3)"])
in
msg
end
| SOME x =>
let
val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
val () = prio := 1
val () = doSwap value
val () = S.atomicEnd ()
val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"])
in
x
end
val () = debug (fn () => concat [name, "(4)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(4)"])
in
msg
end
fun gSwapEvt (name, doSwap, CELL{prio, readQ, value}) =
let
fun doitFn () =
let
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, ".doitFn"], NONE)
val x = valOf (!value)
val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
val () = prio := 1
val () = doSwap value
val () = S.atomicEnd ()
val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"])
in
x
end
fun blockFn {transId, cleanUp, next} =
let
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, ".blockFn"], NONE)
val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
val msg =
S.atomicSwitch
(fn rt =>
(enqueAndClean (readQ, (transId, rt))
; (next (), ())))
val () = debug (fn () => concat [name, "(3.2.2)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"], SOME 1)
val () = cleanUp ()
val () = doSwap value
val () = relayMsg (readQ, msg)
val () = debug (fn () => concat [name, "(3.2.3)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.3)"])
in
msg
end
fun pollFn () =
let
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, ".pollFn"], NONE)
val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1)
in
case !value of
NONE => E.blocked blockFn
| SOME _ => E.enabled {prio = bumpPriority prio,
doitFn = doitFn}
end
in
E.bevt pollFn
end
fun gSwapPoll (name, doSwap, CELL{prio, value, ...}) =
let
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, ""])
val () = debug (fn () => concat [name, "(1)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(1)"])
val () = S.atomicBegin()
val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1)
val msg =
case !value of
NONE =>
let
val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
val msg = NONE
val () = debug (fn () => concat [name, "(3.2.2)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"], SOME 1)
val () = S.atomicEnd ()
val () = debug (fn () => concat [name, "(3.2.3)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.3)"])
in
msg
end
| SOME x =>
let
val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
val () = prio := 1
val () = doSwap value
val () = S.atomicEnd ()
val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"])
in
SOME x
end
val () = debug (fn () => concat [name, "(4)"]) (* NonAtomic *)
val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(4)"])
in
msg
end
(** I-variables **)
val iVar = newCell
val sameIVar = sameCell
fun iPut (cell, x) = gPut ("iPut", cell, x)
local fun doGetSwap _ = ()
in
fun iGet cell = gSwap ("iGet", doGetSwap, cell)
fun iGetEvt cell = gSwapEvt ("iGetEvt", doGetSwap, cell)
fun iGetPoll cell = gSwapPoll ("iGetPoll", doGetSwap, cell)
end
(** M-variables **)
val mVar = newCell
fun mVarInit x = CELL {prio = ref 0, readQ = Q.new(), value = ref (SOME x)}
val sameMVar = sameCell
fun mPut (cell, x) = gPut ("mPut", cell, x)
local fun doTakeSwap value = value := NONE
in
fun mTake cell = gSwap ("mTake", doTakeSwap, cell)
fun mTakeEvt cell = gSwapEvt ("mTakeEvt", doTakeSwap, cell)
fun mTakePoll cell = gSwapPoll ("mTakePoll", doTakeSwap, cell)
end
local fun doGetSwap _ = ()
in
fun mGet cell = gSwap ("mGet", doGetSwap, cell)
fun mGetEvt cell = gSwapEvt ("mGetEvt", doGetSwap, cell)
fun mGetPoll cell = gSwapPoll ("mGetPoll", doGetSwap, cell)
end
local fun doSwapSwap x value = value := SOME x
in
fun mSwap (cell, x) = gSwap ("mSwap", doSwapSwap x, cell)
fun mSwapEvt (cell, x) = gSwapEvt ("mSwap", doSwapSwap x, cell)
end
end
1.1 mlton/lib/cml/core-cml/thread-id.sig
Index: thread-id.sig
===================================================================
(* thread-id.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* threads-sig.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
type word = Word.word
signature THREAD_ID =
sig
datatype thread_id = datatype RepTypes.thread_id
val sameTid : (thread_id * thread_id) -> bool
val compareTid : (thread_id * thread_id) -> order
val hashTid : thread_id -> word
val tidToString : thread_id -> string
end
signature THREAD_ID_EXTRA =
sig
include THREAD_ID
val new : unit -> thread_id
val bogus : string -> thread_id
val mark : thread_id -> unit
val unmark : thread_id -> unit
val isMarked : thread_id -> bool
val reset : unit -> unit
end
1.1 mlton/lib/cml/core-cml/thread-id.sml
Index: thread-id.sml
===================================================================
(* thread.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* thread.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
structure ThreadID : THREAD_ID_EXTRA =
struct
structure Assert = LocalAssert(val assert = true)
structure R = RepTypes
datatype thread_id = datatype R.thread_id
fun sameTid (TID{id=a, ...}, TID{id=b, ...}) = a = b
fun compareTid (TID{id=a, ...}, TID{id=b, ...}) = Int.compare (a, b)
fun hashTid (TID{id, ...}) = Word.fromInt id
fun tidToString (TID{id, ...}) =
concat["[", StringCvt.padLeft #"0" 6 (Int.toString id), "]"]
fun exnHandler (_ : exn) = ()
val defaultExnHandler = ref exnHandler
fun new' n =
TID {id = n,
alert = ref false,
done_comm = ref false,
exnHandler = ref (!defaultExnHandler),
props = ref [],
dead = CVar.new ()}
local
val tidCounter = ref 0
in
fun new () =
let
val _ = Assert.assertAtomic' ("ThreadID.newTid", NONE)
val n = !tidCounter
val _ = tidCounter := n + 1
in
new' n
end
fun reset () = tidCounter := 0
end
fun bogus s =
let val n = CharVector.foldr (fn (c, n) => 2 * n - Char.ord c) 0 s
in new' n
end
fun mark (TID{done_comm, ...}) =
(Assert.assertAtomic' ("ThreadID.mark", NONE)
; done_comm := true)
fun unmark (TID{done_comm, ...}) =
(Assert.assertAtomic' ("ThreadID.unmark", NONE)
; done_comm := false)
fun isMarked (TID{done_comm, ...}) = !done_comm
end
1.1 mlton/lib/cml/core-cml/thread.sig
Index: thread.sig
===================================================================
(* thread.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* threads-sig.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
signature THREAD =
sig
include THREAD_ID
val getTid : unit -> thread_id
val spawnc : ('a -> unit) -> 'a -> thread_id
val spawn : (unit -> unit) -> thread_id
val exit : unit -> 'a
val yield : unit -> unit (* mostly for benchmarking *)
val joinEvt : thread_id -> unit Event.event
(* thread-local data *)
val newThreadProp : (unit -> 'a) ->
{
clrFn : unit -> unit, (* clear's current thread's property *)
getFn : unit -> 'a, (* get current thread's property; if *)
(* the property is not defined, then *)
(* it sets it using the initialization *)
(* function. *)
peekFn : unit -> 'a option, (* return the property's value, if any *)
setFn : 'a -> unit (* set the property's value for the *)
(* current thread. *)
}
val newThreadFlag : unit -> {getFn : unit -> bool, setFn : bool -> unit}
end
1.1 mlton/lib/cml/core-cml/thread.sml
Index: thread.sml
===================================================================
(* thread.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* thread.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
structure Thread : THREAD =
struct
structure Assert = LocalAssert(val assert = true)
structure Debug = LocalDebug(val debug = false)
structure S = Scheduler
fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
fun debug' msg = debug (fn () => msg)
open ThreadID
fun generalExit (tid', clr') =
let
val () = Assert.assertNonAtomic' "Thread.generalExit"
val () = debug' "generalExit" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Thread.generalExit"
in
S.switchToNext
(fn t =>
let
val tid as TID {dead, props, ...} = S.getThreadId t
val () = Assert.assert ([], fn () =>
concat ["Thread.generalExit ",
Option.getOpt (Option.map tidToString tid', "NONE"),
" <> ",
tidToString tid], fn () =>
case tid' of NONE => true
| SOME tid' => sameTid (tid', tid))
val () = if clr' then props := [] else ()
val () = Event.atomicCVarSet dead
in
()
end)
end
fun doHandler (TID {exnHandler, ...}, exn) =
(debug (fn () => concat ["Exception: ", exnName exn, " : ", exnMessage exn])
; ((!exnHandler) exn) handle _ => ())
fun spawnc f x =
let
val () = S.atomicBegin ()
fun thread tid () =
((f x) handle ex => doHandler (tid, ex)
; generalExit (SOME tid, false))
val t = S.new thread
val tid = S.getThreadId t
val _ = S.ready t
val () = S.atomicEnd ()
val () = debug (fn () => concat ["spawnc ", tidToString tid]) (* NonAtomic *)
in
tid
end
fun spawn f = spawnc f ()
fun joinEvt (TID{dead, ...}) = Event.cvarGetEvt dead
val getTid = S.getCurThreadId
fun exit () =
let
val () = Assert.assertNonAtomic' "Thread.exit"
val () = debug' "exit" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Thread.exit"
in
generalExit (NONE, true)
end
fun yield () =
let
val () = Assert.assertNonAtomic' "Thread.exit"
val () = debug' "yield" (* NonAtomic *)
val () = Assert.assertNonAtomic' "Thread.yield"
in
S.readyAndSwitchToNext (fn () => ())
end
(* thread-local data *)
local
fun mkProp () =
let
exception E of 'a
fun cons (a, l) = E a :: l
fun peek [] = NONE
| peek (E a :: _) = SOME a
| peek (_ :: l) = peek l
fun delete [] = []
| delete (E _ :: r) = r
| delete (x :: r) = x :: delete r
in
{cons = cons,
peek = peek,
delete = delete}
end
fun mkFlag () =
let
exception E
fun peek [] = false
| peek (E :: _) = true
| peek (_ :: l) = peek l
fun set (l, flg) =
let
fun set ([], _) = if flg then E::l else l
| set (E::r, xs) = if flg then l else List.revAppend(xs, r)
| set (x::r, xs) = set (r, x::xs)
in
set (l, [])
end
in
{set = set,
peek = peek}
end
fun getProps () =
let val TID {props, ...} = getTid ()
in props
end
in
fun newThreadProp (init : unit -> 'b) =
let
val {peek, cons, delete} = mkProp()
fun peekFn () = peek(!(getProps()))
fun getF () =
let val h = getProps()
in
case peek(!h) of
NONE => let val b = init()
in h := cons(b, !h); b
end
| (SOME b) => b
end
fun clrF () =
let val h = getProps()
in h := delete(!h)
end
fun setFn x =
let val h = getProps()
in h := cons(x, delete(!h))
end
in
{peekFn = peekFn,
getFn = getF,
clrFn = clrF,
setFn = setFn}
end
fun newThreadFlag () =
let
val {peek, set} = mkFlag()
fun getF ()= peek(!(getProps()))
fun setF flg =
let val h = getProps()
in h := set(!h, flg)
end
in
{getFn = getF,
setFn = setF}
end
end
end
1.1 mlton/lib/cml/core-cml/timeout.sig
Index: timeout.sig
===================================================================
(* timeout.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* timeout-sig.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* Exported interface for timeout synchronization.
*)
signature TIME_OUT =
sig
val timeOutEvt : Time.time -> unit Event.event
val atTimeEvt : Time.time -> unit Event.event
end
signature TIME_OUT_EXTRA =
sig
include TIME_OUT
val reset : unit -> unit
(* preepmt () == NONE ==> no waiting threads
* preepmt () == SOME NONE ==> enqueued a waiting thread
* preepmt () == SOME (SOME t) ==> a waiting thread will be ready in t time
*)
val preempt : unit -> Time.time option option
end
1.1 mlton/lib/cml/core-cml/timeout.sml
Index: timeout.sml
===================================================================
(* timeout.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* timeout.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* Events for synchronizing on timeouts.
*)
structure TimeOut : TIME_OUT_EXTRA =
struct
structure Assert = LocalAssert(val assert = true)
structure Debug = LocalDebug(val debug = false)
structure S = Scheduler
structure E = Event
fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
fun debug' msg = debug (fn () => msg)
datatype trans_id = datatype TransID.trans_id
datatype trans_id_state = datatype TransID.trans_id_state
(* this holds an approximation of the current time of day. It is
* cleared at each pre-emption, and initialized on demand (by getTime).
*)
val clock = ref NONE
(* returns an approximation of the current time of day
* (this is at least as accurate as the time quantum).
*)
fun getTime () =
case !clock of
NONE => let val t = Time.now()
in clock := SOME t; t
end
| SOME t => t
fun preemptTime () = clock := NONE
(* The queue of threads waiting for timeouts.
* It is sorted in increasing order of time value.
*)
structure TQ = FunPriorityQueue(structure Key = struct open Time type t = time end)
type item = trans_id * (unit -> unit) * S.rdy_thread
val timeQ : item TQ.t ref = ref (TQ.new ())
fun cleaner readied elt =
let
val now = getTime ()
val (TXID txst, cleanUp, t) = TQ.Elt.value elt
in
case !txst of
CANCEL => true
| _ => if Time.<=(TQ.Elt.key elt, now)
then (readied ()
; S.ready t
; cleanUp ()
; true)
else false
end
fun timeWait (time, txid, cleanUp, t) =
(Assert.assertAtomic' ("TimeOut.timeWait", NONE)
; timeQ := TQ.enqueAndClean(!timeQ, time, (txid, cleanUp, t), cleaner (fn () => ())))
(** NOTE: unlike for most base events, the block functions of time-out
** events do not have to exit the atomic region or execute the clean-up
** operation. This is done when they are removed from the waiting queue.
**)
fun timeOutEvt time =
let
fun blockFn {transId, cleanUp, next} =
let
val () = Assert.assertAtomic' ("TimeOut.timeOutEvt.blockFn", NONE)
val () = debug' "timeOutEvt(3.2.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("TimeOut.timeOutEvt(3.2.1)", SOME 1)
val () =
S.atomicSwitch
(fn t =>
(timeWait (Time.+(time, getTime ()), transId, cleanUp, t)
; (next (), ())))
val () = debug' "timeOutEvt(3.2.3)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "TimeOut.timeOutEvt(3.2.3)"
in
()
end
fun pollFn () =
let
val () = Assert.assertAtomic' ("TimeOut.timeOutEvt.pollFn", NONE)
val () = debug' "timeOutEvt(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("TimeOut.timeOutEvt(2)", SOME 1)
in
if Time.<=(time, Time.zeroTime)
then E.enabled {prio = ~1, doitFn = S.atomicEnd}
else E.blocked blockFn
end
in
E.bevt pollFn
end
fun atTimeEvt time =
let
fun blockFn {transId, cleanUp, next} =
let
val () = Assert.assertAtomic' ("TimeOut.atTimeEvt.blockFn", NONE)
val () = debug' "atTimeEvt(3.2.1)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("TimeOut.atTimeEvt(3.2.1)", SOME 1)
val () =
S.atomicSwitch
(fn t =>
(timeWait (time, transId, cleanUp, t)
; (next (), ())))
val () = debug' "atTimeEvt(3.2.3)" (* NonAtomic *)
val () = Assert.assertNonAtomic' "TimeOut.atTimeEvt(3.2.3)"
in
()
end
fun pollFn () =
let
val () = Assert.assertAtomic' ("TimeOut.atTimeEvt.pollFn", NONE)
val () = debug' "atTimeEvt(2)" (* Atomic 1 *)
val () = Assert.assertAtomic' ("TimeOut.atTimeEvt(2)", SOME 1)
in
if Time.<=(time, getTime())
then E.enabled {prio = ~1, doitFn = S.atomicEnd}
else E.blocked blockFn
end
in
E.bevt pollFn
end
(* reset various pieces of state *)
fun reset () = timeQ := TQ.new ()
(* what to do at a preemption *)
fun preempt () : Time.time option option =
let
val () = Assert.assertAtomic' ("TimeOut.preempt", NONE)
val () = debug' "TimeOut.preempt" (* Atomic 1 *)
val () = Assert.assertAtomic' ("TimeOut.preempt", SOME 1)
val () = preemptTime ()
val timeQ' = !timeQ
in
if TQ.empty timeQ'
then NONE
else let
val readied = ref false
val timeQ' = TQ.clean (timeQ', cleaner (fn () => readied := true))
val () = timeQ := timeQ'
in
if !readied
then SOME NONE
else case TQ.peek timeQ' of
NONE => NONE
| SOME elt => SOME(SOME(Time.-(TQ.Elt.key elt, getTime ())))
end
end
end
1.1 mlton/lib/cml/core-cml/trans-id.sig
Index: trans-id.sig
===================================================================
(* trans-id.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* ???
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
signature TRANS_ID =
sig
datatype trans_id = datatype RepTypes.trans_id
datatype trans_id_state = datatype RepTypes.trans_id_state
(* create a new transaction ID. *)
val mkTxId : unit -> trans_id
(* create a transaction flag (ID and cleanUp). *)
val mkFlg : unit -> (trans_id * (unit -> unit))
(* given a transaction ID, mark it cancelled. *)
val force : trans_id -> unit
val toString : trans_id -> string
end
1.1 mlton/lib/cml/core-cml/trans-id.sml
Index: trans-id.sml
===================================================================
(* trans-id.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* ???
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
structure TransID : TRANS_ID =
struct
structure Assert = LocalAssert(val assert = true)
structure R = RepTypes
(* Transaction IDs are used to mark blocked threads in the various waiting
* queues. They are "cancelled" when some other event is selected.
*)
datatype trans_id = datatype R.trans_id
datatype trans_id_state = datatype R.trans_id_state
(* create a new transaction ID. *)
fun mkTxId () = TXID(ref TRANS)
(* create a transaction flag (ID and cleanUp). *)
fun mkFlg () =
let
val txid as TXID txst = mkTxId ()
val cleanUp = fn () =>
(Assert.assertAtomic' ("TransID.mkFlg.cleanUp", NONE)
; txst := CANCEL)
in
(txid, cleanUp)
end
(* given a transaction ID, mark it cancelled. *)
fun force (TXID txst) =
(Assert.assertAtomic' ("TransID.force", NONE)
; case !txst of
TRANS => txst := CANCEL
| CANCEL => raise Fail "TransID.force")
fun toString (TXID txst) =
case !txst of
TRANS => "TRANS"
| CANCEL => "CANCEL"
end
1.1 mlton/lib/cml/core-cml/version.sig
Index: version.sig
===================================================================
(* version.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* ???
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*)
signature VERSION =
sig
val version : {system : string, version_id : int list, date : string}
val banner : string
end
1.1 mlton/lib/cml/core-cml/version.sml
Index: version.sml
===================================================================
(* version.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* version.sml
*
* COPYRIGHT (c) 1996 AT&T Research.
*)
structure Version : VERSION =
struct
val version = {
system = "Concurrent ML (MLton)",
version_id = [1, 0, 10],
date = "March, 2004"
}
fun f ([], l) = l
| f ([x], l) = (Int.toString x)::l
| f (x::r, l) = (Int.toString x) :: "." :: f(r, l)
val banner =
concat (
#system version ::
", Version " ::
f (#version_id version, [", ", #date version])
)
end
1.1 mlton/lib/cml/tests/exit.cm
Index: exit.cm
===================================================================
Group is
../cml.cm
exit.sml
run-main.sml
1.1 mlton/lib/cml/tests/exit.sml
Index: exit.sml
===================================================================
structure Main =
struct
open CML
fun doit _ =
CML.exit ()
end
1.1 mlton/lib/cml/tests/primes-multicast.cm
Index: primes-multicast.cm
===================================================================
Group is
../cml.cm
primes-multicast.sml
run-main.sml
1.1 mlton/lib/cml/tests/primes-multicast.sml
Index: primes-multicast.sml
===================================================================
structure Main =
struct
open CML
structure MC = Multicast
fun makeNatStream c =
let
val mch = MC.mChannel ()
fun count i = (MC.multicast(mch, i)
; count(i+1))
val _ = spawn (fn () =>
(print (concat ["makeNatStream: ",
tidToString (getTid ()),
"\n"])
; count c))
in
mch
end
fun makeFilter (p, inMCh) =
let
val inP = MC.port inMCh
val outMCh = MC.mChannel ()
fun loop () =
let
val i = sync (MC.recvEvt inP)
in
if ((i mod p) <> 0)
then MC.multicast(outMCh, i)
else ()
; loop ()
end
val _ = spawn loop
in
outMCh
end
fun makePrimes () =
let
val primes = MC.mChannel ()
fun head mch =
let
val p = MC.recv (MC.port mch)
in
MC.multicast(primes, p)
; head (makeFilter (p, mch))
end
val _ = spawn (fn () =>
(print (concat ["makePrimes: ",
tidToString (getTid ()),
"\n"])
; head (makeNatStream 2)))
in
primes
end
fun makeNatPrinter mch n =
let
val p = MC.port mch
fun loop i =
if i > n then RunCML.shutdown OS.Process.success
else let
val m = MC.recv p
val m' = Int.toString m
fun loop' j =
if j > m then ()
else (print (m' ^ "\n")
; loop' (j + 1))
in
loop' m
; loop (i + 1)
end
val _ = spawn (fn () =>
(print (concat ["makeNatPrinter: ",
tidToString (getTid ()),
"\n"])
; loop 0))
in
()
end
fun doit' n =
RunCML.doit
(fn () =>
let
val mch = makePrimes ()
val _ = makeNatPrinter mch n
in
()
end,
SOME (Time.fromMilliseconds 10))
fun doit n =
let
val x = doit' n
in
x
end
end
1.1 mlton/lib/cml/tests/primes.cm
Index: primes.cm
===================================================================
Group is
../cml.cm
primes.sml
run-main.sml
1.1 mlton/lib/cml/tests/primes.sml
Index: primes.sml
===================================================================
structure Main =
struct
open CML
fun makeNatStream c =
let
val ch = channel ()
fun count i = (send(ch, i)
; count(i+1))
val _ = spawn (fn () =>
(print (concat ["makeNatStream: ",
tidToString (getTid ()),
"\n"])
; count c))
in
ch
end
fun makeFilter (p, inCh) =
let
val outCh = channel ()
fun loop () =
let
val i = sync (recvEvt inCh)
in
if ((i mod p) <> 0)
then sync (sendEvt (outCh, i))
else ()
; loop ()
end
val _ = spawn loop
in
outCh
end
fun makePrimes () =
let
val primes = channel ()
fun head ch =
let val p = recv ch
in
send(primes, p)
; head (makeFilter (p, ch))
end
val _ = spawn (fn () =>
(print (concat ["makePrimes: ",
tidToString (getTid ()),
"\n"])
; head (makeNatStream 2)))
in
primes
end
fun makeNatPrinter ch n =
let
fun loop i =
if i > n then RunCML.shutdown OS.Process.success
else let
val m = recv ch
val m' = Int.toString m
fun loop' j =
if j > m then ()
else (print (m' ^ "\n")
; loop' (j + 1))
in
loop' m
; loop (i + 1)
end
val _ = spawn (fn () =>
(print (concat ["makeNatPrinter: ",
tidToString (getTid ()),
"\n"])
; loop 0))
in
()
end
fun doit' n =
RunCML.doit
(fn () =>
let
val ch = makePrimes ()
val _ = makeNatPrinter ch n
in
()
end,
SOME (Time.fromMilliseconds 10))
fun doit n =
let
val x = doit' n
in
x
end
end
1.1 mlton/lib/cml/tests/run-main.sml
Index: run-main.sml
===================================================================
val n =
case CommandLine.arguments () of
[] => 100
| s::_ => (case Int.fromString s of
NONE => 100
| SOME n => n)
val ts = Time.now ()
val _ = Main.doit n
val te = Time.now ()
val d = Time.-(te, ts)
val _ = TextIO.print (concat ["Time start: ", Time.toString ts, "\n"])
val _ = TextIO.print (concat ["Time end: ", Time.toString te, "\n"])
val _ = TextIO.print (concat ["Time diff: ", LargeInt.toString (Time.toMilliseconds d), "ms\n"])
1.1 mlton/lib/cml/tests/timeout.cm
Index: timeout.cm
===================================================================
Group is
../cml.cm
timeout.sml
run-main.sml
1.1 mlton/lib/cml/tests/timeout.sml
Index: timeout.sml
===================================================================
structure Main =
struct
open CML
fun doit' n =
RunCML.doit
(fn () =>
let
fun make m () =
(print (concat ["make: ", Int.toString m, " ",
tidToString (getTid ()), "\n"])
; sync (timeOutEvt (Time.fromSeconds (Int.toLarge m)))
; print (concat ["finish: ", Int.toString m, " ",
tidToString (getTid ()), "\n"]))
fun loop m =
if m <= 0
then ()
else let
val _ = spawn (make m)
in
loop (m - 10)
end
in
loop n
end,
SOME (Time.fromMilliseconds 10))
fun doit n =
let
val x = doit' n
in
x
end
end
1.1 mlton/lib/cml/util/assert.sig
Index: assert.sig
===================================================================
(* assert.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
signature ASSERT =
sig
val assert: (unit -> string) list * (unit -> string) * (unit -> bool) -> unit
val assert': string * (unit -> bool) -> unit
val assertAtomic: (unit -> string) * int option -> unit
val assertNonAtomic: (unit -> string) -> unit
val assertAtomic': string * int option -> unit
val assertNonAtomic': string -> unit
end
1.1 mlton/lib/cml/util/assert.sml
Index: assert.sml
===================================================================
(* assert.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
structure Assert: ASSERT =
struct
structure C = Critical
val assertFlg = true
fun fail msg =
(C.atomicBegin ();
TextIO.print (concat ["ASSERT: ", msg, "\n"]);
OS.Process.exit OS.Process.failure)
fun assert (msgs: (unit -> string) list,
msg: unit -> string,
f: unit -> bool): unit =
if assertFlg andalso not (f () handle _ => false)
then let
val msgs = List.map (fn f => f ()) msgs
val msg = concat [String.concatWith " " msgs, " :: ", msg ()]
in
fail msg
end
else ()
fun assert' (msg: string, f: unit -> bool): unit =
assert ([], fn () => msg, f)
datatype z = datatype MLton.Thread.AtomicState.t
fun assertAtomic (msg: unit -> string, n: int option): unit =
assert ([C.atomicMsg], msg, fn () =>
case MLton.Thread.atomicState () of
Atomic m => (case n of NONE => true | SOME n => n = m)
| NonAtomic => false)
fun assertNonAtomic (msg: unit -> string): unit =
assert ([C.atomicMsg], msg, fn () =>
case MLton.Thread.atomicState () of
Atomic _ => false
| NonAtomic => true)
fun assertAtomic' (msg, n) = assertAtomic (fn () => msg, n)
fun assertNonAtomic' msg = assertNonAtomic (fn () => msg)
end
1.1 mlton/lib/cml/util/critical.sig
Index: critical.sig
===================================================================
(* critical.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
signature CRITICAL =
sig
val atomicBegin : unit -> unit
val atomicEnd : unit -> unit
val atomicMsg : unit -> string
val doAtomic : (unit -> unit) -> unit
val maskBegin : unit -> unit
val maskEnd : unit -> unit
val doMasked : (unit -> unit) -> unit
end
1.1 mlton/lib/cml/util/critical.sml
Index: critical.sml
===================================================================
(* critical.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
structure Critical : CRITICAL =
struct
structure Thread = MLton.Thread
structure AtomicState = MLton.Thread.AtomicState
structure Signal = MLton.Signal
structure Itimer = MLton.Itimer
val atomicBegin = Thread.atomicBegin
val atomicEnd = Thread.atomicEnd
local datatype z = datatype Thread.AtomicState.t
in
fun atomicMsg () =
case Thread.atomicState () of
AtomicState.NonAtomic => "[NonAtomic]"
| AtomicState.Atomic n => concat ["[ Atomic ", Int.toString n, "]"]
end
fun doAtomic f = (atomicBegin (); f (); atomicEnd ())
val mask = Signal.Mask.some [Itimer.signal Itimer.Real]
fun maskBegin () = Signal.Mask.block mask
fun maskEnd () = Signal.Mask.unblock mask
fun doMasked f = (maskBegin (); f (); maskEnd ())
end
1.1 mlton/lib/cml/util/debug.sig
Index: debug.sig
===================================================================
(* debug.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* debug.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* Debugging support for the CML core.
*)
signature DEBUG =
sig
val sayDebug : (unit -> string) list * (unit -> string) -> unit
val sayDebug' : string -> unit
end
1.1 mlton/lib/cml/util/debug.sml
Index: debug.sml
===================================================================
(* debug.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
(* debug.sml
*
* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
* COPYRIGHT (c) 1989-1991 John H. Reppy
*
* Debugging support for the CML core.
*)
structure Debug : DEBUG =
struct
structure C = Critical
val debugFlg = true
fun sayDebug (msgs: (unit -> string) list,
msg: unit -> string) =
if debugFlg
then let
val msgs = List.map (fn f => f ()) msgs
val msg = concat [String.concatWith " " msgs, " :: ", msg ()]
in
C.atomicBegin ();
TextIO.print (concat [msg, "\n"]);
C.atomicEnd ()
end
else ()
fun sayDebug' (msg: string) = sayDebug ([], fn () => msg)
end
1.1 mlton/lib/cml/util/fun-priority-queue.fun
Index: fun-priority-queue.fun
===================================================================
(* fun-queue.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
functor FunPriorityQueue(S: FUN_PRIORITY_QUEUE_ARG) :
FUN_PRIORITY_QUEUE where type Key.t = S.Key.t =
struct
open S
structure Elt =
struct
datatype 'a t = T of Key.t * 'a
fun key (T (k, _)) = k
fun value (T (_, v)) = v
end
datatype 'a t = T of 'a Elt.t list
local
fun filterPrefix (xs, p) =
case xs of
[] => []
| y::ys => if p y
then filterPrefix (ys, p)
else xs
fun filter (xs, p) = List.filter (not o p) xs
in
fun cleanPrefix (T xs, p) = T (filterPrefix (xs, p))
fun clean (T xs, p) = T (filter (xs, p))
end
fun deque (T xs) =
(case xs of
[] => NONE
| x::xs => SOME (x, T xs))
fun cleanAndDeque (q, p) =
let
val q = clean (q, p)
in
case deque q of
NONE => (NONE, q)
| SOME (x, q) => (SOME x, q)
end
fun empty (T xs) =
(case xs of
[] => true
| _ => false)
fun enque (T xs, k', v') =
let
val x' = Elt.T (k', v')
fun loop (xs, ys) =
case xs of
[] => List.revAppend(ys, [x'])
| (z as Elt.T (k, _))::zs =>
(case Key.compare (k, k') of
GREATER => List.revAppend(ys, x'::xs)
| _ => loop(zs, z::ys))
in
T (loop (xs, []))
end
fun enqueAndClean (q, k, v, p) =
clean (enque (q, k, v), p)
fun new () = T []
fun peek (T xs) =
(case xs of
[] => NONE
| elt::_ => SOME elt)
end
1.1 mlton/lib/cml/util/fun-priority-queue.sig
Index: fun-priority-queue.sig
===================================================================
(* fun-prio-queue.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
signature FUN_PRIORITY_QUEUE_ARG =
sig
structure Key :
sig
type t
val compare : t * t -> order
end
end
signature FUN_PRIORITY_QUEUE =
sig
include FUN_PRIORITY_QUEUE_ARG
structure Elt:
sig
type 'a t
val key: 'a t -> Key.t
val value: 'a t -> 'a
end
type 'a t
val clean: 'a t * ('a Elt.t -> bool) -> 'a t
val cleanAndDeque: 'a t * ('a Elt.t -> bool) -> 'a Elt.t option * 'a t
val cleanPrefix: 'a t * ('a Elt.t -> bool) -> 'a t
val deque: 'a t -> ('a Elt.t * 'a t) option
val empty: 'a t -> bool
val enque: 'a t * Key.t * 'a -> 'a t
val enqueAndClean: 'a t * Key.t * 'a * ('a Elt.t -> bool) -> 'a t
val new: unit -> 'a t
val peek: 'a t -> 'a Elt.t option
end
1.1 mlton/lib/cml/util/fun-queue.sig
Index: fun-queue.sig
===================================================================
(* fun-queue.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
signature FUN_QUEUE =
sig
type 'a t
val clean: 'a t * ('a -> bool) -> 'a t
val cleanAndDeque: 'a t * ('a -> bool) -> 'a option * 'a t
val cleanPrefix: 'a t * ('a -> bool) -> 'a t
val deque: 'a t -> ('a * 'a t) option
val empty: 'a t -> bool
val enque: 'a t * 'a -> 'a t
val enqueAndClean: 'a t * 'a * ('a -> bool) -> 'a t
val new: unit -> 'a t
val peek: 'a t -> 'a option
end
1.1 mlton/lib/cml/util/fun-queue.sml
Index: fun-queue.sml
===================================================================
(* fun-queue.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
structure FunQueue : FUN_QUEUE =
struct
datatype 'a t = T of {front: 'a list, back: 'a list}
local
fun filterPrefix (xs, p) =
case xs of
[] => []
| y::ys => if p y
then filterPrefix (ys, p)
else xs
fun filter (xs, p) = List.filter (not o p) xs
fun filterRevAcc ((xs, zs), p) =
case xs of
[] => zs
| y::ys => if p y
then filterRevAcc ((ys, zs), p)
else filterRevAcc ((ys, y::zs), p)
fun filterRev (xs, p) = filterRevAcc ((xs, []), p)
in
fun cleanPrefix (T {front, back}, p) =
(case filterPrefix (front, p) of
[] => T {front = filterPrefix (List.rev(back), p),
back = []}
| front' => T {front = front',
back = back})
fun clean (T {front, back}, p) =
(case filter (front, p) of
[] => T {front = filterRev (back, p),
back = []}
| front' => T {front = front',
back = filter (back, p)})
fun cleanAndDeque (T {front, back}, p) =
(case filter (front, p) of
[] => (case filterRev(back, p) of
[] => (NONE,
T {front = [],
back = []})
| x::front' => (SOME x,
T {front = front',
back = []}))
| [x] => (SOME x,
T {front = filterRev (back, p),
back = []})
| x::front' => (SOME x,
T {front = front',
back = filter (back, p)}))
end
fun deque (T {front, back}) =
(case front of
[] => (case back of
[] => NONE
| l => let val l = List.rev l
in
case l of
[] => raise Fail "FunQueue.deque:impossible"
| x::front' =>
SOME (x,
T {front = front',
back = []})
end)
| x::front' => SOME (x, T {front = front', back = back}))
fun empty (T {front, back}) =
(case front of
[] => (case back of
[] => true
| _ => false)
| _ => false)
fun enque (T {front, back, ...}, x) =
T {front = front, back = x::back}
fun enqueAndClean (q, y, p) =
clean (enque (q, y), p)
fun new () = T {front = [], back = []}
fun peek (T {front, back}) =
(case front of
[] => (case back of
[] => NONE
| l => let val l = List.rev l
in
case l of
[] => raise Fail "FunQueue.peek:impossible"
| x::_ => SOME x
end)
| x::_ => SOME x)
end
1.1 mlton/lib/cml/util/imp-queue.sig
Index: imp-queue.sig
===================================================================
(* imp-queue.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
signature IMP_QUEUE =
sig
type 'a t
val clean: 'a t * ('a -> bool) -> unit
val cleanAndDeque: 'a t * ('a -> bool) -> 'a option
val cleanPrefix: 'a t * ('a -> bool) -> unit
val deque: 'a t -> 'a option
val empty: 'a t -> bool
val enque: 'a t * 'a -> unit
val enqueAndClean: 'a t * 'a * ('a -> bool) -> unit
val new: unit -> 'a t
val peek: 'a t -> 'a option
val reset: 'a t -> unit
end
1.1 mlton/lib/cml/util/imp-queue.sml
Index: imp-queue.sml
===================================================================
(* imp-queue.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
structure ImpQueue : IMP_QUEUE =
struct
datatype 'a t = T of {front: 'a list ref, back: 'a list ref}
local
fun filterPrefix (xs, p) =
case xs of
[] => []
| y::ys => if p y
then filterPrefix (ys, p)
else xs
fun filter (xs, p) = List.filter (not o p) xs
fun filterRevAcc ((xs, zs), p) =
case xs of
[] => zs
| y::ys => if p y
then filterRevAcc ((ys, zs), p)
else filterRevAcc ((ys, y::zs), p)
fun filterRev (xs, p) = filterRevAcc ((xs, []), p)
in
fun cleanPrefix (T {front, back}, p) =
(Assert.assertAtomic' ("ImpQueue.cleanPrefix", NONE)
; case filterPrefix (!front, p) of
[] => (front := filterPrefix (List.rev(!back), p)
; back := [])
| front' => front := front')
fun clean (T {front, back}, p) =
(Assert.assertAtomic' ("ImpQueue.clean", NONE)
; case filter (!front, p) of
[] => (front := filterRev (!back, p)
; back := [])
| front' => (front := front'
; back := filter (!back, p)))
fun cleanAndDeque (T {front, back}, p) =
(Assert.assertAtomic' ("ImpQueue.cleanAndDeque", NONE)
; case filter (!front, p) of
[] => (case filterRev(!back, p) of
[] => (front := []
; back := []
; NONE)
| x::front' => (front := front'
; back := []
; SOME x))
| [x] => (front := filterRev (!back, p)
; back := []
; SOME x)
| x::front' => (front := front'
; back := filter (!back, p)
; SOME x))
end
fun deque (T {front, back}) =
(Assert.assertAtomic' ("ImpQueue.deque", NONE)
; case !front of
[] => (case !back of
[] => NONE
| l => let val l = List.rev l
in case l of
[] => raise Fail "ImpQueue.deque:impossible"
| x :: front' =>
(front := front'
; back := []
; SOME x)
end)
| x::front' => (front := front'; SOME x))
fun empty (T {front, back}) =
(Assert.assertAtomic' ("ImpQueue.empty", NONE)
; case !front of
[] => (case !back of
[] => true
| _ => false)
| _ => false)
fun enque (T {back, ...}, x) =
(Assert.assertAtomic' ("ImpQueue.enque", NONE)
; back := x::(!back))
fun enqueAndClean (q, y, p) =
(enque (q, y); clean (q, p))
fun new () = T {front = ref [], back = ref []}
fun peek (T {front, back}) =
(Assert.assertAtomic' ("ImpQueue.peek", NONE)
; case !front of
[] => (case !back of
[] => NONE
| l => let val l = List.rev l
in case l of
[] => raise Fail "ImpQueue.peek:impossible"
| x::front' =>
(front := x::front'
; back := []
; SOME x)
end)
| x::_ => SOME x)
fun reset (T {front, back}) =
(Assert.assertAtomic' ("ImpQueue.reset", NONE)
; front := []
; back := [])
(*
val clean = fn arg => TimeIt.timeit "ImpQueue.clean" clean arg
val cleanAndDeque = fn arg => TimeIt.timeit "ImpQueue.cleanAndDeque" cleanAndDeque arg
val cleanPrefix = fn arg => TimeIt.timeit "ImpQueue.cleanPrefix" cleanPrefix arg
val deque = fn arg => TimeIt.timeit "ImpQueue.deque" deque arg
val empty = fn arg => TimeIt.timeit "ImpQueue.empty" empty arg
val enque = fn arg => TimeIt.timeit "ImpQueue.enque" enque arg
val enqueAndClean = fn arg => TimeIt.timeit "ImpQueue.enqueAndClean" enqueAndClean arg
val new = fn arg => TimeIt.timeit "ImpQueue.new" new arg
val peek = fn arg => TimeIt.timeit "ImpQueue.peek" peek arg
val reset = fn arg => TimeIt.timeit "ImpQueue.reset" reset arg
*)
end
1.1 mlton/lib/cml/util/local-assert.fun
Index: local-assert.fun
===================================================================
(* local-assert.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
functor LocalAssert(val assert: bool): ASSERT =
struct
fun make f =
if assert then f else fn _ => ()
val assert = make Assert.assert
val assert' = make Assert.assert'
val assertAtomic = make Assert.assertAtomic
val assertNonAtomic = make Assert.assertNonAtomic
val assertAtomic' = make Assert.assertAtomic'
val assertNonAtomic' = make Assert.assertNonAtomic'
end
1.1 mlton/lib/cml/util/local-debug.fun
Index: local-debug.fun
===================================================================
(* local-debug.fun
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
functor LocalDebug(val debug: bool): DEBUG =
struct
fun make f =
if debug then f else fn _ => ()
val sayDebug' = make Debug.sayDebug'
val sayDebug = make Debug.sayDebug
end
1.1 mlton/lib/cml/util/timeit.sig
Index: timeit.sig
===================================================================
(* timeit.sig
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
signature TIMEIT =
sig
val timeit : string -> ('a -> 'b) -> 'a -> 'b
end
1.1 mlton/lib/cml/util/timeit.sml
Index: timeit.sml
===================================================================
(* timeit.sml
* 2004 Matthew Fluet (mfluet@acm.org)
* Ported to MLton threads.
*)
structure TimeIt : TIMEIT =
struct
val timeitFlg = true
fun timeit (name: string) (f: 'a -> 'b) (a: 'a) : 'b =
if timeitFlg
then let
val start = Time.now ()
fun done () =
let
val finish = Time.now ()
val diff = Time.-(finish, start)
in
Debug.sayDebug
([], fn () =>
concat [name, ": ",
LargeInt.toString (Time.toMilliseconds diff),
" ms"])
end
in
(f a before done ())
handle e => (done (); raise e)
end
else f a
end
1.1 mlton/lib/cml/util/util.cm
Index: util.cm
===================================================================
Group is
critical.sig
critical.sml
assert.sig
assert.sml
local-assert.fun
debug.sig
debug.sml
local-debug.fun
timeit.sig
timeit.sml
fun-queue.sig
fun-queue.sml
imp-queue.sig
imp-queue.sml
fun-priority-queue.sig
fun-priority-queue.fun