#! /usr/bin/env python
# path: root/python/bro.py
# blob: 3f370cdf5bddfc0d3d1e01ec23b5e1faf9ad1b73
"""bro %s -- compression/decompression utility using the Brotli algorithm."""

from __future__ import print_function
import argparse
import sys
import os
import brotli
import platform


__version__ = '1.0'


# Default values of the encoder parameters.
# main() installs this dictionary as the argparse defaults via
# parser.set_defaults(), so every key must match the 'dest' name of a
# command-line option and, in turn, a keyword argument passed to
# brotli.compress().
DEFAULT_PARAMS = {
    # compression mode; the command line describes text mode as the default
    'mode': brotli.MODE_TEXT,
    # speed vs. density tradeoff; the command line accepts 0 to 11
    'quality': 11,
    # base 2 logarithm of the sliding window size; the command line
    # accepts 16 to 24
    'lgwin': 22,
    # base 2 logarithm of the maximum input block size; 0 means the value
    # is chosen based on the quality
    'lgblock': 0,
    # the four switches below are described by the command-line help as
    # respected only if quality > 9
    'enable_dictionary': True,
    'enable_transforms': False,
    'greedy_block_split': False,
    'enable_context_modeling': True
}


def get_binary_stdio(stream):
    """ Look up one of the standard streams by name and hand it back in a
    form that can carry raw bytes.

    'stream' must be one of the strings 'stdin', 'stdout' or 'stderr'.
    On python3.x the underlying 'buffer' object of the stream is returned;
    on python2.x the stream itself is returned, after being switched to
    binary mode when running on Windows.
    """
    assert stream in ('stdin', 'stdout', 'stderr'), "invalid stream name"
    handle = getattr(sys, stream)

    if sys.version_info[0] >= 3:
        # python3.x: bytes go through the 'buffer' attribute; if the current
        # stream object has none (e.g. it was replaced), use the original
        # stream kept by the interpreter in sys.__stdin__ & co.
        if not hasattr(handle, 'buffer'):
            handle = getattr(sys, "__%s__" % stream)
        return handle.buffer

    # python2.x: streams are already binary everywhere except on Windows
    if sys.platform != 'win32':
        return handle

    if platform.python_implementation() == "PyPy":
        # the msvcrt trick doesn't work in pypy, so reopen the descriptor
        # unbuffered in binary mode instead
        mode = "rb" if stream == "stdin" else "wb"
        return os.fdopen(handle.fileno(), mode, 0)

    # this works with CPython -- untested on other implementations
    import msvcrt
    msvcrt.setmode(handle.fileno(), os.O_BINARY)
    return handle


def main():
    """Command-line entry point: compress or decompress a file or stdin.

    The whole input (``-i FILE`` or, if omitted, standard input) is read
    into memory, passed through ``brotli.compress`` -- or
    ``brotli.decompress`` when ``-d`` is given -- and the result is written
    to ``-o FILE`` or, if omitted, standard output.

    Usage problems (missing input file, interactive stdin, existing output
    file without ``--force``) are reported through ``parser.error``. A
    ``brotli.error`` raised while processing the data terminates the program
    with exit status 1 and leaves any existing output file untouched.
    """
    parser = argparse.ArgumentParser(
        prog='bro.py',
        description="Compression/decompression utility using the Brotli algorithm.")
    # keep the reported version in sync with the module-level __version__
    parser.add_argument('--version', action='version',
                        version='%(prog)s ' + __version__)
    parser.add_argument('-i', '--input', metavar='FILE', type=str, dest='infile',
                        help='Input file', default=None)
    parser.add_argument('-o', '--output', metavar='FILE', type=str, dest='outfile',
                        help='Output file', default=None)
    parser.add_argument('-f', '--force', action='store_true',
                        help='Overwrite existing output file', default=False)
    parser.add_argument('-d', '--decompress', action='store_true',
                        help='Decompress input file', default=False)
    params = parser.add_argument_group('optional encoder parameters')
    params.add_argument('-m', '--mode', metavar="MODE", type=int, choices=[0, 1],
                        help='The compression mode can be 0 (text) or 1 (font). '
                        'Defaults to text mode.')
    params.add_argument('-q', '--quality', metavar="QUALITY", type=int,
                        choices=list(range(0, 12)),
                        help='Controls the compression-speed vs compression-density '
                        'tradeoff. The higher the quality, the slower the '
                        'compression. Range is 0 to 11. Defaults to 11.')
    params.add_argument('--lgwin', metavar="LGWIN", type=int,
                        choices=list(range(16, 25)),
                        help='Base 2 logarithm of the sliding window size. Range is '
                        '16 to 24. Defaults to 22.')
    params.add_argument('--lgblock', metavar="LGBLOCK", type=int,
                        choices=[0] + list(range(16, 25)),
                        help='Base 2 logarithm of the maximum input block size. '
                        'Range is 16 to 24. If set to 0, the value will be set based '
                        'on the quality. Defaults to 0.')
    above9 = parser.add_argument_group(
                        'encoder parameters respected only if quality > 9.')
    above9.add_argument('--no-dictionary', action='store_false',
                        dest='enable_dictionary',
                        help='Disable encoder dictionary.')
    above9.add_argument('--transform', action='store_true',
                        dest='enable_transforms',
                        help='Enable encoder transforms.')
    above9.add_argument('--greedy-block-split', action='store_true',
                        dest='greedy_block_split',
                        help='Enable a faster but less dense compression mode.')
    above9.add_argument('--no-context-modeling', action='store_false',
                        dest='enable_context_modeling',
                        help='Disable context modeling.')
    # set default values using global DEFAULT_PARAMS dictionary
    parser.set_defaults(**DEFAULT_PARAMS)

    # NOTE: nothing may be printed to stdout from here on -- when no output
    # file is given, stdout carries the binary result.
    options = parser.parse_args()

    # read the whole input into memory
    if options.infile:
        if not os.path.isfile(options.infile):
            parser.error('file "%s" not found' % options.infile)
        with open(options.infile, "rb") as infile:
            data = infile.read()
    else:
        if sys.stdin.isatty():
            # interactive console, just quit
            parser.error('no input')
        data = get_binary_stdio('stdin').read()

    # refuse to overwrite before doing any work, but do not open (and thereby
    # truncate) the output file until the data has been processed successfully
    if (options.outfile and os.path.isfile(options.outfile)
            and not options.force):
        parser.error('output file exists')

    try:
        if options.decompress:
            data = brotli.decompress(data)
        else:
            data = brotli.compress(
                data, mode=options.mode, quality=options.quality,
                lgwin=options.lgwin, lgblock=options.lgblock,
                enable_dictionary=options.enable_dictionary,
                enable_transforms=options.enable_transforms,
                greedy_block_split=options.greedy_block_split,
                enable_context_modeling=options.enable_context_modeling)
    except brotli.error as e:
        # parser.exit() writes the message verbatim, so terminate the line
        parser.exit(1, 'bro: error: %s: %s\n'
                    % (e, options.infile or 'sys.stdin'))

    if options.outfile:
        with open(options.outfile, "wb") as outfile:
            outfile.write(data)
    else:
        # stdout belongs to the process: flush it rather than closing it
        outfile = get_binary_stdio('stdout')
        outfile.write(data)
        outfile.flush()


# run the command-line interface only when executed as a script, not when
# the module is imported
if __name__ == '__main__':
    main()