[MLton-commit] r4982
Wesley Terpstra
wesley at mlton.org
Mon Dec 18 18:52:57 PST 2006
my collection of SML libs, only half finished mostly
----------------------------------------------------------------------
A mltonlib/trunk/ca/terpstra/st/
A mltonlib/trunk/ca/terpstra/st/Makefile
A mltonlib/trunk/ca/terpstra/st/README
A mltonlib/trunk/ca/terpstra/st/data.sig
A mltonlib/trunk/ca/terpstra/st/data.sml
A mltonlib/trunk/ca/terpstra/st/edge.fun
A mltonlib/trunk/ca/terpstra/st/epoll.h
A mltonlib/trunk/ca/terpstra/st/epoll.sig
A mltonlib/trunk/ca/terpstra/st/epoll.sml
A mltonlib/trunk/ca/terpstra/st/ioevent.sig
A mltonlib/trunk/ca/terpstra/st/ioevent.sml
A mltonlib/trunk/ca/terpstra/st/kevent.h
A mltonlib/trunk/ca/terpstra/st/kqueue.sml
A mltonlib/trunk/ca/terpstra/st/level.fun
A mltonlib/trunk/ca/terpstra/st/lpoll.sig
A mltonlib/trunk/ca/terpstra/st/open.sml
A mltonlib/trunk/ca/terpstra/st/scheduler.sig
A mltonlib/trunk/ca/terpstra/st/socket.sml
A mltonlib/trunk/ca/terpstra/st/st.mlb
A mltonlib/trunk/ca/terpstra/st/state.sig
A mltonlib/trunk/ca/terpstra/st/state.sml
A mltonlib/trunk/ca/terpstra/st/test.mlb
A mltonlib/trunk/ca/terpstra/st/test.sml
A mltonlib/trunk/ca/terpstra/st/thread.sig
A mltonlib/trunk/ca/terpstra/st/thread.sml
A mltonlib/trunk/ca/terpstra/st/timeout.sig
A mltonlib/trunk/ca/terpstra/st/timeout.sml
----------------------------------------------------------------------
Added: mltonlib/trunk/ca/terpstra/st/Makefile
===================================================================
--- mltonlib/trunk/ca/terpstra/st/Makefile 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/Makefile 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,9 @@
+all: st # NOTE(review): no rule for "st" is visible in this Makefile -- confirm the intended default target
+
+epoll/epoll.mlb: epoll.h /usr/include/x86_64-linux/i386-linux/sys/epoll.h # NOTE(review): header path looks machine-specific -- confirm where sys/epoll.h lives locally
+kevent/kevent.mlb: kevent.h /usr/include/sys/event.h
+
+%.mlb: # runs mlnlffigen over all prerequisites ($^); output directory is $(@D), MLB file name is $(@F)
+ mlnlffigen -allSU true -linkage static -dir $(@D) -mlbfile $(@F) $^
+
+-include $(patsubst %.mlb,%.dep,$(wildcard *.mlb))
Added: mltonlib/trunk/ca/terpstra/st/README
===================================================================
--- mltonlib/trunk/ca/terpstra/st/README 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/README 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,16 @@
+This is a simple work-alike of state-threads.sf.net for Standard ML.
+It helps in building event driven state machines with non-concurrent threads.
+
+For an example, see test.sml
+
+To use on osx:
+ make kevent/kevent.mlb
+ mlton test.mlb
+
+To use on linux:
+ edit st.mlb to use epoll/epoll.mlb and epoll.sml instead of kevent/kevent.mlb and kqueue.sml
+ make epoll/epoll.mlb
+ mlton test.mlb
+
+The test program downloads two webpages from google concurrently, while
+answering TCP connections on port 12467 and printing a heart beat.
Added: mltonlib/trunk/ca/terpstra/st/data.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/data.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/data.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,46 @@
+signature SPARSE_ARRAY = (* growable int-indexed table whose slots may be empty *)
+ sig
+ type 'a sparse_array
+
+ val new: unit -> 'a sparse_array
+
+ val sub: 'a sparse_array * int -> 'a option (* NONE for an empty slot or an index past the current capacity *)
+ val update: 'a sparse_array * int * 'a -> unit (* stores the value, growing the table when the index does not fit *)
+ val erase: 'a sparse_array * int -> unit (* empties the slot; does nothing for an index past the current capacity *)
+ end
+
+signature DYNAMIC_ARRAY = (* growable array addressed by index, with stack-style push/pop at the end *)
+ sig
+ type 'a dynamic_array
+
+ val new: unit -> 'a dynamic_array
+ val size: 'a dynamic_array -> int (* number of elements currently held *)
+
+ val sub: 'a dynamic_array * int -> 'a
+ val update: 'a dynamic_array * int * 'a -> unit
+ val swap: 'a dynamic_array * int * int -> unit (* exchanges the contents of two indices *)
+
+ val push: 'a dynamic_array * 'a -> unit (* appends after the last element *)
+ val pop: 'a dynamic_array -> unit (* drops the last element; nothing is returned *)
+ end
+
+signature HEAP = (* priority queue ordered by a caller-supplied predicate *)
+ sig
+ type 'a heap
+ val new: ('a * 'a -> bool) -> 'a heap (* predicate (x, y): true when x may stay above y in the heap *)
+ val push: 'a heap * 'a -> unit
+ val pop: 'a heap -> unit (* removes the top element without returning it; read it with peek first *)
+ val peek: 'a heap -> 'a option (* top element, or NONE when the heap is empty *)
+ end
+
+signature QUEUE = (* mutable first-in first-out queue *)
+ sig
+ type 'a queue
+ val new: unit -> 'a queue
+
+ val empty: 'a queue -> bool
+ val enque: 'a queue * 'a -> unit (* adds at the back *)
+ val deque: 'a queue -> 'a option (* removes from the front; NONE when the queue is empty *)
+
+(* val enqueList: 'a queue * 'a list -> unit *)
+ end
Added: mltonlib/trunk/ca/terpstra/st/data.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/data.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/data.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,136 @@
+structure SparseArray :> SPARSE_ARRAY = (* option array held in a ref and regrown on demand *)
+ struct
+ type 'a sparse_array = 'a option array ref
+ fun new () = ref (Array.array (8, NONE))
+
+ (* indices at or past the current capacity read as NONE *)
+ fun sub (cell, i) =
+ let val slots = !cell
+ in if i < Array.length slots then Array.sub (slots, i) else NONE end
+
+ (* grow to 2*i + 1 slots when i does not fit, then store SOME x *)
+ fun update (cell, i, x) =
+ let
+ fun grow old =
+ let val bigger = Array.array (i*2 + 1, NONE)
+ in Array.copy { src = old, dst = bigger, di = 0 }; cell := bigger end
+ in
+ if i < Array.length (!cell) then () else grow (!cell);
+ Array.update (!cell, i, SOME x)
+ end
+ (* clearing a slot past the current capacity is a no-op *)
+ fun erase (cell, i) = if i < Array.length (!cell) then Array.update (!cell, i, NONE) else ()
+ end
+
+structure DynamicArray :> DYNAMIC_ARRAY = (* option array held in a ref, plus a separate element count *)
+ struct
+ type 'a dynamic_array = 'a option array ref * int ref
+
+ fun new () = (ref (Array.array (8, NONE)), ref 0)
+ fun size (_, ref length) = length
+
+ fun sub ((ref array, _), i) = valOf (Array.sub (array, i))
+ fun update ((ref array, _), i, x) = Array.update (array, i, SOME x)
+
+ fun swap ((ref array, _), i, j) =
+ let
+ val iv = Array.sub (array, i)
+ val jv = Array.sub (array, j)
+ in
+ Array.update (array, i, jv);
+ Array.update (array, j, iv)
+ end
+
+ fun push ((array, length), x) = ( (* double the capacity when full, then store x at index !length *)
+ if Array.length (!array) > !length then () else
+ let val a = Array.array (!length * 2, NONE)
+ in
+ Array.copy { src = !array, dst = a, di = 0 };
+ array := a
+ end;
+ update ((array, length), !length, x);
+ length := !length + 1)
+
+ fun pop (ref array, length) = ( (* clear the slot before shrinking, so a pop on an empty array raises Subscript without leaving length at ~1 *)
+ Array.update (array, !length - 1, NONE);
+ length := !length - 1)
+ end
+
+structure Heap :> HEAP = (* binary heap stored in a DynamicArray; the top element lives at index 0 *)
+ struct
+ open DynamicArray
+ type 'a heap = 'a dynamic_array * ('a * 'a -> bool)
+
+ fun left i = 2*i + 1
+ fun right i = 2*i + 2
+ fun parent i = (i - 1) div 2
+
+ fun new cmp = (DynamicArray.new (), cmp)
+
+ fun push ((a, cmp), x) = (* append x, then swap it upward until cmp (parent, child) holds *)
+ let
+ fun fixtail 0 = () | fixtail i =
+ let
+ val parent = parent i
+ in
+ if cmp (sub (a, parent), sub (a, i)) then () else
+ (swap (a, parent, i); fixtail parent)
+ end
+ in
+ DynamicArray.push (a, x);
+ fixtail (size a - 1)
+ end
+
+ fun pop (a, cmp) = (* copy the last element over the top, drop the last slot, then swap the top downward *)
+ let
+ val newsize = size a - 1 (* element count after the removal *)
+
+ fun fixhead i =
+ let
+ val left = left i
+ val right = right i
+ in
+ if left >= newsize then () else (* no children *)
+ if right >= newsize then (* only a left child *)
+ if cmp (sub (a, i), sub (a, left)) then () else
+ swap (a, i, left)
+ else
+ if cmp (sub (a, left), sub (a, right)) then (* descend toward whichever child cmp ranks first *)
+ if cmp (sub (a, i), sub (a, left)) then () else
+ (swap (a, i, left); fixhead left)
+ else
+ if cmp (sub (a, i), sub (a, right)) then () else
+ (swap (a, i, right); fixhead right)
+ end
+ in
+ update (a, 0, sub (a, newsize));
+ DynamicArray.pop a;
+ fixhead 0
+ end
+
+ fun peek (a, cmp) = (* cmp is not used here *)
+ if size a = 0 then NONE else SOME (sub (a, 0))
+ end
+
+structure Queue :> QUEUE =
+ struct
+ (* FIFO as two lists: dequeue from `front`, enqueue onto `back` (newest first);
+ * `back` is reversed into `front` once `front` runs dry. *)
+ datatype 'a queue = T of {front: 'a list ref, back: 'a list ref}
+
+ fun new() = T{front = ref [], back = ref []}
+
+ fun empty (T {front, back}) = List.null (!front) andalso List.null (!back)
+ fun enque(T{back, ...}, x) = back := x :: !back
+
+ fun deque(T{front, back}) =
+ let
+ fun take [] = raise Fail "deque"
+ | take (head :: rest) = (back := []; front := rest; SOME head)
+ in
+ case (!front, !back) of
+ (head :: rest, _) => (front := rest; SOME head)
+ | ([], []) => NONE
+ | ([], pending) => take (rev pending)
+ end
+ end
Added: mltonlib/trunk/ca/terpstra/st/edge.fun
===================================================================
--- mltonlib/trunk/ca/terpstra/st/edge.fun 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/edge.fun 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,58 @@
+functor Edge(Poll : EPOLL) :> SCHEDULER_EXTRA = (* scheduler main loop driven by a poller matching EPOLL *)
+ struct
+ open State
+ open Thread_Extra
+ open Timeout_Extra
+ open Poll
+
+ val poll = create 1000 (* ready for 1000 file descriptors *)
+
+ structure IoEvent : IOEVENT = (* IoEvent with monitor/unmonitor also registering the handle with the poller *)
+ struct
+ open IoEvent
+ fun monitor fd status = (
+ add (poll, fd);
+ IoEvent.monitor fd status)
+ fun unmonitor fd = (
+ remove (poll, fd);
+ IoEvent.unmonitor fd)
+ end
+ open IoEvent
+
+ fun sigPulse thread = thread before stop () (* NOTE(review): stop comes from Thread_Extra, not shown here -- presumably it yields the running thread *)
+
+ fun loop block = (* never returns: wait on the poller, fire timeouts, run threads, repeat *)
+ let
+ fun relativeTime time = (* time remaining until `time`, clamped at zero *)
+ let
+ val delta = Time.- (time, Time.now ())
+ in
+ if Time.< (delta, Time.zeroTime)
+ then Time.zeroTime
+ else delta
+ end
+
+ val delay =
+ case block of
+ PENDING => SOME Time.zeroTime (* zero timeout: the poller is only checked, not waited on *)
+ | COMPLETE => Option.map relativeTime (getNext ()) (* NOTE(review): getNext is from Timeout_Extra -- presumably the next timeout deadline, NONE if there is none *)
+ in
+ wait (poll, delay);
+ trigger (Time.now ());
+ loop (run ())
+ end
+
+ fun main () =
+ let
+ open MLton
+ open Signal
+ val real = Itimer.signal Itimer.Real
+ val freq = Time.fromMilliseconds 50 (* only referenced by the commented-out Itimer.set below *)
+ in
+ (* prevent high throughput connections from causing starvation *)
+ Mask.unblock (Mask.some [real]);
+ setHandler (real, Handler.handler sigPulse);
+ (* Itimer.set (Itimer.Real, { interval = freq, value = freq }); *)
+ loop (run ())
+ end
+ end
Added: mltonlib/trunk/ca/terpstra/st/epoll.h
===================================================================
--- mltonlib/trunk/ca/terpstra/st/epoll.h 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/epoll.h 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,9 @@
+#include <sys/epoll.h>
+
+enum EPOLL_CTL { /* exposes the EPOLL_CTL_* values as enum constants; epoll.sml reads them as E_EPOLL_CTL.e_CTL_ADD / e_CTL_DEL */
+ CTL_ADD = EPOLL_CTL_ADD,
+ CTL_DEL = EPOLL_CTL_DEL,
+ CTL_MOD = EPOLL_CTL_MOD
+};
+
+int close(int); /* epoll.sml calls this as F_close.f; presumably declared here so a binding is generated for it */
Added: mltonlib/trunk/ca/terpstra/st/epoll.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/epoll.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/epoll.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,15 @@
+signature EPOLL = (* interface a poller must provide to the Edge scheduler functor *)
+ sig
+ type poll
+ type ioh = IoEvent.ioh
+
+ val create: int -> poll (* the int is a capacity figure; the kqueue implementation ignores it *)
+ val close: poll -> unit
+
+ (* Track changes to state of the io handle *)
+ val add: poll * ioh -> unit
+ val remove: poll * ioh -> unit
+
+ (* will automatically change IoEvent's status *)
+ val wait: poll * Time.time option -> unit (* both implementations treat NONE as "no timeout" *)
+ end
Added: mltonlib/trunk/ca/terpstra/st/epoll.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/epoll.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/epoll.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,71 @@
+(* Edge-triggered *)
+structure EPoll :> EPOLL = (* EPOLL implemented on Linux epoll through the generated F_/S_/E_ bindings *)
+ struct
+ type poll = MLRep.Int.Signed.int
+ type ioh = IoEvent.ioh
+
+ fun create events = F_epoll_create.f (MLRep.Int.Signed.fromInt events)
+ fun close epoll = ignore (F_close.f epoll)
+
+ fun ctl cmd (epoll, fd) = (* issue epoll_ctl `cmd` for fd with IN, OUT, ERR, HUP and the edge-triggered flag set *)
+ let
+ open E_EPOLL_EVENTS
+ val makeUnsigned = MLRep.Int.Unsigned.fromInt o MLRep.Int.Signed.toInt
+ val flags = makeUnsigned (e_EPOLLIN + e_EPOLLOUT + e_EPOLLERR +
+ e_EPOLLHUP + e_EPOLLET)
+ val epoll_event = C.new S_epoll_event.typ
+ in
+ C.Set.uint (S_epoll_event.f_events epoll_event, flags);
+ C.Set.sint (U_epoll_data.f_fd (S_epoll_event.f_data epoll_event),
+ MLRep.Int.Signed.fromInt fd);
+ F_epoll_ctl.f (epoll, cmd, MLRep.Int.Signed.fromInt fd,
+ C.Ptr.|&| epoll_event);
+ C.discard epoll_event
+ end
+
+ val add = ctl E_EPOLL_CTL.e_CTL_ADD
+ val remove = ctl E_EPOLL_CTL.e_CTL_DEL
+
+ val nevents = 500 (* capacity of the shared event buffer below *)
+ val events = C.alloc S_epoll_event.typ (Word.fromInt nevents)
+
+ fun wait (epoll, time) = (* call epoll_wait, then forward each returned event to IoEvent *)
+ let
+ val roundup = Time.fromMicroseconds 999 (* added so the conversion to whole milliseconds rounds up *)
+ val delay = case time of
+ NONE => ~1
+ | SOME x => LargeInt.toInt (Time.toMilliseconds (Time.+ (x, roundup)))
+
+ val nevents = F_epoll_wait.f (epoll, events, nevents, delay) (* shadows the capacity with the returned count *)
+
+ fun event ees =
+ let
+ open E_EPOLL_EVENTS
+ val makeUnsigned = MLRep.Int.Unsigned.fromInt o MLRep.Int.Signed.toInt
+ val EPOLLIN = makeUnsigned e_EPOLLIN
+ val EPOLLOUT = makeUnsigned e_EPOLLOUT
+ val EPOLLERR = makeUnsigned e_EPOLLERR
+ val EPOLLHUP = makeUnsigned e_EPOLLHUP
+
+ val fdf = U_epoll_data.f_fd (S_epoll_event.f_data ees)
+ val fd = MLRep.Int.Signed.toInt (C.Get.sint fdf)
+ val flags = C.Get.uint (S_epoll_event.f_events ees)
+
+ fun value bit = MLRep.Int.Unsigned.andb (flags, bit) = bit
+ val broken = value EPOLLERR orelse value EPOLLHUP (* error or hang-up is reported as both readable and writable *)
+ in
+ IoEvent.notifyHASINPUT fd (value EPOLLIN orelse broken);
+ IoEvent.notifyCANOUTPUT fd (value EPOLLOUT orelse broken)
+ end
+
+ fun process i =
+ if i >= nevents then () else (* >= rather than =: epoll_wait returns ~1 on error (e.g. EINTR), which must not walk past the buffer *)
+ (event (C.Ptr.sub (events, i)); process (i + 1))
+ in
+ process 0
+ end
+ end
+
+structure Scheduler = Edge(EPoll) (* instantiate the scheduler functor with the epoll poller *)
+structure IoEvent = Scheduler.IoEvent (* rebind IoEvent to the scheduler's wrapped version *)
+structure Scheduler :> SCHEDULER = Scheduler (* re-seal so that only SCHEDULER's members stay visible *)
Added: mltonlib/trunk/ca/terpstra/st/ioevent.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/ioevent.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/ioevent.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,20 @@
+signature IOEVENT = (* per-handle readiness states (has input / can output) and their signals *)
+ sig
+ exception Unmonitored (* raised by the accessors below for a handle that is not being monitored *)
+
+ type status = { hasinput: bool, canoutput: bool}
+ type ioh
+
+ val socket: ('af, 'sock_type) Socket.sock -> (ioh -> 'a) -> 'a (* applies the given function to the io handle of the socket *)
+ val sockdes: Socket.sock_desc -> (ioh -> 'a) -> 'a
+ val file: Posix.IO.file_desc -> (ioh -> 'a) -> 'a
+
+ val HASINPUT: ioh -> (bool, bool) State.state
+ val CANOUTPUT: ioh -> (bool, bool) State.state
+
+ val notifyHASINPUT: ioh -> bool State.signal
+ val notifyCANOUTPUT: ioh -> bool State.signal
+
+ val monitor: ioh -> status -> unit (* starts tracking the handle with the given initial status *)
+ val unmonitor: ioh -> unit
+ end
Added: mltonlib/trunk/ca/terpstra/st/ioevent.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/ioevent.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/ioevent.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,53 @@
+structure IoEvent : IOEVENT = (* keeps one pair of readiness states per monitored handle, indexed by the handle's integer value *)
+ struct
+ open State
+ open SparseArray
+
+ type ioh = int
+ exception Unmonitored
+
+ type status = {
+ hasinput: bool,
+ canoutput: bool }
+ type filedes = {
+ fhasinput: (bool, bool) state * bool signal,
+ fcanoutput: (bool, bool) state * bool signal }
+ val filedes : filedes sparse_array = new ()
+
+ type 'a t = (unit -> 'a) * ('a -> unit) (* getter/setter pair for a C symbol *)
+ val (geti, _) = _symbol "side_channel_hack" alloc: int t; (* all four bindings name the same symbol: a value stored by a typed setter is read back as an int by geti *)
+ val (_, sets) = _symbol "side_channel_hack": ('a, 'b) Socket.sock t;
+ val (_, setd) = _symbol "side_channel_hack": Socket.sock_desc t;
+ val (_, setf) = _symbol "side_channel_hack": Posix.IO.file_desc t;
+
+ fun socket sock f = f (sets sock; geti ())
+ fun sockdes des f = f (setd des; geti ())
+ fun file file f = f (setf file; geti ())
+
+ fun test select fd = case sub (filedes, fd) of (* state half of the selected field; Unmonitored if fd has no entry *)
+ NONE => raise Unmonitored
+ | SOME x => case select x of (state, _) => state
+
+ val HASINPUT = test #fhasinput
+ val CANOUTPUT = test #fcanoutput
+
+ fun notify select fd = case sub (filedes, fd) of (* signal half of the selected field; Unmonitored if fd has no entry *)
+ NONE => raise Unmonitored
+ | SOME x => case select x of (_, signal) => signal
+
+ val notifyHASINPUT = notify #fhasinput
+ val notifyCANOUTPUT = notify #fcanoutput
+
+ fun monitor fd (status:status) = (* installs fresh states for fd, replacing any existing entry *)
+ let
+ val entry = {
+ fhasinput = state (#hasinput status),
+ fcanoutput = state (#canoutput status) }
+ in
+ update (filedes, fd, entry)
+ end
+
+ fun unmonitor fd = case sub (filedes, fd) of
+ NONE => raise Unmonitored
+ | SOME _ => erase (filedes, fd)
+ end
Added: mltonlib/trunk/ca/terpstra/st/kevent.h
===================================================================
--- mltonlib/trunk/ca/terpstra/st/kevent.h 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/kevent.h 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,57 @@
+#include <sys/event.h>
+#include <sys/time.h>
+
+enum filter { /* EVFILT_* values as enum constants; kqueue.sml reads E_filter.e_read and E_filter.e_write */
+ read = EVFILT_READ,
+ write = EVFILT_WRITE,
+ aio = EVFILT_AIO,
+ vnode = EVFILT_VNODE,
+ proc = EVFILT_PROC,
+ signal = EVFILT_SIGNAL,
+ timer = EVFILT_TIMER,
+ machport = EVFILT_MACHPORT,
+ fs = EVFILT_FS
+};
+
+enum action { /* EV_* flag values as enum constants; kqueue.sml reads E_action.e_add, e_clear and e_delete */
+ add = EV_ADD,
+ delete = EV_DELETE,
+ enable = EV_ENABLE,
+ disable = EV_DISABLE,
+ oneshot = EV_ONESHOT,
+ clear = EV_CLEAR,
+ sysflags = EV_SYSFLAGS,
+ flag0 = EV_FLAG0,
+ flag1 = EV_FLAG1,
+ eof = EV_EOF,
+ error = EV_ERROR,
+ poll = EV_POLL,
+ ooband = EV_OOBAND
+};
+
+/*
+enum note {
+ lowat = NOTE_LOWAT,
+ delete = NOTE_DELETE,
+ write = NOTE_WRITE,
+ extend = NOTE_EXTEND,
+ attrib = NOTE_ATTRIB,
+ link = NOTE_LINK,
+ rename = NOTE_RENAME,
+ revoke = NOTE_REVOKE,
+ exit = NOTE_EXIT,
+ fork = NOTE_FORK,
+ exec = NOTE_EXEC,
+ pctrlmask = NOTE_PCTRLMASK,
+ pdatamask = NOTE_PDATAMASK,
+ seconds = NOTE_SECONDS,
+ useconds = NOTE_USECONDS,
+ nseconds = NOTE_NSECONDS,
+ absolute = NOTE_ABSOLUTE,
+ track = NOTE_TRACK,
+ trackerr = NOTE_TRACKERR,
+ child = NOTE_CHILD
+};
+*/
+
+int close(int fd); /* kqueue.sml calls this as F_close.f; presumably declared here so a binding is generated for it */
Added: mltonlib/trunk/ca/terpstra/st/kqueue.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/kqueue.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/kqueue.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,102 @@
+structure KQueue :> EPOLL = (* EPOLL implemented on BSD kqueue/kevent through the generated F_/S_/E_ bindings *)
+ struct
+ type poll = MLRep.Int.Signed.int
+ type ioh = IoEvent.ioh
+
+ fun create _ = F_kqueue.f ()
+ fun close epoll = ignore (F_close.f epoll)
+
+(*
+ val () = print ("change: " ^ Int.toString fd ^ ": ")
+ val () = print (Int.toString filter ^ " ")
+ val () = print (Int.toString flags)
+ val () = print "\n"
+*)
+ fun kevent (ke, fd, filter, flags) = (* fill in the ident, filter and flags fields of one struct kevent *)
+ (C.Set.ulong (S_kevent.f_ident ke,
+ MLRep.Long.Unsigned.fromInt fd);
+ C.Set.sshort (S_kevent.f_filter ke,
+ MLRep.Short.Signed.fromInt
+ (MLRep.Int.Signed.toInt filter));
+ C.Set.ushort (S_kevent.f_flags ke,
+ MLRep.Short.Unsigned.fromInt
+ (MLRep.Int.Signed.toInt flags)))
+
+ fun control flags (epoll, fd) = (* submit a read change and a write change for fd, both with `flags`, using a zero timeout *)
+ let
+ val changes = C.alloc S_kevent.typ (Word.fromInt 2)
+ val zero = C.new S_timespec.typ
+ in
+ kevent (C.Ptr.sub (changes, 0), fd, E_filter.e_read, flags);
+ kevent (C.Ptr.sub (changes, 1), fd, E_filter.e_write, flags);
+ C.Set.slong (S_timespec.f_tv_sec zero, 0);
+ C.Set.slong (S_timespec.f_tv_nsec zero, 0);
+ F_kevent.f (epoll,
+ C.Ptr.ro changes, 2,
+ C.Ptr.null (C.T.pointer S_kevent.typ), 0,
+ C.Ptr.ro (C.Ptr.|&| zero));
+ C.discard zero;
+ C.free changes
+ end
+
+ val add = control (E_action.e_add + E_action.e_clear)
+ val remove = control E_action.e_delete
+
+ val nevents = 500 (* capacity of the shared event buffer below *)
+ val events = C.alloc S_kevent.typ (Word.fromInt nevents)
+
+ fun event ke = (* forward one returned kevent to IoEvent as a "true" notification *)
+ let
+ val fd = C.Get.ulong (S_kevent.f_ident ke)
+ val io = C.Get.sshort (S_kevent.f_filter ke)
+
+ val fd = MLRep.Long.Unsigned.toInt fd
+
+ val cvt = MLRep.Short.Signed.fromInt o MLRep.Int.Signed.toInt
+ val read = cvt E_filter.e_read
+ val write = cvt E_filter.e_write
+(*
+ val () = print ("event: " ^ Int.toString fd ^ ":")
+ val () = if io = read then print " read" else ()
+ val () = if io = write then print " write" else ()
+ val () = print "\n"
+*)
+ in
+ if io = read then IoEvent.notifyHASINPUT fd true else ();
+ if io = write then IoEvent.notifyCANOUTPUT fd true else ()
+ end
+
+ fun wait (epoll, time) = (* call kevent with no changes, then forward each returned event *)
+ let
+ fun timespec NONE = C.Ptr.null (C.T.pointer S_timespec.typ) (* null timeout pointer when no time is given *)
+ | timespec (SOME t) =
+ let
+ val ts = C.alloc S_timespec.typ (Word.fromInt 1)
+ val (seconds, nano) =
+ IntInf.quotRem (Time.toNanoseconds t, 1000000000)
+ in
+ C.Set.slong (S_timespec.f_tv_sec (C.Ptr.|*| ts),
+ MLRep.Long.Signed.fromLarge seconds);
+ C.Set.slong (S_timespec.f_tv_nsec (C.Ptr.|*| ts),
+ MLRep.Long.Signed.fromLarge nano);
+ ts
+ end
+ val ts = timespec time
+
+ val changes = C.Ptr.ro (C.Ptr.null (C.T.pointer S_kevent.typ))
+ val nevents = F_kevent.f (MLRep.Int.Signed.fromInt epoll, (* shadows the capacity with the returned count *)
+ changes, 0,
+ events, nevents,
+ C.Ptr.ro ts)
+ fun process i =
+ if i >= nevents then () else (* >= rather than =: kevent returns ~1 on error (e.g. EINTR), which must not walk past the buffer *)
+ (event (C.Ptr.sub (events, i)); process (i + 1))
+ in
+ process 0;
+ if C.Ptr.isNull ts then () else C.free ts
+ end
+ end
+
+structure Scheduler = Edge(KQueue) (* instantiate the scheduler functor with the kqueue poller *)
+structure IoEvent :> IOEVENT = Scheduler.IoEvent (* rebind IoEvent to the scheduler's wrapped version *)
+structure Scheduler :> SCHEDULER = Scheduler (* re-seal so that only SCHEDULER's members stay visible *)
Added: mltonlib/trunk/ca/terpstra/st/level.fun
===================================================================
--- mltonlib/trunk/ca/terpstra/st/level.fun 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/level.fun 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,73 @@
+functor Level(Poll : LPOLL) :> SCHEDULER_EXTRA = (* scheduler main loop driven by a poller matching LPOLL *)
+ struct
+ open State
+ open Thread_Extra
+ open Timeout_Extra
+ open Poll
+
+ val poll = create 1000 (* ready for 1000 file descriptors *)
+
+ structure IoEvent : IOEVENT = (* IoEvent that also places a watch whenever a condition is, or becomes, false *)
+ struct
+ open IoEvent
+
+ fun monitor fd { hasinput, canoutput } = ( (* watch only the conditions that start out false *)
+ if hasinput then () else watch (poll, fd, Poll.HASINPUT);
+ if canoutput then () else watch (poll, fd, Poll.CANOUTPUT);
+ IoEvent.monitor fd {hasinput = hasinput, canoutput = canoutput})
+
+ fun unmonitor fd = (
+ unwatchall (poll, fd);
+ IoEvent.unmonitor fd)
+
+ fun notifyHASINPUT fd true = (
+ IoEvent.notifyHASINPUT fd true)
+ | notifyHASINPUT fd false = ( (* a false notification places a new input watch before updating the state *)
+ Poll.watch (poll, fd, Poll.HASINPUT);
+ IoEvent.notifyHASINPUT fd false)
+
+ fun notifyCANOUTPUT fd true = (
+ IoEvent.notifyCANOUTPUT fd true)
+ | notifyCANOUTPUT fd false = ( (* a false notification places a new output watch before updating the state *)
+ Poll.watch (poll, fd, Poll.CANOUTPUT);
+ IoEvent.notifyCANOUTPUT fd false)
+ end
+ open IoEvent
+
+ fun sigPulse thread = thread before stop () (* NOTE(review): stop comes from Thread_Extra, not shown here -- presumably it yields the running thread *)
+
+ fun loop block = (* never returns: wait on the poller, fire timeouts, run threads, repeat *)
+ let
+ fun relativeTime time = (* time remaining until `time`, clamped at zero *)
+ let
+ val delta = Time.- (time, Time.now ())
+ in
+ if Time.< (delta, Time.zeroTime)
+ then Time.zeroTime
+ else delta
+ end
+
+ val delay =
+ case block of
+ PENDING => SOME Time.zeroTime (* zero timeout: the poller is only checked, not waited on *)
+ | COMPLETE => Option.map relativeTime (getNext ()) (* NOTE(review): getNext is from Timeout_Extra -- presumably the next timeout deadline, NONE if there is none *)
+ in
+ wait (poll, delay);
+ trigger (Time.now ());
+ loop (run ())
+ end
+
+ fun main () =
+ let
+ open MLton
+ open Signal
+ val real = Itimer.signal Itimer.Real
+ val freq = Time.fromMilliseconds 50 (* only referenced by the commented-out Itimer.set below *)
+ in
+ (* prevent high throughput connections from causing starvation *)
+ Mask.unblock (Mask.some [real]);
+ setHandler (real, Handler.handler sigPulse);
+ (* Itimer.set (Itimer.Real, { interval = freq, value = freq }); *)
+ loop (run ())
+ end
+ end
Added: mltonlib/trunk/ca/terpstra/st/lpoll.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/lpoll.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/lpoll.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,21 @@
+(* Signature for level-triggered poll *)
+signature LPOLL = (* interface a poller must provide to the Level scheduler functor *)
+ sig
+ type poll
+ type ioh = IoEvent.ioh
+ datatype level = HASINPUT | CANOUTPUT (* which readiness condition a watch is for *)
+
+ val create: int -> poll
+ val close: poll -> unit
+
+ (* add a watch to the list *)
+ val watch: poll * ioh * level -> unit
+
+ (* called prior to closing the io handle *)
+ val unwatchall: poll * ioh -> unit
+
+ (* automatically change IoEvent's status
+ * triggered watches are automatically removed from the poll (ie: oneshot)
+ *)
+ val wait: poll * Time.time option -> unit
+ end
Added: mltonlib/trunk/ca/terpstra/st/open.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/open.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/open.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,5 @@
+open State (* st.mlb lists this file last in its export section, so these names become visible unqualified to its users *)
+open Thread
+open Timeout
+open IoEvent
+open Scheduler
Added: mltonlib/trunk/ca/terpstra/st/scheduler.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/scheduler.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/scheduler.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,10 @@
+signature SCHEDULER = (* public face of the scheduler *)
+ sig
+ val main: unit -> unit (* enters the scheduler loop; the Edge and Level implementations never return from it *)
+ end
+
+signature SCHEDULER_EXTRA = (* result signature of the Edge and Level functors *)
+ sig
+ include SCHEDULER
+ structure IoEvent: IOEVENT (* the IoEvent structure as wrapped by the scheduler *)
+ end
Added: mltonlib/trunk/ca/terpstra/st/socket.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/socket.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/socket.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,202 @@
+structure Socket : SOCKET = (* basis Socket with each operation hooked into IoEvent readiness tracking *)
+ struct
+ open Socket
+ open State
+ open IoEvent
+ open Timeout
+ open Thread
+
+ fun wrapInNB f s x = (* run the non-blocking input op; on NONE mark the socket as having no input *)
+ case f x of
+ NONE => NONE before socket (s x) notifyHASINPUT false
+ | SOME v => SOME v
+
+ val recvVecNB = fn x => wrapInNB recvVecNB #1 x
+ val recvVecNB' = fn x => wrapInNB recvVecNB' #1 x
+ val recvArrNB = fn x => wrapInNB recvArrNB #1 x
+ val recvArrNB' = fn x => wrapInNB recvArrNB' #1 x
+
+ val recvVecFromNB = fn x => wrapInNB recvVecFromNB #1 x
+ val recvVecFromNB' = fn x => wrapInNB recvVecFromNB' #1 x
+ val recvArrFromNB = fn x => wrapInNB recvArrFromNB #1 x
+ val recvArrFromNB' = fn x => wrapInNB recvArrFromNB' #1 x
+
+ fun wrapIn f s x = ( (* suspend the thread until HASINPUT, then retry the non-blocking op until it yields a value *)
+ stopTill (socket (s x) HASINPUT);
+ case f x of
+ NONE => wrapIn f s x
+ | SOME x => x)
+
+ fun recvVec x = wrapIn recvVecNB #1 x
+ fun recvVec' x = wrapIn recvVecNB' #1 x
+ fun recvArr x = wrapIn recvArrNB #1 x
+ fun recvArr' x = wrapIn recvArrNB' #1 x
+
+ fun recvVecFrom x = wrapIn recvVecFromNB #1 x
+ fun recvVecFrom' x = wrapIn recvVecFromNB' #1 x
+ fun recvArrFrom x = wrapIn recvArrFromNB #1 x
+ fun recvArrFrom' x = wrapIn recvArrFromNB' #1 x
+
+ fun wrapOutNB f s x = (* run the non-blocking output op; on NONE mark the socket as not writable *)
+ case f x of
+ NONE => NONE before socket (s x) notifyCANOUTPUT false
+ | SOME v => SOME v
+
+ val sendVecNB = fn x => wrapOutNB sendVecNB #1 x
+ val sendVecNB' = fn x => wrapOutNB sendVecNB' #1 x
+ val sendArrNB = fn x => wrapOutNB sendArrNB #1 x
+ val sendArrNB' = fn x => wrapOutNB sendArrNB' #1 x
+
+ fun wrapOutNBbool f s x = (* same as wrapOutNB, for ops that report success as a bool *)
+ case f x of
+ false => false before socket (s x) notifyCANOUTPUT false
+ | true => true
+
+ val sendVecToNB = fn x => wrapOutNBbool sendVecToNB #1 x
+ val sendVecToNB' = fn x => wrapOutNBbool sendVecToNB' #1 x
+ val sendArrToNB = fn x => wrapOutNBbool sendArrToNB #1 x
+ val sendArrToNB' = fn x => wrapOutNBbool sendArrToNB' #1 x
+
+ fun wrapOut f s x = ( (* suspend the thread until CANOUTPUT, then retry the non-blocking op until it yields a value *)
+ stopTill (socket (s x) CANOUTPUT);
+ case f x of
+ NONE => wrapOut f s x
+ | SOME x => x)
+
+ fun sendVec x = wrapOut sendVecNB #1 x
+ fun sendVec' x = wrapOut sendVecNB' #1 x
+ fun sendArr x = wrapOut sendArrNB #1 x
+ fun sendArr' x = wrapOut sendArrNB' #1 x
+
+ fun wrapOutbool f s x = (
+ stopTill (socket (s x) CANOUTPUT);
+ case f x of
+ false => wrapOutbool f s x
+ | true => ())
+
+ fun sendVecTo x = wrapOutbool sendVecToNB #1 x
+ fun sendVecTo' x = wrapOutbool sendVecToNB' #1 x
+ fun sendArrTo x = wrapOutbool sendArrToNB #1 x
+ fun sendArrTo' x = wrapOutbool sendArrToNB' #1 x
+
+ val acceptNB = fn s =>
+ case acceptNB s of
+ NONE => NONE before socket s notifyHASINPUT false
+ | SOME (s, a) =>
+ (* It is safe to say no input, b/c edge triggered APIs always
+ * give at least one initial status report. It is also safe
+ * for level triggered, since this gets it added to the poll.
+ * Thus, no really fast sends are lost.
+ *
+ * This is the smart thing to do, because SYN+ACK takes a while
+ * to reach the client. So, there's no point wasting a recv()
+ * when it's almost surely not going to have data yet anyways.
+ *)
+ SOME (s, a) before socket s monitor { hasinput = false,
+ canoutput = true }
+ fun accept x = wrapIn acceptNB (fn s => s) x
+
+ val close = fn s => (socket s unmonitor; close s) (* stop monitoring before the descriptor is closed *)
+
+ val listen = fn (s, i) =>
+ (* due to a bug in BSD's kqueue API, we must re-monitor *)
+ (socket s unmonitor;
+ listen (s, i);
+ socket s monitor { hasinput = false, canoutput = true })
+
+ val connect = fn (s, a) =>
+ case connectNB (s, a) of
+ true => ()
+ | false => (
+ stopTill (socket s CANOUTPUT);
+ (* Get the error status, if getERROR doesn't raise, we raise
+ * something generic since we only know that it failed.
+ *)
+ if Socket.Ctl.getERROR s
+ then raise OS.SysErr ("Connection failed", NONE)
+ else ())
+
+ fun select {rds, wrs, exs, timeout} = (* wait on the IoEvent states via Thread.select; exs is ignored and the result's exs is always empty *)
+ let
+ datatype which =
+ RDS of sock_desc | WRS of sock_desc | TIMER
+
+ val rds = List.map (fn rd => (sockdes rd HASINPUT, RDS rd)) rds
+ val wrs = List.map (fn wr => (sockdes wr CANOUTPUT, WRS wr)) wrs
+ val tmr = case timeout of SOME x => [(TIMEOUT x, TIMER)] | NONE => []
+ val events = List.concat [rds, wrs, tmr]
+
+ val ords = ref []
+ val owrs = ref []
+
+ fun split (RDS rd) = ords := rd :: !ords
+ | split (WRS wr) = owrs := wr :: !owrs
+ | split TIMER = () (* matches the constructor itself, so the match stays exhaustive without a catch-all variable *)
+ in
+ List.app split (Thread.select events);
+ {rds = !ords, wrs = !owrs, exs = []}
+ end
+ end
+
+structure Wrap = (* helpers that register newly created sockets with IoEvent *)
+ struct
+ local
+ open IoEvent
+ in
+ val monitor = fn s => (* returns s after monitoring it with both conditions initially false *)
+ s before socket s monitor { hasinput = false, canoutput = false }
+
+ fun monitorPair (s, t) = (monitor s, monitor t)
+ end
+ end
+
+structure GenericSock : GENERIC_SOCK = (* basis GenericSock whose constructors also monitor the new sockets *)
+ struct
+ open GenericSock
+ open Wrap
+
+ val socket = fn x => monitor (socket x) (* val, not fun: the right-hand side refers to the socket brought in by open *)
+ val socket' = fn x => monitor (socket' x)
+ val socketPair = fn x => monitorPair (socketPair x)
+ val socketPair' = fn x => monitorPair (socketPair' x)
+ end
+
+structure INetSock : INET_SOCK = (* basis INetSock whose UDP/TCP constructors also monitor the new sockets *)
+ struct
+ open INetSock
+ open Wrap
+
+ structure UDP =
+ struct
+ open UDP
+ val socket = fn x => monitor (socket x) (* val, not fun: the right-hand side refers to the socket brought in by open *)
+ val socket' = fn x => monitor (socket' x)
+ end
+
+ structure TCP =
+ struct
+ open TCP
+ val socket = fn x => monitor (socket x)
+ val socket' = fn x => monitor (socket' x)
+ end
+ end
+
+structure UnixSock : UNIX_SOCK = (* basis UnixSock whose stream/datagram constructors also monitor the new sockets *)
+ struct
+ open UnixSock
+ open Wrap
+
+ structure Strm =
+ struct
+ open Strm
+ val socket = fn x => monitor (socket x) (* val, not fun: the right-hand side refers to the socket brought in by open *)
+ val socketPair = fn x => monitorPair (socketPair x)
+ end
+
+ structure DGrm =
+ struct
+ open DGrm
+ val socket = fn x => monitor (socket x)
+ val socketPair = fn x => monitorPair (socketPair x)
+ end
+ end
Added: mltonlib/trunk/ca/terpstra/st/st.mlb
===================================================================
--- mltonlib/trunk/ca/terpstra/st/st.mlb 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/st.mlb 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,56 @@
+local
+ $(SML_LIB)/basis/basis.mlb
+ $(SML_LIB)/basis/mlton.mlb
+ $(SML_LIB)/mlnlffi-lib/mlnlffi-lib.mlb
+
+ ann
+ "allowFFI true" (* ioevent.sml uses a _symbol declaration -- presumably why this annotation is needed *)
+ in
+ data.sig
+ data.sml
+
+ state.sig
+ state.sml
+ thread.sig
+ thread.sml
+
+ timeout.sig
+ timeout.sml
+ ioevent.sig
+ ioevent.sml
+
+ scheduler.sig
+ epoll.sig
+ edge.fun
+ lpoll.sig
+ level.fun
+
+ kevent/kevent.mlb (* kqueue backend; per the README, Linux builds use the two commented-out epoll lines below instead *)
+ kqueue.sml
+
+(* epoll/epoll.mlb
+ epoll.sml
+*)
+ socket.sml
+ end
+in
+ signature STATE
+ signature THREAD
+ signature TIMEOUT
+ signature IOEVENT
+ signature SCHEDULER
+
+ structure State
+ structure Thread
+ structure Timeout
+ structure IoEvent
+ structure Scheduler
+
+ (* override basis definitions with ours -- we have hooks *)
+ structure Socket
+ structure GenericSock
+ structure INetSock
+ structure UnixSock
+
+ open.sml
+end
Added: mltonlib/trunk/ca/terpstra/st/state.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/state.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/state.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,47 @@
+(* Attempts to classify states as 'level-triggered' or 'edge-triggered'
+ * will fail, as these terms make sense only at the intersection of states
+ * and blocking primitives. Both styles (and others) can be realized using
+ * the watch and value methods.
+ *
+ * A given state may only be watched once (1 time in 1 thread).
+ * If a second watch is attempted, the RaceCondition exception is raised.
+ *)
+signature STATE =
+ sig
+ type ('val, 'diff) state
+ type 'diff signal = 'diff -> unit
+
+ (* create a new state *)
+ val state: ''val -> (''val, ''val) state * ''val signal (* signalling a value equal to the current one is ignored *)
+ val delta: ('val * 'diff -> 'val option) -> 'val -> (* the function maps (current, diff) to SOME new value, or NONE to ignore the diff *)
+ ('val, 'diff) state * 'diff signal
+
+ (* get the current value of a state *)
+ val value: ('val, 'diff) state -> 'val
+
+ (* hook a callback invoked when the state changes *)
+ exception RaceCondition (* raised when a state that already has a watcher is watched again *)
+ exception UnWatched (* raised when a state without a watcher is released *)
+ val dwatch: ('val * 'diff -> unit) -> ('val, 'diff) state -> unit
+ val swatch: ('val -> unit) -> ('val, 'diff) state -> unit
+ val release: ('val, 'diff) state -> unit
+
+ (* map this state into a new derived state *)
+ val smap: ('val1 -> 'val2) ->
+ ('val1, 'val1) state -> ('val2, 'val2) state
+ val dmap: ('val1 -> 'val2) *
+ ('val1 * 'diff1 * 'val2 -> ('val2 * 'diff2) option) ->
+ ('val1, 'diff1) state -> ('val2, 'diff2) state
+
+ (* compose two states into their product *)
+ datatype ('diff1, 'diff2) alt = DIFF1 of 'diff1 | DIFF2 of 'diff2
+ val scompose: ('val1, 'val1) state * ('val2, 'val2) state ->
+ ('val1 * 'val2, 'val1 * 'val2) state
+ val dcompose: ('val1, 'diff1) state * ('val2, 'diff2) state ->
+ ('val1 * 'val2, ('diff1, 'diff2) alt) state
+
+ (* If you want multiple watchers on the same state *)
+ type ('val, 'diff) broadcast
+ val broadcast: ('val, 'diff) state -> ('val, 'diff) broadcast
+ val clone: ('val, 'diff) broadcast -> ('val, 'diff) state
+ end
Added: mltonlib/trunk/ca/terpstra/st/state.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/state.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/state.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,133 @@
+structure State :> STATE = (* a state is a record of closures over private refs *)
+ struct
+ type ('val, 'diff) state = {
+ value: unit -> 'val, (* read the current value *)
+ release: unit -> unit, (* remove the current watcher *)
+ watch: ('val * 'diff -> unit) -> unit (* install the single watcher *)
+ }
+ type 'diff signal = 'diff -> unit
+ exception RaceCondition
+ exception UnWatched
+
+ fun delta update init = (* primitive state: a value cell plus an optional watcher *)
+ let
+ val state = ref init (* current value *)
+ val block = ref NONE (* the watcher callback, if one is installed *)
+
+ fun value () = !state
+ fun release () =
+ case !block of
+ NONE => raise UnWatched
+ | SOME _ => block := NONE
+ fun watch f =
+ case !block of
+ NONE => block := SOME f
+ | SOME _ => raise RaceCondition (* at most one watcher at a time *)
+ fun signal diff =
+ case update (!state, diff) of
+ NONE => () (* update rejected the diff: no change, no callback *)
+ | SOME newval =>
+ case !block of
+ NONE => state := newval
+ | SOME f => (state := newval; f (newval, diff)) (* store first, then notify *)
+ in
+ ({ value = value, release = release, watch = watch }, signal)
+ end
+ fun state init = delta (fn (s, d) => if s = d then NONE else SOME d) init (* equal value: dropped *)
+
+ fun value { value, release=_, watch=_ } = value ()
+ fun release { value=_, release, watch=_ } = release ()
+ fun dwatch f { value=_, release=_, watch } = watch f
+ fun swatch f = dwatch (fn (x, _) => f x) (* watcher that ignores the diff *)
+
+ fun dmap (valmap, diffmap) state = (* derived state; mapped value is cached while watched *)
+ let
+ val valproxy = ref NONE (* cached mapped value; SOME only while watched *)
+ val block = ref NONE (* watcher of the derived state *)
+
+ fun proxy (val1, diff1) = (* installed on the source state by watch below *)
+ case diffmap (val1, diff1, valOf (!valproxy)) of
+ NONE => () (* diffmap suppressed this change *)
+ | SOME (newval2, diff2) =>
+ case !block of
+ NONE => valproxy := SOME newval2
+ | SOME f => (valproxy := SOME newval2; f (newval2, diff2))
+
+ val watch = fn f =>
+ case !block of
+ NONE => (dwatch proxy state; (* first b/c it might raise *)
+ block := SOME f;
+ valproxy := SOME (valmap (value state)))
+ | SOME _ => raise RaceCondition
+ val value = fn () => (* val, not fun: `value` in the body is still the outer function *)
+ case !valproxy of
+ NONE => valmap (value state) (* unwatched: compute on demand *)
+ | SOME x => x
+ val release = fn () =>
+ case !block of
+ NONE => raise UnWatched
+ | SOME _ => (release state; block := NONE; valproxy := NONE)
+ in
+ { value = value, release = release, watch = watch }
+ end
+ fun smap valmap = (* dmap whose diff is simply the freshly mapped value *)
+ dmap (valmap, fn (v, _, _) => let val v2 = valmap v in SOME (v2, v2) end)
+
+ datatype ('diff1, 'diff2) alt = DIFF1 of 'diff1 | DIFF2 of 'diff2
+ fun dcompose (state1, state2) = (* product state; each diff is tagged with its origin *)
+ let
+ val block = ref NONE (* watcher of the product state *)
+ fun proxy1 (val1, diff1) =
+ (valOf (!block)) ((val1, value state2), DIFF1 diff1)
+ fun proxy2 (val2, diff2) =
+ (valOf (!block)) ((value state1, val2), DIFF2 diff2)
+
+ val watch = fn f =>
+ case !block of
+ NONE => (
+ dwatch proxy1 state1;
+ (dwatch proxy2 state2 handle ex => (release state1; raise ex)); (* undo 1st watch *)
+ block := SOME f)
+ | SOME _ => raise RaceCondition
+ val value = fn () =>
+ (value state1, value state2)
+ val release = fn () =>
+ case !block of
+ NONE => raise UnWatched
+ | SOME _ => (release state1; release state2; block := NONE)
+ in
+ { value = value, release = release, watch = watch }
+ end
+
+ fun scompose (state1, state2) = (* product state; the diff is the whole new pair *)
+ let
+ val block = ref NONE (* watcher of the product state *)
+ fun proxy1 (val1, diff1) =
+ let val val2 = value state2 in
+ (valOf (!block)) ((val1, val2), (val1, val2)) end
+ fun proxy2 (val2, diff2) =
+ let val val1 = value state1 in
+ (valOf (!block)) ((val1, val2), (val1, val2)) end
+
+ val watch = fn f =>
+ case !block of
+ NONE => (
+ dwatch proxy1 state1;
+ (dwatch proxy2 state2 handle ex => (release state1; raise ex)); (* undo 1st watch *)
+ block := SOME f)
+ | SOME _ => raise RaceCondition
+ val value = fn () =>
+ (value state1, value state2)
+ val release = fn () =>
+ case !block of
+ NONE => raise UnWatched
+ | SOME _ => (release state1; release state2; block := NONE)
+ in
+ { value = value, release = release, watch = watch }
+ end
+
+ (* !!! fixme: placeholder -- broadcast and clone are the identity for now *)
+ type ('val, 'diff) broadcast = ('val, 'diff) state
+ fun broadcast state = state
+ fun clone broadcaster = broadcaster
+ end
Added: mltonlib/trunk/ca/terpstra/st/test.mlb
===================================================================
--- mltonlib/trunk/ca/terpstra/st/test.mlb 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/test.mlb 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,6 @@
+local
+ $(SML_LIB)/basis/basis.mlb
+ st.mlb
+in
+ test.sml
+end
Added: mltonlib/trunk/ca/terpstra/st/test.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/test.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/test.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,59 @@
+type port = (INetSock.inet, Socket.passive Socket.stream) Socket.sock (* passive (listening) internet stream socket *)
+
+(* Encode a string as the byte slice that Socket.sendVec expects. *)
+fun msg s =
+   Word8VectorSlice.full (Byte.stringToBytes s)
+(* Decode a byte vector (e.g. from Socket.recvVec) into a string, byte for char. *)
+fun str v = Byte.bytesToString v
+
+val delay = Time.fromSeconds 5 (* interval used by beat below *)
+val port : port = INetSock.TCP.socket () (* listening socket for the echo service *)
+val () = Socket.Ctl.setREUSEADDR (port, true)
+val () = Socket.bind (port, INetSock.any 12467) (* any local address, TCP port 12467 *)
+val () = Socket.listen (port, 100) (* backlog of 100 *)
+
+val google = valOf (NetHostDB.getByName "www.google.de") (* valOf raises Option if the lookup yields NONE *)
+val ghttp = INetSock.toAddr (NetHostDB.addr google, 80) (* address used by http below *)
+
+fun http () = (* one GET to ghttp; prints the first chunk (<= 4096 bytes) of the reply *)
+   let
+      val s = INetSock.TCP.socket ()
+      val () = print "connecting...\n"
+      val () = Socket.connect (s, ghttp)
+      val () = print "sending...\n" (* HTTP lines are terminated by CRLF, not bare LF *)
+      val _ = Socket.sendVec (s, msg "GET / HTTP/1.1\r\nHost: www.google.de\r\n\r\n")
+      val () = print "reading...\n"
+      val got = Socket.recvVec (s, 4096)
+      val () = (Socket.close s; print "done!\n") (* close so the socket is not leaked *)
+   in
+      print ("response: " ^ str got ^ "\n")
+   end
+
+fun worker s () = (* greet the client once, then echo its input until EOF *)
+   let
+      fun echo () =
+         let val got = Word8VectorSlice.full (Socket.recvVec (s, 400)) in
+            if Word8VectorSlice.length got = 0 then Socket.close s (* empty read: EOF *)
+            else (Socket.sendVec (s, got); echo ()) (* loop here so the greeting is not re-sent *)
+         end
+   in Socket.sendVec (s, msg "hello and welcome!\n"); echo () end
+
+(* Accept loop: hand every new connection to its own worker thread. *)
+fun welcome () =
+   let
+      val (client, _) = Socket.accept port
+      val () = spawn (worker client)
+   in welcome ()
+   end
+
+fun beat () = (* wait for a `delay` timeout, print a heartbeat line, repeat forever *)
+   let val () = stopTill (TIMEOUT delay)
+       val () = print "hello world\n"
+   in beat () end
+
+val () = spawn welcome (* echo server accept loop *)
+val () = spawn beat (* periodic heartbeat *)
+val () = spawn http (* two identical HTTP fetches, each in its own thread *)
+val () = spawn http
+
+val () = main () (* main is not defined in this file; presumably the scheduler loop from st.mlb -- confirm *)
Added: mltonlib/trunk/ca/terpstra/st/thread.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/thread.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/thread.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,18 @@
+signature THREAD =
+ sig
+ (* start a new thread, which will be run later *)
+ val spawn: (unit -> unit) -> unit
+ val yield: 'a -> 'a (* release control for a tick; returns its argument when resumed *)
+
+ val stopTill: (bool, 'a) State.state -> unit (* block the caller until the state's value is true *)
+ val select: ((bool, 'b) State.state * 'a) list -> 'a list (* results whose state is true; blocks while none is *)
+ end
+
+signature THREAD_EXTRA =
+ sig
+ include THREAD
+
+ datatype loop = COMPLETE | PENDING (* result of run: ready queue empty / still non-empty *)
+ val run: unit -> loop (* process queue till completed or stopped *)
+ val stop: unit -> unit (* stop processing queue and return soon *)
+ end
Added: mltonlib/trunk/ca/terpstra/st/thread.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/thread.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/thread.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,65 @@
+structure Thread_Extra :> THREAD_EXTRA = (* cooperative threads on top of MLton.Thread *)
+ struct
+ open MLton.Thread
+ open State
+ type thread = Runnable.t
+
+ val ready : thread Queue.queue = Queue.new () (* runnable threads awaiting a turn *)
+ val loop : thread option ref = ref NONE (* the thread that called run, set by run *)
+ val quit : bool ref = ref false (* set by stop, cleared by run *)
+
+ fun next () = (* pick the thread to switch to *)
+ if Queue.empty ready orelse !quit then valOf (!loop) else
+ valOf (Queue.deque ready)
+
+ fun spawn main = (* queue a new thread; when main returns it switches to next () *)
+ Queue.enque (ready, prepare (new
+ (fn () => (main (); switch (fn _ => next ()))), ()))
+
+ fun yield result = switch (fn thread => ( (* requeue the caller, then run someone else *)
+ Queue.enque (ready, prepare (thread, result));
+ next ()))
+
+ datatype loop = COMPLETE | PENDING
+ fun run () = (
+ quit := false;
+ switch (fn thread => (loop := SOME (prepare (thread, ())); next ())); (* remember caller *)
+ case Queue.empty ready of
+ true => COMPLETE | false => PENDING)
+
+ fun stop () = quit := true (* makes next () hand control back to the run caller *)
+
+ (* the while loop deals with the case that a state may have only
+ * temporarily become true (before switch), but is not true any longer.
+ *)
+ fun stopTill state =
+ while not (value state) do switch (fn thread =>
+ let
+ fun resume _ = ( (* watcher: unhook from the state and requeue the thread *)
+ release state;
+ Queue.enque (ready, prepare (thread, ())))
+ in
+ swatch resume state;
+ next ()
+ end)
+
+ fun select events =
+ let
+ fun map (state, res) = if value state then SOME res else NONE (* local; not List.map *)
+ fun block thread = (* watch every state; the first change wakes the thread *)
+ let
+ fun resume _ = (
+ List.app (fn (state, _) => release state) events; (* unhook from all states *)
+ Queue.enque (ready, prepare (thread, ())))
+ in
+ List.app (fn (state, _) => swatch resume state) events;
+ next ()
+ end
+ in
+ case List.mapPartial map events of
+ x :: r => x :: r (* at least one state is already true *)
+ | [] => (switch block; select events) (* sleep, then re-check from scratch *)
+ end
+ end
+
+structure Thread :> THREAD = Thread_Extra (* view restricted to THREAD: no run, stop or loop *)
Added: mltonlib/trunk/ca/terpstra/st/timeout.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/timeout.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/timeout.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,21 @@
+signature TIMEOUT =
+ sig
+ (* TIMEOUT is measured since the last IO poll, not the instant called *)
+ val TIMEOUT: Time.time -> (bool, bool) State.state (* state starts out false *)
+
+ (* LATERTHAN is an absolute time value *)
+ val LATERTHAN: Time.time -> (bool, bool) State.state (* state starts out false *)
+
+ (* The cached time as of the last tick (fast) *)
+ val lastTick: unit -> Time.time
+ end
+
+signature TIMEOUT_EXTRA =
+ sig
+ include TIMEOUT
+
+ (* The earliest pending timer (if any) *)
+ val getNext: unit -> Time.time option
+ (* Record the given time as the last tick and signal true to every timer not later than it *)
+ val trigger: Time.time -> unit
+ end
Added: mltonlib/trunk/ca/terpstra/st/timeout.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/timeout.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/timeout.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,46 @@
+(* !!! fixme: timers persist in the heap even if unreferenced.
+ * once MLton bug is fixed, use MLton.Weak and MLton.Finalizable
+ *)
+structure Timeout_Extra :> TIMEOUT_EXTRA =
+ struct
+ open State
+ open Time
+ open Heap
+
+ type sleeper = time * bool signal (* deadline paired with the signal of its state *)
+ fun nextSleeper ((t1, _), (t2, _)) = t1 < t2 (* order sleepers by deadline *)
+ val sleeper = new nextSleeper (* Heap.new; presumably an earliest-first priority queue -- confirm *)
+ val rLastTick = ref (Time.now ()) (* time recorded by the latest trigger *)
+
+ fun lastTick () = !rLastTick
+
+ fun LATERTHAN time =
+ let
+ val (state, signal) = state false (* starts false *)
+ in
+ push (sleeper, (time, signal)); (* register the deadline with its signal *)
+ state
+ end
+
+ fun TIMEOUT time = LATERTHAN (time + lastTick ()) (* relative to the cached tick, not Time.now *)
+
+ fun getNext () =
+ case peek sleeper of
+ NONE => NONE
+ | SOME (t, _) => SOME t
+
+ fun trigger time =
+ let
+ fun loop () = (* pop and fire sleepers until the front one is later than time *)
+ case peek sleeper of
+ NONE => ()
+ | SOME (t, s) =>
+ if time < t then () else
+ (pop sleeper; s true; loop ())
+ in
+ rLastTick := time; (* update the cached tick before firing any timer *)
+ loop ()
+ end
+ end
+
+structure Timeout :> TIMEOUT = Timeout_Extra (* view restricted to TIMEOUT: no getNext or trigger *)
More information about the MLton-commit
mailing list