Make emit_metric importable without having boto3 installed (#107070)

Make it so that scripts can import and run the `emit_metric` function even if they don't have boto3 installed. In that case it still validates the inputs but skips the actual metric emission.

It's purely a refactor, with no real logic changes.

Motivation: So that run_test.py and the target determination code can use this library easily, without worrying about whether its dependencies are installed.
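
For illustration, a minimal sketch of what this enables (the metric name and payload here are made up):

    from tools.stats.upload_metrics import emit_metric

    # Safe to call even when boto3 is missing: the inputs are still
    # validated and shaped, and only the DynamoDB upload is skipped.
    emit_metric("example_metric", {"files_selected": 3, "duration_seconds": 1.25})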

Pull Request resolved: https://github.com/pytorch/pytorch/pull/107070
Approved by: https://github.com/huydhn
Zain Rizvi 2023-08-21 12:49:22 -05:00 committed by PyTorch MergeBot
parent 3920ce2f6e
commit 5ddb8ef827
6 changed files with 151 additions and 129 deletions

run_test.py View File

@@ -41,7 +41,7 @@ try:
# using tools/ to optimize test run.
sys.path.insert(0, str(REPO_ROOT))
from tools.stats.export_test_times import TEST_TIMES_FILE
-from tools.stats.upload_stats_lib import emit_metric
+from tools.stats.upload_metrics import emit_metric
from tools.testing.test_selections import (
calculate_shards,
get_reordered_tests,

tools/stats/upload_metrics.py View File

@@ -0,0 +1,144 @@
import datetime
import inspect
import os
import time
import uuid
from decimal import Decimal
from typing import Any, Dict
from warnings import warn
# boto3 is an optional dependency. If it's not installed,
# we'll just not emit the metrics.
# Keeping this logic here so that callers don't have to
# worry about it.
EMIT_METRICS = False
try:
import boto3 # type: ignore[import]
EMIT_METRICS = True
except ImportError as e:
print(f"Unable to import boto3. Will not be emitting metrics.... Reason: {e}")
class EnvVarMetric:
name: str
env_var: str
required: bool = True
# Used to cast the value of the env_var to the correct type (defaults to str)
type_conversion_fn: Any = None
def __init__(
self,
name: str,
env_var: str,
required: bool = True,
type_conversion_fn: Any = None,
) -> None:
self.name = name
self.env_var = env_var
self.required = required
self.type_conversion_fn = type_conversion_fn
def value(self) -> Any:
value = os.environ.get(self.env_var)
if value is None and self.required:
raise ValueError(
f"Missing {self.name}. Please set the {self.env_var} "
"environment variable to pass in this value."
)
if self.type_conversion_fn:
return self.type_conversion_fn(value)
return value
def emit_metric(
metric_name: str,
metrics: Dict[str, Any],
) -> None:
"""
Upload a metric to DynamoDB (and from there, Rockset).
Even if EMIT_METRICS is set to False, this function will still run the code to
validate and shape the metrics, skipping just the upload.
Parameters:
metric_name:
Name of the metric. Every unique metric should have a different name
and be emitted just once per run attempt.
Metrics are namespaced by their module and the function that emitted them.
metrics: The actual data to record.
Some default values are populated from environment variables, which must be set
        for metrics to be emitted. (If they're not set, this function becomes a no-op.)
"""
if metrics is None:
raise ValueError("You didn't ask to upload any metrics!")
    # We use these env vars to determine basic info about the workflow run.
    # By using env vars, we don't have to pass this info around to every function.
    # It also helps ensure that we only emit metrics during CI.
env_var_metrics = [
EnvVarMetric("repo", "GITHUB_REPOSITORY"),
EnvVarMetric("workflow", "GITHUB_WORKFLOW"),
EnvVarMetric("build_environment", "BUILD_ENVIRONMENT"),
EnvVarMetric("job", "GITHUB_JOB"),
EnvVarMetric("test_config", "TEST_CONFIG", required=False),
EnvVarMetric("run_id", "GITHUB_RUN_ID", type_conversion_fn=int),
EnvVarMetric("run_number", "GITHUB_RUN_NUMBER", type_conversion_fn=int),
EnvVarMetric("run_attempt", "GITHUB_RUN_ATTEMPT", type_conversion_fn=int),
]
# Use info about the function that invoked this one as a namespace and a way to filter metrics.
calling_frame = inspect.currentframe().f_back # type: ignore[union-attr]
calling_frame_info = inspect.getframeinfo(calling_frame) # type: ignore[arg-type]
calling_file = os.path.basename(calling_frame_info.filename)
calling_module = inspect.getmodule(calling_frame).__name__ # type: ignore[union-attr]
calling_function = calling_frame_info.function
try:
reserved_metrics = {
"metric_name": metric_name,
"calling_file": calling_file,
"calling_module": calling_module,
"calling_function": calling_function,
"timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"),
**{m.name: m.value() for m in env_var_metrics},
}
except ValueError as e:
warn(f"Not emitting metrics. {e}")
return
    # Prefix the key with the metric name and timestamp to reduce the chance of a uuid1 name collision
reserved_metrics[
"dynamo_key"
] = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"
# Ensure the metrics dict doesn't contain any reserved keys
for key in reserved_metrics.keys():
used_reserved_keys = [k for k in metrics.keys() if k == key]
if used_reserved_keys:
raise ValueError(f"Metrics dict contains reserved keys: [{', '.join(key)}]")
# boto3 doesn't support uploading float values to DynamoDB, so convert them all to decimals.
metrics = _convert_float_values_to_decimals(metrics)
if EMIT_METRICS:
try:
session = boto3.Session(region_name="us-east-1")
session.resource("dynamodb").Table("torchci-metrics").put_item(
Item={
**reserved_metrics,
**metrics,
}
)
except Exception as e:
# We don't want to fail the job if we can't upload the metric.
# We still raise the ValueErrors outside this try block since those indicate improperly configured metrics
warn(f"Error uploading metric to DynamoDB: {e}")
return
def _convert_float_values_to_decimals(data: Dict[str, Any]) -> Dict[str, Any]:
return {k: Decimal(str(v)) if isinstance(v, float) else v for k, v in data.items()}
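
As a usage sketch of the new module (the values below are illustrative; in CI these environment variables are set by GitHub Actions), the function validates and shapes the metrics, and performs the DynamoDB write only when boto3 imported successfully:

    import os

    # Illustrative values; real runs get these from GitHub Actions.
    for var, value in {
        "GITHUB_REPOSITORY": "pytorch/pytorch",
        "GITHUB_WORKFLOW": "pull",
        "BUILD_ENVIRONMENT": "linux-focal-py3.8",
        "GITHUB_JOB": "test",
        "GITHUB_RUN_ID": "123",
        "GITHUB_RUN_NUMBER": "1",
        "GITHUB_RUN_ATTEMPT": "1",
    }.items():
        os.environ.setdefault(var, value)

    from tools.stats.upload_metrics import emit_metric

    # Without the env vars above, this would warn and return
    # without emitting anything.
    emit_metric("some_unique_metric", {"tests_run": 10, "avg_seconds": 0.75})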

tools/stats/upload_stats_lib.py View File

@@ -1,17 +1,11 @@
-import datetime
import gzip
-import inspect
import io
import json
import os
-import time
-import uuid
import zipfile
-from decimal import Decimal
from pathlib import Path
from typing import Any, Dict, List
-from warnings import warn
import boto3 # type: ignore[import]
import requests
@@ -234,122 +228,3 @@ def is_rerun_disabled_tests(tests: Dict[str, Dict[str, int]]) -> bool:
t.get("num_green", 0) + t.get("num_red", 0) > MAX_RETRY_IN_NON_DISABLED_MODE
for t in tests.values()
)
def _convert_float_values_to_decimals(data: Dict[str, Any]) -> Dict[str, Any]:
return {k: Decimal(str(v)) if isinstance(v, float) else v for k, v in data.items()}
class EnvVarMetric:
name: str
env_var: str
required: bool = True
# Used to cast the value of the env_var to the correct type (defaults to str)
type_conversion_fn: Any = None
def __init__(
self,
name: str,
env_var: str,
required: bool = True,
type_conversion_fn: Any = None,
) -> None:
self.name = name
self.env_var = env_var
self.required = required
self.type_conversion_fn = type_conversion_fn
def value(self) -> Any:
value = os.environ.get(self.env_var)
if value is None and self.required:
raise ValueError(
f"Missing {self.name}. Please set the {self.env_var} "
"environment variable to pass in this value."
)
if self.type_conversion_fn:
return self.type_conversion_fn(value)
return value
def emit_metric(
metric_name: str,
metrics: Dict[str, Any],
) -> None:
"""
Upload a metric to DynamoDB (and from there, Rockset).
Parameters:
metric_name:
Name of the metric. Every unique metric should have a different name
and be emitted just once per run attempt.
Metrics are namespaced by their module and the function that emitted them.
metrics: The actual data to record.
Some default values are populated from environment variables, which must be set
for metrics to be emitted. (If they're not set, this function becomes a noop):
"""
if metrics is None:
raise ValueError("You didn't ask to upload any metrics!")
    # We use these env vars to determine basic info about the workflow run.
# By using env vars, we don't have to pass this info around to every function.
# It also helps ensure that we only emit metrics during CI
env_var_metrics = [
EnvVarMetric("repo", "GITHUB_REPOSITORY"),
EnvVarMetric("workflow", "GITHUB_WORKFLOW"),
EnvVarMetric("build_environment", "BUILD_ENVIRONMENT"),
EnvVarMetric("job", "GITHUB_JOB"),
EnvVarMetric("test_config", "TEST_CONFIG", required=False),
EnvVarMetric("run_id", "GITHUB_RUN_ID", type_conversion_fn=int),
EnvVarMetric("run_number", "GITHUB_RUN_NUMBER", type_conversion_fn=int),
EnvVarMetric("run_attempt", "GITHUB_RUN_ATTEMPT", type_conversion_fn=int),
]
# Use info about the function that invoked this one as a namespace and a way to filter metrics.
calling_frame = inspect.currentframe().f_back # type: ignore[union-attr]
calling_frame_info = inspect.getframeinfo(calling_frame) # type: ignore[arg-type]
calling_file = os.path.basename(calling_frame_info.filename)
calling_module = inspect.getmodule(calling_frame).__name__ # type: ignore[union-attr]
calling_function = calling_frame_info.function
try:
reserved_metrics = {
"metric_name": metric_name,
"calling_file": calling_file,
"calling_module": calling_module,
"calling_function": calling_function,
"timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"),
**{m.name: m.value() for m in env_var_metrics},
}
except ValueError as e:
warn(f"Not emitting metrics. {e}")
return
# Prefix key with metric name and timestamp to derisk chance of a uuid1 name collision
reserved_metrics[
"dynamo_key"
] = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"
# Ensure the metrics dict doesn't contain any reserved keys
for key in reserved_metrics.keys():
used_reserved_keys = [k for k in metrics.keys() if k == key]
if used_reserved_keys:
raise ValueError(f"Metrics dict contains reserved keys: [{', '.join(key)}]")
# boto3 doesn't support uploading float values to DynamoDB, so convert them all to decimals.
metrics = _convert_float_values_to_decimals(metrics)
try:
session = boto3.Session(region_name="us-east-1")
session.resource("dynamodb").Table("torchci-metrics").put_item(
Item={
**reserved_metrics,
**metrics,
}
)
except Exception as e:
# We don't want to fail the job if we can't upload the metric.
# We still raise the ValueErrors outside this try block since those indicate improperly configured metrics
warn(f"Error uploading metric to DynamoDB: {e}")
return

View File

@@ -4,7 +4,10 @@ import unittest
from typing import Any, Dict
from unittest import mock
-from tools.stats.upload_stats_lib import BATCH_SIZE, emit_metric, upload_to_rockset
+from tools.stats.upload_metrics import emit_metric
+from tools.stats.upload_stats_lib import BATCH_SIZE, upload_to_rockset
# default values
REPO = "some/repo"

View File

@@ -1,6 +1,6 @@
from typing import List
-from tools.stats.upload_stats_lib import emit_metric
+from tools.stats.upload_metrics import emit_metric
from tools.testing.target_determination.heuristics import (
HEURISTICS,

View File

@@ -13,7 +13,7 @@ from tools.shared.logging_utils import duration_to_str, pluralize
from tools.stats.export_test_times import TEST_FILE_RATINGS_FILE
from tools.stats.import_test_stats import get_disabled_tests, get_slow_tests
-from tools.stats.upload_stats_lib import emit_metric
+from tools.stats.upload_metrics import emit_metric
REPO_ROOT = Path(__file__).resolve().parent.parent.parent