Flight recorder data as JSON (#129505)

Summary:
Provide a new API to retrieve flight recorder data as JSON.
The one minor difference between the Pickle and JSON flight recorder dumps is
that the JSON API does not retrieve stack traces at the moment, since the
stack traces end up being far too much data.
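
A minimal usage sketch (assuming an initialized NCCL process group that has
already run some collectives), mirroring the new unit test:

import json
import torch

# Assumes torch.distributed is initialized with the NCCL backend and at
# least one collective has run. Both keyword arguments are optional:
# includeCollectives defaults to True, onlyActive to False.
trace = json.loads(
    torch._C._distributed_c10d._dump_nccl_trace_json(
        includeCollectives=True, onlyActive=False
    )
)
print(trace["version"])                 # e.g. "2.2"
for entry in trace.get("entries", []):  # absent when includeCollectives=False
    print(entry["profiling_name"], entry["state"])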

Test Plan:
unit test

Differential Revision: [D59536460](https://our.internmc.facebook.com/intern/diff/D59536460)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/129505
Approved by: https://github.com/wconstab, https://github.com/d4l3k
Author: Chirag Pandya (2024-07-09 11:45:06 -07:00), committed by PyTorch MergeBot
parent 86bca69c5f
commit 83c95c48f7
5 changed files with 253 additions and 74 deletions

View File

@@ -3520,33 +3520,7 @@ class NCCLTraceTestBase(MultiProcessTestCase):
class NCCLTraceTest(NCCLTraceTestBase):
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@parametrize("timing_enabled", [True, False])
@parametrize("include_collectives", [True, False])
def test_short(self, timing_enabled, include_collectives):
if self.rank == self.MAIN_PROCESS_RANK:
return
pg = self._create_process_group_nccl()
if timing_enabled:
pg._enable_collectives_timing()
device = self.local_device
a = torch.full((3, 4), float(self.rank), device=device)
for i in range(2):
f = pg.allreduce(a)
f.wait()
torch.cuda.synchronize(device=device)
# duration_ms is populated best-effort, since it can only be computed outside the dump() API; sleep to give it time to land
time.sleep(1)
if include_collectives:
t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
else:
t = pickle.loads(
torch._C._distributed_c10d._dump_nccl_trace(
includeCollectives=False, includeStackTraces=None, onlyActive=None
)
)
def _verify_trace(self, t, include_collectives, timing_enabled, is_json):
ver = t["version"]
self.assertEqual(ver, "2.2")
pg_config = t["pg_config"]
@@ -3560,7 +3534,6 @@ class NCCLTraceTest(NCCLTraceTestBase):
if include_collectives:
self.assertEqual(len(t["entries"]), 2)
t = t["entries"]
self.assertEqual(len(t), 2)
last = t[-1]
self.assertEqual(last["process_group"], ("0", "default_pg"))
self.assertEqual(last["state"], "completed")
@@ -3571,7 +3544,9 @@ class NCCLTraceTest(NCCLTraceTestBase):
if timing_enabled:
self.assertIsNotNone(s)
self.assertTrue(s <= f)
self.assertIn("test_c10d_nccl.py", str(last["frames"]))
# we don't collect stack traces in JSON at the moment
if not is_json:
self.assertIn("test_c10d_nccl.py", str(last["frames"]))
self.assertEqual(last["input_sizes"], ((3, 4),))
self.assertEqual(last["input_dtypes"], ["Float"])
self.assertEqual(last["output_sizes"], ((3, 4),))
@@ -3592,6 +3567,63 @@ class NCCLTraceTest(NCCLTraceTestBase):
else:
self.assertTrue("entries" not in t)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@parametrize("timing_enabled", [True, False])
@parametrize("include_collectives", [True, False])
def test_short_json(self, timing_enabled, include_collectives):
if self.rank == self.MAIN_PROCESS_RANK:
return
pg = self._create_process_group_nccl()
if timing_enabled:
pg._enable_collectives_timing()
device = self.local_device
a = torch.full((3, 4), float(self.rank), device=device)
for i in range(2):
f = pg.allreduce(a)
f.wait()
torch.cuda.synchronize(device=device)
# duration_ms is populated best-effort, since it can only be computed outside the dump() API; sleep to give it time to land
time.sleep(1)
t = json.loads(
torch._C._distributed_c10d._dump_nccl_trace_json(
includeCollectives=include_collectives
)
)
self._verify_trace(t, include_collectives, timing_enabled, True)
dist.destroy_process_group()
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@parametrize("timing_enabled", [True, False])
@parametrize("include_collectives", [True, False])
def test_short_pickle(self, timing_enabled, include_collectives):
if self.rank == self.MAIN_PROCESS_RANK:
return
pg = self._create_process_group_nccl()
if timing_enabled:
pg._enable_collectives_timing()
device = self.local_device
a = torch.full((3, 4), float(self.rank), device=device)
for i in range(2):
f = pg.allreduce(a)
f.wait()
torch.cuda.synchronize(device=device)
# duration_ms is populated best-effort, since it can only be computed outside the dump() API; sleep to give it time to land
time.sleep(1)
t = pickle.loads(
torch._C._distributed_c10d._dump_nccl_trace(
includeCollectives=include_collectives
)
)
self._verify_trace(
t,
include_collectives=include_collectives,
timing_enabled=timing_enabled,
is_json=False,
)
dist.destroy_process_group()
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
def test_dump_pipe(self):
@@ -3664,6 +3696,7 @@ class NCCLTraceTest(NCCLTraceTestBase):
self.assertEqual(last["output_dtypes"], ["Float"])
self.assertEqual(last["timeout_ms"], 600000)
self.assertEqual(last["collective_seq_id"] - first["collective_seq_id"], 9)
dist.destroy_process_group()
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")

View File

@@ -5,6 +5,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <nlohmann/json.hpp>
#include <memory>
#include <mutex>
#include <thread>
@@ -173,39 +174,46 @@
} while (0)
namespace c10d {
static c10::IValue entries_key = "entries";
static c10::IValue nccl_comm_key = "nccl_comm_state";
static c10::IValue version_key = "version";
using json = nlohmann::json;
#define DEFINE_CONSTANT(name, value) \
static c10::IValue name = value; \
static std::string name##_str = value;
DEFINE_CONSTANT(entries_key, "entries");
DEFINE_CONSTANT(nccl_comm_key, "nccl_comm_state");
DEFINE_CONSTANT(version_key, "version");
// Update whenever changing contents or formatting of the dump
// (minor when adding fields, major when changing existing fields)
static c10::IValue version_val = "2.2";
static c10::IValue pg_config_key = "pg_config";
static c10::IValue record_id_key = "record_id";
static c10::IValue pg_id_key = "pg_id";
static c10::IValue pg_name_key = "process_group";
static c10::IValue collective_seq_id_key = "collective_seq_id";
static c10::IValue p2p_seq_id_key = "p2p_seq_id";
static c10::IValue is_p2p_key = "is_p2p";
static c10::IValue op_id_key = "op_id";
static c10::IValue profiling_name_key = "profiling_name";
static c10::IValue input_sizes_key = "input_sizes";
static c10::IValue input_dtypes_key = "input_dtypes";
static c10::IValue output_sizes_key = "output_sizes";
static c10::IValue output_dtypes_key = "output_dtypes";
static c10::IValue time_created_key = "time_created_ns";
static c10::IValue duration_key = "duration_ms";
static c10::IValue timeout_key = "timeout_ms";
static c10::IValue frames_key = "frames";
static c10::IValue state_key = "state";
static c10::IValue line_key = "line";
static c10::IValue name_key = "name";
static c10::IValue filename_key = "filename";
static c10::IValue retired_key = "retired";
static c10::IValue time_discovered_started_key = "time_discovered_started_ns";
static c10::IValue time_discovered_completed_key =
"time_discovered_completed_ns";
// Also update both JSON and Pickle dumps to make use of the newly defined
// field(s).
DEFINE_CONSTANT(version_val, "2.2");
DEFINE_CONSTANT(pg_config_key, "pg_config");
DEFINE_CONSTANT(record_id_key, "record_id");
DEFINE_CONSTANT(pg_id_key, "pg_id");
DEFINE_CONSTANT(pg_name_key, "process_group");
DEFINE_CONSTANT(collective_seq_id_key, "collective_seq_id");
DEFINE_CONSTANT(p2p_seq_id_key, "p2p_seq_id");
DEFINE_CONSTANT(is_p2p_key, "is_p2p");
DEFINE_CONSTANT(op_id_key, "op_id");
DEFINE_CONSTANT(profiling_name_key, "profiling_name");
DEFINE_CONSTANT(input_sizes_key, "input_sizes");
DEFINE_CONSTANT(input_dtypes_key, "input_dtypes");
DEFINE_CONSTANT(output_sizes_key, "output_sizes");
DEFINE_CONSTANT(output_dtypes_key, "output_dtypes");
DEFINE_CONSTANT(time_created_key, "time_created_ns");
DEFINE_CONSTANT(duration_key, "duration_ms");
DEFINE_CONSTANT(timeout_key, "timeout_ms");
DEFINE_CONSTANT(frames_key, "frames");
DEFINE_CONSTANT(state_key, "state");
DEFINE_CONSTANT(line_key, "line");
DEFINE_CONSTANT(name_key, "name");
DEFINE_CONSTANT(filename_key, "filename");
DEFINE_CONSTANT(retired_key, "retired");
DEFINE_CONSTANT(time_discovered_started_key, "time_discovered_started_ns");
DEFINE_CONSTANT(time_discovered_completed_key, "time_discovered_completed_ns");
DEFINE_CONSTANT(completed_state, "completed");
DEFINE_CONSTANT(scheduled_state, "scheduled");
DEFINE_CONSTANT(started_state, "started");
#undef DEFINE_CONSTANT
TORCH_API size_t hashTensors(const std::vector<at::Tensor>& tensors);
TORCH_API std::string getNcclVersion();
@@ -805,6 +813,77 @@ struct NCCLTraceBuffer {
}
}
std::list<json> getCollectiveTraceJson(bool onlyActive) {
auto result = dump_entries();
std::list<json> entries;
for (auto i : c10::irange(result.size())) {
json j;
auto& e = result.at(i);
if (onlyActive && e.time_discovered_completed_.has_value()) {
continue;
}
j[record_id_key_str] = int64_t(e.id_);
j[pg_id_key_str] = int64_t(e.pg_id_);
j[pg_name_key_str] = e.pg_name_;
j[collective_seq_id_key_str] = int64_t(e.collective_seq_id_);
j[p2p_seq_id_key_str] = int64_t(e.p2p_seq_id_);
j[op_id_key_str] = int64_t(e.op_id_);
j[profiling_name_key_str] = e.profiling_name_;
j[time_created_key_str] = int64_t(e.time_created_);
if (e.duration_) {
j[duration_key_str] = *e.duration_;
}
auto it = e.sizes_.begin();
auto read_sizes = [&](const c10::SmallVector<int, 4>& dims) {
auto sizes = std::list<std::list<int>>();
for (auto dim : dims) {
auto arg_sizes = std::list<int>();
for (auto i : c10::irange(dim)) {
(void)i;
arg_sizes.push_back(*it++);
}
sizes.push_back(arg_sizes);
}
return sizes;
};
j[input_sizes_key_str] = read_sizes(e.input_dims_);
std::vector<std::string> input_dtypes_strs;
input_dtypes_strs.reserve(e.input_dtypes_.size());
for (const auto& input_dtype : e.input_dtypes_) {
input_dtypes_strs.push_back(c10::toString(input_dtype));
}
j[input_dtypes_key_str] = input_dtypes_strs;
j[output_sizes_key_str] = read_sizes(e.output_dims_);
std::vector<std::string> output_dtypes_strs;
output_dtypes_strs.reserve(e.output_dtypes_.size());
for (const auto& output_dtype : e.output_dtypes_) {
output_dtypes_strs.push_back(c10::toString(output_dtype));
}
j[output_dtypes_key_str] = output_dtypes_strs;
if (e.time_discovered_completed_.has_value()) {
j[state_key_str] = completed_state_str;
} else if (e.time_discovered_started_.has_value()) {
j[state_key_str] = started_state_str;
} else {
j[state_key_str] = scheduled_state_str;
}
j[time_discovered_started_key_str] =
e.time_discovered_started_.has_value()
? int64_t(*e.time_discovered_started_)
: 0;
j[time_discovered_completed_key_str] =
e.time_discovered_completed_.has_value()
? int64_t(*e.time_discovered_completed_)
: 0;
j[retired_key_str] = e.retired_;
j[timeout_key_str] = e.timeout_ms_;
j[is_p2p_key_str] = e.isP2P_;
entries.emplace_back(j);
}
return entries;
}
const c10::List<c10::IValue> getCollectiveTrace(
bool includeStacktraces,
bool onlyActive) {
@@ -833,7 +912,6 @@ struct NCCLTraceBuffer {
if (onlyActive && e.time_discovered_completed_.has_value()) {
continue;
}
if (includeStacktraces) {
auto& tb = stracebacks.tracebacks.at(i);
auto frames = new_list();
@@ -884,11 +962,11 @@ struct NCCLTraceBuffer {
}
dict.insert(output_dtypes_key, output_dtypes_strs);
if (e.time_discovered_completed_.has_value()) {
dict.insert(state_key, "completed");
dict.insert(state_key, completed_state);
} else if (e.time_discovered_started_.has_value()) {
dict.insert(state_key, "started");
dict.insert(state_key, started_state);
} else {
dict.insert(state_key, "scheduled");
dict.insert(state_key, scheduled_state);
}
dict.insert(
@@ -923,6 +1001,44 @@ struct NCCLTraceBuffer {
return pg_config;
}
const std::map<std::string, std::map<std::string, std::string>>
getPgConfigJson() {
std::map<std::string, std::map<std::string, std::string>> result;
for (const auto& [pg_name, ranks] : pg_name_to_ranks_) {
auto pg_info = std::map<std::string, std::string>();
pg_info["name"] = std::get<0>(pg_name);
pg_info["desc"] = std::get<1>(pg_name);
pg_info["ranks"] = ranks_str(ranks);
result.emplace(std::get<0>(pg_name), pg_info);
}
return result;
}
std::string dump_json(
const std::optional<std::unordered_map<
std::string,
std::unordered_map<std::string, std::string>>>& ncclDumpMap,
bool includeCollectives,
bool onlyActive) {
json result;
result[version_key_str] = version_val_str;
result[pg_config_key_str] = getPgConfigJson();
// collective trace
if (includeCollectives) {
auto entries = getCollectiveTraceJson(onlyActive);
if (entries.size() > 0) {
result[entries_key_str] = entries;
}
}
if (ncclDumpMap.has_value()) {
result[nccl_comm_key_str] = ncclDumpMap.value();
}
return result.dump();
}
// dump all collectives + ncclDumpMap
std::string dump(
const std::optional<std::unordered_map<
@@ -959,7 +1075,6 @@ struct NCCLTraceBuffer {
return pickle_str(result);
}
};
} // namespace c10d
#endif // USE_C10D_NCCL
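
For reference, a sketch of the shape dump_json() produces, assembled from the
DEFINE_CONSTANT keys above. The concrete values below are illustrative
placeholders, not output from a real dump; note the absence of a "frames" key,
since the JSON path skips stack traces.

# Illustrative dump_json() output shape in Python-literal notation.
# Keys come from the constants defined above; values are made up.
example_entry = {
    "record_id": 0,
    "pg_id": 0,
    "process_group": ["0", "default_pg"],
    "collective_seq_id": 1,
    "p2p_seq_id": 0,
    "op_id": 1,
    "profiling_name": "nccl:all_reduce",
    "time_created_ns": 0,
    "duration_ms": 0.1,        # present only when a duration was captured
    "input_sizes": [[3, 4]],
    "input_dtypes": ["Float"],
    "output_sizes": [[3, 4]],
    "output_dtypes": ["Float"],
    "state": "completed",      # or "started" / "scheduled"
    "time_discovered_started_ns": 0,
    "time_discovered_completed_ns": 0,
    "retired": True,
    "timeout_ms": 600000,
    "is_p2p": False,
}
trace = {
    "version": "2.2",
    # getPgConfigJson(): pg name -> {"name", "desc", "ranks"}; exact ranks
    # string formatting is illustrative.
    "pg_config": {"0": {"name": "0", "desc": "default_pg", "ranks": "[0, 1]"}},
    "entries": [example_entry],  # omitted when includeCollectives=False or empty
    "nccl_comm_state": {},       # present only when an NCCL comm dump map exists
}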

View File

@@ -342,11 +342,9 @@ void cacheAllocatorDeregisterHook(
}
}
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
getNCCLCommDumpMap() {
#if defined(IS_NCCLX) && defined(NCCL_COMM_DUMP)
std::string dump_nccl_trace(
bool includeCollectives,
bool includeStackTraces,
bool onlyActive) {
std::unordered_map<
std::string /* ncclUniqueID */,
std::unordered_map<std::string, std::string> /* dump from this comm */>
@@ -366,19 +364,28 @@ std::string dump_nccl_trace(
std::string ncclUniqueIDStr = buildNcclUniqueIdStr(ncclComm->getNcclId());
ncclDumpMap[ncclUniqueIDStr] = ncclComm->ncclCommDump();
}
return NCCLTraceBuffer::get()->dump(
ncclDumpMap, includeCollectives, includeStackTraces, onlyActive);
return ncclDumpMap;
#else
return std::unordered_map<
std::string,
std::unordered_map<std::string, std::string>>();
#endif
}
#else
std::string dump_nccl_trace(
bool includeCollectives,
bool includeStackTraces,
bool onlyActive) {
auto ncclDumpMap = getNCCLCommDumpMap();
return NCCLTraceBuffer::get()->dump(
std::nullopt, includeCollectives, includeStackTraces, onlyActive);
ncclDumpMap, includeCollectives, includeStackTraces, onlyActive);
}
std::string dump_nccl_trace_json(bool includeCollectives, bool onlyActive) {
auto ncclDumpMap = getNCCLCommDumpMap();
return NCCLTraceBuffer::get()->dump_json(
ncclDumpMap, includeCollectives, onlyActive);
}
#endif
std::optional<std::function<void(std::function<void(const std::string&)>)>>&
get_cpp_trace_dumper() {

View File

@@ -1121,6 +1121,13 @@ TORCH_API std::string dump_nccl_trace(
bool includeStackTraces,
bool onlyActive);
// Dumps the NCCL comm traces and additional information about the Process
// Group as a JSON-formatted string.
// We don't include stack traces in the JSON format, as they are far too much data.
TORCH_API std::string dump_nccl_trace_json(
bool includeCollectives,
bool onlyActive);
// Gets a mutable reference to a global optional function. Heartbeat Monitor
// will use this function to dump traces, if available. Inside fbcode, we
// store a function here that uses an internal tool for process tracing

View File

@@ -3194,6 +3194,23 @@ such as `dist.all_reduce(tensor, async_op=True)`.
Arguments:
tensors(List[torch.Tensor]): List of tensors we want to hash.
)");
module.def(
"_dump_nccl_trace_json",
[](std::optional<bool> includeCollectives,
std::optional<bool> onlyActive) {
return py::bytes(::c10d::dump_nccl_trace_json(
includeCollectives.value_or(true), onlyActive.value_or(false)));
},
py::arg("includeCollectives") = std::optional<bool>(),
py::arg("onlyActive") = std::optional<bool>(),
R"(
Arguments:
includeCollectives(bool, optional): Whether to include collective work traces. Default is True.
onlyActive(bool, optional): Whether to only include active collective work traces. Default is False.
Returns:
Stringified JSON work traces.
Default settings return everything, i.e. the dump contains NCCL comm dumps and collective traces.
)");
module.def(
"_dump_nccl_trace",
[](std::optional<bool> includeCollectives,