pytorch/torch/csrc/distributed/c10d/logger.cpp
Rohan Varma 4feef6c970 Log static graph in constructor if it is set (#72456)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/72456

Now that static graph is natively supported in the DDP constructor, it is easier to log whether it is set at construction time, as opposed to waiting for the first iteration to finish. In some failure cases we're seeing that the first iteration does not finish, so we don't have this data, which is valuable for debugging.
ghstack-source-id: 148840679

Test Plan: CI

Reviewed By: zhaojuanmao

Differential Revision: D34045204

fbshipit-source-id: 72a187c1ce031db217de4b3ad20a64f2a74995bc
(cherry picked from commit 1d622c88f3)
2022-02-11 15:55:09 +00:00


#include <c10/util/StringUtil.h>
#include <c10d/Utils.hpp>
#include <c10d/logger.hpp>
#include <fmt/format.h>
#include <string>
#ifdef USE_C10D_GLOO
#include <c10d/ProcessGroupGloo.hpp>
#endif
namespace c10d {
// Logs runtime stats to the configured destination. Since data collection
// runs every iteration for the first 10 iterations and only every
// ddp_runtime_logging_sample_rate iterations afterwards, the training
// iterations at which stats are actually logged are roughly 10,
// 10 + (20 - 10) * ddp_runtime_logging_sample_rate,
// 10 + (50 - 10) * ddp_runtime_logging_sample_rate, and so on.
const int LoggingIterations[] = {10, 20, 50, 100, 500, 800, 1000}; // NOLINT
std::ostream& operator<<(std::ostream& output, const Logger& logger) {
auto& ddp_logging_data = (*logger.ddp_logging_data_);
std::string loggerInfo = fmt::format(
"[Rank {} / {}] [before iteration {}] Training {} unused_parameter_size={} \n "
"Avg forward compute time: {} \n Avg backward compute time: {} \n"
"Avg backward comm. time: {} \n Avg backward comm/comp overlap time: {}",
ddp_logging_data.ints_map["rank"],
ddp_logging_data.ints_map["world_size"],
ddp_logging_data.ints_map["iteration"],
ddp_logging_data.strs_map["module_name"],
ddp_logging_data.ints_map["unused_parameter_size"],
ddp_logging_data.ints_map["avg_forward_compute_time"],
ddp_logging_data.ints_map["avg_backward_compute_time"],
ddp_logging_data.ints_map["avg_backward_comm_time"],
ddp_logging_data.ints_map["avg_backward_compute_comm_overlap_time"]);
if (ddp_logging_data.strs_map["comm_hook"] != "") {
loggerInfo += fmt::format(
"\n Gradient comm. hook: {}", ddp_logging_data.strs_map["comm_hook"]);
}
if (ddp_logging_data.ints_map["join_uneven_inputs"]) {
loggerInfo += "\n Uneven input detection with join() enabled.";
}
return output << loggerInfo;
}
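// Construct a Logger that reports the state and stats of the given Reducer.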
Logger::Logger(std::shared_ptr<c10d::Reducer> reducer) {
reducer_ = reducer;
ddp_logging_data_ = std::make_unique<at::DDPLoggingData>();
}
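// Log, at most once per process, whether the runtime determined that the
// graph could have been treated as static ("can_set_static_graph"), along
// with the iteration at which this was recorded.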
std::once_flag log_graph_static_flag;
void Logger::log_if_graph_static(bool is_static) {
std::call_once(log_graph_static_flag, [this, is_static]() {
ddp_logging_data_->ints_map["can_set_static_graph"] = is_static;
// It is useful to report the iteration that training finished at.
ddp_logging_data_->ints_map["iteration"] = reducer_->num_iterations_;
at::LogPyTorchDDPUsage(*ddp_logging_data_);
});
}
// Environment variables
void Logger::set_env_variables() {
ddp_logging_data_->strs_map["master_port"] = parse_env("MASTER_PORT");
ddp_logging_data_->strs_map["master_addr"] = parse_env("MASTER_ADDR");
ddp_logging_data_->strs_map["torch_distributed_debug"] =
parse_env("TORCH_DISTRIBUTED_DEBUG");
ddp_logging_data_->strs_map["cuda_visible_devices"] =
parse_env("CUDA_VISIBLE_DEVICES");
if (reducer_->process_group_->getBackendName() == "nccl") {
ddp_logging_data_->strs_map["nccl_socket_ifname"] =
parse_env("NCCL_SOCKET_IFNAME");
ddp_logging_data_->strs_map["nccl_blocking_wait"] =
parse_env("NCCL_BLOCKING_WAIT");
ddp_logging_data_->strs_map["nccl_async_error_handling"] =
parse_env("NCCL_ASYNC_ERROR_HANDLING");
ddp_logging_data_->strs_map["nccl_debug"] = parse_env("NCCL_DEBUG");
ddp_logging_data_->strs_map["nccl_nthreads"] = parse_env("NCCL_NTHREADS");
ddp_logging_data_->strs_map["nccl_ib_timeout"] =
parse_env("NCCL_IB_TIMEOUT");
}
if (reducer_->process_group_->getBackendName() == "gloo") {
ddp_logging_data_->strs_map["gloo_socket_ifname"] =
parse_env("GLOO_SOCKET_IFNAME");
ddp_logging_data_->strs_map["gloo_device_transport"] =
parse_env("GLOO_DEVICE_TRANSPORT");
#ifdef USE_C10D_GLOO
auto gloo_pg =
static_cast<c10d::ProcessGroupGloo*>(reducer_->process_group_.get());
auto n_threads = gloo_pg->getNumThreads();
ddp_logging_data_->ints_map["gloo_num_threads"] = n_threads;
#endif
}
}
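// Record the number of parameter tensors, their total size in bytes, and
// the set of parameter dtypes.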
void Logger::set_parameter_stats() {
// The number of parameter tensors
ddp_logging_data_->ints_map["num_parameter_tensors"] =
reducer_->params_.size();
// Total parameters size (Bytes)
ddp_logging_data_->ints_map["total_parameter_size_bytes"] = 0;
// Parameters' data types, there may be multiple data
// types for mixed precision training.
std::set<std::string> unique_dtypes;
for (const auto& t : reducer_->params_) {
ddp_logging_data_->ints_map["total_parameter_size_bytes"] +=
t.numel() * t.element_size();
unique_dtypes.insert(std::string(t.dtype().name()));
}
ddp_logging_data_->strs_map["dtypes"] = c10::Join(", ", unique_dtypes);
}
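// Return, for each gradient bucket, the indices of the variables assigned
// to that bucket.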
std::vector<std::vector<size_t>> Logger::get_per_bucket_variable_indices() {
std::vector<std::vector<size_t>> per_bucket_variable_indices;
per_bucket_variable_indices.reserve(reducer_->buckets_.size());
for (const auto& bucket : reducer_->buckets_) {
const auto& indices = bucket.variable_indices;
per_bucket_variable_indices.push_back(indices);
}
return per_bucket_variable_indices;
}
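// Return the total size in bytes of the variables in each gradient bucket.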
std::vector<int> Logger::get_bucket_sizes() {
std::vector<int> bucket_sizes;
for (const auto& bucket : reducer_->buckets_) {
const auto& variables = bucket.replicas[0].variables;
int bucket_size = 0;
for (const auto& v : variables) {
bucket_size += v.numel() * v.element_size();
}
bucket_sizes.push_back(bucket_size);
}
return bucket_sizes;
}
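// Return the size limit (bytes) that was used to construct each bucket.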
std::vector<int> Logger::get_bucket_size_limits() {
std::vector<int> bucket_size_limits;
for (const auto& bucket : reducer_->buckets_) {
bucket_size_limits.push_back(bucket.bucket_size_limit);
}
return bucket_size_limits;
}
// Communication hook. Empty string if not set, in which case it will not be
// logged.
void Logger::set_comm_hook(const std::string& hook) {
ddp_logging_data_->strs_map["comm_hook"] = hook;
}
// Whether we are running under model.join() context manager for DDP uneven
// inputs.
void Logger::set_uneven_input_join() {
ddp_logging_data_->ints_map["join_uneven_inputs"] = true;
}
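// Record whether the reducer was configured with static_graph.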
void Logger::set_static_graph() {
ddp_logging_data_->ints_map["static_graph"] = reducer_->static_graph_;
}
// Data that can be obtained at DistributedDataParallel construction time
void Logger::set_construction_data_and_log(
const std::string& module_name,
const std::vector<int>& device_ids,
int output_device,
bool broadcast_buffers,
bool has_sync_bn,
bool static_graph) {
// No lock is needed, as it will be called in DistributedDataParallel
// constructor.
if (static_graph) {
set_static_graph();
}
ddp_logging_data_->strs_map["module_name"] = module_name;
ddp_logging_data_->ints_map["world_size"] =
reducer_->process_group_->getSize();
ddp_logging_data_->ints_map["rank"] = reducer_->process_group_->getRank();
// The iteration of the training loop at which get_ddp_logging_data()
// is called to fetch the DDPLoggingData; 0 if the data is fetched
// before the training loop starts.
ddp_logging_data_->ints_map["iteration"] = 0;
ddp_logging_data_->ints_map["is_multi_device_module"] =
reducer_->is_multi_device_module_;
set_parameter_stats();
// A list of bucket sizes (Bytes) calculated during construction time
ddp_logging_data_->strs_map["bucket_sizes"] =
c10::Join(", ", get_bucket_sizes());
// A list of bucket size limits (bytes) specified during construction time
ddp_logging_data_->strs_map["initial_bucket_size_limits"] =
c10::Join(", ", get_bucket_size_limits());
set_env_variables();
// DistributedDataParallel constructor input parameters
ddp_logging_data_->strs_map["device_ids"] = c10::Join(", ", device_ids);
ddp_logging_data_->ints_map["output_device"] = output_device;
ddp_logging_data_->ints_map["broadcast_buffers"] = broadcast_buffers;
ddp_logging_data_->ints_map["has_sync_bn"] = has_sync_bn;
ddp_logging_data_->ints_map["bucket_cap_bytes"] = reducer_->bucket_bytes_cap_;
ddp_logging_data_->ints_map["find_unused_parameters"] =
reducer_->find_unused_parameters_;
ddp_logging_data_->ints_map["gradient_as_bucket_view"] =
reducer_->gradient_as_bucket_view_;
ddp_logging_data_->strs_map["backend_name"] =
reducer_->process_group_->getBackendName();
if (parseDistDebugLevel() != DistributedDebugLevel::OFF) {
std::string initInfo = fmt::format(
"[Rank {}]: DDP Initialized with: \n",
ddp_logging_data_->ints_map["rank"]);
std::stringstream ddpLoggingDataInfo;
for (const auto& intItem : ddp_logging_data_->ints_map) {
ddpLoggingDataInfo << intItem.first << ": " << intItem.second << "\n";
}
for (const auto& strItem : ddp_logging_data_->strs_map) {
ddpLoggingDataInfo << strItem.first << ": " << strItem.second << "\n";
}
LOG(INFO) << initInfo << ddpLoggingDataInfo.str();
}
at::LogPyTorchDDPUsage(*ddp_logging_data_);
}
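// Copy the timestamp of the given timer event into event_time; leave it
// unchanged if the event has not been recorded.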
void Logger::set_event_time(
int64_t& event_time,
Timer& timer,
Timer::Event event) {
auto timestamp = timer.getTimestamp(event);
if (timestamp != c10::nullopt) {
// TODO: should we set this as human-readable time instead of unixtime?
event_time = *timestamp;
}
}
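// Measure the duration between start_event and end_event and fold it into
// a running average over the iterations sampled so far; leave both outputs
// unchanged if the duration cannot be measured.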
void Logger::calculate_avg_time(
int64_t& avg_time,
int64_t& time_duration,
Timer& timer,
Timer::Event start_event,
Timer::Event end_event) {
TORCH_CHECK(num_iterations_stats_recorded_ > 0);
c10::optional<int64_t> maybe_time_duration = timer.measureDifference(start_event, end_event);
if (!maybe_time_duration.has_value()) {
return;
}
time_duration = maybe_time_duration.value();
avg_time = (time_duration + avg_time * (num_iterations_stats_recorded_ - 1)) /
num_iterations_stats_recorded_;
}
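// Reset the per-iteration timing fields before they are re-populated for
// the current sampling iteration.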
void Logger::reset_performance_stats() {
ddp_logging_data_->ints_map["forward_compute_time"] = 0;
ddp_logging_data_->ints_map["backward_comm_time"] = 0;
ddp_logging_data_->ints_map["backward_compute_time"] = 0;
ddp_logging_data_->ints_map["backward_compute_comm_overlap_time"] = 0;
ddp_logging_data_->ints_map["forward_compute_time_start"] = 0;
ddp_logging_data_->ints_map["backward_compute_time_start"] = 0;
ddp_logging_data_->ints_map["backward_comm_time_start"] = 0;
ddp_logging_data_->ints_map["backward_compute_time_end"] = 0;
ddp_logging_data_->ints_map["backward_comm_time_end"] = 0;
}
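// Collect runtime stats from the reducer at sampling iterations and log
// them at the iteration counts listed in LoggingIterations.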
void Logger::set_runtime_stats_and_log() {
// Sync with reducer's data
std::lock_guard<std::mutex> lock(reducer_->mutex_);
// Set runtime stats at the sampling iterations.
if (!reducer_->should_collect_runtime_stats()) {
return;
}
num_iterations_stats_recorded_++;
// Record the iteration at which these runtime stats were collected.
ddp_logging_data_->ints_map["iteration"] = reducer_->num_iterations_;
// When get_ddp_logging_data() is called, "unused_parameter_size",
// "has_rebuilt_buckets" and "rebuilt_bucket_sizes" reflect the latest
// sampling iteration.
// If unused_parameters_ is not empty, calculate its total size.
// unused_parameters_ is computed in the forward pass of each iteration.
if (reducer_->unused_parameters_.size() == 0 &&
reducer_->find_unused_parameters_) {
// No unused params in this iteration
ddp_logging_data_->ints_map["unused_parameter_size"] = 0;
}
for (const auto& unused_index : reducer_->unused_parameters_) {
const auto& v = reducer_->params_[unused_index];
ddp_logging_data_->ints_map["unused_parameter_size"] +=
v.numel() * v.element_size();
}
// rebuilt_bucket_sizes will not change once the buckets have been rebuilt,
// so it only needs to be set once during the whole training loop.
// Bucket-rebuild stats are recorded after the first iteration.
if (ddp_logging_data_->ints_map["has_rebuilt_buckets"] !=
reducer_->has_rebuilt_bucket_) {
ddp_logging_data_->ints_map["has_rebuilt_buckets"] =
reducer_->has_rebuilt_bucket_;
ddp_logging_data_->strs_map["rebuilt_bucket_sizes"] =
c10::Join(", ", get_bucket_sizes());
ddp_logging_data_->strs_map["rebuilt_bucket_size_limits"] =
c10::Join(", ", get_bucket_size_limits());
// Log per-bucket variable indices
std::vector<std::string> per_bucket_variable_indices;
auto indices = get_per_bucket_variable_indices();
per_bucket_variable_indices.reserve(indices.size());
for (const auto& bucket_indices : indices) {
per_bucket_variable_indices.push_back(c10::Join(" ", bucket_indices));
}
ddp_logging_data_->strs_map["rebuilt_per_bucket_param_indices"] =
c10::Join(", ", per_bucket_variable_indices);
}
// Log gradient ready order
if (!reducer_->grad_ready_order_indices_.empty()) {
// Note that the indices are for the previous iteration, as this function
// is called in the forward pass and the gradient ready order was last
// computed in the previous backward pass.
ddp_logging_data_->strs_map["prev_iteration_grad_ready_order_indices"] =
c10::Join(", ", reducer_->grad_ready_order_indices_);
}
reset_performance_stats();
// CUDA time stats are only collected for single-device modules.
if (reducer_->params_[0].is_cuda() && reducer_->is_multi_device_module_) {
TORCH_WARN_ONCE(
"Cuda time stats are not collected for multi-device modules."
);
return;
}
TORCH_INTERNAL_ASSERT(reducer_->timer_);
calculate_avg_time(
ddp_logging_data_->ints_map["avg_forward_compute_time"],
ddp_logging_data_->ints_map["forward_compute_time"],
*reducer_->timer_,
Timer::Event::kForwardStart,
Timer::Event::kBackwardComputeStart);
calculate_avg_time(
ddp_logging_data_->ints_map["avg_backward_compute_time"],
ddp_logging_data_->ints_map["backward_compute_time"],
*reducer_->timer_,
Timer::Event::kBackwardComputeStart,
Timer::Event::kBackwardComputeEnd);
calculate_avg_time(
ddp_logging_data_->ints_map["avg_backward_comm_time"],
ddp_logging_data_->ints_map["backward_comm_time"],
*reducer_->timer_,
Timer::Event::kBackwardCommStart,
Timer::Event::kBackwardCommEnd);
calculate_avg_time(
ddp_logging_data_->ints_map["avg_backward_compute_comm_overlap_time"],
ddp_logging_data_->ints_map["backward_compute_comm_overlap_time"],
*reducer_->timer_,
Timer::Event::kBackwardCommStart,
Timer::Event::kBackwardComputeEnd);
set_event_time(
ddp_logging_data_->ints_map["forward_compute_time_start"],
*reducer_->timer_,
Timer::Event::kForwardStart
);
set_event_time(
ddp_logging_data_->ints_map["backward_compute_time_start"],
*reducer_->timer_,
Timer::Event::kBackwardComputeStart
);
set_event_time(
ddp_logging_data_->ints_map["backward_comm_time_start"],
*reducer_->timer_,
Timer::Event::kBackwardCommStart
);
set_event_time(
ddp_logging_data_->ints_map["backward_compute_time_end"],
*reducer_->timer_,
Timer::Event::kBackwardComputeEnd
);
set_event_time(
ddp_logging_data_->ints_map["backward_comm_time_end"],
*reducer_->timer_,
Timer::Event::kBackwardCommEnd
);
// Log runtime stats to stderr if TORCH_DISTRIBUTED_DEBUG=DETAIL is enabled.
if (parseDistDebugLevel() == DistributedDebugLevel::DETAIL) {
LOG(INFO) << *this;
}
// Log runtime (e.g. avg performance) stats at the beginning and also
// after a larger number of iterations. The iteration counts in
// LoggingIterations are not chosen scientifically; they assume most
// applications will run at least 10 iterations. Stats have smaller
// variance when the selected num_iterations_ is larger.
if (std::find(
std::begin(LoggingIterations),
std::end(LoggingIterations),
num_iterations_stats_recorded_) != std::end(LoggingIterations)) {
at::LogPyTorchDDPUsage(*ddp_logging_data_);
}
}
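// Return a copy of the accumulated logging data, holding the reducer's
// lock so the copy is consistent with concurrent stat updates.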
at::DDPLoggingData Logger::get_ddp_logging_data() {
std::lock_guard<std::mutex> lock(reducer_->mutex_);
return *ddp_logging_data_;
}
} // namespace c10d