pytorch/.github/scripts/file_io_utils.py
Zain Rizvi 96f46316c9 Preserve PyTest Cache across job runs (#100522)
Preserves the PyTest cache from one job run to the next. In a later PR, this will be used to change the order in which we actually run those tests.

The process, sketched in code below, is:
1. Before running tests, check S3 to see if there are uploaded caches from any shards of the current job
2. If there are, download them all and merge their contents, putting the merged cache in the default .pytest_cache folder
3. After running the tests, merge the now-current .pytest_cache folder with the cache previously downloaded for the current shard. This makes the merged cache contain every test that has ever failed for the given PR in the current shard
4. Upload the resulting cache file back to S3

The S3 folder has a retention policy of 30 days, after which the uploaded cache files will get auto-deleted.
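A minimal sketch of how those steps could compose on top of the helpers in file_io_utils.py below; the bucket name, key layout, and the wrapper functions here are illustrative assumptions, not the exact scheme the CI scripts use:

    from pathlib import Path
    from typing import List

    from file_io_utils import (
        download_s3_objects_with_prefix,
        sanitize_for_s3,
        unzip_folder,
        upload_file_to_s3,
        zip_folder,
    )

    BUCKET = "example-ci-cache-bucket"  # placeholder, not the real bucket

    def download_previous_caches(pr: str, job: str, work_dir: Path) -> List[Path]:
        # Steps 1-2: fetch every shard's uploaded cache for this PR/job and unzip each one
        prefix = f"pytest_cache/{sanitize_for_s3(pr)}/{sanitize_for_s3(job)}"
        zips = download_s3_objects_with_prefix(BUCKET, prefix, work_dir / "downloads")
        unzipped = []
        for idx, zip_file in enumerate(zips):
            dest = work_dir / f"unzipped_{idx}"
            unzip_folder(zip_file, dest)
            unzipped.append(dest)
        return unzipped  # the caller would merge these into the default .pytest_cache

    def upload_shard_cache(pr: str, job: str, shard: str, cache_dir: Path, work_dir: Path) -> None:
        # Steps 3-4: zip the merged .pytest_cache for this shard and push it back to S3
        key = (
            f"pytest_cache/{sanitize_for_s3(pr)}/{sanitize_for_s3(job)}/"
            f"{sanitize_for_s3(shard)}.zip"
        )
        zip_file = zip_folder(cache_dir, work_dir / "cache_upload.zip")
        upload_file_to_s3(zip_file, BUCKET, key)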
Pull Request resolved: https://github.com/pytorch/pytorch/pull/100522
Approved by: https://github.com/huydhn
2023-05-10 18:37:28 +00:00

import json
import re
import shutil
from pathlib import Path
from typing import Any, List

import boto3  # type: ignore[import]


def zip_folder(folder_to_zip: Path, dest_file_base_name: Path) -> Path:
    """
    Returns the path to the resulting zip file, with the appropriate extension added if needed
    """
    # shutil.make_archive will append .zip to the dest_file_name, so we need to remove it if it's already there
    if dest_file_base_name.suffix == ".zip":
        dest_file_base_name = dest_file_base_name.with_suffix("")

    ensure_dir_exists(dest_file_base_name.parent)

    print(f"Zipping {folder_to_zip}\n to {dest_file_base_name}")

    # Convert to string because shutil.make_archive doesn't like Path objects
    return Path(shutil.make_archive(str(dest_file_base_name), "zip", folder_to_zip))


def unzip_folder(zip_file_path: Path, unzip_to_folder: Path) -> None:
    """
    Unzips the given archive into the unzip_to_folder directory
    """
    print(f"Unzipping {zip_file_path}")
    print(f" to {unzip_to_folder}")
    shutil.unpack_archive(zip_file_path, unzip_to_folder, "zip")


def ensure_dir_exists(dir: Path) -> None:
    dir.mkdir(parents=True, exist_ok=True)


def copy_file(source_file: Path, dest_file: Path) -> None:
    ensure_dir_exists(dest_file.parent)
    shutil.copyfile(source_file, dest_file)


def load_json_file(file_path: Path) -> Any:
    """
    Returns the deserialized json object
    """
    with open(file_path, "r") as f:
        return json.load(f)


def write_json_file(file_path: Path, content: Any) -> None:
    dir = file_path.parent
    ensure_dir_exists(dir)

    with open(file_path, "w") as f:
        json.dump(content, f, indent=2)


def sanitize_for_s3(text: str) -> str:
    """
    S3 keys can only contain alphanumeric characters, underscores, and dashes.
    This function replaces all other characters with underscores.
    """
    return re.sub(r"[^a-zA-Z0-9_-]", "_", text)


def upload_file_to_s3(file_name: Path, bucket: str, key: str) -> None:
    print(f"Uploading {file_name}")
    print(f" to s3://{bucket}/{key}")

    boto3.client("s3").upload_file(
        str(file_name),
        bucket,
        key,
    )


def download_s3_objects_with_prefix(
    bucket_name: str, prefix: str, download_folder: Path
) -> List[Path]:
    """
    Downloads every object whose key starts with the given prefix, preserving the key
    path under download_folder. Returns the list of local paths that were written.
    """
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(bucket_name)

    downloads = []

    for obj in bucket.objects.filter(Prefix=prefix):
        download_path = download_folder / obj.key
        ensure_dir_exists(download_path.parent)

        print(f"Downloading s3://{bucket.name}/{obj.key}")
        print(f" to {download_path}")

        s3.Object(bucket.name, obj.key).download_file(str(download_path))
        downloads.append(download_path)

    if len(downloads) == 0:
        print(
            f"There were no files matching the prefix `{prefix}` in bucket `{bucket.name}`"
        )

    return downloads
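

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the upstream module: a quick local
# roundtrip through the non-S3 helpers above, runnable without AWS
# credentials. All paths are throwaway temp locations.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import tempfile

    work_dir = Path(tempfile.mkdtemp())

    # Write a small JSON file, copy it elsewhere, and read it back
    write_json_file(
        work_dir / "src" / "data.json",
        {"lastfailed": {"test_foo.py::test_bar": True}},
    )
    copy_file(work_dir / "src" / "data.json", work_dir / "copy" / "data.json")
    print(load_json_file(work_dir / "copy" / "data.json"))

    # Zip the folder holding the copy, then unzip it somewhere else and reload
    zip_file = zip_folder(work_dir / "copy", work_dir / "archive.zip")
    unzip_folder(zip_file, work_dir / "unzipped")
    print(load_json_file(work_dir / "unzipped" / "data.json"))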