[MLton-commit] r5445
Stephen Weeks
sweeks at mlton.org
Sun Mar 18 15:25:24 PST 2007
Added Event, Channel, and Mailbox. The implementation of channels is
new, using a pair of streams and a dedicated helper that looks for
enabled handlers to pair up. It should easily generalize to n-way
synchronization. I'm not sure I like it better than the old
implementation, though.
----------------------------------------------------------------------
U mltonlib/trunk/com/sweeks/async/unstable/async.sig
U mltonlib/trunk/com/sweeks/async/unstable/async.sml
----------------------------------------------------------------------
Modified: mltonlib/trunk/com/sweeks/async/unstable/async.sig
===================================================================
--- mltonlib/trunk/com/sweeks/async/unstable/async.sig 2007-03-18 14:55:33 UTC (rev 5444)
+++ mltonlib/trunk/com/sweeks/async/unstable/async.sig 2007-03-18 23:25:23 UTC (rev 5445)
@@ -1,17 +1,42 @@
signature ASYNC = sig
+ exception Closed
+ exception Full
val runHandlers: Unit.t -> Unit.t
structure Deferred: sig
type 'a t
val upon: 'a t * ('a -> Unit.t) -> Unit.t
end
+ structure Event: sig
+ type 'a t
+
+ val always: 'a -> 'a t
+ val any: 'a t List.t -> 'a t
+ val commit: 'a t -> 'a Deferred.t
+ val never: Unit.t -> 'a t
+ end
+ structure Channel: sig
+ type 'a t
+
+ val give: 'a t * 'a -> Unit.t Event.t
+ val new: Unit.t -> 'a t
+ val take: 'a t -> 'a Event.t
+ end
structure Ivar: sig
type 'a t
val fill: 'a t * 'a -> Unit.t
+ (* may raise Full *)
val new: Unit.t -> 'a t
val read: 'a t -> 'a Deferred.t
end
+ structure Mailbox: sig
+ type 'a t
+
+ val new: Unit.t -> 'a t
+ val send: 'a t * 'a -> Unit.t
+ val take: 'a t -> 'a Event.t
+ end
structure Stream: sig
type 'a t
@@ -27,8 +52,10 @@
type 'a t
val close: 'a t -> Unit.t
+ (* may raise Closed *)
val new: Unit.t -> 'a t
val reader: 'a t -> 'a Stream.t
val send: 'a t * 'a -> Unit.t
+ (* may raise Closed *)
end
end
Modified: mltonlib/trunk/com/sweeks/async/unstable/async.sml
===================================================================
--- mltonlib/trunk/com/sweeks/async/unstable/async.sml 2007-03-18 14:55:33 UTC (rev 5444)
+++ mltonlib/trunk/com/sweeks/async/unstable/async.sml 2007-03-18 23:25:23 UTC (rev 5445)
@@ -1,14 +1,55 @@
structure Async: ASYNC = struct
+ exception Closed
exception Full
- val todo = ref []
+ structure Queue:> sig
+ type 'a t
- fun schedule (f, v) = todo := (fn () => f v) :: !todo
+ val deque: 'a t -> 'a Option.t
+ val enque: 'a t -> 'a -> Unit.t
+ val new: Unit.t -> 'a t
+ end = struct
+ structure Node = struct
+ datatype 'a t = T of ('a * 'a t) Option.t Ref.t
+
+ fun new () = T (ref None)
+ end
+
+ datatype 'a t = T of {back: 'a Node.t Ref.t,
+ front: 'a Node.t Ref.t}
+ fun new () =
+ let
+ val n = Node.new ()
+ in
+ T {back = ref n, front = ref n}
+ end
+
+ fun enque (T {back, ...}) = fn a =>
+ let
+ val Node.T r = !back
+ val n = Node.new ()
+ in
+ r := Some (a, n);
+ back := n
+ end
+
+ fun deque (T {front, ...}) =
+ let
+ val Node.T r = !front
+ in
+ Option.map (!r, fn (a, n) => (front := n; a))
+ end
+ end
+
+ val todo = Queue.new ()
+
+ fun schedule (f, v) = Queue.enque todo (fn () => f v)
+
fun runHandlers () =
- case !todo of
- [] => ()
- | t :: ts => (todo := ts; t (); runHandlers ())
+ case Queue.deque todo of
+ None => ()
+ | Some t => (t (); runHandlers ())
structure Deferred = struct
datatype 'a v = Filled of 'a | Unfilled of ('a -> Unit.t) List.t
@@ -40,7 +81,7 @@
fun new () = T (Ivar.new ())
- fun read (T d) = Ivar.read d
+ fun read (T i) = Ivar.read i
val recur = fn (t, b, done, step) =>
recur ((t, b), fn ((t, b), loop) =>
@@ -60,7 +101,7 @@
fun fill (T i, v) = Ivar.fill (i, v)
- fun close t = fill (t, None)
+ fun close t = fill (t, None) handle Full => raise Closed
fun extend (t, v) = let
val t' = new ()
@@ -81,6 +122,8 @@
structure Tail = struct
datatype 'a t = T of 'a Stream.t Ref.t
+ fun toStream (T r) = !r
+
fun new () = T (ref (Stream.new ()))
fun extend (t as T r, v) = r := Stream.extend (!r, v)
@@ -95,4 +138,113 @@
val send = extend
end
+
+ structure Handler: sig
+ type 'a t
+
+ val ignore: Unit.t -> 'a t
+ val isScheduled: 'a t -> Bool.t
+ val new: ('a -> Unit.t) -> 'a t
+ val maybeSchedule: 'a t * 'a -> Unit.t
+ val precompose: 'a t * ('b -> 'a) -> 'b t
+ (* It is an error to call Handler.schedule h if Handler.isScheduled h.
+ *)
+ val schedule: 'a t * 'a -> Unit.t
+ end = struct
+ datatype 'a t = T of {handler: 'a -> Unit.t,
+ isScheduled: Bool.t Ref.t}
+
+ fun new f =
+ T {handler = f,
+ isScheduled = ref false}
+
+ val ignore = fn () => new ignore
+
+ fun isScheduled (T {isScheduled = h, ...}) = !h
+
+ val schedule = fn (T {handler, isScheduled}: 'a t, a: 'a) =>
+ if !isScheduled then
+ die "Handler.schedule of handler that isScheduled"
+ else
+ (isScheduled := true; schedule (handler, a))
+
+ fun maybeSchedule (h, a) = if isScheduled h then () else schedule (h, a)
+
+ fun precompose (T {handler, isScheduled}, f) =
+ T {handler = handler o f,
+ isScheduled = isScheduled}
+ end
+
+ structure Event = struct
+ datatype 'a t = T of 'a Handler.t -> Unit.t
+ (* Invariant: we never pass a Handler that isScheduled *)
+
+ fun send (T f, h) = f h
+
+ fun commit t = let
+ val i = Ivar.new ()
+ val () = send (t, Handler.new (fn v => Ivar.fill (i, v)))
+ in
+ Ivar.read i
+ end
+
+ fun any ts =
+ T (fn h =>
+ List.recur (ts, (), ignore, fn (t, (), k) =>
+ if Handler.isScheduled h then
+ ()
+ else
+ (send (t, h); k ())))
+
+ fun always a = T (fn h => Handler.maybeSchedule (h, a))
+
+ fun never () = T ignore
+
+ fun map (t, f) = T (fn h => send (t, Handler.precompose (h, f)))
+ end
+
+ structure Channel = struct
+ datatype 'a t = T of {givers: ('a * Unit.t Handler.t) Tail.t,
+ takers: 'a Handler.t Tail.t}
+
+ fun 'a new () = let
+ val givers: ('a * Unit.t Handler.t) Tail.t = Tail.new ()
+ val takers = Tail.new ()
+ fun loop (gs, ts) =
+ upon (Stream.read gs, fn opt =>
+ Option.for (opt, fn (g, gs) => loopG (g, gs, ts)))
+ and loopG (g, gs, ts) =
+ upon (Stream.read ts, fn opt =>
+ Option.for (opt, fn (t, ts) => loopGT (g, gs, t, ts)))
+ and loopT (gs, t, ts) =
+ upon (Stream.read gs, fn opt =>
+ Option.for (opt, fn (g, gs) => loopGT (g, gs, t, ts)))
+ and loopGT (g as (a, gh), gs, t, ts) =
+ case (Handler.isScheduled gh, Handler.isScheduled t) of
+ (false, false) =>
+ (Handler.schedule (gh, ());
+ Handler.schedule (t, a);
+ loop (gs, ts))
+ | (false, true) => loopG (g, gs, ts)
+ | (true, false) => loopT (gs, t, ts)
+ | (true, true) => loop (gs, ts)
+ val () = loop (Tail.toStream givers, Tail.toStream takers)
+ in
+ T {givers = givers,
+ takers = takers}
+ end
+
+ fun give (T {givers, ...}, a) =
+ Event.T (fn h => Tail.extend (givers, (a, h)))
+
+ fun take (T {takers, ...}) =
+ Event.T (fn h => Tail.extend (takers, h))
+ end
+
+ structure Mailbox = struct
+ open Channel
+
+ fun send (T {givers, ...}, a) =
+ Tail.extend (givers, (a, Handler.ignore ()))
+ end
end
More information about the MLton-commit
mailing list