summary refs log tree commit diff
path: root/pkgs/development/tools/poetry2nix/poetry2nix/fetch_from_legacy.py
blob: fee7374d340543fe99f1a072444492991c991979 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Some repositories (such as Devpi) expose the PyPI legacy API
# (https://warehouse.pypa.io/api-reference/legacy.html).
#
# Note that it is not possible to use pip for this; see
# https://discuss.python.org/t/pip-download-just-the-source-packages-no-building-no-metadata-etc/4651/12

import os
import sys
import netrc
from urllib.parse import urlparse, urlunparse
from html.parser import HTMLParser
import urllib.request
import shutil
import ssl
from os.path import normpath


# Parse the legacy index page to extract the href and package names
class Pep503(HTMLParser):
    """Collect the links of a legacy ("simple") index page.

    After ``feed()`` has been called, ``sources`` maps the text of each
    anchor (stripped of surrounding whitespace) to the value of its
    ``href`` attribute.  Anchors that carry no text are ignored.
    """

    def __init__(self):
        super().__init__()
        # Anchor text -> href value, filled in while parsing.
        self.sources = {}
        # href of the anchor currently being parsed, None outside of one.
        self.url = None
        # Text seen so far for the anchor currently being parsed.
        self.name = None

    def handle_data(self, data):
        """Remember text that appears while an anchor with an href is open."""
        if self.url is not None:
            self.name = data.strip()

    def handle_starttag(self, tag, attrs):
        """Start tracking a new anchor and pick up its href, if any."""
        if tag == "a":
            # Forget the text of the previous anchor; otherwise an anchor
            # without text would be stored under the previous anchor's name
            # and silently overwrite that entry's URL.
            self.name = None
            for name, value in attrs:
                if name == "href":
                    self.url = value

    def handle_endtag(self, tag):
        """Record the open anchor (if it had both href and text) and close it."""
        if self.url is not None and self.name:
            self.sources[self.name] = self.url
        self.url = None


# Command line: <index base url> <package name> <file name to download>
url = sys.argv[1]
package_name = sys.argv[2]
index_url = url + "/" + package_name + "/"
package_filename = sys.argv[3]

# Parse username and password for this host from the netrc file if given.
# NETRC may be unset or empty; both mean "no credentials".
username, password = None, None
netrc_path = os.environ.get("NETRC")
if netrc_path:
    netrc_obj = netrc.netrc(netrc_path)
    host = urlparse(index_url).netloc
    # Strip port number if present
    if ":" in host:
        host = host.split(":")[0]
    # authenticators() returns None when the file has neither an entry for
    # this host nor a default entry; keep going without credentials then.
    credentials = netrc_obj.authenticators(host)
    if credentials is not None:
        username, _, password = credentials

print("Reading index %s" % index_url)

# Certificate and hostname verification are switched off for every request
# made with this context.
# NOTE(review): presumably the downloaded file is integrity-checked by the
# caller; confirm, since credentials are also sent over this connection.
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

req = urllib.request.Request(index_url)
if username and password:
    import base64

    # HTTP Basic credentials; reused later for the package download.
    password_b64 = base64.b64encode(":".join((username, password)).encode()).decode(
        "utf-8"
    )
    req.add_header("Authorization", "Basic {}".format(password_b64))
with urllib.request.urlopen(req, context=context) as response:
    index = response.read()

parser = Pep503()
# Decode the page instead of taking the repr of the bytes object: str(bytes)
# yields "b'...'" with newlines, quotes and non-ASCII characters escaped,
# which corrupts the HTML handed to the parser.
parser.feed(index.decode("utf-8", errors="replace"))
if package_filename not in parser.sources:
    print(
        "The file %s has not be found in the index %s" % (package_filename, index_url)
    )
    # sys.exit rather than the site-provided exit(), which is meant for the
    # interactive interpreter.
    sys.exit(1)

# Sometimes the href is a relative or absolute path within the index's domain.
href = parser.sources[package_filename]
indicated_url = urlparse(href)
if indicated_url.netloc == "":
    parsed_url = urlparse(index_url)

    if indicated_url.path.startswith("/"):
        # An absolute path within the index's domain.
        path = href
    else:
        # A relative path.
        path = parsed_url.path + "/" + href

    package_url = urlunparse(
        (
            parsed_url.scheme,
            parsed_url.netloc,
            path,
            None,
            None,
            None,
        )
    )
else:
    package_url = href

# Handle urls containing "../"
parsed_url = urlparse(package_url)
real_package_url = urlunparse(
    (
        parsed_url.scheme,
        parsed_url.netloc,
        normpath(parsed_url.path),
        parsed_url.params,
        parsed_url.query,
        parsed_url.fragment,
    )
)
print("Downloading %s" % real_package_url)

req = urllib.request.Request(real_package_url)
if username and password:
    # NOTE(review): the credentials were looked up for the index host but are
    # sent to whatever host the file link points at; confirm this is intended.
    req.add_unredirected_header("Authorization", "Basic {}".format(password_b64))

# Open the output file only once the request has succeeded, so a failed
# request does not leave an empty file behind, and close both handles
# deterministically.
with urllib.request.urlopen(req, context=context) as response:
    with open(package_filename, "wb") as package_file:
        shutil.copyfileobj(response, package_file)