pytorch/torch/csrc/distributed/c10d/FileStore.hpp
Tristan Rice 8b5e717601 c10d/Store: add clone feature (#150966) (#150966) (#151045)
Summary:
This adds a new `clone()` method to Store which will return a new Store instance that can be used from a different thread.

This is intended to better support using stores from multiple threads, such as when ProcessGroupNCCL needs a store to do error propagation.

Related issue: https://github.com/pytorch/pytorch/issues/150943

Approved by: https://github.com/fduwjj

Test Plan:
contbuild & OSS CI, see 205881ea4a

Test plan from GitHub:
```
pytest test/distributed/test_store.py -k PythonStore
pytest test/distributed/test_store.py -k clone
```

Differential Revision: D72789690

Pull Request resolved: https://github.com/pytorch/pytorch/pull/151045
Approved by: https://github.com/XilunWu, https://github.com/fduwjj
2025-04-11 04:00:23 +00:00

66 lines
1.5 KiB
C++

#pragma once
#include <sys/types.h>
#include <mutex>
#include <unordered_map>
#include <torch/csrc/distributed/c10d/Store.hpp>
namespace c10d {
// A concrete c10d::Store. It overrides the key/value operations declared by
// the Store base class (set, compareSet, get, add, getNumKeys, deleteKey,
// check, wait) and is constructed from a path plus a worker count.
// NOTE(review): the name and the path argument suggest the store is backed by
// a file at `path`; the implementation is not in this header -- confirm in
// FileStore.cpp before relying on it.
class TORCH_API FileStore : public Store {
public:
// Builds a store from `path` and `numWorkers`. `path` is taken by value.
// NOTE(review): presumably these end up in path_ and numWorkers_ below;
// the constructor body is not visible here -- confirm.
explicit FileStore(std::string path, int numWorkers);
// Returns a new Store instance, intended to be usable from a different
// thread than this one.
c10::intrusive_ptr<Store> clone() override;
~FileStore() override;
// Store interface overrides. Signatures are dictated by the base class; the
// exact semantics (blocking behavior, error reporting, return values) are
// defined by Store and FileStore.cpp, neither of which is shown here.
void set(const std::string& key, const std::vector<uint8_t>& value) override;
// Takes a key, an expected value and a desired value; returns a byte vector.
// NOTE(review): presumably returns the value held under `key` after the
// compare-and-set attempt -- verify against the Store contract.
std::vector<uint8_t> compareSet(
const std::string& key,
const std::vector<uint8_t>& expectedValue,
const std::vector<uint8_t>& desiredValue) override;
std::vector<uint8_t> get(const std::string& key) override;
// NOTE(review): presumably adds `value` to a counter stored under `key` and
// returns the updated total -- confirm in the implementation.
int64_t add(const std::string& key, int64_t value) override;
int64_t getNumKeys() override;
bool deleteKey(const std::string& key) override;
bool check(const std::vector<std::string>& keys) override;
// Two wait overloads: this one takes only the keys, the next one also takes
// an explicit timeout in milliseconds. Which timeout the first overload uses
// is not visible from this header.
void wait(const std::vector<std::string>& keys) override;
void wait(
const std::vector<std::string>& keys,
const std::chrono::milliseconds& timeout) override;
// Returns the path used by the FileStore.
// The returned reference aliases the path_ member, so it must not be used
// after this FileStore is destroyed.
const std::string& getPath() const noexcept {
return path_;
}
protected:
// Non-virtual helper taking a key and an int64_t.
// NOTE(review): presumably the shared implementation behind add() --
// confirm in FileStore.cpp.
int64_t addHelper(const std::string& key, int64_t i);
// Path exposed through getPath().
std::string path_;
// File offset, starting at 0.
// NOTE(review): presumably tracks how far into the file this instance has
// read so far -- confirm.
off_t pos_{0};
int numWorkers_;
// The four strings below are const, so they cannot be reassigned after
// construction. Their values are not visible in this header.
// NOTE(review): the names suggest reserved keys and key prefixes used
// internally by the store -- confirm in FileStore.cpp.
const std::string cleanupKey_;
const std::string refCountKey_;
const std::string regularPrefix_;
const std::string deletePrefix_;
// Maps a string key to a byte vector.
// NOTE(review): presumably a local cache of entries already read from the
// backing file -- confirm.
std::unordered_map<std::string, std::vector<uint8_t>> cache_;
// NOTE(review): by its name, presumably guards in-flight file operations;
// which members it protects is not visible here -- confirm.
std::mutex activeFileOpLock_;
};
} // namespace c10d