bulk rename to flupdt, add CLI, add README, add .gitignore
Signed-off-by: ahuston-0 <aliceghuston@gmail.com>
This commit is contained in:
0
flupdt/__init__.py
Normal file
0
flupdt/__init__.py
Normal file
11
flupdt/cli.py
Normal file
11
flupdt/cli.py
Normal file
@ -0,0 +1,11 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
|
||||
|
||||
def parse_inputs():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"flake_path", metavar="flake-path", help="path to flake to evaluate"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
return args
|
39
flupdt/common.py
Normal file
39
flupdt/common.py
Normal file
@ -0,0 +1,39 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""common."""
|
||||
|
||||
import logging
|
||||
import sys
|
||||
from subprocess import PIPE, Popen
|
||||
|
||||
|
||||
def configure_logger(level: str = "INFO") -> None:
|
||||
"""Configure the logger.
|
||||
|
||||
Args:
|
||||
level (str, optional): The logging level. Defaults to "INFO".
|
||||
"""
|
||||
logging.basicConfig(
|
||||
level=level,
|
||||
datefmt="%Y-%m-%dT%H:%M:%S%z",
|
||||
format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s",
|
||||
handlers=[logging.StreamHandler(sys.stdout)],
|
||||
)
|
||||
|
||||
|
||||
def bash_wrapper(command: str, path: str = ".") -> tuple[str, int]:
|
||||
"""Execute a bash command and capture the output.
|
||||
|
||||
Args:
|
||||
command (str): The bash command to be executed.
|
||||
path (str): The current working directory, '.' by default
|
||||
|
||||
Returns:
|
||||
Tuple[str, int]: A tuple containing the output of the command (stdout) as a string,
|
||||
the error output (stderr) as a string (optional), and the return code as an integer.
|
||||
"""
|
||||
# This is a acceptable risk
|
||||
process = Popen(command.split(), stdout=PIPE, stderr=PIPE, cwd=path) # noqa: S603
|
||||
output, _ = process.communicate()
|
||||
|
||||
return output.decode(), process.returncode
|
34
flupdt/flake_show.py
Normal file
34
flupdt/flake_show.py
Normal file
@ -0,0 +1,34 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import json
|
||||
from flupdt.common import bash_wrapper
|
||||
import shutil
|
||||
|
||||
|
||||
def traverse_json_base(json_dict: dict, path: list[str]):
|
||||
final_paths = []
|
||||
for key, value in json_dict.items():
|
||||
if isinstance(value, dict):
|
||||
keys = value.keys()
|
||||
if "type" in keys and value["type"] in [
|
||||
"nixos-configuration",
|
||||
"derivation",
|
||||
]:
|
||||
final_paths += [".".join(path + [key])]
|
||||
else:
|
||||
final_paths += traverse_json_base(value, path + [key])
|
||||
return final_paths
|
||||
|
||||
|
||||
def traverse_json(json_dict: dict):
|
||||
return traverse_json_base(json_dict, [])
|
||||
|
||||
|
||||
def get_derivations(path_to_flake: str):
|
||||
nix_path = shutil.which("nix")
|
||||
flake_show = bash_wrapper(f"{nix_path} flake show --json", path=path_to_flake)
|
||||
if flake_show[1] != 0:
|
||||
raise RuntimeError("flake show returned non-zero exit code")
|
||||
flake_show_json = json.loads(flake_show[0])
|
||||
derivations = traverse_json(flake_show_json)
|
||||
return derivations
|
13
flupdt/main.py
Normal file
13
flupdt/main.py
Normal file
@ -0,0 +1,13 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from flupdt.flake_show import get_derivations
|
||||
from flupdt.cli import parse_inputs
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_inputs()
|
||||
print(get_derivations(args.flake_path))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
52
flupdt/model.py
Normal file
52
flupdt/model.py
Normal file
@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from sqlalchemy import Column, Integer, String, ForeignKey, Table
|
||||
from sqlalchemy.orm import relationship, backref
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
author_publisher = Table(
|
||||
"author_publisher",
|
||||
Base.metadata,
|
||||
Column("author_id", Integer, ForeignKey("author.author_id")),
|
||||
Column("publisher_id", Integer, ForeignKey("publisher.publisher_id")),
|
||||
)
|
||||
|
||||
book_publisher = Table(
|
||||
"book_publisher",
|
||||
Base.metadata,
|
||||
Column("book_id", Integer, ForeignKey("book.book_id")),
|
||||
Column("publisher_id", Integer, ForeignKey("publisher.publisher_id")),
|
||||
)
|
||||
|
||||
|
||||
class Author(Base):
|
||||
__tablename__ = "author"
|
||||
author_id = Column(Integer, primary_key=True)
|
||||
first_name = Column(String)
|
||||
last_name = Column(String)
|
||||
books = relationship("Book", backref=backref("author"))
|
||||
publishers = relationship(
|
||||
"Publisher", secondary=author_publisher, back_populates="authors"
|
||||
)
|
||||
|
||||
|
||||
class Book(Base):
|
||||
__tablename__ = "book"
|
||||
book_id = Column(Integer, primary_key=True)
|
||||
author_id = Column(Integer, ForeignKey("author.author_id"))
|
||||
title = Column(String)
|
||||
publishers = relationship(
|
||||
"Publisher", secondary=book_publisher, back_populates="books"
|
||||
)
|
||||
|
||||
|
||||
class Publisher(Base):
|
||||
__tablename__ = "publisher"
|
||||
publisher_id = Column(Integer, primary_key=True)
|
||||
name = Column(String)
|
||||
authors = relationship(
|
||||
"Author", secondary=author_publisher, back_populates="publishers"
|
||||
)
|
||||
books = relationship("Book", secondary=book_publisher, back_populates="publishers")
|
Reference in New Issue
Block a user