aboutsummaryrefslogtreecommitdiff
path: root/libphobos/src/std/parallelism.d
diff options
context:
space:
mode:
Diffstat (limited to 'libphobos/src/std/parallelism.d')
-rw-r--r--libphobos/src/std/parallelism.d71
1 files changed, 60 insertions, 11 deletions
diff --git a/libphobos/src/std/parallelism.d b/libphobos/src/std/parallelism.d
index 664330a..bd467d2 100644
--- a/libphobos/src/std/parallelism.d
+++ b/libphobos/src/std/parallelism.d
@@ -955,16 +955,46 @@ uint totalCPUsImpl() @nogc nothrow @trusted
}
else version (linux)
{
- import core.sys.linux.sched : CPU_COUNT, cpu_set_t, sched_getaffinity;
+ import core.stdc.stdlib : calloc;
+ import core.stdc.string : memset;
+ import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
- cpu_set_t set = void;
- if (sched_getaffinity(0, cpu_set_t.sizeof, &set) == 0)
+ int count = 0;
+
+ /**
+ * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
+ * see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
+ *
+ * The hardcode number also comes from ruby's source code.
+ * see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
+ */
+ for (int n = 64; n <= 16384; n *= 2)
{
- int count = CPU_COUNT(&set);
+ size_t size = CPU_ALLOC_SIZE(count);
+ if (size >= 0x400)
+ {
+ auto cpuset = cast(cpu_set_t*) calloc(1, size);
+ if (cpuset is null) break;
+ if (sched_getaffinity(0, size, cpuset) == 0)
+ {
+ count = CPU_COUNT_S(size, cpuset);
+ }
+ CPU_FREE(cpuset);
+ }
+ else
+ {
+ cpu_set_t cpuset;
+ if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
+ {
+ count = CPU_COUNT(&cpuset);
+ }
+ }
+
if (count > 0)
return cast(uint) count;
}
+
return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
}
else version (Darwin)
@@ -2733,9 +2763,6 @@ public:
}
}
- foreach (ref t; tasks[])
- emplaceRef(t, RTask());
-
// Hack to take the address of a nested function w/o
// making a closure.
static auto scopedAddress(D)(scope D del) @system
@@ -2748,12 +2775,19 @@ public:
void useTask(ref RTask task)
{
import std.algorithm.comparison : min;
+ import core.lifetime : emplace;
+
+ // Private constructor, so can't feed it's arguments directly
+ // to emplace
+ emplace(&task, RTask
+ (
+ scopedAddress(&reduceOnRange),
+ range,
+ curPos, // lower bound.
+ cast() min(len, curPos + workUnitSize) // upper bound.
+ ));
task.pool = this;
- task._args[0] = scopedAddress(&reduceOnRange);
- task._args[3] = min(len, curPos + workUnitSize); // upper bound.
- task._args[1] = range; // range
- task._args[2] = curPos; // lower bound.
curPos += workUnitSize;
}
@@ -3505,6 +3539,21 @@ public:
assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected));
}
+// Issue 16705
+@system unittest
+{
+ struct MyIota
+ {
+ size_t front;
+ void popFront()(){front++;}
+ auto empty(){return front >= 25;}
+ auto opIndex(size_t i){return front+i;}
+ auto length(){return 25-front;}
+ }
+
+ auto mySum = taskPool.reduce!"a + b"(MyIota());
+}
+
/**
Returns a lazily initialized global instantiation of `TaskPool`.
This function can safely be called concurrently from multiple non-worker