aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libc/src/__support/RPC/rpc.h8
-rw-r--r--libc/utils/gpu/server/rpc_server.cpp18
2 files changed, 17 insertions, 9 deletions
diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
index 189da86..88c62dc 100644
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -318,6 +318,8 @@ public:
return process.packet[index].header.opcode;
}
+ LIBC_INLINE uint16_t get_index() const { return index; }
+
LIBC_INLINE void close() {
// The server is passive, if it own the buffer when it closes we need to
// give ownership back to the client.
@@ -367,7 +369,7 @@ template <uint32_t lane_size> struct Server {
: process(port_count, buffer) {}
using Port = rpc::Port<true, Packet<lane_size>>;
- LIBC_INLINE cpp::optional<Port> try_open();
+ LIBC_INLINE cpp::optional<Port> try_open(uint32_t start = 0);
LIBC_INLINE Port open();
LIBC_INLINE static uint64_t allocation_size(uint32_t port_count) {
@@ -547,9 +549,9 @@ template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() {
template <uint32_t lane_size>
[[clang::convergent]] LIBC_INLINE
cpp::optional<typename Server<lane_size>::Port>
- Server<lane_size>::try_open() {
+ Server<lane_size>::try_open(uint32_t start) {
// Perform a naive linear scan for a port that has a pending request.
- for (uint32_t index = 0; index < process.port_count; ++index) {
+ for (uint32_t index = start; index < process.port_count; ++index) {
uint64_t lane_mask = gpu::get_lane_mask();
uint32_t in = process.load_inbox(lane_mask, index);
uint32_t out = process.load_outbox(lane_mask, index);
diff --git a/libc/utils/gpu/server/rpc_server.cpp b/libc/utils/gpu/server/rpc_server.cpp
index 6395a80..1c1c9f1 100644
--- a/libc/utils/gpu/server/rpc_server.cpp
+++ b/libc/utils/gpu/server/rpc_server.cpp
@@ -36,11 +36,12 @@ struct Server {
rpc_status_t handle_server(
const std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> &callbacks,
- const std::unordered_map<rpc_opcode_t, void *> &callback_data) {
+ const std::unordered_map<rpc_opcode_t, void *> &callback_data,
+ uint32_t &index) {
rpc_status_t ret = RPC_STATUS_SUCCESS;
std::visit(
[&](auto &server) {
- ret = handle_server(*server, callbacks, callback_data);
+ ret = handle_server(*server, callbacks, callback_data, index);
},
server);
return ret;
@@ -51,8 +52,9 @@ private:
rpc_status_t handle_server(
rpc::Server<lane_size> &server,
const std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> &callbacks,
- const std::unordered_map<rpc_opcode_t, void *> &callback_data) {
- auto port = server.try_open();
+ const std::unordered_map<rpc_opcode_t, void *> &callback_data,
+ uint32_t &index) {
+ auto port = server.try_open(index);
if (!port)
return RPC_STATUS_SUCCESS;
@@ -203,6 +205,9 @@ private:
(handler->second)(port_ref, data);
}
}
+
+ // Increment the index so we start the scan after this port.
+ index = port->get_index() + 1;
port->close();
return RPC_STATUS_CONTINUE;
}
@@ -333,10 +338,11 @@ rpc_status_t rpc_handle_server(uint32_t device_id) {
if (!state->devices[device_id])
return RPC_STATUS_ERROR;
+ uint32_t index = 0;
for (;;) {
auto &device = *state->devices[device_id];
- rpc_status_t status =
- device.server.handle_server(device.callbacks, device.callback_data);
+ rpc_status_t status = device.server.handle_server(
+ device.callbacks, device.callback_data, index);
if (status != RPC_STATUS_CONTINUE)
return status;
}