diff options
Diffstat (limited to 'python/tests/bro_test.py')
-rw-r--r-- | python/tests/bro_test.py | 120 |
1 files changed, 120 insertions, 0 deletions
diff --git a/python/tests/bro_test.py b/python/tests/bro_test.py new file mode 100644 index 0000000..16ed7ef --- /dev/null +++ b/python/tests/bro_test.py @@ -0,0 +1,120 @@ +# Copyright 2016 The Brotli Authors. All rights reserved. +# +# Distributed under MIT license. +# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT + +import subprocess +import unittest + +import _test_utils +import brotli + +PYTHON = _test_utils.PYTHON +BRO = _test_utils.BRO +TEST_ENV = _test_utils.TEST_ENV + + +def _get_original_name(test_data): + return test_data.split('.compressed')[0] + + +class TestBroDecompress(_test_utils.TestCase): + + def _check_decompression(self, test_data): + # Verify decompression matches the original. + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + original = _get_original_name(test_data) + self.assertFilesMatch(temp_uncompressed, original) + + def _decompress_file(self, test_data): + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + args = [PYTHON, BRO, '-f', '-d', '-i', test_data, '-o', + temp_uncompressed] + subprocess.check_call(args, env=TEST_ENV) + + def _decompress_pipe(self, test_data): + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + args = [PYTHON, BRO, '-d'] + with open(temp_uncompressed, 'wb') as out_file: + with open(test_data, 'rb') as in_file: + subprocess.check_call( + args, stdin=in_file, stdout=out_file, env=TEST_ENV) + + def _test_decompress_file(self, test_data): + self._decompress_file(test_data) + self._check_decompression(test_data) + + def _test_decompress_pipe(self, test_data): + self._decompress_pipe(test_data) + self._check_decompression(test_data) + + +_test_utils.generate_test_methods(TestBroDecompress, for_decompression=True) + + +class TestBroCompress(_test_utils.TestCase): + + VARIANTS = {'quality': (1, 6, 9, 11), 'lgwin': (10, 15, 20, 24)} + + def _check_decompression(self, test_data, **kwargs): + # Write decompression to temp file and verify it matches the original. + temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data) + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + original = test_data + args = [PYTHON, BRO, '-f', '-d'] + if 'dictionary' in kwargs: + args.extend(['--custom-dictionary', str(kwargs['dictionary'])]) + args.extend(['-i', temp_compressed, '-o', temp_uncompressed]) + subprocess.check_call(args, env=TEST_ENV) + self.assertFilesMatch(temp_uncompressed, original) + + def _compress_file(self, test_data, **kwargs): + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + args = [PYTHON, BRO, '-f'] + if 'quality' in kwargs: + args.extend(['-q', str(kwargs['quality'])]) + if 'lgwin' in kwargs: + args.extend(['--lgwin', str(kwargs['lgwin'])]) + if 'dictionary' in kwargs: + args.extend(['--custom-dictionary', str(kwargs['dictionary'])]) + args.extend(['-i', test_data, '-o', temp_compressed]) + subprocess.check_call(args, env=TEST_ENV) + + def _compress_pipe(self, test_data, **kwargs): + temp_compressed = _test_utils.get_temp_compressed_name(test_data) + args = [PYTHON, BRO] + if 'quality' in kwargs: + args.extend(['-q', str(kwargs['quality'])]) + if 'lgwin' in kwargs: + args.extend(['--lgwin', str(kwargs['lgwin'])]) + if 'dictionary' in kwargs: + args.extend(['--custom-dictionary', str(kwargs['dictionary'])]) + with open(temp_compressed, 'wb') as out_file: + with open(test_data, 'rb') as in_file: + subprocess.check_call( + args, stdin=in_file, stdout=out_file, env=TEST_ENV) + + def _test_compress_file(self, test_data, **kwargs): + self._compress_file(test_data, **kwargs) + self._check_decompression(test_data) + + def _test_compress_pipe(self, test_data, **kwargs): + self._compress_pipe(test_data, **kwargs) + self._check_decompression(test_data) + + def _test_compress_file_custom_dictionary(self, test_data, **kwargs): + kwargs['dictionary'] = test_data + self._compress_file(test_data, **kwargs) + self._check_decompression(test_data, **kwargs) + + def _test_compress_pipe_custom_dictionary(self, test_data, **kwargs): + kwargs['dictionary'] = test_data + self._compress_pipe(test_data, **kwargs) + self._check_decompression(test_data, **kwargs) + + +_test_utils.generate_test_methods( + TestBroCompress, variants=TestBroCompress.VARIANTS) + +if __name__ == '__main__': + unittest.main() |