[MLton-commit] r6454
spoons at mlton.org
spoons at mlton.org
Mon Mar 3 07:41:23 PST 2008
SML portion of the parallel library.
Provide support for richer parallel primitives, as well as multiple
scheduling policies.
----------------------------------------------------------------------
U mlton/branches/shared-heap-multicore/basis-library/build/sources.mlb
U mlton/branches/shared-heap-multicore/basis-library/libs/basis-extra/top-level/basis-sigs.sml
U mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sig
U mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sml
U mlton/branches/shared-heap-multicore/basis-library/mlton.mlb
A mlton/branches/shared-heap-multicore/basis-library/parallel/
A mlton/branches/shared-heap-multicore/basis-library/parallel/array.sig
A mlton/branches/shared-heap-multicore/basis-library/parallel/array.sml
A mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sig
A mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sml
A mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sig
A mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sml
A mlton/branches/shared-heap-multicore/basis-library/parallel/future.sig
A mlton/branches/shared-heap-multicore/basis-library/parallel/future.sml
A mlton/branches/shared-heap-multicore/basis-library/parallel/internal.sml
A mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sig
A mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sml
A mlton/branches/shared-heap-multicore/basis-library/parallel/pbf-map
A mlton/branches/shared-heap-multicore/basis-library/parallel/pbfworkqueue.sml
A mlton/branches/shared-heap-multicore/basis-library/parallel/pdf-map
A mlton/branches/shared-heap-multicore/basis-library/parallel/pdfworkqueue.sml
A mlton/branches/shared-heap-multicore/basis-library/parallel/sim-map
A mlton/branches/shared-heap-multicore/basis-library/parallel/simpleworkqueue.sml
A mlton/branches/shared-heap-multicore/basis-library/parallel/workqueue.sig
A mlton/branches/shared-heap-multicore/basis-library/parallel/ws-map
A mlton/branches/shared-heap-multicore/basis-library/parallel/wsworkqueue.sml
----------------------------------------------------------------------
Modified: mlton/branches/shared-heap-multicore/basis-library/build/sources.mlb
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/build/sources.mlb 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/build/sources.mlb 2008-03-03 15:41:17 UTC (rev 6454)
@@ -281,6 +281,10 @@
../mlton/exit.sml
../mlton/exn.sig
../mlton/exn.sml
+
+ ann "allowFFI true" in
+ ../parallel/internal.sml
+ end
../mlton/thread.sig
../mlton/thread.sml
../mlton/signal.sig
@@ -363,6 +367,22 @@
../mlton/world.sml
../mlton/mono-array.sig
../mlton/mono-vector.sig
+
+ ../parallel/workqueue.sig
+ ../parallel/basic.sig
+ ../parallel/future.sig
+ ../parallel/forkjoin.sig
+ ../parallel/array.sig
+ ../parallel/parallel.sig
+ ann "allowFFI true" in
+ ../parallel/$(WORK_QUEUE).sml
+ ../parallel/basic.sml
+ ../parallel/future.sml
+ ../parallel/forkjoin.sml
+ end
+ ../parallel/array.sml
+ ../parallel/parallel.sml
+
../mlton/mlton.sig
../mlton/mlton.sml
Modified: mlton/branches/shared-heap-multicore/basis-library/libs/basis-extra/top-level/basis-sigs.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/libs/basis-extra/top-level/basis-sigs.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/libs/basis-extra/top-level/basis-sigs.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -94,6 +94,7 @@
signature MLTON_ITIMER = MLTON_ITIMER
signature MLTON_MONO_ARRAY = MLTON_MONO_ARRAY
signature MLTON_MONO_VECTOR = MLTON_MONO_VECTOR
+signature MLTON_PARALLEL = MLTON_PARALLEL
signature MLTON_PLATFORM = MLTON_PLATFORM
signature MLTON_POINTER = MLTON_POINTER
signature MLTON_PROC_ENV = MLTON_PROC_ENV
Modified: mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sig 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sig 2008-03-03 15:41:17 UTC (rev 6454)
@@ -34,6 +34,7 @@
structure Itimer: MLTON_ITIMER
structure LargeReal: MLTON_REAL
structure LargeWord: MLTON_WORD
+ structure Parallel: MLTON_PARALLEL
structure Platform: MLTON_PLATFORM
structure Pointer: MLTON_POINTER
structure ProcEnv: MLTON_PROC_ENV
Modified: mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -63,6 +63,7 @@
open LargeWord
type t = word
end
+structure Parallel = MLtonParallel
structure Platform = MLtonPlatform
structure Pointer = MLtonPointer
structure ProcEnv = MLtonProcEnv
Modified: mlton/branches/shared-heap-multicore/basis-library/mlton.mlb
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/mlton.mlb 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/mlton.mlb 2008-03-03 15:41:17 UTC (rev 6454)
@@ -26,6 +26,7 @@
signature MLTON_ITIMER
signature MLTON_MONO_ARRAY
signature MLTON_MONO_VECTOR
+ signature MLTON_PARALLEL
signature MLTON_PLATFORM
signature MLTON_POINTER
signature MLTON_PROC_ENV
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/array.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/array.sig 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/array.sig 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,14 @@
+signature MLTON_PARALLEL_ARRAY =
+sig
+ (* parallel versions of ordinary array operations. maxSeq indicates the
+ largest number of elements that will be computed sequentially on a single
+ processor. *)
+ (* maxSeq f n *)
+ val tabulate : int -> (int -> 'a) -> int -> 'a Array.array
+ (* maxSeq f a *)
+ val modify : int -> (int * 'a -> 'a) -> 'a Array.array -> unit
+
+ (* see the comment for MLTON_PARALLEL_FORKJOIN.reduce *)
+ (* maxSeq "*" "inj" "unit" a *)
+ val reduce : int -> ('b * 'b -> 'b) -> ('a -> 'b) -> 'b -> 'a Array.array -> 'b
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/array.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/array.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/array.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,43 @@
+structure MLtonParallelArray =
+struct
+
+ structure B = MLtonParallelBasic
+ structure F = MLtonParallelForkJoin
+
+ fun tabulate maxSeq f size =
+ let
+ (* XXX check that maxSeq is large enough to ensure atomic writes *)
+ val () = if maxSeq < 1 then raise B.Parallel "maxSeq must be at least 1" else ()
+ val a = Array.arrayUninit size
+ val () = F.reduce maxSeq
+ (fn ((), ()) => ())
+ (fn i => Array.update (a, i, f i))
+ ()
+ size
+ in
+ a
+ end
+
+ fun modify maxSeq f a =
+ let
+ (* XXX check that maxSeq is large enough to ensure atomic writes *)
+ val () = if maxSeq < 1 then raise B.Parallel "maxSeq must be at least 1" else ()
+ val () = F.reduce maxSeq
+ (fn ((), ()) => ())
+ (fn i => Array.update (a, i,
+ f (i, Array.sub (a, i))))
+ ()
+ (Array.length a)
+ in
+ ()
+ end
+
+ (* XXX check that maxSeq is large enough to ensure atomic writes *)
+ fun arrayReduce maxSeq f g u a = F.reduce maxSeq
+ f
+ (fn i => g (Array.sub (a, i)))
+ u
+ (Array.length a)
+ val reduce = arrayReduce
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sig 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sig 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,45 @@
+signature MLTON_PARALLEL_BASIC =
+sig
+
+ (* the empty type *)
+ type void
+ (* the type of a parallel task -- work never returns normally *)
+ type work = unit -> void
+ (* a suspended computation waiting for a value of type 'a *)
+ type 'a t
+
+ (* reify the current point of execution and then add the given work to the
+ queue; the list is assumed to be given in order of decreasing priority. NB
+ no processor should resume this suspension until *AFTER* it has returned a
+ new set of parallel tasks. *)
+ val suspend : ('a t -> work list) -> 'a
+
+ (* end the current task and add the given computation to the queue *)
+ val resume : 'a t * 'a -> void
+
+ (* end the current task and return control to the scheduler *)
+ val return : unit -> void
+
+ (* add the given work to the queue; the list is assumed to be given in order
+ of decreasing priority. may suspend under some scheduling policies -- the
+ implementation performs no worse than
+ addWork ws = suspend (fn k => (fn () => resume (k, ()))::ws)
+ *)
+ val addWork : work list -> unit
+
+ (* temporarily yield, but continue as scheduling policy permits.
+ the implementation performs no worse than
+ continue f = suspend (fn k => [fn () => resume (k, f ())])
+ *)
+ val continue : (unit -> 'a) -> 'a
+
+ (* general errors related to parallelism *)
+ exception Parallel of string
+
+ (* informational *)
+ val policyName : string
+ val numberOfProcessors : int
+ val maxBytesLive : unit -> Word64.word
+ val resetBytesLive : unit -> unit
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,107 @@
+structure MLtonParallelBasic :> MLTON_PARALLEL_BASIC =
+struct
+
+ type void = unit
+ type work = unit -> void
+
+ val numberOfProcessors = MLtonParallelInternal.numberOfProcessors
+
+ structure Q = WorkQueue (struct
+ type work = work
+ val numberOfProcessors = fn () => numberOfProcessors
+ end)
+ :> PARALLEL_WORKQUEUE where type work = work
+
+ val processorNumber = MLtonParallelInternal.processorNumber
+ val profileDisable = _import "GC_profileDisable": unit -> unit;
+ val profileEnable = _import "GC_profileEnable": unit -> unit;
+
+ exception Parallel of string
+
+ structure T = MLtonThread
+ type 'a t = 'a T.t
+
+ val enabled = ref true
+
+ fun schedule ws () =
+ let
+ fun loop p =
+ let in
+ case Q.getWork p
+ of NONE =>
+ let in
+ (* if !enabled then (enabled := false; profileDisable ()) else (); *)
+ ()
+ end
+ | SOME w =>
+ let in
+ (* if not (!enabled) then (enabled := true; profileEnable ()) else (); *)
+ w ()
+ handle e => TextIO.output (TextIO.stdErr,
+ ("WARNING: Caught exception \""
+ ^ (Primitive.Exn.name e)
+ ^ "\" in parallel scheduler!\n"))
+ end;
+ (* NB we call processorNumber again here in case that this job has
+ been split across two processors *)
+ loop (processorNumber ())
+ end
+
+ val p = processorNumber ()
+ in
+ Q.addWork p ws;
+ Q.finishWork p;
+ loop p
+ end
+
+ fun suspend f =
+ T.switch
+ (fn k =>
+ let
+ val ws = f k
+ in
+ (* Note that we cannot call addWork on this thread! One of
+ the ws may contain a reference to the current thread, and if
+ that work is scheduled it would be running on the same stack
+ as us! *)
+ (* Also, we can't just call schedule here because we need to
+ preserve the current thread/stack. Instead we create a new
+ thread that will continue by calling schedule. *)
+ T.prepare (T.new (schedule ws), ())
+ end)
+
+ fun resume (k, v) =
+ if Q.shouldYield (processorNumber ()) then
+ let in
+ schedule [fn () => T.switch (fn _ => T.prepare (k, v))] ()
+ end
+ else
+ T.switch (fn _ => T.prepare (k, v))
+
+ val return = schedule []
+
+ fun addWork ws =
+ let
+ val p = processorNumber ()
+ in
+ if Q.shouldYield p then
+ suspend (fn k => (fn () => resume (k, ()))::ws)
+ else
+ Q.addWork p ws
+ end
+
+ fun continue f =
+ if Q.shouldYield (processorNumber ()) then
+ suspend (fn k => [fn () => resume (k, f ())])
+ else
+ f ()
+
+ val () = (_export "Parallel_run": (unit -> void) -> unit;) return
+ (* init MUST come after the scheduler entry point (Parallel_run) has been exported *)
+ val () = (_import "Parallel_init": unit -> unit;) ()
+
+ val policyName = Q.policyName
+ val maxBytesLive = _import "Parallel_maxBytesLive": unit -> Word64.word;
+ val resetBytesLive = _import "Parallel_resetBytesLive": unit -> unit;
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sig 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sig 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,25 @@
+signature MLTON_PARALLEL_FORKJOIN =
+sig
+
+ (* run two functions, possibly in parallel, and return their results as the
+ components of a pair *)
+ val fork : (unit -> 'a) * (unit -> 'b) -> 'a * 'b
+
+ (* aggregate some intermediate results. you can think of the arguments like
+ the multiplication and unit operations of a group, along with an injection
+ function from the integers. reduce tabulates the integers from zero
+ (inclusive) to "length" (exclusive), injects them into the group and then
+ multiplies them up.
+
+ "*" must be associative and the unit must really be the identity element of
+ the group. assuming these are true, reduce behaves according to the
+ following equivalence:
+ reduce _ m j u l = List.foldr m u (List.tabulate (l, fn i => j i))
+
+ maxSeq is the largest number of injections/multiplications that are run
+ sequentially on a single processor.
+*)
+ (* maxSeq "*" "inj" unit length *)
+ val reduce : int -> ('b * 'b -> 'b) -> (int -> 'b) -> 'b -> int -> 'b
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,63 @@
+structure MLtonParallelForkJoin :> MLTON_PARALLEL_FORKJOIN =
+struct
+
+ structure B = MLtonParallelBasic
+
+ val fetchAndAdd = _import "Parallel_fetchAndAdd": Int32.int ref * Int32.int -> Int32.int;
+
+ datatype 'a result =
+ NotYet
+ | Finished of 'a
+ | Raised of exn
+
+ fun fork (f, g) =
+ let
+ val c = ref 0
+ val l = ref NotYet
+ val r = ref NotYet
+
+ fun wrap k h res () =
+ let
+ val v = Finished (h ())
+ handle e => Raised e
+ val () = res := v
+ val t = fetchAndAdd (c, 1)
+ in
+ if t = 1 then
+ B.resume (k, (!l, !r))
+ else
+ B.return ()
+ end
+ in
+ case B.suspend (fn k => [wrap k f l, wrap k g r])
+ of (Finished a, Finished b) => (a, b)
+ | (Raised e, _) => raise e
+ | (_, Raised e) => raise e
+ | _ => raise B.Parallel "impossible"
+ end
+
+ fun reduce maxSeq f g u n =
+ let
+ val () = if maxSeq < 1 then raise B.Parallel "maxSeq must be at least 1" else ()
+
+ fun wrap i l () =
+ if l <= maxSeq then
+ let
+ val stop = i + l
+ fun loop j v = if j = stop then v
+ else loop (j + 1) (f (v, g j))
+ in
+ loop i u
+ end
+ else
+ let
+ val l' = l div 2
+ in
+ f (fork (wrap i l',
+ wrap (i + l') (l - l')))
+ end
+ in
+ wrap 0 n ()
+ end
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/future.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/future.sig 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/future.sig 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,15 @@
+signature MLTON_PARALLEL_FUTURE =
+sig
+
+ (* a future yielding a value of type 'a *)
+ type 'a t
+
+ (* depending on the scheduling policy BOTH future and force may suspend the
+ current task. *)
+ (* create a new parallel future. futures may be executed speculatively. *)
+ val future : (unit -> 'a) -> 'a t
+
+ (* force the execution of a future if that has not yet occurred. *)
+ val force : 'a t -> 'a
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/future.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/future.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/future.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,69 @@
+structure MLtonParallelFuture :> MLTON_PARALLEL_FUTURE =
+struct
+
+ structure B = MLtonParallelBasic
+
+ val fetchAndAdd = _import "Parallel_fetchAndAdd": Int32.int ref * Int32.int -> Int32.int;
+
+ datatype 'a result =
+ NotYet
+ | Finished of 'a
+ | Raised of exn
+
+ type 'a t = 'a result ref * 'a result B.t option ref * int ref
+
+ fun future f =
+ let
+ val r = ref NotYet
+ val n = ref NONE
+ val c = ref 0
+ val () = B.addWork
+ [fn () =>
+ let
+ val res = Finished (f ())
+ handle e => Raised e
+ val () = r := res
+ val t = fetchAndAdd (c, 1)
+ in
+ if t = 0 then B.return ()
+ else B.resume (valOf (!n), res)
+ end]
+ in
+ (r, n, c)
+ end
+
+ fun force (r, n, c) =
+ let
+ val res = if !c = 1 then
+ B.continue (fn () => !r)
+ else
+(*
+ B.suspend (fn k =>
+ let
+ val () = n := SOME k
+ val t = fetchAndAdd (c, 1)
+ in
+ if t = 0 then []
+ else [fn () => B.resume (k, !r)]
+ end)
+*)
+ B.suspend (fn k =>
+ let
+ val () = n := SOME k
+ in
+ [fn () =>
+ let
+ val t = fetchAndAdd (c, 1)
+ in
+ if t = 0 then B.return () else B.resume (k, !r)
+ end]
+ end)
+
+ in
+ case res of
+ Finished v => v
+ | Raised e => raise e
+ | NotYet => raise B.Parallel "got NotYet in force!"
+ end
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/internal.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/internal.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/internal.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,8 @@
+structure MLtonParallelInternal =
+struct
+
+ val numberOfProcessors = Int32.toInt ((_import "Parallel_numberOfProcessors": unit -> Int32.int;) ())
+
+ val processorNumber = _import "Parallel_processorNumber": unit -> Int32.int;
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sig 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sig 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,8 @@
+signature MLTON_PARALLEL =
+sig
+
+ structure Basic : MLTON_PARALLEL_BASIC
+ structure ForkJoin : MLTON_PARALLEL_FORKJOIN
+ structure Array : MLTON_PARALLEL_ARRAY
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,8 @@
+structure MLtonParallel :> MLTON_PARALLEL =
+struct
+
+ structure Basic : MLTON_PARALLEL_BASIC = MLtonParallelBasic
+ structure ForkJoin : MLTON_PARALLEL_FORKJOIN = MLtonParallelForkJoin
+ structure Array : MLTON_PARALLEL_ARRAY = MLtonParallelArray
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/pbf-map
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/pbf-map 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/pbf-map 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1 @@
+WORK_QUEUE pbfworkqueue
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/pbfworkqueue.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/pbfworkqueue.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/pbfworkqueue.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,65 @@
+functor WorkQueue (W : sig type work val numberOfProcessors : unit -> int end) : PARALLEL_WORKQUEUE =
+struct
+
+ type proc = int
+ type work = W.work
+
+ local
+ val lock_ = _import "Parallel_lock": int -> unit;
+ val unlock_ = _import "Parallel_unlock": int -> unit;
+ in
+ fun lock () = lock_ 0
+ fun unlock () = unlock_ 0
+ end
+
+ datatype 'a mlist =
+ Cons of 'a * 'a mlist ref
+ | Nil
+
+ (* initialize state *)
+ val (head, tail) = (ref Nil, ref Nil)
+
+ fun addWork _ ws =
+ let
+ fun add w =
+ tail := (case !tail of
+ Cons (_, r) =>
+ let
+ val n = Cons (w, ref (!r))
+ val () = r := n
+ in
+ n
+ end
+ | Nil =>
+ let
+ val n = Cons (w, ref Nil)
+ val () = head := n
+ in
+ n
+ end)
+ in
+ lock ();
+ app add ws;
+ unlock ()
+ end
+
+ fun getWork _ =
+ let in
+ lock ();
+ case !head
+ of Nil => (unlock ();
+ NONE)
+ | Cons (w, t) => (head := !t;
+ case !t of Nil => tail := !t
+ | _ => ();
+ unlock ();
+ SOME w)
+ end
+
+ fun finishWork _ = ()
+
+ fun shouldYield _ = true
+
+ val policyName = "pbf"
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/pdf-map
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/pdf-map 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/pdf-map 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1 @@
+WORK_QUEUE pdfworkqueue
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/pdfworkqueue.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/pdfworkqueue.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/pdfworkqueue.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,165 @@
+functor WorkQueue (W : sig type work val numberOfProcessors : unit -> int end) : PARALLEL_WORKQUEUE =
+struct
+
+ type proc = int
+ type work = W.work
+
+ local
+ val lock_ = _import "Parallel_lock": int -> unit;
+ val unlock_ = _import "Parallel_unlock": int -> unit;
+ in
+ fun lock () = lock_ 0
+ fun unlock () = unlock_ 0
+ end
+
+ local
+ exception Impossible
+ open TextIO
+ in
+ fun die n = (output (stdErr,
+ "PDFWorkQueue: die at " ^ (Int.toString n) ^ "\n");
+ flushOut stdErr;
+ unlock ();
+ raise Impossible)
+ end
+
+ datatype 'a mdlist =
+ Cons of 'a mdlist ref * int ref * 'a option ref * 'a mdlist ref
+ | Nil
+
+ (* initialize global state *)
+ val head = Cons (ref Nil, ref 0, ref NONE, ref Nil)
+
+ (* private state *)
+ structure A = Array
+ val state = A.tabulate (W.numberOfProcessors (), fn _ => head)
+
+(*
+ fun ntos MNil = "_"
+ | ntos (MCons (l, s, c, wr, r)) =
+ let
+ val star = case !wr of SOME _ => "*" | _ => ""
+ val ls = case !l of MNil => "_"
+ | MCons (_, ls, _, _, _) => ls
+ val rs = case !r of MNil => "_"
+ | MCons (_, rs, _, _, _) => rs
+ in
+ (s ^ star ^ "[" ^ Int.toString (!c) ^ "](" ^ ls ^ "," ^ rs ^ ")")
+ end
+
+ fun pr nr =
+ let
+ fun loop MNil = ()
+ | loop (n as MCons (_, _, _, _, r)) =
+ let
+ in
+ print (ntos n ^ ", ");
+ loop (!r)
+ end
+
+ val s = case !nr of (MCons (_, s, _, _, _)) => s
+ | MNil => "nil"
+ in
+ print "hd: ";
+ loop head;
+ print ("\ncr: " ^ s ^ "\n")
+ end
+
+ local
+ val count = ref 0
+ in
+ fun next () = (Int.toString (!count))
+ before count := !count + 1
+ end
+*)
+
+ fun addWork p ws =
+ let
+ fun add w =
+ case A.unsafeSub (state, p) of
+ (l as Cons (_, c, wr, rl)) =>
+ let in
+ (* Can't read wr unless we have the lock! *)
+ case !wr of
+ (* Easy if we can replace the current node *)
+ NONE => wr := SOME w
+ (* Need to insert to the right of the current node *)
+ | SOME _ =>
+ let
+ val r = !rl
+ val n = Cons (ref l, ref 1, ref (SOME w), ref r)
+ val () = c := !c - 1
+ val () = A.unsafeUpdate (state, p, n)
+ val () = rl := n
+ in
+ case r of
+ Cons (lr, _, _, _) => lr := n
+ | Nil => ()
+ end
+ end
+ | Nil => die 1
+ in
+ lock ();
+ app add ws;
+ unlock ()
+ end
+
+ fun getWork p =
+ let
+ (* get assumes that its argument doesn't need its ref count
+ decremented *)
+ fun get Nil = NONE
+ | get (n as Cons (l, c, wr, r)) =
+ let in
+ case !wr of
+ SOME w => (wr := NONE;
+ A.unsafeUpdate (state, p, n);
+ c := !c + 1;
+ SOME w)
+ (* Otherwise, continue down the list *)
+ | _ => get (!r)
+ end
+ in
+ lock ();
+ (* leftmost => always start from the head *)
+ get head
+ before
+ unlock ()
+ end
+
+ fun finishWork p =
+ let in
+ case A.unsafeSub (state, p)
+ before A.unsafeUpdate (state, p, head) of
+ (Cons (l, c, wr, r)) =>
+ let
+ val () = lock ()
+ (* whatever happens, our state will no longer point at this node *)
+ val c' = !c - 1
+ in
+ (* decrement the ref count *)
+ c := c';
+ case (!wr, !l, !r, c') of
+ (SOME _, _, _, _) => () (* still work to do! *)
+ | (NONE, Nil, _, _) => () (* do nothing if first node in list *)
+ | (NONE, l as Cons (_, _, _, rl), r, 0) =>
+ (* if not first and new ref count is 0 then remove *)
+ let in
+ rl := r;
+ case r of Cons (lr, _, _, _) =>
+ lr := l
+ | Nil => ()
+ end
+ | (NONE, _, _, _) => (); (* do nothing if ref count > 0 *)
+ unlock ()
+ end
+ | Nil => die 2
+ end
+
+ (* PERF could check to see if there are any waiting jobs earlier in the
+ queue *)
+ fun shouldYield _ = true
+
+ val policyName = "pdf"
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/sim-map
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/sim-map 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/sim-map 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1 @@
+WORK_QUEUE simpleworkqueue
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/simpleworkqueue.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/simpleworkqueue.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/simpleworkqueue.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,42 @@
+functor WorkQueue (W : sig type work val numberOfProcessors : unit -> int end) : PARALLEL_WORKQUEUE =
+struct
+
+ type proc = int
+ type work = W.work
+
+ local
+ val lock_ = _import "Parallel_lock": int -> unit;
+ val unlock_ = _import "Parallel_unlock": int -> unit;
+ in
+ fun lock () = lock_ 0
+ fun unlock () = unlock_ 0
+ end
+
+ (* initialize state *)
+ val queue = ref nil : work list ref
+
+ fun addWork _ ws =
+ let in
+ lock ();
+ queue := ws @ (!queue);
+ unlock ()
+ end
+
+ fun getWork _ =
+ let in
+ lock ();
+ case !queue
+ of nil => (unlock ();
+ NONE)
+ | w::ws => (queue := ws;
+ unlock ();
+ SOME w)
+ end
+
+ fun finishWork _ = ()
+
+ fun shouldYield _ = true
+
+ val policyName = "sim"
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/workqueue.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/workqueue.sig 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/workqueue.sig 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,23 @@
+signature PARALLEL_WORKQUEUE =
+sig
+
+ (* processor identifier *)
+ type proc = int
+ (* abstract type of work *)
+ type work
+
+ (* these take the identifier of the current processor as their first
+ argument *)
+ (* add new work to the queue *)
+ val addWork : proc -> work list -> unit
+ (* remove the next, highest priority work *)
+ val getWork : proc -> work option
+ (* mark the most recent unit of work as done *)
+ val finishWork : proc -> unit
+ (* is there higher priority work for the given processor? *)
+ val shouldYield : proc -> bool
+
+ (* name of the current policy *)
+ val policyName : string
+
+end
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/ws-map
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/ws-map 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/ws-map 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1 @@
+WORK_QUEUE wsworkqueue
Added: mlton/branches/shared-heap-multicore/basis-library/parallel/wsworkqueue.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/wsworkqueue.sml 2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/wsworkqueue.sml 2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,83 @@
+functor WorkQueue (W : sig type work val numberOfProcessors : unit -> int end) : PARALLEL_WORKQUEUE =
+struct
+
+ type proc = int
+ type work = W.work
+
+ val lock = _import "Parallel_lock": int -> unit;
+ val unlock = _import "Parallel_unlock": int -> unit;
+
+ local
+ exception Impossible
+ open TextIO
+ in
+ fun die n = (output (stdErr,
+ "WSWorkQueue: die at " ^ (Int.toString n) ^ "\n");
+ flushOut stdErr;
+ (* XX unlock (); *)
+ raise Impossible)
+ end
+
+ datatype 'a mdlist =
+ Cons of 'a mdlist ref * int ref * 'a option ref * 'a mdlist ref
+ | Nil
+
+ structure A = Array
+ val numberOfProcessors = W.numberOfProcessors ()
+
+ (* private state *)
+ val state = A.tabulate (numberOfProcessors, fn _ => nil)
+
+ fun addWork p ws =
+ let
+ val () = lock p
+ val lq = A.unsafeSub (state, p)
+ in
+ A.unsafeUpdate (state, p, ws @ lq);
+ unlock p
+ end
+
+ local
+ fun victim () = Word.toIntX (MLtonRandom.rand ()) mod numberOfProcessors
+ in
+ fun getWork p =
+ let
+ fun steal () =
+ let
+ val p' = victim ()
+ val () = lock p'
+ val lq' = A.unsafeSub (state, p')
+ val l = length lq'
+ in
+ (if l > 0 then
+ let
+ val (ws, ws') = (List.take (lq', l - 1), List.drop (lq', l - 1))
+ val w = case ws' of w::nil => w | _ => die 1
+ in
+ A.unsafeUpdate (state, p', ws);
+ SOME w
+ end
+ else NONE)
+ before unlock p'
+ end
+
+ val () = lock p
+ val lq = A.unsafeSub (state, p)
+ val res = case lq of
+ w::ws => (A.unsafeUpdate (state, p, ws);
+ SOME w
+ before unlock p)
+ | nil => (unlock p;
+ steal ())
+ in
+ res
+ end
+ end
+
+ fun finishWork p = ()
+
+ fun shouldYield _ = false
+
+ val policyName = "ws"
+
+end
More information about the MLton-commit
mailing list