Source code for pydeploy.environment

from hashlib import sha512
from .python3_compat import urlparse, make_bytes, basestring
import distutils.sysconfig
import pickle
import os
import sys
import logging
import pkg_resources
import site
import subprocess
from contextlib import contextmanager
from . import command
from . import os_api
from . import virtualenv_api
from .environment_utils import EnvironmentUtils
from .installer import Installer
from .checkout_cache import CheckoutCache
from .sources import (
    get_all_sources_dict,
    Source
    )
from .exceptions import CommandFailed

_logger = logging.getLogger("pydeploy.env")

[docs]class PythonEnvironment(object): def __init__(self, argv=()): super(PythonEnvironment, self).__init__() self._argv = list(argv) self.installer = Installer(self) self.utils = EnvironmentUtils(self) self._aliases = {} self._executed_deployment_file_locations = set() self._executed_deployment_file_hashes = set() def add_alias(self, name, source): self._aliases[name] = source def has_alias(self, name): if isinstance(name, pkg_resources.Requirement): name = name.unsafe_name return name in self._aliases
[docs] def checkout(self, source, *args, **kwargs): """ Downloads a package from *source*, and returns the path to which it was extracted. """ source = self._make_source_object(source) _logger.info("Checking out %r (%s)", source.get_name(), type(source).__name__) source = self._make_source_object(source) return source.checkout(self, *args, **kwargs)
def create_and_activate(self): raise NotImplementedError() # pragma: no cover def execute_deployment_file_once(self, path_or_url): executed = False if path_or_url not in self._executed_deployment_file_locations: content = self._get_deployment_file_content(path_or_url) content_hash = sha512(make_bytes(content)).digest() if content_hash not in self._executed_deployment_file_hashes: self.execute_deployment_string(content) self._executed_deployment_file_hashes.add(content_hash) executed = True self._executed_deployment_file_locations.add(path_or_url) if not executed: _logger.info("Deployment file at %r already executed. Skipping...", path_or_url) def execute_deployment_file(self, path_or_url): self.execute_deployment_string(self._get_deployment_file_content(path_or_url)) def execute_deployment_file_url(self, url): _logger.info("Executing deployment file from url %r...", url) self.execute_deployment_file_object(os_api.urlopen(url)) def execute_deployment_file_object(self, fileobj, filename=None): return self.execute_deployment_string(fileobj.read(), filename=filename) def execute_deployment_string(self, content, filename=None): executed_locals = self._get_config_file_locals(filename) exec(content, executed_locals) def execute_deployment_file_path(self, path): _logger.info("Executing deployment file %r...", path) with open(path, "rb") as fileobj: with self._filename_as_argv0(path): self.execute_deployment_file_object(fileobj, filename=path) def _get_deployment_file_content(self, path_or_url): if self._is_url(path_or_url): return os_api.urlopen(path_or_url).read() return open(path_or_url).read() def execute_deployment_stdin(self): _logger.info("Executing deployment script from standard input...") self.execute_deployment_file_object(sys.stdin, filename="<stdin>") @contextmanager def _filename_as_argv0(self, path): old_argv = self.get_argv() try: self._argv.insert(0, path) yield finally: self._argv = old_argv
[docs] def install(self, source, reinstall=True): """ Installs a package from *source*. @param reinstall: If False, skips installation if this source has already been installed. """ with self.installer.get_installation_context(): source = self._make_source_object(source) _logger.info("Installing %r (%s)", source.get_name(), type(source).__name__) if not self._is_already_installed(source) or reinstall: returned = source.install(self, reinstall=reinstall) self._post_install(source) return returned
def get_argv(self): return list(self._argv) def get_checkout_cache(self): raise NotImplementedError() # pragma: no cover def get_path(self): raise NotImplementedError() # pragma: no cover def get_python_executable(self): raise NotImplementedError() # pragma: no cover def execute_pip_install(self, source, reinstall): raise NotImplementedError() # pragma: no cover def execute_easy_install(self, source, reinstall): raise NotImplementedError() # pragma: no cover def _execute(self, cmd): command.execute_assert_success(cmd, shell=True) def _get_config_file_locals(self, filename): returned = dict( env = self, ) if filename is not None: returned.update(__file__=filename) returned.update(get_all_sources_dict()) for module_name in ['sys', 'os']: returned[module_name] = __import__(module_name) return returned def _is_already_installed(self, source): raise NotImplementedError() # pragma: no cover def _is_url(self, path_or_url): return bool(urlparse(path_or_url).scheme) def _make_source_object(self, source): if isinstance(source, pkg_resources.Requirement): specs = source.specs source = source.unsafe_name else: specs = [] if isinstance(source, basestring) and source in self._aliases: source = self._aliases[source] if specs: source = source.resolve_constraints(specs) source = Source.from_anything(source) return source def _post_install(self, source): raise NotImplementedError() # pragma: no cover
class Environment(PythonEnvironment): def __init__(self, path, argv=()): super(Environment, self).__init__(argv) self._path = os.path.abspath(path) self._checkout_cache = None self._states = [] def get_path(self): return self._path def get_checkout_cache(self): return self._checkout_cache def create_and_activate(self): self._push_python_state() virtualenv_api.create_environment(self._path) self._checkout_cache = CheckoutCache(self._path) self._try_load_installed_signatures() virtualenv_api.activate_environment(self._path) set_active_environment(self) def deactivate(self): self._pop_python_state() def _push_python_state(self): state = dict( prefix = sys.prefix, path = sys.path[:], modules = sys.modules.copy(), active_environment = get_active_envrionment(), ) if hasattr(sys, 'old_prefix'): state['old_prefix'] = sys.old_prefix self._states.append(state) def _pop_python_state(self): state = self._states.pop(-1) if 'old_prefix' in state: sys.old_prefix = state.pop('old_prefix') elif hasattr(sys, "old_prefix"): del sys.old_prefix sys.prefix = state.pop('prefix') set_active_environment(state.pop('active_environment')) sys.modules.clear() sys.modules.update(state.pop('modules')) sys.path[:] = state.pop('path') assert not state def execute_pip_install(self, source, reinstall): self._execute("{0} install {1}".format(self._get_pip_executable(), source)) def execute_easy_install(self, source, reinstall): self._execute("{0} {1}".format(self._get_easy_install_executable(), source)) def _get_pip_executable(self): return os.path.join(self._get_bin_dir(), "pip") def _get_easy_install_executable(self): return os.path.join(self._get_bin_dir(), "easy_install") def get_python_executable(self): return os.path.join(self._get_bin_dir(), "python") def _get_bin_dir(self): return os.path.join(self._path, "bin") #installation def _post_install(self, source): site.addsitedir(self._get_site_dir()) self._mark_installed(source) def _get_site_dir(self): # taken from virtualenv's activate_this.py... if sys.platform == 'win32': return os.path.join(self._path, 'Lib', 'site-packages') return os.path.join(self._path, 'lib', 'python%s' % sys.version[:3], 'site-packages') #signatures def _is_already_installed(self, source): return source.get_signature() in self._installed def _mark_installed(self, source): self._installed.append(source.get_signature()) self._saved_installed_signatures() def _saved_installed_signatures(self): with open(self._get_temporary_installed_signatures_filename(), "wb") as f: pickle.dump(self._installed, f) os.rename(self._get_temporary_installed_signatures_filename(), self._get_installed_signatures_filename()) def _try_load_installed_signatures(self): try: self._load_installed_signatures() except IOError: self._installed = [] def _load_installed_signatures(self): with open(self._get_installed_signatures_filename(), "rb") as f: self._installed = pickle.load(f) def _get_installed_signatures_filename(self): return os.path.join(self._path, ".installed_signatures") def _get_temporary_installed_signatures_filename(self): return self._get_installed_signatures_filename() + ".tmp" class GlobalEnvironment(PythonEnvironment): _checkout_cache = None def create_and_activate(self): self._checkout_cache = CheckoutCache(self._get_pydeploy_dir()) def execute_easy_install(self, source, reinstall): return self._execute("easy_install {0} {1}".format("-U" if reinstall else "", source)) def execute_pip_install(self, source, reinstall): return self._execute("pip install {0} {1}".format("-U" if reinstall else "", source)) def get_checkout_cache(self): return self._checkout_cache def get_python_executable(self): return sys.executable def _is_already_installed(self, source): return False # let easy_install take care of it for now def _get_pydeploy_dir(self): return os.path.expanduser("~/.pydeploy") def _post_install(self, source): site.addsitedir(distutils.sysconfig.get_python_lib()) _active_environment = None def clear_active_environment(): set_active_environment(None) def get_active_envrionment(): return _active_environment def set_active_environment(e): global _active_environment _active_environment = e

Project Versions