#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
:samp:`Save and load multiple configurations`
The class :class:`Configurations` (note the `s` at the end) enables you to save, load, remove and list
multiple configurations.
Class :class:`Configuration` (note the absence of `s` at the end) is a singleton.
It should not be used directly. Instead use :class:`rspub.core.rs_paras.RsParameters`.
The location where configurations are stored is system-dependent:
- ``{user-home}\\AppData\\Local\\rspub\\core\\`` on Windows
- ``{user-home}/.config/rspub/core/`` on Mac and Linux
- ``{user-home}/rspub/core/`` fallback
.. seealso:: :doc:`RsParameters <rspub.core.rs_paras>`
"""
import logging
import os
import platform
from configparser import ConfigParser
from glob import glob
from rspub.core.rs_enum import Strategy, SelectMode
# File name of the configuration that is used as long as no other name has been set.
CFG_FILENAME = "DEFAULT.cfg"
# Name of the sub-directory (below ``rspub``) in which configuration files are kept.
CFG_DIRNAME = "core"
# Name of the ConfigParser section that holds all parameters handled by this module.
SECTION_CORE = "core"
# File extension of configuration files; also used to glob for saved configurations.
EXT = ".cfg"
class Configurations(object):
    """
    :samp:`Enables saving, loading, listing and removing {configurations}`

    All methods are static::

        Configurations.list_configurations()
        Configurations.load_configuration("collection_1")
        # etc.

    """

    @staticmethod
    def __get__logger():
        return logging.getLogger(__name__)

    @staticmethod
    def _config_file_for(name: str) -> str:
        """
        :samp:`Compute the path of the configuration file for the given name`

        Anything :func:`os.path.splitext` regards as an extension is stripped from `name`
        before the configuration extension is appended.

        :param name: name of a configuration, with or without extension
        :return: path of the corresponding file in the configuration directory
        """
        base_name = os.path.splitext(name)[0]
        return os.path.join(Configuration._get_config_path(), base_name + EXT)

    @staticmethod
    def list_configurations() -> list:
        """
        :samp:`List available configurations`

        :return: sorted list of names (file names without extension) of previously saved configurations
        """
        pattern = os.path.join(Configuration._get_config_path(), "*" + EXT)
        return [os.path.splitext(os.path.basename(path))[0] for path in sorted(glob(pattern))]

    @staticmethod
    def load_configuration(name: str):
        """
        :samp:`Load the configuration with the given name`

        The current singleton is discarded and a new one is created from the named file.

        :param name: name of a previously saved configuration
        :return: the restored Configuration
        :raises ValueError: if no configuration with the given name has been saved
        """
        if name not in Configurations.list_configurations():
            raise ValueError("No configuration named '%s'" % name)
        Configuration.reset()
        Configuration._set_configuration_filename(name + EXT)
        Configurations.__get__logger().info("Loaded configuration %s", name)
        return Configuration()

    @staticmethod
    def save_configuration_as(name: str):
        """
        :samp:`Save the current configuration under the given name`

        Any previously saved configurations with the same name will be overwritten without warning.
        From then on the current Configuration instance points at the newly written file.

        :param name: name under which the configuration will be saved
        :raises ValueError: if `name` is None or an empty string
        """
        if name is None or name == "":
            raise ValueError("Invalid configuration name. (None or empty string)")
        current_cfg = Configuration()
        # Instance attribute: shadows the class-level config_file set in Configuration.__new__.
        current_cfg.config_file = Configurations._config_file_for(name)
        current_cfg.persist()
        Configurations.__get__logger().info("Saved configuration %s", name)

    @staticmethod
    def remove_configuration(name: str):
        """
        :samp:`Remove the configuration with the given name`

        :param name: the name of the configuration to remove
        :return: **True** if the configuration was successfully removed, **False** otherwise
        :raises ValueError: if `name` is None or an empty string
        """
        if name is None or name == "":
            # The name has to be formatted into the message; passing it as a second
            # argument to ValueError would leave the placeholder unfilled.
            raise ValueError("Invalid configuration name '%s'" % name)
        config_file = Configurations._config_file_for(name)
        if not os.path.exists(config_file):
            return False
        os.remove(config_file)
        Configurations.__get__logger().info("Removed configuration %s", name)
        return True

    @staticmethod
    def current_configuration_name():
        """
        :samp:`Get the name of the current configuration`

        :return: name of the current configuration (file name without extension)
        """
        current_cfg = Configuration()
        return os.path.splitext(os.path.basename(current_cfg.config_file))[0]

    @staticmethod
    def rspub_config_dir():
        """
        :samp:`Get the parent of the directory that holds the current configuration file`

        :return: the directory two levels above the current configuration file
        """
        current_cfg = Configuration()
        return os.path.dirname(os.path.dirname(current_cfg.config_file))
class Configuration(object):
    """
    :samp:`Singleton persisting object for storing configuration parameters`

    All parameters live in the ``core`` section of a :class:`configparser.ConfigParser`.
    Setters only change the in-memory parser; call :func:`persist` to write them to file.

    .. warning::
        Do not use class Configuration directly. Use :doc:`RsParameters <rspub.core.rs_paras>` instead.

    """

    # File name (not path) from which the next created instance is read.
    _configuration_filename = CFG_FILENAME
    # The one and only instance; None until first instantiation and after reset().
    _instance = None

    @staticmethod
    def __get__logger():
        return logging.getLogger(__name__)

    @staticmethod
    def _set_configuration_filename(cfg_filename):
        """Set the file name used by the next instance that is created (see :func:`reset`)."""
        Configuration.__get__logger().debug("Setting configuration filename to %s", cfg_filename)
        Configuration._configuration_filename = cfg_filename

    @staticmethod
    def _get_configuration_filename():
        """Return the configuration file name, restoring the default if it was emptied."""
        if not Configuration._configuration_filename:
            Configuration._set_configuration_filename(CFG_FILENAME)
        return Configuration._configuration_filename

    @staticmethod
    def reset():
        """
        :samp:`Discard the singleton instance`

        The next call to ``Configuration()`` creates a new instance and reads its file again.
        The configuration file name is left unchanged.
        """
        Configuration._instance = None
        Configuration.__get__logger().debug("Configuration was reset.")

    @staticmethod
    def _get_config_path():
        """
        :samp:`Determine the directory for configuration files, creating it if needed`

        - Windows: ``{user-home}/AppData/Local`` if that exists, otherwise ``{user-home}``
        - Mac (Darwin) and Linux: ``{user-home}/.config``
        - any other system: ``{user-home}``

        each followed by ``rspub/core``.

        :return: path to the (existing) configuration directory
        """
        c_path = os.path.expanduser("~")
        opsys = platform.system()
        if opsys == "Windows":
            win_path = os.path.join(c_path, "AppData", "Local")
            if os.path.exists(win_path):
                c_path = win_path
        elif opsys in ("Darwin", "Linux"):
            c_path = os.path.join(c_path, ".config")
        c_path = os.path.join(c_path, "rspub", CFG_DIRNAME)
        # Creates missing parents (including .config) and does not fail if the directory exists.
        os.makedirs(c_path, exist_ok=True)
        return c_path

    def __new__(cls, *args, **kwargs):
        """
        Return the singleton, creating it and reading its configuration file on first use.

        Positional and keyword arguments are accepted but ignored.
        """
        if cls._instance is None:
            Configuration.__get__logger().debug("Creating Configuration._instance")
            # object.__new__ accepts no extra arguments, so none are forwarded.
            cls._instance = super(Configuration, cls).__new__(cls)
            # These are class attributes; they replace the methods of the same name defined below.
            cls.config_path = cls._get_config_path()
            cls.config_file = os.path.join(cls.config_path, Configuration._get_configuration_filename())
            # NOTE(review): with the default interpolation, ConfigParser.set() rejects values
            # containing a bare '%' (e.g. percent-encoded urls) - confirm whether
            # interpolation=None is wanted here.
            cls.parser = ConfigParser()
            if os.path.exists(cls.config_file):
                cls.parser.read(cls.config_file, encoding="utf-8")
        return cls._instance

    # NOTE: config_path and config_file are overwritten with plain string attributes as soon
    # as the first instance is created (see __new__); use them as attributes, not as calls.
    def config_path(self):
        return self.config_path

    def config_file(self):
        return self.config_file

    def name(self):
        """
        :samp:`Get the name of this configuration`

        :return: base name of the configuration file, without extension
        """
        return os.path.splitext(os.path.basename(self.config_file))[0]

    def persist(self):
        """
        :samp:`Write the current state of the parser to the configuration file`

        An existing file is overwritten.
        """
        # The context manager closes the file even if writing fails.
        with open(self.config_file, "w", encoding="utf-8") as f:
            self.parser.write(f)
        Configuration.__get__logger().debug("Persisted %s", self.config_file)

    def __set_option__(self, section, option, value):
        """Set `option` in `section` (created if needed); a value of None removes the option."""
        if not self.parser.has_section(section):
            self.parser.add_section(section)
        if value is None:
            self.parser.remove_option(section, option)
        else:
            self.parser.set(section, option, value)

    def __get_int__(self, section, option, fallback=0):
        """Return `option` converted with ``int()``; `fallback` is used if the option is not set."""
        value = self.parser.get(section, option, fallback=str(fallback))
        return int(value)

    def __set_int__(self, section, option, value):
        """Store ``str(value)`` under `option`."""
        self.__set_option__(section, option, str(value))

    def __get_boolean__(self, section, option, fallback=True):
        """Return False if the stored text is exactly ``False`` or ``None``, True for anything else."""
        value = self.parser.get(section, option, fallback=str(fallback))
        return value not in ("False", "None")

    def __set_boolean__(self, section, option, value):
        """Store ``str(value)`` under `option`."""
        self.__set_option__(section, option, str(value))

    def __get_list__(self, section, option, fallback=None):
        """
        Return `option` split on newlines; an empty or absent value gives an empty list.

        :param fallback: iterable of strings used if the option is not set; None means empty
        """
        # None instead of a shared mutable list as the default argument.
        value = self.parser.get(section, option, fallback="\n".join(fallback or []))
        if value == "":
            return []
        return value.split("\n")

    def __set_list__(self, section, option, value):
        """Store the strings in `value` under `option`, joined by newlines."""
        self.__set_option__(section, option, "\n".join(value))

    def core_items(self):
        """
        :samp:`Get all options of the core section`

        :return: list of (option, value) pairs
        :raises configparser.NoSectionError: if the core section does not exist
        """
        return self.parser.items(SECTION_CORE)

    def core_clear(self):
        """Remove the core section with all its options from the parser."""
        self.parser.remove_section(SECTION_CORE)

    # core settings
    def resource_dir(self, fallback=os.path.expanduser("~")):
        """Return the ``resource_dir`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "resource_dir", fallback=fallback)

    def set_resource_dir(self, resource_dir):
        """Set the ``resource_dir`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "resource_dir", resource_dir)

    def description_dir(self, fallback=None):
        """Return the ``description_dir`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "description_dir", fallback=fallback)

    def set_description_dir(self, description_dir):
        """Set the ``description_dir`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "description_dir", description_dir)

    def selector_file(self, fallback=None):
        """Return the ``selector_file`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "selector_file", fallback=fallback)

    def set_selector_file(self, selector_file):
        """Set the ``selector_file`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "selector_file", selector_file)

    def simple_select_file(self, fallback=None):
        """Return the ``simple_select_file`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "simple_select_file", fallback=fallback)

    def set_simple_select_file(self, simple_file):
        """Set the ``simple_select_file`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "simple_select_file", simple_file)

    def select_mode(self, fallback=SelectMode.simple.name):
        """
        Return the ``SelectMode`` member looked up by the stored name.

        :param fallback: member name used if the option is not set
        """
        return SelectMode[self.parser.get(SECTION_CORE, "select_mode", fallback=fallback)]

    def set_select_mode(self, mode):
        """Store the name of `mode` as the ``select_mode`` option."""
        # The name is a string already; stored the same way as set_strategy does.
        self.__set_option__(SECTION_CORE, "select_mode", mode.name)

    def plugin_dir(self, fallback=None):
        """Return the ``plugin_dir`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "plugin_dir", fallback=fallback)

    def set_plugin_dir(self, plugin_dir):
        """Set the ``plugin_dir`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "plugin_dir", plugin_dir)

    def history_dir(self, fallback=None):
        """Return the ``history_dir`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "history_dir", fallback=fallback)

    def set_history_dir(self, history_dir):
        """Set the ``history_dir`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "history_dir", history_dir)

    def url_prefix(self, fallback="http://www.example.com"):
        """Return the ``url_prefix`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "url_prefix", fallback=fallback)

    def set_url_prefix(self, urlprefix):
        """Set the ``url_prefix`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "url_prefix", urlprefix)

    def strategy(self, fallback=Strategy.resourcelist.name):
        """
        Return the ``Strategy`` member looked up by the stored name.

        :param fallback: member name used if the option is not set
        """
        return Strategy[self.parser.get(SECTION_CORE, "strategy", fallback=fallback)]

    def set_strategy(self, strategy):
        """Store the name of `strategy` as the ``strategy`` option."""
        self.__set_option__(SECTION_CORE, "strategy", strategy.name)

    def max_items_in_list(self, fallback=50000):
        """Return the ``max_items_in_list`` option as int, or `fallback` if it is not set."""
        return self.__get_int__(SECTION_CORE, "max_items_in_list", fallback)

    def set_max_items_in_list(self, max_items):
        """Set the ``max_items_in_list`` option."""
        self.__set_int__(SECTION_CORE, "max_items_in_list", max_items)

    def zero_fill_filename(self, fallback=4):
        """Return the ``zero_fill_filename`` option as int, or `fallback` if it is not set."""
        return self.__get_int__(SECTION_CORE, "zero_fill_filename", fallback)

    def set_zero_fill_filename(self, zfill):
        """Set the ``zero_fill_filename`` option."""
        self.__set_int__(SECTION_CORE, "zero_fill_filename", zfill)

    def is_saving_pretty_xml(self, fallback=True):
        """Return the ``is_saving_pretty_xml`` option as bool, or `fallback` if it is not set."""
        return self.__get_boolean__(SECTION_CORE, "is_saving_pretty_xml", fallback)

    def set_is_saving_pretty_xml(self, p_xml):
        """Set the ``is_saving_pretty_xml`` option."""
        self.__set_boolean__(SECTION_CORE, "is_saving_pretty_xml", p_xml)

    def is_saving_sitemaps(self, fallback=True):
        """Return the ``is_saving_sitemaps`` option as bool, or `fallback` if it is not set."""
        return self.__get_boolean__(SECTION_CORE, "is_saving_sitemaps", fallback)

    def set_is_saving_sitemaps(self, is_saving):
        """Set the ``is_saving_sitemaps`` option."""
        self.__set_boolean__(SECTION_CORE, "is_saving_sitemaps", is_saving)

    def has_wellknown_at_root(self, fallback=True):
        """Return the ``has_wellknown_at_root`` option as bool, or `fallback` if it is not set."""
        return self.__get_boolean__(SECTION_CORE, "has_wellknown_at_root", fallback)

    def set_has_wellknown_at_root(self, at_root):
        """Set the ``has_wellknown_at_root`` option."""
        self.__set_boolean__(SECTION_CORE, "has_wellknown_at_root", at_root)

    # execution results
    def last_excution(self):
        """
        Return the stored last-execution string, or None if it is not set.

        The method name and the option key keep their historical spelling so that
        existing callers and previously saved files continue to work.
        """
        return self.parser.get(SECTION_CORE, "last_excution", fallback=None)

    def last_execution(self):
        """Correctly spelled alias of :func:`last_excution`, matching :func:`set_last_execution`."""
        return self.last_excution()

    def set_last_execution(self, date_string):
        """Store `date_string` as the last-execution value; None removes it."""
        self.__set_option__(SECTION_CORE, "last_excution", date_string)

    def last_strategy(self):
        """Return the ``Strategy`` member stored as ``last_strategy``, or None if empty or not set."""
        value = self.parser.get(SECTION_CORE, "last_strategy", fallback="")
        if not value:
            return None
        return Strategy[value]

    def set_last_strategy(self, strategy):
        """Store the name of `strategy` as ``last_strategy``; a falsy value stores an empty string."""
        if strategy:
            self.__set_option__(SECTION_CORE, "last_strategy", strategy.name)
        else:
            self.__set_option__(SECTION_CORE, "last_strategy", "")

    def last_sitemaps(self, fallback=None):
        """
        Return the ``last_sitemaps`` option as a list of strings.

        :param fallback: iterable of strings used if the option is not set; None means empty
        """
        return self.__get_list__(SECTION_CORE, "last_sitemaps", fallback=fallback)

    def set_last_sitemaps(self, sitemaplist):
        """Store the strings in `sitemaplist` as the ``last_sitemaps`` option."""
        self.__set_list__(SECTION_CORE, "last_sitemaps", sitemaplist)

    # # scp parameters for exporting to remote web server
    def exp_scp_server(self, fallback="example.com"):
        """Return the ``exp_scp_server`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "exp_scp_server", fallback=fallback)

    def set_exp_scp_server(self, exp_scp_server):
        """Set the ``exp_scp_server`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "exp_scp_server", exp_scp_server)

    def exp_scp_port(self, fallback=22):
        """Return the ``exp_scp_port`` option as int, or `fallback` if it is not set."""
        return self.__get_int__(SECTION_CORE, "exp_scp_port", fallback=fallback)

    def set_exp_scp_port(self, exp_scp_port):
        """Set the ``exp_scp_port`` option."""
        self.__set_int__(SECTION_CORE, "exp_scp_port", exp_scp_port)

    def exp_scp_user(self, fallback="username"):
        """Return the ``exp_scp_user`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "exp_scp_user", fallback=fallback)

    def set_exp_scp_user(self, exp_scp_user):
        """Set the ``exp_scp_user`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "exp_scp_user", exp_scp_user)

    def exp_scp_document_root(self, fallback="/var/www/html/"):
        """Return the ``exp_scp_document_root`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "exp_scp_document_root", fallback=fallback)

    def set_exp_scp_document_root(self, exp_scp_document_root):
        """Set the ``exp_scp_document_root`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "exp_scp_document_root", exp_scp_document_root)

    # # zip parameters
    def zip_filename(self, fallback=os.path.join(os.path.expanduser("~"), "resourcesync.zip")):
        """Return the ``zip_filename`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "zip_filename", fallback=fallback)

    def set_zip_filename(self, zip_filename):
        """Set the ``zip_filename`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "zip_filename", zip_filename)

    # # scp parameters for importing (resources) from remote server
    def imp_scp_server(self, fallback="example.com"):
        """Return the ``imp_scp_server`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "imp_scp_server", fallback=fallback)

    def set_imp_scp_server(self, imp_scp_server):
        """Set the ``imp_scp_server`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "imp_scp_server", imp_scp_server)

    def imp_scp_port(self, fallback=22):
        """Return the ``imp_scp_port`` option as int, or `fallback` if it is not set."""
        return self.__get_int__(SECTION_CORE, "imp_scp_port", fallback=fallback)

    def set_imp_scp_port(self, imp_scp_port):
        """Set the ``imp_scp_port`` option."""
        self.__set_int__(SECTION_CORE, "imp_scp_port", imp_scp_port)

    def imp_scp_user(self, fallback="username"):
        """Return the ``imp_scp_user`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "imp_scp_user", fallback=fallback)

    def set_imp_scp_user(self, imp_scp_user):
        """Set the ``imp_scp_user`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "imp_scp_user", imp_scp_user)

    def imp_scp_remote_path(self, fallback="~"):
        """Return the ``imp_scp_remote_path`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "imp_scp_remote_path", fallback=fallback)

    def set_imp_scp_remote_path(self, imp_scp_remote_path):
        """Set the ``imp_scp_remote_path`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "imp_scp_remote_path", imp_scp_remote_path)

    def imp_scp_local_path(self, fallback=os.path.expanduser("~")):
        """Return the ``imp_scp_local_path`` option, or `fallback` if it is not set."""
        return self.parser.get(SECTION_CORE, "imp_scp_local_path", fallback=fallback)

    def set_imp_scp_local_path(self, imp_scp_local_path):
        """Set the ``imp_scp_local_path`` option; None removes it."""
        self.__set_option__(SECTION_CORE, "imp_scp_local_path", imp_scp_local_path)