feature/flupdt #1
@ -7,5 +7,8 @@ def parse_inputs():
|
||||
parser.add_argument(
|
||||
"flake_path", metavar="flake-path", help="path to flake to evaluate"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--keep-hydra", action="store_true",help="allow evaluating Hydra jobs"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
return args
|
||||
|
@ -21,7 +21,7 @@ def configure_logger(level: str = "INFO") -> None:
|
||||
)
|
||||
|
||||
|
||||
def bash_wrapper(command: str, path: str = ".") -> tuple[str, int]:
|
||||
def bash_wrapper(command: str, path: str = ".") -> tuple[str, str, int]:
|
||||
"""Execute a bash command and capture the output.
|
||||
|
||||
Args:
|
||||
@ -34,6 +34,6 @@ def bash_wrapper(command: str, path: str = ".") -> tuple[str, int]:
|
||||
"""
|
||||
# This is a acceptable risk
|
||||
process = Popen(command.split(), stdout=PIPE, stderr=PIPE, cwd=path) # noqa: S603
|
||||
output, _ = process.communicate()
|
||||
output, error = process.communicate()
|
||||
|
||||
return output.decode(), process.returncode
|
||||
return output.decode(), error.decode(), process.returncode
|
||||
|
25
flupdt/flake_eval.py
Normal file
25
flupdt/flake_eval.py
Normal file
@ -0,0 +1,25 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
from flupdt.common import bash_wrapper
|
||||
ahuston-0 marked this conversation as resolved
Outdated
|
||||
import re
|
||||
|
||||
drv_re = re.compile(r'.*(/nix/store/.*\.drv).*')
|
||||
|
||||
def evaluate_output(path:str, output: str) -> Optional[str]:
|
||||
logging.info(f"evaluating {output}")
|
||||
out = bash_wrapper(f"nix eval {path}#{output}")
|
||||
logging.debug(out[0])
|
||||
logging.debug(out[1])
|
||||
logging.debug(out[2])
|
||||
if out[2] != 0:
|
||||
logging.warning(f"output {output} did not evaluate correctly")
|
||||
return None
|
||||
else:
|
||||
drv_match = drv_re.match(out[0])
|
||||
if drv_match is None:
|
||||
out_msg = "derivation succeeded but output derivation does not contain a derivation"
|
||||
raise RuntimeError(out_msg)
|
||||
drv = drv_match.group(1)
|
||||
logging.debug(f"derivation evaluated to {drv}")
|
@ -3,9 +3,16 @@
|
||||
import json
|
||||
from flupdt.common import bash_wrapper
|
||||
import shutil
|
||||
import logging
|
||||
import re
|
||||
|
||||
output_regexes = [
|
||||
ahuston-0 marked this conversation as resolved
Outdated
ahuston-0
commented
this should be common.py this should be common.py
|
||||
re.compile(r'checking derivation (.*)...'),
|
||||
re.compile(r'checking NixOS configuration \'(nixosConfigurations.*)\'\.\.\.')
|
||||
]
|
||||
|
||||
|
||||
def traverse_json_base(json_dict: dict, path: list[str]):
|
||||
def traverse_json_base(json_dict: dict, path: list[str]) -> list[str]:
|
||||
final_paths = []
|
||||
for key, value in json_dict.items():
|
||||
if isinstance(value, dict):
|
||||
@ -14,21 +21,46 @@ def traverse_json_base(json_dict: dict, path: list[str]):
|
||||
"nixos-configuration",
|
||||
"derivation",
|
||||
]:
|
||||
final_paths += [".".join(path + [key])]
|
||||
output = ".".join(path + [key])
|
||||
final_paths += [output]
|
||||
else:
|
||||
final_paths += traverse_json_base(value, path + [key])
|
||||
return final_paths
|
||||
|
||||
|
||||
def traverse_json(json_dict: dict):
|
||||
def traverse_json(json_dict: dict) -> list[str]:
|
||||
return traverse_json_base(json_dict, [])
|
||||
|
||||
|
||||
def get_derivations(path_to_flake: str):
|
||||
nix_path = shutil.which("nix")
|
||||
flake_show = bash_wrapper(f"{nix_path} flake show --json", path=path_to_flake)
|
||||
if flake_show[1] != 0:
|
||||
raise RuntimeError("flake show returned non-zero exit code")
|
||||
flake_show_json = json.loads(flake_show[0])
|
||||
derivations = traverse_json(flake_show_json)
|
||||
def get_derivations_from_check(nix_path:str,path_to_flake:str)-> list[str]:
|
||||
flake_check = bash_wrapper(f"{nix_path} flake check --verbose --keep-going", path=path_to_flake)
|
||||
if flake_check[2] != 0:
|
||||
logging.warn("nix flake check returned non-zero exit code, collecting all available outputs")
|
||||
error_out = flake_check[1].split('\n')
|
||||
possible_outputs = filter(lambda s: s.startswith("checking"), error_out)
|
||||
derivations = []
|
||||
for output in possible_outputs:
|
||||
for r in output_regexes:
|
||||
logging.debug(f"{output} {r.pattern}")
|
||||
match = r.match(output)
|
||||
if match is not None:
|
||||
logging.debug(match.groups())
|
||||
derivations += [match.groups()[0]]
|
||||
return derivations
|
||||
|
||||
def get_derivations(path_to_flake: str) -> list[str]:
|
||||
nix_path = shutil.which("nix")
|
||||
derivations = []
|
||||
if nix_path is None:
|
||||
raise RuntimeError("nix is not available in the PATH, please verify that it is installed")
|
||||
flake_show = bash_wrapper(f"{nix_path} flake show --json", path=path_to_flake)
|
||||
if flake_show[2] != 0:
|
||||
logging.error("flake show returned non-zero exit code")
|
||||
logging.warn("falling back to full evaluation via nix flake check")
|
||||
derivations = get_derivations_from_check(nix_path, path_to_flake)
|
||||
else:
|
||||
flake_show_json = json.loads(flake_show[0])
|
||||
derivations = traverse_json(flake_show_json)
|
||||
for i in range(len(derivations)):
|
||||
if derivations[i].startswith("nixosConfigurations"):
|
||||
derivations[i] += ".config.system.build.toplevel"
|
||||
return derivations
|
||||
|
@ -2,11 +2,21 @@
|
||||
|
||||
from flupdt.flake_show import get_derivations
|
||||
from flupdt.cli import parse_inputs
|
||||
|
||||
from flupdt.flake_eval import evaluate_output
|
||||
from flupdt.common import configure_logger
|
||||
import logging
|
||||
ahuston-0 marked this conversation as resolved
Outdated
ahuston-0
commented
should be common.py should be common.py
|
||||
|
||||
def main():
|
||||
configure_logger("INFO")
|
||||
args = parse_inputs()
|
||||
print(get_derivations(args.flake_path))
|
||||
flake_path = args.flake_path
|
||||
derivations = get_derivations(flake_path)
|
||||
if not args.keep_hydra and len(list(filter(lambda s: s.startswith("hydraJobs"), derivations))) > 0:
|
||||
logging.info("--keep-hydra flag is not specified, removing Hydra jobs")
|
||||
derivations = filter(lambda s: not s.startswith("hydraJobs"), derivations)
|
||||
logging.info(f"derivations: {list(derivations)}")
|
||||
for d in derivations:
|
||||
evaluate_output(flake_path, d)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Loading…
x
Reference in New Issue
Block a user
this should be common.py