pytorch/benchmarks/distributed/rpc/rl/coordinator.py

import numpy as np
import time

import torch
import torch.distributed.rpc as rpc

from agent import AgentBase
from observer import ObserverBase

COORDINATOR_NAME = "coordinator"
AGENT_NAME = "agent"
OBSERVER_NAME = "observer{}"

EPISODE_STEPS = 100

class CoordinatorBase:
    def __init__(self, batch_size, batch, state_size, nlayers, out_features):
        r"""
        Coordinator object that runs on its own worker. Only one coordinator exists.
        It is responsible for facilitating communication between the agent and the
        observers and for recording benchmark throughput and latency data.
        Args:
            batch_size (int): Number of observer requests to process in a batch
            batch (bool): Whether to process and respond to observer requests as a batch or one at a time
            state_size (list): List of ints dictating the dimensions of the state
            nlayers (int): Number of layers in the model
            out_features (int): Number of out features in the model
        """
        self.batch_size = batch_size
        self.batch = batch

        self.agent_rref = None  # Agent RRef
        self.ob_rrefs = []  # Observer RRefs

        agent_info = rpc.get_worker_info(AGENT_NAME)
        self.agent_rref = rpc.remote(agent_info, AgentBase)

        # Observers occupy ranks 2..batch_size + 1; create a remote ObserverBase
        # on each and initialize its state.
        for rank in range(batch_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(rank + 2))
            ob_ref = rpc.remote(ob_info, ObserverBase)
            self.ob_rrefs.append(ob_ref)

            ob_ref.rpc_sync().set_state(state_size, batch)

        self.agent_rref.rpc_sync().set_world(
            batch_size, state_size, nlayers, out_features, self.batch)
    def run_coordinator(self, episodes, episode_steps, queue):
        r"""
        Runs n benchmark episodes. Each episode is started by the coordinator telling
        each observer to contact the agent, and is concluded by the coordinator telling
        the agent to finish the episode, after which the coordinator records benchmark data.
        Args:
            episodes (int): Number of episodes to run
            episode_steps (int): Number of steps run in each episode by each observer
            queue (SimpleQueue): SimpleQueue from torch.multiprocessing.get_context() for
                saving benchmark run results to
        """
        agent_latency_final = []
        agent_throughput_final = []

        observer_latency_final = []
        observer_throughput_final = []

        for ep in range(episodes):
            ep_start_time = time.time()

            print(f"Episode {ep} - ", end='')

            n_steps = episode_steps
            agent_start_time = time.time()

            # Kick off one asynchronous episode per observer and wait for all of
            # them to report back.
            futs = []
            for ob_rref in self.ob_rrefs:
                futs.append(ob_rref.rpc_async().run_ob_episode(
                    self.agent_rref, n_steps))

            rets = torch.futures.wait_all(futs)
            agent_latency, agent_throughput = self.agent_rref.rpc_sync().finish_episode(rets)

            self.agent_rref.rpc_sync().reset_metrics()

            agent_latency_final += agent_latency
            agent_throughput_final += agent_throughput

            observer_latency_final += [ret[2] for ret in rets]
            observer_throughput_final += [ret[3] for ret in rets]

            ep_end_time = time.time()
            episode_time = ep_end_time - ep_start_time
            print(round(episode_time, 3))

        # Flatten the per-episode observer metrics before computing percentiles.
        observer_latency_final = [t for s in observer_latency_final for t in s]
        observer_throughput_final = [
            t for s in observer_throughput_final for t in s]

        benchmark_metrics = {'agent latency (seconds)': {},
                             'agent throughput': {},
                             'observer latency (seconds)': {},
                             'observer throughput': {}}

        print("For batch size {0}".format(self.batch_size))

        print("\nAgent Latency - ", len(agent_latency_final))
        agent_latency_final = sorted(agent_latency_final)
        for p in [50, 75, 90, 95]:
            v = np.percentile(agent_latency_final, p)
            print("p" + str(p) + ":", round(v, 3))
            p = f'p{p}'
            benchmark_metrics['agent latency (seconds)'][p] = round(v, 3)

        print("\nAgent Throughput - ", len(agent_throughput_final))
        agent_throughput_final = sorted(agent_throughput_final)
        for p in [50, 75, 90, 95]:
            v = np.percentile(agent_throughput_final, p)
            print("p" + str(p) + ":", int(v))
            p = f'p{p}'
            benchmark_metrics['agent throughput'][p] = int(v)

        print("\nObserver Latency - ", len(observer_latency_final))
        observer_latency_final = sorted(observer_latency_final)
        for p in [50, 75, 90, 95]:
            v = np.percentile(observer_latency_final, p)
            print("p" + str(p) + ":", round(v, 3))
            p = f'p{p}'
            benchmark_metrics['observer latency (seconds)'][p] = round(v, 3)

        print("\nObserver Throughput - ", len(observer_throughput_final))
        observer_throughput_final = sorted(observer_throughput_final)
        for p in [50, 75, 90, 95]:
            v = np.percentile(observer_throughput_final, p)
            print("p" + str(p) + ":", int(v))
            p = f'p{p}'
            benchmark_metrics['observer throughput'][p] = int(v)

        if queue:
            queue.put(benchmark_metrics)
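

# Illustrative launch sketch only; the real benchmark is driven by its own launcher
# script, not by running this module directly. It assumes the rank layout implied by
# OBSERVER_NAME.format(rank + 2) above: rank 0 = coordinator, rank 1 = agent,
# ranks 2..world_size - 1 = observers. The batch_size, state_size, nlayers,
# out_features, and episodes values are arbitrary example numbers.
def _example_run_worker(rank, world_size, batch_size):
    import os

    # All workers rendezvous through the default env:// init method.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"

    if rank == 0:
        rpc.init_rpc(COORDINATOR_NAME, rank=rank, world_size=world_size)
        coordinator = CoordinatorBase(
            batch_size, batch=True, state_size=[10, 20, 10],
            nlayers=5, out_features=10)
        coordinator.run_coordinator(
            episodes=10, episode_steps=EPISODE_STEPS, queue=None)
    elif rank == 1:
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
    else:
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)

    # shutdown() blocks until every worker reaches it, so the agent and observers
    # stay alive while the coordinator runs its episodes.
    rpc.shutdown()


if __name__ == "__main__":
    import torch.multiprocessing as mp

    BATCH_SIZE = 2                # number of observers
    WORLD_SIZE = BATCH_SIZE + 2   # coordinator + agent + observers

    mp.spawn(_example_run_worker, args=(WORLD_SIZE, BATCH_SIZE), nprocs=WORLD_SIZE)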