diff --git a/.gitignore b/.gitignore
index ac42661..5cab484 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,8 @@
 build/
 dist/
-grin.egg-info/
+*.egg-info/
+.mypy_cache/
+.pytest_cache/
+.vscode/
+*.pyc
+__pycache__
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
new file mode 100644
index 0000000..625157b
--- /dev/null
+++ b/CONTRIBUTING.rst
@@ -0,0 +1,15 @@
+How to contribute:
+
+- Clone the repository
+- Create a virtualenv
+- Install the project in dev mode using ``python setup.py develop``
+- Install the test runner: ``pip install nose``
+- Run the tests: ``nosetests tests/*.py``.
+  Avoid running the .pyc files by mistake; that runs the tests twice and fails.
+- Check that all tests pass. They won't pass on a FAT/NTFS partition;
+  use WSL if you are on Windows.
+- Edit the code and add some tests.
+- Commit and make a PR.
+
+
+
diff --git a/grin.py b/grin.py
index bf42dc0..c8941db 100755
--- a/grin.py
+++ b/grin.py
@@ -2,6 +2,8 @@
 """ grin searches text files.
 """
+from __future__ import print_function, unicode_literals
+
 import bisect
 import fnmatch
 import gzip
@@ -14,9 +16,10 @@
 import argparse
+from io import open, UnsupportedOperation
 
 #### Constants ####
-__version__ = '1.2.1'
+__version__ = "1.3.0"
 
 # Maintain the numerical order of these constants. We use them for sorting.
 PRE = -1
@@ -24,18 +27,27 @@
 POST = 1
 
 # Use file(1)'s choices for what's text and what's not.
-TEXTCHARS = ''.join(map(chr, [7,8,9,10,12,13,27] + range(0x20, 0x100)))
-ALLBYTES = ''.join(map(chr, range(256)))
-
-COLOR_TABLE = ['black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan',
-               'white', 'default']
+TEXTCHARS = bytearray([7, 8, 9, 10, 12, 13, 27] + list(range(0x20, 0x100)))
+ALLBYTES = bytearray(range(256))
+
+COLOR_TABLE = [
+    "black",
+    "red",
+    "green",
+    "yellow",
+    "blue",
+    "magenta",
+    "cyan",
+    "white",
+    "default",
+]
 COLOR_STYLE = {
-    'filename': dict(fg="green", bold=True),
-    'searchterm': dict(fg="black", bg="yellow"),
-    }
+    "filename": dict(fg="green", bold=True),
+    "searchterm": dict(fg="black", bg="yellow"),
+}
 
 # gzip magic header bytes.
-GZIP_MAGIC = '\037\213'
+GZIP_MAGIC = b"\037\213"
 
 # Target amount of data to read into memory at a time.
 READ_BLOCKSIZE = 16 * 1024 * 1024
@@ -52,9 +64,9 @@ def is_binary_string(bytes):
     -------
     is_binary : bool
     """
-    nontext = bytes.translate(ALLBYTES, TEXTCHARS)
-    return bool(nontext)
-
+    return bool(bytes.translate(ALLBYTES, TEXTCHARS))
+
+
 def get_line_offsets(block):
     """ Compute the list of offsets in DataBlock 'block' which correspond to
     the beginnings of new lines.
@@ -64,10 +76,10 @@
     # Note: this implementation based on string.find() benchmarks about twice as
     # fast as a list comprehension using re.finditer().
     line_offsets = [0]
-    line_count = 0 # Count of lines inside range [block.start, block.end) *only*
+    line_count = 0  # Count of lines inside range [block.start, block.end) *only*
     s = block.data
     while True:
-        next_newline = s.find('\n', line_offsets[-1])
+        next_newline = s.find(b"\n", line_offsets[-1])
         if next_newline < 0:
             # Tack on a final "line start" corresponding to EOF, if not done already.
# This makes it possible to determine the length of each line by computing @@ -80,18 +92,19 @@ def get_line_offsets(block): # Keep track of the count of lines within the "current block" if next_newline >= block.start and next_newline < block.end: line_count += 1 - + + def colorize(s, fg=None, bg=None, bold=False, underline=False, reverse=False): """ Wraps a string with ANSI color escape sequences corresponding to the style parameters given. - + All of the color and style parameters are optional. - + Parameters ---------- s : str fg : str - Foreground color of the text. One of (black, red, green, yellow, blue, + Foreground color of the text. One of (black, red, green, yellow, blue, magenta, cyan, white, default) bg : str Background color of the text. Color choices are the same as for fg. @@ -106,7 +119,7 @@ def colorize(s, fg=None, bg=None, bold=False, underline=False, reverse=False): ------- A string with embedded color escape sequences. """ - + style_fragments = [] if fg in COLOR_TABLE: # Foreground colors go from 30-39 @@ -120,8 +133,9 @@ def colorize(s, fg=None, bg=None, bold=False, underline=False, reverse=False): style_fragments.append(4) if reverse: style_fragments.append(7) - style_start = '\x1b[' + ';'.join(map(str,style_fragments)) + 'm' - style_end = '\x1b[0m' + styles = (str(style).encode("ascii") for style in style_fragments) + style_start = b"\x1b[" + b";".join(styles) + b"m" + style_end = b"\x1b[0m" return style_start + s + style_end @@ -138,12 +152,12 @@ def default_options(): """ Populate the default options. """ opt = Options( - before_context = 0, - after_context = 0, - show_line_numbers = True, - show_match = True, - show_filename = True, - show_emacs = False, + before_context=0, + after_context=0, + show_line_numbers=True, + show_match=True, + show_filename=True, + show_emacs=False, skip_hidden_dirs=False, skip_hidden_files=False, skip_backup_files=True, @@ -174,13 +188,15 @@ class DataBlock(object): is_last : bool True if this is the final block in the file """ - def __init__(self, data='', start=0, end=0, before_count=0, is_last=False): + + def __init__(self, data="", start=0, end=0, before_count=0, is_last=False): self.data = data self.start = start self.end = end self.before_count = before_count self.is_last = is_last + EMPTY_DATABLOCK = DataBlock() @@ -207,7 +223,7 @@ def __init__(self, regex, options=None): def read_block_with_context(self, prev, fp, fp_size): """ Read a block of data from the file, along with some surrounding context. - + Parameters ---------- prev : DataBlock, or None @@ -216,11 +232,11 @@ def read_block_with_context(self, prev, fp, fp_size): fp : filelike object The source of block data. - + fp_size : int or None Size of the file in bytes, or None if the size could not be determined. - + Returns ------- A DataBlock representing the "current" block along with context. @@ -241,11 +257,11 @@ def read_block_with_context(self, prev, fp, fp_size): # can avoid the overhead of locating lines of 'before' and # 'after' context. 
result = DataBlock( - data = block_main, - start = 0, - end = len(block_main), - before_count = 0, - is_last = True, + data=block_main, + start=0, + end=len(block_main), + before_count=0, + is_last=True, ) return result else: @@ -262,29 +278,34 @@ def read_block_with_context(self, prev, fp, fp_size): before_start = prev.end - 1 before_count = 0 for i in range(self.options.before_context): - ofs = prev.data.rfind('\n', 0, before_start) + ofs = prev.data.rfind("\n", 0, before_start) before_start = ofs before_count += 1 if ofs < 0: break before_start += 1 - before_lines = prev.data[before_start:prev.end] + before_lines = prev.data[before_start : prev.end] # Using readline() to force this block out to a newline boundary... - curr_block = (prev.data[prev.end:] + block_main + - ('' if is_last_block else fp.readline())) + curr_block = ( + prev.data[prev.end :] + + block_main + + ("" if is_last_block else fp.readline()) + ) # Read in some lines of 'after' context. if is_last_block: - after_lines = '' + after_lines = "" else: - after_lines_list = [fp.readline() for i in range(self.options.after_context)] - after_lines = ''.join(after_lines_list) + after_lines_list = [ + fp.readline() for i in range(self.options.after_context) + ] + after_lines = "".join(after_lines_list) result = DataBlock( - data = before_lines + curr_block + after_lines, - start = len(before_lines), - end = len(before_lines) + len(curr_block), - before_count = before_count, - is_last = is_last_block, + data=before_lines + curr_block + after_lines, + start=len(before_lines), + end=len(before_lines) + len(curr_block), + before_count=before_count, + is_last=is_last_block, ) return result @@ -313,13 +334,14 @@ def do_grep(self, fp): fp_size = status.st_size else: fp_size = None - except AttributeError: # doesn't support fileno() + except (AttributeError, UnsupportedOperation): # doesn't support fileno() fp_size = None block = self.read_block_with_context(None, fp, fp_size) while block.end > block.start: - (block_line_count, block_context) = self.do_grep_block(block, - line_count - block.before_count) + (block_line_count, block_context) = self.do_grep_block( + block, line_count - block.before_count + ) context += block_context if block.is_last: break @@ -364,25 +386,41 @@ def do_grep_block(self, block, line_num_offset): block_context = [] line_offsets = None line_count = None - + def build_match_context(match): - match_line_num = bisect.bisect(line_offsets, match.start() + block.start) - 1 + match_line_num = ( + bisect.bisect(line_offsets, match.start() + block.start) - 1 + ) before_count = min(before, match_line_num) after_count = min(after, (len(line_offsets) - 1) - match_line_num - 1) - match_line = block.data[line_offsets[match_line_num]:line_offsets[match_line_num + 1]] + match_line = block.data[ + line_offsets[match_line_num] : line_offsets[match_line_num + 1] + ] spans = [m.span() for m in self.regex.finditer(match_line)] - before_ctx = [(i + line_num_offset, PRE, - block.data[line_offsets[i]:line_offsets[i+1]], None) - for i in range(match_line_num - before_count, match_line_num)] - after_ctx = [(i + line_num_offset, POST, - block.data[line_offsets[i]:line_offsets[i+1]], None) - for i in range(match_line_num + 1, match_line_num + after_count + 1)] + before_ctx = [ + ( + i + line_num_offset, + PRE, + block.data[line_offsets[i] : line_offsets[i + 1]], + None, + ) + for i in range(match_line_num - before_count, match_line_num) + ] + after_ctx = [ + ( + i + line_num_offset, + POST, + block.data[line_offsets[i] : line_offsets[i + 
1]], + None, + ) + for i in range(match_line_num + 1, match_line_num + after_count + 1) + ] match_ctx = [(match_line_num + line_num_offset, MATCH, match_line, spans)] return before_ctx + match_ctx + after_ctx # Using re.MULTILINE here, so ^ and $ will work as expected. - for match in self.regex_m.finditer(block.data[block.start:block.end]): + for match in self.regex_m.finditer(block.data[block.start : block.end]): # Computing line offsets is expensive, so we do it lazily. We don't # take the extra CPU hit unless there's a regex match in the file. if line_offsets is None: @@ -427,27 +465,35 @@ def report(self, context_lines, filename=None): might be an empty string without a newline. """ if len(context_lines) == 0: - return '' + return b"" lines = [] if not self.options.show_match: # Just show the filename if we match. - line = '%s\n' % filename + line = b"%s\n" % filename lines.append(line) else: - if self.options.show_filename and filename is not None and not self.options.show_emacs: - line = '%s:\n' % filename + if ( + self.options.show_filename + and filename is not None + and not self.options.show_emacs + ): + line = b"%s:\n" % filename if self.options.use_color: - line = colorize(line, **COLOR_STYLE.get('filename', {})) + line = colorize(line, **COLOR_STYLE.get("filename", {})) lines.append(line) if self.options.show_emacs: - template = '%(filename)s:%(lineno)s: %(line)s' + template = b"%(filename)s:%(lineno)s: %(line)s" elif self.options.show_line_numbers: - template = '%(lineno)5s %(sep)s %(line)s' + template = b"%(lineno)5s %(sep)s %(line)s" else: - template = '%(line)s' + template = b"%(line)s" for i, kind, line, spans in context_lines: - if self.options.use_color and kind == MATCH and 'searchterm' in COLOR_STYLE: - style = COLOR_STYLE['searchterm'] + if ( + self.options.use_color + and kind == MATCH + and "searchterm" in COLOR_STYLE + ): + style = COLOR_STYLE["searchterm"] orig_line = line[:] total_offset = 0 for start, end in spans: @@ -457,22 +503,21 @@ def report(self, context_lines, filename=None): color_substring = colorize(old_substring, **style) line = line[:start] + color_substring + line[end:] total_offset += len(color_substring) - len(old_substring) - - ns = dict( - lineno = i+1, - sep = {PRE: '-', POST: '+', MATCH: ':'}[kind], - line = line, - filename = filename, - ) + + ns = { + b"lineno": str(i + 1).encode("ascii"), + b"sep": {PRE: b"-", POST: b"+", MATCH: b":"}[kind], + b"line": line, + b"filename": filename, + } line = template % ns lines.append(line) - if not line.endswith('\n'): - lines.append('\n') + if not line.endswith(b"\n"): + lines.append(b"\n") - text = ''.join(lines) + text = b"".join(lines) return text - def grep_a_file(self, filename, opener=open): """ Grep a single file that actually exists on the file system. @@ -491,16 +536,16 @@ def grep_a_file(self, filename, opener=open): The grep results as text. """ # Special-case stdin as "-". - if filename == '-': + if filename == b"-": f = sys.stdin - filename = '' + filename = b"" else: # 'r' does the right thing for both open ('rt') and gzip.open ('rb') - f = opener(filename, 'r') + f = opener(filename, "rb") try: unique_context = self.do_grep(f) finally: - if filename != '-': + if filename != b"-": f.close() report = self.report(unique_context, filename) return report @@ -533,10 +578,17 @@ class FileRecognizer(object): binary characters. 
""" - def __init__(self, skip_hidden_dirs=False, skip_hidden_files=False, - skip_backup_files=False, skip_dirs=set(), skip_exts=set(), - skip_symlink_dirs=True, skip_symlink_files=True, - binary_bytes=4096): + def __init__( + self, + skip_hidden_dirs=False, + skip_hidden_files=False, + skip_backup_files=False, + skip_dirs=set(), + skip_exts=set(), + skip_symlink_dirs=True, + skip_symlink_files=True, + binary_bytes=4096, + ): self.skip_hidden_dirs = skip_hidden_dirs self.skip_hidden_files = skip_hidden_files self.skip_backup_files = skip_backup_files @@ -549,11 +601,11 @@ def __init__(self, skip_hidden_dirs=False, skip_hidden_files=False, self.skip_exts_simple = set() self.skip_exts_endswith = list() for ext in skip_exts: - if os.path.splitext('foo.bar'+ext)[1] == ext: - self.skip_exts_simple.add(ext) + if os.path.splitext(b"foo.bar" + ext)[1] == ext: + self.skip_exts_simple.add(ext) else: self.skip_exts_endswith.append(ext) - + self.skip_symlink_dirs = skip_symlink_dirs self.skip_symlink_files = skip_symlink_files self.binary_bytes = binary_bytes @@ -569,10 +621,8 @@ def is_binary(self, filename): ------- is_binary : bool """ - f = open(filename, 'rb') - is_binary = self._is_binary_file(f) - f.close() - return is_binary + with open(filename, "rb") as f: + return self._is_binary_file(f) def _is_binary_file(self, f): """ Determine if a given filelike object has binary data or not. @@ -586,12 +636,12 @@ def _is_binary_file(self, f): is_binary : bool """ try: - bytes = f.read(self.binary_bytes) - except Exception, e: + data = f.read(self.binary_bytes) + except Exception as e: # When trying to read from something that looks like a gzipped file, # it may be corrupt. If we do get an error, assume that the file is binary. return True - return is_binary_string(bytes) + return is_binary_string(data) def is_gzipped_text(self, filename): """ Determine if a given file is a gzip-compressed text file or not. @@ -607,22 +657,18 @@ def is_gzipped_text(self, filename): ------- is_gzipped_text : bool """ - is_gzipped_text = False - f = open(filename, 'rb') - marker = f.read(2) - f.close() - if marker == GZIP_MAGIC: - fp = gzip.open(filename) - try: - try: - is_gzipped_text = not self._is_binary_file(fp) - except IOError: - # We saw the GZIP_MAGIC marker, but it is not actually a gzip - # file. - is_gzipped_text = False - finally: - fp.close() - return is_gzipped_text + with open(filename, "rb") as f: + marker = f.read(2) + if marker == GZIP_MAGIC: + with gzip.open(filename) as fp: + try: + return not self._is_binary_file(fp) + except IOError: + # We saw the GZIP_MAGIC marker, but it is not actually a gzip + # file. + pass + + return False def recognize(self, filename): """ Determine what kind of thing a filename represents. @@ -632,8 +678,8 @@ def recognize(self, filename): 'text' : It should should be grepped for the pattern and the matching - lines displayed. - 'binary' : + lines displayed. + 'binary' : The file is binary and should be either ignored or grepped without displaying the matching lines depending on the configuration. @@ -670,51 +716,54 @@ def recognize(self, filename): # We're only interested in regular files and directories. # A named pipe in particular would be problematic, because # it would cause open() to hang indefinitely. - return 'skip' + return "skip" except OSError: - return 'unreadable' - + return "unreadable" + def recognize_directory(self, filename): """ Determine what to do with a directory. 
""" basename = os.path.split(filename)[-1] - if (self.skip_hidden_dirs and basename.startswith('.') and - basename not in ('.', '..')): - return 'skip' + if ( + self.skip_hidden_dirs + and basename.startswith(b".") + and basename not in (b".", b"..") + ): + return "skip" if self.skip_symlink_dirs and os.path.islink(filename): - return 'link' + return "link" if basename in self.skip_dirs: - return 'skip' - return 'directory' + return "skip" + return "directory" def recognize_file(self, filename): """ Determine what to do with a file. """ basename = os.path.split(filename)[-1] - if self.skip_hidden_files and basename.startswith('.'): - return 'skip' - if self.skip_backup_files and basename.endswith('~'): - return 'skip' + if self.skip_hidden_files and basename.startswith(b"."): + return "skip" + if self.skip_backup_files and basename.endswith(b"~"): + return "skip" if self.skip_symlink_files and os.path.islink(filename): - return 'link' - + return "link" + filename_nc = os.path.normcase(filename) ext = os.path.splitext(filename_nc)[1] - if ext in self.skip_exts_simple or ext.startswith('.~'): - return 'skip' + if ext in self.skip_exts_simple or ext.startswith(b".~"): + return "skip" for ext in self.skip_exts_endswith: if filename_nc.endswith(ext): - return 'skip' + return "skip" try: if self.is_binary(filename): if self.is_gzipped_text(filename): - return 'gzip' + return "gzip" else: - return 'binary' + return "binary" else: - return 'text' + return "text" except (OSError, IOError): - return 'unreadable' + return "unreadable" def walk(self, startpath): """ Walk the tree from a given start path yielding all of the files (not @@ -733,11 +782,11 @@ def walk(self, startpath): kind : str """ kind = self.recognize(startpath) - if kind in ('binary', 'text', 'gzip'): + if kind in ("binary", "text", "gzip"): yield startpath, kind # Not a directory, so there is no need to recurse. 
return - elif kind == 'directory': + elif kind == "directory": try: basenames = os.listdir(startpath) except OSError: @@ -758,92 +807,221 @@ def get_grin_arg_parser(parser=None): formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument('-v', '--version', action='version', version='grin %s' % __version__, - help="show program's version number and exit") - parser.add_argument('-i', '--ignore-case', action='append_const', - dest='re_flags', const=re.I, default=[], help="ignore case in the regex") - parser.add_argument('-A', '--after-context', default=0, type=int, - help="the number of lines of context to show after the match [default=%(default)r]") - parser.add_argument('-B', '--before-context', default=0, type=int, - help="the number of lines of context to show before the match [default=%(default)r]") - parser.add_argument('-C', '--context', type=int, - help="the number of lines of context to show on either side of the match") - parser.add_argument('-I', '--include', default='*', - help="only search in files matching this glob [default=%(default)r]") - parser.add_argument('-n', '--line-number', action='store_true', - dest='show_line_numbers', default=True, - help="show the line numbers [default]") - parser.add_argument('-N', '--no-line-number', action='store_false', - dest='show_line_numbers', help="do not show the line numbers") - parser.add_argument('-H', '--with-filename', action='store_true', - dest='show_filename', default=True, - help="show the filenames of files that match [default]") - parser.add_argument('--without-filename', action='store_false', - dest='show_filename', - help="do not show the filenames of files that match") - parser.add_argument('--emacs', action='store_true', - dest='show_emacs', - help="print the filename with every match for easier parsing by e.g. 
Emacs") - parser.add_argument('-l', '--files-with-matches', action='store_false', - dest='show_match', - help="show only the filenames and not the texts of the matches") - parser.add_argument('-L', '--files-without-matches', action='store_true', - dest='show_match', default=False, - help="show the matches with the filenames") - parser.add_argument('--no-color', action='store_true', default=sys.platform == 'win32', - help="do not use colorized output [default if piping the output]") - parser.add_argument('--use-color', action='store_false', dest='no_color', - help="use colorized output [default if outputting to a terminal]") - parser.add_argument('--force-color', action='store_true', + parser.add_argument( + "-v", + "--version", + action="version", + version="grin %s" % __version__, + help="show program's version number and exit", + ) + parser.add_argument( + "-i", + "--ignore-case", + action="append_const", + dest="re_flags", + const=re.I, + default=[], + help="ignore case in the regex", + ) + parser.add_argument( + "-A", + "--after-context", + default=0, + type=int, + help="the number of lines of context to show after the match [default=%(default)r]", + ) + parser.add_argument( + "-B", + "--before-context", + default=0, + type=int, + help="the number of lines of context to show before the match [default=%(default)r]", + ) + parser.add_argument( + "-C", + "--context", + type=int, + help="the number of lines of context to show on either side of the match", + ) + parser.add_argument( + "-I", + "--include", + default="*", + help="only search in files matching this glob [default=%(default)r]", + ) + parser.add_argument( + "-n", + "--line-number", + action="store_true", + dest="show_line_numbers", + default=True, + help="show the line numbers [default]", + ) + parser.add_argument( + "-N", + "--no-line-number", + action="store_false", + dest="show_line_numbers", + help="do not show the line numbers", + ) + parser.add_argument( + "-H", + "--with-filename", + action="store_true", + dest="show_filename", + default=True, + help="show the filenames of files that match [default]", + ) + parser.add_argument( + "--without-filename", + action="store_false", + dest="show_filename", + help="do not show the filenames of files that match", + ) + parser.add_argument( + "--emacs", + action="store_true", + dest="show_emacs", + help="print the filename with every match for easier parsing by e.g. 
Emacs", + ) + parser.add_argument( + "-l", + "--files-with-matches", + action="store_false", + dest="show_match", + help="show only the filenames and not the texts of the matches", + ) + parser.add_argument( + "-L", + "--files-without-matches", + action="store_true", + dest="show_match", + default=False, + help="show the matches with the filenames", + ) + parser.add_argument( + "--no-color", + action="store_true", + default=sys.platform == "win32", + help="do not use colorized output [default if piping the output]", + ) + parser.add_argument( + "--use-color", + action="store_false", + dest="no_color", + help="use colorized output [default if outputting to a terminal]", + ) + parser.add_argument( + "--force-color", + action="store_true", help="always use colorized output even when piping to something that " - "may not be able to handle it") - parser.add_argument('-s', '--no-skip-hidden-files', - dest='skip_hidden_files', action='store_false', - help="do not skip .hidden files") - parser.add_argument('--skip-hidden-files', - dest='skip_hidden_files', action='store_true', default=True, - help="do skip .hidden files [default]") - parser.add_argument('-b', '--no-skip-backup-files', - dest='skip_backup_files', action='store_false', - help="do not skip backup~ files [deprecated; edit --skip-exts]") - parser.add_argument('--skip-backup-files', - dest='skip_backup_files', action='store_true', default=True, - help="do skip backup~ files [default] [deprecated; edit --skip-exts]") - parser.add_argument('-S', '--no-skip-hidden-dirs', dest='skip_hidden_dirs', - action='store_false', - help="do not skip .hidden directories") - parser.add_argument('--skip-hidden-dirs', dest='skip_hidden_dirs', - default=True, action='store_true', - help="do skip .hidden directories [default]") - parser.add_argument('-d', '--skip-dirs', - default='CVS,RCS,.svn,.hg,.bzr,build,dist', - help="comma-separated list of directory names to skip [default=%(default)r]") - parser.add_argument('-D', '--no-skip-dirs', dest='skip_dirs', - action='store_const', const='', - help="do not skip any directories") - parser.add_argument('-e', '--skip-exts', - default='.pyc,.pyo,.so,.o,.a,.tgz,.tar.gz,.rar,.zip,~,#,.bak,.png,.jpg,.gif,.bmp,.tif,.tiff,.pyd,.dll,.exe,.obj,.lib', - help="comma-separated list of file extensions to skip [default=%(default)r]") - parser.add_argument('-E', '--no-skip-exts', dest='skip_exts', - action='store_const', const='', - help="do not skip any file extensions") - parser.add_argument('--no-follow', action='store_false', dest='follow_symlinks', + "may not be able to handle it", + ) + parser.add_argument( + "-s", + "--no-skip-hidden-files", + dest="skip_hidden_files", + action="store_false", + help="do not skip .hidden files", + ) + parser.add_argument( + "--skip-hidden-files", + dest="skip_hidden_files", + action="store_true", + default=True, + help="do skip .hidden files [default]", + ) + parser.add_argument( + "-b", + "--no-skip-backup-files", + dest="skip_backup_files", + action="store_false", + help="do not skip backup~ files [deprecated; edit --skip-exts]", + ) + parser.add_argument( + "--skip-backup-files", + dest="skip_backup_files", + action="store_true", + default=True, + help="do skip backup~ files [default] [deprecated; edit --skip-exts]", + ) + parser.add_argument( + "-S", + "--no-skip-hidden-dirs", + dest="skip_hidden_dirs", + action="store_false", + help="do not skip .hidden directories", + ) + parser.add_argument( + "--skip-hidden-dirs", + dest="skip_hidden_dirs", + default=True, + 
action="store_true", + help="do skip .hidden directories [default]", + ) + parser.add_argument( + "-d", + "--skip-dirs", + default="CVS,RCS,.svn,.hg,.bzr,build,dist", + help="comma-separated list of directory names to skip [default=%(default)r]", + ) + parser.add_argument( + "-D", + "--no-skip-dirs", + dest="skip_dirs", + action="store_const", + const=b"", + help="do not skip any directories", + ) + parser.add_argument( + "-e", + "--skip-exts", + default=".pyc,.pyo,.so,.o,.a,.tgz,.tar.gz,.rar,.zip,~,#,.bak,.png,.jpg,.gif,.bmp,.tif,.tiff,.pyd,.dll,.exe,.obj,.lib", + help="comma-separated list of file extensions to skip [default=%(default)r]", + ) + parser.add_argument( + "-E", + "--no-skip-exts", + dest="skip_exts", + action="store_const", + const=b"", + help="do not skip any file extensions", + ) + parser.add_argument( + "--no-follow", + action="store_false", + dest="follow_symlinks", default=False, - help="do not follow symlinks to directories and files [default]") - parser.add_argument('--follow', action='store_true', dest='follow_symlinks', - help="follow symlinks to directories and files") - parser.add_argument('-f', '--files-from-file', metavar="FILE", - help="read files to search from a file, one per line; - for stdin") - parser.add_argument('-0', '--null-separated', action='store_true', - help="filenames specified in --files-from-file are separated by NULs") - parser.add_argument('--sys-path', action='store_true', - help="search the directories on sys.path") - - parser.add_argument('regex', help="the regular expression to search for") - parser.add_argument('files', nargs='*', help="the files to search") + help="do not follow symlinks to directories and files [default]", + ) + parser.add_argument( + "--follow", + action="store_true", + dest="follow_symlinks", + help="follow symlinks to directories and files", + ) + parser.add_argument( + "-f", + "--files-from-file", + metavar="FILE", + help="read files to search from a file, one per line; - for stdin", + ) + parser.add_argument( + "-0", + "--null-separated", + action="store_true", + help="filenames specified in --files-from-file are separated by NULs", + ) + parser.add_argument( + "--sys-path", action="store_true", help="search the directories on sys.path" + ) + + parser.add_argument("regex", help="the regular expression to search for") + parser.add_argument("files", nargs="*", help="the files to search") return parser + def get_grind_arg_parser(parser=None): """ Create the command-line parser for the find-like companion program. 
""" @@ -853,62 +1031,132 @@ def get_grind_arg_parser(parser=None): epilog="Bug reports to .", ) - parser.add_argument('-v', '--version', action='version', version='grin %s' % __version__, - help="show program's version number and exit") - parser.add_argument('-s', '--no-skip-hidden-files', - dest='skip_hidden_files', action='store_false', - help="do not skip .hidden files") - parser.add_argument('--skip-hidden-files', - dest='skip_hidden_files', action='store_true', default=True, - help="do skip .hidden files") - parser.add_argument('-b', '--no-skip-backup-files', - dest='skip_backup_files', action='store_false', - help="do not skip backup~ files [deprecated; edit --skip-exts]") - parser.add_argument('--skip-backup-files', - dest='skip_backup_files', action='store_true', default=True, - help="do skip backup~ files [default] [deprecated; edit --skip-exts]") - parser.add_argument('-S', '--no-skip-hidden-dirs', dest='skip_hidden_dirs', - action='store_false', - help="do not skip .hidden directories") - parser.add_argument('--skip-hidden-dirs', dest='skip_hidden_dirs', - default=True, action='store_true', - help="do skip .hidden directories") - parser.add_argument('-d', '--skip-dirs', - default='CVS,RCS,.svn,.hg,.bzr,build,dist', - help="comma-separated list of directory names to skip [default=%(default)r]") - parser.add_argument('-D', '--no-skip-dirs', dest='skip_dirs', - action='store_const', const='', - help="do not skip any directories") - parser.add_argument('-e', '--skip-exts', - default='.pyc,.pyo,.so,.o,.a,.tgz,.tar.gz,.rar,.zip,~,#,.bak,.png,.jpg,.gif,.bmp,.tif,.tiff,.pyd,.dll,.exe,.obj,.lib', - help="comma-separated list of file extensions to skip [default=%(default)r]") - parser.add_argument('-E', '--no-skip-exts', dest='skip_exts', - action='store_const', const='', - help="do not skip any file extensions") - parser.add_argument('--no-follow', action='store_false', dest='follow_symlinks', + parser.add_argument( + "-v", + "--version", + action="version", + version="grin %s" % __version__, + help="show program's version number and exit", + ) + parser.add_argument( + "-s", + "--no-skip-hidden-files", + dest="skip_hidden_files", + action="store_false", + help="do not skip .hidden files", + ) + parser.add_argument( + "--skip-hidden-files", + dest="skip_hidden_files", + action="store_true", + default=True, + help="do skip .hidden files", + ) + parser.add_argument( + "-b", + "--no-skip-backup-files", + dest="skip_backup_files", + action="store_false", + help="do not skip backup~ files [deprecated; edit --skip-exts]", + ) + parser.add_argument( + "--skip-backup-files", + dest="skip_backup_files", + action="store_true", + default=True, + help="do skip backup~ files [default] [deprecated; edit --skip-exts]", + ) + parser.add_argument( + "-S", + "--no-skip-hidden-dirs", + dest="skip_hidden_dirs", + action="store_false", + help="do not skip .hidden directories", + ) + parser.add_argument( + "--skip-hidden-dirs", + dest="skip_hidden_dirs", + default=True, + action="store_true", + help="do skip .hidden directories", + ) + parser.add_argument( + "-d", + "--skip-dirs", + default="CVS,RCS,.svn,.hg,.bzr,build,dist", + help="comma-separated list of directory names to skip [default=%(default)r]", + ) + parser.add_argument( + "-D", + "--no-skip-dirs", + dest="skip_dirs", + action="store_const", + const="", + help="do not skip any directories", + ) + parser.add_argument( + "-e", + "--skip-exts", + 
default=".pyc,.pyo,.so,.o,.a,.tgz,.tar.gz,.rar,.zip,~,#,.bak,.png,.jpg,.gif,.bmp,.tif,.tiff,.pyd,.dll,.exe,.obj,.lib", + help="comma-separated list of file extensions to skip [default=%(default)r]", + ) + parser.add_argument( + "-E", + "--no-skip-exts", + dest="skip_exts", + action="store_const", + const="", + help="do not skip any file extensions", + ) + parser.add_argument( + "--no-follow", + action="store_false", + dest="follow_symlinks", default=False, - help="do not follow symlinks to directories and files [default]") - parser.add_argument('--follow', action='store_true', dest='follow_symlinks', - help="follow symlinks to directories and files") - parser.add_argument('-0', '--null-separated', action='store_true', - help="print the filenames separated by NULs") - parser.add_argument('--dirs', nargs='+', default=["."], - help="the directories to start from") - parser.add_argument('--sys-path', action='store_true', - help="search the directories on sys.path") - - parser.add_argument('glob', default='*', nargs='?', + help="do not follow symlinks to directories and files [default]", + ) + parser.add_argument( + "--follow", + action="store_true", + dest="follow_symlinks", + help="follow symlinks to directories and files", + ) + parser.add_argument( + "-0", + "--null-separated", + action="store_true", + help="print the filenames separated by NULs", + ) + parser.add_argument( + "--dirs", nargs="+", default=["."], help="the directories to start from" + ) + parser.add_argument( + "--sys-path", action="store_true", help="search the directories on sys.path" + ) + + parser.add_argument( + "glob", + default="*", + nargs="?", help="the glob pattern to match; you may need to quote this to prevent " - "the shell from trying to expand it [default=%(default)r]") + "the shell from trying to expand it [default=%(default)r]", + ) return parser + def get_recognizer(args): """ Get the file recognizer object from the configured options. """ # Make sure we have empty sets when we have empty strings. - skip_dirs = set([x for x in args.skip_dirs.split(',') if x]) - skip_exts = set([x for x in args.skip_exts.split(',') if x]) + skip_dirs = args.skip_dirs + skip_exts = args.skip_exts + if sys.version_info.major > 2: + skip_dirs = skip_dirs.encode(sys.stdout.encoding) + skip_exts = skip_exts.encode(sys.stdout.encoding) + + skip_dirs = set([x for x in skip_dirs.split(b",") if x]) + skip_exts = set([x for x in skip_exts.split(b",") if x]) fr = FileRecognizer( skip_hidden_files=args.skip_hidden_files, skip_backup_files=args.skip_backup_files, @@ -920,6 +1168,7 @@ def get_recognizer(args): ) return fr + def get_filenames(args): """ Generate the filenames to grep. @@ -941,14 +1190,17 @@ def get_filenames(args): files = [] # If the user has given us a file with filenames, consume them first. 
if args.files_from_file is not None:
-        if args.files_from_file == '-':
-            files_file = sys.stdin
+        if args.files_from_file == "-":
+            files_file_content = sys.stdin.read()
+            if sys.version_info.major > 2:
+                files_file_content = files_file_content.encode(sys.stdout.encoding)
             should_close = False
         elif os.path.exists(args.files_from_file):
-            files_file = open(args.files_from_file)
+            files_file = open(args.files_from_file, "rb")
+            files_file_content = files_file.read()
             should_close = True
         else:
-            raise IOError(2, 'No such file: %r' % args.files_from_file)
+            raise IOError(2, "No such file: %r" % args.files_from_file)
 
         try:
             # Remove ''
@@ -956,49 +1208,60 @@
             # grin -f against a binary file and got an unhelpful error message
             # later.
             if args.null_separated:
-                files.extend([x.strip() for x in files_file.read().split('\0')])
+                files.extend([x.strip() for x in files_file_content.split(b"\0")])
             else:
-                files.extend([x.strip() for x in files_file])
+                files.extend([x.strip() for x in files_file_content.split(b"\n")])
         finally:
             if should_close:
                 files_file.close()
 
     # Now add the filenames provided on the command line itself.
-    files.extend(args.files)
+    cli_files = args.files
+    path_files = sys.path
+    if sys.version_info.major > 2:
+        cli_files = [f.encode(sys.stdout.encoding) for f in cli_files]
+        path_files = [f.encode(sys.stdout.encoding) for f in path_files]
+    files.extend(cli_files)
     if args.sys_path:
-        files.extend(sys.path)
+        files.extend(path_files)
 
     # Make sure we don't have any empty strings lying around.
     # Also skip certain special null files which may be added by programs like
     # Emacs.
-    if sys.platform == 'win32':
-        upper_bad = set(['NUL:', 'NUL'])
-        raw_bad = set([''])
+    if sys.platform == "win32":
+        upper_bad = set([b"NUL:", b"NUL"])
+        raw_bad = set([b""])
     else:
         upper_bad = set()
-        raw_bad = set(['', '/dev/null'])
+        raw_bad = set([b"", b"/dev/null"])
     files = [fn for fn in files if fn not in raw_bad and fn.upper() not in upper_bad]
     if len(files) == 0:
         # Add the current directory at least.
-        files = ['.']
+        files = [b"."]
 
     # Go over our list of filenames and see if we can recognize each as
    # something we want to grep.
     fr = get_recognizer(args)
     for fn in files:
         # Special case text stdin.
-        if fn == '-':
-            yield fn, 'text'
+        if fn == b"-":
+            yield fn, "text"
             continue
         kind = fr.recognize(fn)
-        if kind in ('text', 'gzip') and fnmatch.fnmatch(os.path.basename(fn), args.include):
+        include = args.include
+        if sys.version_info.major > 2:
+            include = include.encode(sys.stdout.encoding)
+        if kind in ("text", "gzip") and fnmatch.fnmatch(os.path.basename(fn), include):
             yield fn, kind
-        elif kind == 'directory':
+        elif kind == "directory":
             for filename, k in fr.walk(fn):
-                if k in ('text', 'gzip') and fnmatch.fnmatch(os.path.basename(filename), args.include):
+                if k in ("text", "gzip") and fnmatch.fnmatch(
+                    os.path.basename(filename), include
+                ):
                     yield filename, k
     # XXX: warn about other files?
     # XXX: handle binary?
 
+
 def get_regex(args):
     """ Get the compiled regex object to search with.
     """
@@ -1006,53 +1269,65 @@
     flags = 0
     for flag in args.re_flags:
         flags |= flag
-    return re.compile(args.regex, flags)
+
+    regex = args.regex
+    if sys.version_info.major > 2:
+        regex = regex.encode("utf8")
+    return re.compile(regex, flags)
 
 
 def grin_main(argv=None):
     try:
         if argv is None:
             # Look at the GRIN_ARGS environment variable for more arguments.
- env_args = shlex.split(os.getenv('GRIN_ARGS', '')) + env_args = shlex.split(os.getenv("GRIN_ARGS", "")) argv = [sys.argv[0]] + env_args + sys.argv[1:] parser = get_grin_arg_parser() args = parser.parse_args(argv[1:]) if args.context is not None: args.before_context = args.context args.after_context = args.context - args.use_color = args.force_color or (not args.no_color and - sys.stdout.isatty() and - (os.environ.get('TERM') != 'dumb')) + args.use_color = args.force_color or ( + not args.no_color + and sys.stdout.isatty() + and (os.environ.get("TERM") != "dumb") + ) regex = get_regex(args) g = GrepText(regex, args) openers = dict(text=open, gzip=gzip.open) for filename, kind in get_filenames(args): report = g.grep_a_file(filename, opener=openers[kind]) - sys.stdout.write(report) + if sys.version_info.major > 2: + sys.stdout.buffer.write(report) + else: + sys.stdout.write(report) except KeyboardInterrupt: raise SystemExit(0) - except IOError, e: - if 'Broken pipe' in str(e): + except IOError as e: + if "Broken pipe" in str(e): # The user is probably piping to a pager like less(1) and has exited # it. Just exit. raise SystemExit(0) raise + def print_line(filename): - print filename + print(filename) + def print_null(filename): - # Note that the final filename will have a trailing NUL, just like + # Note that the final filename will have a trailing NUL, just like # "find -print0" does. sys.stdout.write(filename) - sys.stdout.write('\0') + sys.stdout.write(b"\0") + def grind_main(argv=None): try: if argv is None: # Look at the GRIND_ARGS environment variable for more arguments. - env_args = shlex.split(os.getenv('GRIND_ARGS', '')) + env_args = shlex.split(os.getenv("GRIND_ARGS", "")) argv = [sys.argv[0]] + env_args + sys.argv[1:] parser = get_grind_arg_parser() args = parser.parse_args(argv[1:]) @@ -1066,19 +1341,26 @@ def grind_main(argv=None): if args.sys_path: args.dirs.extend(sys.path) + dirs = args.dirs + glob = args.glob + if sys.version_info.major > 2: + dirs = [d.encode(sys.stdout.encoding) for d in dirs] + glob = glob.encode(sys.stdout.encoding) + fr = get_recognizer(args) - for dir in args.dirs: + for dir in dirs: for filename, k in fr.walk(dir): - if fnmatch.fnmatch(os.path.basename(filename), args.glob): + if fnmatch.fnmatch(os.path.basename(filename), glob): output(filename) except KeyboardInterrupt: raise SystemExit(0) - except IOError, e: - if 'Broken pipe' in str(e): + except IOError as e: + if "Broken pipe" in str(e): # The user is probably piping to a pager like less(1) and has exited # it. Just exit. raise SystemExit(0) raise -if __name__ == '__main__': + +if __name__ == "__main__": grin_main() diff --git a/setup.py b/setup.py index 3d1f1eb..fdb32ca 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ setup( name='grin', - version='1.2.1', + version='1.3.0', author='Robert Kern', author_email='robert.kern@enthought.com', description="A grep program configured the way I like it.", diff --git a/tests/test_file_recognizer.py b/tests/test_file_recognizer.py index 70b7f5a..b08dad8 100644 --- a/tests/test_file_recognizer.py +++ b/tests/test_file_recognizer.py @@ -1,413 +1,532 @@ +# -*- coding: utf-8 + """ Test the file recognizer capabilities. 
""" +from __future__ import print_function, unicode_literals + + +import contextlib import gzip import os import shutil import socket import sys +from io import open + +from functools import partial + import nose -from grin import FileRecognizer +from grin import FileRecognizer, GZIP_MAGIC + +printerr = partial(print, file=sys.stderr) + +ALL_BYTES = bytes(bytearray(range(256))) + def empty_file(filename, open=open): - f = open(filename, 'wb') - f.close() + open(filename, "a").close() + def binary_file(filename, open=open): - f = open(filename, 'wb') - f.write(''.join(map(chr, range(256)))) - f.close() + with open(filename, "wb") as f: + f.write(ALL_BYTES) + def text_file(filename, open=open): - lines = ['foo\n', 'bar\n'] * 100 - lines.append('baz\n') - lines.extend(['foo\n', 'bar\n'] * 100) - f = open(filename, 'wb') - f.writelines(lines) - f.close() + lines = [b"foo\n", b"bar\n"] * 100 + lines.append(b"baz\n") + lines.extend([b"foo\n", b"bar\n"] * 100) + with open(filename, "wb") as f: + f.writelines(lines) + def fake_gzip_file(filename, open=open): """ Write out a binary file that has the gzip magic header bytes, but is not a gzip file. """ - GZIP_MAGIC = '\037\213' - f = open(filename, 'wb') - f.write(GZIP_MAGIC) - f.write(''.join(map(chr, range(256)))) - f.close() + with open(filename, "wb") as f: + f.write(GZIP_MAGIC) + f.write(ALL_BYTES) + def binary_middle(filename, open=open): """ Write out a file that is text for the first 100 bytes, then 100 binary bytes, then 100 text bytes to test that the recognizer only reads some of the file. """ - text = 'a'*100 + '\0'*100 + 'b'*100 - f = open(filename, 'wb') + text = b"a" * 100 + b"\0" * 100 + b"b" * 100 + f = open(filename, "wb") f.write(text) f.close() + def socket_file(filename): s = socket.socket(socket.AF_UNIX) s.bind(filename) + def unreadable_file(filename): """ Write a file that does not have read permissions. """ text_file(filename) - os.chmod(filename, 0200) + os.chmod(filename, 0o200) + try: + with open(filename) as f: + pass + except IOError as e: + if "Permission denied" not in str(e): + raise + else: + raise RuntimeError( + "grin tests cannot run on a filesystem that doesn't support chmod(). " + "You will encounter false negative" + ) + def unreadable_dir(filename): """ Make a directory that does not have read permissions. """ os.mkdir(filename) - os.chmod(filename, 0300) + os.chmod(filename, 0o300) + def unexecutable_dir(filename): """ Make a directory that does not have execute permissions. """ os.mkdir(filename) - os.chmod(filename, 0600) + os.chmod(filename, 0o600) + def totally_unusable_dir(filename): """ Make a directory that has neither read nor execute permissions. """ os.mkdir(filename) - os.chmod(filename, 0100) + os.chmod(filename, 0o100) + def setup(): + # Make sure we don't have files remaining from previous tests + teardown() + # Make files to test individual recognizers. 
- empty_file('empty') - binary_file('binary') - binary_middle('binary_middle') - text_file('text') - text_file('text~') - text_file('text#') - text_file('foo.bar.baz') - os.mkdir('dir') - binary_file('.binary') - text_file('.text') - empty_file('empty.gz', open=gzip.open) - binary_file('binary.gz', open=gzip.open) - text_file('text.gz', open=gzip.open) - binary_file('.binary.gz', open=gzip.open) - text_file('.text.gz', open=gzip.open) - fake_gzip_file('fake.gz') - os.mkdir('.dir') - os.symlink('binary', 'binary_link') - os.symlink('text', 'text_link') - os.symlink('dir', 'dir_link') - os.symlink('.binary', '.binary_link') - os.symlink('.text', '.text_link') - os.symlink('.dir', '.dir_link') - unreadable_file('unreadable_file') - unreadable_dir('unreadable_dir') - unexecutable_dir('unexecutable_dir') - totally_unusable_dir('totally_unusable_dir') - os.symlink('unreadable_file', 'unreadable_file_link') - os.symlink('unreadable_dir', 'unreadable_dir_link') - os.symlink('unexecutable_dir', 'unexecutable_dir_link') - os.symlink('totally_unusable_dir', 'totally_unusable_dir_link') - text_file('text.skip_ext') - os.mkdir('dir.skip_ext') - text_file('text.dont_skip_ext') - os.mkdir('skip_dir') - text_file('fake_skip_dir') - socket_file('socket_test') + empty_file(b"empty") + binary_file(b"binary") + binary_middle(b"binary_middle") + text_file(b"text") + text_file(b"text~") + text_file(b"text#") + text_file(b"foo.bar.baz") + os.mkdir(b"dir") + binary_file(b".binary") + text_file(b".text") + empty_file(b"empty.gz", open=gzip.open) + binary_file(b"binary.gz", open=gzip.open) + text_file(b"text.gz", open=gzip.open) + binary_file(b".binary.gz", open=gzip.open) + text_file(b".text.gz", open=gzip.open) + fake_gzip_file("fake.gz") + os.mkdir(b".dir") + os.symlink(b"binary", b"binary_link") + os.symlink(b"text", b"text_link") + os.symlink(b"dir", b"dir_link") + os.symlink(b".binary", b".binary_link") + os.symlink(b".text", b".text_link") + os.symlink(b".dir", b".dir_link") + unreadable_file(b"unreadable_file") + unreadable_dir(b"unreadable_dir") + unexecutable_dir(b"unexecutable_dir") + totally_unusable_dir(b"totally_unusable_dir") + os.symlink(b"unreadable_file", b"unreadable_file_link") + os.symlink(b"unreadable_dir", b"unreadable_dir_link") + os.symlink(b"unexecutable_dir", b"unexecutable_dir_link") + os.symlink(b"totally_unusable_dir", b"totally_unusable_dir_link") + text_file(b"text.skip_ext") + os.mkdir(b"dir.skip_ext") + text_file(b"text.dont_skip_ext") + os.mkdir(b"skip_dir") + text_file(b"fake_skip_dir") + socket_file("socket_test") # Make a directory tree to test tree-walking. 
- os.mkdir('tree') - os.mkdir('tree/.hidden_dir') - os.mkdir('tree/dir') - os.mkdir('tree/dir/subdir') - text_file('tree/dir/text') - text_file('tree/dir/subdir/text') - text_file('tree/text') - text_file('tree/text.skip_ext') - os.mkdir('tree/dir.skip_ext') - text_file('tree/dir.skip_ext/text') - text_file('tree/text.dont_skip_ext') - binary_file('tree/binary') - os.mkdir('tree/skip_dir') - text_file('tree/skip_dir/text') - os.mkdir('tree/.skip_hidden_dir') - text_file('tree/.skip_hidden_file') - os.mkdir('tree/unreadable_dir') - text_file('tree/unreadable_dir/text') - os.chmod('tree/unreadable_dir', 0300) - os.mkdir('tree/unexecutable_dir') - text_file('tree/unexecutable_dir/text') - os.chmod('tree/unexecutable_dir', 0600) - os.mkdir('tree/totally_unusable_dir') - text_file('tree/totally_unusable_dir/text') - os.chmod('tree/totally_unusable_dir', 0100) - -def ensure_deletability(arg, dirname, fnames): - """ os.path.walk() callback function which will make sure every directory is - readable and executable so that it may be easily deleted. - """ - for fn in fnames: - fn = os.path.join(dirname, fn) - if os.path.isdir(fn): - os.chmod(fn, 0700) + os.mkdir(b"tree") + os.mkdir(b"tree/.hidden_dir") + os.mkdir(b"tree/dir") + os.mkdir(b"tree/dir/subdir") + text_file(b"tree/dir/text") + text_file(b"tree/dir/subdir/text") + text_file(b"tree/text") + text_file(b"tree/text.skip_ext") + os.mkdir(b"tree/dir.skip_ext") + text_file(b"tree/dir.skip_ext/text") + text_file(b"tree/text.dont_skip_ext") + binary_file(b"tree/binary") + os.mkdir(b"tree/skip_dir") + text_file(b"tree/skip_dir/text") + os.mkdir(b"tree/.skip_hidden_dir") + text_file(b"tree/.skip_hidden_file") + os.mkdir(b"tree/unreadable_dir") + text_file(b"tree/unreadable_dir/text") + os.chmod("tree/unreadable_dir", 0o300) + os.mkdir(b"tree/unexecutable_dir") + text_file(b"tree/unexecutable_dir/text") + os.chmod(b"tree/unexecutable_dir", 0o600) + os.mkdir(b"tree/totally_unusable_dir") + text_file(b"tree/totally_unusable_dir/text") + os.chmod(b"tree/totally_unusable_dir", 0o100) + +@contextlib.contextmanager +def catch_and_log_env_error(message=None, ignore="No such file or directory", args=()): + """ Catch IOError, print a message, optionnaly reraise. 
Ignore some types """ + try: + yield + except EnvironmentError as e: + if ignore not in str(e): + if message is None: + raise e + printerr(message % (tuple(args) + (e,))) + def teardown(): - files_to_delete = ['empty', 'binary', 'binary_middle', 'text', 'text~', - 'empty.gz', 'binary.gz', 'text.gz', 'dir', 'binary_link', 'text_link', - 'dir_link', '.binary', '.text', '.binary.gz', '.text.gz', 'fake.gz', - '.dir', '.binary_link', '.text_link', '.dir_link', 'unreadable_file', - 'unreadable_dir', 'unexecutable_dir', 'totally_unusable_dir', - 'unreadable_file_link', 'unreadable_dir_link', 'unexecutable_dir_link', - 'totally_unusable_dir_link', 'text.skip_ext', 'text.dont_skip_ext', - 'dir.skip_ext', 'skip_dir', 'fake_skip_dir', 'text#', 'foo.bar.baz', + files_to_delete = [ + b"empty", + b"binary", + b"binary_middle", + b"text", + b"text~", + b"empty.gz", + b"binary.gz", + b"text.gz", + b"dir", + b"binary_link", + b"text_link", + b"dir_link", + b".binary", + b".text", + b".binary.gz", + b".text.gz", + b"fake.gz", + b".dir", + b".binary_link", + b".text_link", + b".dir_link", + b"unreadable_file", + b"unreadable_dir", + b"unexecutable_dir", + b"totally_unusable_dir", + b"unreadable_file_link", + b"unreadable_dir_link", + b"unexecutable_dir_link", + b"totally_unusable_dir_link", + b"text.skip_ext", + b"text.dont_skip_ext", + b"dir.skip_ext", + b"skip_dir", + b"fake_skip_dir", + b"text#", + b"foo.bar.baz", + b"tree", + b"socket_test" ] for filename in files_to_delete: - try: - if os.path.islink(filename) or os.path.isfile(filename): - os.unlink(filename) - else: - os.rmdir(filename) - except Exception, e: - print >>sys.stderr, 'Could not delete %s: %s' % (filename, e) - os.unlink('socket_test') - os.path.walk('tree', ensure_deletability, None) - shutil.rmtree('tree') + with catch_and_log_env_error(): + os.chmod(filename, 0o777) + + if os.path.isdir(filename): + + if not filename.startswith(b'/'): + + # Make sure we have permission to delete everything + for dirname, dirs, files in os.walk(filename, followlinks=True): + paths = [os.path.join(dirname, p) for p in (dirs + files)] + os.chmod(dirname, 0o777) + for path in paths: + os.chmod(path, 0o777) + + with catch_and_log_env_error("Could not delete %r: %r", args=(filename,)): + shutil.rmtree(filename) + + else: + with catch_and_log_env_error("Could not delete %r: %r", args=(filename,)): + os.unlink(filename) def test_binary(): fr = FileRecognizer() - assert fr.is_binary('binary') - assert fr.recognize_file('binary') == 'binary' - assert fr.recognize('binary') == 'binary' + assert fr.is_binary(b"binary") + assert fr.recognize_file(b"binary") == "binary" + assert fr.recognize(b"binary") == "binary" + def test_text(): fr = FileRecognizer() - assert not fr.is_binary('text') - assert fr.recognize_file('text') == 'text' - assert fr.recognize('text') == 'text' + assert not fr.is_binary(b"text") + assert fr.recognize_file(b"text") == "text" + assert fr.recognize(b"text") == "text" + def test_gzipped(): fr = FileRecognizer() - assert fr.is_binary('text.gz') - assert fr.recognize_file('text.gz') == 'gzip' - assert fr.recognize('text.gz') == 'gzip' - assert fr.is_binary('binary.gz') - assert fr.recognize_file('binary.gz') == 'binary' - assert fr.recognize('binary.gz') == 'binary' - assert fr.is_binary('fake.gz') - assert fr.recognize_file('fake.gz') == 'binary' - assert fr.recognize('fake.gz') == 'binary' + assert fr.is_binary(b"text.gz") + assert fr.recognize_file(b"text.gz") == "gzip" + assert fr.recognize(b"text.gz") == "gzip" + assert 
fr.is_binary(b"binary.gz") + assert fr.recognize_file(b"binary.gz") == "binary" + assert fr.recognize(b"binary.gz") == "binary" + assert fr.is_binary(b"fake.gz") + assert fr.recognize_file(b"fake.gz") == "binary" + assert fr.recognize(b"fake.gz") == "binary" + def test_binary_middle(): fr = FileRecognizer(binary_bytes=100) - assert not fr.is_binary('binary_middle') - assert fr.recognize_file('binary_middle') == 'text' - assert fr.recognize('binary_middle') == 'text' + assert not fr.is_binary(b"binary_middle") + assert fr.recognize_file(b"binary_middle") == "text" + assert fr.recognize(b"binary_middle") == "text" fr = FileRecognizer(binary_bytes=101) - assert fr.is_binary('binary_middle') - assert fr.recognize_file('binary_middle') == 'binary' - assert fr.recognize('binary_middle') == 'binary' + assert fr.is_binary(b"binary_middle") + assert fr.recognize_file(b"binary_middle") == "binary" + assert fr.recognize(b"binary_middle") == "binary" + def test_socket(): - fr= FileRecognizer() - assert fr.recognize('socket_test') == 'skip' + fr = FileRecognizer() + assert fr.recognize(b"socket_test") == "skip" + def test_dir(): fr = FileRecognizer() - assert fr.recognize_directory('dir') == 'directory' - assert fr.recognize('dir') == 'directory' + assert fr.recognize_directory(b"dir") == "directory" + assert fr.recognize(b"dir") == "directory" + def test_skip_symlinks(): fr = FileRecognizer(skip_symlink_files=True, skip_symlink_dirs=True) - assert fr.recognize('binary_link') == 'link' - assert fr.recognize_file('binary_link') == 'link' - assert fr.recognize('text_link') == 'link' - assert fr.recognize_file('text_link') == 'link' - assert fr.recognize('dir_link') == 'link' - assert fr.recognize_directory('dir_link') == 'link' + assert fr.recognize(b"binary_link") == "link" + assert fr.recognize_file(b"binary_link") == "link" + assert fr.recognize(b"text_link") == "link" + assert fr.recognize_file(b"text_link") == "link" + assert fr.recognize(b"dir_link") == "link" + assert fr.recognize_directory(b"dir_link") == "link" + def test_do_not_skip_symlinks(): fr = FileRecognizer(skip_symlink_files=False, skip_symlink_dirs=False) - assert fr.recognize('binary_link') == 'binary' - assert fr.recognize_file('binary_link') == 'binary' - assert fr.recognize('text_link') == 'text' - assert fr.recognize_file('text_link') == 'text' - assert fr.recognize('dir_link') == 'directory' - assert fr.recognize_directory('dir_link') == 'directory' + assert fr.recognize(b"binary_link") == "binary" + assert fr.recognize_file(b"binary_link") == "binary" + assert fr.recognize(b"text_link") == "text" + assert fr.recognize_file(b"text_link") == "text" + assert fr.recognize(b"dir_link") == "directory" + assert fr.recognize_directory(b"dir_link") == "directory" + def test_skip_hidden(): fr = FileRecognizer(skip_hidden_files=True, skip_hidden_dirs=True) - assert fr.recognize('.binary') == 'skip' - assert fr.recognize_file('.binary') == 'skip' - assert fr.recognize('.text') == 'skip' - assert fr.recognize_file('.text') == 'skip' - assert fr.recognize('.dir') == 'skip' - assert fr.recognize_directory('.dir') == 'skip' - assert fr.recognize('.binary_link') == 'skip' - assert fr.recognize_file('.binary_link') == 'skip' - assert fr.recognize('.text_link') == 'skip' - assert fr.recognize_file('.text_link') == 'skip' - assert fr.recognize('.dir_link') == 'skip' - assert fr.recognize_directory('.dir_link') == 'skip' - assert fr.recognize('.text.gz') == 'skip' - assert fr.recognize_file('.text.gz') == 'skip' - assert fr.recognize('.binary.gz') 
== 'skip' - assert fr.recognize_file('.binary.gz') == 'skip' + assert fr.recognize(b".binary") == "skip" + assert fr.recognize_file(b".binary") == "skip" + assert fr.recognize(b".text") == "skip" + assert fr.recognize_file(b".text") == "skip" + assert fr.recognize(b".dir") == "skip" + assert fr.recognize_directory(b".dir") == "skip" + assert fr.recognize(b".binary_link") == "skip" + assert fr.recognize_file(b".binary_link") == "skip" + assert fr.recognize(b".text_link") == "skip" + assert fr.recognize_file(b".text_link") == "skip" + assert fr.recognize(b".dir_link") == "skip" + assert fr.recognize_directory(b".dir_link") == "skip" + assert fr.recognize(b".text.gz") == "skip" + assert fr.recognize_file(b".text.gz") == "skip" + assert fr.recognize(b".binary.gz") == "skip" + assert fr.recognize_file(b".binary.gz") == "skip" + def test_skip_backup(): fr = FileRecognizer(skip_backup_files=True) - assert fr.recognize_file('text~') == 'skip' + assert fr.recognize_file(b"text~") == "skip" + def test_do_not_skip_backup(): fr = FileRecognizer(skip_backup_files=False) - assert fr.recognize_file('text~') == 'text' + assert fr.recognize_file(b"text~") == "text" + def test_skip_weird_exts(): fr = FileRecognizer(skip_exts=set()) - assert fr.recognize_file('text#') == 'text' - assert fr.recognize_file('foo.bar.baz') == 'text' - fr = FileRecognizer(skip_exts=set(['#', '.bar.baz'])) - assert fr.recognize_file('text#') == 'skip' - assert fr.recognize_file('foo.bar.baz') == 'skip' + assert fr.recognize_file(b"text#") == "text" + assert fr.recognize_file(b"foo.bar.baz") == "text" + fr = FileRecognizer(skip_exts=set([b"#", b".bar.baz"])) + assert fr.recognize_file(b"text#") == "skip" + assert fr.recognize_file(b"foo.bar.baz") == "skip" + def test_do_not_skip_hidden_or_symlinks(): - fr = FileRecognizer(skip_hidden_files=False, skip_hidden_dirs=False, - skip_symlink_dirs=False, skip_symlink_files=False) - assert fr.recognize('.binary') == 'binary' - assert fr.recognize_file('.binary') == 'binary' - assert fr.recognize('.text') == 'text' - assert fr.recognize_file('.text') == 'text' - assert fr.recognize('.dir') == 'directory' - assert fr.recognize_directory('.dir') == 'directory' - assert fr.recognize('.binary_link') == 'binary' - assert fr.recognize_file('.binary_link') == 'binary' - assert fr.recognize('.text_link') == 'text' - assert fr.recognize_file('.text_link') == 'text' - assert fr.recognize('.dir_link') == 'directory' - assert fr.recognize_directory('.dir_link') == 'directory' - assert fr.recognize('.text.gz') == 'gzip' - assert fr.recognize_file('.text.gz') == 'gzip' - assert fr.recognize('.binary.gz') == 'binary' - assert fr.recognize_file('.binary.gz') == 'binary' + fr = FileRecognizer( + skip_hidden_files=False, + skip_hidden_dirs=False, + skip_symlink_dirs=False, + skip_symlink_files=False, + ) + assert fr.recognize(b".binary") == "binary" + assert fr.recognize_file(b".binary") == "binary" + assert fr.recognize(b".text") == "text" + assert fr.recognize_file(b".text") == "text" + assert fr.recognize(b".dir") == "directory" + assert fr.recognize_directory(b".dir") == "directory" + assert fr.recognize(b".binary_link") == "binary" + assert fr.recognize_file(b".binary_link") == "binary" + assert fr.recognize(b".text_link") == "text" + assert fr.recognize_file(b".text_link") == "text" + assert fr.recognize(b".dir_link") == "directory" + assert fr.recognize_directory(b".dir_link") == "directory" + assert fr.recognize(b".text.gz") == "gzip" + assert fr.recognize_file(b".text.gz") == "gzip" + assert 
fr.recognize(b".binary.gz") == "binary" + assert fr.recognize_file(b".binary.gz") == "binary" + def test_do_not_skip_hidden_but_skip_symlinks(): - fr = FileRecognizer(skip_hidden_files=False, skip_hidden_dirs=False, - skip_symlink_dirs=True, skip_symlink_files=True) - assert fr.recognize('.binary') == 'binary' - assert fr.recognize_file('.binary') == 'binary' - assert fr.recognize('.text') == 'text' - assert fr.recognize_file('.text') == 'text' - assert fr.recognize('.dir') == 'directory' - assert fr.recognize_directory('.dir') == 'directory' - assert fr.recognize('.binary_link') == 'link' - assert fr.recognize_file('.binary_link') == 'link' - assert fr.recognize('.text_link') == 'link' - assert fr.recognize_file('.text_link') == 'link' - assert fr.recognize('.dir_link') == 'link' - assert fr.recognize_directory('.dir_link') == 'link' - assert fr.recognize('.text.gz') == 'gzip' - assert fr.recognize_file('.text.gz') == 'gzip' - assert fr.recognize('.binary.gz') == 'binary' - assert fr.recognize_file('.binary.gz') == 'binary' + fr = FileRecognizer( + skip_hidden_files=False, + skip_hidden_dirs=False, + skip_symlink_dirs=True, + skip_symlink_files=True, + ) + assert fr.recognize(b".binary") == "binary" + assert fr.recognize_file(b".binary") == "binary" + assert fr.recognize(b".text") == "text" + assert fr.recognize_file(b".text") == "text" + assert fr.recognize(b".dir") == "directory" + assert fr.recognize_directory(b".dir") == "directory" + assert fr.recognize(b".binary_link") == "link" + assert fr.recognize_file(b".binary_link") == "link" + assert fr.recognize(b".text_link") == "link" + assert fr.recognize_file(b".text_link") == "link" + assert fr.recognize(b".dir_link") == "link" + assert fr.recognize_directory(b".dir_link") == "link" + assert fr.recognize(b".text.gz") == "gzip" + assert fr.recognize_file(b".text.gz") == "gzip" + assert fr.recognize(b".binary.gz") == "binary" + assert fr.recognize_file(b".binary.gz") == "binary" + def test_lack_of_permissions(): fr = FileRecognizer() - assert fr.recognize('unreadable_file') == 'unreadable' - assert fr.recognize_file('unreadable_file') == 'unreadable' - assert fr.recognize('unreadable_dir') == 'directory' - assert fr.recognize_directory('unreadable_dir') == 'directory' - assert fr.recognize('unexecutable_dir') == 'directory' - assert fr.recognize_directory('unexecutable_dir') == 'directory' - assert fr.recognize('totally_unusable_dir') == 'directory' - assert fr.recognize_directory('totally_unusable_dir') == 'directory' + assert fr.recognize(b"unreadable_file") == "unreadable" + assert fr.recognize_file(b"unreadable_file") == "unreadable" + assert fr.recognize(b"unreadable_dir") == "directory" + assert fr.recognize_directory(b"unreadable_dir") == "directory" + assert fr.recognize(b"unexecutable_dir") == "directory" + assert fr.recognize_directory(b"unexecutable_dir") == "directory" + assert fr.recognize(b"totally_unusable_dir") == "directory" + assert fr.recognize_directory(b"totally_unusable_dir") == "directory" + def test_symlink_src_unreadable(): fr = FileRecognizer(skip_symlink_files=False, skip_symlink_dirs=False) - assert fr.recognize('unreadable_file_link') == 'unreadable' - assert fr.recognize_file('unreadable_file_link') == 'unreadable' - assert fr.recognize('unreadable_dir_link') == 'directory' - assert fr.recognize_directory('unreadable_dir_link') == 'directory' - assert fr.recognize('unexecutable_dir_link') == 'directory' - assert fr.recognize_directory('unexecutable_dir_link') == 'directory' - assert 
fr.recognize('totally_unusable_dir_link') == 'directory' - assert fr.recognize_directory('totally_unusable_dir_link') == 'directory' + assert fr.recognize(b"unreadable_file_link") == "unreadable" + assert fr.recognize_file(b"unreadable_file_link") == "unreadable" + assert fr.recognize(b"unreadable_dir_link") == "directory" + assert fr.recognize_directory(b"unreadable_dir_link") == "directory" + assert fr.recognize(b"unexecutable_dir_link") == "directory" + assert fr.recognize_directory(b"unexecutable_dir_link") == "directory" + assert fr.recognize(b"totally_unusable_dir_link") == "directory" + assert fr.recognize_directory(b"totally_unusable_dir_link") == "directory" + def test_skip_ext(): - fr = FileRecognizer(skip_exts=set(['.skip_ext'])) - assert fr.recognize('text.skip_ext') == 'skip' - assert fr.recognize_file('text.skip_ext') == 'skip' - assert fr.recognize('text') == 'text' - assert fr.recognize_file('text') == 'text' - assert fr.recognize('text.dont_skip_ext') == 'text' - assert fr.recognize_file('text.dont_skip_ext') == 'text' - assert fr.recognize('dir.skip_ext') == 'directory' - assert fr.recognize_directory('dir.skip_ext') == 'directory' + fr = FileRecognizer(skip_exts=set([b".skip_ext"])) + assert fr.recognize(b"text.skip_ext") == "skip" + assert fr.recognize_file(b"text.skip_ext") == "skip" + assert fr.recognize(b"text") == "text" + assert fr.recognize_file(b"text") == "text" + assert fr.recognize(b"text.dont_skip_ext") == "text" + assert fr.recognize_file(b"text.dont_skip_ext") == "text" + assert fr.recognize(b"dir.skip_ext") == "directory" + assert fr.recognize_directory(b"dir.skip_ext") == "directory" + def test_skip_dir(): - fr = FileRecognizer(skip_dirs=set(['skip_dir', 'fake_skip_dir'])) - assert fr.recognize('skip_dir') == 'skip' - assert fr.recognize_directory('skip_dir') == 'skip' - assert fr.recognize('fake_skip_dir') == 'text' - assert fr.recognize_file('fake_skip_dir') == 'text' + fr = FileRecognizer(skip_dirs=set([b"skip_dir", b"fake_skip_dir"])) + assert fr.recognize(b"skip_dir") == "skip" + assert fr.recognize_directory(b"skip_dir") == "skip" + assert fr.recognize(b"fake_skip_dir") == "text" + assert fr.recognize_file(b"fake_skip_dir") == "text" + def test_walking(): - fr = FileRecognizer(skip_hidden_files=True, skip_hidden_dirs=True, - skip_exts=set(['.skip_ext']),skip_dirs=set(['skip_dir'])) + fr = FileRecognizer( + skip_hidden_files=True, + skip_hidden_dirs=True, + skip_exts=set([b".skip_ext"]), + skip_dirs=set([b"skip_dir"]), + ) truth = [ - ('tree/binary', 'binary'), - ('tree/dir.skip_ext/text', 'text'), - ('tree/dir/subdir/text', 'text'), - ('tree/dir/text', 'text'), - ('tree/text', 'text'), - ('tree/text.dont_skip_ext', 'text'), + (b"tree/binary", "binary"), + (b"tree/dir.skip_ext/text", "text"), + (b"tree/dir/subdir/text", "text"), + (b"tree/dir/text", "text"), + (b"tree/text", "text"), + (b"tree/text.dont_skip_ext", "text"), ] - result = sorted(fr.walk('tree')) + result = sorted(fr.walk(b"tree")) assert result == truth def predot(): - os.chdir('tree') + os.chdir(b"tree") + def postdot(): - os.chdir('..') + os.chdir(b"..") + @nose.with_setup(predot, postdot) def test_dot(): - fr = FileRecognizer(skip_hidden_files=True, skip_hidden_dirs=True, - skip_exts=set(['.skip_ext']),skip_dirs=set(['skip_dir'])) + fr = FileRecognizer( + skip_hidden_files=True, + skip_hidden_dirs=True, + skip_exts=set([b".skip_ext"]), + skip_dirs=set([b"skip_dir"]), + ) truth = [ - ('./binary', 'binary'), - ('./dir.skip_ext/text', 'text'), - ('./dir/subdir/text', 'text'), - 
('./dir/text', 'text'), - ('./text', 'text'), - ('./text.dont_skip_ext', 'text'), + (b"./binary", "binary"), + (b"./dir.skip_ext/text", "text"), + (b"./dir/subdir/text", "text"), + (b"./dir/text", "text"), + (b"./text", "text"), + (b"./text.dont_skip_ext", "text"), ] - result = sorted(fr.walk('.')) + result = sorted(fr.walk(b".")) assert result == truth + def predotdot(): - os.chdir('tree') - os.chdir('dir') + os.chdir(b"tree") + os.chdir(b"dir") + def postdotdot(): - os.chdir('..') - os.chdir('..') + os.chdir(b"..") + os.chdir(b"..") + @nose.with_setup(predotdot, postdotdot) def test_dot_dot(): - fr = FileRecognizer(skip_hidden_files=True, skip_hidden_dirs=True, - skip_exts=set(['.skip_ext']),skip_dirs=set(['skip_dir'])) + fr = FileRecognizer( + skip_hidden_files=True, + skip_hidden_dirs=True, + skip_exts=set([b".skip_ext"]), + skip_dirs=set([b"skip_dir"]), + ) truth = [ - ('../binary', 'binary'), - ('../dir.skip_ext/text', 'text'), - ('../dir/subdir/text', 'text'), - ('../dir/text', 'text'), - ('../text', 'text'), - ('../text.dont_skip_ext', 'text'), + (b"../binary", "binary"), + (b"../dir.skip_ext/text", "text"), + (b"../dir/subdir/text", "text"), + (b"../dir/text", "text"), + (b"../text", "text"), + (b"../text.dont_skip_ext", "text"), ] - result = sorted(fr.walk('..')) + result = sorted(fr.walk(b"..")) assert result == truth - - diff --git a/tests/test_grep.py b/tests/test_grep.py index aa367f2..14f1477 100644 --- a/tests/test_grep.py +++ b/tests/test_grep.py @@ -1,182 +1,290 @@ # Doctests are a bit easier to write for these tests. -r''' -Set up - - >>> import grin - >>> from cStringIO import StringIO - >>> import re - >>> - >>> all_foo = """\ - ... foo - ... foo - ... foo - ... foo - ... foo - ... """ - >>> first_foo = """\ - ... foo - ... bar - ... bar - ... bar - ... bar - ... """ - >>> last_foo = """\ - ... bar - ... bar - ... bar - ... bar - ... foo - ... """ - >>> second_foo = """\ - ... bar - ... foo - ... bar - ... bar - ... bar - ... """ - >>> second_last_foo = """\ - ... bar - ... bar - ... bar - ... foo - ... bar - ... """ - >>> middle_foo = """\ - ... bar - ... bar - ... foo - ... bar - ... bar - ... """ - >>> small_gap = """\ - ... bar - ... bar - ... foo - ... bar - ... foo - ... bar - ... bar - ... """ - >>> no_eol = "foo" - >>> middle_of_line = """\ - ... bar - ... bar - ... barfoobar - ... bar - ... bar - ... """ - -Test the basic defaults, no context. - - >>> gt_default = grin.GrepText(re.compile('foo')) - >>> gt_default.do_grep(StringIO(all_foo)) - [(0, 0, 'foo\n', [(0, 3)]), (1, 0, 'foo\n', [(0, 3)]), (2, 0, 'foo\n', [(0, 3)]), (3, 0, 'foo\n', [(0, 3)]), (4, 0, 'foo\n', [(0, 3)])] - >>> gt_default.do_grep(StringIO(first_foo)) - [(0, 0, 'foo\n', [(0, 3)])] - >>> gt_default.do_grep(StringIO(last_foo)) - [(4, 0, 'foo\n', [(0, 3)])] - >>> gt_default.do_grep(StringIO(second_foo)) - [(1, 0, 'foo\n', [(0, 3)])] - >>> gt_default.do_grep(StringIO(second_last_foo)) - [(3, 0, 'foo\n', [(0, 3)])] - >>> gt_default.do_grep(StringIO(middle_foo)) - [(2, 0, 'foo\n', [(0, 3)])] - >>> gt_default.do_grep(StringIO(small_gap)) - [(2, 0, 'foo\n', [(0, 3)]), (4, 0, 'foo\n', [(0, 3)])] - >>> gt_default.do_grep(StringIO(no_eol)) - [(0, 0, 'foo', [(0, 3)])] - >>> gt_default.do_grep(StringIO(middle_of_line)) - [(2, 0, 'barfoobar\n', [(3, 6)])] - -Symmetric 1-line context. 
- - >>> gt_context_1 = grin.GrepText(re.compile('foo'), options=grin.Options(before_context=1, after_context=1)) - >>> gt_context_1.do_grep(StringIO(all_foo)) - [(0, 0, 'foo\n', [(0, 3)]), (1, 0, 'foo\n', [(0, 3)]), (2, 0, 'foo\n', [(0, 3)]), (3, 0, 'foo\n', [(0, 3)]), (4, 0, 'foo\n', [(0, 3)])] - >>> gt_context_1.do_grep(StringIO(first_foo)) - [(0, 0, 'foo\n', [(0, 3)]), (1, 1, 'bar\n', None)] - >>> gt_context_1.do_grep(StringIO(last_foo)) - [(3, -1, 'bar\n', None), (4, 0, 'foo\n', [(0, 3)])] - >>> gt_context_1.do_grep(StringIO(second_foo)) - [(0, -1, 'bar\n', None), (1, 0, 'foo\n', [(0, 3)]), (2, 1, 'bar\n', None)] - >>> gt_context_1.do_grep(StringIO(second_last_foo)) - [(2, -1, 'bar\n', None), (3, 0, 'foo\n', [(0, 3)]), (4, 1, 'bar\n', None)] - >>> gt_context_1.do_grep(StringIO(middle_foo)) - [(1, -1, 'bar\n', None), (2, 0, 'foo\n', [(0, 3)]), (3, 1, 'bar\n', None)] - >>> gt_context_1.do_grep(StringIO(small_gap)) - [(1, -1, 'bar\n', None), (2, 0, 'foo\n', [(0, 3)]), (3, 1, 'bar\n', None), (4, 0, 'foo\n', [(0, 3)]), (5, 1, 'bar\n', None)] - >>> gt_context_1.do_grep(StringIO(no_eol)) - [(0, 0, 'foo', [(0, 3)])] - >>> gt_context_1.do_grep(StringIO(middle_of_line)) - [(1, -1, 'bar\n', None), (2, 0, 'barfoobar\n', [(3, 6)]), (3, 1, 'bar\n', None)] - -Symmetric 2-line context. - - >>> gt_context_2 = grin.GrepText(re.compile('foo'), options=grin.Options(before_context=2, after_context=2)) - >>> gt_context_2.do_grep(StringIO(all_foo)) - [(0, 0, 'foo\n', [(0, 3)]), (1, 0, 'foo\n', [(0, 3)]), (2, 0, 'foo\n', [(0, 3)]), (3, 0, 'foo\n', [(0, 3)]), (4, 0, 'foo\n', [(0, 3)])] - >>> gt_context_2.do_grep(StringIO(first_foo)) - [(0, 0, 'foo\n', [(0, 3)]), (1, 1, 'bar\n', None), (2, 1, 'bar\n', None)] - >>> gt_context_2.do_grep(StringIO(last_foo)) - [(2, -1, 'bar\n', None), (3, -1, 'bar\n', None), (4, 0, 'foo\n', [(0, 3)])] - >>> gt_context_2.do_grep(StringIO(second_foo)) - [(0, -1, 'bar\n', None), (1, 0, 'foo\n', [(0, 3)]), (2, 1, 'bar\n', None), (3, 1, 'bar\n', None)] - >>> gt_context_2.do_grep(StringIO(second_last_foo)) - [(1, -1, 'bar\n', None), (2, -1, 'bar\n', None), (3, 0, 'foo\n', [(0, 3)]), (4, 1, 'bar\n', None)] - >>> gt_context_2.do_grep(StringIO(middle_foo)) - [(0, -1, 'bar\n', None), (1, -1, 'bar\n', None), (2, 0, 'foo\n', [(0, 3)]), (3, 1, 'bar\n', None), (4, 1, 'bar\n', None)] - >>> gt_context_2.do_grep(StringIO(small_gap)) - [(0, -1, 'bar\n', None), (1, -1, 'bar\n', None), (2, 0, 'foo\n', [(0, 3)]), (3, 1, 'bar\n', None), (4, 0, 'foo\n', [(0, 3)]), (5, 1, 'bar\n', None), (6, 1, 'bar\n', None)] - >>> gt_context_2.do_grep(StringIO(no_eol)) - [(0, 0, 'foo', [(0, 3)])] - >>> gt_context_2.do_grep(StringIO(middle_of_line)) - [(0, -1, 'bar\n', None), (1, -1, 'bar\n', None), (2, 0, 'barfoobar\n', [(3, 6)]), (3, 1, 'bar\n', None), (4, 1, 'bar\n', None)] - -1 line of before-context, no lines after. 
- - >>> gt_before_context_1 = grin.GrepText(re.compile('foo'), options=grin.Options(before_context=1, after_context=0)) - >>> gt_before_context_1.do_grep(StringIO(all_foo)) - [(0, 0, 'foo\n', [(0, 3)]), (1, 0, 'foo\n', [(0, 3)]), (2, 0, 'foo\n', [(0, 3)]), (3, 0, 'foo\n', [(0, 3)]), (4, 0, 'foo\n', [(0, 3)])] - >>> gt_before_context_1.do_grep(StringIO(first_foo)) - [(0, 0, 'foo\n', [(0, 3)])] - >>> gt_before_context_1.do_grep(StringIO(last_foo)) - [(3, -1, 'bar\n', None), (4, 0, 'foo\n', [(0, 3)])] - >>> gt_before_context_1.do_grep(StringIO(second_foo)) - [(0, -1, 'bar\n', None), (1, 0, 'foo\n', [(0, 3)])] - >>> gt_before_context_1.do_grep(StringIO(second_last_foo)) - [(2, -1, 'bar\n', None), (3, 0, 'foo\n', [(0, 3)])] - >>> gt_before_context_1.do_grep(StringIO(middle_foo)) - [(1, -1, 'bar\n', None), (2, 0, 'foo\n', [(0, 3)])] - >>> gt_before_context_1.do_grep(StringIO(small_gap)) - [(1, -1, 'bar\n', None), (2, 0, 'foo\n', [(0, 3)]), (3, -1, 'bar\n', None), (4, 0, 'foo\n', [(0, 3)])] - >>> gt_before_context_1.do_grep(StringIO(no_eol)) - [(0, 0, 'foo', [(0, 3)])] - >>> gt_before_context_1.do_grep(StringIO(middle_of_line)) - [(1, -1, 'bar\n', None), (2, 0, 'barfoobar\n', [(3, 6)])] - -1 line of after-context, no lines before. - - >>> gt_after_context_1 = grin.GrepText(re.compile('foo'), options=grin.Options(before_context=0, after_context=1)) - >>> gt_after_context_1.do_grep(StringIO(all_foo)) - [(0, 0, 'foo\n', [(0, 3)]), (1, 0, 'foo\n', [(0, 3)]), (2, 0, 'foo\n', [(0, 3)]), (3, 0, 'foo\n', [(0, 3)]), (4, 0, 'foo\n', [(0, 3)])] - >>> gt_after_context_1.do_grep(StringIO(first_foo)) - [(0, 0, 'foo\n', [(0, 3)]), (1, 1, 'bar\n', None)] - >>> gt_after_context_1.do_grep(StringIO(last_foo)) - [(4, 0, 'foo\n', [(0, 3)])] - >>> gt_after_context_1.do_grep(StringIO(second_foo)) - [(1, 0, 'foo\n', [(0, 3)]), (2, 1, 'bar\n', None)] - >>> gt_after_context_1.do_grep(StringIO(second_last_foo)) - [(3, 0, 'foo\n', [(0, 3)]), (4, 1, 'bar\n', None)] - >>> gt_after_context_1.do_grep(StringIO(middle_foo)) - [(2, 0, 'foo\n', [(0, 3)]), (3, 1, 'bar\n', None)] - >>> gt_after_context_1.do_grep(StringIO(small_gap)) - [(2, 0, 'foo\n', [(0, 3)]), (3, 1, 'bar\n', None), (4, 0, 'foo\n', [(0, 3)]), (5, 1, 'bar\n', None)] - >>> gt_after_context_1.do_grep(StringIO(no_eol)) - [(0, 0, 'foo', [(0, 3)])] - >>> gt_after_context_1.do_grep(StringIO(middle_of_line)) - [(2, 0, 'barfoobar\n', [(3, 6)]), (3, 1, 'bar\n', None)] - -''' +from __future__ import unicode_literals +from io import BytesIO +import re +import grin + + +all_foo = b"""\ +foo +foo +foo +foo +foo +""" +first_foo = b"""\ +foo +bar +bar +bar +bar +""" +last_foo = b"""\ +bar +bar +bar +bar +foo +""" +second_foo = b"""\ +bar +foo +bar +bar +bar +""" +second_last_foo = b"""\ +bar +bar +bar +foo +bar +""" +middle_foo = b"""\ +bar +bar +foo +bar +bar +""" +small_gap = b"""\ +bar +bar +foo +bar +foo +bar +bar +""" +no_eol = b"foo" +middle_of_line = b"""\ +bar +bar +barfoobar +bar +bar +""" + + +def test_basic_defaults_with_no_context(): + + gt_default = grin.GrepText(re.compile(b"foo")) + assert gt_default.do_grep(BytesIO(all_foo)) == [ + (0, 0, b"foo\n", [(0, 3)]), + (1, 0, b"foo\n", [(0, 3)]), + (2, 0, b"foo\n", [(0, 3)]), + (3, 0, b"foo\n", [(0, 3)]), + (4, 0, b"foo\n", [(0, 3)]), + ] + assert gt_default.do_grep(BytesIO(first_foo)) == [(0, 0, b"foo\n", [(0, 3)])] + assert gt_default.do_grep(BytesIO(last_foo)) == [(4, 0, b"foo\n", [(0, 3)])] + assert gt_default.do_grep(BytesIO(second_foo)) == [(1, 0, b"foo\n", [(0, 3)])] + assert 
gt_default.do_grep(BytesIO(second_last_foo)) == [(3, 0, b"foo\n", [(0, 3)])] + assert gt_default.do_grep(BytesIO(middle_foo)) == [(2, 0, b"foo\n", [(0, 3)])] + assert gt_default.do_grep(BytesIO(small_gap)) == [ + (2, 0, b"foo\n", [(0, 3)]), + (4, 0, b"foo\n", [(0, 3)]), + ] + assert gt_default.do_grep(BytesIO(no_eol)) == [(0, 0, b"foo", [(0, 3)])] + assert gt_default.do_grep(BytesIO(middle_of_line)) == [ + (2, 0, b"barfoobar\n", [(3, 6)]) + ] + + +def test_symmetric_1_line_context(): + + gt_context_1 = grin.GrepText( + re.compile(b"foo"), options=grin.Options(before_context=1, after_context=1) + ) + assert gt_context_1.do_grep(BytesIO(all_foo)) == [ + (0, 0, b"foo\n", [(0, 3)]), + (1, 0, b"foo\n", [(0, 3)]), + (2, 0, b"foo\n", [(0, 3)]), + (3, 0, b"foo\n", [(0, 3)]), + (4, 0, b"foo\n", [(0, 3)]), + ] + assert gt_context_1.do_grep(BytesIO(first_foo)) == [ + (0, 0, b"foo\n", [(0, 3)]), + (1, 1, b"bar\n", None), + ] + assert gt_context_1.do_grep(BytesIO(last_foo)) == [ + (3, -1, b"bar\n", None), + (4, 0, b"foo\n", [(0, 3)]), + ] + assert gt_context_1.do_grep(BytesIO(second_foo)) == [ + (0, -1, b"bar\n", None), + (1, 0, b"foo\n", [(0, 3)]), + (2, 1, b"bar\n", None), + ] + assert gt_context_1.do_grep(BytesIO(second_last_foo)) == [ + (2, -1, b"bar\n", None), + (3, 0, b"foo\n", [(0, 3)]), + (4, 1, b"bar\n", None), + ] + assert gt_context_1.do_grep(BytesIO(middle_foo)) == [ + (1, -1, b"bar\n", None), + (2, 0, b"foo\n", [(0, 3)]), + (3, 1, b"bar\n", None), + ] + assert gt_context_1.do_grep(BytesIO(small_gap)) == [ + (1, -1, b"bar\n", None), + (2, 0, b"foo\n", [(0, 3)]), + (3, 1, b"bar\n", None), + (4, 0, b"foo\n", [(0, 3)]), + (5, 1, b"bar\n", None), + ] + assert gt_context_1.do_grep(BytesIO(no_eol)) == [(0, 0, b"foo", [(0, 3)])] + assert gt_context_1.do_grep(BytesIO(middle_of_line)) == [ + (1, -1, b"bar\n", None), + (2, 0, b"barfoobar\n", [(3, 6)]), + (3, 1, b"bar\n", None), + ] + + +def test_symmetric_2_line_context(): + + gt_context_2 = grin.GrepText( + re.compile(b"foo"), options=grin.Options(before_context=2, after_context=2) + ) + assert gt_context_2.do_grep(BytesIO(all_foo)) == [ + (0, 0, b"foo\n", [(0, 3)]), + (1, 0, b"foo\n", [(0, 3)]), + (2, 0, b"foo\n", [(0, 3)]), + (3, 0, b"foo\n", [(0, 3)]), + (4, 0, b"foo\n", [(0, 3)]), + ] + assert gt_context_2.do_grep(BytesIO(first_foo)) == [ + (0, 0, b"foo\n", [(0, 3)]), + (1, 1, b"bar\n", None), + (2, 1, b"bar\n", None), + ] + assert gt_context_2.do_grep(BytesIO(last_foo)) == [ + (2, -1, b"bar\n", None), + (3, -1, b"bar\n", None), + (4, 0, b"foo\n", [(0, 3)]), + ] + assert gt_context_2.do_grep(BytesIO(second_foo)) == [ + (0, -1, b"bar\n", None), + (1, 0, b"foo\n", [(0, 3)]), + (2, 1, b"bar\n", None), + (3, 1, b"bar\n", None), + ] + assert gt_context_2.do_grep(BytesIO(second_last_foo)) == [ + (1, -1, b"bar\n", None), + (2, -1, b"bar\n", None), + (3, 0, b"foo\n", [(0, 3)]), + (4, 1, b"bar\n", None), + ] + assert gt_context_2.do_grep(BytesIO(middle_foo)) == [ + (0, -1, b"bar\n", None), + (1, -1, b"bar\n", None), + (2, 0, b"foo\n", [(0, 3)]), + (3, 1, b"bar\n", None), + (4, 1, b"bar\n", None), + ] + assert gt_context_2.do_grep(BytesIO(small_gap)) == [ + (0, -1, b"bar\n", None), + (1, -1, b"bar\n", None), + (2, 0, b"foo\n", [(0, 3)]), + (3, 1, b"bar\n", None), + (4, 0, b"foo\n", [(0, 3)]), + (5, 1, b"bar\n", None), + (6, 1, b"bar\n", None), + ] + assert gt_context_2.do_grep(BytesIO(no_eol)) == [(0, 0, b"foo", [(0, 3)])] + assert gt_context_2.do_grep(BytesIO(middle_of_line)) == [ + (0, -1, b"bar\n", None), + (1, -1, b"bar\n", None), + (2, 0, 
b"barfoobar\n", [(3, 6)]), + (3, 1, b"bar\n", None), + (4, 1, b"bar\n", None), + ] + + +def test_1_line_of_before_context_no_lines_after(): + + gt_before_context_1 = grin.GrepText( + re.compile(b"foo"), options=grin.Options(before_context=1, after_context=0) + ) + assert gt_before_context_1.do_grep(BytesIO(all_foo)) == [ + (0, 0, b"foo\n", [(0, 3)]), + (1, 0, b"foo\n", [(0, 3)]), + (2, 0, b"foo\n", [(0, 3)]), + (3, 0, b"foo\n", [(0, 3)]), + (4, 0, b"foo\n", [(0, 3)]), + ] + assert gt_before_context_1.do_grep(BytesIO(first_foo)) == [ + (0, 0, b"foo\n", [(0, 3)]) + ] + assert gt_before_context_1.do_grep(BytesIO(last_foo)) == [ + (3, -1, b"bar\n", None), + (4, 0, b"foo\n", [(0, 3)]), + ] + assert gt_before_context_1.do_grep(BytesIO(second_foo)) == [ + (0, -1, b"bar\n", None), + (1, 0, b"foo\n", [(0, 3)]), + ] + assert gt_before_context_1.do_grep(BytesIO(second_last_foo)) == [ + (2, -1, b"bar\n", None), + (3, 0, b"foo\n", [(0, 3)]), + ] + assert gt_before_context_1.do_grep(BytesIO(middle_foo)) == [ + (1, -1, b"bar\n", None), + (2, 0, b"foo\n", [(0, 3)]), + ] + assert gt_before_context_1.do_grep(BytesIO(small_gap)) == [ + (1, -1, b"bar\n", None), + (2, 0, b"foo\n", [(0, 3)]), + (3, -1, b"bar\n", None), + (4, 0, b"foo\n", [(0, 3)]), + ] + assert gt_before_context_1.do_grep(BytesIO(no_eol)) == [(0, 0, b"foo", [(0, 3)])] + assert gt_before_context_1.do_grep(BytesIO(middle_of_line)) == [ + (1, -1, b"bar\n", None), + (2, 0, b"barfoobar\n", [(3, 6)]), + ] + + +def test_1_line_of_before_context_no_lines_before(): + + gt_after_context_1 = grin.GrepText( + re.compile(b"foo"), options=grin.Options(before_context=0, after_context=1) + ) + assert gt_after_context_1.do_grep(BytesIO(all_foo)) == [ + (0, 0, b"foo\n", [(0, 3)]), + (1, 0, b"foo\n", [(0, 3)]), + (2, 0, b"foo\n", [(0, 3)]), + (3, 0, b"foo\n", [(0, 3)]), + (4, 0, b"foo\n", [(0, 3)]), + ] + assert gt_after_context_1.do_grep(BytesIO(first_foo)) == [ + (0, 0, b"foo\n", [(0, 3)]), + (1, 1, b"bar\n", None), + ] + assert gt_after_context_1.do_grep(BytesIO(last_foo)) == [(4, 0, b"foo\n", [(0, 3)])] + assert gt_after_context_1.do_grep(BytesIO(second_foo)) == [ + (1, 0, b"foo\n", [(0, 3)]), + (2, 1, b"bar\n", None), + ] + assert gt_after_context_1.do_grep(BytesIO(second_last_foo)) == [ + (3, 0, b"foo\n", [(0, 3)]), + (4, 1, b"bar\n", None), + ] + assert gt_after_context_1.do_grep(BytesIO(middle_foo)) == [ + (2, 0, b"foo\n", [(0, 3)]), + (3, 1, b"bar\n", None), + ] + assert gt_after_context_1.do_grep(BytesIO(small_gap)) == [ + (2, 0, b"foo\n", [(0, 3)]), + (3, 1, b"bar\n", None), + (4, 0, b"foo\n", [(0, 3)]), + (5, 1, b"bar\n", None), + ] + assert gt_after_context_1.do_grep(BytesIO(no_eol)) == [(0, 0, b"foo", [(0, 3)])] + assert gt_after_context_1.do_grep(BytesIO(middle_of_line)) == [ + (2, 0, b"barfoobar\n", [(3, 6)]), + (3, 1, b"bar\n", None), + ]