Apply review comments

This commit is contained in:
Jakub Beránek 2025-01-07 18:40:08 +01:00
parent 0fc2f14c0c
commit 5a95dda44a

View file

@ -34,10 +34,11 @@ def add_job_properties(jobs: List[Dict], prefix: str) -> List[Job]:
"""
modified_jobs = []
for job in jobs:
job = dict(job)
job["image"] = get_job_image(job)
job["name"] = f"{prefix} - {job['name']}"
modified_jobs.append(job)
# Create a copy of the `job` dictionary to avoid modifying `jobs`
new_job = dict(job)
new_job["image"] = get_job_image(new_job)
new_job["name"] = f"{prefix} - {new_job['name']}"
modified_jobs.append(new_job)
return modified_jobs
@ -46,11 +47,15 @@ def add_base_env(jobs: List[Job], environment: Dict[str, str]) -> List[Job]:
Prepends `environment` to the `env` attribute of each job.
The `env` of each job has higher precedence than `environment`.
"""
modified_jobs = []
for job in jobs:
env = environment.copy()
env.update(job.get("env", {}))
job["env"] = env
return jobs
new_job = dict(job)
new_job["env"] = env
modified_jobs.append(new_job)
return modified_jobs
@dataclasses.dataclass
@ -123,7 +128,9 @@ def find_run_type(ctx: GitHubCtx) -> Optional[WorkflowRunType]:
def calculate_jobs(run_type: WorkflowRunType, job_data: Dict[str, Any]) -> List[Job]:
if isinstance(run_type, PRRunType):
return add_base_env(add_job_properties(job_data["pr"], "PR"), job_data["envs"]["pr"])
return add_base_env(
add_job_properties(job_data["pr"], "PR"), job_data["envs"]["pr"]
)
elif isinstance(run_type, TryRunType):
jobs = job_data["try"]
custom_jobs = run_type.custom_jobs
@ -188,45 +195,110 @@ def format_run_type(run_type: WorkflowRunType) -> str:
raise AssertionError()
def get_job_image(job) -> str:
def get_job_image(job: Job) -> str:
"""
By default, the Docker image of a job is based on its name.
However, it can be overridden by its IMAGE environment variable.
"""
return job.get("env", {}).get("IMAGE", job["name"])
env = job.get("env", {})
# Return the IMAGE environment variable if it exists, otherwise return the job name
return env.get("IMAGE", job["name"])
def is_linux_job(job: Job) -> bool:
return "ubuntu" in job["os"]
def find_linux_job(job_data: Dict[str, Any], job_name: str, pr_jobs: bool) -> Job:
candidates = job_data["pr"] if pr_jobs else job_data["auto"]
jobs = [job for job in candidates if job.get("name") == job_name]
if len(jobs) == 0:
available_jobs = "\n".join(
sorted(job["name"] for job in candidates if is_linux_job(job))
)
raise Exception(f"""Job `{job_name}` not found in {'pr' if pr_jobs else 'auto'} jobs.
The following jobs are available:
{available_jobs}""")
assert len(jobs) == 1
job = jobs[0]
if not is_linux_job(job):
raise Exception("Only Linux jobs can be executed locally")
return job
def run_workflow_locally(job_data: Dict[str, Any], job_name: str, pr_jobs: bool):
DOCKER_DIR = Path(__file__).absolute().parent.parent / "docker"
jobs = job_data["pr"] if pr_jobs else job_data["auto"]
jobs = [job for job in jobs if job.get("name") == job_name]
if len(jobs) == 0:
raise Exception(f"Job `{job_name}` not found in {'pr' if pr_jobs else 'auto'} jobs")
assert len(jobs) == 1
job = jobs[0]
if "ubuntu" not in job["os"]:
raise Exception("Only Linux jobs can be executed locally")
job = find_linux_job(job_data, job_name=job_name, pr_jobs=pr_jobs)
custom_env = {}
custom_env["DEPLOY"] = "1"
# Replicate src/ci/scripts/setup-environment.sh
# Adds custom environment variables to the job
if job_name.startswith("dist-"):
if job_name.endswith("-alt"):
custom_env["DEPLOY_ALT"] = "1"
else:
custom_env["DEPLOY"] = "1"
custom_env.update({k: str(v) for (k, v) in job.get("env", {}).items()})
args = [
str(DOCKER_DIR / "run.sh"),
get_job_image(job)
]
args = [str(DOCKER_DIR / "run.sh"), get_job_image(job)]
env_formatted = [f"{k}={v}" for (k, v) in sorted(custom_env.items())]
print(f"Executing `{' '.join(env_formatted)} {' '.join(args)}`")
env = os.environ.copy()
env.update(custom_env)
process = subprocess.Popen(args, env=env)
try:
process.wait()
except KeyboardInterrupt:
process.kill()
subprocess.run(args, env=env)
def calculate_job_matrix(job_data: Dict[str, Any]):
github_ctx = get_github_ctx()
run_type = find_run_type(github_ctx)
logging.info(f"Job type: {run_type}")
with open(CI_DIR / "channel") as f:
channel = f.read().strip()
jobs = []
if run_type is not None:
jobs = calculate_jobs(run_type, job_data)
jobs = skip_jobs(jobs, channel)
if not jobs:
raise Exception("Scheduled job list is empty, this is an error")
run_type = format_run_type(run_type)
logging.info(f"Output:\n{yaml.dump(dict(jobs=jobs, run_type=run_type), indent=4)}")
print(f"jobs={json.dumps(jobs)}")
print(f"run_type={run_type}")
def create_cli_parser():
parser = argparse.ArgumentParser(
prog="ci.py", description="Generate or run CI workflows"
)
subparsers = parser.add_subparsers(
help="Command to execute", dest="command", required=True
)
subparsers.add_parser(
"calculate-job-matrix",
help="Generate a matrix of jobs that should be executed in CI",
)
run_parser = subparsers.add_parser(
"run-local", help="Run a CI jobs locally (on Linux)"
)
run_parser.add_argument(
"job_name",
help="CI job that should be executed. By default, a merge (auto) "
"job with the given name will be executed",
)
run_parser.add_argument(
"--pr", action="store_true", help="Run a PR job instead of an auto job"
)
return parser
if __name__ == "__main__":
@ -235,48 +307,11 @@ if __name__ == "__main__":
with open(JOBS_YAML_PATH) as f:
data = yaml.safe_load(f)
parser = argparse.ArgumentParser(
prog="ci.py",
description="Generate or run CI workflows"
)
generate_matrix = argparse.ArgumentParser()
subparsers = parser.add_subparsers(help="Command to execute", dest="command", required=True)
subparsers.add_parser("calculate-job-matrix")
run_parser = subparsers.add_parser("run-local")
run_parser.add_argument(
"job_name",
help="CI job that should be executed. By default, a merge (auto) "
"job with the given name will be executed"
)
run_parser.add_argument(
"--pr",
action="store_true",
help="Run a PR job instead of an auto job"
)
parser = create_cli_parser()
args = parser.parse_args()
if args.command == "calculate-job-matrix":
github_ctx = get_github_ctx()
run_type = find_run_type(github_ctx)
logging.info(f"Job type: {run_type}")
with open(CI_DIR / "channel") as f:
channel = f.read().strip()
jobs = []
if run_type is not None:
jobs = calculate_jobs(run_type, data)
jobs = skip_jobs(jobs, channel)
if not jobs:
raise Exception("Scheduled job list is empty, this is an error")
run_type = format_run_type(run_type)
logging.info(f"Output:\n{yaml.dump(dict(jobs=jobs, run_type=run_type), indent=4)}")
print(f"jobs={json.dumps(jobs)}")
print(f"run_type={run_type}")
calculate_job_matrix(data)
elif args.command == "run-local":
run_workflow_locally(data, args.job_name, args.pr)
else: