[MLton-commit] r5528

Stephen Weeks sweeks at mlton.org
Sun Apr 15 20:26:33 PDT 2007


Made Deferred and Event into monads.

----------------------------------------------------------------------

U   mltonlib/trunk/com/sweeks/async/unstable/async.sig
U   mltonlib/trunk/com/sweeks/async/unstable/async.sml

----------------------------------------------------------------------

Modified: mltonlib/trunk/com/sweeks/async/unstable/async.sig
===================================================================
--- mltonlib/trunk/com/sweeks/async/unstable/async.sig	2007-04-16 03:25:52 UTC (rev 5527)
+++ mltonlib/trunk/com/sweeks/async/unstable/async.sig	2007-04-16 03:26:32 UTC (rev 5528)
@@ -5,38 +5,17 @@
    structure Deferred: sig
       type 'a t
 
+      val >>= : 'a t * ('a -> 'b t) -> 'b t
+      val return: 'a -> 'a t
       val upon: 'a t * ('a -> Unit.t) -> Unit.t
    end
-   structure Event: sig
-      type 'a t
-
-      val always: 'a -> 'a t
-      val any: 'a t List.t -> 'a t
-      val commit: 'a t -> 'a Deferred.t
-      val never: Unit.t -> 'a t
-   end
-   structure Channel: sig
-      type 'a t
-
-      val give: 'a t * 'a -> Unit.t Event.t
-      val new: Unit.t -> 'a t
-      val take: 'a t -> 'a Event.t
-   end
    structure Ivar: sig
       type 'a t
 
-      val fill: 'a t * 'a -> Unit.t
-      (* may raise Full *)
+      val fill: 'a t * 'a -> Unit.t (* may raise Full *)
       val new: Unit.t -> 'a t
       val read: 'a t -> 'a Deferred.t
    end
-   structure Mailbox: sig
-      type 'a t
-
-      val new: Unit.t -> 'a t
-      val send: 'a t * 'a -> Unit.t
-      val take: 'a t -> 'a Event.t
-   end
    structure Stream: sig
       type 'a t
 
@@ -51,11 +30,33 @@
    structure Multicast: sig
       type 'a t
 
-      val close: 'a t -> Unit.t
-      (* may raise Closed *)
+      val close: 'a t -> Unit.t (* may raise Closed *)
+      val listen: 'a t -> 'a Stream.t
       val new: Unit.t -> 'a t
-      val reader: 'a t -> 'a Stream.t
+      val send: 'a t * 'a -> Unit.t (* may raise Closed *)
+   end
+   structure Event: sig
+      type 'a t
+
+      val >>= : 'a t * ('a -> 'b t) -> 'b t
+      val always: 'a -> 'a t
+      val any: 'a t List.t -> 'a t
+      val commit: 'a t -> 'a Deferred.t
+      val never: Unit.t -> 'a t
+      val return: 'a -> 'a t
+   end
+   structure Channel: sig
+      type 'a t
+
+      val give: 'a t * 'a -> Unit.t Event.t
+      val new: Unit.t -> 'a t
+      val take: 'a t -> 'a Event.t
+   end
+   structure Mailbox: sig
+      type 'a t
+
+      val new: Unit.t -> 'a t
       val send: 'a t * 'a -> Unit.t
-      (* may raise Closed *)
+      val take: 'a t -> 'a Event.t
    end
 end

Modified: mltonlib/trunk/com/sweeks/async/unstable/async.sml
===================================================================
--- mltonlib/trunk/com/sweeks/async/unstable/async.sml	2007-04-16 03:25:52 UTC (rev 5527)
+++ mltonlib/trunk/com/sweeks/async/unstable/async.sml	2007-04-16 03:26:32 UTC (rev 5528)
@@ -55,25 +55,40 @@
       datatype 'a v = Filled of 'a | Unfilled of ('a -> Unit.t) List.t
       datatype 'a t = T of 'a v Ref.t
 
+      fun empty () = T (ref (Unfilled []))
+         
+      fun full x = T (ref (Filled x))
+
+      val return = full
+
+      fun fill (T r, v) =
+         case !r of
+            Filled _ => raise Full
+          | Unfilled fs => (r := Filled v; List.for (fs, pass v))
+
       fun upon (T r, f) =
          case !r of
             Filled v => schedule (f, v)
           | Unfilled fs => r := Unfilled (f :: fs)
+
+      fun >>= (d, f) = let
+         val d' = empty ()
+         val () = upon (d, fn a => upon (f a, fn b => fill (d', b)))
+      in
+         d'
+      end
    end
 
    val upon = Deferred.upon
 
    structure Ivar = struct
-      open Deferred
+      datatype 'a t = T of 'a Deferred.t
 
-      fun new () = T (ref (Unfilled []))
+      fun new () = T (Deferred.empty ())
                
-      fun fill (T r, v) =
-         case !r of
-            Filled _ => raise Full
-          | Unfilled fs => (r := Filled v; List.for (fs, pass v))
+      fun fill (T d, v) = Deferred.fill (d, v)
 
-      val read = id
+      fun read (T d) = d
    end
       
    structure Stream = struct
@@ -119,38 +134,19 @@
       end
    end
 
-   structure Tail = struct
+   structure Multicast = struct
       datatype 'a t = T of 'a Stream.t Ref.t
 
-      fun toStream (T r) = !r
-
       fun new () = T (ref (Stream.new ()))
 
-      fun extend (t as T r, v) = r := Stream.extend (!r, v)
+      fun listen (T r) = !r
 
+      fun send (t as T r, v) = r := Stream.extend (!r, v)
+
       fun close (T r) = Stream.close (!r)
    end
 
-   structure Multicast = struct
-      open Tail
-
-      fun reader (T r) = !r
-
-      val send = extend
-   end
-
-   structure Handler: sig
-      type 'a t
-
-      val ignore: Unit.t -> 'a t
-      val isScheduled: 'a t -> Bool.t
-      val new: ('a -> Unit.t) -> 'a t
-      val maybeSchedule: 'a t * 'a -> Unit.t
-      val precompose: 'a t * ('b -> 'a) -> 'b t
-      (* It is an error to call Handler.schedule h if Handler.isScheduled h.
-       *)
-      val schedule: 'a t * 'a -> Unit.t
-   end = struct
+   structure Handler = struct
       datatype 'a t = T of {handler: 'a -> Unit.t,
                             isScheduled: Bool.t Ref.t}
 
@@ -188,6 +184,13 @@
          Ivar.read i
       end
 
+      fun >>= (t, f) =
+         T (fn Handler.T {handler, isScheduled} =>
+            send (t,
+                  Handler.T
+                  {handler = fn a => upon (commit (f a), handler),
+                   isScheduled = isScheduled}))
+   
       fun any ts =
          T (fn h =>
             List.recur (ts, (), ignore, fn (t, (), k) =>
@@ -197,19 +200,21 @@
                            (send (t, h); k ())))
 
       fun always a = T (fn h => Handler.maybeSchedule (h, a))
+
+      val return = always
          
-      fun never () = T ignore
+      fun never () = T ignore        
 
       fun map (t, f) = T (fn h => send (t, Handler.precompose (h, f)))
    end
 
    structure Channel = struct
-      datatype 'a t = T of {givers: ('a * Unit.t Handler.t) Tail.t,
-                            takers: 'a Handler.t Tail.t}
+      datatype 'a t = T of {givers: ('a * Unit.t Handler.t) Multicast.t,
+                            takers: 'a Handler.t Multicast.t}
 
       fun 'a new () = let
-         val givers: ('a * Unit.t Handler.t) Tail.t = Tail.new ()
-         val takers = Tail.new ()
+         val givers: ('a * Unit.t Handler.t) Multicast.t = Multicast.new ()
+         val takers = Multicast.new ()
          fun loop (gs, ts) =
             upon (Stream.read gs, fn opt =>
                   Option.for (opt, fn (g, gs) => loopG (g, gs, ts)))
@@ -228,23 +233,23 @@
              | (false, true) => loopG (g, gs, ts)
              | (true, false) => loopT (gs, t, ts)
              | (true, true) => loop (gs, ts)
-         val () = loop (Tail.toStream givers, Tail.toStream takers)
+         val () = loop (Multicast.listen givers, Multicast.listen takers)
       in
          T {givers = givers,
             takers = takers}
       end
 
       fun give (T {givers, ...}, a) =
-         Event.T (fn h => Tail.extend (givers, (a, h)))
+         Event.T (fn h => Multicast.send (givers, (a, h)))
 
       fun take (T {takers, ...}) =
-         Event.T (fn h => Tail.extend (takers, h))
+         Event.T (fn h => Multicast.send (takers, h))
    end
 
    structure Mailbox = struct
       open Channel
 
       fun send (T {givers, ...}, a) =
-         Tail.extend (givers, (a, Handler.ignore ()))
+         Multicast.send (givers, (a, Handler.ignore ()))
    end
 end




More information about the MLton-commit mailing list