# -*- coding: utf-8 -*- """ werkzeug._internal ~~~~~~~~~~~~~~~~~~ This module provides internally used helpers and constants. :copyright: (c) 2014 by the Werkzeug Team, see AUTHORS for more details. :license: BSD, see LICENSE for more details. """ import re import string import inspect from weakref import WeakKeyDictionary from datetime import datetime, date from itertools import chain from werkzeug._compat import iter_bytes, text_type, BytesIO, int_to_byte, \ range_type, integer_types _logger = None _empty_stream = BytesIO() _signature_cache = WeakKeyDictionary() _epoch_ord = date(1970, 1, 1).toordinal() _cookie_params = set((b'expires', b'path', b'comment', b'max-age', b'secure', b'httponly', b'version')) _legal_cookie_chars = (string.ascii_letters + string.digits + u"/=!#$%&'*+-.^_`|~:").encode('ascii') _cookie_quoting_map = { b',': b'\\054', b';': b'\\073', b'"': b'\\"', b'\\': b'\\\\', } for _i in chain(range_type(32), range_type(127, 256)): _cookie_quoting_map[int_to_byte(_i)] = ('\\%03o' % _i).encode('latin1') _octal_re = re.compile(br'\\[0-3][0-7][0-7]') _quote_re = re.compile(br'[\\].') _legal_cookie_chars_re = br'[\w\d!#%&\'~_`><@,:/\$\*\+\-\.\^\|\)\(\?\}\{\=]' _cookie_re = re.compile(br""" (?P<key>[^=;]+) (?:\s*=\s* (?P<val> "(?:[^\\"]|\\.)*" | (?:.*?) ) )? \s*; """, flags=re.VERBOSE) class _Missing(object): def __repr__(self): return 'no value' def __reduce__(self): return '_missing' _missing = _Missing() def _get_environ(obj): env = getattr(obj, 'environ', obj) assert isinstance(env, dict), \ '%r is not a WSGI environment (has to be a dict)' % type(obj).__name__ return env def _log(type, message, *args, **kwargs): """Log into the internal werkzeug logger.""" global _logger if _logger is None: import logging _logger = logging.getLogger('werkzeug') # Only set up a default log handler if the # end-user application didn't set anything up. if not logging.root.handlers and _logger.level == logging.NOTSET: _logger.setLevel(logging.INFO) handler = logging.StreamHandler() _logger.addHandler(handler) getattr(_logger, type)(message.rstrip(), *args, **kwargs) def _parse_signature(func): """Return a signature object for the function.""" if hasattr(func, 'im_func'): func = func.im_func # if we have a cached validator for this function, return it parse = _signature_cache.get(func) if parse is not None: return parse # inspect the function signature and collect all the information if hasattr(inspect, 'getfullargspec'): tup = inspect.getfullargspec(func) else: tup = inspect.getargspec(func) positional, vararg_var, kwarg_var, defaults = tup[:4] defaults = defaults or () arg_count = len(positional) arguments = [] for idx, name in enumerate(positional): if isinstance(name, list): raise TypeError('cannot parse functions that unpack tuples ' 'in the function signature') try: default = defaults[idx - arg_count] except IndexError: param = (name, False, None) else: param = (name, True, default) arguments.append(param) arguments = tuple(arguments) def parse(args, kwargs): new_args = [] missing = [] extra = {} # consume as many arguments as positional as possible for idx, (name, has_default, default) in enumerate(arguments): try: new_args.append(args[idx]) except IndexError: try: new_args.append(kwargs.pop(name)) except KeyError: if has_default: new_args.append(default) else: missing.append(name) else: if name in kwargs: extra[name] = kwargs.pop(name) # handle extra arguments extra_positional = args[arg_count:] if vararg_var is not None: new_args.extend(extra_positional) extra_positional = () if kwargs and kwarg_var is None: extra.update(kwargs) kwargs = {} return new_args, kwargs, missing, extra, extra_positional, \ arguments, vararg_var, kwarg_var _signature_cache[func] = parse return parse def _date_to_unix(arg): """Converts a timetuple, integer or datetime object into the seconds from epoch in utc. """ if isinstance(arg, datetime): arg = arg.utctimetuple() elif isinstance(arg, integer_types + (float,)): return int(arg) year, month, day, hour, minute, second = arg[:6] days = date(year, month, 1).toordinal() - _epoch_ord + day - 1 hours = days * 24 + hour minutes = hours * 60 + minute seconds = minutes * 60 + second return seconds class _DictAccessorProperty(object): """Baseclass for `environ_property` and `header_property`.""" read_only = False def __init__(self, name, default=None, load_func=None, dump_func=None, read_only=None, doc=None): self.name = name self.default = default self.load_func = load_func self.dump_func = dump_func if read_only is not None: self.read_only = read_only self.__doc__ = doc def __get__(self, obj, type=None): if obj is None: return self storage = self.lookup(obj) if self.name not in storage: return self.default rv = storage[self.name] if self.load_func is not None: try: rv = self.load_func(rv) except (ValueError, TypeError): rv = self.default return rv def __set__(self, obj, value): if self.read_only: raise AttributeError('read only property') if self.dump_func is not None: value = self.dump_func(value) self.lookup(obj)[self.name] = value def __delete__(self, obj): if self.read_only: raise AttributeError('read only property') self.lookup(obj).pop(self.name, None) def __repr__(self): return '<%s %s>' % ( self.__class__.__name__, self.name ) def _cookie_quote(b): buf = bytearray() all_legal = True _lookup = _cookie_quoting_map.get _push = buf.extend for char in iter_bytes(b): if char not in _legal_cookie_chars: all_legal = False char = _lookup(char, char) _push(char) if all_legal: return bytes(buf) return bytes(b'"' + buf + b'"') def _cookie_unquote(b): if len(b) < 2: return b if b[:1] != b'"' or b[-1:] != b'"': return b b = b[1:-1] i = 0 n = len(b) rv = bytearray() _push = rv.extend while 0 <= i < n: o_match = _octal_re.search(b, i) q_match = _quote_re.search(b, i) if not o_match and not q_match: rv.extend(b[i:]) break j = k = -1 if o_match: j = o_match.start(0) if q_match: k = q_match.start(0) if q_match and (not o_match or k < j): _push(b[i:k]) _push(b[k + 1:k + 2]) i = k + 2 else: _push(b[i:j]) rv.append(int(b[j + 1:j + 4], 8)) i = j + 4 return bytes(rv) def _cookie_parse_impl(b): """Lowlevel cookie parsing facility that operates on bytes.""" i = 0 n = len(b) while i < n: match = _cookie_re.search(b + b';', i) if not match: break key = match.group('key').strip() value = match.group('val') or b'' i = match.end(0) # Ignore parameters. We have no interest in them. if key.lower() not in _cookie_params: yield _cookie_unquote(key), _cookie_unquote(value) def _encode_idna(domain): # If we're given bytes, make sure they fit into ASCII if not isinstance(domain, text_type): domain.decode('ascii') return domain # Otherwise check if it's already ascii, then return try: return domain.encode('ascii') except UnicodeError: pass # Otherwise encode each part separately parts = domain.split('.') for idx, part in enumerate(parts): parts[idx] = part.encode('idna') return b'.'.join(parts) def _decode_idna(domain): # If the input is a string try to encode it to ascii to # do the idna decoding. if that fails because of an # unicode error, then we already have a decoded idna domain if isinstance(domain, text_type): try: domain = domain.encode('ascii') except UnicodeError: return domain # Decode each part separately. If a part fails, try to # decode it with ascii and silently ignore errors. This makes # most sense because the idna codec does not have error handling parts = domain.split(b'.') for idx, part in enumerate(parts): try: parts[idx] = part.decode('idna') except UnicodeError: parts[idx] = part.decode('ascii', 'ignore') return '.'.join(parts) def _make_cookie_domain(domain): if domain is None: return None domain = _encode_idna(domain) if b':' in domain: domain = domain.split(b':', 1)[0] if b'.' in domain: return domain raise ValueError( 'Setting \'domain\' for a cookie on a server running locally (ex: ' 'localhost) is not supported by complying browsers. You should ' 'have something like: \'127.0.0.1 localhost dev.localhost\' on ' 'your hosts file and then point your server to run on ' '\'dev.localhost\' and also set \'domain\' for \'dev.localhost\'' ) def _easteregg(app=None): """Like the name says. But who knows how it works?""" def bzzzzzzz(gyver): import base64 import zlib return zlib.decompress(base64.b64decode(gyver)).decode('ascii') gyver = u'\n'.join([x + (77 - len(x)) * u' ' for x in bzzzzzzz(b''' eJyFlzuOJDkMRP06xRjymKgDJCDQStBYT8BCgK4gTwfQ2fcFs2a2FzvZk+hvlcRvRJD148efHt9m 9Xz94dRY5hGt1nrYcXx7us9qlcP9HHNh28rz8dZj+q4rynVFFPdlY4zH873NKCexrDM6zxxRymzz 4QIxzK4bth1PV7+uHn6WXZ5C4ka/+prFzx3zWLMHAVZb8RRUxtFXI5DTQ2n3Hi2sNI+HK43AOWSY jmEzE4naFp58PdzhPMdslLVWHTGUVpSxImw+pS/D+JhzLfdS1j7PzUMxij+mc2U0I9zcbZ/HcZxc q1QjvvcThMYFnp93agEx392ZdLJWXbi/Ca4Oivl4h/Y1ErEqP+lrg7Xa4qnUKu5UE9UUA4xeqLJ5 jWlPKJvR2yhRI7xFPdzPuc6adXu6ovwXwRPXXnZHxlPtkSkqWHilsOrGrvcVWXgGP3daXomCj317 8P2UOw/NnA0OOikZyFf3zZ76eN9QXNwYdD8f8/LdBRFg0BO3bB+Pe/+G8er8tDJv83XTkj7WeMBJ v/rnAfdO51d6sFglfi8U7zbnr0u9tyJHhFZNXYfH8Iafv2Oa+DT6l8u9UYlajV/hcEgk1x8E8L/r XJXl2SK+GJCxtnyhVKv6GFCEB1OO3f9YWAIEbwcRWv/6RPpsEzOkXURMN37J0PoCSYeBnJQd9Giu LxYQJNlYPSo/iTQwgaihbART7Fcyem2tTSCcwNCs85MOOpJtXhXDe0E7zgZJkcxWTar/zEjdIVCk iXy87FW6j5aGZhttDBoAZ3vnmlkx4q4mMmCdLtnHkBXFMCReqthSGkQ+MDXLLCpXwBs0t+sIhsDI tjBB8MwqYQpLygZ56rRHHpw+OAVyGgaGRHWy2QfXez+ZQQTTBkmRXdV/A9LwH6XGZpEAZU8rs4pE 1R4FQ3Uwt8RKEtRc0/CrANUoes3EzM6WYcFyskGZ6UTHJWenBDS7h163Eo2bpzqxNE9aVgEM2CqI GAJe9Yra4P5qKmta27VjzYdR04Vc7KHeY4vs61C0nbywFmcSXYjzBHdiEjraS7PGG2jHHTpJUMxN Jlxr3pUuFvlBWLJGE3GcA1/1xxLcHmlO+LAXbhrXah1tD6Ze+uqFGdZa5FM+3eHcKNaEarutAQ0A QMAZHV+ve6LxAwWnXbbSXEG2DmCX5ijeLCKj5lhVFBrMm+ryOttCAeFpUdZyQLAQkA06RLs56rzG 8MID55vqr/g64Qr/wqwlE0TVxgoiZhHrbY2h1iuuyUVg1nlkpDrQ7Vm1xIkI5XRKLedN9EjzVchu jQhXcVkjVdgP2O99QShpdvXWoSwkp5uMwyjt3jiWCqWGSiaaPAzohjPanXVLbM3x0dNskJsaCEyz DTKIs+7WKJD4ZcJGfMhLFBf6hlbnNkLEePF8Cx2o2kwmYF4+MzAxa6i+6xIQkswOqGO+3x9NaZX8 MrZRaFZpLeVTYI9F/djY6DDVVs340nZGmwrDqTCiiqD5luj3OzwpmQCiQhdRYowUYEA3i1WWGwL4 GCtSoO4XbIPFeKGU13XPkDf5IdimLpAvi2kVDVQbzOOa4KAXMFlpi/hV8F6IDe0Y2reg3PuNKT3i RYhZqtkQZqSB2Qm0SGtjAw7RDwaM1roESC8HWiPxkoOy0lLTRFG39kvbLZbU9gFKFRvixDZBJmpi Xyq3RE5lW00EJjaqwp/v3EByMSpVZYsEIJ4APaHmVtpGSieV5CALOtNUAzTBiw81GLgC0quyzf6c NlWknzJeCsJ5fup2R4d8CYGN77mu5vnO1UqbfElZ9E6cR6zbHjgsr9ly18fXjZoPeDjPuzlWbFwS pdvPkhntFvkc13qb9094LL5NrA3NIq3r9eNnop9DizWOqCEbyRBFJTHn6Tt3CG1o8a4HevYh0XiJ sR0AVVHuGuMOIfbuQ/OKBkGRC6NJ4u7sbPX8bG/n5sNIOQ6/Y/BX3IwRlTSabtZpYLB85lYtkkgm p1qXK3Du2mnr5INXmT/78KI12n11EFBkJHHp0wJyLe9MvPNUGYsf+170maayRoy2lURGHAIapSpQ krEDuNoJCHNlZYhKpvw4mspVWxqo415n8cD62N9+EfHrAvqQnINStetek7RY2Urv8nxsnGaZfRr/ nhXbJ6m/yl1LzYqscDZA9QHLNbdaSTTr+kFg3bC0iYbX/eQy0Bv3h4B50/SGYzKAXkCeOLI3bcAt mj2Z/FM1vQWgDynsRwNvrWnJHlespkrp8+vO1jNaibm+PhqXPPv30YwDZ6jApe3wUjFQobghvW9p 7f2zLkGNv8b191cD/3vs9Q833z8t''').splitlines()]) def easteregged(environ, start_response): def injecting_start_response(status, headers, exc_info=None): headers.append(('X-Powered-By', 'Werkzeug')) return start_response(status, headers, exc_info) if app is not None and environ.get('QUERY_STRING') != 'macgybarchakku': return app(environ, injecting_start_response) injecting_start_response('200 OK', [('Content-Type', 'text/html')]) return [(u''' <!DOCTYPE html> <html> <head> <title>About Werkzeug</title> <style type="text/css"> body { font: 15px Georgia, serif; text-align: center; } a { color: #333; text-decoration: none; } h1 { font-size: 30px; margin: 20px 0 10px 0; } p { margin: 0 0 30px 0; } pre { font: 11px 'Consolas', 'Monaco', monospace; line-height: 0.95; } </style> </head> <body> <h1><a href="http://werkzeug.pocoo.org/">Werkzeug</a></h1> <p>the Swiss Army knife of Python web development.</p> <pre>%s\n\n\n</pre> </body> </html>''' % gyver).encode('latin1')] return easteregged