from urllib.parse import quote
import json
import subprocess as sub
import os
import sys
from typing import Iterator, Any, Literal, TypedDict, Optional
from tempfile import NamedTemporaryFile
debug: bool = True if os.environ.get("DEBUG", False) else False
Bin = str
args: dict[str, Any] = json.loads(os.environ["ARGS"])
bins: dict[str, Bin] = args["binaries"]
mode: str = sys.argv[1]
jsonArg: dict = json.loads(sys.argv[2])
Args = Iterator[str]
def log(msg: str) -> None:
print(msg, file=sys.stderr)
def atomically_write(file_path: str, content: bytes) -> None:
"""atomically write the content into `file_path`"""
with NamedTemporaryFile(
# write to the parent dir, so that it’s guaranteed to be on the same filesystem
dir=os.path.dirname(file_path),
delete=False
) as tmp:
try:
tmp.write(content)
os.rename(
src=tmp.name,
dst=file_path
)
except Exception:
os.unlink(tmp.name)
def curl_github_args(token: str | None, url: str) -> Args:
"""Query the github API via curl"""
yield bins["curl"]
if not debug:
yield "--silent"
# follow redirects
yield "--location"
if token:
yield "-H"
yield f"Authorization: token {token}"
yield url
def curl_result(output: bytes) -> Any | Literal["not found"]:
"""Parse the curl result of the github API"""
res: Any = json.loads(output)
match res:
case dict(res):
message: str = res.get("message", "")
if "rate limit" in message:
sys.exit("Rate limited by the Github API")
if "Not Found" in message:
return "not found"
# if the result is another type, we can pass it on
return res
def nix_prefetch_git_args(url: str, version_rev: str) -> Args:
"""Prefetch a git repository"""
yield bins["nix-prefetch-git"]
if not debug:
yield "--quiet"
yield "--no-deepClone"
yield "--url"
yield url
yield "--rev"
yield version_rev
def run_cmd(args: Args) -> bytes:
all = list(args)
if debug:
log(str(all))
return sub.check_output(all)
Dir = str
def fetchRepo() -> None:
"""fetch the given repo and write its nix-prefetch output to the corresponding grammar json file"""
match jsonArg:
case {
"orga": orga,
"repo": repo,
"outputDir": outputDir,
"nixRepoAttrName": nixRepoAttrName,
}:
token: str | None = os.environ.get("GITHUB_TOKEN", None)
out = run_cmd(
curl_github_args(
token,
url=f"https://api.github.com/repos/{quote(orga)}/{quote(repo)}/releases/latest"
)
)
release: str
match curl_result(out):
case "not found":
if "branch" in jsonArg:
branch = jsonArg.get("branch")
release = f"refs/heads/{branch}"
else:
# github sometimes returns an empty list even tough there are releases
log(f"uh-oh, latest for {orga}/{repo} is not there, using HEAD")
release = "HEAD"
case {"tag_name": tag_name}:
release = tag_name
case _:
sys.exit(f"git result for {orga}/{repo} did not have a `tag_name` field")
log(f"Fetching latest release ({release}) of {orga}/{repo} …")
res = run_cmd(
nix_prefetch_git_args(
url=f"https://github.com/{quote(orga)}/{quote(repo)}",
version_rev=release
)
)
atomically_write(
file_path=os.path.join(
outputDir,
f"{nixRepoAttrName}.json"
),
content=res
)
case _:
sys.exit("input json must have `orga` and `repo` keys")
def fetchOrgaLatestRepos(orga: str) -> set[str]:
"""fetch the latest (100) repos from the given github organization"""
token: str | None = os.environ.get("GITHUB_TOKEN", None)
out = run_cmd(
curl_github_args(
token,
url=f"https://api.github.com/orgs/{quote(orga)}/repos?per_page=100"
)
)
match curl_result(out):
case "not found":
sys.exit(f"github organization {orga} not found")
case list(repos):
res: list[str] = []
for repo in repos:
name = repo.get("name")
if name:
res.append(name)
return set(res)
case _:
sys.exit("github result was not a list of repos, but {other}")
def checkTreeSitterRepos(latest_github_repos: set[str]) -> None:
"""Make sure we know about all tree sitter repos on the tree sitter orga."""
known: set[str] = set(args["knownTreeSitterOrgGrammarRepos"])
ignored: set[str] = set(args["ignoredTreeSitterOrgRepos"])
unknown = latest_github_repos - (known | ignored)
if unknown:
sys.exit(f"These repositories are neither known nor ignored:\n{unknown}")
Grammar = TypedDict(
"Grammar",
{
"nixRepoAttrName": str,
"orga": str,
"repo": str,
"branch": Optional[str]
}
)
def printAllGrammarsNixFile() -> None:
"""Print a .nix file that imports all grammars."""
allGrammars: list[dict[str, Grammar]] = jsonArg["allGrammars"]
outputDir: Dir = jsonArg["outputDir"]
def file() -> Iterator[str]:
yield "{ lib }:"
yield "{"
for grammar in allGrammars:
n = grammar["nixRepoAttrName"]
yield f" {n} = lib.importJSON ./{n}.json;"
yield "}"
yield ""
atomically_write(
file_path=os.path.join(
outputDir,
"default.nix"
),
content="\n".join(file()).encode()
)
def fetchAndCheckTreeSitterRepos() -> None:
log("fetching list of grammars")
latest_repos = fetchOrgaLatestRepos(orga="tree-sitter")
log("checking the tree-sitter repo list against the grammars we know")
checkTreeSitterRepos(latest_repos)
match mode:
case "fetch-repo":
fetchRepo()
case "fetch-and-check-tree-sitter-repos":
fetchAndCheckTreeSitterRepos()
case "print-all-grammars-nix-file":
printAllGrammarsNixFile()
case _:
sys.exit(f"mode {mode} unknown")