RequestServer: Create RequestPipe abstraction for request data transfer

The Win32 API equivalent to pipe2() is CreatePipe(), which creates read
and write anonymous pipe handles that we can set to non-blocking via
SetNamedPipeHandleState(); however, this initial approach caused issues
as our Windows infrastructure assumes socket-based handles/fds and that
we don't use Windows pipes at all, see Core::System::is_socket() in
SystemWindows.cpp. So we use socketpair() to keep our current
assumptions true.

Given that Windows uses socketpair() and Unix uses pipe2(), this
RequestPipe abstraction avoids ifdef soup by hiding the details about
how the read/write fds pair is created and how response data is written
to the client.
This commit is contained in:
ayeteadoe 2025-08-24 15:01:37 -07:00 committed by Tim Flynn
parent 9dae1acc31
commit 11ec7c9cea
6 changed files with 126 additions and 22 deletions

View File

@ -10,6 +10,7 @@ set(SOURCES
ConnectionFromClient.cpp
CURL.cpp
Request.cpp
RequestPipe.cpp
Resolver.cpp
WebSocketImplCurl.cpp
)

View File

@ -15,6 +15,7 @@ class CacheIndex;
class ConnectionFromClient;
class DiskCache;
class Request;
class RequestPipe;
struct DNSInfo;
struct Resolver;

View File

@ -106,9 +106,6 @@ Request::~Request()
if (!m_response_buffer.is_eof())
dbgln("Warning: Request destroyed with buffered data (it's likely that the client disappeared or the request was cancelled)");
if (m_client_writer_fd != -1)
MUST(Core::System::close(m_client_writer_fd));
if (m_curl_easy_handle) {
auto result = curl_multi_remove_handle(m_curl_multi_handle, m_curl_easy_handle);
VERIFY(result == CURLM_OK);
@ -219,21 +216,23 @@ void Request::handle_read_cache_state()
m_reason_phrase = m_cache_entry_reader->reason_phrase();
m_response_headers = m_cache_entry_reader->headers();
auto fds = Core::System::pipe2(O_NONBLOCK);
if (fds.is_error()) {
dbgln("Request::handle_read_from_cache_state: Failed to create pipe: {}", fds.error());
auto pipe_or_error = RequestPipe::create();
if (pipe_or_error.is_error()) {
dbgln("Request::handle_read_from_cache_state: Failed to create pipe: {}", pipe_or_error.error());
transition_to_state(State::Error);
return;
}
m_client.async_request_started(m_request_id, IPC::File::adopt_fd(fds.value().at(0)));
m_client_writer_fd = fds.value().at(1);
auto pipe = pipe_or_error.release_value();
m_client.async_request_started(m_request_id, IPC::File::adopt_fd(pipe.reader_fd()));
m_client_request_pipe = move(pipe);
m_client.async_headers_became_available(m_request_id, m_response_headers, m_status_code, m_reason_phrase);
m_sent_response_headers_to_client = true;
m_cache_entry_reader->pipe_to(
m_client_writer_fd,
m_client_request_pipe.value().writer_fd(),
[this](auto bytes_sent) {
m_bytes_transferred_to_client = bytes_sent;
m_curl_result_code = CURLE_OK;
@ -300,10 +299,6 @@ void Request::handle_connect_state()
void Request::handle_fetch_state()
{
#if defined(AK_OS_WINDOWS)
dbgln("FIXME: Request::handle_fetch_state: Not implemented on Windows");
transition_to_state(State::Error);
#else
dbgln_if(REQUESTSERVER_DEBUG, "RequestServer: DNS lookup successful");
m_curl_easy_handle = curl_easy_init();
@ -314,18 +309,20 @@ void Request::handle_fetch_state()
}
if (!m_start_offset_of_response_resumed_from_cache.has_value()) {
auto fds = Core::System::pipe2(O_NONBLOCK);
if (fds.is_error()) {
dbgln("Request::handle_start_fetch_state: Failed to create pipe: {}", fds.error());
auto pipe_or_error = RequestPipe::create();
if (pipe_or_error.is_error()) {
dbgln("Request::handle_start_fetch_state: Failed to create pipe: {}", pipe_or_error.error());
transition_to_state(State::Error);
return;
}
m_client.async_request_started(m_request_id, IPC::File::adopt_fd(fds.value().at(0)));
m_client_writer_fd = fds.value().at(1);
auto pipe = pipe_or_error.release_value();
m_client.async_request_started(m_request_id, IPC::File::adopt_fd(pipe.reader_fd()));
m_client_request_pipe = move(pipe);
}
m_client_writer_notifier = Core::Notifier::construct(m_client_writer_fd, Core::NotificationType::Write);
m_client_writer_notifier = Core::Notifier::construct(m_client_request_pipe.value().writer_fd(), Core::NotificationType::Write);
m_client_writer_notifier->set_enabled(false);
m_client_writer_notifier->on_activation = [this] {
@ -412,7 +409,6 @@ void Request::handle_fetch_state()
auto result = curl_multi_add_handle(m_curl_multi_handle, m_curl_easy_handle);
VERIFY(result == CURLM_OK);
#endif
}
void Request::handle_complete_state()
@ -562,7 +558,7 @@ ErrorOr<void> Request::write_queued_bytes_without_blocking()
bytes_to_send.resize(available_bytes);
m_response_buffer.peek_some(bytes_to_send);
auto result = Core::System::write(m_client_writer_fd, bytes_to_send);
auto result = m_client_request_pipe.value().write(bytes_to_send);
if (result.is_error()) {
if (result.error().code() != EAGAIN)
return result.release_error();

View File

@ -20,6 +20,7 @@
#include <LibURL/URL.h>
#include <RequestServer/CacheLevel.h>
#include <RequestServer/Forward.h>
#include <RequestServer/RequestPipe.h>
struct curl_slist;
@ -149,7 +150,7 @@ private:
AllocatingMemoryStream m_response_buffer;
RefPtr<Core::Notifier> m_client_writer_notifier;
int m_client_writer_fd { -1 };
Optional<RequestPipe> m_client_request_pipe;
Optional<size_t> m_start_offset_of_response_resumed_from_cache;
size_t m_bytes_transferred_to_client { 0 };

View File

@ -0,0 +1,70 @@
/*
* Copyright (c) 2025, ayeteadoe <ayeteadoe@gmail.com>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#include <LibCore/System.h>
#include <RequestServer/RequestPipe.h>
#if defined(AK_OS_WINDOWS)
# include <AK/Windows.h>
#endif
namespace RequestServer {
RequestPipe::RequestPipe(int const reader_fd, int const writer_fd)
: m_reader_fd(reader_fd)
, m_writer_fd(writer_fd)
{
VERIFY(m_reader_fd >= 0);
VERIFY(m_writer_fd >= 0);
}
RequestPipe::RequestPipe(RequestPipe&& other)
: m_reader_fd(exchange(other.m_reader_fd, -1))
, m_writer_fd(exchange(other.m_writer_fd, -1))
{
}
RequestPipe& RequestPipe::operator=(RequestPipe&& other)
{
m_reader_fd = exchange(other.m_reader_fd, -1);
m_writer_fd = exchange(other.m_writer_fd, -1);
return *this;
}
RequestPipe::~RequestPipe()
{
if (m_writer_fd != -1)
MUST(Core::System::close(m_writer_fd));
}
ErrorOr<RequestPipe> RequestPipe::create()
{
#if defined(AK_OS_WINDOWS)
int socket_fds[2] {};
TRY(Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, socket_fds));
int option = 1;
TRY(Core::System::ioctl(socket_fds[0], FIONBIO, option));
TRY(Core::System::ioctl(socket_fds[1], FIONBIO, option));
return RequestPipe(socket_fds[0], socket_fds[1]);
#else
auto fds = TRY(Core::System::pipe2(O_NONBLOCK));
return RequestPipe(fds[0], fds[1]);
#endif
}
ErrorOr<ssize_t> RequestPipe::write(ReadonlyBytes bytes)
{
#if defined(AK_OS_WINDOWS)
auto sent = ::send(m_writer_fd, reinterpret_cast<char const*>(bytes.data()), bytes.size(), 0);
if (sent == SOCKET_ERROR)
return Error::from_windows_error();
return sent;
#else
return Core::System::write(m_writer_fd, bytes);
#endif
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright (c) 2025, ayeteadoe <ayeteadoe@gmail.com>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#pragma once
#include <AK/Span.h>
namespace RequestServer {
class RequestPipe {
AK_MAKE_NONCOPYABLE(RequestPipe);
public:
RequestPipe(RequestPipe&& other);
RequestPipe& operator=(RequestPipe&& other);
~RequestPipe();
static ErrorOr<RequestPipe> create();
int reader_fd() const { return m_reader_fd; }
int writer_fd() const { return m_writer_fd; }
ErrorOr<ssize_t> write(ReadonlyBytes bytes);
private:
RequestPipe(int reader_fd, int writer_fd);
int m_reader_fd { -1 };
int m_writer_fd { -1 };
};
}