[utilization] pipeline to create clean db records (#145327)

Add an upload_utilization_stats script that generates db-ready insert records and uploads them to S3:
- generate two files, metadata and timeseries, in the ossci-utilization bucket
- convert log records into their db-format counterparts
- add a unit test job for tools/stats/

Related PRs:
- Set up composite action for the data pipeline: https://github.com/pytorch/pytorch/pull/145310
- Add permission for the composite action to access the S3 bucket: https://github.com/pytorch-labs/pytorch-gha-infra/pull/595
- Add insert logic in the s3 replicator: https://github.com/pytorch/test-infra/pull/6217
Pull Request resolved: https://github.com/pytorch/pytorch/pull/145327
Approved by: https://github.com/huydhn

Co-authored-by: Huy Do <huydhn@gmail.com>
Author: Yang Wang
Date: 2025-01-29 23:48:47 +00:00 (committed by PyTorch MergeBot)
parent 18a7a04c4a
commit a9ed7bd78e
7 changed files with 738 additions and 20 deletions
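
For context before the diffs, the new pieces compose roughly as follows. This is a hedged sketch of the entry point only; the ids are placeholders, and the real invocation is wired up in CI with the arguments listed in parse_args below:

from tools.stats.upload_utilization_stats.upload_utilization_stats import (
    UploadUtilizationData,
)
from tools.stats.utilization_stats_lib import WorkflowInfo

# Placeholder ids; dry_run skips the S3 upload and only exercises the
# download + conversion path (AWS credentials are still needed to read logs).
info = WorkflowInfo(
    workflow_run_id=123456,
    run_attempt=1,
    job_id=987654,
    workflow_name="pull",
    job_name="linux-test",
)
UploadUtilizationData(info=info, dry_run=True).start()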


@@ -1,4 +1,5 @@
 name: Lint
+# Workflow that runs lint checks as well as unit tests for tools and scripts.
 on:
   pull_request:
@@ -207,6 +208,7 @@ jobs:
       conda activate "${CONDA_ENV}"
       # Test tools
+      PYTHONPATH=$(pwd) pytest tools/stats
       PYTHONPATH=$(pwd) pytest tools/test -o "python_files=test*.py"
       PYTHONPATH=$(pwd) pytest .github/scripts -o "python_files=test*.py"


@@ -161,7 +161,8 @@ init_command = [
     'rich==10.9.0',
     'pyyaml==6.0.1',
     'optree==0.13.0',
-    'dataclasses_json==0.6.7'
+    'dataclasses_json==0.6.7',
+    'pandas==2.2.3',
 ]
 [[linter]]


@@ -190,6 +190,7 @@ class UsageLogger:
             job_name=_job_name,
             workflow_id=_workflow_run_id,
             workflow_name=_workflow_name,
+            start_at=datetime.datetime.now().timestamp(),
         )
         self._data_collect_interval = data_collect_interval
         self._has_pynvml = pynvml_enabled
@@ -257,7 +258,11 @@ class UsageLogger:
         while not self.exit_event.is_set():
             collecting_start_time = time.time()
-            stats = UtilizationRecord()
+            stats = UtilizationRecord(
+                level="record",
+                timestamp=datetime.datetime.now().timestamp(),
+            )
             try:
                 data_list, error_list = self.shared_resource.get_and_reset()
                 if self._debug_mode:
@@ -275,8 +280,6 @@ class UsageLogger:
                 if not data_list:
                     # pass since no data is collected
                     continue
-                stats.level = "record"
-                stats.timestamp = datetime.datetime.now().timestamp()
                 cpu_stats = self._generate_stats(
                     [data.cpu_percent for data in data_list]


@@ -23,6 +23,9 @@ def get_s3_resource() -> Any:
     return boto3.resource("s3")

+GHA_ARTIFACTS_BUCKET = "gha-artifacts"
+
 # NB: In CI, a flaky test is usually retried 3 times, then the test file would be rerun
 # 2 more times
 MAX_RETRY_IN_NON_DISABLED_MODE = 3 * 3
@@ -84,16 +87,22 @@ def _download_artifact(
 def download_s3_artifacts(
-    prefix: str, workflow_run_id: int, workflow_run_attempt: int
+    prefix: str,
+    workflow_run_id: int,
+    workflow_run_attempt: int,
+    job_id: Optional[int] = None,
 ) -> list[Path]:
-    bucket = get_s3_resource().Bucket("gha-artifacts")
+    bucket = get_s3_resource().Bucket(GHA_ARTIFACTS_BUCKET)
     objs = bucket.objects.filter(
         Prefix=f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}"
     )
     found_one = False
     paths = []
     for obj in objs:
+        object_name = Path(obj.key).name
+        # when a job_id is given, only download artifacts whose name contains it
+        if job_id is not None and str(job_id) not in object_name:
+            continue
         found_one = True
         p = Path(Path(obj.key).name)
         print(f"Downloading {p}")


@@ -0,0 +1,195 @@
import os
import sys
import unittest
from collections import Counter
from datetime import datetime, timedelta
# add the repo root to sys.path so the tests can import tools.stats.utilization_stats_lib
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from tools.stats.upload_utilization_stats.upload_utilization_stats import (
SegmentGenerator,
)
from tools.stats.utilization_stats_lib import OssCiSegmentV1, UtilizationRecord
# datetimes from January 1, 2022 12:00:00
TEST_DT_BASE = datetime(2022, 1, 1, 12, 0, 0)
TEST_DT_PLUS_5S = TEST_DT_BASE + timedelta(seconds=5)
TEST_DT_PLUS_10S = TEST_DT_BASE + timedelta(seconds=10)
TEST_DT_PLUS_15S = TEST_DT_BASE + timedelta(seconds=15)
TEST_DT_PLUS_30S = TEST_DT_BASE + timedelta(seconds=30)
TEST_DT_PLUS_40S = TEST_DT_BASE + timedelta(seconds=40)
# timestamps from January 1, 2022 12:00:00
TEST_TS_BASE = TEST_DT_BASE.timestamp()
TEST_TS_PLUS_5S = TEST_DT_PLUS_5S.timestamp()
TEST_TS_PLUS_10S = TEST_DT_PLUS_10S.timestamp()
TEST_TS_PLUS_15S = TEST_DT_PLUS_15S.timestamp()
TEST_TS_PLUS_30S = TEST_DT_PLUS_30S.timestamp()
TEST_TS_PLUS_40S = TEST_DT_PLUS_40S.timestamp()
# test cmd names
PYTEST1_NAME = "python test1.py"
PYTEST2_NAME = "python test2.py"
PYPIP_INSTALL_NAME = "python pip install install1"
class TestSegmentGenerator(unittest.TestCase):
def test_generate_empty_records(self) -> None:
records: list[UtilizationRecord] = []
# execute
generator = SegmentGenerator()
segments = generator.generate(records)
# assert
self.assertEqual(segments, [])
def test_generate_single_record(self) -> None:
record = UtilizationRecord(
timestamp=TEST_TS_BASE, cmd_names=[PYTEST1_NAME], level="PYTHON_CMD"
)
records = [record]
# execute
generator = SegmentGenerator()
segments = generator.generate(records)
# assert
self.assertEqual(len(segments), 1)
def test_generate_single_record_with_multiple_cmds(self) -> None:
record = UtilizationRecord(
timestamp=TEST_TS_BASE,
cmd_names=[PYTEST1_NAME, PYPIP_INSTALL_NAME],
level="PYTHON_CMD",
)
records = [record]
# execute
generator = SegmentGenerator()
segments = generator.generate(records)
# assert
self.assertEqual(len(segments), 2)
def test_generate_multiple_records(self) -> None:
records = get_base_test_records()
# execute
generator = SegmentGenerator()
segments = generator.generate(records)
# assert
self.assertEqual(len(segments), 2)
self.validate_segment(segments[0], PYTEST1_NAME, TEST_TS_BASE, TEST_TS_PLUS_30S)
self.validate_segment(
segments[1], PYPIP_INSTALL_NAME, TEST_TS_PLUS_10S, TEST_TS_PLUS_15S
)
def test_generate_cmd_interval_larger_than_default_threshold_setting(self) -> None:
records = get_base_test_records()
# these gap records are more than 1 minute after the last base record
test_gap_dt1 = TEST_DT_PLUS_30S + timedelta(seconds=80)
test_gap_dt2 = TEST_DT_PLUS_30S + timedelta(seconds=85)
record_gap_1 = UtilizationRecord(
timestamp=test_gap_dt1.timestamp(),
cmd_names=[PYTEST1_NAME],
level="PYTHON_CMD",
)
record_gap_2 = UtilizationRecord(
timestamp=test_gap_dt2.timestamp(),
cmd_names=[PYTEST1_NAME],
level="PYTHON_CMD",
)
records += [record_gap_1, record_gap_2]
# execute
generator = SegmentGenerator()
segments = generator.generate(records)
# assert
counter = Counter(seg.name for seg in segments)
self.assertEqual(counter[PYTEST1_NAME], 2)
self.assertEqual(counter[PYPIP_INSTALL_NAME], 1)
self.assertEqual(len(segments), 3)
self.validate_segment(segments[0], PYTEST1_NAME, TEST_TS_BASE, TEST_TS_PLUS_30S)
self.validate_segment(
segments[1],
PYTEST1_NAME,
test_gap_dt1.timestamp(),
test_gap_dt2.timestamp(),
)
self.validate_segment(
segments[2], PYPIP_INSTALL_NAME, TEST_TS_PLUS_10S, TEST_TS_PLUS_15S
)
def test_generate_multiple_segments_with_customized_threshold(self) -> None:
# set the continuous-segment threshold to 10 seconds
test_threshold = 10
records = get_base_test_records()
# execute
generator = SegmentGenerator()
segments = generator.generate(records, test_threshold)
# assert
counter = Counter(seg.name for seg in segments)
self.assertEqual(counter[PYTEST1_NAME], 2)
self.assertEqual(counter[PYPIP_INSTALL_NAME], 1)
self.assertEqual(len(segments), 3)
self.validate_segment(segments[0], PYTEST1_NAME, TEST_TS_BASE, TEST_TS_PLUS_15S)
self.validate_segment(
segments[1], PYTEST1_NAME, TEST_TS_PLUS_30S, TEST_TS_PLUS_30S
)
self.validate_segment(
segments[2], PYPIP_INSTALL_NAME, TEST_TS_PLUS_10S, TEST_TS_PLUS_15S
)
def validate_segment(
self, segment: OssCiSegmentV1, name: str, start_at: float, end_at: float
) -> None:
self.assertEqual(segment.name, name)
self.assertEqual(segment.start_at, start_at)
self.assertEqual(segment.end_at, end_at)
def get_base_test_records() -> list[UtilizationRecord]:
record1 = UtilizationRecord(
timestamp=TEST_TS_BASE, cmd_names=[PYTEST1_NAME], level="PYTHON_CMD"
)
record2 = UtilizationRecord(
timestamp=TEST_TS_PLUS_5S,
cmd_names=[PYTEST1_NAME],
level="PYTHON_CMD",
)
record3 = UtilizationRecord(
timestamp=TEST_TS_PLUS_10S,
cmd_names=[PYTEST1_NAME, PYPIP_INSTALL_NAME],
level="PYTHON_CMD",
)
record4 = UtilizationRecord(
timestamp=TEST_TS_PLUS_15S,
cmd_names=[PYTEST1_NAME, PYPIP_INSTALL_NAME],
level="PYTHON_CMD",
)
record5 = UtilizationRecord(
timestamp=TEST_TS_PLUS_30S,
cmd_names=[PYTEST1_NAME],
level="PYTHON_CMD",
)
record6 = UtilizationRecord(
timestamp=TEST_TS_PLUS_40S,
cmd_names=[],
level="PYTHON_CMD",
)
return [record1, record2, record3, record4, record5, record6]
if __name__ == "__main__":
unittest.main()


@@ -0,0 +1,452 @@
from __future__ import annotations
import os
import sys
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pathlib import Path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
import argparse
import json
import zipfile
from dataclasses import asdict
from datetime import datetime, timezone
from typing import Any, Optional
import pandas as pd # type: ignore[import]
from tools.stats.upload_stats_lib import download_s3_artifacts, upload_to_s3
from tools.stats.utilization_stats_lib import (
getDataModelVersion,
OssCiSegmentV1,
OssCiUtilizationMetadataV1,
OssCiUtilizationTimeSeriesV1,
UtilizationMetadata,
UtilizationRecord,
WorkflowInfo,
)
USAGE_LOG_FILENAME = "usage_log.txt"
CMD_PYTHON_LEVEL = "CMD_PYTHON"
UTILIZATION_BUCKET = "ossci-utilization"
PYTORCH_REPO = "pytorch/pytorch"
class SegmentGenerator:
"""
generates test segment from utilization records, currently it only generate segments on python commands level
segment_delta_threshold is the threshold to determine if a segment is continuous or not, default is 60 seconds.
"""
def generate(
self, records: list[UtilizationRecord], segment_delta_threshold: int = 60
) -> list[OssCiSegmentV1]:
if len(records) == 0:
return []
cmd_col_name = "cmd"
time_col_name = "time"
# flatten time series with detected cmds
df = pd.DataFrame(
[
{time_col_name: record.timestamp, cmd_col_name: process}
for record in records
for process in (record.cmd_names or [])
]
)
df[time_col_name] = pd.to_datetime(df[time_col_name], unit="s")
# get unique cmd names
unique_cmds_df = pd.DataFrame(df[cmd_col_name].unique(), columns=[cmd_col_name])
# get all detected python cmds
cmd_list = unique_cmds_df[
unique_cmds_df[cmd_col_name].str.startswith("python")
][cmd_col_name].tolist()
# find segments by scanning for continuous time-series windows
segments: list[OssCiSegmentV1] = []
for value in cmd_list:
subset = df[df[cmd_col_name] == value].copy()
continuous_segments = self._find_continuous_windows(
segment_delta_threshold, time_col_name, subset
)
for row in continuous_segments:
segment = OssCiSegmentV1(
level=CMD_PYTHON_LEVEL,
name=value,
start_at=row["start_time"].timestamp(),
end_at=row["end_time"].timestamp(),
extra_info={},
)
segments.append(segment)
print(
f"[Db Segments] detected pytest cmd: {len(cmd_list)}, generated segments: {len(segments)}"
)
return segments
def _find_continuous_windows(
self,
threshold: int,
time_column_name: str,
df: Any,  # lintrunner complains about the concrete DataFrame type, so Any is used
) -> list[dict[str, Any]]:
time_threshold = pd.Timedelta(seconds=threshold)
df = df.sort_values(by=time_column_name).reset_index(drop=True)
df["time_diff"] = df[time_column_name].diff()
df["segment"] = (df["time_diff"] > time_threshold).cumsum()
segments = (
df.groupby("segment")
.agg(
start_time=(time_column_name, "first"),
end_time=(time_column_name, "last"),
)
.reset_index(drop=True)
)
return segments[["start_time", "end_time"]].to_dict(orient="records") # type: ignore[no-any-return]
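
The windowing above relies on a standard pandas idiom: diff() gives each row's gap to the previous sample, and cumsum() over the "gap exceeded threshold" booleans assigns a segment id to every row. A self-contained sketch of the same idea with made-up timestamps:

import pandas as pd

# Samples at 0s, 5s, 10s, then a 2-minute gap, then 130s and 135s.
df = pd.DataFrame({"time": pd.to_datetime([0, 5, 10, 130, 135], unit="s")})
df = df.sort_values(by="time").reset_index(drop=True)
df["time_diff"] = df["time"].diff()  # NaT for the first row
df["segment"] = (df["time_diff"] > pd.Timedelta(seconds=60)).cumsum()
windows = (
    df.groupby("segment")
    .agg(start_time=("time", "first"), end_time=("time", "last"))
    .reset_index(drop=True)
)
print(windows)  # two windows: [00:00:00, 00:00:10] and [00:02:10, 00:02:15]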
class UtilizationDbConverter:
"""convert utilization log model to db model"""
def __init__(
self,
info: WorkflowInfo,
metadata: UtilizationMetadata,
records: list[UtilizationRecord],
segments: list[OssCiSegmentV1],
):
self.metadata = metadata
self.records = records
self.segments = segments
self.created_at = datetime.now().timestamp()
self.info = info
end_time_stamp = max([record.timestamp for record in records])
self.end_at = end_time_stamp
def convert(
self,
) -> tuple[OssCiUtilizationMetadataV1, list[OssCiUtilizationTimeSeriesV1]]:
db_metadata = self._to_oss_ci_metadata()
timeseries = self._to_oss_ci_timeseries_list()
return db_metadata, timeseries
def _to_oss_ci_metadata(self) -> OssCiUtilizationMetadataV1:
return OssCiUtilizationMetadataV1(
repo=self.info.repo,
workflow_id=self.info.workflow_run_id,
run_attempt=self.info.run_attempt,
job_id=self.info.job_id,
workflow_name=self.info.workflow_name,
job_name=self.info.job_name,
usage_collect_interval=self.metadata.usage_collect_interval,
data_model_version=str(self.metadata.data_model_version),
created_at=self.created_at,
gpu_count=self.metadata.gpu_count if self.metadata.gpu_count else 0,
cpu_count=self.metadata.cpu_count if self.metadata.cpu_count else 0,
gpu_type=self.metadata.gpu_type if self.metadata.gpu_type else "",
start_at=self.metadata.start_at,
end_at=self.end_at,
segments=self.segments,
tags=[],
)
def _to_oss_ci_timeseries_list(self) -> list[OssCiUtilizationTimeSeriesV1]:
return [
self._to_oss_ci_time_series(record, type="utilization", tags=["record"])
for record in self.records
]
def _to_oss_ci_time_series(
self, record: UtilizationRecord, type: str, tags: list[str]
) -> OssCiUtilizationTimeSeriesV1:
return OssCiUtilizationTimeSeriesV1(
created_at=self.created_at,
type=type,
tags=tags,
time_stamp=record.timestamp,
repo=self.info.repo,
workflow_id=self.info.workflow_run_id,
run_attempt=self.info.run_attempt,
job_id=self.info.job_id,
workflow_name=self.info.workflow_name,
job_name=self.info.job_name,
json_data=str(asdict(record.data) if record.data else {}),
)
class UploadUtilizationData:
"""
main class to handle utilization data conversion and s3 upload
fetches raw log data from s3, convert to log model, then convert to db model, and upload to s3
"""
def __init__(
self,
info: WorkflowInfo,
dry_run: bool = False,
debug: bool = False,
):
self.info = info
self.segment_generator = SegmentGenerator()
self.debug_mode = debug
self.dry_run = dry_run
def start(self) -> None:
metadata, valid_records, _ = self.get_log_data(
self.info.workflow_run_id, self.info.job_id, self.info.run_attempt
)
if not metadata:
print("[Log Model] Failed to process test log, metadata is None")
return None
if len(valid_records) == 0:
print("[Log Model] Failed to process test log, no valid records")
return None
segments = self.segment_generator.generate(valid_records)
db_metadata, db_records = UtilizationDbConverter(
self.info, metadata, valid_records, segments
).convert()
print(
f"[db model] Peek db metadata:\n{json.dumps(asdict(db_metadata), indent=4)}"
)
if len(db_records) > 0:
print(
f"[db model] Peek db timeseries:\n{json.dumps(asdict(db_records[0]), indent=4)}"
)
if self.dry_run:
print("[dry-run-mode]: no upload in dry run mode")
return
version = f"v_{db_metadata.data_model_version}"
metadata_collection = "util_metadata"
ts_collection = "util_timeseries"
if self.debug_mode:
metadata_collection = f"debug_{metadata_collection}"
ts_collection = f"debug_{ts_collection}"
self._upload_utilization_data_to_s3(
collection=metadata_collection,
version=version,
repo=self.info.repo,
workflow_run_id=self.info.workflow_run_id,
workflow_run_attempt=self.info.run_attempt,
job_id=self.info.job_id,
file_name="metadata",
docs=[asdict(db_metadata)],
)
self._upload_utilization_data_to_s3(
collection=ts_collection,
version=version,
repo=self.info.repo,
workflow_run_id=self.info.workflow_run_id,
workflow_run_attempt=self.info.run_attempt,
job_id=self.info.job_id,
file_name="time_series",
docs=[asdict(record) for record in db_records],
)
def _upload_utilization_data_to_s3(
self,
collection: str,
version: str,
repo: str,
workflow_run_id: int,
workflow_run_attempt: int,
job_id: int,
file_name: str,
docs: list[dict[str, Any]],
) -> None:
bucket_name = UTILIZATION_BUCKET
key = f"{collection}/{version}/{repo}/{workflow_run_id}/{workflow_run_attempt}/{job_id}/{file_name}"
upload_to_s3(bucket_name, key, docs)
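
For illustration, with invented ids the resulting S3 key has this shape (collection/version/repo/workflow_run_id/attempt/job_id/file_name):

# Hypothetical values, only to show the key layout used above.
collection, version = "util_metadata", "v_1.0"
repo, run_id, attempt, job_id = "pytorch/pytorch", 123456, 1, 987654
key = f"{collection}/{version}/{repo}/{run_id}/{attempt}/{job_id}/metadata"
print(key)  # util_metadata/v_1.0/pytorch/pytorch/123456/1/987654/metadata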
def get_log_data(
self, workflow_run_id: int, job_id: int, workflow_run_attempt: int
) -> tuple[
Optional[UtilizationMetadata], list[UtilizationRecord], list[UtilizationRecord]
]:
artifact_paths = download_s3_artifacts(
"logs-test", workflow_run_id, workflow_run_attempt, job_id
)
if len(artifact_paths) == 0:
print(
f"Failed to download artifacts for workflow {workflow_run_id} and job {job_id}"
)
return None, [], []
elif len(artifact_paths) > 1:
print(
f"Found more than one artifact for workflow {workflow_run_id} and job {job_id}, {artifact_paths}"
)
return None, [], []
p = artifact_paths[0]
test_log_content = unzip_file(p, USAGE_LOG_FILENAME)
metadata, records, error_records = self.convert_to_log_models(test_log_content)
if metadata is None:
return None, [], []
print(f"Converted Log Model: UtilizationMetadata:\n {metadata}")
return metadata, records, error_records
def _process_raw_record(
self, line: str
) -> tuple[Optional[UtilizationRecord], bool]:
try:
record = UtilizationRecord.from_json(line)
if record.error:
return record, False
return record, True
except Exception as e:
print(f"Failed to parse JSON line: {e}")
return None, False
def _process_utilization_records(
self,
lines: list[str],
) -> tuple[list[UtilizationRecord], list[UtilizationRecord]]:
results = [self._process_raw_record(line) for line in lines[1:]]
valid_records = [
record for record, valid in results if valid and record is not None
]
invalid_records = [
record for record, valid in results if not valid and record is not None
]
return valid_records, invalid_records
def convert_to_log_models(
self,
content: str,
) -> tuple[
Optional[UtilizationMetadata], list[UtilizationRecord], list[UtilizationRecord]
]:
if not content:
return None, [], []
lines = content.splitlines()
metadata = None
if len(lines) < 2:
print("Expected at least two records from log file")
return None, [], []
print(f"[Raw Log] Peek raw metadata json: {lines[0]} \n")
print(f"[Raw Log] Peek raw record json: {lines[1]} \n")
try:
metadata = UtilizationMetadata.from_json(lines[0])
except Exception as e:
print(f":: warning Failed to parse metadata: {e} for data: {lines[0]}")
return None, [], []
if metadata.data_model_version != getDataModelVersion():
print(
f":: warning Data model version mismatch: {metadata.data_model_version} != {getDataModelVersion()}"
)
return None, [], []
result_logs, error_logs = self._process_utilization_records(lines)
return metadata, result_logs, error_logs
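
For reference, the parser expects the first line of usage_log.txt to be the metadata JSON and every later line to be a record JSON. A made-up two-line example:

# Invented log content matching the field names of the log-model dataclasses below.
sample_log = "\n".join(
    [
        '{"level": "metadata", "workflow_id": "123", "job_id": "987",'
        ' "workflow_name": "pull", "job_name": "linux-test",'
        ' "usage_collect_interval": 1.0, "data_model_version": 1.0, "start_at": 0.0}',
        '{"level": "record", "timestamp": 5.0, "cmd_names": ["python test1.py"]}',
    ]
)
lines = sample_log.splitlines()
# lines[0] feeds UtilizationMetadata.from_json; lines[1:] feed UtilizationRecord.from_json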
def unzip_file(path: Path, file_name: str) -> str:
try:
with zipfile.ZipFile(path) as zip_file:
# Read the desired file from the zip archive
return zip_file.read(name=file_name).decode()
except Exception as e:
print(f"::warning trying to download test log {object} failed by: {e}")
return ""
def get_datetime_string(timestamp: float) -> str:
dt = datetime.fromtimestamp(timestamp, timezone.utc)
dt_str = dt.strftime("%Y-%m-%d %H:%M:%S.%f")
return dt_str
def parse_args() -> argparse.Namespace:
"""
Parse command line arguments.
Returns:
argparse.Namespace: Parsed arguments.
"""
parser = argparse.ArgumentParser(description="Upload test stats to s3")
parser.add_argument(
"--workflow-run-id",
type=int,
required=True,
help="id of the workflow to get artifacts from",
)
parser.add_argument(
"--workflow-run-attempt",
type=int,
required=True,
help="which retry of the workflow this is",
)
parser.add_argument(
"--workflow-name",
type=str,
required=True,
help="id of the workflow to get artifacts from",
)
parser.add_argument(
"--job-id",
type=int,
required=True,
help="id of the workflow to get artifacts from",
)
parser.add_argument(
"--job-name",
type=str,
required=True,
help="id of the workflow to get artifacts from",
)
parser.add_argument(
"--repo",
type=str,
required=False,
help="which GitHub repo this workflow run belongs to",
)
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
parser.add_argument("--dry-run", action="store_true", help="Enable dry-run mode")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
# Flush stdout so that any errors in the upload show up last in the logs.
sys.stdout.flush()
repo = PYTORCH_REPO
if args.repo:
repo = args.repo
print(f"repo: {repo}")
workflow_info = WorkflowInfo(
workflow_run_id=args.workflow_run_id,
run_attempt=args.workflow_run_attempt,
job_id=args.job_id,
workflow_name=args.workflow_name,
job_name=args.job_name,
repo=repo,
)
ud = UploadUtilizationData(
info=workflow_info,
dry_run=args.dry_run,
debug=args.debug,
)
ud.start()


@ -1,4 +1,4 @@
from dataclasses import dataclass from dataclasses import dataclass, field
from typing import Optional from typing import Optional
from dataclasses_json import DataClassJsonMixin from dataclasses_json import DataClassJsonMixin
@ -7,8 +7,6 @@ from dataclasses_json import DataClassJsonMixin
_DATA_MODEL_VERSION = 1.0 _DATA_MODEL_VERSION = 1.0
# the db schema related to this is:
# https://github.com/pytorch/test-infra/blob/main/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql
# data model for test log usage # data model for test log usage
@dataclass @dataclass
class UtilizationStats: class UtilizationStats:
@ -18,17 +16,17 @@ class UtilizationStats:
@dataclass @dataclass
class UtilizationMetadata(DataClassJsonMixin): class UtilizationMetadata(DataClassJsonMixin):
level: Optional[str] = None level: str
workflow_id: Optional[str] = None workflow_id: str
job_id: Optional[str] = None job_id: str
workflow_name: Optional[str] = None workflow_name: str
job_name: Optional[str] = None job_name: str
usage_collect_interval: Optional[float] = None usage_collect_interval: float
data_model_version: Optional[float] = None data_model_version: float
start_at: float
gpu_count: Optional[int] = None gpu_count: Optional[int] = None
cpu_count: Optional[int] = None cpu_count: Optional[int] = None
gpu_type: Optional[str] = None gpu_type: Optional[str] = None
start_at: Optional[float] = None
error: Optional[str] = None error: Optional[str] = None
@ -48,13 +46,71 @@ class RecordData(DataClassJsonMixin):
@dataclass @dataclass
class UtilizationRecord(DataClassJsonMixin): class UtilizationRecord(DataClassJsonMixin):
level: Optional[str] = None level: str
timestamp: Optional[float] = None timestamp: float
data: Optional[RecordData] = None data: Optional[RecordData] = None
cmd_names: Optional[list[str]] = None cmd_names: Optional[list[str]] = None
error: Optional[str] = None error: Optional[str] = None
log_duration: Optional[str] = None log_duration: Optional[str] = None
@dataclass
class OssCiSegmentV1(DataClassJsonMixin):
level: str
name: str
start_at: float
end_at: float
extra_info: dict[str, str]
# the db schema related to this is:
# https://github.com/pytorch/test-infra/blob/main/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql
@dataclass
class OssCiUtilizationMetadataV1:
created_at: float
repo: str
workflow_id: int
run_attempt: int
job_id: int
workflow_name: str
job_name: str
usage_collect_interval: float
data_model_version: str
gpu_count: int
cpu_count: int
gpu_type: str
start_at: float
end_at: float
segments: list[OssCiSegmentV1]
tags: list[str] = field(default_factory=list)
# this data model is for the time series data:
# https://github.com/pytorch/test-infra/blob/main/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_time_series_schema.sql
@dataclass
class OssCiUtilizationTimeSeriesV1:
created_at: float
type: str
tags: list[str]
time_stamp: float
repo: str
workflow_id: int
run_attempt: int
job_id: int
workflow_name: str
job_name: str
json_data: str
def getDataModelVersion() -> float:
    return _DATA_MODEL_VERSION
@dataclass
class WorkflowInfo:
workflow_run_id: int
workflow_name: str
job_id: int
run_attempt: int
job_name: str
repo: str = "pytorch/pytorch"
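
Since the log models mix in DataClassJsonMixin, each log line round-trips through to_json/from_json. A small sketch with placeholder values, assuming the repo root is on PYTHONPATH as the new test arranges:

from tools.stats.utilization_stats_lib import UtilizationRecord

record = UtilizationRecord(
    level="record",
    timestamp=1738195200.0,  # placeholder epoch seconds
    cmd_names=["python test1.py"],
)
line = record.to_json()                     # what the monitor writes per interval
parsed = UtilizationRecord.from_json(line)  # what the uploader reads per line
assert parsed == record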