[Utilization][Usage Log] Add data model for record (#145114)

Add a data model for the usage log records, both for consistency and to make future changes to the data model easier.

The data model will be used during the post-test processing pipeline.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/145114
Approved by: https://github.com/huydhn
Yang Wang 2025-01-23 19:04:41 +00:00 committed by PyTorch MergeBot
parent 2f317bbdbc
commit 6d4f5f7688
5 changed files with 150 additions and 75 deletions
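
To make the new record format concrete, here is a minimal sketch of how the data model added in this PR serializes one metadata line and one record line via dataclasses_json. The module path and class names come from the files changed below; all values (job id, GPU uuid, and so on) are hypothetical, and the snippet assumes a PyTorch checkout containing this change, run from the repository root with dataclasses_json==0.6.7 installed.

# Minimal sketch (not part of the diff below): emit one metadata line and one
# record line with the new data model, the way tools/stats/monitor.py does.
# Assumes a PyTorch checkout containing this change, run from the repo root,
# with dataclasses_json==0.6.7 installed; all values are hypothetical.
from tools.stats.utilization_stats_lib import (
    getDataModelVersion,
    GpuUsage,
    RecordData,
    UtilizationMetadata,
    UtilizationRecord,
    UtilizationStats,
)

# metadata line, printed once when the monitor starts
metadata = UtilizationMetadata(
    level="metadata",
    data_model_version=getDataModelVersion(),
    usage_collect_interval=5.0,
    job_id="1234567890",
    job_name="linux-test / test (default, 1, 1)",
    workflow_id="9876543210",
    workflow_name="pull",
)
print(metadata.to_json())

# record line, printed once per log interval
record = UtilizationRecord(
    level="record",
    timestamp=1737660000.0,
    cmd_names=["python"],
    data=RecordData(
        cpu=UtilizationStats(avg=12.3, max=56.7),
        memory=UtilizationStats(avg=20.1, max=35.5),
        gpu_usage=[
            GpuUsage(
                uuid="GPU-00000000-hypothetical",
                util_percent=UtilizationStats(avg=40.0, max=90.0),
                mem_util_percent=UtilizationStats(avg=30.0, max=70.0),
            )
        ],
    ),
)
print(record.to_json())

Each to_json() call yields a single JSON line, which matches how the workflow below captures the monitor's stdout into usage_log.txt.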

View File

@@ -371,3 +371,8 @@ pulp==2.9.0 ; python_version >= "3.8"
 #Description: required for testing ilp formulaiton under torch/distributed/_tools
 #Pinned versions: 2.9.0
 #test that import: test_sac_ilp.py
+dataclasses_json==0.6.7
+#Description: required for data pipeline and scripts under tools/stats
+#Pinned versions: 0.6.7
+#test that import:

View File

@ -155,13 +155,25 @@ jobs:
nvidia-smi nvidia-smi
if: ${{ contains(matrix.runner, 'a100') && steps.check_container_runner.outputs.IN_CONTAINER_RUNNER == 'false' }} if: ${{ contains(matrix.runner, 'a100') && steps.check_container_runner.outputs.IN_CONTAINER_RUNNER == 'false' }}
- name: Get workflow job id
id: get-job-id
uses: ./.github/actions/get-workflow-job-id
if: always()
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Start monitoring script - name: Start monitoring script
id: monitor-script id: monitor-script
if: ${{ !inputs.disable-monitor }} if: ${{ !inputs.disable-monitor }}
shell: bash shell: bash
continue-on-error: true continue-on-error: true
env:
JOB_ID: ${{ steps.get-job-id.outputs.job-id }}
JOB_NAME: ${{ steps.get-job-id.outputs.job-name }}
WORKFLOW_NAME: ${{ github.workflow }}
WORKFLOW_RUN_ID: ${{github.run_id}}
run: | run: |
python3 -m pip install psutil==5.9.1 nvidia-ml-py==11.525.84 python3 -m pip install psutil==5.9.1 nvidia-ml-py==11.525.84 dataclasses_json==0.6.7
python3 -m tools.stats.monitor > usage_log.txt 2>&1 & python3 -m tools.stats.monitor > usage_log.txt 2>&1 &
echo "monitor-script-pid=${!}" >> "${GITHUB_OUTPUT}" echo "monitor-script-pid=${!}" >> "${GITHUB_OUTPUT}"
@ -180,13 +192,6 @@ jobs:
id: parse-ref id: parse-ref
run: .github/scripts/parse_ref.py run: .github/scripts/parse_ref.py
- name: Get workflow job id
id: get-job-id
uses: ./.github/actions/get-workflow-job-id
if: always()
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Check for keep-going label and re-enabled test issues - name: Check for keep-going label and re-enabled test issues
# This uses the filter-test-configs action because it conviniently # This uses the filter-test-configs action because it conviniently
# checks for labels and re-enabled test issues. It does not actually do # checks for labels and re-enabled test issues. It does not actually do

View File

@@ -161,6 +161,7 @@ init_command = [
     'rich==10.9.0',
     'pyyaml==6.0.1',
     'optree==0.13.0',
+    'dataclasses_json==0.6.7'
 ]

 [[linter]]

View File

@@ -19,11 +19,17 @@ Usage:
 from __future__ import annotations

+import os
+import sys
+
+# adding sys.path makes the monitor script able to import path tools.stats.utilization_stats_lib
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
+
 import argparse
 import copy
 import dataclasses
 import datetime
-import json
+import os
 import signal
 import threading
 import time
@@ -32,11 +38,23 @@ from typing import Any
 import psutil  # type: ignore[import]

+from tools.stats.utilization_stats_lib import (
+    getDataModelVersion,
+    GpuUsage,
+    RecordData,
+    UtilizationMetadata,
+    UtilizationRecord,
+    UtilizationStats,
+)
+
 _HAS_PYNVML = False
 _HAS_AMDSMI = False
-_DATA_MODEL_VERSION = 1.0
+_job_name = os.environ.get("JOB_NAME", "")
+_job_id = os.environ.get("JOB_ID", "")
+_workflow_run_id = os.environ.get("WORKFLOW_RUN_ID", "")
+_workflow_name = os.environ.get("WORKFLOW_NAME", "")

 @dataclasses.dataclass
@@ -164,16 +182,20 @@ class UsageLogger:
         in a pretty format with more information.
         """
         self._log_interval = log_interval
-        self._summary_info = {
-            "level": "metadata",
-            "interval": self._log_interval,
-            "data_model_version": _DATA_MODEL_VERSION,
-        }
+        self._metadata = UtilizationMetadata(
+            level="metadata",
+            usage_collect_interval=self._log_interval,
+            data_model_version=getDataModelVersion(),
+            job_id=_job_id,
+            job_name=_job_name,
+            workflow_id=_workflow_run_id,
+            workflow_name=_workflow_name,
+        )
         self._data_collect_interval = data_collect_interval
         self._has_pynvml = pynvml_enabled
         self._has_amdsmi = amdsmi_enabled
         self._gpu_handles: list[Any] = []
-        self._gpu_libs_detected: list[str] = []
+        self._gpu_lib_detected: str = ""
         self._num_of_cpus = 0
         self._debug_mode = is_debug_mode
         self._initial_gpu_handler()
@@ -210,31 +232,32 @@ class UsageLogger:
             finally:
                 time.sleep(self._data_collect_interval)

-    def _generate_stats(self, data_list: list[float]) -> dict[str, Any]:
+    def _generate_stats(self, data_list: list[float]) -> UtilizationStats:
         """
         Generate stats from the data list.
         """
         if len(data_list) == 0:
-            return {}
+            return UtilizationStats()
         total = sum(data_list)
         avg = total / len(data_list)
         maxi = max(data_list)
-        return {
-            "avg": round(avg, 2),
-            "max": round(maxi, 2),
-        }
+
+        return UtilizationStats(
+            avg=round(avg, 2),
+            max=round(maxi, 2),
+        )

     def _output_data(self) -> None:
         """
         output the data.
         """
-        self._summary_info["start_time"] = datetime.datetime.now().timestamp()
-        self.log_json(self._summary_info)
+        self._metadata.start_at = datetime.datetime.now().timestamp()
+        self.log_json(self._metadata.to_json())
         while not self.exit_event.is_set():
             collecting_start_time = time.time()
-            stats = {}
+            stats = UtilizationRecord()
             try:
                 data_list, error_list = self.shared_resource.get_and_reset()
                 if self._debug_mode:
@@ -252,13 +275,8 @@ class UsageLogger:
                 if not data_list:
                     # pass since no data is collected
                     continue
-                stats.update(
-                    {
-                        "level": "record",
-                        "time": datetime.datetime.now().timestamp(),
-                    }
-                )
+                stats.level = "record"
+                stats.timestamp = datetime.datetime.now().timestamp()

                 cpu_stats = self._generate_stats(
                     [data.cpu_percent for data in data_list]
@@ -271,43 +289,35 @@
                 cmds = {
                     process["cmd"] for data in data_list for process in data.processes
                 }
-                stats.update(
-                    {
-                        "cpu": cpu_stats,
-                        "memory": memory_stats,
-                        "cmds": list(cmds),
-                        "count": len(data_list),
-                    }
-                )
+                stats.cmd_names = list(cmds)
+                record = RecordData()
+                record.cpu = cpu_stats
+                record.memory = memory_stats

                 # collect gpu metrics
                 if self._has_pynvml or self._has_amdsmi:
                     gpu_list = self._calculate_gpu_utilization(data_list)
-                    stats.update(
-                        {
-                            "gpu_list": gpu_list,
-                        }
-                    )
+                    record.gpu_usage = gpu_list
+
+                stats.data = record
             except Exception as e:
-                stats = {
-                    "level": "record",
-                    "time": datetime.datetime.now().timestamp(),
-                    "error": str(e),
-                }
+                stats = UtilizationRecord(
+                    level="record",
+                    timestamp=datetime.datetime.now().timestamp(),
+                    error=str(e),
+                )
             finally:
                 collecting_end_time = time.time()
                 time_diff = collecting_end_time - collecting_start_time
                 # verify there is data
-                if stats:
-                    stats["log_duration"] = f"{time_diff * 1000:.2f} ms"
-                    self.log_json(stats)
+                if stats.level:
+                    stats.log_duration = f"{time_diff * 1000:.2f} ms"
+                    self.log_json(stats.to_json())
                 time.sleep(self._log_interval)

         # shut down gpu connections when exiting
         self._shutdown_gpu_connections()

-    def _calculate_gpu_utilization(
-        self, data_list: list[UsageData]
-    ) -> list[dict[str, Any]]:
+    def _calculate_gpu_utilization(self, data_list: list[UsageData]) -> list[GpuUsage]:
         """
         Calculates the GPU utilization.
         """
@@ -324,11 +334,11 @@ class UsageLogger:
             gpu_util_stats = self._generate_stats(gpu_utilization[gpu_uuid])
             gpu_mem_util_stats = self._generate_stats(gpu_mem_utilization[gpu_uuid])
             calculate_gpu.append(
-                {
-                    "uuid": gpu_uuid,
-                    "util_percent": gpu_util_stats,
-                    "mem_util_percent": gpu_mem_util_stats,
-                }
+                GpuUsage(
+                    uuid=gpu_uuid,
+                    util_percent=gpu_util_stats,
+                    mem_util_percent=gpu_mem_util_stats,
+                )
             )
         return calculate_gpu
@@ -348,10 +358,7 @@
         """
         Logs the stats in json format to stdout.
         """
-        if self._debug_mode:
-            print(json.dumps(stats, indent=4))
-            return
-        print(json.dumps(stats))
+        print(stats)

     def _collect_gpu_data(self) -> list[GpuData]:
         gpu_data_list = []
@@ -391,7 +398,7 @@ class UsageLogger:
         """
         try:
             if self._has_pynvml:
-                self._gpu_libs_detected.append("pynvml")
+                self._gpu_lib_detected = "pynvml"
                 # Todo: investigate if we can use device uuid instead of index.
                 # there is chance that the gpu index can change when the gpu is rebooted.
                 self._gpu_handles = [
@@ -399,20 +406,17 @@
                     for i in range(pynvml.nvmlDeviceGetCount())
                 ]
             if self._has_amdsmi:
-                self._gpu_libs_detected.append("amdsmi")
+                self._gpu_lib_detected = "amdsmi"
                 self._gpu_handles = amdsmi.amdsmi_get_processor_handles()
             self._num_of_cpus = psutil.cpu_count(logical=False)
             # update summary info
-            self._summary_info.update(
-                {
-                    "gpu_libs_detected": self._gpu_libs_detected,
-                    "num_of_gpus": len(self._gpu_handles),
-                    "num_of_cpus": self._num_of_cpus,
-                }
-            )
+            self._metadata.gpu_type = self._gpu_lib_detected
+            self._metadata.gpu_count = len(self._gpu_handles)
+            self._metadata.cpu_count = self._num_of_cpus
         except Exception as e:
-            self._summary_info["error"] = str(e)
+            self._metadata.error = str(e)

     def _shutdown_gpu_connections(self) -> None:
         if self._has_amdsmi:

View File

@@ -0,0 +1,60 @@
+from dataclasses import dataclass
+from typing import Optional
+
+from dataclasses_json import DataClassJsonMixin
+
+_DATA_MODEL_VERSION = 1.0
+
+# the db schema related to this is:
+# https://github.com/pytorch/test-infra/blob/main/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql
+
+
+# data model for test log usage
+@dataclass
+class UtilizationStats:
+    avg: Optional[float] = None
+    max: Optional[float] = None
+
+
+@dataclass
+class UtilizationMetadata(DataClassJsonMixin):
+    level: Optional[str] = None
+    workflow_id: Optional[str] = None
+    job_id: Optional[str] = None
+    workflow_name: Optional[str] = None
+    job_name: Optional[str] = None
+    usage_collect_interval: Optional[float] = None
+    data_model_version: Optional[float] = None
+    gpu_count: Optional[int] = None
+    cpu_count: Optional[int] = None
+    gpu_type: Optional[str] = None
+    start_at: Optional[float] = None
+    error: Optional[str] = None
+
+
+@dataclass
+class GpuUsage(DataClassJsonMixin):
+    uuid: Optional[str] = None
+    util_percent: Optional[UtilizationStats] = None
+    mem_util_percent: Optional[UtilizationStats] = None
+
+
+@dataclass
+class RecordData(DataClassJsonMixin):
+    cpu: Optional[UtilizationStats] = None
+    memory: Optional[UtilizationStats] = None
+    gpu_usage: Optional[list[GpuUsage]] = None
+
+
+@dataclass
+class UtilizationRecord(DataClassJsonMixin):
+    level: Optional[str] = None
+    timestamp: Optional[float] = None
+    data: Optional[RecordData] = None
+    cmd_names: Optional[list[str]] = None
+    error: Optional[str] = None
+    log_duration: Optional[str] = None
+
+
+def getDataModelVersion() -> float:
+    return _DATA_MODEL_VERSION
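
Since this record format is what the monitor script emits line by line, a post-test processing step can read it back with the same classes. Below is a minimal sketch of such a consumer; parse_usage_log is a hypothetical helper and not part of this PR, and it assumes the layout produced by the monitor (one metadata line followed by record lines in usage_log.txt) plus dataclasses_json==0.6.7.

# Hypothetical consumer sketch for the post-test processing pipeline; not part
# of this PR. Assumes usage_log.txt was written by tools/stats/monitor.py.
from __future__ import annotations

from tools.stats.utilization_stats_lib import (
    getDataModelVersion,
    UtilizationMetadata,
    UtilizationRecord,
)


def parse_usage_log(path: str) -> tuple[UtilizationMetadata, list[UtilizationRecord]]:
    with open(path) as f:
        lines = [line.strip() for line in f if line.strip()]
    # the monitor prints the metadata line first, then one record per interval
    metadata = UtilizationMetadata.from_json(lines[0])
    if metadata.data_model_version != getDataModelVersion():
        raise ValueError(f"unexpected data model version: {metadata.data_model_version}")
    records = [UtilizationRecord.from_json(line) for line in lines[1:]]
    return metadata, records


if __name__ == "__main__":
    metadata, records = parse_usage_log("usage_log.txt")
    print(metadata.job_name, len(records), "records")

Records whose collection failed carry only level, timestamp, and error, so downstream code should treat the data field as optional.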