diff --git a/Services/RequestServer/Cache/CacheEntry.h b/Services/RequestServer/Cache/CacheEntry.h
index f74ce75984..f463241f59 100644
--- a/Services/RequestServer/Cache/CacheEntry.h
+++ b/Services/RequestServer/Cache/CacheEntry.h
@@ -55,6 +55,8 @@ class CacheEntry {
 public:
     virtual ~CacheEntry() = default;
 
+    u64 cache_key() const { return m_cache_key; }
+
     void remove();
     void mark_for_deletion(Badge<DiskCache>) { m_marked_for_deletion = true; }
 
diff --git a/Services/RequestServer/Cache/DiskCache.cpp b/Services/RequestServer/Cache/DiskCache.cpp
index 54ffc5fc26..b2c4983b79 100644
--- a/Services/RequestServer/Cache/DiskCache.cpp
+++ b/Services/RequestServer/Cache/DiskCache.cpp
@@ -33,47 +33,54 @@ DiskCache::DiskCache(NonnullRefPtr<Database> database, LexicalPath cac
 {
 }
 
-Optional<CacheEntryWriter&> DiskCache::create_entry(Request& request)
+Variant<Optional<CacheEntryWriter&>, DiskCache::CacheHasOpenEntry> DiskCache::create_entry(Request& request)
 {
     if (!is_cacheable(request.method()))
-        return {};
+        return Optional<CacheEntryWriter&> {};
 
     auto serialized_url = serialize_url_for_cache_storage(request.url());
     auto cache_key = create_cache_key(serialized_url, request.method());
 
+    if (check_if_cache_has_open_entry(request, cache_key, CheckReaderEntries::Yes))
+        return CacheHasOpenEntry {};
+
     auto cache_entry = CacheEntryWriter::create(*this, m_index, cache_key, move(serialized_url), request.request_start_time());
     if (cache_entry.is_error()) {
         dbgln("\033[31;1mUnable to create cache entry for\033[0m {}: {}", request.url(), cache_entry.error());
-        return {};
+        return Optional<CacheEntryWriter&> {};
     }
 
     dbgln("\033[32;1mCreated disk cache entry for\033[0m {}", request.url());
 
-    auto address = reinterpret_cast<FlatPtr>(cache_entry.value().ptr());
-    m_open_cache_entries.set(address, cache_entry.release_value());
+    auto* cache_entry_pointer = cache_entry.value().ptr();
+    m_open_cache_entries.ensure(cache_key).append(cache_entry.release_value());
 
-    return static_cast<CacheEntryWriter&>(**m_open_cache_entries.get(address));
+    return Optional<CacheEntryWriter&> { *cache_entry_pointer };
 }
 
-Optional<CacheEntryReader&> DiskCache::open_entry(Request& request)
+Variant<Optional<CacheEntryReader&>, DiskCache::CacheHasOpenEntry> DiskCache::open_entry(Request& request)
 {
     if (!is_cacheable(request.method()))
-        return {};
+        return Optional<CacheEntryReader&> {};
 
     auto serialized_url = serialize_url_for_cache_storage(request.url());
     auto cache_key = create_cache_key(serialized_url, request.method());
 
+    if (check_if_cache_has_open_entry(request, cache_key, CheckReaderEntries::No))
+        return CacheHasOpenEntry {};
+
     auto index_entry = m_index.find_entry(cache_key);
     if (!index_entry.has_value()) {
         dbgln("\033[35;1mNo disk cache entry for\033[0m {}", request.url());
-        return {};
+        return Optional<CacheEntryReader&> {};
     }
 
     auto cache_entry = CacheEntryReader::create(*this, m_index, cache_key, index_entry->data_size);
     if (cache_entry.is_error()) {
         dbgln("\033[31;1mUnable to open cache entry for\033[0m {}: {}", request.url(), cache_entry.error());
         m_index.remove_entry(cache_key);
-        return {};
+
+        return Optional<CacheEntryReader&> {};
     }
 
     auto freshness_lifetime = calculate_freshness_lifetime(cache_entry.value()->headers());
@@ -82,21 +89,46 @@ Optional<CacheEntryReader&> DiskCache::open_entry(Request& request)
     if (!is_response_fresh(freshness_lifetime, current_age)) {
         dbgln("\033[33;1mCache entry expired for\033[0m {} (lifetime={}s age={}s)", request.url(), freshness_lifetime.to_seconds(), current_age.to_seconds());
         cache_entry.value()->remove();
-        return {};
+
+        return Optional<CacheEntryReader&> {};
     }
 
     dbgln("\033[32;1mOpened disk cache entry for\033[0m {} (lifetime={}s age={}s) ({} bytes)", request.url(), freshness_lifetime.to_seconds(), current_age.to_seconds(), index_entry->data_size);
 
-    auto address = reinterpret_cast<FlatPtr>(cache_entry.value().ptr());
-    m_open_cache_entries.set(address, cache_entry.release_value());
+    auto* cache_entry_pointer = cache_entry.value().ptr();
+    m_open_cache_entries.ensure(cache_key).append(cache_entry.release_value());
 
-    return static_cast<CacheEntryReader&>(**m_open_cache_entries.get(address));
+    return Optional<CacheEntryReader&> { *cache_entry_pointer };
+}
+
+bool DiskCache::check_if_cache_has_open_entry(Request& request, u64 cache_key, CheckReaderEntries check_reader_entries)
+{
+    auto open_entries = m_open_cache_entries.get(cache_key);
+    if (!open_entries.has_value())
+        return false;
+
+    for (auto const& open_entry : *open_entries) {
+        if (is<CacheEntryWriter>(*open_entry)) {
+            dbgln("\033[36;1mDeferring disk cache entry for\033[0m {} (waiting for existing writer)", request.url());
+            m_requests_waiting_completion.ensure(cache_key).append(request);
+            return true;
+        }
+    }
+
+    if (check_reader_entries == CheckReaderEntries::No)
+        return false;
+
+    dbgln("\033[36;1mDeferring disk cache entry for\033[0m {} (waiting for existing reader)", request.url());
+    m_requests_waiting_completion.ensure(cache_key).append(request);
+    return true;
 }
 
 void DiskCache::clear_cache()
 {
-    for (auto& [_, cache_entry] : m_open_cache_entries)
-        cache_entry->mark_for_deletion({});
+    for (auto const& [_, open_entries] : m_open_cache_entries) {
+        for (auto const& open_entry : open_entries)
+            open_entry->mark_for_deletion({});
+    }
 
     m_index.remove_all_entries();
@@ -117,8 +149,32 @@ void DiskCache::clear_cache()
 void DiskCache::cache_entry_closed(Badge<CacheEntry>, CacheEntry const& cache_entry)
 {
-    auto address = reinterpret_cast<FlatPtr>(&cache_entry);
-    m_open_cache_entries.remove(address);
+    auto cache_key = cache_entry.cache_key();
+
+    auto open_entries = m_open_cache_entries.get(cache_key);
+    if (!open_entries.has_value())
+        return;
+
+    open_entries->remove_first_matching([&](auto const& open_entry) { return open_entry.ptr() == &cache_entry; });
+    if (open_entries->size() > 0)
+        return;
+
+    m_open_cache_entries.remove(cache_key);
+
+    // FIXME: This creates a bit of a first-past-the-post situation if a resumed request causes other pending requests
+    // to become delayed again. We may want to come up with some method to control the order of resumed requests.
+    if (auto pending_requests = m_requests_waiting_completion.take(cache_key); pending_requests.has_value()) {
+        // We defer resuming requests to ensure we are outside of any internal curl callbacks. For example, when curl
+        // invokes the CURLOPT_WRITEFUNCTION callback, we will flush pending HTTP headers to the disk cache. If that
+        // does not succeed, we delete the cache entry, and end up here. We must queue the new request outside of that
+        // callback, otherwise curl will return CURLM_RECURSIVE_API_CALL error codes.
+        Core::deferred_invoke([pending_requests = pending_requests.release_value()]() {
+            for (auto const& request : pending_requests) {
+                if (request)
+                    request->notify_request_unblocked({});
+            }
+        });
+    }
 }
 
 }
diff --git a/Services/RequestServer/Cache/DiskCache.h b/Services/RequestServer/Cache/DiskCache.h
index 089b413a89..0c361e9316 100644
--- a/Services/RequestServer/Cache/DiskCache.h
+++ b/Services/RequestServer/Cache/DiskCache.h
@@ -23,8 +23,10 @@ class DiskCache {
 public:
     static ErrorOr<DiskCache> create();
 
-    Optional<CacheEntryWriter&> create_entry(Request&);
-    Optional<CacheEntryReader&> open_entry(Request&);
+    struct CacheHasOpenEntry { };
+    Variant<Optional<CacheEntryWriter&>, CacheHasOpenEntry> create_entry(Request&);
+    Variant<Optional<CacheEntryReader&>, CacheHasOpenEntry> open_entry(Request&);
+
     void clear_cache();
 
     LexicalPath const& cache_directory() { return m_cache_directory; }
@@ -34,9 +36,16 @@ public:
 private:
     DiskCache(NonnullRefPtr<Database>, LexicalPath cache_directory, CacheIndex);
 
+    enum class CheckReaderEntries {
+        No,
+        Yes,
+    };
+    bool check_if_cache_has_open_entry(Request&, u64 cache_key, CheckReaderEntries);
+
     NonnullRefPtr<Database> m_database;
-    HashMap<FlatPtr, NonnullRefPtr<CacheEntry>> m_open_cache_entries;
+    HashMap<u64, Vector<NonnullRefPtr<CacheEntry>, 1>> m_open_cache_entries;
+    HashMap<u64, Vector<WeakPtr<Request>, 1>> m_requests_waiting_completion;
 
     LexicalPath m_cache_directory;
     CacheIndex m_index;
diff --git a/Services/RequestServer/Request.cpp b/Services/RequestServer/Request.cpp
index b1545187b0..ac353253c8 100644
--- a/Services/RequestServer/Request.cpp
+++ b/Services/RequestServer/Request.cpp
@@ -123,6 +123,13 @@ Request::~Request()
         (void)m_cache_entry_writer->flush();
 }
 
+void Request::notify_request_unblocked(Badge<DiskCache>)
+{
+    // FIXME: We may want a timer to limit how long we are waiting for a request before proceeding with a network
+    // request that skips the disk cache.
+    transition_to_state(State::Init);
+}
+
 void Request::notify_fetch_complete(Badge<ConnectionFromClient>, int result_code)
 {
     m_curl_result_code = result_code;
@@ -146,6 +153,9 @@ void Request::process()
     case State::ReadCache:
         handle_read_cache_state();
         break;
+    case State::WaitForCache:
+        // Do nothing; we are waiting for the disk cache to notify us to proceed.
+        break;
     case State::DNSLookup:
         handle_dns_lookup_state();
         break;
@@ -167,13 +177,33 @@ void Request::process()
 void Request::handle_initial_state()
 {
     if (m_disk_cache.has_value()) {
-        m_cache_entry_reader = m_disk_cache->open_entry(*this);
-        if (m_cache_entry_reader.has_value()) {
-            transition_to_state(State::ReadCache);
-            return;
-        }
+        m_disk_cache->open_entry(*this).visit(
+            [&](Optional<CacheEntryReader&> cache_entry_reader) {
+                m_cache_entry_reader = cache_entry_reader;
+                if (m_cache_entry_reader.has_value())
+                    transition_to_state(State::ReadCache);
+            },
+            [&](DiskCache::CacheHasOpenEntry) {
+                // If an existing entry is open for writing, we must wait for it to complete.
+                transition_to_state(State::WaitForCache);
+            });
 
-        m_cache_entry_writer = m_disk_cache->create_entry(*this);
+        if (m_state != State::Init)
+            return;
+
+        m_disk_cache->create_entry(*this).visit(
+            [&](Optional<CacheEntryWriter&> cache_entry_writer) {
+                m_cache_entry_writer = cache_entry_writer;
+            },
+            [&](DiskCache::CacheHasOpenEntry) {
+                // If an existing entry is open for reading or writing, we must wait for it to complete. An entry being
+                // open for reading is a rare case, but may occur if a cached response expired between the existing
+                // entry's cache validation and the attempted reader validation when this request was created.
+                transition_to_state(State::WaitForCache);
+            });
+
+        if (m_state != State::Init)
+            return;
     }
 
     transition_to_state(State::DNSLookup);
diff --git a/Services/RequestServer/Request.h b/Services/RequestServer/Request.h
index c362bfd063..404ff829fd 100644
--- a/Services/RequestServer/Request.h
+++ b/Services/RequestServer/Request.h
@@ -54,6 +54,7 @@ public:
     ByteString const& method() const { return m_method; }
     UnixDateTime request_start_time() const { return m_request_start_time; }
 
+    void notify_request_unblocked(Badge<DiskCache>);
     void notify_fetch_complete(Badge<ConnectionFromClient>, int result_code);
 
 private:
@@ -63,13 +64,14 @@ private:
     };
 
     enum class State : u8 {
-        Init,      // Decide whether to service this request from cache or the network.
-        ReadCache, // Read the cached response from disk.
-        DNSLookup, // Resolve the URL's host.
-        Connect,   // Issue a network request to connect to the URL.
-        Fetch,     // Issue a network request to fetch the URL.
-        Complete,  // Finalize the request with the client.
-        Error,     // Any error occured during the request's lifetime.
+        Init,         // Decide whether to service this request from cache or the network.
+        ReadCache,    // Read the cached response from disk.
+        WaitForCache, // Wait for an existing cache entry to complete before proceeding.
+        DNSLookup,    // Resolve the URL's host.
+        Connect,      // Issue a network request to connect to the URL.
+        Fetch,        // Issue a network request to fetch the URL.
+        Complete,     // Finalize the request with the client.
+        Error,        // Any error occurred during the request's lifetime.
     };
 
     Request(
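
Reviewer note, not part of the patch: the coalescing introduced above amounts to two per-cache-key lists, the currently open entries and the requests parked until those entries close. The self-contained sketch below models that bookkeeping with standard containers in place of AK's HashMap/Vector and a plain callback queue in place of Core::deferred_invoke(); every name in it (ToyCache, try_open, entry_closed, OpenKind) is illustrative only and does not appear in the patch.

#include <cstdint>
#include <functional>
#include <iostream>
#include <queue>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Simplified stand-in for the per-key state DiskCache now tracks. A "Writer" models an
// open CacheEntryWriter, a "Reader" an open CacheEntryReader.
enum class OpenKind { Reader, Writer };

struct ToyCache {
    std::unordered_map<uint64_t, std::vector<OpenKind>> open_entries;
    std::unordered_map<uint64_t, std::vector<std::string>> waiters;
    std::queue<std::function<void()>> deferred; // stands in for Core::deferred_invoke()

    // Mirrors open_entry()/create_entry(): returns false when the caller has to park in
    // the WaitForCache state instead of receiving an entry.
    bool try_open(uint64_t key, OpenKind kind, std::string const& request_name)
    {
        bool writer_open = false;
        bool anything_open = false;
        if (auto it = open_entries.find(key); it != open_entries.end()) {
            anything_open = !it->second.empty();
            for (auto open : it->second)
                writer_open = writer_open || (open == OpenKind::Writer);
        }

        // A prospective reader only waits for writers (CheckReaderEntries::No); a prospective
        // writer waits for any open entry (CheckReaderEntries::Yes).
        bool must_wait = (kind == OpenKind::Reader) ? writer_open : anything_open;
        if (must_wait) {
            waiters[key].push_back(request_name);
            std::cout << request_name << " deferred (cache key " << key << " busy)\n";
            return false;
        }

        open_entries[key].push_back(kind);
        std::cout << request_name << " opened an entry for cache key " << key << '\n';
        return true;
    }

    // Mirrors cache_entry_closed(): when the last entry for a key closes, resume the parked
    // requests from a queued callback rather than inline.
    void entry_closed(uint64_t key)
    {
        auto entries = open_entries.find(key);
        if (entries == open_entries.end())
            return;

        entries->second.pop_back();
        if (!entries->second.empty())
            return;
        open_entries.erase(entries);

        auto pending = waiters.find(key);
        if (pending == waiters.end())
            return;

        deferred.push([names = std::move(pending->second)] {
            for (auto const& name : names)
                std::cout << name << " unblocked, restarting from the Init state\n";
        });
        waiters.erase(pending);
    }

    void drain_deferred()
    {
        while (!deferred.empty()) {
            deferred.front()();
            deferred.pop();
        }
    }
};

int main()
{
    ToyCache cache;
    cache.try_open(42, OpenKind::Writer, "request A"); // first request starts writing the entry
    cache.try_open(42, OpenKind::Reader, "request B"); // second request for the same key must wait
    cache.entry_closed(42);                            // the writer finishes and closes its entry
    cache.drain_deferred();                            // only now is request B resumed
    return 0;
}

Running it shows request B being resumed only after the writer for the same cache key has closed, which is exactly the condition the new WaitForCache state waits on.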
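
On the deferral itself: cache_entry_closed() may run from inside a curl callback, and restarting a parked request there would start new transfers while curl is not re-entrant, hence the CURLM_RECURSIVE_API_CALL note in the comment. Assuming the usual LibCore pieces (Core::EventLoop, Core::deferred_invoke(), dbgln()), a standalone sketch of that ordering, again not code from the patch, might look like this:

#include <AK/Format.h>
#include <LibCore/EventLoop.h>

int main()
{
    Core::EventLoop event_loop;

    // Pretend we are inside a curl callback (e.g. CURLOPT_WRITEFUNCTION): starting or
    // restarting a transfer here would make curl report CURLM_RECURSIVE_API_CALL, so the
    // cache only queues the follow-up work and lets the event loop run it afterwards.
    Core::deferred_invoke([loop = &event_loop] {
        dbgln("inside the simulated curl callback: cache entry failed, waiters scheduled");
        Core::deferred_invoke([loop] {
            dbgln("next event loop turn: safe to restart the waiting requests");
            loop->quit(0);
        });
        dbgln("still inside the simulated curl callback: nothing has been restarted yet");
    });

    return event_loop.exec();
}

The "safe to restart" line only appears after the first callback has fully unwound, which is the ordering the patch relies on when it queues the notify_request_unblocked() calls.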