diff options
-rw-r--r-- | mesonbuild/wrap/__init__.py | 4 | ||||
-rw-r--r-- | mesonbuild/wrap/wrap.py | 75 | ||||
-rw-r--r-- | mesonbuild/wrap/wraptool.py | 41 |
3 files changed, 61 insertions, 59 deletions
diff --git a/mesonbuild/wrap/__init__.py b/mesonbuild/wrap/__init__.py index 6be2c44..182a94d 100644 --- a/mesonbuild/wrap/__init__.py +++ b/mesonbuild/wrap/__init__.py @@ -48,10 +48,10 @@ class WrapMode(Enum): nodownload = 3 forcefallback = 4 - def __str__(self): + def __str__(self) -> str: return self.name @staticmethod - def from_string(mode_name): + def from_string(mode_name: str): g = string_to_value[mode_name] return WrapMode(g) diff --git a/mesonbuild/wrap/wrap.py b/mesonbuild/wrap/wrap.py index 8b19b6d..8ad3418 100644 --- a/mesonbuild/wrap/wrap.py +++ b/mesonbuild/wrap/wrap.py @@ -14,14 +14,24 @@ from .. import mlog import contextlib -import urllib.request, os, hashlib, shutil, tempfile, stat +import urllib.request +import urllib.error +import os +import hashlib +import shutil +import tempfile +import stat import subprocess import sys import configparser +import typing from . import WrapMode from ..mesonlib import ProgressBar, MesonException +if typing.TYPE_CHECKING: + import http.client + try: import ssl has_ssl = True @@ -33,7 +43,7 @@ except ImportError: req_timeout = 600.0 ssl_warning_printed = False -def build_ssl_context(): +def build_ssl_context() -> 'ssl.SSLContext': ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) ctx.options |= ssl.OP_NO_SSLv2 ctx.options |= ssl.OP_NO_SSLv3 @@ -41,18 +51,17 @@ def build_ssl_context(): ctx.load_default_certs() return ctx -def quiet_git(cmd, workingdir): +def quiet_git(cmd: typing.List[str], workingdir: str) -> typing.Tuple[bool, str]: try: - pc = subprocess.Popen(['git', '-C', workingdir] + cmd, stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + pc = subprocess.run(['git', '-C', workingdir] + cmd, universal_newlines=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) except FileNotFoundError as e: return False, str(e) - out, err = pc.communicate() if pc.returncode != 0: - return False, err - return True, out + return False, pc.stderr + return True, pc.stdout -def open_wrapdburl(urlstring): +def open_wrapdburl(urlstring: str) -> 'http.client.HTTPResponse': global ssl_warning_printed if has_ssl: try: @@ -78,7 +87,7 @@ class WrapNotFoundException(WrapException): pass class PackageDefinition: - def __init__(self, fname): + def __init__(self, fname: str): self.filename = fname self.basename = os.path.basename(fname) self.name = self.basename[:-5] @@ -96,23 +105,23 @@ class PackageDefinition: self.type = self.wrap_section[5:] self.values = dict(self.config[self.wrap_section]) - def get(self, key): + def get(self, key: str) -> str: try: return self.values[key] except KeyError: m = 'Missing key {!r} in {}' raise WrapException(m.format(key, self.basename)) - def has_patch(self): + def has_patch(self) -> bool: return 'patch_url' in self.values class Resolver: - def __init__(self, subdir_root, wrap_mode=WrapMode.default): + def __init__(self, subdir_root: str, wrap_mode=WrapMode.default): self.wrap_mode = wrap_mode self.subdir_root = subdir_root self.cachedir = os.path.join(self.subdir_root, 'packagecache') - def resolve(self, packagename: str, method: str): + def resolve(self, packagename: str, method: str) -> str: self.packagename = packagename self.directory = packagename # We always have to load the wrap file, if it exists, because it could @@ -168,20 +177,20 @@ class Resolver: return self.directory - def load_wrap(self): + def load_wrap(self) -> PackageDefinition: fname = os.path.join(self.subdir_root, self.packagename + '.wrap') if os.path.isfile(fname): return PackageDefinition(fname) return None - def check_can_download(self): + def check_can_download(self) -> None: # Don't download subproject data based on wrap file if requested. # Git submodules are ok (see above)! if self.wrap_mode is WrapMode.nodownload: m = 'Automatic wrap-based subproject downloading is disabled' raise WrapException(m) - def resolve_git_submodule(self): + def resolve_git_submodule(self) -> bool: # Are we in a git repository? ret, out = quiet_git(['rev-parse'], self.subdir_root) if not ret: @@ -191,29 +200,29 @@ class Resolver: if not ret: return False # Submodule has not been added, add it - if out.startswith(b'+'): + if out.startswith('+'): mlog.warning('git submodule might be out of date') return True - elif out.startswith(b'U'): + elif out.startswith('U'): raise WrapException('git submodule has merge conflicts') # Submodule exists, but is deinitialized or wasn't initialized - elif out.startswith(b'-'): + elif out.startswith('-'): if subprocess.call(['git', '-C', self.subdir_root, 'submodule', 'update', '--init', self.dirname]) == 0: return True raise WrapException('git submodule failed to init') # Submodule looks fine, but maybe it wasn't populated properly. Do a checkout. - elif out.startswith(b' '): + elif out.startswith(' '): subprocess.call(['git', 'checkout', '.'], cwd=self.dirname) # Even if checkout failed, try building it anyway and let the user # handle any problems manually. return True - elif out == b'': + elif out == '': # It is not a submodule, just a folder that exists in the main repository. return False m = 'Unknown git submodule output: {!r}' raise WrapException(m.format(out)) - def get_file(self): + def get_file(self) -> None: path = self.get_file_internal('source') extract_dir = self.subdir_root # Some upstreams ship packages that do not have a leading directory. @@ -225,7 +234,7 @@ class Resolver: if self.wrap.has_patch(): self.apply_patch() - def get_git(self): + def get_git(self) -> None: revno = self.wrap.get('revision') is_shallow = self.wrap.values.get('depth', '') != '' # for some reason git only allows commit ids to be shallowly fetched by fetch not with clone @@ -271,13 +280,13 @@ class Resolver: '--push', 'origin', push_url], cwd=self.dirname) - def is_git_full_commit_id(self, revno): + def is_git_full_commit_id(self, revno: str) -> bool: result = False if len(revno) in (40, 64): # 40 for sha1, 64 for upcoming sha256 result = all((ch in '0123456789AaBbCcDdEeFf' for ch in revno)) return result - def get_hg(self): + def get_hg(self) -> None: revno = self.wrap.get('revision') subprocess.check_call(['hg', 'clone', self.wrap.get('url'), self.directory], cwd=self.subdir_root) @@ -285,12 +294,12 @@ class Resolver: subprocess.check_call(['hg', 'checkout', revno], cwd=self.dirname) - def get_svn(self): + def get_svn(self) -> None: revno = self.wrap.get('revision') subprocess.check_call(['svn', 'checkout', '-r', revno, self.wrap.get('url'), self.directory], cwd=self.subdir_root) - def get_data(self, url): + def get_data(self, url: str) -> typing.Tuple[str, str]: blocksize = 10 * 1024 h = hashlib.sha256() tmpfile = tempfile.NamedTemporaryFile(mode='wb', dir=self.cachedir, delete=False) @@ -327,7 +336,7 @@ class Resolver: hashvalue = h.hexdigest() return hashvalue, tmpfile.name - def check_hash(self, what, path): + def check_hash(self, what: str, path: str) -> None: expected = self.wrap.get(what + '_hash') h = hashlib.sha256() with open(path, 'rb') as f: @@ -336,7 +345,7 @@ class Resolver: if dhash != expected: raise WrapException('Incorrect hash for %s:\n %s expected\n %s actual.' % (what, expected, dhash)) - def download(self, what, ofname): + def download(self, what: str, ofname: str) -> None: self.check_can_download() srcurl = self.wrap.get(what + '_url') mlog.log('Downloading', mlog.bold(self.packagename), what, 'from', mlog.bold(srcurl)) @@ -347,7 +356,7 @@ class Resolver: raise WrapException('Incorrect hash for %s:\n %s expected\n %s actual.' % (what, expected, dhash)) os.rename(tmpfile, ofname) - def get_file_internal(self, what): + def get_file_internal(self, what: str) -> str: filename = self.wrap.get(what + '_filename') cache_path = os.path.join(self.cachedir, filename) @@ -361,7 +370,7 @@ class Resolver: self.download(what, cache_path) return cache_path - def apply_patch(self): + def apply_patch(self) -> None: path = self.get_file_internal('patch') try: shutil.unpack_archive(path, self.subdir_root) @@ -370,7 +379,7 @@ class Resolver: shutil.unpack_archive(path, workdir) self.copy_tree(workdir, self.subdir_root) - def copy_tree(self, root_src_dir, root_dst_dir): + def copy_tree(self, root_src_dir: str, root_dst_dir: str) -> None: """ Copy directory tree. Overwrites also read only files. """ diff --git a/mesonbuild/wrap/wraptool.py b/mesonbuild/wrap/wraptool.py index 80cc027..a9d83fc 100644 --- a/mesonbuild/wrap/wraptool.py +++ b/mesonbuild/wrap/wraptool.py @@ -58,9 +58,8 @@ def get_result(urlstring): data = u.read().decode('utf-8') jd = json.loads(data) if jd['output'] != 'ok': - print('Got bad output from server.') - print(data) - sys.exit(1) + print('Got bad output from server.', file=sys.stderr) + raise SystemExit(data) return jd def get_projectlist(): @@ -79,7 +78,7 @@ def search(options): for p in jd['projects']: print(p) -def get_latest_version(name): +def get_latest_version(name: str) -> tuple: jd = get_result(API_ROOT + 'query/get_latest/' + name) branch = jd['branch'] revision = jd['revision'] @@ -88,15 +87,12 @@ def get_latest_version(name): def install(options): name = options.name if not os.path.isdir('subprojects'): - print('Subprojects dir not found. Run this script in your source root directory.') - sys.exit(1) + raise SystemExit('Subprojects dir not found. Run this script in your source root directory.') if os.path.isdir(os.path.join('subprojects', name)): - print('Subproject directory for this project already exists.') - sys.exit(1) + raise SystemExit('Subproject directory for this project already exists.') wrapfile = os.path.join('subprojects', name + '.wrap') if os.path.exists(wrapfile): - print('Wrap file already exists.') - sys.exit(1) + raise SystemExit('Wrap file already exists.') (branch, revision) = get_latest_version(name) u = open_wrapdburl(API_ROOT + 'projects/%s/%s/%s/get_wrap' % (name, branch, revision)) data = u.read() @@ -125,17 +121,15 @@ def update_wrap_file(wrapfile, name, new_branch, new_revision): def update(options): name = options.name if not os.path.isdir('subprojects'): - print('Subprojects dir not found. Run this command in your source root directory.') - sys.exit(1) + raise SystemExit('Subprojects dir not found. Run this command in your source root directory.') wrapfile = os.path.join('subprojects', name + '.wrap') if not os.path.exists(wrapfile): - print('Project', name, 'is not in use.') - sys.exit(1) + raise SystemExit('Project', name, 'is not in use.') (branch, revision, subdir, src_file, patch_file) = get_current_version(wrapfile) (new_branch, new_revision) = get_latest_version(name) if new_branch == branch and new_revision == revision: print('Project', name, 'is already up to date.') - sys.exit(0) + raise SystemExit update_wrap_file(wrapfile, name, new_branch, new_revision) shutil.rmtree(os.path.join('subprojects', subdir), ignore_errors=True) try: @@ -153,8 +147,7 @@ def info(options): jd = get_result(API_ROOT + 'projects/' + name) versions = jd['versions'] if not versions: - print('No available versions of', name) - sys.exit(0) + raise SystemExit('No available versions of' + name) print('Available versions of %s:' % name) for v in versions: print(' ', v['branch'], v['revision']) @@ -167,7 +160,7 @@ def do_promotion(from_path, spdir_name): sproj_name = os.path.basename(from_path) outputdir = os.path.join(spdir_name, sproj_name) if os.path.exists(outputdir): - sys.exit('Output dir %s already exists. Will not overwrite.' % outputdir) + raise SystemExit('Output dir %s already exists. Will not overwrite.' % outputdir) shutil.copytree(from_path, outputdir, ignore=shutil.ignore_patterns('subprojects')) def promote(options): @@ -184,13 +177,13 @@ def promote(options): # otherwise the argument is just a subproject basename which must be unambiguous if argument not in sprojs: - sys.exit('Subproject %s not found in directory tree.' % argument) + raise SystemExit('Subproject %s not found in directory tree.' % argument) matches = sprojs[argument] if len(matches) > 1: - print('There is more than one version of %s in tree. Please specify which one to promote:\n' % argument) + print('There is more than one version of %s in tree. Please specify which one to promote:\n' % argument, file=sys.stderr) for s in matches: - print(s) - sys.exit(1) + print(s, file=sys.stderr) + raise SystemExit(1) do_promotion(matches[0], spdir_name) def status(options): @@ -200,12 +193,12 @@ def status(options): try: (latest_branch, latest_revision) = get_latest_version(name) except Exception: - print('', name, 'not available in wrapdb.') + print('', name, 'not available in wrapdb.', file=sys.stderr) continue try: (current_branch, current_revision, _, _, _) = get_current_version(w) except Exception: - print('Wrap file not from wrapdb.') + print('Wrap file not from wrapdb.', file=sys.stderr) continue if current_branch == latest_branch and current_revision == latest_revision: print('', name, 'up to date. Branch %s, revision %d.' % (current_branch, current_revision)) |