[MLton-commit] r5444
Stephen Weeks
sweeks at mlton.org
Sun Mar 18 06:55:33 PST 2007
The beginnings of a new approach to Async based on "deferred" values
and streams. A deferred value is a value that will be determined at
some point (i.e. asynchronously). It is different from a CML event in
two ways:
1. It is not a potential communication that must be committed to in
order to be enabled. Rather, it represents a computation that
has already been committed to (i.e. begun), and may yield a value
at some point.
2. Once it becomes determined with a value, it always has that
value.
The deferred type more accurately reflects the result of Ivar.read,
since an ivar never changes once it is written. It will make code
clearer to use this more precise type to capture write-once events.
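The two properties above can be sketched in plain Basis-library SML.
This is an illustration under assumptions, not the committed code: the
names newIvar, deferred, seen and secondFillRejected are made up here,
and List.app stands in for the List.for/pass helpers that the commit
takes from the com/sweeks/basic library.

```sml
(* Sketch only: a Basis-library restatement of the write-once idea. *)
exception Full

(* Handler invocations that have been scheduled but not yet run. *)
val todo : (unit -> unit) list ref = ref []

fun schedule (f, v) = todo := (fn () => f v) :: !todo

fun runHandlers () =
   case !todo of
      [] => ()
    | t :: ts => (todo := ts; t (); runHandlers ())

(* A cell is either filled for good or holds the handlers waiting on it. *)
datatype 'a state = Filled of 'a | Unfilled of ('a -> unit) list
datatype 'a deferred = D of 'a state ref

fun newIvar () = D (ref (Unfilled []))

(* Already determined: schedule the handler. Otherwise: remember it. *)
fun upon (D r, f) =
   case !r of
      Filled v => schedule (f, v)
    | Unfilled fs => r := Unfilled (f :: fs)

(* A second fill is an error, so a determined value never changes. *)
fun fill (D r, v) =
   case !r of
      Filled _ => raise Full
    | Unfilled fs => (r := Filled v; List.app (fn f => f v) fs)

(* Usage: one handler registered before the fill, one after it. *)
val seen : int list ref = ref []
val i : int deferred = newIvar ()
val () = upon (i, fn v => seen := v :: !seen)
val () = fill (i, 13)
val () = upon (i, fn v => seen := v :: !seen)
val () = runHandlers ()
val secondFillRejected = (fill (i, 99); false) handle Full => true
```

Both handlers observe 13 regardless of when they were registered, and
the second fill raises Full instead of replacing the value.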
A "stream" is the natural asynchronous analogue of a list -- the only
difference is that when getting the first element, one cannot be sure
whether it has been computed or not, so one receives a deferred value
instead of a normal one. Streams naturally represent the result of
reading from a multicast -- each reader gets his own stream (i.e.
a read-only copy of the rest of the values). They also make it
trivial to implement multicast -- all readers share some tail of the
same stream.
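A rough sketch of the stream and multicast idea, again in plain
Basis-library SML and under assumptions: the names (ivar, stream,
multicast, newStream, newMulticast, early, late) are illustrative, and
handlers run immediately rather than through a handler queue, which is
a simplification relative to the committed code.

```sml
(* Sketch only: write-once cells whose handlers run immediately. *)
datatype 'a state = Filled of 'a | Unfilled of ('a -> unit) list
type 'a ivar = 'a state ref

fun newIvar () : 'a ivar = ref (Unfilled [])

fun upon (r : 'a ivar, f) =
   case !r of
      Filled v => f v
    | Unfilled fs => r := Unfilled (f :: fs)

fun fill (r : 'a ivar, v) =
   case !r of
      Filled _ => raise Fail "ivar already filled"
    | Unfilled fs => (r := Filled v; List.app (fn f => f v) (rev fs))

(* A stream is a list in which every cell sits behind a write-once ivar. *)
datatype 'a stream = S of ('a * 'a stream) option ivar

fun newStream () = S (newIvar ())

(* Fill the current end of the stream and return the new, empty end. *)
fun extend (S i, v) =
   let val t' = newStream () in fill (i, SOME (v, t')); t' end

(* Call f on each element as it becomes available. *)
fun for (S i, f) =
   upon (i, fn NONE => ()
             | SOME (a, t) => (f a; for (t, f)))

(* A multicast only tracks the current tail; readers share that tail. *)
type 'a multicast = 'a stream ref

fun newMulticast () : 'a multicast = ref (newStream ())
fun reader (m : 'a multicast) = !m
fun send (m : 'a multicast, v) = m := extend (!m, v)

(* Usage: one reader joins before "a" is sent, the other after. *)
val m : string multicast = newMulticast ()
val early : string list ref = ref []
val late : string list ref = ref []
val () = for (reader m, fn s => early := s :: !early)
val () = send (m, "a")
val () = for (reader m, fn s => late := s :: !late)
val () = send (m, "b")
val () = send (m, "c")
```

The early reader sees "a", "b", "c"; the late reader sees only "b", "c",
because its stream starts at the tail that was current when it called
reader. Nothing is copied: both readers walk the same cells.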
The idea going forward is that events, i.e. potential communications
that vary in time, represent potential deferred values, and that one
"commits" to an event using
    val commit: 'a Event.t -> 'a Deferred.t
The notion of committing was implicit in CML and in the old Async,
but it was tied to actually getting the value. I think it will be
clearer to make the notion explicit.
----------------------------------------------------------------------
A mltonlib/trunk/com/sweeks/async/
A mltonlib/trunk/com/sweeks/async/unstable/
A mltonlib/trunk/com/sweeks/async/unstable/async.sig
A mltonlib/trunk/com/sweeks/async/unstable/async.sml
A mltonlib/trunk/com/sweeks/async/unstable/lib.mlb
----------------------------------------------------------------------
Added: mltonlib/trunk/com/sweeks/async/unstable/async.sig
===================================================================
--- mltonlib/trunk/com/sweeks/async/unstable/async.sig 2007-03-18 14:40:52 UTC (rev 5443)
+++ mltonlib/trunk/com/sweeks/async/unstable/async.sig 2007-03-18 14:55:33 UTC (rev 5444)
@@ -0,0 +1,34 @@
+signature ASYNC = sig
+   val runHandlers: Unit.t -> Unit.t
+   structure Deferred: sig
+      type 'a t
+
+      val upon: 'a t * ('a -> Unit.t) -> Unit.t
+   end
+   structure Ivar: sig
+      type 'a t
+
+      val fill: 'a t * 'a -> Unit.t
+      val new: Unit.t -> 'a t
+      val read: 'a t -> 'a Deferred.t
+   end
+   structure Stream: sig
+      type 'a t
+
+      val fold: 'a t * 'b * ('a * 'b -> 'b) -> 'b Deferred.t
+      val for: 'a t * ('a -> Unit.t) -> Unit.t Deferred.t
+      val map: 'a t * ('a -> 'b) -> 'b t
+      val read: 'a t -> ('a * 'a t) Option.t Deferred.t
+      val recur:
+         'a t * 'b * ('b -> Unit.t) * ('a * 'b * ('b -> Unit.t) -> Unit.t)
+         -> Unit.t
+   end
+   structure Multicast: sig
+      type 'a t
+
+      val close: 'a t -> Unit.t
+      val new: Unit.t -> 'a t
+      val reader: 'a t -> 'a Stream.t
+      val send: 'a t * 'a -> Unit.t
+   end
+end
Added: mltonlib/trunk/com/sweeks/async/unstable/async.sml
===================================================================
--- mltonlib/trunk/com/sweeks/async/unstable/async.sml 2007-03-18 14:40:52 UTC (rev 5443)
+++ mltonlib/trunk/com/sweeks/async/unstable/async.sml 2007-03-18 14:55:33 UTC (rev 5444)
@@ -0,0 +1,98 @@
+structure Async: ASYNC = struct
+   exception Full
+
+   val todo = ref []
+
+   fun schedule (f, v) = todo := (fn () => f v) :: !todo
+
+   fun runHandlers () =
+      case !todo of
+         [] => ()
+       | t :: ts => (todo := ts; t (); runHandlers ())
+
+   structure Deferred = struct
+      datatype 'a v = Filled of 'a | Unfilled of ('a -> Unit.t) List.t
+      datatype 'a t = T of 'a v Ref.t
+
+      fun upon (T r, f) =
+         case !r of
+            Filled v => schedule (f, v)
+          | Unfilled fs => r := Unfilled (f :: fs)
+   end
+
+   val upon = Deferred.upon
+
+   structure Ivar = struct
+      open Deferred
+
+      fun new () = T (ref (Unfilled []))
+
+      fun fill (T r, v) =
+         case !r of
+            Filled _ => raise Full
+          | Unfilled fs => (r := Filled v; List.for (fs, pass v))
+
+      val read = id
+   end
+
+   structure Stream = struct
+      datatype 'a t = T of ('a * 'a t) Option.t Ivar.t
+
+      fun new () = T (Ivar.new ())
+
+      fun read (T d) = Ivar.read d
+
+      val recur = fn (t, b, done, step) =>
+         recur ((t, b), fn ((t, b), loop) =>
+                upon (read t,
+                      fn None => done b
+                       | Some (a, t) => step (a, b, fn b => loop (t, b))))
+
+      fun fold (t, b, f) = let
+         val i = Ivar.new ()
+         val () = recur (t, b, fn b => Ivar.fill (i, b),
+                         fn (a, b, k) => k (f (a, b)))
+      in
+         Ivar.read i
+      end
+
+      fun for (t, f) = fold (t, (), f o #1)
+
+      fun fill (T i, v) = Ivar.fill (i, v)
+
+      fun close t = fill (t, None)
+
+      fun extend (t, v) = let
+         val t' = new ()
+         val () = fill (t, Some (v, t'))
+      in
+         t'
+      end
+
+      fun map (t, f) = let
+         val t' = new ()
+         val () = recur (t, t', fn t' => fill (t', None),
+                         fn (a, t', k) => k (extend (t', f a)))
+      in
+         t'
+      end
+   end
+
+   structure Tail = struct
+      datatype 'a t = T of 'a Stream.t Ref.t
+
+      fun new () = T (ref (Stream.new ()))
+
+      fun extend (t as T r, v) = r := Stream.extend (!r, v)
+
+      fun close (T r) = Stream.close (!r)
+   end
+
+   structure Multicast = struct
+      open Tail
+
+      fun reader (T r) = !r
+
+      val send = extend
+   end
+end
Added: mltonlib/trunk/com/sweeks/async/unstable/lib.mlb
===================================================================
--- mltonlib/trunk/com/sweeks/async/unstable/lib.mlb 2007-03-18 14:40:52 UTC (rev 5443)
+++ mltonlib/trunk/com/sweeks/async/unstable/lib.mlb 2007-03-18 14:55:33 UTC (rev 5444)
@@ -0,0 +1,3 @@
+../../basic/unstable/lib.mlb
+async.sig
+async.sml