RequestServer: De-duplicate some disk cache requests

We previously had no protection against the same URL being requested
multiple times at the same time. For example, if a URL did not have any
cache entry and became requested twice, we would open two cache writers
concurrently. This would result in both writers piping the response to
disk, and we'd have a corrupt cache file.

We now hold back requests under certain scenarios until existing cache
entries have completed:

* If we are opening a cache entry for reading:
  - If there is an existing reader entry, carry on as normal. We can
    have multiple readers.
  - If there is an existing writer entry, defer the request until it is
    complete.

* If we are opening a cache entry for writing:
  - If there is an existing reader or writer entry, defer the request
    until it is complete.
This commit is contained in:
Timothy Flynn 2025-10-24 12:54:32 -04:00 committed by Andreas Kling
parent 95d23d02f1
commit 7f37889ff1
5 changed files with 133 additions and 34 deletions

View File

@ -55,6 +55,8 @@ class CacheEntry {
public:
virtual ~CacheEntry() = default;
u64 cache_key() const { return m_cache_key; }
void remove();
void mark_for_deletion(Badge<DiskCache>) { m_marked_for_deletion = true; }

View File

@ -33,47 +33,54 @@ DiskCache::DiskCache(NonnullRefPtr<Database::Database> database, LexicalPath cac
{
}
Optional<CacheEntryWriter&> DiskCache::create_entry(Request& request)
Variant<Optional<CacheEntryWriter&>, DiskCache::CacheHasOpenEntry> DiskCache::create_entry(Request& request)
{
if (!is_cacheable(request.method()))
return {};
return Optional<CacheEntryWriter&> {};
auto serialized_url = serialize_url_for_cache_storage(request.url());
auto cache_key = create_cache_key(serialized_url, request.method());
if (check_if_cache_has_open_entry(request, cache_key, CheckReaderEntries::Yes))
return CacheHasOpenEntry {};
auto cache_entry = CacheEntryWriter::create(*this, m_index, cache_key, move(serialized_url), request.request_start_time());
if (cache_entry.is_error()) {
dbgln("\033[31;1mUnable to create cache entry for\033[0m {}: {}", request.url(), cache_entry.error());
return {};
return Optional<CacheEntryWriter&> {};
}
dbgln("\033[32;1mCreated disk cache entry for\033[0m {}", request.url());
auto address = reinterpret_cast<FlatPtr>(cache_entry.value().ptr());
m_open_cache_entries.set(address, cache_entry.release_value());
auto* cache_entry_pointer = cache_entry.value().ptr();
m_open_cache_entries.ensure(cache_key).append(cache_entry.release_value());
return static_cast<CacheEntryWriter&>(**m_open_cache_entries.get(address));
return Optional<CacheEntryWriter&> { *cache_entry_pointer };
}
Optional<CacheEntryReader&> DiskCache::open_entry(Request& request)
Variant<Optional<CacheEntryReader&>, DiskCache::CacheHasOpenEntry> DiskCache::open_entry(Request& request)
{
if (!is_cacheable(request.method()))
return {};
return Optional<CacheEntryReader&> {};
auto serialized_url = serialize_url_for_cache_storage(request.url());
auto cache_key = create_cache_key(serialized_url, request.method());
if (check_if_cache_has_open_entry(request, cache_key, CheckReaderEntries::No))
return CacheHasOpenEntry {};
auto index_entry = m_index.find_entry(cache_key);
if (!index_entry.has_value()) {
dbgln("\033[35;1mNo disk cache entry for\033[0m {}", request.url());
return {};
return Optional<CacheEntryReader&> {};
}
auto cache_entry = CacheEntryReader::create(*this, m_index, cache_key, index_entry->data_size);
if (cache_entry.is_error()) {
dbgln("\033[31;1mUnable to open cache entry for\033[0m {}: {}", request.url(), cache_entry.error());
m_index.remove_entry(cache_key);
return {};
return Optional<CacheEntryReader&> {};
}
auto freshness_lifetime = calculate_freshness_lifetime(cache_entry.value()->headers());
@ -82,21 +89,46 @@ Optional<CacheEntryReader&> DiskCache::open_entry(Request& request)
if (!is_response_fresh(freshness_lifetime, current_age)) {
dbgln("\033[33;1mCache entry expired for\033[0m {} (lifetime={}s age={}s)", request.url(), freshness_lifetime.to_seconds(), current_age.to_seconds());
cache_entry.value()->remove();
return {};
return Optional<CacheEntryReader&> {};
}
dbgln("\033[32;1mOpened disk cache entry for\033[0m {} (lifetime={}s age={}s) ({} bytes)", request.url(), freshness_lifetime.to_seconds(), current_age.to_seconds(), index_entry->data_size);
auto address = reinterpret_cast<FlatPtr>(cache_entry.value().ptr());
m_open_cache_entries.set(address, cache_entry.release_value());
auto* cache_entry_pointer = cache_entry.value().ptr();
m_open_cache_entries.ensure(cache_key).append(cache_entry.release_value());
return static_cast<CacheEntryReader&>(**m_open_cache_entries.get(address));
return Optional<CacheEntryReader&> { *cache_entry_pointer };
}
// Returns true if this request must be deferred because another request already has a cache
// entry open for the same cache key. When deferring, the request is recorded in
// m_requests_waiting_completion so it can be resumed once the conflicting entry closes
// (see cache_entry_closed).
bool DiskCache::check_if_cache_has_open_entry(Request& request, u64 cache_key, CheckReaderEntries check_reader_entries)
{
auto open_entries = m_open_cache_entries.get(cache_key);
if (!open_entries.has_value())
return false;
// An open writer always conflicts: a concurrent reader would observe a partial file, and a
// concurrent writer would corrupt it. Defer regardless of check_reader_entries.
for (auto const& open_entry : *open_entries) {
if (is<CacheEntryWriter>(*open_entry)) {
dbgln("\033[36;1mDeferring disk cache entry for\033[0m {} (waiting for existing writer)", request.url());
m_requests_waiting_completion.ensure(cache_key).append(request);
return true;
}
}
// No writer found, so every remaining open entry is a reader. Multiple readers may coexist,
// so callers opening for reading (CheckReaderEntries::No) may proceed.
if (check_reader_entries == CheckReaderEntries::No)
return false;
// Caller wants to write (CheckReaderEntries::Yes); defer until the readers complete.
dbgln("\033[36;1mDeferring disk cache entry for\033[0m {} (waiting for existing reader)", request.url());
m_requests_waiting_completion.ensure(cache_key).append(request);
return true;
}
void DiskCache::clear_cache()
{
for (auto& [_, cache_entry] : m_open_cache_entries)
cache_entry->mark_for_deletion({});
for (auto const& [_, open_entries] : m_open_cache_entries) {
for (auto const& open_entry : open_entries)
open_entry->mark_for_deletion({});
}
m_index.remove_all_entries();
@ -117,8 +149,32 @@ void DiskCache::clear_cache()
void DiskCache::cache_entry_closed(Badge<CacheEntry>, CacheEntry const& cache_entry)
{
auto address = reinterpret_cast<FlatPtr>(&cache_entry);
m_open_cache_entries.remove(address);
auto cache_key = cache_entry.cache_key();
auto open_entries = m_open_cache_entries.get(cache_key);
if (!open_entries.has_value())
return;
open_entries->remove_first_matching([&](auto const& open_entry) { return open_entry.ptr() == &cache_entry; });
if (open_entries->size() > 0)
return;
m_open_cache_entries.remove(cache_key);
// FIXME: This creates a bit of a first-past-the-post situation if a resumed request causes other pending requests
// to become delayed again. We may want to come up with some method to control the order of resumed requests.
if (auto pending_requests = m_requests_waiting_completion.take(cache_key); pending_requests.has_value()) {
// We defer resuming requests to ensure we are outside of any internal curl callbacks. For example, when curl
// invokes the CURLOPT_WRITEFUNCTION callback, we will flush pending HTTP headers to the disk cache. If that
// does not succeed, we delete the cache entry, and end up here. We must queue the new request outside of that
// callback, otherwise curl will return CURLM_RECURSIVE_API_CALL error codes.
Core::deferred_invoke([pending_requests = pending_requests.release_value()]() {
for (auto const& request : pending_requests) {
if (request)
request->notify_request_unblocked({});
}
});
}
}
}

View File

@ -23,8 +23,10 @@ class DiskCache {
public:
static ErrorOr<DiskCache> create();
Optional<CacheEntryWriter&> create_entry(Request&);
Optional<CacheEntryReader&> open_entry(Request&);
struct CacheHasOpenEntry { };
Variant<Optional<CacheEntryWriter&>, CacheHasOpenEntry> create_entry(Request&);
Variant<Optional<CacheEntryReader&>, CacheHasOpenEntry> open_entry(Request&);
void clear_cache();
LexicalPath const& cache_directory() { return m_cache_directory; }
@ -34,9 +36,16 @@ public:
private:
DiskCache(NonnullRefPtr<Database::Database>, LexicalPath cache_directory, CacheIndex);
enum class CheckReaderEntries {
No,
Yes,
};
bool check_if_cache_has_open_entry(Request&, u64 cache_key, CheckReaderEntries);
NonnullRefPtr<Database::Database> m_database;
HashMap<FlatPtr, NonnullOwnPtr<CacheEntry>> m_open_cache_entries;
HashMap<u64, Vector<NonnullOwnPtr<CacheEntry>, 1>> m_open_cache_entries;
HashMap<u64, Vector<WeakPtr<Request>, 1>> m_requests_waiting_completion;
LexicalPath m_cache_directory;
CacheIndex m_index;

View File

@ -123,6 +123,13 @@ Request::~Request()
(void)m_cache_entry_writer->flush();
}
// Invoked by DiskCache once the cache entry this request was waiting on has closed
// (we were parked in State::WaitForCache). Restarting from State::Init re-runs the
// cache open/create decision from scratch, since the cache contents may have changed.
void Request::notify_request_unblocked(Badge<DiskCache>)
{
// FIXME: We may want a timer to limit how long we are waiting for a request before proceeding with a network
// request that skips the disk cache.
transition_to_state(State::Init);
}
void Request::notify_fetch_complete(Badge<ConnectionFromClient>, int result_code)
{
m_curl_result_code = result_code;
@ -146,6 +153,9 @@ void Request::process()
case State::ReadCache:
handle_read_cache_state();
break;
case State::WaitForCache:
// Do nothing; we are waiting for the disk cache to notify us to proceed.
break;
case State::DNSLookup:
handle_dns_lookup_state();
break;
@ -167,13 +177,33 @@ void Request::process()
void Request::handle_initial_state()
{
if (m_disk_cache.has_value()) {
m_cache_entry_reader = m_disk_cache->open_entry(*this);
if (m_cache_entry_reader.has_value()) {
transition_to_state(State::ReadCache);
return;
}
m_disk_cache->open_entry(*this).visit(
[&](Optional<CacheEntryReader&> cache_entry_reader) {
m_cache_entry_reader = cache_entry_reader;
if (m_cache_entry_reader.has_value())
transition_to_state(State::ReadCache);
},
[&](DiskCache::CacheHasOpenEntry) {
// If an existing entry is open for writing, we must wait for it to complete.
transition_to_state(State::WaitForCache);
});
m_cache_entry_writer = m_disk_cache->create_entry(*this);
if (m_state != State::Init)
return;
m_disk_cache->create_entry(*this).visit(
[&](Optional<CacheEntryWriter&> cache_entry_writer) {
m_cache_entry_writer = cache_entry_writer;
},
[&](DiskCache::CacheHasOpenEntry) {
// If an existing entry is open for reading or writing, we must wait for it to complete. An entry being
// open for reading is a rare case, but may occur if a cached response expired between the existing
// entry's cache validation and the attempted reader validation when this request was created.
transition_to_state(State::WaitForCache);
});
if (m_state != State::Init)
return;
}
transition_to_state(State::DNSLookup);

View File

@ -54,6 +54,7 @@ public:
ByteString const& method() const { return m_method; }
UnixDateTime request_start_time() const { return m_request_start_time; }
void notify_request_unblocked(Badge<DiskCache>);
void notify_fetch_complete(Badge<ConnectionFromClient>, int result_code);
private:
@ -63,13 +64,14 @@ private:
};
enum class State : u8 {
Init, // Decide whether to service this request from cache or the network.
ReadCache, // Read the cached response from disk.
DNSLookup, // Resolve the URL's host.
Connect, // Issue a network request to connect to the URL.
Fetch, // Issue a network request to fetch the URL.
Complete, // Finalize the request with the client.
Error, // Any error occurred during the request's lifetime.
Init, // Decide whether to service this request from cache or the network.
ReadCache, // Read the cached response from disk.
WaitForCache, // Wait for an existing cache entry to complete before proceeding.
DNSLookup, // Resolve the URL's host.
Connect, // Issue a network request to connect to the URL.
Fetch, // Issue a network request to fetch the URL.
Complete, // Finalize the request with the client.
Error, // Any error occurred during the request's lifetime.
};
Request(