#! /usr/bin/env python
"""Compression/decompression utility using the Brotli algorithm."""

# Note: Python 2 was deprecated long ago, but some projects out in
# the wide world may still use it nevertheless. This should not
# deprive them of being able to run Brotli.
from __future__ import print_function

import argparse
import os
import platform
import sys

import brotli


# default values of encoder parameters
_DEFAULT_PARAMS = {
    'mode': brotli.MODE_GENERIC,
    'quality': 11,
    'lgwin': 22,
    'lgblock': 0,
}

# names of the standard streams accepted by get_binary_stdio()
_STDIO_NAMES = ('stdin', 'stdout', 'stderr')


def get_binary_stdio(stream):
    """Return the specified stdin/stdout/stderr stream in binary form.

    If the stdio stream requested (i.e. sys.(stdin|stdout|stderr))
    has been replaced with a stream object that does not have a `.buffer`
    attribute, this will return the original stdio stream's buffer, i.e.
    `sys.__(stdin|stdout|stderr)__.buffer`.

    Args:
      stream: One of 'stdin', 'stdout', 'stderr'.

    Returns:
      The stream, as a 'raw' buffer object (i.e. io.BufferedIOBase subclass
      instance such as io.BufferedReader/io.BufferedWriter), suitable for
      reading/writing binary data from/to it. On Python 2 the (possibly
      re-opened) file object itself is returned.

    Raises:
      ValueError: If `stream` is not one of the three supported names.
    """
    if stream not in _STDIO_NAMES:
        raise ValueError('invalid stream name: %s' % (stream,))
    stdio = getattr(sys, stream)

    if sys.version_info[0] < 3:
        if sys.platform == 'win32':
            # set I/O stream binary flag on python2.x (Windows)
            runtime = platform.python_implementation()
            if runtime == 'PyPy':
                # the msvcrt trick doesn't work in pypy, so use fdopen().
                mode = 'rb' if stream == 'stdin' else 'wb'
                stdio = os.fdopen(stdio.fileno(), mode, 0)
            else:
                # this works with CPython -- untested on other
                # implementations
                import msvcrt
                msvcrt.setmode(stdio.fileno(), os.O_BINARY)
        return stdio

    try:
        return stdio.buffer
    except AttributeError:
        # The Python reference explains
        # (-> https://docs.python.org/3/library/sys.html#sys.stdin)
        # that the `.buffer` attribute might not exist, since
        # the standard streams might have been replaced by something else
        # (such as an `io.StringIO()` - perhaps via
        # `contextlib.redirect_stdout()`).
        # We fall back to the original stdio in these cases.
        # `stream` was validated above, so the dunder attribute lookup
        # cannot hit an unexpected name.
        return getattr(sys, '__%s__' % (stream,)).buffer


def main(args=None):
    """Run the command-line compressor/decompressor.

    Reads the whole input (from --input or stdin), compresses it with the
    selected encoder parameters (or decompresses it when --decompress is
    given) and writes the result to --output or stdout.

    Args:
      args: Optional list of command-line arguments. When None, argparse
        falls back to `sys.argv[1:]`.

    Exits (via argparse) with status 2 on usage/input/output errors and
    with status 1 when Brotli reports an error while processing the data.
    """
    parser = argparse.ArgumentParser(
        prog=os.path.basename(__file__), description=__doc__)
    parser.add_argument(
        '--version', action='version', version=brotli.version)
    parser.add_argument(
        '-i',
        '--input',
        metavar='FILE',
        type=str,
        dest='infile',
        help='Input file',
        default=None)
    parser.add_argument(
        '-o',
        '--output',
        metavar='FILE',
        type=str,
        dest='outfile',
        help='Output file',
        default=None)
    parser.add_argument(
        '-f',
        '--force',
        action='store_true',
        help='Overwrite existing output file',
        default=False)
    parser.add_argument(
        '-d',
        '--decompress',
        action='store_true',
        help='Decompress input file',
        default=False)
    params = parser.add_argument_group('optional encoder parameters')
    params.add_argument(
        '-m',
        '--mode',
        metavar='MODE',
        type=int,
        choices=[0, 1, 2],
        help='The compression mode can be 0 for generic input, '
        '1 for UTF-8 encoded text, or 2 for WOFF 2.0 font data. '
        'Defaults to 0.')
    params.add_argument(
        '-q',
        '--quality',
        metavar='QUALITY',
        type=int,
        choices=list(range(0, 12)),
        help='Controls the compression-speed vs compression-density '
        'tradeoff. The higher the quality, the slower the '
        'compression. Range is 0 to 11. Defaults to 11.')
    params.add_argument(
        '--lgwin',
        metavar='LGWIN',
        type=int,
        choices=list(range(10, 25)),
        help='Base 2 logarithm of the sliding window size. Range is '
        '10 to 24. Defaults to 22.')
    params.add_argument(
        '--lgblock',
        metavar='LGBLOCK',
        type=int,
        choices=[0] + list(range(16, 25)),
        help='Base 2 logarithm of the maximum input block size. '
        'Range is 16 to 24. If set to 0, the value will be set based '
        'on the quality. Defaults to 0.')

    # set default values using global _DEFAULT_PARAMS dictionary
    parser.set_defaults(**_DEFAULT_PARAMS)

    options = parser.parse_args(args=args)

    if options.infile:
        try:
            with open(options.infile, 'rb') as infile:
                data = infile.read()
        except (IOError, OSError):
            # IOError is only an alias of OSError on Python 3; on Python 2
            # open() raises IOError, so both must be caught. The message
            # uses the path from `options`, because `infile` is never bound
            # when open() itself fails.
            parser.error('Could not read --infile: %s' % (options.infile,))
    else:
        if sys.stdin.isatty():
            # interactive console, just quit
            parser.error('No input (called from interactive terminal).')
        infile = get_binary_stdio('stdin')
        data = infile.read()

    if options.outfile:
        # Caution! If `options.outfile` is a broken symlink,
        # os.path.exists() reports False and the write below will follow
        # the symlink.
        if os.path.exists(options.outfile) and not options.force:
            # `outfile` is not opened yet here; report the requested path.
            parser.error(('Target --outfile=%s already exists, '
                          'but --force was not requested.')
                         % (options.outfile,))
        outfile = open(options.outfile, 'wb')
        did_open_outfile = True
    else:
        outfile = get_binary_stdio('stdout')
        did_open_outfile = False

    try:
        try:
            if options.decompress:
                data = brotli.decompress(data)
            else:
                data = brotli.compress(
                    data,
                    mode=options.mode,
                    quality=options.quality,
                    lgwin=options.lgwin,
                    lgblock=options.lgblock)
            outfile.write(data)
        finally:
            # only close what we opened; never close the process' stdout
            if did_open_outfile:
                outfile.close()
    except brotli.error as e:
        # parser.exit() writes the message verbatim, so terminate the line
        # ourselves to keep the shell prompt off the error text.
        parser.exit(1,
                    'bro: error: %s: %s\n' % (e, options.infile or '{stdin}'))


if __name__ == '__main__':
    main()