243 lines
7.8 KiB
Python
243 lines
7.8 KiB
Python
# The following comment should be removed at some point in the future.
|
|
# mypy: disallow-untyped-defs=False
|
|
|
|
from __future__ import absolute_import
|
|
|
|
import datetime
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os.path
|
|
import sys
|
|
|
|
from pip._vendor import pkg_resources
|
|
from pip._vendor.packaging import version as packaging_version
|
|
from pip._vendor.six import ensure_binary
|
|
|
|
from pip._internal.index.collector import LinkCollector
|
|
from pip._internal.index.package_finder import PackageFinder
|
|
from pip._internal.models.search_scope import SearchScope
|
|
from pip._internal.models.selection_prefs import SelectionPreferences
|
|
from pip._internal.utils.filesystem import (
|
|
adjacent_tmp_file,
|
|
check_path_owner,
|
|
replace,
|
|
)
|
|
from pip._internal.utils.misc import (
|
|
ensure_dir,
|
|
get_installed_version,
|
|
redact_auth_from_url,
|
|
)
|
|
from pip._internal.utils.packaging import get_installer
|
|
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
|
|
|
|
if MYPY_CHECK_RUNNING:
|
|
import optparse
|
|
from optparse import Values
|
|
from typing import Any, Dict, Text, Union
|
|
|
|
from pip._internal.network.session import PipSession
|
|
|
|
|
|
SELFCHECK_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def make_link_collector(
|
|
session, # type: PipSession
|
|
options, # type: Values
|
|
suppress_no_index=False, # type: bool
|
|
):
|
|
# type: (...) -> LinkCollector
|
|
"""
|
|
:param session: The Session to use to make requests.
|
|
:param suppress_no_index: Whether to ignore the --no-index option
|
|
when constructing the SearchScope object.
|
|
"""
|
|
index_urls = [options.index_url] + options.extra_index_urls
|
|
if options.no_index and not suppress_no_index:
|
|
logger.debug(
|
|
'Ignoring indexes: %s',
|
|
','.join(redact_auth_from_url(url) for url in index_urls),
|
|
)
|
|
index_urls = []
|
|
|
|
# Make sure find_links is a list before passing to create().
|
|
find_links = options.find_links or []
|
|
|
|
search_scope = SearchScope.create(
|
|
find_links=find_links, index_urls=index_urls,
|
|
)
|
|
|
|
link_collector = LinkCollector(session=session, search_scope=search_scope)
|
|
|
|
return link_collector
|
|
|
|
|
|
def _get_statefile_name(key):
|
|
# type: (Union[str, Text]) -> str
|
|
key_bytes = ensure_binary(key)
|
|
name = hashlib.sha224(key_bytes).hexdigest()
|
|
return name
|
|
|
|
|
|
class SelfCheckState(object):
|
|
def __init__(self, cache_dir):
|
|
# type: (str) -> None
|
|
self.state = {} # type: Dict[str, Any]
|
|
self.statefile_path = None
|
|
|
|
# Try to load the existing state
|
|
if cache_dir:
|
|
self.statefile_path = os.path.join(
|
|
cache_dir, "selfcheck", _get_statefile_name(self.key)
|
|
)
|
|
try:
|
|
with open(self.statefile_path) as statefile:
|
|
self.state = json.load(statefile)
|
|
except (IOError, ValueError, KeyError):
|
|
# Explicitly suppressing exceptions, since we don't want to
|
|
# error out if the cache file is invalid.
|
|
pass
|
|
|
|
@property
|
|
def key(self):
|
|
return sys.prefix
|
|
|
|
def save(self, pypi_version, current_time):
|
|
# type: (str, datetime.datetime) -> None
|
|
# If we do not have a path to cache in, don't bother saving.
|
|
if not self.statefile_path:
|
|
return
|
|
|
|
# Check to make sure that we own the directory
|
|
if not check_path_owner(os.path.dirname(self.statefile_path)):
|
|
return
|
|
|
|
# Now that we've ensured the directory is owned by this user, we'll go
|
|
# ahead and make sure that all our directories are created.
|
|
ensure_dir(os.path.dirname(self.statefile_path))
|
|
|
|
state = {
|
|
# Include the key so it's easy to tell which pip wrote the
|
|
# file.
|
|
"key": self.key,
|
|
"last_check": current_time.strftime(SELFCHECK_DATE_FMT),
|
|
"pypi_version": pypi_version,
|
|
}
|
|
|
|
text = json.dumps(state, sort_keys=True, separators=(",", ":"))
|
|
|
|
with adjacent_tmp_file(self.statefile_path) as f:
|
|
f.write(ensure_binary(text))
|
|
|
|
try:
|
|
# Since we have a prefix-specific state file, we can just
|
|
# overwrite whatever is there, no need to check.
|
|
replace(f.name, self.statefile_path)
|
|
except OSError:
|
|
# Best effort.
|
|
pass
|
|
|
|
|
|
def was_installed_by_pip(pkg):
|
|
# type: (str) -> bool
|
|
"""Checks whether pkg was installed by pip
|
|
|
|
This is used not to display the upgrade message when pip is in fact
|
|
installed by system package manager, such as dnf on Fedora.
|
|
"""
|
|
try:
|
|
dist = pkg_resources.get_distribution(pkg)
|
|
return "pip" == get_installer(dist)
|
|
except pkg_resources.DistributionNotFound:
|
|
return False
|
|
|
|
|
|
def pip_self_version_check(session, options):
|
|
# type: (PipSession, optparse.Values) -> None
|
|
"""Check for an update for pip.
|
|
|
|
Limit the frequency of checks to once per week. State is stored either in
|
|
the active virtualenv or in the user's USER_CACHE_DIR keyed off the prefix
|
|
of the pip script path.
|
|
"""
|
|
installed_version = get_installed_version("pip")
|
|
if not installed_version:
|
|
return
|
|
|
|
pip_version = packaging_version.parse(installed_version)
|
|
pypi_version = None
|
|
|
|
try:
|
|
state = SelfCheckState(cache_dir=options.cache_dir)
|
|
|
|
current_time = datetime.datetime.utcnow()
|
|
# Determine if we need to refresh the state
|
|
if "last_check" in state.state and "pypi_version" in state.state:
|
|
last_check = datetime.datetime.strptime(
|
|
state.state["last_check"],
|
|
SELFCHECK_DATE_FMT
|
|
)
|
|
if (current_time - last_check).total_seconds() < 7 * 24 * 60 * 60:
|
|
pypi_version = state.state["pypi_version"]
|
|
|
|
# Refresh the version if we need to or just see if we need to warn
|
|
if pypi_version is None:
|
|
# Lets use PackageFinder to see what the latest pip version is
|
|
link_collector = make_link_collector(
|
|
session,
|
|
options=options,
|
|
suppress_no_index=True,
|
|
)
|
|
|
|
# Pass allow_yanked=False so we don't suggest upgrading to a
|
|
# yanked version.
|
|
selection_prefs = SelectionPreferences(
|
|
allow_yanked=False,
|
|
allow_all_prereleases=False, # Explicitly set to False
|
|
)
|
|
|
|
finder = PackageFinder.create(
|
|
link_collector=link_collector,
|
|
selection_prefs=selection_prefs,
|
|
)
|
|
best_candidate = finder.find_best_candidate("pip").best_candidate
|
|
if best_candidate is None:
|
|
return
|
|
pypi_version = str(best_candidate.version)
|
|
|
|
# save that we've performed a check
|
|
state.save(pypi_version, current_time)
|
|
|
|
remote_version = packaging_version.parse(pypi_version)
|
|
|
|
local_version_is_older = (
|
|
pip_version < remote_version and
|
|
pip_version.base_version != remote_version.base_version and
|
|
was_installed_by_pip('pip')
|
|
)
|
|
|
|
# Determine if our pypi_version is older
|
|
if not local_version_is_older:
|
|
return
|
|
|
|
# We cannot tell how the current pip is available in the current
|
|
# command context, so be pragmatic here and suggest the command
|
|
# that's always available. This does not accommodate spaces in
|
|
# `sys.executable`.
|
|
pip_cmd = "{} -m pip".format(sys.executable)
|
|
logger.warning(
|
|
"You are using pip version %s; however, version %s is "
|
|
"available.\nYou should consider upgrading via the "
|
|
"'%s install --upgrade pip' command.",
|
|
pip_version, pypi_version, pip_cmd
|
|
)
|
|
except Exception:
|
|
logger.debug(
|
|
"There was an error checking the latest version of pip",
|
|
exc_info=True,
|
|
)
|