Current path: tmp/
⬆️ Go up:
U
��.e � @ s d Z dS )z19.3.1N)�__version__� r r �0/usr/lib/python3.8/site-packages/pip/__init__.py�<module> � U
��.e � @ s d Z dS )z19.3.1N)�__version__� r r �0/usr/lib/python3.8/site-packages/pip/__init__.py�<module> � U
��.et � @ sj d dl mZ d dlZd dlZedkrFej�ej�e��Zej�d e� d dl m
Z edkrfe�
e� � dS )� )�absolute_importN� )�main�__main__)Z
__future__r �os�sys�__package__�path�dirname�__file__�insertZpip._internal.mainr Z_main�__name__�exit� r r �0/usr/lib/python3.8/site-packages/pip/__main__.py�<module> s U
��.et � @ sj d dl mZ d dlZd dlZedkrFej�ej�e��Zej�d e� d dl m
Z edkrfe�
e� � dS )� )�absolute_importN� )�main�__main__)Z
__future__r �os�sys�__package__�path�dirname�__file__�insertZpip._internal.mainr Z_main�__name__�exit� r r �0/usr/lib/python3.8/site-packages/pip/__main__.py�<module> s #!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2005-2010 ActiveState Software Inc.
# Copyright (c) 2013 Eddy Petrișor
"""Utilities for determining application-specific dirs.
See <http://github.com/ActiveState/appdirs> for details and usage.
"""
# Dev Notes:
# - MSDN on where to store app data files:
# http://support.microsoft.com/default.aspx?scid=kb;en-us;310294#XSLTH3194121123120121120120
# - Mac OS X: http://developer.apple.com/documentation/MacOSX/Conceptual/BPFileSystem/index.html
# - XDG spec for Un*x: http://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
__version_info__ = (1, 4, 3)
__version__ = '.'.join(map(str, __version_info__))
import sys
import os
# True when running under a Python 3 interpreter.
PY3 = sys.version_info[0] == 3

if PY3:
    # Python 3 has no separate ``unicode`` type; alias it so that code
    # written against the Python 2 name keeps working.
    unicode = str

# ``system`` holds a *sys.platform*-style string ("win32", "darwin", ...)
# that the rest of the module branches on.
if not sys.platform.startswith('java'):
    system = sys.platform
else:
    # Under Jython sys.platform starts with "java", so ask the JVM which
    # operating system it is actually running on.
    import platform
    os_name = platform.java_ver()[3][0]
    if os_name.startswith('Windows'):  # "Windows XP", "Windows 7", etc.
        system = 'win32'
    elif os_name.startswith('Mac'):  # "Mac OS X", etc.
        system = 'darwin'
    else:  # "Linux", "SunOS", "FreeBSD", etc.
        # Setting this to "linux2" is not ideal, but only Windows or Mac
        # are actually checked for and the rest of the module expects
        # *sys.platform* style strings.
        system = 'linux2'
def user_data_dir(appname=None, appauthor=None, version=None, roaming=False):
    r"""Return the full path to this application's per-user data directory.

        "appname" is the application name. When it is None (or empty),
            only the base system directory is returned.
        "appauthor" (Windows only) is the author or distributing body of
            the application, typically the owning company. It defaults to
            appname; pass False to leave it out of the path.
        "version" is an optional extra path element appended after the
            application name, typically "<major>.<minor>". It is only
            applied when appname is given.
        "roaming" (boolean, default False) selects the Windows roaming
            appdata directory, which is synchronised on login for users
            with roaming profiles. See
            <http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
            for a discussion of issues.

    Typical user data directories are:
        Mac OS X:               ~/Library/Application Support/<AppName>
        Unix:                   ~/.local/share/<AppName>    # or in $XDG_DATA_HOME, if defined
        Win XP (not roaming):   C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>
        Win XP (roaming):       C:\Documents and Settings\<username>\Application Data\<AppAuthor>\<AppName>
        Win 7  (not roaming):   C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>
        Win 7  (roaming):       C:\Users\<username>\AppData\Roaming\<AppAuthor>\<AppName>

    On Unix the XDG spec is followed: $XDG_DATA_HOME is honoured and the
    default is "~/.local/share/<AppName>".
    """
    if system == "win32":
        author = appname if appauthor is None else appauthor
        csidl = "CSIDL_APPDATA" if roaming else "CSIDL_LOCAL_APPDATA"
        base = os.path.normpath(_get_win_folder(csidl))
        # appauthor=False means "no author component at all".
        parts = [appname] if author is False else [author, appname]
    elif system == 'darwin':
        base = os.path.expanduser('~/Library/Application Support/')
        parts = [appname]
    else:
        base = os.getenv('XDG_DATA_HOME', os.path.expanduser("~/.local/share"))
        parts = [appname]

    # Without an application name neither the name nor the version is added.
    if not appname:
        return base
    if version:
        parts.append(version)
    return os.path.join(base, *parts)
def site_data_dir(appname=None, appauthor=None, version=None, multipath=False):
    r"""Return the full path to the data directory shared by all users.

        "appname" is the application name. When it is None (or empty),
            only the base system directory is returned.
        "appauthor" (Windows only) is the author or distributing body of
            the application, typically the owning company. It defaults to
            appname; pass False to leave it out of the path.
        "version" is an optional extra path element appended after the
            application name, typically "<major>.<minor>". It is only
            applied when appname is given.
        "multipath" (*nix only) requests every entry of $XDG_DATA_DIRS,
            joined with os.pathsep, instead of just the first one. When
            XDG_DATA_DIRS is not set, '/usr/local/share' and '/usr/share'
            are used.

    Typical site data directories are:
        Mac OS X:   /Library/Application Support/<AppName>
        Unix:       /usr/local/share/<AppName> or /usr/share/<AppName>
        Win XP:     C:\Documents and Settings\All Users\Application Data\<AppAuthor>\<AppName>
        Vista:      (Fail! "C:\ProgramData" is a hidden *system* directory on Vista.)
        Win 7:      C:\ProgramData\<AppAuthor>\<AppName>   # Hidden, but writeable on Win 7.

    For Unix, this is using the $XDG_DATA_DIRS[0] default.

    WARNING: Do not use this on Windows. See the Vista-Fail note above for why.
    """
    if system not in ("win32", "darwin"):
        # XDG default for $XDG_DATA_DIRS; only the first entry is returned
        # unless multipath is requested.
        raw_dirs = os.getenv(
            'XDG_DATA_DIRS',
            os.pathsep.join(['/usr/local/share', '/usr/share']))
        candidates = [os.path.expanduser(entry.rstrip(os.sep))
                      for entry in raw_dirs.split(os.pathsep)]
        if appname:
            if version:
                appname = os.path.join(appname, version)
            candidates = [os.sep.join([entry, appname]) for entry in candidates]
        if multipath:
            return os.pathsep.join(candidates)
        return candidates[0]

    if system == "win32":
        author = appname if appauthor is None else appauthor
        base = os.path.normpath(_get_win_folder("CSIDL_COMMON_APPDATA"))
        if appname:
            if author is False:
                base = os.path.join(base, appname)
            else:
                base = os.path.join(base, author, appname)
    else:
        base = os.path.expanduser('/Library/Application Support')
        if appname:
            base = os.path.join(base, appname)

    if appname and version:
        base = os.path.join(base, version)
    return base
def user_config_dir(appname=None, appauthor=None, version=None, roaming=False):
    r"""Return the full path to this application's per-user config directory.

        "appname" is the application name. When it is None (or empty),
            only the base system directory is returned.
        "appauthor" (Windows only) is the author or distributing body of
            the application, typically the owning company. It defaults to
            appname; pass False to leave it out of the path.
        "version" is an optional extra path element appended after the
            application name, typically "<major>.<minor>". It is only
            applied when appname is given.
        "roaming" (boolean, default False) selects the Windows roaming
            appdata directory, which is synchronised on login for users
            with roaming profiles. See
            <http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
            for a discussion of issues.

    Typical user config directories are:
        Mac OS X:               same as user_data_dir
        Unix:                   ~/.config/<AppName>     # or in $XDG_CONFIG_HOME, if defined
        Win *:                  same as user_data_dir

    On Unix the XDG spec is followed: $XDG_CONFIG_HOME is honoured and the
    default is "~/.config/<AppName>".
    """
    if system in ("win32", "darwin"):
        # The version is deliberately withheld here and appended below.
        config_root = user_data_dir(appname, appauthor, None, roaming)
    else:
        config_root = os.getenv('XDG_CONFIG_HOME',
                                os.path.expanduser("~/.config"))
        if appname:
            config_root = os.path.join(config_root, appname)

    if not (appname and version):
        return config_root
    return os.path.join(config_root, version)
def site_config_dir(appname=None, appauthor=None, version=None, multipath=False):
    r"""Return the full path to the config directory shared by all users.

        "appname" is the application name. When it is None (or empty),
            only the base system directory is returned.
        "appauthor" (Windows only) is the author or distributing body of
            the application, typically the owning company. It defaults to
            appname; pass False to leave it out of the path.
        "version" is an optional extra path element appended after the
            application name, typically "<major>.<minor>". It is only
            applied when appname is given.
        "multipath" (*nix only) requests every entry of $XDG_CONFIG_DIRS,
            joined with os.pathsep, instead of just the first one. When
            XDG_CONFIG_DIRS is not set, '/etc/xdg' is used.

    Typical site config directories are:
        Mac OS X:   same as site_data_dir
        Unix:       /etc/xdg/<AppName> or $XDG_CONFIG_DIRS[i]/<AppName> for each value in
                    $XDG_CONFIG_DIRS
        Win *:      same as site_data_dir
        Vista:      (Fail! "C:\ProgramData" is a hidden *system* directory on Vista.)

    For Unix, this is using the $XDG_CONFIG_DIRS[0] default, if multipath=False

    WARNING: Do not use this on Windows. See the Vista-Fail note above for why.
    """
    if system in ("win32", "darwin"):
        shared_root = site_data_dir(appname, appauthor)
        if appname and version:
            shared_root = os.path.join(shared_root, version)
        return shared_root

    # XDG default for $XDG_CONFIG_DIRS; only the first entry is returned
    # unless multipath is requested.
    raw_dirs = os.getenv('XDG_CONFIG_DIRS', '/etc/xdg')
    candidates = [os.path.expanduser(entry.rstrip(os.sep))
                  for entry in raw_dirs.split(os.pathsep)]
    if appname:
        if version:
            appname = os.path.join(appname, version)
        candidates = [os.sep.join([entry, appname]) for entry in candidates]

    if multipath:
        return os.pathsep.join(candidates)
    return candidates[0]
def user_cache_dir(appname=None, appauthor=None, version=None, opinion=True):
    r"""Return the full path to this application's per-user cache directory.

        "appname" is the application name. When it is None (or empty),
            only the base system directory is returned.
        "appauthor" (Windows only) is the author or distributing body of
            the application, typically the owning company. It defaults to
            appname; pass False to leave it out of the path.
        "version" is an optional extra path element appended after the
            application name, typically "<major>.<minor>". It is only
            applied when appname is given.
        "opinion" (boolean) can be set False to stop "Cache" being
            appended to the application directory on Windows. See the
            discussion below.

    Typical user cache directories are:
        Mac OS X:   ~/Library/Caches/<AppName>
        Unix:       ~/.cache/<AppName> (XDG default)
        Win XP:     C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>\Cache
        Vista:      C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>\Cache

    On Windows the only suggestion in the MSDN docs is that local settings go in
    the `CSIDL_LOCAL_APPDATA` directory. This is identical to the non-roaming
    app data dir (the default returned by `user_data_dir` above). Apps typically
    put cache data somewhere *under* the given dir here. Some examples:
        ...\Mozilla\Firefox\Profiles\<ProfileName>\Cache
        ...\Acme\SuperApp\Cache\1.0
    OPINION: This function appends "Cache" to the `CSIDL_LOCAL_APPDATA` value.
    This can be disabled with the `opinion=False` option.
    """
    if system == "win32":
        author = appname if appauthor is None else appauthor
        cache_root = os.path.normpath(_get_win_folder("CSIDL_LOCAL_APPDATA"))
        if appname:
            segments = [appname] if author is False else [author, appname]
            # "Cache" is only added beneath an application directory.
            if opinion:
                segments.append("Cache")
            cache_root = os.path.join(cache_root, *segments)
    else:
        if system == 'darwin':
            cache_root = os.path.expanduser('~/Library/Caches')
        else:
            cache_root = os.getenv('XDG_CACHE_HOME',
                                   os.path.expanduser('~/.cache'))
        if appname:
            cache_root = os.path.join(cache_root, appname)

    if appname and version:
        cache_root = os.path.join(cache_root, version)
    return cache_root
def user_state_dir(appname=None, appauthor=None, version=None, roaming=False):
    r"""Return the full path to this application's per-user state directory.

        "appname" is the application name. When it is None (or empty),
            only the base system directory is returned.
        "appauthor" (Windows only) is the author or distributing body of
            the application, typically the owning company. It defaults to
            appname; pass False to leave it out of the path.
        "version" is an optional extra path element appended after the
            application name, typically "<major>.<minor>". It is only
            applied when appname is given.
        "roaming" (boolean, default False) selects the Windows roaming
            appdata directory, which is synchronised on login for users
            with roaming profiles. See
            <http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
            for a discussion of issues.

    Typical user state directories are:
        Mac OS X:  same as user_data_dir
        Unix:      ~/.local/state/<AppName>   # or in $XDG_STATE_HOME, if defined
        Win *:     same as user_data_dir

    For Unix, we follow this Debian proposal <https://wiki.debian.org/XDGBaseDirectorySpecification#state>
    to extend the XDG spec and support $XDG_STATE_HOME.

    That means, by default "~/.local/state/<AppName>".
    """
    if system in ("win32", "darwin"):
        # The version is deliberately withheld here and appended below.
        state_root = user_data_dir(appname, appauthor, None, roaming)
    else:
        state_root = os.getenv('XDG_STATE_HOME',
                               os.path.expanduser("~/.local/state"))
        if appname:
            state_root = os.path.join(state_root, appname)

    if not (appname and version):
        return state_root
    return os.path.join(state_root, version)
def user_log_dir(appname=None, appauthor=None, version=None, opinion=True):
    r"""Return full path to the user-specific log dir for this application.

        "appname" is the name of application.
            If None, just the system directory is returned.
        "appauthor" (only used on Windows) is the name of the
            appauthor or distributing body for this application. Typically
            it is the owning company name. This falls back to appname. You may
            pass False to disable it.
        "version" is an optional version path element to append to the
            path. You might want to use this if you want multiple versions
            of your app to be able to run independently. If used, this
            would typically be "<major>.<minor>".
            Only applied when appname is present.
        "opinion" (boolean) can be False to disable the appending of
            "Logs" to the base app data dir for Windows, and "log" to the
            base cache dir for Unix. See discussion below.

    Typical user log directories are:
        Mac OS X:   ~/Library/Logs/<AppName>
        Unix:       ~/.cache/<AppName>/log  # or under $XDG_CACHE_HOME if defined
        Win XP:     C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>\Logs
        Vista:      C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>\Logs

    On Windows the only suggestion in the MSDN docs is that local settings
    go in the `CSIDL_LOCAL_APPDATA` directory. (Note: I'm interested in
    examples of what some windows apps use for a logs dir.)

    OPINION: This function appends "Logs" to the `CSIDL_LOCAL_APPDATA`
    value for Windows and appends "log" to the user cache dir for Unix.
    This can be disabled with the `opinion=False` option.
    """
    if system == "darwin":
        path = os.path.expanduser('~/Library/Logs')
        # Only add the application component when a name was given;
        # joining with None would raise TypeError instead of returning
        # the bare system directory promised above.
        if appname:
            path = os.path.join(path, appname)
    elif system == "win32":
        path = user_data_dir(appname, appauthor, version)
        # user_data_dir already appended the version; do not add it twice.
        version = False
        if opinion:
            path = os.path.join(path, "Logs")
    else:
        path = user_cache_dir(appname, appauthor, version)
        # user_cache_dir already appended the version; do not add it twice.
        version = False
        if opinion:
            path = os.path.join(path, "log")
    if appname and version:
        path = os.path.join(path, version)
    return path
class AppDirs(object):
    """Convenience wrapper for getting application dirs.

    The constructor arguments are stored and forwarded to the module-level
    functions of the same names each time a property is read, so every
    property reflects the current attribute values.
    """

    def __init__(self, appname=None, appauthor=None, version=None,
                 roaming=False, multipath=False):
        self.appname = appname
        self.appauthor = appauthor
        self.version = version
        self.roaming = roaming
        self.multipath = multipath

    @property
    def user_data_dir(self):
        """Per-user data directory (honours ``roaming``)."""
        return user_data_dir(self.appname, self.appauthor,
                             version=self.version, roaming=self.roaming)

    @property
    def site_data_dir(self):
        """Shared data directory (honours ``multipath``)."""
        return site_data_dir(self.appname, self.appauthor,
                             version=self.version, multipath=self.multipath)

    @property
    def user_config_dir(self):
        """Per-user config directory (honours ``roaming``)."""
        return user_config_dir(self.appname, self.appauthor,
                               version=self.version, roaming=self.roaming)

    @property
    def site_config_dir(self):
        """Shared config directory (honours ``multipath``)."""
        return site_config_dir(self.appname, self.appauthor,
                               version=self.version, multipath=self.multipath)

    @property
    def user_cache_dir(self):
        """Per-user cache directory."""
        return user_cache_dir(self.appname, self.appauthor,
                              version=self.version)

    @property
    def user_state_dir(self):
        """Per-user state directory (honours ``roaming``)."""
        # Forward ``roaming`` like user_data_dir/user_config_dir do, so
        # that on Windows the state dir really matches the data dir.
        return user_state_dir(self.appname, self.appauthor,
                              version=self.version, roaming=self.roaming)

    @property
    def user_log_dir(self):
        """Per-user log directory."""
        return user_log_dir(self.appname, self.appauthor,
                            version=self.version)
#---- internal support stuff
def _get_win_folder_from_registry(csidl_name):
    """Look up a Windows shell folder path in the registry.

    This is a fallback technique at best. I'm not sure if using the
    registry for this guarantees us the correct answer for all CSIDL_*
    names.

    "csidl_name" must be one of "CSIDL_APPDATA", "CSIDL_COMMON_APPDATA"
    or "CSIDL_LOCAL_APPDATA"; any other value raises KeyError.

    Returns the value stored under the matching "Shell Folders" entry of
    the current user's registry hive.
    """
    if PY3:
        import winreg as _winreg
    else:
        import _winreg

    shell_folder_name = {
        "CSIDL_APPDATA": "AppData",
        "CSIDL_COMMON_APPDATA": "Common AppData",
        "CSIDL_LOCAL_APPDATA": "Local AppData",
    }[csidl_name]

    key = _winreg.OpenKey(
        _winreg.HKEY_CURRENT_USER,
        r"Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders"
    )
    try:
        # QueryValueEx returns (value, registry type); only the value is
        # needed. Local names avoid shadowing the builtins dir/type.
        folder, _value_type = _winreg.QueryValueEx(key, shell_folder_name)
    finally:
        # Release the registry handle deterministically instead of
        # waiting for it to be garbage collected.
        _winreg.CloseKey(key)
    return folder
def _get_win_folder_with_pywin32(csidl_name):
    """Resolve a CSIDL_* folder name to a path using pywin32.

    "csidl_name" is looked up as an attribute of ``win32com.shell.shellcon``.
    When the resulting path contains characters above code point 255 it is
    downgraded to its short path name, if ``win32api`` is importable.
    """
    from win32com.shell import shellcon, shell
    folder = shell.SHGetFolderPath(0, getattr(shellcon, csidl_name), 0, 0)
    # Try to make this a unicode path because SHGetFolderPath does
    # not return unicode strings when there is unicode data in the
    # path.
    try:
        folder = unicode(folder)

        # Downgrade to short path name if have highbit chars. See
        # <http://bugs.activestate.com/show_bug.cgi?id=85099>.
        if any(ord(ch) > 255 for ch in folder):
            try:
                import win32api
                folder = win32api.GetShortPathName(folder)
            except ImportError:
                pass
    except UnicodeError:
        pass
    return folder
def _get_win_folder_with_ctypes(csidl_name):
import ctypes
csidl_const = {
"CSIDL_APPDATA": 26,
"CSIDL_COMMON_APPDATA": 35,
"CSIDL_LOCAL_APPDATA": 28,
}[csidl_name]
buf = ctypes.create_unicode_buffer(1024)
ctypes.windll.shell32.SHGetFolderPathW(None, csidl_const, None, 0, buf)
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = False
for c in buf:
if ord(c) > 255:
has_high_char = True
break
if has_high_char:
buf2 = ctypes.create_unicode_buffer(1024)
if ctypes.windll.kernel32.GetShortPathNameW(buf.value, buf2, 1024):
buf = buf2
return buf.value
def _get_win_folder_with_jna(csidl_name):
    """Resolve a CSIDL_* folder name to a path using JNA (Jython).

    "csidl_name" is looked up as an attribute of
    ``com.sun.jna.platform.win32.ShlObj``. When the resulting path
    contains characters above code point 255 and a short path name is
    available, the short form is returned instead.
    """
    import array
    from com.sun import jna
    from com.sun.jna.platform import win32

    buf_size = win32.WinDef.MAX_PATH * 2
    raw = array.zeros('c', buf_size)
    win32.Shell32.INSTANCE.SHGetFolderPath(
        None, getattr(win32.ShlObj, csidl_name), None,
        win32.ShlObj.SHGFP_TYPE_CURRENT, raw)
    folder = jna.Native.toString(raw.tostring()).rstrip("\0")

    # Downgrade to short path name if have highbit chars. See
    # <http://bugs.activestate.com/show_bug.cgi?id=85099>.
    if any(ord(ch) > 255 for ch in folder):
        short_raw = array.zeros('c', buf_size)
        if win32.Kernel32.INSTANCE.GetShortPathName(folder, short_raw, buf_size):
            folder = jna.Native.toString(short_raw.tostring()).rstrip("\0")

    return folder
# On Windows, pick the first available backend for resolving CSIDL_*
# folders: ctypes, then JNA (Jython), then the registry fallback.
if system == "win32":
    try:
        # Imported only to probe that ctypes' Windows support is usable.
        from ctypes import windll
    except ImportError:
        try:
            import com.sun.jna
        except ImportError:
            _get_win_folder = _get_win_folder_from_registry
        else:
            _get_win_folder = _get_win_folder_with_jna
    else:
        _get_win_folder = _get_win_folder_with_ctypes
#---- self test code
if __name__ == "__main__":
    # Print every directory property for a handful of representative
    # AppDirs configurations.
    appname = "MyApp"
    appauthor = "MyCompany"

    props = ("user_data_dir",
             "user_config_dir",
             "user_cache_dir",
             "user_state_dir",
             "user_log_dir",
             "site_data_dir",
             "site_config_dir")

    print("-- app dirs %s --" % __version__)

    # (heading, AppDirs instance) pairs, reported in this order.
    scenarios = (
        ("-- app dirs (with optional 'version')",
         AppDirs(appname, appauthor, version="1.0")),
        ("\n-- app dirs (without optional 'version')",
         AppDirs(appname, appauthor)),
        ("\n-- app dirs (without optional 'appauthor')",
         AppDirs(appname)),
        ("\n-- app dirs (with disabled 'appauthor')",
         AppDirs(appname, appauthor=False)),
    )
    for heading, dirs in scenarios:
        print(heading)
        for prop in props:
            print("%s: %s" % (prop, getattr(dirs, prop)))
"""
The cache object API for implementing caches. The default is a thread
safe in-memory dictionary.
"""
from threading import Lock
class BaseCache(object):
    """Interface for cache backends.

    Subclasses must override ``get``, ``set`` and ``delete``; ``close``
    is optional and does nothing by default.
    """

    def get(self, key):
        """Return the value stored under *key*; must be overridden."""
        raise NotImplementedError

    def set(self, key, value):
        """Store *value* under *key*; must be overridden."""
        raise NotImplementedError

    def delete(self, key):
        """Remove *key* from the cache; must be overridden."""
        raise NotImplementedError

    def close(self):
        """Release any resources held by the backend (no-op here)."""
        return None
class DictCache(BaseCache):
    """In-memory cache backed by a plain dict.

    Writes and deletes are serialised with a lock; reads are not locked.
    """

    def __init__(self, init_dict=None):
        """Create the cache, optionally on top of an existing dict.

        "init_dict" is used as the backing storage when given. It is
        compared against None rather than tested for truthiness so that
        a caller-supplied *empty* dict is kept (and shared) instead of
        being silently replaced by a fresh one.
        """
        self.lock = Lock()
        self.data = {} if init_dict is None else init_dict

    def get(self, key):
        """Return the value stored under *key*, or None when absent."""
        return self.data.get(key, None)

    def set(self, key, value):
        """Store *value* under *key*, replacing any previous value."""
        with self.lock:
            self.data[key] = value

    def delete(self, key):
        """Remove *key* if present; missing keys are ignored."""
        with self.lock:
            self.data.pop(key, None)
"""
The httplib2 algorithms ported for use with requests.
"""
import logging
import re
import calendar
import time
from email.utils import parsedate_tz
from pip._vendor.requests.structures import CaseInsensitiveDict
from .cache import DictCache
from .serialize import Serializer
logger = logging.getLogger(__name__)
# URI-splitting regular expression from Appendix B of RFC 3986.
URI = re.compile(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?")


def parse_uri(uri):
    """Split *uri* into its five RFC 3986 components.

    Uses the regex given in Appendix B of RFC 3986.

        (scheme, authority, path, query, fragment) = parse_uri(uri)

    Components that are absent from *uri* are returned as None.
    """
    matched = URI.match(uri)
    # Groups 2, 4, 5, 7 and 9 hold the components without their delimiters.
    return matched.group(2, 4, 5, 7, 9)
class CacheController(object):
    """An interface to see if a request should be cached or not.

    The controller looks responses up in ``cache`` (keyed by normalized
    URL), decides whether a cached entry is still fresh, and decides
    whether a new response may be stored.
    """

    def __init__(
        self, cache=None, cache_etags=True, serializer=None, status_codes=None
    ):
        """Configure the controller.

        "cache" is the storage backend; a new DictCache is used when None.
        "cache_etags" enables caching of any response carrying an ETag.
        "serializer" converts responses to/from cache data; a new
            Serializer is used when not given.
        "status_codes" overrides the cacheable status codes
            (default: 200, 203, 300, 301).
        """
        # Compare against None rather than truthiness: a backend that is
        # falsy while empty (e.g. one defining __len__) must not be
        # silently replaced by a private DictCache.
        self.cache = DictCache() if cache is None else cache
        self.cache_etags = cache_etags
        self.serializer = serializer or Serializer()
        self.cacheable_status_codes = status_codes or (200, 203, 300, 301)

    @classmethod
    def _urlnorm(cls, uri):
        """Normalize the URL to create a safe key for the cache.

        The scheme and authority are lower-cased, an empty path becomes
        "/", and the fragment is dropped. Raises Exception when *uri*
        lacks a scheme or an authority.
        """
        (scheme, authority, path, query, fragment) = parse_uri(uri)
        if not scheme or not authority:
            raise Exception("Only absolute URIs are allowed. uri = %s" % uri)

        scheme = scheme.lower()
        authority = authority.lower()

        if not path:
            path = "/"

        # Could do syntax based normalization of the URI before
        # computing the digest. See Section 6.2.2 of Std 66.
        request_uri = "?".join([path, query]) if query else path
        defrag_uri = scheme + "://" + authority + request_uri

        return defrag_uri

    @classmethod
    def cache_url(cls, uri):
        """Return the cache key for *uri* (its normalized form)."""
        return cls._urlnorm(uri)

    def parse_cache_control(self, headers):
        """Parse the Cache-Control header of *headers* into a dict.

        Returns a mapping of directive name to its value: an int for
        directives that carry a valid integer argument, None for
        directives without a value. Unknown directives, and directives
        whose required value is missing or invalid, are omitted.
        """
        # directive -> (value type or None, whether the value is required)
        known_directives = {
            # https://tools.ietf.org/html/rfc7234#section-5.2
            "max-age": (int, True),
            "max-stale": (int, False),
            "min-fresh": (int, True),
            "no-cache": (None, False),
            "no-store": (None, False),
            "no-transform": (None, False),
            "only-if-cached": (None, False),
            "must-revalidate": (None, False),
            "public": (None, False),
            "private": (None, False),
            "proxy-revalidate": (None, False),
            "s-maxage": (int, True),
        }

        cc_headers = headers.get("cache-control", headers.get("Cache-Control", ""))

        retval = {}

        for cc_directive in cc_headers.split(","):
            if not cc_directive.strip():
                continue

            parts = cc_directive.split("=", 1)
            directive = parts[0].strip()

            try:
                typ, required = known_directives[directive]
            except KeyError:
                logger.debug("Ignoring unknown cache-control directive: %s", directive)
                continue

            # Valueless (or optionally-valued) directives are recorded
            # immediately; a parsed value below overwrites the None.
            if not typ or not required:
                retval[directive] = None
            if typ:
                try:
                    retval[directive] = typ(parts[1].strip())
                except IndexError:
                    if required:
                        logger.debug(
                            "Missing value for cache-control " "directive: %s",
                            directive,
                        )
                except ValueError:
                    logger.debug(
                        "Invalid value for cache-control directive " "%s, must be %s",
                        directive,
                        typ.__name__,
                    )

        return retval

    def cached_request(self, request):
        """
        Return a cached response if it exists in the cache, otherwise
        return False.

        A cached entry is returned only when it deserializes, and is
        either a 301 or still fresh according to its Date, Cache-Control
        and Expires headers combined with the request's Cache-Control.
        Unusable entries without an ETag are purged from the cache.
        """
        cache_url = self.cache_url(request.url)
        logger.debug('Looking up "%s" in the cache', cache_url)
        cc = self.parse_cache_control(request.headers)

        # Bail out if the request insists on fresh data
        if "no-cache" in cc:
            logger.debug('Request header has "no-cache", cache bypassed')
            return False

        if "max-age" in cc and cc["max-age"] == 0:
            logger.debug('Request header has "max_age" as 0, cache bypassed')
            return False

        # Request allows serving from the cache, let's see if we find something
        cache_data = self.cache.get(cache_url)
        if cache_data is None:
            logger.debug("No cache entry available")
            return False

        # Check whether it can be deserialized
        resp = self.serializer.loads(request, cache_data)
        if not resp:
            logger.warning("Cache entry deserialization failed, entry ignored")
            return False

        # If we have a cached 301, return it immediately. We don't
        # need to test our response for other headers b/c it is
        # intrinsically "cacheable" as it is Permanent.
        # See:
        #   https://tools.ietf.org/html/rfc7231#section-6.4.2
        #
        # Client can try to refresh the value by repeating the request
        # with cache busting headers as usual (ie no-cache).
        if resp.status == 301:
            msg = (
                'Returning cached "301 Moved Permanently" response '
                "(ignoring date and etag information)"
            )
            logger.debug(msg)
            return resp

        headers = CaseInsensitiveDict(resp.headers)
        # parsedate_tz returns None for a malformed date; treat that the
        # same as a missing Date header instead of letting
        # calendar.timegm(None) raise TypeError.
        date_tuple = parsedate_tz(headers["date"]) if "date" in headers else None
        if date_tuple is None:
            if "etag" not in headers:
                # Without date or etag, the cached response can never be used
                # and should be deleted.
                logger.debug("Purging cached response: no date or etag")
                self.cache.delete(cache_url)
            logger.debug("Ignoring cached response: no date")
            return False

        now = time.time()
        date = calendar.timegm(date_tuple)
        current_age = max(0, now - date)
        logger.debug("Current age based on date: %i", current_age)

        # TODO: There is an assumption that the result will be a
        #       urllib3 response object. This may not be best since we
        #       could probably avoid instantiating or constructing the
        #       response until we know we need it.
        resp_cc = self.parse_cache_control(headers)

        # determine freshness
        freshness_lifetime = 0

        # Check the max-age pragma in the cache control header
        if "max-age" in resp_cc:
            freshness_lifetime = resp_cc["max-age"]
            logger.debug("Freshness lifetime from max-age: %i", freshness_lifetime)

        # If there isn't a max-age, check for an expires header
        elif "expires" in headers:
            expires = parsedate_tz(headers["expires"])
            if expires is not None:
                expire_time = calendar.timegm(expires) - date
                freshness_lifetime = max(0, expire_time)
                logger.debug("Freshness lifetime from expires: %i", freshness_lifetime)

        # Determine if we are setting freshness limit in the
        # request. Note, this overrides what was in the response.
        if "max-age" in cc:
            freshness_lifetime = cc["max-age"]
            logger.debug(
                "Freshness lifetime from request max-age: %i", freshness_lifetime
            )

        if "min-fresh" in cc:
            min_fresh = cc["min-fresh"]
            # adjust our current age by our min fresh
            current_age += min_fresh
            logger.debug("Adjusted current age from min-fresh: %i", current_age)

        # Return entry if it is fresh enough
        if freshness_lifetime > current_age:
            logger.debug('The response is "fresh", returning cached response')
            logger.debug("%i > %i", freshness_lifetime, current_age)
            return resp

        # we're not fresh. If we don't have an Etag, clear it out
        if "etag" not in headers:
            logger.debug('The cached response is "stale" with no etag, purging')
            self.cache.delete(cache_url)

        # return the original handler
        return False

    def conditional_headers(self, request):
        """Return validation headers for revalidating a cached response.

        When a cached entry exists for the request URL, its ETag and
        Last-Modified values are returned as If-None-Match and
        If-Modified-Since; otherwise the returned dict is empty.
        """
        cache_url = self.cache_url(request.url)
        resp = self.serializer.loads(request, self.cache.get(cache_url))
        new_headers = {}

        if resp:
            headers = CaseInsensitiveDict(resp.headers)

            if "etag" in headers:
                new_headers["If-None-Match"] = headers["ETag"]

            if "last-modified" in headers:
                new_headers["If-Modified-Since"] = headers["Last-Modified"]

        return new_headers

    def cache_response(self, request, response, body=None, status_codes=None):
        """
        Algorithm for caching requests.

        This assumes a requests Response object.

        "body", when given, is the already-read response body; a body
        whose length contradicts a valid Content-Length is not cached.
        "status_codes" overrides the controller's cacheable status codes
        for this call.
        """
        # From httplib2: Don't cache 206's since we aren't going to
        #                handle byte range requests
        cacheable_status_codes = status_codes or self.cacheable_status_codes
        if response.status not in cacheable_status_codes:
            logger.debug(
                "Status code %s not in %s", response.status, cacheable_status_codes
            )
            return

        response_headers = CaseInsensitiveDict(response.headers)

        # If we've been given a body, our response has a Content-Length, that
        # Content-Length is valid then we can check to see if the body we've
        # been given matches the expected size, and if it doesn't we'll just
        # skip trying to cache it.
        if (
            body is not None
            and "content-length" in response_headers
            and response_headers["content-length"].isdigit()
            and int(response_headers["content-length"]) != len(body)
        ):
            return

        cc_req = self.parse_cache_control(request.headers)
        cc = self.parse_cache_control(response_headers)

        cache_url = self.cache_url(request.url)
        logger.debug('Updating cache with response from "%s"', cache_url)

        # Delete it from the cache if we happen to have it stored there
        no_store = False
        if "no-store" in cc:
            no_store = True
            logger.debug('Response header has "no-store"')
        if "no-store" in cc_req:
            no_store = True
            logger.debug('Request header has "no-store"')
        if no_store and self.cache.get(cache_url):
            logger.debug('Purging existing cache entry to honor "no-store"')
            self.cache.delete(cache_url)
        if no_store:
            return

        # If we've been given an etag, then keep the response
        if self.cache_etags and "etag" in response_headers:
            logger.debug("Caching due to etag")
            self.cache.set(
                cache_url, self.serializer.dumps(request, response, body=body)
            )

        # Add to the cache any 301s. We do this before looking at
        # the Date headers.
        elif response.status == 301:
            logger.debug("Caching permanant redirect")
            self.cache.set(cache_url, self.serializer.dumps(request, response))

        # Add to the cache if the response headers demand it. If there
        # is no date header then we can't do anything about expiring
        # the cache.
        elif "date" in response_headers:
            # cache when there is a max-age > 0
            if "max-age" in cc and cc["max-age"] > 0:
                logger.debug("Caching b/c date exists and max-age > 0")
                self.cache.set(
                    cache_url, self.serializer.dumps(request, response, body=body)
                )

            # If the request can expire, it means we should cache it
            # in the meantime.
            elif "expires" in response_headers:
                if response_headers["expires"]:
                    logger.debug("Caching b/c of expires header")
                    self.cache.set(
                        cache_url, self.serializer.dumps(request, response, body=body)
                    )

    def update_cached_response(self, request, response):
        """On a 304 we will get a new set of headers that we want to
        update our cached value with, assuming we have one.

        This should only ever be called when we've sent an ETag and
        gotten a 304 as the response.

        Returns the updated cached response, or *response* unchanged
        when nothing is cached for the request URL.
        """
        cache_url = self.cache_url(request.url)

        cached_response = self.serializer.loads(request, self.cache.get(cache_url))

        if not cached_response:
            # we didn't have a cached response
            return response

        # Lets update our headers with the headers from the new request:
        # http://tools.ietf.org/html/draft-ietf-httpbis-p4-conditional-26#section-4.1
        #
        # The server isn't supposed to send headers that would make
        # the cached body invalid. But... just in case, we'll be sure
        # to strip out ones we know that might be problematic due to
        # typical assumptions.
        excluded_headers = ["content-length"]

        cached_response.headers.update(
            dict(
                (k, v)
                for k, v in response.headers.items()
                if k.lower() not in excluded_headers
            )
        )

        # we want a 200 b/c we have content via the cache
        cached_response.status = 200

        # update our cache
        self.cache.set(cache_url, self.serializer.dumps(request, cached_response))

        return cached_response
import logging
from pip._vendor import requests
from pip._vendor.cachecontrol.adapter import CacheControlAdapter
from pip._vendor.cachecontrol.cache import DictCache
from pip._vendor.cachecontrol.controller import logger
from argparse import ArgumentParser
def setup_logging():
    """Attach a stream handler to the controller logger and enable DEBUG."""
    stream_handler = logging.StreamHandler()
    logger.addHandler(stream_handler)
    logger.setLevel(logging.DEBUG)
def get_session():
    """Build a requests session whose HTTP(S) traffic goes through a cache.

    A single ``CacheControlAdapter`` backed by a fresh ``DictCache`` is
    mounted for both URL schemes, and its controller is exposed on the
    session as ``cache_controller``.
    """
    cache_adapter = CacheControlAdapter(
        DictCache(), cache_etags=True, serializer=None, heuristic=None
    )
    session = requests.Session()
    for scheme in ("http://", "https://"):
        session.mount(scheme, cache_adapter)

    session.cache_controller = cache_adapter.controller
    return session
def get_args():
    """Parse the command line, which takes one positional ``url`` argument."""
    cli = ArgumentParser()
    cli.add_argument("url", help="The URL to try and cache")
    parsed = cli.parse_args()
    return parsed
def main(args=None):
    """Fetch a URL and report whether its response could be cached.

    Args:
        args: Optional pre-parsed arguments exposing a ``url`` attribute.
            When omitted (``None``), the command line is parsed with
            ``get_args()``. Previously a supplied value was always
            overwritten by the command-line parse.
    """
    if args is None:
        args = get_args()
    sess = get_session()

    # Make a request to get a response
    resp = sess.get(args.url)

    # Turn on logging only now, so the initial request is not logged
    setup_logging()

    # Try storing the response in the cache
    sess.cache_controller.cache_response(resp.request, resp.raw)

    # Now check whether the cache can serve the same request
    if sess.cache_controller.cached_request(resp.request):
        print("Cached!")
    else:
        print("Not cached :(")
# Run the cache check when this module is executed as a script.
if __name__ == "__main__":
    main()
try:
from urllib.parse import urljoin
except ImportError:
from urlparse import urljoin
try:
import cPickle as pickle
except ImportError:
import pickle
# Handle the case where the requests module has been patched to not have
# urllib3 bundled as part of its source.
try:
from pip._vendor.requests.packages.urllib3.response import HTTPResponse
except ImportError:
from pip._vendor.urllib3.response import HTTPResponse
try:
from pip._vendor.requests.packages.urllib3.util import is_fp_closed
except ImportError:
from pip._vendor.urllib3.util import is_fp_closed
# Replicate some six behaviour: ``text_type`` names the unicode text type.
# The ``unicode`` builtin only exists on Python 2; where the lookup raises
# NameError, ``str`` is used instead.
try:
    text_type = unicode
except NameError:
    text_type = str
import base64
import io
import json
import zlib
from pip._vendor import msgpack
from pip._vendor.requests.structures import CaseInsensitiveDict
from .compat import HTTPResponse, pickle, text_type
def _b64_decode_bytes(b):
return base64.b64decode(b.encode("ascii"))
def _b64_decode_str(s):
return _b64_decode_bytes(s).decode("utf8")
class Serializer(object):
    """Convert responses to and from the byte strings kept in the cache.

    Entries are written as ``b"cc=<version>,<payload>"``. ``dumps`` always
    writes version 4 (msgpack); ``loads`` dispatches on the version prefix
    to the matching ``_loads_vN`` method and treats anything it cannot
    handle as a cache miss by returning ``None``.
    """

    def dumps(self, request, response, body=None):
        """Serialize ``response`` (and the request's Vary values) to bytes.

        Args:
            request: The request that produced the response; its headers are
                consulted for every header named in the response's ``Vary``.
            response: The response object to store.
            body: The raw body bytes. When ``None`` the body is read from
                ``response`` with ``decode_content=False``.

        Returns:
            ``b"cc=4,"`` followed by the msgpack-encoded entry.
        """
        response_headers = CaseInsensitiveDict(response.headers)

        if body is None:
            body = response.read(decode_content=False)

            # NOTE: 99% sure this is dead code. I'm only leaving it
            #       here b/c I don't have a test yet to prove
            #       it. Basically, before using
            #       `cachecontrol.filewrapper.CallbackFileWrapper`,
            #       this made an effort to reset the file handle. The
            #       `CallbackFileWrapper` short circuits this code by
            #       setting the body as the content is consumed, the
            #       result being a `body` argument is *always* passed
            #       into cache_response, and in turn,
            #       `Serializer.dump`.
            response._fp = io.BytesIO(body)

        # NOTE: This is all a bit weird, but it's really important that on
        #       Python 2.x these objects are unicode and not str, even when
        #       they contain only ascii. The problem here is that msgpack
        #       understands the difference between unicode and bytes and we
        #       have it set to differentiate between them, however Python 2
        #       doesn't know the difference. Forcing these to unicode will be
        #       enough to have msgpack know the difference.
        data = {
            u"response": {
                u"body": body,
                u"headers": {
                    text_type(k): text_type(v) for k, v in response.headers.items()
                },
                u"status": response.status,
                u"version": response.version,
                u"reason": text_type(response.reason),
                u"strict": response.strict,
                u"decode_content": response.decode_content,
            }
        }

        # Construct our vary headers: for each header named in Vary, record
        # the value the request sent (or None when it sent none).
        data[u"vary"] = {}
        if u"vary" in response_headers:
            varied_headers = response_headers[u"vary"].split(",")
            for header in varied_headers:
                header = text_type(header).strip()
                header_value = request.headers.get(header, None)
                if header_value is not None:
                    header_value = text_type(header_value)
                data[u"vary"][header] = header_value

        return b",".join([b"cc=4", msgpack.dumps(data, use_bin_type=True)])

    def loads(self, request, data):
        """Deserialize a cache entry, returning ``None`` on any miss.

        Args:
            request: The request the entry is being loaded for; passed on to
                the version-specific loader.
            data: The bytes read from the cache (may be empty or ``None``).

        Returns:
            Whatever the version-specific loader returns, or ``None`` when
            ``data`` is empty or its version marker is unknown or unreadable.
        """
        # Short circuit if we've been given an empty set of data
        if not data:
            return None

        # Determine what version of the serializer the data was serialized
        # with
        try:
            ver, data = data.split(b",", 1)
        except ValueError:
            ver = b"cc=0"

        # Make sure that our "ver" is actually a version and isn't a false
        # positive from a , being in the data stream.
        if ver[:3] != b"cc=":
            data = ver + data
            ver = b"cc=0"

        # Get the version number out of the cc=N. A corrupt entry can carry
        # non-ASCII bytes here; that is an unknown version, i.e. a miss,
        # rather than an error the caller should have to deal with.
        try:
            ver = ver.split(b"=", 1)[-1].decode("ascii")
        except UnicodeDecodeError:
            return None

        # Dispatch to the actual load method for the given version
        try:
            return getattr(self, "_loads_v{}".format(ver))(request, data)
        except AttributeError:
            # This is a version we don't have a loads function for, so we'll
            # just treat it as a miss and return None. (An AttributeError
            # raised inside a loader ends up here as well.)
            return None

    def prepare_response(self, request, cached):
        """Verify our vary headers match and construct a real urllib3
        HTTPResponse object.

        ``cached`` is modified in place: the body is popped out of
        ``cached["response"]`` and its headers are replaced by a
        case-insensitive mapping.

        Returns:
            An ``HTTPResponse``, or ``None`` when the entry recorded a ``*``
            Vary value or a Vary header that differs from the request's.
        """
        # Special case the '*' Vary value as it means we cannot actually
        # determine if the cached response is suitable for this request.
        if "*" in cached.get("vary", {}):
            return None

        # Ensure that the Vary headers for the cached response match our
        # request
        for header, value in cached.get("vary", {}).items():
            if request.headers.get(header, None) != value:
                return None

        body_raw = cached["response"].pop("body")

        headers = CaseInsensitiveDict(data=cached["response"]["headers"])
        if headers.get("transfer-encoding", "") == "chunked":
            headers.pop("transfer-encoding")

        cached["response"]["headers"] = headers

        try:
            body = io.BytesIO(body_raw)
        except TypeError:
            # This can happen if cachecontrol serialized to v1 format (pickle)
            # using Python 2. A Python 2 str(byte string) will be unpickled as
            # a Python 3 str (unicode string), which will cause the above to
            # fail with:
            #
            #     TypeError: 'str' does not support the buffer interface
            body = io.BytesIO(body_raw.encode("utf8"))

        return HTTPResponse(body=body, preload_content=False, **cached["response"])

    def _loads_v0(self, request, data):
        """Legacy entries: always a miss."""
        # The original legacy cache data. This doesn't contain enough
        # information to construct everything we need, so we'll treat this as
        # a miss.
        return None

    def _loads_v1(self, request, data):
        """Load a pickle-based (v1) entry."""
        # SECURITY NOTE(review): unpickling executes whatever the payload
        # asks for, so this is only safe while the cache store cannot be
        # written by an untrusted party. Left unchanged here; flagged for
        # follow-up.
        try:
            cached = pickle.loads(data)
        except ValueError:
            return None

        return self.prepare_response(request, cached)

    def _loads_v2(self, request, data):
        """Load a zlib-compressed JSON (v2) entry with base64-encoded fields."""
        try:
            cached = json.loads(zlib.decompress(data).decode("utf8"))
        except (ValueError, zlib.error):
            return None

        # We need to decode the items that we've base64 encoded
        cached["response"]["body"] = _b64_decode_bytes(cached["response"]["body"])
        cached["response"]["headers"] = {
            _b64_decode_str(k): _b64_decode_str(v)
            for k, v in cached["response"]["headers"].items()
        }
        cached["response"]["reason"] = _b64_decode_str(cached["response"]["reason"])
        cached["vary"] = {
            _b64_decode_str(k): (_b64_decode_str(v) if v is not None else v)
            for k, v in cached["vary"].items()
        }

        return self.prepare_response(request, cached)

    def _loads_v3(self, request, data):
        """v3 entries: always a miss."""
        # Due to Python 2 encoding issues, it's impossible to know for sure
        # exactly how to load v3 entries, thus we'll treat these as a miss so
        # that they get rewritten out as v4 entries.
        return None

    def _loads_v4(self, request, data):
        """Load a msgpack (v4) entry, the format written by ``dumps``."""
        try:
            # ``raw=False`` decodes msgpack str values as UTF-8 text; it
            # replaces the deprecated ``encoding="utf-8"`` option.
            cached = msgpack.loads(data, raw=False)
        except ValueError:
            return None

        return self.prepare_response(request, cached)
from io import BytesIO
class CallbackFileWrapper(object):
    """
    Tee-style proxy around a file-like object.

    Every chunk handed out by ``read``/``_safe_read`` is also copied into an
    in-memory buffer; once the wrapped object is detected as closed, the
    callback is invoked with the accumulated bytes.

    Any attribute not defined here is looked up on the wrapped object.

    Instance state lives in double-underscore (name-mangled) attributes so
    that it does not accidentally shadow an attribute of the wrapped object.
    """

    def __init__(self, fp, callback):
        self.__callback = callback
        self.__fp = fp
        self.__buf = BytesIO()

    def __getattr__(self, name):
        # The vagaries of garbage collection mean that self.__fp is not
        # always set. Going through __getattribute__ with the mangled
        # name[0] raises a plain AttributeError when it is missing, which
        # stops getattr from recursing infinitely in the case where
        # self.__fp hasn't been set.
        #
        # [0] https://docs.python.org/2/reference/expressions.html#atom-identifiers
        wrapped = self.__getattribute__("_CallbackFileWrapper__fp")
        return getattr(wrapped, name)

    def __is_fp_closed(self):
        # First probe: a wrapped object whose ``fp`` attribute is None.
        try:
            return self.__fp.fp is None
        except AttributeError:
            # Second probe: a ``closed`` attribute.
            try:
                return self.__fp.closed
            except AttributeError:
                # We just don't cache it then.
                # TODO: Add some logging here...
                return False

    def __record(self, data):
        # Copy the chunk into the buffer and fire the callback if the
        # wrapped object turns out to be closed now.
        self.__buf.write(data)
        if self.__is_fp_closed():
            self._close()
        return data

    def _close(self):
        callback = self.__callback
        if callback:
            callback(self.__buf.getvalue())

        # We assign this to None here, because otherwise we can get into
        # really tricky problems where the CPython interpreter dead locks
        # because the callback is holding a reference to something which
        # has a __del__ method. Setting this to None breaks the cycle
        # and allows the garbage collector to do its thing normally.
        self.__callback = None

    def read(self, amt=None):
        return self.__record(self.__fp.read(amt))

    def _safe_read(self, amt):
        data = self.__fp._safe_read(amt)
        is_chunk_terminator = amt == 2 and data == b"\r\n"
        if is_chunk_terminator:
            # urllib executes this read to toss the CRLF at the end
            # of the chunk, so it is not part of the body.
            return data
        return self.__record(data)
from .adapter import CacheControlAdapter
from .cache import DictCache
def CacheControl(
    sess,
    cache=None,
    cache_etags=True,
    serializer=None,
    heuristic=None,
    controller_class=None,
    adapter_class=None,
    cacheable_methods=None,
):
    """Mount a caching adapter on ``sess`` for both HTTP and HTTPS.

    Args:
        sess: The session to wrap; it is modified in place.
        cache: The cache backend. Defaults to a new ``DictCache`` when
            ``None``.
        cache_etags, serializer, heuristic, controller_class,
        cacheable_methods: Passed through unchanged to the adapter.
        adapter_class: The adapter type to instantiate. Defaults to
            ``CacheControlAdapter`` when ``None``.

    Returns:
        The same ``sess`` object, with one adapter instance mounted for
        ``http://`` and ``https://``.
    """
    # Compare against None rather than relying on truthiness: a cache object
    # that is falsy (for example an empty one that defines ``__len__``) must
    # not be silently swapped for a fresh DictCache.
    if cache is None:
        cache = DictCache()
    if adapter_class is None:
        adapter_class = CacheControlAdapter

    adapter = adapter_class(
        cache,
        cache_etags=cache_etags,
        serializer=serializer,
        heuristic=heuristic,
        controller_class=controller_class,
        cacheable_methods=cacheable_methods,
    )
    sess.mount("http://", adapter)
    sess.mount("https://", adapter)

    return sess
U
��.e% � @ s4 d Z ddlmZ G dd� de�ZG dd� de�ZdS )zb
The cache object API for implementing caches. The default is a thread
safe in-memory dictionary.
� )�Lockc @ s, e Zd Zdd� Zdd� Zdd� Zdd� Zd S )
� BaseCachec C s
t � �d S �N��NotImplementedError��self�key� r
�B/usr/lib/python3.8/site-packages/pip/_vendor/cachecontrol/cache.py�get
s z
BaseCache.getc C s
t � �d S r r �r r �valuer
r
r �set
s z
BaseCache.setc C s
t � �d S r r r r
r
r �delete s zBaseCache.deletec C s d S r r
)r r
r
r �close s zBaseCache.closeN)�__name__�
__module__�__qualname__r r r r r
r
r
r r s r c @ s. e Zd Zd
dd�Zdd� Zdd� Zdd � ZdS )� DictCacheNc C s t � | _|pi | _d S r )r �lock�data)r Z init_dictr
r
r �__init__ s zDictCache.__init__c C s | j �|d �S r )r r r r
r
r r s z
DictCache.getc C s&