diff --git a/tools/stats/monitor.py b/tools/stats/monitor.py index 2ffc3ba709f..46fdcf20cd3 100644 --- a/tools/stats/monitor.py +++ b/tools/stats/monitor.py @@ -28,7 +28,6 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) import argparse import copy import dataclasses -import datetime import os import signal import threading @@ -40,6 +39,7 @@ import psutil # type: ignore[import] from tools.stats.utilization_stats_lib import ( getDataModelVersion, + getTsNow, GpuUsage, RecordData, UtilizationMetadata, @@ -190,7 +190,7 @@ class UsageLogger: job_name=_job_name, workflow_id=_workflow_run_id, workflow_name=_workflow_name, - start_at=datetime.datetime.now().timestamp(), + start_at=getTsNow(), ) self._data_collect_interval = data_collect_interval self._has_pynvml = pynvml_enabled @@ -253,14 +253,14 @@ class UsageLogger: """ output the data. """ - self._metadata.start_at = datetime.datetime.now().timestamp() + self._metadata.start_at = getTsNow() self.log_json(self._metadata.to_json()) while not self.exit_event.is_set(): collecting_start_time = time.time() stats = UtilizationRecord( level="record", - timestamp=datetime.datetime.now().timestamp(), + timestamp=getTsNow(), ) try: @@ -306,7 +306,7 @@ class UsageLogger: except Exception as e: stats = UtilizationRecord( level="record", - timestamp=datetime.datetime.now().timestamp(), + timestamp=getTsNow(), error=str(e), ) finally: diff --git a/tools/stats/upload_utilization_stats/test_upload_utilization_stats.py b/tools/stats/upload_utilization_stats/test_upload_utilization_stats.py index 94c2bfc69e7..a719ff39b4e 100644 --- a/tools/stats/upload_utilization_stats/test_upload_utilization_stats.py +++ b/tools/stats/upload_utilization_stats/test_upload_utilization_stats.py @@ -22,12 +22,13 @@ TEST_DT_PLUS_30S = TEST_DT_BASE + timedelta(seconds=30) TEST_DT_PLUS_40S = TEST_DT_BASE + timedelta(seconds=40) # timestamps from January 1, 2022 12:00:00 -TEST_TS_BASE = TEST_DT_BASE.timestamp() -TEST_TS_PLUS_5S = TEST_DT_PLUS_5S.timestamp() -TEST_TS_PLUS_10S = TEST_DT_PLUS_10S.timestamp() -TEST_TS_PLUS_15S = TEST_DT_PLUS_15S.timestamp() -TEST_TS_PLUS_30S = TEST_DT_PLUS_30S.timestamp() -TEST_TS_PLUS_40S = TEST_DT_PLUS_40S.timestamp() +TEST_TS_BASE = int(TEST_DT_BASE.timestamp()) +TEST_TS_PLUS_5S = int(TEST_DT_PLUS_5S.timestamp()) +TEST_TS_PLUS_10S = int(TEST_DT_PLUS_10S.timestamp()) +TEST_TS_PLUS_15S = int(TEST_DT_PLUS_15S.timestamp()) +TEST_TS_PLUS_30S = int(TEST_DT_PLUS_30S.timestamp()) +TEST_TS_PLUS_40S = int(TEST_DT_PLUS_40S.timestamp()) + # test cmd names PYTEST1_NAME = "python test1.py" @@ -95,12 +96,12 @@ class TestSegmentGenerator(unittest.TestCase): test_gap_dt1 = TEST_DT_PLUS_30S + timedelta(seconds=80) test_gap_dt2 = TEST_DT_PLUS_30S + timedelta(seconds=85) record_gap_1 = UtilizationRecord( - timestamp=test_gap_dt1.timestamp(), + timestamp=int(test_gap_dt1.timestamp()), cmd_names=[PYTEST1_NAME], level="PYTHON_CMD", ) record_gap_2 = UtilizationRecord( - timestamp=test_gap_dt2.timestamp(), + timestamp=int(test_gap_dt2.timestamp()), cmd_names=[PYTEST1_NAME], level="PYTHON_CMD", ) @@ -193,3 +194,12 @@ def get_base_test_records() -> list[UtilizationRecord]: if __name__ == "__main__": unittest.main() + + +def getTimestampStr(timestamp: float) -> str: + return f"{timestamp:.0f}" + + +def getCurrentTimestampStr() -> str: + timestamp_now = datetime.now().timestamp() + return getTimestampStr(timestamp_now) diff --git a/tools/stats/upload_utilization_stats/upload_utilization_stats.py b/tools/stats/upload_utilization_stats/upload_utilization_stats.py index 49cd6081f6a..666cf7eb9b7 100644 --- a/tools/stats/upload_utilization_stats/upload_utilization_stats.py +++ b/tools/stats/upload_utilization_stats/upload_utilization_stats.py @@ -13,13 +13,13 @@ import argparse import json import zipfile from dataclasses import asdict -from datetime import datetime, timezone from typing import Any, Optional import pandas as pd # type: ignore[import] from tools.stats.upload_stats_lib import download_s3_artifacts, upload_to_s3 from tools.stats.utilization_stats_lib import ( getDataModelVersion, + getTsNow, OssCiSegmentV1, OssCiUtilizationMetadataV1, OssCiUtilizationTimeSeriesV1, @@ -58,7 +58,7 @@ class SegmentGenerator: for process in (record.cmd_names or []) ] ) - df[time_col_name] = pd.to_datetime(df[time_col_name], unit="s") + df[time_col_name] = pd.to_datetime(df[time_col_name], unit="s", utc=True) # get unique cmd names unique_cmds_df = pd.DataFrame(df[cmd_col_name].unique(), columns=[cmd_col_name]) @@ -80,8 +80,8 @@ class SegmentGenerator: segment = OssCiSegmentV1( level=CMD_PYTHON_LEVEL, name=value, - start_at=row["start_time"].timestamp(), - end_at=row["end_time"].timestamp(), + start_at=int(row["start_time"].timestamp()), + end_at=int(row["end_time"].timestamp()), extra_info={}, ) segments.append(segment) @@ -124,7 +124,7 @@ class UtilizationDbConverter: self.metadata = metadata self.records = records self.segments = segments - self.created_at = datetime.now().timestamp() + self.created_at = getTsNow() self.info = info end_time_stamp = max([record.timestamp for record in records]) self.end_at = end_time_stamp @@ -176,7 +176,7 @@ class UtilizationDbConverter: job_id=self.info.job_id, workflow_name=self.info.workflow_name, job_name=self.info.job_name, - json_data=str(asdict(record.data) if record.data else {}), + json_data=str(record.data.to_json() if record.data else {}), ) @@ -367,12 +367,6 @@ def unzip_file(path: Path, file_name: str) -> str: return "" -def get_datetime_string(timestamp: float) -> str: - dt = datetime.fromtimestamp(timestamp, timezone.utc) - dt_str = dt.strftime("%Y-%m-%d %H:%M:%S.%f") - return dt_str - - def parse_args() -> argparse.Namespace: """ Parse command line arguments. diff --git a/tools/stats/utilization_stats_lib.py b/tools/stats/utilization_stats_lib.py index deaa32a5caf..50bb9312c05 100644 --- a/tools/stats/utilization_stats_lib.py +++ b/tools/stats/utilization_stats_lib.py @@ -1,4 +1,5 @@ from dataclasses import dataclass, field +from datetime import datetime from typing import Optional from dataclasses_json import DataClassJsonMixin @@ -23,7 +24,7 @@ class UtilizationMetadata(DataClassJsonMixin): job_name: str usage_collect_interval: float data_model_version: float - start_at: float + start_at: int gpu_count: Optional[int] = None cpu_count: Optional[int] = None gpu_type: Optional[str] = None @@ -47,27 +48,27 @@ class RecordData(DataClassJsonMixin): @dataclass class UtilizationRecord(DataClassJsonMixin): level: str - timestamp: float + timestamp: int data: Optional[RecordData] = None cmd_names: Optional[list[str]] = None error: Optional[str] = None log_duration: Optional[str] = None +# the db schema related to this is: +# https://github.com/pytorch/test-infra/blob/main/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql @dataclass class OssCiSegmentV1(DataClassJsonMixin): level: str name: str - start_at: float - end_at: float + start_at: int + end_at: int extra_info: dict[str, str] -# the db schema related to this is: -# https://github.com/pytorch/test-infra/blob/main/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql @dataclass class OssCiUtilizationMetadataV1: - created_at: float + created_at: int repo: str workflow_id: int run_attempt: int @@ -79,8 +80,8 @@ class OssCiUtilizationMetadataV1: gpu_count: int cpu_count: int gpu_type: str - start_at: float - end_at: float + start_at: int + end_at: int segments: list[OssCiSegmentV1] tags: list[str] = field(default_factory=list) @@ -89,10 +90,10 @@ class OssCiUtilizationMetadataV1: # https://github.com/pytorch/test-infra/blob/main/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_time_series_schema.sql @dataclass class OssCiUtilizationTimeSeriesV1: - created_at: float + created_at: int type: str tags: list[str] - time_stamp: float + time_stamp: int repo: str workflow_id: int run_attempt: int @@ -106,6 +107,11 @@ def getDataModelVersion() -> float: return _DATA_MODEL_VERSION +def getTsNow() -> int: + ts = datetime.now().timestamp() + return int(ts) + + @dataclass class WorkflowInfo: workflow_run_id: int