import datetime
import gzip
import inspect
import io
import json
import os
import time
import uuid
import zipfile
from decimal import Decimal
from pathlib import Path
from typing import Any, Dict, List
from warnings import warn

import boto3  # type: ignore[import]
import requests
import rockset  # type: ignore[import]

PYTORCH_REPO = "https://api.github.com/repos/pytorch/pytorch"
S3_RESOURCE = boto3.resource("s3")

# NB: In CI, a flaky test is usually retried 3 times, then the test file would be rerun
# 2 more times
MAX_RETRY_IN_NON_DISABLED_MODE = 3 * 3

# NB: Rockset has an upper limit of 5000 documents in one request
BATCH_SIZE = 5000


def _get_request_headers() -> Dict[str, str]:
    return {
        "Accept": "application/vnd.github.v3+json",
        "Authorization": "token " + os.environ["GITHUB_TOKEN"],
    }


def _get_artifact_urls(prefix: str, workflow_run_id: int) -> Dict[Path, str]:
    """Get all workflow artifacts whose name starts with the given prefix."""
    response = requests.get(
        f"{PYTORCH_REPO}/actions/runs/{workflow_run_id}/artifacts?per_page=100",
        headers=_get_request_headers(),
    )
    artifacts = response.json()["artifacts"]
    while "next" in response.links.keys():
        response = requests.get(
            response.links["next"]["url"], headers=_get_request_headers()
        )
        artifacts.extend(response.json()["artifacts"])

    artifact_urls = {}
    for artifact in artifacts:
        if artifact["name"].startswith(prefix):
            artifact_urls[Path(artifact["name"])] = artifact["archive_download_url"]
    return artifact_urls


def _download_artifact(
    artifact_name: Path, artifact_url: str, workflow_run_attempt: int
) -> Path:
    # [Artifact run attempt]
    # All artifacts on a workflow share a single namespace. However, we can
    # re-run a workflow and produce a new set of artifacts. To avoid name
    # collisions, we add `-runattempt1-` somewhere in the artifact name.
    #
    # This code parses out the run attempt number from the artifact name. If it
    # doesn't match the one specified on the command line, skip it.
    atoms = str(artifact_name).split("-")
    for atom in atoms:
        if atom.startswith("runattempt"):
            found_run_attempt = int(atom[len("runattempt") :])
            if workflow_run_attempt != found_run_attempt:
                print(
                    f"Skipping {artifact_name} as it is an invalid run attempt. "
                    f"Expected {workflow_run_attempt}, found {found_run_attempt}."
                )

    print(f"Downloading {artifact_name}")
    response = requests.get(artifact_url, headers=_get_request_headers())
    with open(artifact_name, "wb") as f:
        f.write(response.content)
    return artifact_name


def download_s3_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    bucket = S3_RESOURCE.Bucket("gha-artifacts")
    objs = bucket.objects.filter(
        Prefix=f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}"
    )

    found_one = False
    paths = []
    for obj in objs:
        found_one = True
        p = Path(Path(obj.key).name)
        print(f"Downloading {p}")
        with open(p, "wb") as f:
            f.write(obj.get()["Body"].read())
        paths.append(p)

    if not found_one:
        print(
            "::warning title=s3 artifacts not found::"
            "Didn't find any test reports in s3, there might be a bug!"
        )
    return paths
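
# Editor's sketch (illustrative only, not part of the original module): the S3 key
# layout that download_s3_artifacts() relies on, shown as a standalone listing
# helper. The bucket name matches the one used above; the "test-reports" prefix and
# the run id/attempt arguments are hypothetical example values.
def _example_list_s3_artifact_keys(
    workflow_run_id: int, workflow_run_attempt: int
) -> List[str]:
    # Objects live under pytorch/pytorch/<run_id>/<run_attempt>/artifact/<prefix>...
    prefix = (
        f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/test-reports"
    )
    bucket = S3_RESOURCE.Bucket("gha-artifacts")
    return [obj.key for obj in bucket.objects.filter(Prefix=prefix)]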

def download_gha_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    artifact_urls = _get_artifact_urls(prefix, workflow_run_id)
    paths = []
    for name, url in artifact_urls.items():
        paths.append(_download_artifact(Path(name), url, workflow_run_attempt))
    return paths


def upload_to_rockset(
    collection: str,
    docs: List[Any],
    workspace: str = "commons",
    client: Any = None,
) -> None:
    if not client:
        client = rockset.RocksetClient(
            host="api.usw2a1.rockset.com", api_key=os.environ["ROCKSET_API_KEY"]
        )

    index = 0
    while index < len(docs):
        from_index = index
        to_index = min(from_index + BATCH_SIZE, len(docs))
        print(f"Writing {to_index - from_index} documents to Rockset")

        client.Documents.add_documents(
            collection=collection,
            data=docs[from_index:to_index],
            workspace=workspace,
        )
        index += BATCH_SIZE

    print("Done!")


def upload_to_s3(
    bucket_name: str,
    key: str,
    docs: List[Dict[str, Any]],
) -> None:
    print(f"Writing {len(docs)} documents to S3")
    body = io.StringIO()
    for doc in docs:
        json.dump(doc, body)
        body.write("\n")

    S3_RESOURCE.Object(
        bucket_name,
        key,
    ).put(
        Body=gzip.compress(body.getvalue().encode()),
        ContentEncoding="gzip",
        ContentType="application/json",
    )
    print("Done!")


def read_from_s3(
    bucket_name: str,
    key: str,
) -> List[Dict[str, Any]]:
    print(f"Reading from s3://{bucket_name}/{key}")
    body = (
        S3_RESOURCE.Object(
            bucket_name,
            key,
        )
        .get()["Body"]
        .read()
    )
    results = gzip.decompress(body).decode().split("\n")
    return [json.loads(result) for result in results if result]


def upload_workflow_stats_to_s3(
    workflow_run_id: int,
    workflow_run_attempt: int,
    collection: str,
    docs: List[Dict[str, Any]],
) -> None:
    bucket_name = "ossci-raw-job-status"
    key = f"{collection}/{workflow_run_id}/{workflow_run_attempt}"
    upload_to_s3(bucket_name, key, docs)


def upload_file_to_s3(
    file_name: str,
    bucket: str,
    key: str,
) -> None:
    """
    Upload a local file to S3.
    """
    print(f"Upload {file_name} to s3://{bucket}/{key}")
    boto3.client("s3").upload_file(
        file_name,
        bucket,
        key,
    )


def unzip(p: Path) -> None:
    """Unzip the provided zipfile to a similarly-named directory.

    Looks like: /tmp/test-reports.zip -> /tmp/unzipped-test-reports/
    """
    assert p.is_file()
    unzipped_dir = p.with_name("unzipped-" + p.stem)
    print(f"Extracting {p} to {unzipped_dir}")

    with zipfile.ZipFile(p, "r") as zip_file:
        zip_file.extractall(unzipped_dir)


def is_rerun_disabled_tests(tests: Dict[str, Dict[str, int]]) -> bool:
    """
    Check if the test report is coming from the rerun_disabled_tests workflow,
    where each test is run multiple times.
    """
    return all(
        t.get("num_green", 0) + t.get("num_red", 0) > MAX_RETRY_IN_NON_DISABLED_MODE
        for t in tests.values()
    )


def _convert_float_values_to_decimals(data: Dict[str, Any]) -> Dict[str, Any]:
    return {k: Decimal(str(v)) if isinstance(v, float) else v for k, v in data.items()}


class EnvVarMetric:
    name: str
    env_var: str
    required: bool = True
    # Used to cast the value of the env_var to the correct type (defaults to str)
    type_conversion_fn: Any = None

    def __init__(
        self,
        name: str,
        env_var: str,
        required: bool = True,
        type_conversion_fn: Any = None,
    ) -> None:
        self.name = name
        self.env_var = env_var
        self.required = required
        self.type_conversion_fn = type_conversion_fn

    def value(self) -> Any:
        value = os.environ.get(self.env_var)
        if value is None and self.required:
            raise ValueError(
                f"Missing {self.name}. Please set the {self.env_var} "
                "environment variable to pass in this value."
            )
        if self.type_conversion_fn:
            return self.type_conversion_fn(value)
        return value
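
# Editor's sketch (illustrative only, not part of the original module): the gzipped,
# newline-delimited JSON round trip implemented by upload_to_s3() and read_from_s3().
# The bucket, key, and documents below are hypothetical, and the caller needs write
# access to the bucket.
def _example_s3_round_trip() -> List[Dict[str, Any]]:
    docs = [
        {"job": "example-linux-test", "status": "success"},
        {"job": "example-win-test", "status": "failure"},
    ]
    upload_to_s3("my-example-bucket", "example/test-stats", docs)
    # read_from_s3() decompresses the object and parses one JSON document per line.
    return read_from_s3("my-example-bucket", "example/test-stats")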

def emit_metric(
    metric_name: str,
    metrics: Dict[str, Any],
) -> None:
    """
    Upload a metric to DynamoDB (and from there, Rockset).

    Parameters:
        metric_name:
            Name of the metric. Every unique metric should have a different name
            and be emitted just once per run attempt.
            Metrics are namespaced by their module and the function that emitted them.
        metrics: The actual data to record.

    Some default values are populated from environment variables, which must be set
    for metrics to be emitted. (If they're not set, this function becomes a no-op.)
    """
    if metrics is None:
        raise ValueError("You didn't ask to upload any metrics!")

    # We use these env vars to determine basic info about the workflow run.
    # By using env vars, we don't have to pass this info around to every function.
    # It also helps ensure that we only emit metrics during CI.
    env_var_metrics = [
        EnvVarMetric("repo", "GITHUB_REPOSITORY"),
        EnvVarMetric("workflow", "GITHUB_WORKFLOW"),
        EnvVarMetric("build_environment", "BUILD_ENVIRONMENT"),
        EnvVarMetric("job", "GITHUB_JOB"),
        EnvVarMetric("test_config", "TEST_CONFIG", required=False),
        EnvVarMetric("run_id", "GITHUB_RUN_ID", type_conversion_fn=int),
        EnvVarMetric("run_number", "GITHUB_RUN_NUMBER", type_conversion_fn=int),
        EnvVarMetric("run_attempt", "GITHUB_RUN_ATTEMPT", type_conversion_fn=int),
    ]

    # Use info about the function that invoked this one as a namespace and a way
    # to filter metrics.
    calling_frame = inspect.currentframe().f_back  # type: ignore[union-attr]
    calling_frame_info = inspect.getframeinfo(calling_frame)  # type: ignore[arg-type]
    calling_file = os.path.basename(calling_frame_info.filename)
    calling_module = inspect.getmodule(calling_frame).__name__  # type: ignore[union-attr]
    calling_function = calling_frame_info.function

    try:
        reserved_metrics = {
            "metric_name": metric_name,
            "calling_file": calling_file,
            "calling_module": calling_module,
            "calling_function": calling_function,
            "timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"),
            **{m.name: m.value() for m in env_var_metrics},
        }
    except ValueError as e:
        warn(f"Not emitting metrics. {e}")
        return

    # Prefix the key with the metric name and timestamp to reduce the chance of a
    # uuid1 name collision.
    reserved_metrics[
        "dynamo_key"
    ] = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"

    # Ensure the metrics dict doesn't contain any reserved keys.
    used_reserved_keys = [k for k in metrics.keys() if k in reserved_metrics]
    if used_reserved_keys:
        raise ValueError(
            f"Metrics dict contains reserved keys: [{', '.join(used_reserved_keys)}]"
        )

    # boto3 doesn't support uploading float values to DynamoDB, so convert them all
    # to decimals.
    metrics = _convert_float_values_to_decimals(metrics)

    try:
        session = boto3.Session(region_name="us-east-1")
        session.resource("dynamodb").Table("torchci-metrics").put_item(
            Item={
                **reserved_metrics,
                **metrics,
            }
        )
    except Exception as e:
        # We don't want to fail the job if we can't upload the metric.
        # We still raise the ValueErrors outside this try block since those
        # indicate improperly configured metrics.
        warn(f"Error uploading metric to DynamoDB: {e}")
        return
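
# Editor's sketch (illustrative only, not part of the original module): a typical
# emit_metric() call. In CI, the GitHub Actions environment variables listed above
# supply the run metadata; outside CI the call warns and becomes a no-op. The metric
# name and payload below are made-up examples.
def _example_emit_sharding_metric(num_tests: int, duration_seconds: float) -> None:
    emit_metric(
        "example_test_sharding",
        {
            "num_tests": num_tests,
            # Floats are converted to Decimal before the DynamoDB write.
            "duration_seconds": duration_seconds,
        },
    )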