summary refs log blame commit diff
path: root/pkgs/development/tools/parsing/tree-sitter/update_impl.py
blob: a53e1ee62c366769dfddc18be377389404441dd3 (plain) (tree)
1
2
3
4
5
6
7
8
9
10




                              
                                                              
                                       
 

                                                               

                                                     
 

                                       
 
                    
 
 



                               
















                                                                                         
                                                          
                                       
                      

                        

                      





                                             
                                                             
                                                 








                                                          


              
                                                              
                                   
                                  








                          

                                 
             
                     


                                


         
                        
                                                                                                       
                  





                                               
                                                                    
                          







                                                                                                   






                                                                                              


                                            
                                                                                             
 
                                                                            
                          


                                                                          
                 
             






                                             



                                                                   
                                                
                                                                         
















                                                                               
               
                                                                          

 
                                                                
                                                                                


                                                                 
                                                     




                                                                                 




                               

                               

















                                                                  





                                          
     

 






                                                                          


                      

                                             

                                       

                                        
from urllib.parse import quote
import json
import subprocess as sub
import os
import sys
from typing import Iterator, Any, Literal, TypedDict, Optional
from tempfile import NamedTemporaryFile

# Verbose mode: any non-empty DEBUG environment variable turns on extra logging
# and disables the --silent/--quiet flags of the invoked tools.
debug: bool = True if os.environ.get("DEBUG", False) else False
# Path to an executable binary (plain string alias for readability).
Bin = str
# Static configuration handed to this script via the ARGS environment variable
# (presumably injected by the surrounding Nix expression — includes at least
# "binaries", "knownTreeSitterOrgGrammarRepos" and "ignoredTreeSitterOrgRepos").
args: dict[str, Any] = json.loads(os.environ["ARGS"])
# Maps tool name (e.g. "curl", "nix-prefetch-git") to its executable path.
bins: dict[str, Bin] = args["binaries"]

# Subcommand selecting what to do; dispatched on at the bottom of the file.
mode: str = sys.argv[1]
# Subcommand-specific payload, passed JSON-encoded as the second CLI argument.
jsonArg: dict = json.loads(sys.argv[2])

# An argv-style command line, produced lazily one argument at a time.
Args = Iterator[str]


def log(msg: str) -> None:
    """Write `msg` (plus a newline) to stderr, keeping stdout clean for data."""
    sys.stderr.write(msg + "\n")


def atomically_write(file_path: str, content: bytes) -> None:
    """Atomically write `content` into `file_path`.

    The content is first written to a temporary file in the *same* directory
    (so the final rename cannot cross filesystems) and then renamed over the
    destination; readers thus never observe a half-written file.

    Raises whatever the write or rename raised; the temporary file is cleaned
    up on failure.
    """
    with NamedTemporaryFile(
        # write to the parent dir, so that it’s guaranteed to be on the same filesystem
        dir=os.path.dirname(file_path),
        delete=False
    ) as tmp:
        try:
            tmp.write(content)
            # make sure the buffered data reaches the file before we publish it
            tmp.flush()
            os.rename(
                src=tmp.name,
                dst=file_path
            )
        except Exception:
            os.unlink(tmp.name)
            # bug fix: the original swallowed the exception here, so a failed
            # write looked like success to the caller — re-raise it
            raise


def curl_github_args(token: str | None, url: str) -> Args:
    """Build the curl command line for a GitHub API request."""
    cmd = [bins["curl"]]
    if not debug:
        cmd.append("--silent")
    # follow redirects
    cmd.append("--location")
    if token:
        cmd.extend(["-H", f"Authorization: token {token}"])
    cmd.append(url)
    yield from cmd


def curl_result(output: bytes) -> Any | Literal["not found"]:
    """Parse the curl result of the github API"""
    res: Any = json.loads(output)
    match res:
        case dict(res):
            message: str = res.get("message", "")
            if "rate limit" in message:
                sys.exit("Rate limited by the Github API")
            if "Not Found" in message:
                return "not found"
    # if the result is another type, we can pass it on
    return res


def nix_prefetch_git_args(url: str, version_rev: str) -> Args:
    """Build the nix-prefetch-git command line for `url` at revision `version_rev`."""
    verbosity = [] if debug else ["--quiet"]
    yield from [
        bins["nix-prefetch-git"],
        *verbosity,
        "--no-deepClone",
        "--url", url,
        "--rev", version_rev,
    ]


def run_cmd(args: Args) -> bytes:
    """Run the given command line and return its stdout as bytes.

    Raises `subprocess.CalledProcessError` if the command exits non-zero.
    """
    # materialize the generator once: we may log it and check_output needs a sequence
    # (renamed from `all`, which shadowed the builtin)
    cmd = list(args)
    if debug:
        log(str(cmd))
    return sub.check_output(cmd)


# Path to a directory on disk (plain string alias for readability).
Dir = str


def fetchRepo() -> None:
    """fetch the given repo and write its nix-prefetch output to the corresponding grammar json file

    Expects `jsonArg` to carry `orga`, `repo`, `outputDir` and `nixRepoAttrName`
    (and optionally `branch`); exits the process on malformed input or an
    unexpected GitHub API response.
    """
    match jsonArg:
        case {
            "orga": orga,
            "repo": repo,
            "outputDir": outputDir,
            "nixRepoAttrName": nixRepoAttrName,
        }:
            # unauthenticated requests also work, but are rate-limited much sooner
            token: str | None = os.environ.get("GITHUB_TOKEN", None)
            out = run_cmd(
                curl_github_args(
                    token,
                    url=f"https://api.github.com/repos/{quote(orga)}/{quote(repo)}/releases/latest"
                )
            )
            release: str
            match curl_result(out):
                case "not found":
                    # no published release: fall back to the configured branch tip, or HEAD
                    if "branch" in jsonArg:
                        branch = jsonArg.get("branch")
                        release = f"refs/heads/{branch}"
                    else:
                        # github sometimes returns an empty list even though there are releases
                        log(f"uh-oh, latest for {orga}/{repo} is not there, using HEAD")
                        release = "HEAD"
                case {"tag_name": tag_name}:
                    release = tag_name
                case _:
                    sys.exit(f"git result for {orga}/{repo} did not have a `tag_name` field")

            log(f"Fetching latest release ({release}) of {orga}/{repo} …")
            # nix-prefetch-git prints its result as JSON on stdout; store it verbatim
            res = run_cmd(
                nix_prefetch_git_args(
                    url=f"https://github.com/{quote(orga)}/{quote(repo)}",
                    version_rev=release
                )
            )
            atomically_write(
                file_path=os.path.join(
                    outputDir,
                    f"{nixRepoAttrName}.json"
                ),
                content=res
            )
        case _:
            sys.exit("input json must have `orga` and `repo` keys")


def fetchOrgaLatestRepos(orga: str) -> set[str]:
    """fetch the latest (100) repos from the given github organization"""
    token: str | None = os.environ.get("GITHUB_TOKEN", None)
    out = run_cmd(
        curl_github_args(
            token,
            url=f"https://api.github.com/orgs/{quote(orga)}/repos?per_page=100"
        )
    )
    match curl_result(out):
        case "not found":
            sys.exit(f"github organization {orga} not found")
        case list(repos):
            res: list[str] = []
            for repo in repos:
                name = repo.get("name")
                if name:
                    res.append(name)
            return set(res)
        case _:
            sys.exit("github result was not a list of repos, but {other}")


def checkTreeSitterRepos(latest_github_repos: set[str]) -> None:
    """Make sure we know about all tree sitter repos on the tree sitter orga."""
    known = set(args["knownTreeSitterOrgGrammarRepos"])
    ignored = set(args["ignoredTreeSitterOrgRepos"])
    accounted_for = known | ignored

    unknown = {repo for repo in latest_github_repos if repo not in accounted_for}

    if unknown:
        sys.exit(f"These repositories are neither known nor ignored:\n{unknown}")


class Grammar(TypedDict):
    # One grammar entry as stored in the per-grammar JSON metadata.
    nixRepoAttrName: str
    orga: str
    repo: str
    branch: Optional[str]


def printAllGrammarsNixFile() -> None:
    """Print a .nix file that imports all grammars."""
    allGrammars: list[dict[str, Grammar]] = jsonArg["allGrammars"]
    outputDir: Dir = jsonArg["outputDir"]

    # build the attribute set line by line; one importJSON per grammar
    nix_lines = ["{ lib }:", "{"]
    for grammar in allGrammars:
        attr = grammar["nixRepoAttrName"]
        nix_lines.append(f"  {attr} = lib.importJSON ./{attr}.json;")
    nix_lines.append("}")
    nix_lines.append("")  # trailing newline

    atomically_write(
        file_path=os.path.join(outputDir, "default.nix"),
        content="\n".join(nix_lines).encode()
    )


def fetchAndCheckTreeSitterRepos() -> None:
    """Cross-check the tree-sitter org's repositories against our grammar list."""
    log("fetching list of grammars")
    repos_on_github = fetchOrgaLatestRepos(orga="tree-sitter")
    log("checking the tree-sitter repo list against the grammars we know")
    checkTreeSitterRepos(repos_on_github)


# Entry point: dispatch on the subcommand given as argv[1].
match mode:
    case "fetch-repo":
        fetchRepo()
    case "fetch-and-check-tree-sitter-repos":
        fetchAndCheckTreeSitterRepos()
    case "print-all-grammars-nix-file":
        printAllGrammarsNixFile()
    case _:
        sys.exit(f"mode {mode} unknown")