From 6d4f5f76883edb12d8e6ae6bab5564ec222367b3 Mon Sep 17 00:00:00 2001
From: Yang Wang
Date: Thu, 23 Jan 2025 19:04:41 +0000
Subject: [PATCH] [Utilization][Usage Log] Add data model for record (#145114)

Add a data model for consistency and to make future data model changes
easier. The data model will be used by the post-test processing
pipeline.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/145114
Approved by: https://github.com/huydhn
---
 .ci/docker/requirements-ci.txt       |   5 +
 .github/workflows/_linux-test.yml    |  21 ++--
 .lintrunner.toml                     |   1 +
 tools/stats/monitor.py               | 138 ++++++++++++++-------
 tools/stats/utilization_stats_lib.py |  60 ++++++++++++
 5 files changed, 150 insertions(+), 75 deletions(-)
 create mode 100644 tools/stats/utilization_stats_lib.py

diff --git a/.ci/docker/requirements-ci.txt b/.ci/docker/requirements-ci.txt
index ecce41f8ea7..87f62e65a10 100644
--- a/.ci/docker/requirements-ci.txt
+++ b/.ci/docker/requirements-ci.txt
@@ -371,3 +371,8 @@ pulp==2.9.0 ; python_version >= "3.8"
 #Description: required for testing ilp formulaiton under torch/distributed/_tools
 #Pinned versions: 2.9.0
 #test that import: test_sac_ilp.py
+
+dataclasses_json==0.6.7
+#Description: required for data pipeline and scripts under tools/stats
+#Pinned versions: 0.6.7
+#test that import:
diff --git a/.github/workflows/_linux-test.yml b/.github/workflows/_linux-test.yml
index e33545369be..a843525a870 100644
--- a/.github/workflows/_linux-test.yml
+++ b/.github/workflows/_linux-test.yml
@@ -155,13 +155,25 @@ jobs:
           nvidia-smi
         if: ${{ contains(matrix.runner, 'a100') && steps.check_container_runner.outputs.IN_CONTAINER_RUNNER == 'false' }}

+      - name: Get workflow job id
+        id: get-job-id
+        uses: ./.github/actions/get-workflow-job-id
+        if: always()
+        with:
+          github-token: ${{ secrets.GITHUB_TOKEN }}
+
       - name: Start monitoring script
         id: monitor-script
         if: ${{ !inputs.disable-monitor }}
         shell: bash
         continue-on-error: true
+        env:
+          JOB_ID: ${{ steps.get-job-id.outputs.job-id }}
+          JOB_NAME: ${{ steps.get-job-id.outputs.job-name }}
+          WORKFLOW_NAME: ${{ github.workflow }}
+          WORKFLOW_RUN_ID: ${{github.run_id}}
         run: |
-          python3 -m pip install psutil==5.9.1 nvidia-ml-py==11.525.84
+          python3 -m pip install psutil==5.9.1 nvidia-ml-py==11.525.84 dataclasses_json==0.6.7
           python3 -m tools.stats.monitor > usage_log.txt 2>&1 &
           echo "monitor-script-pid=${!}" >> "${GITHUB_OUTPUT}"

@@ -180,13 +192,6 @@ jobs:
         id: parse-ref
         run: .github/scripts/parse_ref.py

-      - name: Get workflow job id
-        id: get-job-id
-        uses: ./.github/actions/get-workflow-job-id
-        if: always()
-        with:
-          github-token: ${{ secrets.GITHUB_TOKEN }}
-
       - name: Check for keep-going label and re-enabled test issues
         # This uses the filter-test-configs action because it conviniently
         # checks for labels and re-enabled test issues. It does not actually do
diff --git a/.lintrunner.toml b/.lintrunner.toml
index a58ccde1452..148363a5e64 100644
--- a/.lintrunner.toml
+++ b/.lintrunner.toml
@@ -161,6 +161,7 @@ init_command = [
     'rich==10.9.0',
     'pyyaml==6.0.1',
     'optree==0.13.0',
+    'dataclasses_json==0.6.7'
 ]

 [[linter]]
diff --git a/tools/stats/monitor.py b/tools/stats/monitor.py
index 34ab626a456..22f32f7f75d 100644
--- a/tools/stats/monitor.py
+++ b/tools/stats/monitor.py
@@ -19,11 +19,17 @@ Usage:

 from __future__ import annotations

+import os
+import sys
+
+
+# add the repo root to sys.path so the monitor script can import tools.stats.utilization_stats_lib
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
 import argparse
 import copy
 import dataclasses
 import datetime
-import json
+import os
 import signal
 import threading
 import time
@@ -32,11 +38,23 @@ from typing import Any

 import psutil  # type: ignore[import]

+from tools.stats.utilization_stats_lib import (
+    getDataModelVersion,
+    GpuUsage,
+    RecordData,
+    UtilizationMetadata,
+    UtilizationRecord,
+    UtilizationStats,
+)
+

 _HAS_PYNVML = False
 _HAS_AMDSMI = False

-_DATA_MODEL_VERSION = 1.0
+_job_name = os.environ.get("JOB_NAME", "")
+_job_id = os.environ.get("JOB_ID", "")
+_workflow_run_id = os.environ.get("WORKFLOW_RUN_ID", "")
+_workflow_name = os.environ.get("WORKFLOW_NAME", "")


 @dataclasses.dataclass
@@ -164,16 +182,20 @@ class UsageLogger:
         in a pretty format with more information.
         """
         self._log_interval = log_interval
-        self._summary_info = {
-            "level": "metadata",
-            "interval": self._log_interval,
-            "data_model_version": _DATA_MODEL_VERSION,
-        }
+        self._metadata = UtilizationMetadata(
+            level="metadata",
+            usage_collect_interval=self._log_interval,
+            data_model_version=getDataModelVersion(),
+            job_id=_job_id,
+            job_name=_job_name,
+            workflow_id=_workflow_run_id,
+            workflow_name=_workflow_name,
+        )
         self._data_collect_interval = data_collect_interval
         self._has_pynvml = pynvml_enabled
         self._has_amdsmi = amdsmi_enabled
         self._gpu_handles: list[Any] = []
-        self._gpu_libs_detected: list[str] = []
+        self._gpu_lib_detected: str = ""
         self._num_of_cpus = 0
         self._debug_mode = is_debug_mode
         self._initial_gpu_handler()
@@ -210,31 +232,32 @@ class UsageLogger:
             finally:
                 time.sleep(self._data_collect_interval)

-    def _generate_stats(self, data_list: list[float]) -> dict[str, Any]:
+    def _generate_stats(self, data_list: list[float]) -> UtilizationStats:
         """
         Generate stats from the data list.
         """
         if len(data_list) == 0:
-            return {}
+            return UtilizationStats()

         total = sum(data_list)
         avg = total / len(data_list)
         maxi = max(data_list)
-        return {
-            "avg": round(avg, 2),
-            "max": round(maxi, 2),
-        }
+
+        return UtilizationStats(
+            avg=round(avg, 2),
+            max=round(maxi, 2),
+        )

     def _output_data(self) -> None:
         """
         output the data.
""" - self._summary_info["start_time"] = datetime.datetime.now().timestamp() - self.log_json(self._summary_info) + self._metadata.start_at = datetime.datetime.now().timestamp() + self.log_json(self._metadata.to_json()) while not self.exit_event.is_set(): collecting_start_time = time.time() - stats = {} + stats = UtilizationRecord() try: data_list, error_list = self.shared_resource.get_and_reset() if self._debug_mode: @@ -252,13 +275,8 @@ class UsageLogger: if not data_list: # pass since no data is collected continue - - stats.update( - { - "level": "record", - "time": datetime.datetime.now().timestamp(), - } - ) + stats.level = "record" + stats.timestamp = datetime.datetime.now().timestamp() cpu_stats = self._generate_stats( [data.cpu_percent for data in data_list] @@ -271,43 +289,35 @@ class UsageLogger: cmds = { process["cmd"] for data in data_list for process in data.processes } - stats.update( - { - "cpu": cpu_stats, - "memory": memory_stats, - "cmds": list(cmds), - "count": len(data_list), - } - ) + + stats.cmd_names = list(cmds) + record = RecordData() + record.cpu = cpu_stats + record.memory = memory_stats # collect gpu metrics if self._has_pynvml or self._has_amdsmi: gpu_list = self._calculate_gpu_utilization(data_list) - stats.update( - { - "gpu_list": gpu_list, - } - ) + record.gpu_usage = gpu_list + stats.data = record except Exception as e: - stats = { - "level": "record", - "time": datetime.datetime.now().timestamp(), - "error": str(e), - } + stats = UtilizationRecord( + level="record", + timestamp=datetime.datetime.now().timestamp(), + error=str(e), + ) finally: collecting_end_time = time.time() time_diff = collecting_end_time - collecting_start_time # verify there is data - if stats: - stats["log_duration"] = f"{time_diff * 1000:.2f} ms" - self.log_json(stats) + if stats.level: + stats.log_duration = f"{time_diff * 1000:.2f} ms" + self.log_json(stats.to_json()) time.sleep(self._log_interval) # shut down gpu connections when exiting self._shutdown_gpu_connections() - def _calculate_gpu_utilization( - self, data_list: list[UsageData] - ) -> list[dict[str, Any]]: + def _calculate_gpu_utilization(self, data_list: list[UsageData]) -> list[GpuUsage]: """ Calculates the GPU utilization. """ @@ -324,11 +334,11 @@ class UsageLogger: gpu_util_stats = self._generate_stats(gpu_utilization[gpu_uuid]) gpu_mem_util_stats = self._generate_stats(gpu_mem_utilization[gpu_uuid]) calculate_gpu.append( - { - "uuid": gpu_uuid, - "util_percent": gpu_util_stats, - "mem_util_percent": gpu_mem_util_stats, - } + GpuUsage( + uuid=gpu_uuid, + util_percent=gpu_util_stats, + mem_util_percent=gpu_mem_util_stats, + ) ) return calculate_gpu @@ -348,10 +358,7 @@ class UsageLogger: """ Logs the stats in json format to stdout. """ - if self._debug_mode: - print(json.dumps(stats, indent=4)) - return - print(json.dumps(stats)) + print(stats) def _collect_gpu_data(self) -> list[GpuData]: gpu_data_list = [] @@ -391,7 +398,7 @@ class UsageLogger: """ try: if self._has_pynvml: - self._gpu_libs_detected.append("pynvml") + self._gpu_lib_detected = "pynvml" # Todo: investigate if we can use device uuid instead of index. # there is chance that the gpu index can change when the gpu is rebooted. 
                 self._gpu_handles = [
@@ -399,20 +406,17 @@ class UsageLogger:
                     for i in range(pynvml.nvmlDeviceGetCount())
                 ]

             if self._has_amdsmi:
-                self._gpu_libs_detected.append("amdsmi")
+                self._gpu_lib_detected = "amdsmi"
                 self._gpu_handles = amdsmi.amdsmi_get_processor_handles()
             self._num_of_cpus = psutil.cpu_count(logical=False)
             # update summary info
-            self._summary_info.update(
-                {
-                    "gpu_libs_detected": self._gpu_libs_detected,
-                    "num_of_gpus": len(self._gpu_handles),
-                    "num_of_cpus": self._num_of_cpus,
-                }
-            )
+            self._metadata.gpu_type = self._gpu_lib_detected
+            self._metadata.gpu_count = len(self._gpu_handles)
+            self._metadata.cpu_count = self._num_of_cpus
+
         except Exception as e:
-            self._summary_info["error"] = str(e)
+            self._metadata.error = str(e)

     def _shutdown_gpu_connections(self) -> None:
         if self._has_amdsmi:
diff --git a/tools/stats/utilization_stats_lib.py b/tools/stats/utilization_stats_lib.py
new file mode 100644
index 00000000000..c0d6506c8cf
--- /dev/null
+++ b/tools/stats/utilization_stats_lib.py
@@ -0,0 +1,60 @@
+from dataclasses import dataclass
+from typing import Optional
+
+from dataclasses_json import DataClassJsonMixin
+
+
+_DATA_MODEL_VERSION = 1.0
+
+
+# the db schema related to this is:
+# https://github.com/pytorch/test-infra/blob/main/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql
+# data model for test log usage
+@dataclass
+class UtilizationStats:
+    avg: Optional[float] = None
+    max: Optional[float] = None
+
+
+@dataclass
+class UtilizationMetadata(DataClassJsonMixin):
+    level: Optional[str] = None
+    workflow_id: Optional[str] = None
+    job_id: Optional[str] = None
+    workflow_name: Optional[str] = None
+    job_name: Optional[str] = None
+    usage_collect_interval: Optional[float] = None
+    data_model_version: Optional[float] = None
+    gpu_count: Optional[int] = None
+    cpu_count: Optional[int] = None
+    gpu_type: Optional[str] = None
+    start_at: Optional[float] = None
+    error: Optional[str] = None
+
+
+@dataclass
+class GpuUsage(DataClassJsonMixin):
+    uuid: Optional[str] = None
+    util_percent: Optional[UtilizationStats] = None
+    mem_util_percent: Optional[UtilizationStats] = None
+
+
+@dataclass
+class RecordData(DataClassJsonMixin):
+    cpu: Optional[UtilizationStats] = None
+    memory: Optional[UtilizationStats] = None
+    gpu_usage: Optional[list[GpuUsage]] = None
+
+
+@dataclass
+class UtilizationRecord(DataClassJsonMixin):
+    level: Optional[str] = None
+    timestamp: Optional[float] = None
+    data: Optional[RecordData] = None
+    cmd_names: Optional[list[str]] = None
+    error: Optional[str] = None
+    log_duration: Optional[str] = None
+
+
+def getDataModelVersion() -> float:
+    return _DATA_MODEL_VERSION
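
For context, a minimal sketch (not part of the patch) of how the data model in tools/stats/utilization_stats_lib.py produces and re-parses the JSON lines that tools/stats/monitor.py writes to usage_log.txt. It assumes the repository root is on sys.path and dataclasses_json==0.6.7 is installed, as the patch arranges; every concrete value (job id, workflow name, command, GPU uuid, stats) is a placeholder, not a real CI value.

```python
# Sketch only: build one metadata line and one record line the way monitor.py
# does, serialize them with dataclasses_json, and parse the metadata back.
import datetime

from tools.stats.utilization_stats_lib import (
    getDataModelVersion,
    GpuUsage,
    RecordData,
    UtilizationMetadata,
    UtilizationRecord,
    UtilizationStats,
)

# One metadata line is emitted at startup (see UsageLogger.__init__/_output_data).
metadata = UtilizationMetadata(
    level="metadata",
    usage_collect_interval=5.0,              # example interval, not a pinned default
    data_model_version=getDataModelVersion(),
    job_id="1234567890",                     # placeholder; normally from the JOB_ID env var
    job_name="linux-test / test (default)",  # placeholder JOB_NAME
    workflow_id="9876543210",                # placeholder WORKFLOW_RUN_ID
    workflow_name="pull",                    # placeholder WORKFLOW_NAME
    start_at=datetime.datetime.now().timestamp(),
)

# Then one record line per logging interval, carrying averaged/max stats.
record = UtilizationRecord(
    level="record",
    timestamp=datetime.datetime.now().timestamp(),
    data=RecordData(
        cpu=UtilizationStats(avg=12.34, max=56.78),
        memory=UtilizationStats(avg=23.45, max=67.89),
        gpu_usage=[
            GpuUsage(
                uuid="GPU-00000000-0000-0000-0000-000000000000",  # placeholder uuid
                util_percent=UtilizationStats(avg=40.0, max=95.0),
                mem_util_percent=UtilizationStats(avg=30.0, max=80.0),
            )
        ],
    ),
    cmd_names=["python test_example.py"],  # placeholder command name
    log_duration="1.23 ms",
)

# monitor.py prints one JSON object per line to stdout (redirected to usage_log.txt).
for line in (metadata.to_json(), record.to_json()):
    print(line)

# A post-test-processing step can parse the lines back through the same classes,
# e.g. to check the schema version before uploading.
parsed = UtilizationMetadata.from_json(metadata.to_json())
assert parsed.data_model_version == getDataModelVersion()
```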