diff options
-rw-r--r-- | libc/src/__support/RPC/rpc.h | 8 | ||||
-rw-r--r-- | libc/utils/gpu/server/rpc_server.cpp | 18 |
2 files changed, 17 insertions, 9 deletions
diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h index 189da86..88c62dc 100644 --- a/libc/src/__support/RPC/rpc.h +++ b/libc/src/__support/RPC/rpc.h @@ -318,6 +318,8 @@ public: return process.packet[index].header.opcode; } + LIBC_INLINE uint16_t get_index() const { return index; } + LIBC_INLINE void close() { // The server is passive, if it own the buffer when it closes we need to // give ownership back to the client. @@ -367,7 +369,7 @@ template <uint32_t lane_size> struct Server { : process(port_count, buffer) {} using Port = rpc::Port<true, Packet<lane_size>>; - LIBC_INLINE cpp::optional<Port> try_open(); + LIBC_INLINE cpp::optional<Port> try_open(uint32_t start = 0); LIBC_INLINE Port open(); LIBC_INLINE static uint64_t allocation_size(uint32_t port_count) { @@ -547,9 +549,9 @@ template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() { template <uint32_t lane_size> [[clang::convergent]] LIBC_INLINE cpp::optional<typename Server<lane_size>::Port> - Server<lane_size>::try_open() { + Server<lane_size>::try_open(uint32_t start) { // Perform a naive linear scan for a port that has a pending request. - for (uint32_t index = 0; index < process.port_count; ++index) { + for (uint32_t index = start; index < process.port_count; ++index) { uint64_t lane_mask = gpu::get_lane_mask(); uint32_t in = process.load_inbox(lane_mask, index); uint32_t out = process.load_outbox(lane_mask, index); diff --git a/libc/utils/gpu/server/rpc_server.cpp b/libc/utils/gpu/server/rpc_server.cpp index 6395a80..1c1c9f1 100644 --- a/libc/utils/gpu/server/rpc_server.cpp +++ b/libc/utils/gpu/server/rpc_server.cpp @@ -36,11 +36,12 @@ struct Server { rpc_status_t handle_server( const std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> &callbacks, - const std::unordered_map<rpc_opcode_t, void *> &callback_data) { + const std::unordered_map<rpc_opcode_t, void *> &callback_data, + uint32_t &index) { rpc_status_t ret = RPC_STATUS_SUCCESS; std::visit( [&](auto &server) { - ret = handle_server(*server, callbacks, callback_data); + ret = handle_server(*server, callbacks, callback_data, index); }, server); return ret; @@ -51,8 +52,9 @@ private: rpc_status_t handle_server( rpc::Server<lane_size> &server, const std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> &callbacks, - const std::unordered_map<rpc_opcode_t, void *> &callback_data) { - auto port = server.try_open(); + const std::unordered_map<rpc_opcode_t, void *> &callback_data, + uint32_t &index) { + auto port = server.try_open(index); if (!port) return RPC_STATUS_SUCCESS; @@ -203,6 +205,9 @@ private: (handler->second)(port_ref, data); } } + + // Increment the index so we start the scan after this port. + index = port->get_index() + 1; port->close(); return RPC_STATUS_CONTINUE; } @@ -333,10 +338,11 @@ rpc_status_t rpc_handle_server(uint32_t device_id) { if (!state->devices[device_id]) return RPC_STATUS_ERROR; + uint32_t index = 0; for (;;) { auto &device = *state->devices[device_id]; - rpc_status_t status = - device.server.handle_server(device.callbacks, device.callback_data); + rpc_status_t status = device.server.handle_server( + device.callbacks, device.callback_data, index); if (status != RPC_STATUS_CONTINUE) return status; } |