Source code for rspub.core.selector

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
import logging
import os
from enum import Enum
from itertools import takewhile

from rspub.util.observe import Observable, ObserverInterruptException

LOG = logging.getLogger(__name__)


[docs]class SelectorEvent(Enum): file_does_not_exist = 0 not_a_regular_file = 1 file_excluded = 2 next_file = 10
[docs]class Selector(Observable):
[docs] def __init__(self, location=None): Observable.__init__(self) self.location = location self._includes = set() self._excludes = set() if self.location: self.read(self.location) self._abs_includes = None self._abs_excludes = None self._exc_count = 0
def __iter__(self): # alternative implementation would take 2 blobs of filenames (recursive through directories) in memory: # abs_includes = {x for x in self.list_includes()} # abs_excludes = {x for x in self.list_excludes()} # abs_includes.difference_update(abs_excludes) # return iter(sorted(abs_includes)) # this implementation only expands the excludes to absolute paths (not recursive) and yields the includes. # + less memory but # - repeatedly iterating the abs_excludes but # + sorted output self._abs_includes = Selector.filter_base_paths({os.path.abspath(x) for x in self._includes}) self._abs_excludes = {os.path.abspath(x) for x in self._excludes} self._exc_count = len(self._abs_excludes) generator = self._file_generator() return generator(sorted(self._abs_includes)) @staticmethod
[docs] def filter_base_paths(abs_paths): return {x for x in abs_paths if Selector.is_base_path(x, abs_paths)}
@staticmethod
[docs] def is_base_path(x, other_paths): base_path = True for test_path in other_paths: if not x == test_path and x.startswith(test_path): base_path = False break return base_path
def __len__(self): return len([x for x in self]) def _file_generator(self): def generator(filenames): for name in filenames: file = os.path.abspath(name) if not os.path.exists(file): LOG.warning("File does not exist: %s" % file) self.observers_inform(self, SelectorEvent.file_does_not_exist, filename=file) elif os.path.isdir(file): for rfile in generator(self._walk_directories(file)): yield rfile elif os.path.isfile(file): if not self.observers_confirm(self, SelectorEvent.next_file, filename=file): raise ObserverInterruptException("Process interrupted on SelectorEvent.next_file") if len(list(takewhile(lambda x: not file.startswith(x), self._abs_excludes))) == self._exc_count: yield file else: if not self.observers_confirm(self, SelectorEvent.file_excluded, filename=file): raise ObserverInterruptException("Process interrupted on SelectorEvent.file_excluded") else: LOG.warning("Not a regular file: %s" % file) self.observers_inform(self, SelectorEvent.not_a_regular_file, filename=file) return generator @staticmethod def _walk_directories(*directories): for directory in directories: abs_dir = os.path.abspath(directory) for root, _directories, _filenames in os.walk(abs_dir): for filename in _filenames: file = os.path.join(root, filename) yield file
[docs] def include(self, *filenames): for item in filenames: if isinstance(item, str): self._includes.add(item) elif isinstance(item, Selector): self._includes.update(item._includes) elif hasattr(item, '__iter__'): for thing in item: self.include(thing) else: raise ValueError("Illegal argument: %s" % item)
[docs] def exclude(self, *filenames): for item in filenames: if isinstance(item, str): self._excludes.add(item) elif isinstance(item, Selector): self._excludes.update(item._excludes) elif hasattr(item, '__iter__'): for thing in item: self.exclude(thing) else: raise ValueError("Illegal argument: %s" % item)
[docs] def discard_include(self, *filenames): for item in filenames: if isinstance(item, str): self._includes.discard(item) elif isinstance(item, Selector): self._includes.difference_update(item._includes) elif hasattr(item, '__iter__'): for thing in item: self.discard_include(thing) else: raise ValueError("Illegal argument: %s" % item)
[docs] def discard_exclude(self, *filenames): for item in filenames: if isinstance(item, str): self._excludes.discard(item) elif isinstance(item, Selector): self._excludes.difference_update(item._excludes) elif hasattr(item, '__iter__'): for thing in item: self.discard_exclude(thing) else: raise ValueError("Illegal argument: %s" % item)
[docs] def clear_includes(self): self._includes.clear()
[docs] def clear_excludes(self): self._excludes.clear()
[docs] def list_includes(self): return self._list_files()(sorted(self._includes))
[docs] def list_excludes(self): return self._list_files()(sorted(self._excludes))
def _list_files(self): def generator(filenames): for name in filenames: file = os.path.abspath(name) if not os.path.exists(file): LOG.warning("File does not exist: %s" % file) elif os.path.isdir(file): for rfile in generator(self._walk_directories(file)): yield rfile pass elif os.path.isfile(file): yield file else: LOG.warning("Not a regular file: %s" % file) return generator
[docs] def relativize_includes(self, root_path): self._includes = {os.path.relpath(x, root_path) for x in self._includes} return self.get_included_entries()
[docs] def relativize_excludes(self, root_path): self._excludes = {os.path.relpath(x, root_path) for x in self._excludes} return self.get_excluded_entries()
[docs] def get_included_entries(self): return self._includes
[docs] def get_excluded_entries(self): return self._excludes
[docs] def is_empty(self): return len(self._includes) + len(self._excludes) == 0
[docs] def read_includes(self, filename): with open(filename, "r", encoding="utf-8") as file: self.include(file.read().splitlines())
[docs] def read_excludes(self, filename): with open(filename, "r", encoding="utf-8") as file: self.exclude(file.read().splitlines())
[docs] def write_includes(self, filename): with open(filename, 'w', encoding="utf-8") as file: for item in sorted(self._includes): file.write("{}\n".format(item))
[docs] def write_excludes(self, filename): with open(filename, 'w', encoding="utf-8") as file: for item in sorted(self._excludes): file.write("{}\n".format(item))
[docs] def write(self, filename=None): if filename is None: filename = self.location if filename is None: raise RuntimeError("No filename, no location. Cannot save selector.") with open(filename, 'w', encoding="utf-8", newline='') as file: writer = csv.writer(file, quoting=csv.QUOTE_ALL) for item in self._includes: writer.writerow(["+", item]) for item in self._excludes: writer.writerow(["-", item]) self.location = filename
[docs] def read(self, filename): filename = os.path.abspath(filename) row_count = 0 with open(filename, 'r', encoding="utf-8", newline='') as file: reader = csv.reader(file) for row in reader: row_count += 1 if row[0] == "+": self.include(row[1]) elif row[0] == "-": self.exclude(row[1]) else: raise RuntimeError("Invalid line in file: line %d: %s, file='%s'" % (row_count, row, filename))
[docs] def abs_location(self): if self.location: return os.path.abspath(self.location) else: return None