[Utilization] Convert timestamp to str for datetime64 (#145985)

Convert all float timestamps to int timestamps during the data pipeline for the db type DateTime64.
Floats do not work when trying to insert into ClickHouse using JSONExtract.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/145985
Approved by: https://github.com/huydhn
This commit is contained in:
Yang Wang 2025-02-03 21:05:15 +00:00 committed by PyTorch MergeBot
parent 1d4adf4e1f
commit fd73ae2068
4 changed files with 46 additions and 36 deletions

View File

@ -28,7 +28,6 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
import argparse
import copy
import dataclasses
import datetime
import os
import signal
import threading
@ -40,6 +39,7 @@ import psutil # type: ignore[import]
from tools.stats.utilization_stats_lib import (
getDataModelVersion,
getTsNow,
GpuUsage,
RecordData,
UtilizationMetadata,
@ -190,7 +190,7 @@ class UsageLogger:
job_name=_job_name,
workflow_id=_workflow_run_id,
workflow_name=_workflow_name,
start_at=datetime.datetime.now().timestamp(),
start_at=getTsNow(),
)
self._data_collect_interval = data_collect_interval
self._has_pynvml = pynvml_enabled
@ -253,14 +253,14 @@ class UsageLogger:
"""
output the data.
"""
self._metadata.start_at = datetime.datetime.now().timestamp()
self._metadata.start_at = getTsNow()
self.log_json(self._metadata.to_json())
while not self.exit_event.is_set():
collecting_start_time = time.time()
stats = UtilizationRecord(
level="record",
timestamp=datetime.datetime.now().timestamp(),
timestamp=getTsNow(),
)
try:
@ -306,7 +306,7 @@ class UsageLogger:
except Exception as e:
stats = UtilizationRecord(
level="record",
timestamp=datetime.datetime.now().timestamp(),
timestamp=getTsNow(),
error=str(e),
)
finally:

View File

@ -22,12 +22,13 @@ TEST_DT_PLUS_30S = TEST_DT_BASE + timedelta(seconds=30)
TEST_DT_PLUS_40S = TEST_DT_BASE + timedelta(seconds=40)
# timestamps from January 1, 2022 12:00:00
TEST_TS_BASE = TEST_DT_BASE.timestamp()
TEST_TS_PLUS_5S = TEST_DT_PLUS_5S.timestamp()
TEST_TS_PLUS_10S = TEST_DT_PLUS_10S.timestamp()
TEST_TS_PLUS_15S = TEST_DT_PLUS_15S.timestamp()
TEST_TS_PLUS_30S = TEST_DT_PLUS_30S.timestamp()
TEST_TS_PLUS_40S = TEST_DT_PLUS_40S.timestamp()
TEST_TS_BASE = int(TEST_DT_BASE.timestamp())
TEST_TS_PLUS_5S = int(TEST_DT_PLUS_5S.timestamp())
TEST_TS_PLUS_10S = int(TEST_DT_PLUS_10S.timestamp())
TEST_TS_PLUS_15S = int(TEST_DT_PLUS_15S.timestamp())
TEST_TS_PLUS_30S = int(TEST_DT_PLUS_30S.timestamp())
TEST_TS_PLUS_40S = int(TEST_DT_PLUS_40S.timestamp())
# test cmd names
PYTEST1_NAME = "python test1.py"
@ -95,12 +96,12 @@ class TestSegmentGenerator(unittest.TestCase):
test_gap_dt1 = TEST_DT_PLUS_30S + timedelta(seconds=80)
test_gap_dt2 = TEST_DT_PLUS_30S + timedelta(seconds=85)
record_gap_1 = UtilizationRecord(
timestamp=test_gap_dt1.timestamp(),
timestamp=int(test_gap_dt1.timestamp()),
cmd_names=[PYTEST1_NAME],
level="PYTHON_CMD",
)
record_gap_2 = UtilizationRecord(
timestamp=test_gap_dt2.timestamp(),
timestamp=int(test_gap_dt2.timestamp()),
cmd_names=[PYTEST1_NAME],
level="PYTHON_CMD",
)
@ -193,3 +194,12 @@ def get_base_test_records() -> list[UtilizationRecord]:
if __name__ == "__main__":
unittest.main()
def getTimestampStr(timestamp: float) -> str:
    """Render a POSIX timestamp as a whole-second string (no decimal part)."""
    # ".0f" uses Python's default round-half-to-even float formatting.
    return format(timestamp, ".0f")


def getCurrentTimestampStr() -> str:
    """Return the current wall-clock time as a whole-second string."""
    now_ts = datetime.now().timestamp()
    return getTimestampStr(now_ts)

View File

@ -13,13 +13,13 @@ import argparse
import json
import zipfile
from dataclasses import asdict
from datetime import datetime, timezone
from typing import Any, Optional
import pandas as pd # type: ignore[import]
from tools.stats.upload_stats_lib import download_s3_artifacts, upload_to_s3
from tools.stats.utilization_stats_lib import (
getDataModelVersion,
getTsNow,
OssCiSegmentV1,
OssCiUtilizationMetadataV1,
OssCiUtilizationTimeSeriesV1,
@ -58,7 +58,7 @@ class SegmentGenerator:
for process in (record.cmd_names or [])
]
)
df[time_col_name] = pd.to_datetime(df[time_col_name], unit="s")
df[time_col_name] = pd.to_datetime(df[time_col_name], unit="s", utc=True)
# get unique cmd names
unique_cmds_df = pd.DataFrame(df[cmd_col_name].unique(), columns=[cmd_col_name])
@ -80,8 +80,8 @@ class SegmentGenerator:
segment = OssCiSegmentV1(
level=CMD_PYTHON_LEVEL,
name=value,
start_at=row["start_time"].timestamp(),
end_at=row["end_time"].timestamp(),
start_at=int(row["start_time"].timestamp()),
end_at=int(row["end_time"].timestamp()),
extra_info={},
)
segments.append(segment)
@ -124,7 +124,7 @@ class UtilizationDbConverter:
self.metadata = metadata
self.records = records
self.segments = segments
self.created_at = datetime.now().timestamp()
self.created_at = getTsNow()
self.info = info
end_time_stamp = max([record.timestamp for record in records])
self.end_at = end_time_stamp
@ -176,7 +176,7 @@ class UtilizationDbConverter:
job_id=self.info.job_id,
workflow_name=self.info.workflow_name,
job_name=self.info.job_name,
json_data=str(asdict(record.data) if record.data else {}),
json_data=str(record.data.to_json() if record.data else {}),
)
@ -367,12 +367,6 @@ def unzip_file(path: Path, file_name: str) -> str:
return ""
def get_datetime_string(timestamp: float) -> str:
    """Format a POSIX timestamp as a UTC 'YYYY-MM-DD HH:MM:SS.ffffff' string."""
    utc_dt = datetime.fromtimestamp(timestamp, timezone.utc)
    return utc_dt.strftime("%Y-%m-%d %H:%M:%S.%f")
def parse_args() -> argparse.Namespace:
"""
Parse command line arguments.

View File

@ -1,4 +1,5 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from dataclasses_json import DataClassJsonMixin
@ -23,7 +24,7 @@ class UtilizationMetadata(DataClassJsonMixin):
job_name: str
usage_collect_interval: float
data_model_version: float
start_at: float
start_at: int
gpu_count: Optional[int] = None
cpu_count: Optional[int] = None
gpu_type: Optional[str] = None
@ -47,27 +48,27 @@ class RecordData(DataClassJsonMixin):
@dataclass
class UtilizationRecord(DataClassJsonMixin):
level: str
timestamp: float
timestamp: int
data: Optional[RecordData] = None
cmd_names: Optional[list[str]] = None
error: Optional[str] = None
log_duration: Optional[str] = None
# the db schema related to this is:
# https://github.com/pytorch/test-infra/blob/main/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql
@dataclass
class OssCiSegmentV1(DataClassJsonMixin):
level: str
name: str
start_at: float
end_at: float
start_at: int
end_at: int
extra_info: dict[str, str]
# the db schema related to this is:
# https://github.com/pytorch/test-infra/blob/main/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql
@dataclass
class OssCiUtilizationMetadataV1:
created_at: float
created_at: int
repo: str
workflow_id: int
run_attempt: int
@ -79,8 +80,8 @@ class OssCiUtilizationMetadataV1:
gpu_count: int
cpu_count: int
gpu_type: str
start_at: float
end_at: float
start_at: int
end_at: int
segments: list[OssCiSegmentV1]
tags: list[str] = field(default_factory=list)
@ -89,10 +90,10 @@ class OssCiUtilizationMetadataV1:
# https://github.com/pytorch/test-infra/blob/main/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_time_series_schema.sql
@dataclass
class OssCiUtilizationTimeSeriesV1:
created_at: float
created_at: int
type: str
tags: list[str]
time_stamp: float
time_stamp: int
repo: str
workflow_id: int
run_attempt: int
@ -106,6 +107,11 @@ def getDataModelVersion() -> float:
return _DATA_MODEL_VERSION
def getTsNow() -> int:
    """Return the current POSIX timestamp truncated to whole seconds as an int."""
    # int() truncates the fractional seconds from datetime's float timestamp.
    return int(datetime.now().timestamp())
@dataclass
class WorkflowInfo:
workflow_run_id: int