PCQRSCANER/venv/Lib/site-packages/imapclient/test/test_imapclient.py

578 lines
23 KiB
Python
Raw Permalink Normal View History

2019-12-22 21:51:47 +01:00
# Copyright (c) 2014, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
from __future__ import unicode_literals
import itertools
import socket
import sys
from datetime import datetime
import six
from mock import patch, sentinel, Mock
from imapclient.fixed_offset import FixedOffset
from .testable_imapclient import TestableIMAPClient as IMAPClient
from .imapclient_test import IMAPClientTest
class TestListFolders(IMAPClientTest):
    """Tests for list_folders / list_sub_folders and for _proc_folder_list,
    which turns raw LIST/LSUB response lines into (flags, delimiter, name)
    tuples.
    """

    def test_list_folders(self):
        # The LIST command must be issued with quoted byte arguments and the
        # untagged data must be handed to _proc_folder_list untouched.
        self.client._imap._simple_command.return_value = ('OK', [b'something'])
        self.client._imap._untagged_response.return_value = ('LIST', sentinel.folder_data)
        self.client._proc_folder_list = Mock(return_value=sentinel.folder_list)

        folders = self.client.list_folders('foo', 'bar')

        self.client._imap._simple_command.assert_called_once_with(
            'LIST', b'"foo"', b'"bar"')
        self.assertEqual(self.client._proc_folder_list.call_args,
                         ((sentinel.folder_data,), {}))
        self.assertIs(folders, sentinel.folder_list)

    def test_list_sub_folders(self):
        # Same contract as test_list_folders, but for the LSUB command.
        self.client._imap._simple_command.return_value = ('OK', [b'something'])
        self.client._imap._untagged_response.return_value = ('LSUB', sentinel.folder_data)
        self.client._proc_folder_list = Mock(return_value=sentinel.folder_list)

        folders = self.client.list_sub_folders('foo', 'bar')

        self.client._imap._simple_command.assert_called_once_with(
            'LSUB', b'"foo"', b'"bar"')
        self.assertEqual(self.client._proc_folder_list.call_args,
                         ((sentinel.folder_data,), {}))
        self.assertIs(folders, sentinel.folder_list)

    def test_list_folders_NO(self):
        # A 'NO' status from the server must surface as IMAPClient.Error.
        self.client._imap._simple_command.return_value = ('NO', [b'badness'])
        self.assertRaises(IMAPClient.Error, self.client.list_folders)

    def test_list_sub_folders_NO(self):
        # Exercise list_sub_folders here; calling list_folders would only
        # duplicate test_list_folders_NO and leave the LSUB path untested.
        self.client._imap._simple_command.return_value = ('NO', [b'badness'])
        self.assertRaises(IMAPClient.Error, self.client.list_sub_folders)

    def test_utf7_decoding(self):
        # With folder encoding on (the default in this fixture), the
        # "&AP8-" sequence in a folder name is decoded to '\xff'.
        self.client._imap._simple_command.return_value = ('OK', [b'something'])
        self.client._imap._untagged_response.return_value = (
            'LIST', [
                b'(\\HasNoChildren) "/" "A"',
                b'(\\HasNoChildren) "/" "Hello&AP8-world"',
            ])

        folders = self.client.list_folders('foo', 'bar')

        self.client._imap._simple_command.assert_called_once_with(
            'LIST', b'"foo"', b'"bar"')
        self.assertEqual(folders, [((b'\\HasNoChildren',), b'/', 'A'),
                                   ((b'\\HasNoChildren',), b'/', 'Hello\xffworld')])

    def test_folder_encode_off(self):
        # With folder_encode disabled the arguments are sent as text and the
        # folder names come back as undecoded bytes.
        self.client.folder_encode = False
        self.client._imap._simple_command.return_value = ('OK', [b'something'])
        self.client._imap._untagged_response.return_value = (
            'LIST', [
                b'(\\HasNoChildren) "/" "A"',
                b'(\\HasNoChildren) "/" "Hello&AP8-world"',
            ])

        folders = self.client.list_folders('foo', 'bar')

        self.client._imap._simple_command.assert_called_once_with(
            'LIST', '"foo"', '"bar"')
        self.assertEqual(folders, [((b'\\HasNoChildren',), b'/', b'A'),
                                   ((b'\\HasNoChildren',), b'/', b'Hello&AP8-world')])

    def test_simple(self):
        folders = self.client._proc_folder_list([
            b'(\\HasNoChildren) "/" "A"',
            b'(\\HasNoChildren) "/" "Foo Bar"',
        ])
        self.assertEqual(folders, [((b'\\HasNoChildren',), b'/', 'A',),
                                   ((b'\\HasNoChildren',), b'/', 'Foo Bar')])

    def test_without_quotes(self):
        folders = self.client._proc_folder_list([
            b'(\\HasNoChildren) "/" A',
            b'(\\HasNoChildren) "/" B',
            b'(\\HasNoChildren) "/" C',
        ])
        self.assertEqual(folders, [((b'\\HasNoChildren',), b'/', 'A'),
                                   ((b'\\HasNoChildren',), b'/', 'B'),
                                   ((b'\\HasNoChildren',), b'/', 'C')])

    def test_unquoted_numeric_folder_name(self):
        # Some IMAP implementations do this
        folders = self.client._proc_folder_list([b'(\\HasNoChildren) "/" 123'])
        self.assertEqual(folders, [((b'\\HasNoChildren',), b'/', '123')])

    def test_unqouted_numeric_folder_name_parsed_as_long(self):
        # big enough numeric values might get parsed as longs
        folder_name = str(sys.maxsize + 1)
        folders = self.client._proc_folder_list(
            [b'(\\HasNoChildren) "/" ' + folder_name.encode('ascii')])
        self.assertEqual(folders, [((b'\\HasNoChildren', ), b'/', folder_name)])

    def test_mixed(self):
        # Quoted and unquoted names may be mixed within one response.
        folders = self.client._proc_folder_list([
            b'(\\HasNoChildren) "/" Alpha',
            b'(\\HasNoChildren) "/" "Foo Bar"',
            b'(\\HasNoChildren) "/" C',
        ])
        self.assertEqual(folders, [((b'\\HasNoChildren',), b'/', 'Alpha'),
                                   ((b'\\HasNoChildren',), b'/', 'Foo Bar'),
                                   ((b'\\HasNoChildren',), b'/', 'C')])

    def test_funky_characters(self):
        # A folder name supplied as a (header, payload) tuple followed by an
        # empty line must be parsed as one entry.
        folders = self.client._proc_folder_list([
            (b'(\\NoInferiors \\UnMarked) "/" {5}', 'bang\xff'),
            b'',
            b'(\\HasNoChildren \\UnMarked) "/" "INBOX"',
        ])
        self.assertEqual(folders, [((b'\\NoInferiors', b'\\UnMarked'), b"/", 'bang\xff'),
                                   ((b'\\HasNoChildren', b'\\UnMarked'), b"/", 'INBOX')])

    def test_quoted_specials(self):
        # Backslash-escaped quotes and backslashes inside quoted names are
        # unescaped in the parsed result.
        folders = self.client._proc_folder_list([
            br'(\HasNoChildren) "/" "Test \"Folder\""',
            br'(\HasNoChildren) "/" "Left\"Right"',
            br'(\HasNoChildren) "/" "Left\\Right"',
            br'(\HasNoChildren) "/" "\"Left Right\""',
            br'(\HasNoChildren) "/" "\"Left\\Right\""',
        ])
        self.assertEqual(folders, [((b'\\HasNoChildren',), b'/', 'Test "Folder"'),
                                   ((b'\\HasNoChildren',), b'/', 'Left\"Right'),
                                   ((b'\\HasNoChildren',), b'/', r'Left\Right'),
                                   ((b'\\HasNoChildren',), b'/', r'"Left Right"'),
                                   ((b'\\HasNoChildren',), b'/', r'"Left\Right"'),
                                   ])

    def test_empty_response(self):
        self.assertEqual(self.client._proc_folder_list([None]), [])

    def test_blanks(self):
        # Empty strings and None entries are skipped.
        folders = self.client._proc_folder_list(['', None, br'(\HasNoChildren) "/" "last"'])
        self.assertEqual(folders, [((br'\HasNoChildren',), b'/', 'last')])
class TestSelectFolder(IMAPClientTest):
    """Tests for select_folder's digest of the untagged SELECT responses."""

    def test_normal(self):
        # Canned untagged responses, as left behind by a SELECT command.
        untagged = {
            b'exists': [b'3'],
            b'FLAGS': [br"(\Flagged \Deleted abc [foo]/bar def)"],
            b'HIGHESTMODSEQ': [b'127110'],
            b'OK': [br"[PERMANENTFLAGS (\Flagged \Deleted abc [foo]/bar def \*)] Flags permitted.",
                    b'[UIDVALIDITY 631062293] UIDs valid.',
                    b'[UIDNEXT 1281] Predicted next UID.',
                    b'[HIGHESTMODSEQ 127110]'],
            b'PERMANENTFLAGS': [br'(\Flagged \Deleted abc [foo'],
            b'READ-WRITE': [b''],
            b'RECENT': [b'0'],
            b'UIDNEXT': [b'1281'],
            b'UIDVALIDITY': [b'631062293'],
            b'OTHER': [b'blah']
        }
        # What select_folder is expected to return for the data above.
        expected = {
            b'EXISTS': 3,
            b'RECENT': 0,
            b'UIDNEXT': 1281,
            b'UIDVALIDITY': 631062293,
            b'HIGHESTMODSEQ': 127110,
            b'FLAGS': (br'\Flagged', br'\Deleted', b'abc', b'[foo]/bar', b'def'),
            b'PERMANENTFLAGS': (br'\Flagged', br'\Deleted', b'abc', b'[foo]/bar', b'def', br'\*'),
            b'READ-WRITE': True,
            b'OTHER': [b'blah']
        }

        self.client._command_and_check = Mock()
        self.client._imap.untagged_responses = untagged

        result = self.client.select_folder(b'folder_name', sentinel.readonly)

        self.client._command_and_check.assert_called_once_with(
            'select', b'"folder_name"', sentinel.readonly)
        # Show the full dict diff if the comparison fails.
        self.maxDiff = 99999
        self.assertEqual(result, expected)
class TestAppend(IMAPClientTest):
    """Tests for append: the arguments forwarded to the underlying client."""

    def test_without_msg_time(self):
        # No message time given: None is passed through in the date slot and
        # the text message arrives as bytes.
        self.client._imap.append.return_value = ('OK', [b'Good'])

        self.client.append('foobar', 'hi', ['FLAG', 'WAVE'], None)

        self.client._imap.append.assert_called_with(
            b'"foobar"', '(FLAG WAVE)', None, b'hi')

    @patch('imapclient.imapclient.datetime_to_INTERNALDATE')
    def test_with_msg_time(self, datetime_to_INTERNALDATE):
        # The converted date string is passed on wrapped in double quotes.
        datetime_to_INTERNALDATE.return_value = 'somedate'
        self.client._imap.append.return_value = ('OK', [b'Good'])
        payload = b'bye'
        msg_time = datetime(2009, 4, 5, 11, 0, 5, 0, FixedOffset(2 * 60))

        self.client.append('foobar', payload, ['FLAG', 'WAVE'], msg_time)

        self.assertTrue(datetime_to_INTERNALDATE.called)
        self.client._imap.append.assert_called_with(
            b'"foobar"', '(FLAG WAVE)', '"somedate"', payload)
class TestAclMethods(IMAPClientTest):
    """Tests for the getacl / setacl wrappers."""

    def test_getacl(self):
        # The flat "who rights who rights ..." response becomes pairs.
        self.client._imap.getacl.return_value = (
            'OK', [b'INBOX Fred rwipslda Sally rwip'])

        pairs = self.client.getacl('INBOX')

        expected = [(b'Fred', b'rwipslda'), (b'Sally', b'rwip')]
        self.assertSequenceEqual(pairs, expected)

    def test_setacl(self):
        # The folder name is quoted; who/what are forwarded unchanged.
        self.client._imap.setacl.return_value = ('OK', [b"SETACL done"])

        reply = self.client.setacl('folder', sentinel.who, sentinel.what)

        self.client._imap.setacl.assert_called_with(
            b'"folder"', sentinel.who, sentinel.what)
        self.assertEqual(reply, b"SETACL done")
class TestIdleAndNoop(IMAPClientTest):
    """Tests for idle, idle_check, idle_done, noop and the shared
    _consume_until_tagged_response helper.
    """

    def assert_sock_calls(self, sock):
        """Assert the exact sequence of socket method calls made on *sock*."""
        expected_calls = [
            ('settimeout', (None,), {}),
            ('setblocking', (0,), {}),
            ('setblocking', (1,), {}),
        ]
        self.assertListEqual(sock.method_calls, expected_calls)

    def test_idle(self):
        # idle() sends IDLE and stores the returned tag for later use.
        self.client._imap._command.return_value = sentinel.tag
        self.client._imap._get_response.return_value = None

        self.client.idle()

        self.client._imap._command.assert_called_with('IDLE')
        self.assertEqual(self.client._idle_tag, sentinel.tag)

    @patch('imapclient.imapclient.select.select')
    def test_idle_check_blocking(self, mock_select):
        fake_sock = Mock()
        self.client._imap.sock = self.client._imap.sslobj = fake_sock
        mock_select.return_value = ([True], [], [])

        # Lines are handed out in order; once exhausted every further read
        # times out.
        pending = [b'* 1 EXISTS', b'* 0 EXPUNGE']

        def fake_get_line():
            if not pending:
                raise socket.timeout
            return pending.pop(0)

        self.client._imap._get_line = fake_get_line

        responses = self.client.idle_check()

        mock_select.assert_called_once_with([fake_sock], [], [], None)
        self.assert_sock_calls(fake_sock)
        self.assertListEqual([(1, b'EXISTS'), (0, b'EXPUNGE')], responses)

    @patch('imapclient.imapclient.select.select')
    def test_idle_check_timeout(self, mock_select):
        # select reports nothing readable, so no responses are returned.
        fake_sock = Mock()
        self.client._imap.sock = self.client._imap.sslobj = fake_sock
        mock_select.return_value = ([], [], [])

        responses = self.client.idle_check(timeout=0.5)

        mock_select.assert_called_once_with([fake_sock], [], [], 0.5)
        self.assert_sock_calls(fake_sock)
        self.assertListEqual([], responses)

    @patch('imapclient.imapclient.select.select')
    def test_idle_check_with_data(self, mock_select):
        fake_sock = Mock()
        self.client._imap.sock = self.client._imap.sslobj = fake_sock
        mock_select.return_value = ([True], [], [])

        # One line is available; every later read times out.
        pending = [b'* 99 EXISTS']

        def fake_get_line():
            if not pending:
                raise socket.timeout
            return pending.pop(0)

        self.client._imap._get_line = fake_get_line

        responses = self.client.idle_check()

        mock_select.assert_called_once_with([fake_sock], [], [], None)
        self.assert_sock_calls(fake_sock)
        self.assertListEqual([(99, b'EXISTS')], responses)

    def test_idle_done(self):
        # idle_done() sends DONE and returns whatever the consume helper
        # yields for the stored IDLE tag.
        self.client._idle_tag = sentinel.tag
        send_mock = Mock()
        consume_mock = Mock(return_value=sentinel.out)
        self.client._imap.send = send_mock
        self.client._consume_until_tagged_response = consume_mock

        outcome = self.client.idle_done()

        send_mock.assert_called_with(b'DONE\r\n')
        consume_mock.assert_called_with(sentinel.tag, 'IDLE')
        self.assertEqual(outcome, sentinel.out)

    def test_noop(self):
        # noop() issues NOOP and returns the consume helper's result.
        command_mock = Mock(return_value=sentinel.tag)
        consume_mock = Mock(return_value=sentinel.out)
        self.client._imap._command = command_mock
        self.client._consume_until_tagged_response = consume_mock

        outcome = self.client.noop()

        command_mock.assert_called_with('NOOP')
        consume_mock.assert_called_with(sentinel.tag, 'NOOP')
        self.assertEqual(outcome, sentinel.out)

    def test_consume_until_tagged_response(self):
        client = self.client
        client._imap.tagged_commands = {sentinel.tag: None}

        # First call returns an untagged line; every later call records the
        # tagged completion and returns None.
        untagged_lines = [b'* 99 EXISTS']

        def fake_get_response():
            if untagged_lines:
                return untagged_lines.pop(0)
            client._imap.tagged_commands[sentinel.tag] = ('OK', [b'Idle done'])

        client._imap._get_response = fake_get_response

        text, responses = client._consume_until_tagged_response(sentinel.tag, b'IDLE')

        self.assertEqual(client._imap.tagged_commands, {})
        self.assertEqual(text, b'Idle done')
        self.assertListEqual([(99, b'EXISTS')], responses)
class TestDebugLogging(IMAPClientTest):
    """Tests for where debug log output is written."""

    def test_default_is_stderr(self):
        self.assertIs(self.client.log_file, sys.stderr)

    def test_IMAP_is_patched(self):
        # Both the client's own _log and the wrapped client's _mesg must
        # write to the configured log_file.
        sink = six.StringIO()
        self.client.log_file = sink

        self.client._log('one')
        self.client._imap._mesg('two')

        captured = sink.getvalue()
        for marker in ('one', 'two'):
            self.assertIn(marker, captured)
class TestTimeNormalisation(IMAPClientTest):
    """Tests for the normalise_times setting and how fetch forwards it."""

    def test_default(self):
        self.assertTrue(self.client.normalise_times)

    @patch('imapclient.imapclient.parse_fetch_response')
    def test_pass_through(self, parse_fetch_response):
        # Whatever normalise_times is set to must reach parse_fetch_response
        # as its second argument.
        self.client._imap._command_complete.return_value = ('OK', sentinel.data)
        self.client._imap._untagged_response.return_value = ('OK', sentinel.fetch_data)
        self.client.use_uid = sentinel.use_uid

        for setting in (True, False):
            self.client.normalise_times = setting
            self.client.fetch(22, ['SOMETHING'])
            parse_fetch_response.assert_called_with(
                sentinel.fetch_data, setting, sentinel.use_uid)
class TestNamespace(IMAPClientTest):
    """Tests for namespace(): parsing of the NAMESPACE response."""

    def set_return(self, value):
        """Make the mocked namespace call answer 'OK' with *value*."""
        self.client._imap.namespace.return_value = ('OK', [value])

    def test_simple(self):
        self.set_return(b'(("FOO." "/")) NIL NIL')
        expected = ((('FOO.', '/'),), None, None)
        self.assertEqual(self.client.namespace(), expected)

    def test_folder_decoding(self):
        # "&AP8-" in a namespace prefix is decoded to '\xff'.
        self.set_return(b'(("&AP8-." "/")) NIL NIL')
        expected = ((('\xff.', '/'),), None, None)
        self.assertEqual(self.client.namespace(), expected)

    def test_without_folder_decoding(self):
        # With folder_encode off the prefix stays as raw bytes.
        self.set_return(b'(("&AP8-." "/")) NIL NIL')
        self.client.folder_encode = False
        expected = (((b'&AP8-.', '/'),), None, None)
        self.assertEqual(self.client.namespace(), expected)

    def test_other_only(self):
        self.set_return(b'NIL NIL (("" "."))')
        expected = (None, None, (("", "."),))
        self.assertEqual(self.client.namespace(), expected)

    def test_complex(self):
        raw = (b'(("" "/")) '
               b'(("~" "/")) '
               b'(("#shared/" "/") ("#public/" "/")("#ftp/" "/")("#news." "."))')
        self.set_return(raw)

        first = (("", "/"),)
        second = (("~", "/"),)
        third = (("#shared/", "/"), ("#public/", "/"), ("#ftp/", "/"), ("#news.", "."))
        self.assertEqual(self.client.namespace(), (first, second, third))
class TestCapabilities(IMAPClientTest):
    """Tests for capabilities() caching/refresh and for has_capability()."""

    def test_preauth(self):
        imap = self.client._imap
        imap.capabilities = ('FOO', 'BAR')
        imap.untagged_responses = {}
        self.assertEqual(self.client.capabilities(), (b'FOO', b'BAR'))

    def test_server_returned_capability_after_auth(self):
        # An untagged CAPABILITY response is consumed, cached and returned.
        imap = self.client._imap
        imap.capabilities = (b'FOO',)
        imap.untagged_responses = {'CAPABILITY': [b'FOO MORE']}
        expected = (b'FOO', b'MORE')

        self.assertEqual(self.client._cached_capabilities, None)
        self.assertEqual(self.client.capabilities(), expected)
        self.assertEqual(self.client._cached_capabilities, expected)
        self.assertEqual(self.client._imap.untagged_responses, {})

    def test_caching(self):
        # A populated cache wins over the wrapped client's capabilities.
        imap = self.client._imap
        imap.capabilities = ('FOO',)
        imap.untagged_responses = {}
        self.client._cached_capabilities = (b'FOO', b'MORE')
        self.assertEqual(self.client.capabilities(), (b'FOO', b'MORE'))

    def test_post_auth_request(self):
        imap = self.client._imap
        imap.capabilities = ('FOO',)
        imap.untagged_responses = {}
        imap.state = 'SELECTED'
        imap.capability.return_value = ('OK', [b'FOO BAR'])
        expected = (b'FOO', b'BAR')

        self.assertEqual(self.client.capabilities(), expected)
        self.assertEqual(self.client._cached_capabilities, expected)

    def test_with_starttls(self):
        imap = self.client._imap

        # Initial connection
        imap.capabilities = ('FOO',)
        imap.untagged_responses = {}
        imap.state = 'NONAUTH'
        self.assertEqual(self.client.capabilities(), (b'FOO',))

        # Now do STARTTLS; capabilities change and should be reported.
        self.client._starttls_done = True
        imap.capability.return_value = ('OK', [b'FOO BAR'])
        self.assertEqual(self.client.capabilities(), (b'FOO', b'BAR'))

        # Login done; capabilities change again.
        imap.state = 'AUTH'
        imap.capability.return_value = ('OK', [b'FOO BAR QUX'])
        self.assertEqual(self.client.capabilities(), (b'FOO', b'BAR', b'QUX'))

    def test_has_capability(self):
        # Lookups succeed for either case and for both bytes and text names.
        self.client._cached_capabilities = (b'FOO', b'MORE')
        cases = [
            (b'FOO', True),
            (b'foo', True),
            (b'BAR', False),
            ('FOO', True),
            ('foo', True),
            ('BAR', False),
        ]
        for name, present in cases:
            verify = self.assertTrue if present else self.assertFalse
            verify(self.client.has_capability(name))
class TestId(IMAPClientTest):
    """Tests for id_(): the ID command wrapper."""

    def test_id(self):
        # With ID in the cached capabilities, the parameters are sent as a
        # quoted list and the server's reply is parsed into bytes.
        self.client._cached_capabilities = (b'ID',)
        imap = self.client._imap
        imap._simple_command.return_value = ('OK', [b'Success'])
        imap._untagged_response.return_value = (
            b'OK', [b'("name" "GImap" "vendor" "Google, Inc.")'])

        reply = self.client.id_({'name': 'IMAPClient'})

        imap._simple_command.assert_called_with('ID', '("name" "IMAPClient")')
        expected = ((b'name', b'GImap', b'vendor', b'Google, Inc.'),)
        self.assertSequenceEqual(reply, expected)

    def test_no_support(self):
        # Without ID in the cached capabilities, id_ raises ValueError.
        self.client._cached_capabilities = (b'IMAP4rev1',)
        self.assertRaises(ValueError, self.client.id_)

    def test_invalid_parameters(self):
        self.assertRaises(TypeError, self.client.id_, 'bananarama')
class TestRawCommand(IMAPClientTest):
    """Tests for _raw_command: the bytes recorded in the fake client's
    ``sent`` attribute for a given command and argument list.
    """

    def setUp(self):
        super(TestRawCommand, self).setUp()
        # Default to a server that never interrupts and completes with OK.
        self.client._imap._get_response.return_value = None
        self.client._imap._command_complete.return_value = ('OK', ['done'])

    def check(self, command, args, expected):
        """Run _raw_command(command, args) and verify the outcome.

        Asserts that the call returns ('OK', ['done']) as configured in
        setUp, and that ``self.client._imap.sent`` equals *expected*.
        """
        typ, data = self.client._raw_command(command, args)
        self.assertEqual(typ, 'OK')
        self.assertEqual(data, ['done'])
        self.assertEqual(self.client._imap.sent, expected)

    def test_plain(self):
        self.check(b'search', [b'ALL'],
                   b'tag UID SEARCH ALL\r\n',
                   )

    def test_not_uid(self):
        # With use_uid off the UID prefix is omitted.
        self.client.use_uid = False
        self.check(b'search', [b'ALL'],
                   b'tag SEARCH ALL\r\n',
                   )

    def test_literal_at_end(self):
        # The non-ASCII argument is sent after a {length} marker line.
        self.check(b'search', [b'TEXT', b'\xfe\xff'],
                   b'tag UID SEARCH TEXT {2}\r\n'
                   b'\xfe\xff\r\n'
                   )

    def test_embedded_literal(self):
        self.check(b'search', [b'TEXT', b'\xfe\xff', b'DELETED'],
                   b'tag UID SEARCH TEXT {2}\r\n'
                   b'\xfe\xff DELETED\r\n'
                   )

    def test_multiple_literals(self):
        self.check(b'search', [b'TEXT', b'\xfe\xff', b'TEXT', b'\xcc'],
                   b'tag UID SEARCH TEXT {2}\r\n'
                   b'\xfe\xff TEXT {1}\r\n'
                   b'\xcc\r\n'
                   )

    def test_complex(self):
        self.check(b'search', [b'FLAGGED', b'TEXT', b'\xfe\xff', b'TEXT', b'\xcc', b'TEXT', b'foo'],
                   b'tag UID SEARCH FLAGGED TEXT {2}\r\n'
                   b'\xfe\xff TEXT {1}\r\n'
                   b'\xcc TEXT foo\r\n'
                   )

    def test_invalid_input_type(self):
        # Text (non-bytes) commands or arguments are rejected.
        self.assertRaises(ValueError, self.client._raw_command, 'foo', [])
        self.assertRaises(ValueError, self.client._raw_command, u'foo', ['foo'])

    def test_failed_continuation_wait(self):
        # A tagged NO arriving while waiting for the continuation response
        # must abort the command.
        self.client._imap._get_response.return_value = b'blah'
        self.client._imap.tagged_commands['tag'] = ('NO', ['go away'])

        # Raw literal: the pattern contains regex escapes (\( \[ \] \)) that
        # are invalid string escapes in a non-raw literal. The optional "u"
        # allows for the u'' repr prefix.
        expected_error = (
            r"unexpected response while waiting for continuation response: "
            r"\(u?'NO', \[u?'go away'\]\)")
        with self.assertRaisesRegex(IMAPClient.AbortError, expected_error):
            self.client._raw_command(b'FOO', [b'\xff'])
class TestShutdown(IMAPClientTest):
    """Tests for shutdown(): delegation to the wrapped client."""

    def test_shutdown(self):
        # shutdown() must call the wrapped client's shutdown exactly once,
        # with no arguments.
        client = self.client
        client.shutdown()
        client._imap.shutdown.assert_called_once_with()