12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650 |
- """IMAP4 client.
- Based on RFC 2060.
- Public class: IMAP4
- Public variable: Debug
- Public functions: Internaldate2tuple
- Int2AP
- ParseFlags
- Time2Internaldate
- """
- # Author: Piers Lauder <piers@cs.su.oz.au> December 1997.
- #
- # Authentication code contributed by Donn Cave <donn@u.washington.edu> June 1998.
- # String method conversion by ESR, February 2001.
- # GET/SETACL contributed by Anthony Baxter <anthony@interlink.com.au> April 2001.
- # IMAP4_SSL contributed by Tino Lange <Tino.Lange@isg.de> March 2002.
- # GET/SETQUOTA contributed by Andreas Zeidler <az@kreativkombinat.de> June 2002.
- # PROXYAUTH contributed by Rick Holbert <holbert.13@osu.edu> November 2002.
- # GET/SETANNOTATION contributed by Tomas Lindroos <skitta@abo.fi> June 2005.
# Module version string; written to the debug log when a connection is set up.
__version__ = "2.58"
- import binascii, errno, random, re, socket, subprocess, sys, time, calendar
- from datetime import datetime, timezone, timedelta
- from io import DEFAULT_BUFFER_SIZE
- try:
- import ssl
- HAVE_SSL = True
- except ImportError:
- HAVE_SSL = False
# Names exported by "from <module> import *".
__all__ = ["IMAP4", "IMAP4_stream", "Internaldate2tuple",
           "Int2AP", "ParseFlags", "Time2Internaldate"]

#       Globals

CRLF = b'\r\n'                                  # Protocol line terminator
Debug = 0                                       # Default debug level copied into each IMAP4 instance
IMAP4_PORT = 143
IMAP4_SSL_PORT = 993
AllowedVersions = ('IMAP4REV1', 'IMAP4')        # Most recent first

# Maximal line length when calling readline(). This is to prevent
# reading arbitrary length lines. RFC 3501 and 2060 (IMAP 4rev1)
# don't specify a line length. RFC 2683 suggests limiting client
# command lines to 1000 octets and that servers should be prepared
# to accept command lines up to 8000 octets, so we used to use 10K here.
# In the modern world (eg: gmail) the response to, for example, a
# search command can be quite large, so we now use 1M.
_MAXLINE = 1000000
#       Commands
#
# Maps each IMAP4 command name to the tuple of connection states in which
# the command may be issued.  The table is consulted before a command is
# sent, and IMAP4.xatom() adds entries to it at runtime for extension
# commands it has not seen before.

Commands = {
        # name            valid states
        'APPEND':       ('AUTH', 'SELECTED'),
        'AUTHENTICATE': ('NONAUTH',),
        'CAPABILITY':   ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
        'CHECK':        ('SELECTED',),
        'CLOSE':        ('SELECTED',),
        'COPY':         ('SELECTED',),
        'CREATE':       ('AUTH', 'SELECTED'),
        'DELETE':       ('AUTH', 'SELECTED'),
        'DELETEACL':    ('AUTH', 'SELECTED'),
        'ENABLE':       ('AUTH', ),
        'EXAMINE':      ('AUTH', 'SELECTED'),
        'EXPUNGE':      ('SELECTED',),
        'FETCH':        ('SELECTED',),
        'GETACL':       ('AUTH', 'SELECTED'),
        'GETANNOTATION':('AUTH', 'SELECTED'),
        'GETQUOTA':     ('AUTH', 'SELECTED'),
        'GETQUOTAROOT': ('AUTH', 'SELECTED'),
        'MYRIGHTS':     ('AUTH', 'SELECTED'),
        'LIST':         ('AUTH', 'SELECTED'),
        'LOGIN':        ('NONAUTH',),
        'LOGOUT':       ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
        'LSUB':         ('AUTH', 'SELECTED'),
        'MOVE':         ('SELECTED',),
        'NAMESPACE':    ('AUTH', 'SELECTED'),
        'NOOP':         ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
        'PARTIAL':      ('SELECTED',),                                  # NB: obsolete
        'PROXYAUTH':    ('AUTH',),
        'RENAME':       ('AUTH', 'SELECTED'),
        'SEARCH':       ('SELECTED',),
        'SELECT':       ('AUTH', 'SELECTED'),
        'SETACL':       ('AUTH', 'SELECTED'),
        'SETANNOTATION':('AUTH', 'SELECTED'),
        'SETQUOTA':     ('AUTH', 'SELECTED'),
        'SORT':         ('SELECTED',),
        'STARTTLS':     ('NONAUTH',),
        'STATUS':       ('AUTH', 'SELECTED'),
        'STORE':        ('SELECTED',),
        'SUBSCRIBE':    ('AUTH', 'SELECTED'),
        'THREAD':       ('SELECTED',),
        'UID':          ('SELECTED',),
        'UNSUBSCRIBE':  ('AUTH', 'SELECTED'),
        'UNSELECT':     ('SELECTED',),
        }
#       Patterns to match server responses
#
# All patterns operate on bytes, since responses are read from a binary stream.

# "+" continuation request, with optional text in group 'data'.
Continuation = re.compile(br'\+( (?P<data>.*))?')
# Parenthesised flag list following the word FLAGS (group 'flags').
Flags = re.compile(br'.*FLAGS \((?P<flags>[^\)]*)\)')
# Quoted INTERNALDATE value, split into day/mon/year, hour/min/sec and a
# signed hours+minutes zone offset.
InternalDate = re.compile(br'.*INTERNALDATE "'
        br'(?P<day>[ 0123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
        br' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
        br' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
        br'"')
# Literal is no longer used; kept for backward compatibility.
Literal = re.compile(br'.*{(?P<size>\d+)}$', re.ASCII)
# Any of the three line-ending conventions.
MapCRLF = re.compile(br'\r\n|\r|\n')
# We no longer exclude the ']' character from the data portion of the response
# code, even though it violates the RFC.  Popular IMAP servers such as Gmail
# allow flags with ']', and there are programs (including imaplib!) that can
# produce them.  The problem with this is if the 'text' portion of the response
# includes a ']' we'll parse the response wrong (which is the point of the RFC
# restriction).  However, that seems less likely to be a problem in practice
# than being unable to correctly parse flags that include ']' chars, which
# was reported as a real-world problem in issue #21815.
Response_code = re.compile(br'\[(?P<type>[A-Z-]+)( (?P<data>.*))?\]')
# "* TYPE [data]" untagged response line.
Untagged_response = re.compile(br'\* (?P<type>[A-Z-]+)( (?P<data>.*))?')
# Untagged_status is no longer used; kept for backward compatibility
Untagged_status = re.compile(
    br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?', re.ASCII)
# We compile these in _mode_xxx: with re.ASCII in ASCII mode and without it
# in UTF-8 mode.
_Literal = br'.*{(?P<size>\d+)}$'
_Untagged_status = br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?'
class IMAP4:

    r"""IMAP4 client class.

    Instantiate with: IMAP4([host[, port[, timeout=None]]])

            host - host's name (default: localhost);
            port - port number (default: standard IMAP4 port).
            timeout - socket timeout (default: None)
                      If timeout is not given or is None,
                      the global default socket timeout is used

    All IMAP4rev1 commands are supported by methods of the same
    name (in lower-case).

    All arguments to commands are converted to strings, except for
    AUTHENTICATE, and the last argument to APPEND which is passed as
    an IMAP4 literal.  If necessary (the string contains any
    non-printing characters or white-space and isn't enclosed with
    either parentheses or double quotes) each string is quoted.
    However, the 'password' argument to the LOGIN command is always
    quoted.  If you want to avoid having an argument string quoted
    (eg: the 'flags' argument to STORE) then enclose the string in
    parentheses (eg: "(\Deleted)").

    Each command returns a tuple: (type, [data, ...]) where 'type'
    is usually 'OK' or 'NO', and 'data' is either the text from the
    tagged response, or untagged results from command. Each 'data'
    is either a string, or a tuple. If a tuple, then the first part
    is the header of the response, and the second part contains
    the data (ie: 'literal' value).

    Errors raise the exception class <instance>.error("<reason>").
    IMAP4 server errors raise <instance>.abort("<reason>"),
    which is a sub-class of 'error'. Mailbox status changes
    from READ-WRITE to READ-ONLY raise the exception class
    <instance>.readonly("<reason>"), which is a sub-class of 'abort'.

    "error" exceptions imply a program error.
    "abort" exceptions imply the connection should be reset, and
            the command re-tried.
    "readonly" exceptions imply the command should be re-tried.

    Note: to use this module, you must read the RFCs pertaining to the
    IMAP4 protocol, as the semantics of the arguments to each IMAP4
    command are left to the invoker, not to mention the results. Also,
    most IMAP servers implement a sub-set of the commands available here.
    """

    # Exception hierarchy: readonly -> abort -> error -> Exception.
    class error(Exception): pass    # Logical errors - debug required
    class abort(error): pass        # Service errors - close and retry
    class readonly(abort): pass     # Mailbox status changed to READ-ONLY
def __init__(self, host='', port=IMAP4_PORT, timeout=None):
    """Initialise client state, open the connection and greet the server.

    host    - server host name, handed to open() unchanged;
    port    - server port (default: IMAP4_PORT);
    timeout - socket timeout handed to open() (default: None).

    If the handshake in _connect() fails, the connection is shut down
    (an OSError from the shutdown itself is ignored) and the original
    exception is re-raised.
    """
    # No session exists until _connect() has seen the server greeting.
    self.state = 'LOGOUT'
    self.debug = Debug
    self.tagnum = 0
    self.literal = None                 # A literal argument to a command
    self.tagged_commands = {}           # Tagged commands awaiting response
    self.untagged_responses = {}        # {typ: [data, ...], ...}
    self.continuation_response = ''     # Last continuation response
    self.is_readonly = False            # READ-ONLY desired state
    self._tls_established = False
    self._mode_ascii()

    # Open socket to server.
    self.open(host, port, timeout)

    try:
        self._connect()
    except Exception:
        # Best-effort cleanup; the handshake failure is what the caller sees.
        try:
            self.shutdown()
        except OSError:
            pass
        raise
def _mode_ascii(self):
    """Switch the instance to ASCII mode.

    Compiles the literal and untagged-status matchers with re.ASCII
    and records 'ascii' as the current encoding.
    """
    self.Literal, self.Untagged_status = (
        re.compile(pattern, re.ASCII)
        for pattern in (_Literal, _Untagged_status)
    )
    self._encoding = 'ascii'
    self.utf8_enabled = False
def _mode_utf8(self):
    """Switch the instance to UTF-8 mode.

    Compiles the literal and untagged-status matchers without re.ASCII
    and records 'utf-8' as the current encoding.
    """
    self.Literal, self.Untagged_status = (
        re.compile(pattern)
        for pattern in (_Literal, _Untagged_status)
    )
    self._encoding = 'utf-8'
    self.utf8_enabled = True
def _connect(self):
    """Perform the initial handshake on a freshly opened connection.

    Picks a random tag prefix for the session, reads the server
    greeting, derives the initial state from it and fetches the
    server's capabilities.

    Raises self.error if the greeting is neither PREAUTH nor OK, or if
    the server advertises none of the versions in AllowedVersions.
    """
    # Create unique tag for this session,
    # and compile tagged response matcher.
    self.tagpre = Int2AP(random.randint(4096, 65535))
    tagged = (br'(?P<tag>'
              + self.tagpre
              + br'\d+) (?P<type>[A-Z]+) (?P<data>.*)')
    self.tagre = re.compile(tagged, re.ASCII)

    if __debug__:
        self._cmd_log_len = 10
        self._cmd_log_idx = 0
        self._cmd_log = {}           # Last `_cmd_log_len' interactions
        if self.debug >= 1:
            self._mesg('imaplib version %s' % __version__)
            self._mesg('new IMAP4 connection, tag=%s' % self.tagpre)

    # Get server welcome message,
    # request and store CAPABILITY response.
    self.welcome = self._get_response()
    greeting = self.untagged_responses
    if 'PREAUTH' in greeting:
        self.state = 'AUTH'
    elif 'OK' in greeting:
        self.state = 'NONAUTH'
    else:
        raise self.error(self.welcome)

    self._get_capabilities()
    if __debug__ and self.debug >= 3:
        self._mesg('CAPABILITIES: %r' % (self.capabilities,))

    # AllowedVersions is ordered most recent first, so the first hit wins.
    for version in AllowedVersions:
        if version in self.capabilities:
            self.PROTOCOL_VERSION = version
            return

    raise self.error('server not IMAP4 compliant')
def __getattr__(self, attr):
    """Resolve UPPERCASE command names to their lower-case methods.

    Any name that is not a key of Commands raises AttributeError.
    """
    if attr not in Commands:
        raise AttributeError("Unknown IMAP4 command: '%s'" % attr)
    # Allow UPPERCASE variants of IMAP4 command methods.
    return getattr(self, attr.lower())
def __enter__(self):
    """Enter a ``with`` block; the client itself is the bound value."""
    return self
def __exit__(self, *args):
    """Leave a ``with`` block, logging out unless already logged out.

    An OSError raised by logout() is ignored.
    """
    if self.state != "LOGOUT":
        try:
            self.logout()
        except OSError:
            pass
- # Overridable methods
def _create_socket(self, timeout):
    """Return a new socket connected to (self.host, self.port).

    timeout - None to use socket.create_connection()'s default,
              otherwise passed through as its timeout.

    Raises ValueError for a zero timeout, since a non-blocking socket
    is not supported.
    """
    if timeout is not None and not timeout:
        raise ValueError('Non-blocking socket (timeout=0) is not supported')
    # Default value of IMAP4.host is '', but socket.getaddrinfo()
    # (which is used by socket.create_connection()) expects None
    # as a default value for host.
    host = self.host or None
    sys.audit("imaplib.open", self, self.host, self.port)
    address = (host, self.port)
    extra = () if timeout is None else (timeout,)
    return socket.create_connection(address, *extra)
def open(self, host='', port=IMAP4_PORT, timeout=None):
    """Setup connection to remote server on "host:port"
        (default: localhost:standard IMAP4 port).
    This connection will be used by the routines:
        read, readline, send, shutdown.
    """
    self.host, self.port = host, port
    connection = self._create_socket(timeout)
    self.sock = connection
    # Binary read-side file object used by read() and readline().
    self.file = connection.makefile('rb')
def read(self, size):
    """Read 'size' bytes from remote."""
    stream = self.file
    return stream.read(size)
def readline(self):
    """Read line from remote.

    Raises self.error when the line is longer than _MAXLINE bytes.
    """
    # Ask for one byte more than allowed so an over-long line is detectable.
    line = self.file.readline(_MAXLINE + 1)
    if len(line) <= _MAXLINE:
        return line
    raise self.error("got more than %d bytes" % _MAXLINE)
def send(self, data):
    """Send data to remote."""
    # Audit event is raised before anything is written to the socket.
    sys.audit("imaplib.send", self, data)
    connection = self.sock
    connection.sendall(data)
def shutdown(self):
    """Close I/O established in "open".

    The socket is always closed.  An OSError from the socket shutdown
    is re-raised unless it only reports that the peer is already gone.
    """
    self.file.close()
    try:
        self.sock.shutdown(socket.SHUT_RDWR)
    except OSError as exc:
        # The server might already have closed the connection.
        # On Windows, this may result in WSAEINVAL (error 10022):
        # An invalid operation was attempted.
        already_closed = (exc.errno == errno.ENOTCONN
                          or getattr(exc, 'winerror', 0) == 10022)
        if not already_closed:
            raise
    finally:
        self.sock.close()
def socket(self):
    """Return socket instance used to connect to IMAP4 server.

    socket = <instance>.socket()
    """
    connection = self.sock
    return connection
- # Utility methods
def recent(self):
    """Return most recent 'RECENT' responses if any exist,
    else prompt server for an update using the 'NOOP' command.

    (typ, [data]) = <instance>.recent()

    'data' is None if no new messages,
    else list of RECENT responses, most recent last.
    """
    name = 'RECENT'
    typ, dat = self._untagged_response('OK', [None], name)
    if not dat[-1]:
        # Nothing buffered: prod server for response, then look again.
        typ, dat = self.noop()
        return self._untagged_response(typ, dat, name)
    return typ, dat
def response(self, code):
    """Return data for response 'code' if received, or None.

    Old value for response 'code' is cleared.

    (code, [data]) = <instance>.response(code)
    """
    key = code.upper()
    return self._untagged_response(code, [None], key)
- # IMAP4 commands
def append(self, mailbox, flags, date_time, message):
    """Append message to named mailbox.

    (typ, [data]) = <instance>.append(mailbox, flags, date_time, message)

            All args except `message' can be None.

    An empty mailbox means 'INBOX'.  Flags not already enclosed in
    parentheses are wrapped in them.  Line endings in the message are
    converted to CRLF before it is sent as a literal.
    """
    name = 'APPEND'
    mailbox = mailbox or 'INBOX'

    if not flags:
        flags = None
    elif (flags[0], flags[-1]) != ('(', ')'):
        flags = '(%s)' % flags

    date_time = Time2Internaldate(date_time) if date_time else None

    payload = MapCRLF.sub(CRLF, message)
    if self.utf8_enabled:
        payload = b'UTF8 (' + payload + b')'
    self.literal = payload
    return self._simple_command(name, mailbox, flags, date_time)
def authenticate(self, mechanism, authobject):
    """Authenticate command - requires response processing.

    'mechanism' specifies which authentication mechanism is to
    be used - it must appear in <instance>.capabilities in the
    form AUTH=<mechanism>.

    'authobject' must be a callable object:

            data = authobject(response)

    It will be called to process server continuation responses; the
    response argument it is passed will be a bytes.  It should return bytes
    data that will be base64 encoded and sent to the server.  It should
    return None if the client abort response '*' should be sent instead.

    Raises self.error with the decoded server text when the command
    does not complete with 'OK'.
    """
    mech = mechanism.upper()
    # The mechanism is deliberately not checked against
    # self.capabilities here: let the server decide!
    self.literal = _Authenticator(authobject).process
    typ, dat = self._simple_command('AUTHENTICATE', mech)
    if typ == 'OK':
        self.state = 'AUTH'
        return typ, dat
    raise self.error(dat[-1].decode('utf-8', 'replace'))
def capability(self):
    """(typ, [data]) = <instance>.capability()
    Fetch capabilities list from server."""
    typ, dat = self._simple_command('CAPABILITY')
    return self._untagged_response(typ, dat, 'CAPABILITY')
def check(self):
    """Checkpoint mailbox on server.

    (typ, [data]) = <instance>.check()
    """
    name = 'CHECK'
    return self._simple_command(name)
def close(self):
    """Close currently selected mailbox.

    Deleted messages are removed from writable mailbox.
    This is the recommended command before 'LOGOUT'.

    (typ, [data]) = <instance>.close()
    """
    name = 'CLOSE'
    try:
        typ, dat = self._simple_command(name)
    finally:
        # The state drops back to 'AUTH' even if the command raised.
        self.state = 'AUTH'
    return typ, dat
def copy(self, message_set, new_mailbox):
    """Copy 'message_set' messages onto end of 'new_mailbox'.

    (typ, [data]) = <instance>.copy(message_set, new_mailbox)
    """
    name = 'COPY'
    return self._simple_command(name, message_set, new_mailbox)
def create(self, mailbox):
    """Create new mailbox.

    (typ, [data]) = <instance>.create(mailbox)
    """
    name = 'CREATE'
    return self._simple_command(name, mailbox)
def delete(self, mailbox):
    """Delete old mailbox.

    (typ, [data]) = <instance>.delete(mailbox)
    """
    name = 'DELETE'
    return self._simple_command(name, mailbox)
def deleteacl(self, mailbox, who):
    """Delete the ACLs (remove any rights) set for who on mailbox.

    (typ, [data]) = <instance>.deleteacl(mailbox, who)
    """
    name = 'DELETEACL'
    return self._simple_command(name, mailbox, who)
def enable(self, capability):
    """Send an RFC5161 enable string to the server.

    (typ, [data]) = <instance>.enable(capability)

    Raises IMAP4.error when the server did not advertise ENABLE.
    A successful request that mentions UTF8=ACCEPT switches the
    instance to UTF-8 mode.
    """
    if 'ENABLE' not in self.capabilities:
        raise IMAP4.error("Server does not support ENABLE")
    typ, data = self._simple_command('ENABLE', capability)
    if typ == 'OK':
        if 'UTF8=ACCEPT' in capability.upper():
            self._mode_utf8()
    return typ, data
def expunge(self):
    """Permanently remove deleted items from selected mailbox.

    Generates 'EXPUNGE' response for each deleted message.

    (typ, [data]) = <instance>.expunge()

    'data' is list of 'EXPUNGE'd message numbers in order received.
    """
    typ, dat = self._simple_command('EXPUNGE')
    return self._untagged_response(typ, dat, 'EXPUNGE')
def fetch(self, message_set, message_parts):
    """Fetch (parts of) messages.

    (typ, [data, ...]) = <instance>.fetch(message_set, message_parts)

    'message_parts' should be a string of selected parts
    enclosed in parentheses, eg: "(UID BODY[TEXT])".

    'data' are tuples of message part envelope and data.
    """
    typ, dat = self._simple_command('FETCH', message_set, message_parts)
    return self._untagged_response(typ, dat, 'FETCH')
def getacl(self, mailbox):
    """Get the ACLs for a mailbox.

    (typ, [data]) = <instance>.getacl(mailbox)
    """
    status, payload = self._simple_command('GETACL', mailbox)
    # The answer is collected from the untagged 'ACL' responses.
    return self._untagged_response(status, payload, 'ACL')
def getannotation(self, mailbox, entry, attribute):
    """(typ, [data]) = <instance>.getannotation(mailbox, entry, attribute)
    Retrieve ANNOTATIONs."""
    request = ('GETANNOTATION', mailbox, entry, attribute)
    status, payload = self._simple_command(*request)
    return self._untagged_response(status, payload, 'ANNOTATION')
def getquota(self, root):
    """Get the quota root's resource usage and limits.

    Part of the IMAP4 QUOTA extension defined in rfc2087.

    (typ, [data]) = <instance>.getquota(root)
    """
    status, payload = self._simple_command('GETQUOTA', root)
    return self._untagged_response(status, payload, 'QUOTA')
def getquotaroot(self, mailbox):
    """Get the list of quota roots for the named mailbox.

    (typ, [[QUOTAROOT responses...], [QUOTA responses]]) = <instance>.getquotaroot(mailbox)
    """
    typ, dat = self._simple_command('GETQUOTAROOT', mailbox)
    # QUOTA responses are collected first, then QUOTAROOT; the result
    # lists them the other way round.
    typ, quota = self._untagged_response(typ, dat, 'QUOTA')
    typ, quotaroot = self._untagged_response(typ, dat, 'QUOTAROOT')
    combined = [quotaroot, quota]
    return typ, combined
def list(self, directory='""', pattern='*'):
    """List mailbox names in directory matching pattern.

    (typ, [data]) = <instance>.list(directory='""', pattern='*')

    'data' is list of LIST responses.
    """
    typ, dat = self._simple_command('LIST', directory, pattern)
    return self._untagged_response(typ, dat, 'LIST')
def login(self, user, password):
    """Identify client using plaintext password.

    (typ, [data]) = <instance>.login(user, password)

    NB: 'password' will be quoted.

    Raises self.error with the last response item when the server
    does not answer 'OK'.
    """
    quoted = self._quote(password)
    typ, dat = self._simple_command('LOGIN', user, quoted)
    if typ == 'OK':
        self.state = 'AUTH'
        return typ, dat
    raise self.error(dat[-1])
def login_cram_md5(self, user, password):
    """ Force use of CRAM-MD5 authentication.

    (typ, [data]) = <instance>.login_cram_md5(user, password)
    """
    # Kept on the instance so _CRAM_MD5_AUTH can read them later.
    self.user = user
    self.password = password
    return self.authenticate('CRAM-MD5', self._CRAM_MD5_AUTH)
def _CRAM_MD5_AUTH(self, challenge):
    """ Authobject to use with CRAM-MD5 authentication.

    Returns self.user, a space, and the hex HMAC-MD5 digest of
    'challenge' keyed with self.password.
    """
    import hmac
    secret = self.password
    if isinstance(secret, str):
        # A str password is used as its UTF-8 bytes.
        secret = secret.encode('utf-8')
    digest = hmac.HMAC(secret, challenge, 'md5').hexdigest()
    return self.user + " " + digest
def logout(self):
    """Shutdown connection to server.

    (typ, [data]) = <instance>.logout()

    Returns server 'BYE' response.
    """
    # The state is switched before the command is issued.
    self.state = 'LOGOUT'
    result = typ, dat = self._simple_command('LOGOUT')
    self.shutdown()
    return typ, dat
def lsub(self, directory='""', pattern='*'):
    """List 'subscribed' mailbox names in directory matching pattern.

    (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*')

    'data' are tuples of message part envelope and data.
    """
    typ, dat = self._simple_command('LSUB', directory, pattern)
    return self._untagged_response(typ, dat, 'LSUB')
def myrights(self, mailbox):
    """Show my ACLs for a mailbox (i.e. the rights that I have on mailbox).

    (typ, [data]) = <instance>.myrights(mailbox)
    """
    name = 'MYRIGHTS'
    typ, dat = self._simple_command(name, mailbox)
    return self._untagged_response(typ, dat, name)
def namespace(self):
    """ Returns IMAP namespaces ala rfc2342

    (typ, [data, ...]) = <instance>.namespace()
    """
    typ, dat = self._simple_command('NAMESPACE')
    return self._untagged_response(typ, dat, 'NAMESPACE')
def noop(self):
    """Send NOOP command.

    (typ, [data]) = <instance>.noop()
    """
    # At debug level 3 and above, dump the pending untagged responses first.
    if __debug__ and self.debug >= 3:
        self._dump_ur(self.untagged_responses)
    return self._simple_command('NOOP')
def partial(self, message_num, message_part, start, length):
    """Fetch truncated part of a message.

    (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length)

    'data' is tuple of message part envelope and data.
    """
    request = ('PARTIAL', message_num, message_part, start, length)
    typ, dat = self._simple_command(*request)
    # The data arrives as untagged 'FETCH' responses.
    return self._untagged_response(typ, dat, 'FETCH')
def proxyauth(self, user):
    """Assume authentication as "user".

    Allows an authorised administrator to proxy into any user's
    mailbox.

    (typ, [data]) = <instance>.proxyauth(user)
    """
    # 'name' used to be assigned and then ignored in favour of a
    # repeated literal; it is now the single source of the command name.
    name = 'PROXYAUTH'
    return self._simple_command(name, user)
def rename(self, oldmailbox, newmailbox):
    """Rename old mailbox name to new.

    (typ, [data]) = <instance>.rename(oldmailbox, newmailbox)
    """
    name = 'RENAME'
    return self._simple_command(name, oldmailbox, newmailbox)
def search(self, charset, *criteria):
    """Search mailbox for matching messages.

    (typ, [data]) = <instance>.search(charset, criterion, ...)

    'data' is space separated list of matching message numbers.
    If UTF8 is enabled, charset MUST be None.
    """
    name = 'SEARCH'
    if not charset:
        typ, dat = self._simple_command(name, *criteria)
    else:
        if self.utf8_enabled:
            raise IMAP4.error("Non-None charset not valid in UTF8 mode")
        typ, dat = self._simple_command(name, 'CHARSET', charset, *criteria)
    return self._untagged_response(typ, dat, name)
def select(self, mailbox='INBOX', readonly=False):
    """Select a mailbox.

    Flush all untagged responses.

    (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=False)

    'data' is count of messages in mailbox ('EXISTS' response).

    Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY'), so
    other responses should be obtained via <instance>.response('FLAGS') etc.

    Raises self.readonly when a writable selection was requested but
    the server reported the mailbox as READ-ONLY.
    """
    self.untagged_responses = {}    # Flush old responses.
    self.is_readonly = readonly
    name = 'EXAMINE' if readonly else 'SELECT'
    typ, dat = self._simple_command(name, mailbox)
    if typ != 'OK':
        self.state = 'AUTH'     # Might have been 'SELECTED'
        return typ, dat
    self.state = 'SELECTED'
    if ('READ-ONLY' in self.untagged_responses
            and not readonly):
        if __debug__ and self.debug >= 1:
            self._dump_ur(self.untagged_responses)
        raise self.readonly('%s is not writable' % mailbox)
    return typ, self.untagged_responses.get('EXISTS', [None])
def setacl(self, mailbox, who, what):
    """Set a mailbox acl.

    (typ, [data]) = <instance>.setacl(mailbox, who, what)
    """
    name = 'SETACL'
    return self._simple_command(name, mailbox, who, what)
def setannotation(self, *args):
    """(typ, [data]) = <instance>.setannotation(mailbox[, entry, attribute]+)
    Set ANNOTATIONs."""
    status, payload = self._simple_command('SETANNOTATION', *args)
    return self._untagged_response(status, payload, 'ANNOTATION')
def setquota(self, root, limits):
    """Set the quota root's resource limits.

    (typ, [data]) = <instance>.setquota(root, limits)
    """
    status, payload = self._simple_command('SETQUOTA', root, limits)
    return self._untagged_response(status, payload, 'QUOTA')
def sort(self, sort_criteria, charset, *search_criteria):
    """IMAP4rev1 extension SORT command.

    (typ, [data]) = <instance>.sort(sort_criteria, charset, search_criteria, ...)

    Sort criteria not already enclosed in parentheses are wrapped
    in them.
    """
    name = 'SORT'
    # Support for SORT is not checked client-side: let the server decide!
    enclosed = (sort_criteria[0], sort_criteria[-1]) == ('(', ')')
    if not enclosed:
        sort_criteria = '(%s)' % sort_criteria
    typ, dat = self._simple_command(name, sort_criteria, charset, *search_criteria)
    return self._untagged_response(typ, dat, name)
def starttls(self, ssl_context=None):
    """Send STARTTLS and, on success, wrap the connection in TLS.

    (typ, [data]) = <instance>.starttls(ssl_context=None)

    ssl_context - context used to wrap the socket; a default one is
                  created when None.

    Raises self.error when SSL support is missing or the server does
    not answer 'OK', and self.abort when TLS is already established or
    STARTTLS is not among the server's capabilities.
    """
    name = 'STARTTLS'
    if not HAVE_SSL:
        raise self.error('SSL support missing')
    if self._tls_established:
        raise self.abort('TLS session already established')
    if name not in self.capabilities:
        raise self.abort('TLS not supported by server')
    # Generate a default SSL context if none was passed.
    if ssl_context is None:
        ssl_context = ssl._create_stdlib_context()
    typ, dat = self._simple_command(name)
    if typ != 'OK':
        raise self.error("Couldn't establish TLS session")
    self.sock = ssl_context.wrap_socket(self.sock,
                                        server_hostname=self.host)
    self.file = self.sock.makefile('rb')
    self._tls_established = True
    # Capabilities are fetched again once the connection is wrapped.
    self._get_capabilities()
    return self._untagged_response(typ, dat, name)
def status(self, mailbox, names):
    """Request named status conditions for mailbox.

    (typ, [data]) = <instance>.status(mailbox, names)
    """
    # The protocol version is not checked client-side: let the server decide!
    typ, dat = self._simple_command('STATUS', mailbox, names)
    return self._untagged_response(typ, dat, 'STATUS')
def store(self, message_set, command, flags):
    """Alters flag dispositions for messages in mailbox.

    (typ, [data]) = <instance>.store(message_set, command, flags)

    'flags' not already enclosed in parentheses are wrapped in them,
    which also keeps them from being quoted.  An empty string is sent
    as the empty flag list "()"; it used to raise IndexError before
    anything was sent.
    """
    # Slices, unlike indexing, tolerate an empty string.
    if (flags[:1], flags[-1:]) != ('(', ')'):
        flags = '(%s)' % flags  # Avoid quoting the flags
    typ, dat = self._simple_command('STORE', message_set, command, flags)
    # Results arrive as untagged 'FETCH' responses.
    return self._untagged_response(typ, dat, 'FETCH')
def subscribe(self, mailbox):
    """Subscribe to new mailbox.

    (typ, [data]) = <instance>.subscribe(mailbox)
    """
    name = 'SUBSCRIBE'
    return self._simple_command(name, mailbox)
def thread(self, threading_algorithm, charset, *search_criteria):
    """IMAPrev1 extension THREAD command.

    (type, [data]) = <instance>.thread(threading_algorithm, charset, search_criteria, ...)
    """
    request = ('THREAD', threading_algorithm, charset) + search_criteria
    typ, dat = self._simple_command(*request)
    return self._untagged_response(typ, dat, 'THREAD')
def uid(self, command, *args):
    """Execute "command arg ..." with messages identified by UID,
            rather than message number.

    (typ, [data]) = <instance>.uid(command, arg1, arg2, ...)

    Returns response appropriate to 'command'.

    Raises self.error for a command missing from Commands or one that
    is not allowed in the current state.
    """
    command = command.upper()
    if command not in Commands:
        raise self.error("Unknown IMAP4 UID command: %s" % command)
    allowed = Commands[command]
    if self.state not in allowed:
        raise self.error("command %s illegal in state %s, "
                         "only allowed in states %s" %
                         (command, self.state,
                          ', '.join(allowed)))
    typ, dat = self._simple_command('UID', command, *args)
    # SEARCH, SORT and THREAD answer under their own name; everything
    # else is collected from the 'FETCH' responses.
    name = command if command in ('SEARCH', 'SORT', 'THREAD') else 'FETCH'
    return self._untagged_response(typ, dat, name)
def unsubscribe(self, mailbox):
    """Unsubscribe from old mailbox.

    (typ, [data]) = <instance>.unsubscribe(mailbox)
    """
    response = self._simple_command('UNSUBSCRIBE', mailbox)
    return response
def unselect(self):
    """Free server's resources associated with the selected mailbox
    and returns the server to the authenticated state.
    This command performs the same actions as CLOSE, except
    that no messages are permanently removed from the currently
    selected mailbox.

    (typ, [data]) = <instance>.unselect()
    """
    try:
        response = self._simple_command('UNSELECT')
    finally:
        # The state is reset whether or not the command succeeded.
        self.state = 'AUTH'
    typ, data = response
    return typ, data
def xatom(self, name, *args):
    """Allow simple extension commands
    notified by server in CAPABILITY response.

    Assumes command is legal in current state.

    (typ, [data]) = <instance>.xatom(name, arg, ...)

    Returns response appropriate to extension command `name'.
    """
    name = name.upper()
    # The advertised capabilities are not consulted here: the server
    # decides whether the command is acceptable.
    known = name in Commands
    if not known:
        # Register the new command as legal in the current state.
        Commands[name] = (self.state,)
    return self._simple_command(name, *args)
- # Private methods
- def _append_untagged(self, typ, dat):
- if dat is None:
- dat = b''
- ur = self.untagged_responses
- if __debug__:
- if self.debug >= 5:
- self._mesg('untagged_responses[%s] %s += ["%r"]' %
- (typ, len(ur.get(typ,'')), dat))
- if typ in ur:
- ur[typ].append(dat)
- else:
- ur[typ] = [dat]
- def _check_bye(self):
- bye = self.untagged_responses.get('BYE')
- if bye:
- raise self.abort(bye[-1].decode(self._encoding, 'replace'))
def _command(self, name, *args):
    """Send command `name` with `args` to the server and return its tag.

    Raises self.error when Commands does not list the current state as
    legal for `name`, self.readonly when a READ-ONLY response is stored
    while self.is_readonly is false, and self.abort on a socket error.

    If self.literal is set, it is consumed here: after the command line
    has been sent, the literal is sent once _get_response() reports a
    continuation response.  The tag is returned early if a tagged
    response for this command arrives while waiting.
    """

    if self.state not in Commands[name]:
        # Drop any pending literal so it is not sent with a later command.
        self.literal = None
        raise self.error("command %s illegal in state %s, "
                         "only allowed in states %s" %
                         (name, self.state,
                          ', '.join(Commands[name])))

    # Forget status responses stored under these names before sending.
    for typ in ('OK', 'NO', 'BAD'):
        if typ in self.untagged_responses:
            del self.untagged_responses[typ]

    if 'READ-ONLY' in self.untagged_responses \
    and not self.is_readonly:
        raise self.readonly('mailbox status changed to READ-ONLY')

    # Build the command line as bytes: tag, name, then each non-None arg.
    tag = self._new_tag()
    name = bytes(name, self._encoding)
    data = tag + b' ' + name
    for arg in args:
        if arg is None: continue
        if isinstance(arg, str):
            arg = bytes(arg, self._encoding)
        data = data + b' ' + arg

    literal = self.literal
    if literal is not None:
        self.literal = None
        # A bound method is treated as a "literator": it is called with
        # each continuation response to produce the data to send.
        if type(literal) is type(self._command):
            literator = literal
        else:
            literator = None
            # Announce the size of the literal that will follow.
            data = data + bytes(' {%s}' % len(literal), self._encoding)

    if __debug__:
        if self.debug >= 4:
            self._mesg('> %r' % data)
        else:
            self._log('> %r' % data)

    try:
        self.send(data + CRLF)
    except OSError as val:
        raise self.abort('socket error: %s' % val)

    if literal is None:
        return tag

    while 1:
        # Wait for continuation response

        # _get_response() returns None for a continuation response.
        while self._get_response():
            if self.tagged_commands[tag]:   # BAD/NO?
                return tag

        # Send literal

        if literator:
            literal = literator(self.continuation_response)

        if __debug__:
            if self.debug >= 4:
                self._mesg('write literal size %s' % len(literal))

        try:
            self.send(literal)
            self.send(CRLF)
        except OSError as val:
            raise self.abort('socket error: %s' % val)

        # A plain literal is sent once; a literator keeps the exchange
        # going until a tagged response ends it (see the return above).
        if not literator:
            break

    return tag
- def _command_complete(self, name, tag):
- logout = (name == 'LOGOUT')
- # BYE is expected after LOGOUT
- if not logout:
- self._check_bye()
- try:
- typ, data = self._get_tagged_response(tag, expect_bye=logout)
- except self.abort as val:
- raise self.abort('command: %s => %s' % (name, val))
- except self.error as val:
- raise self.error('command: %s => %s' % (name, val))
- if not logout:
- self._check_bye()
- if typ == 'BAD':
- raise self.error('%s command error: %s %s' % (name, typ, data))
- return typ, data
- def _get_capabilities(self):
- typ, dat = self.capability()
- if dat == [None]:
- raise self.error('no CAPABILITY response from server')
- dat = str(dat[-1], self._encoding)
- dat = dat.upper()
- self.capabilities = tuple(dat.split())
def _get_response(self):
    """Read one response from the server and store it.

    A tagged response is stored in self.tagged_commands under its tag;
    an untagged response (including any literals that follow it) is
    stored through _append_untagged().  Returns None for a continuation
    response, after saving its data in self.continuation_response;
    otherwise returns the first response line received.

    Raises self.abort for a tagged response whose tag is not in
    self.tagged_commands and for a line matching none of the patterns.
    """

    # Read response and store.
    #
    # Returns None for continuation responses,
    # otherwise first response line received.

    resp = self._get_line()

    # Command completion response?

    if self._match(self.tagre, resp):
        tag = self.mo.group('tag')
        if not tag in self.tagged_commands:
            raise self.abort('unexpected tagged response: %r' % resp)

        typ = self.mo.group('type')
        typ = str(typ, self._encoding)
        dat = self.mo.group('data')
        self.tagged_commands[tag] = (typ, [dat])
    else:
        dat2 = None

        # '*' (untagged) responses?

        # _match() leaves its result in self.mo, so after these two
        # attempts self.mo is None only if neither pattern matched.
        if not self._match(Untagged_response, resp):
            if self._match(self.Untagged_status, resp):
                dat2 = self.mo.group('data2')

        if self.mo is None:
            # Only other possibility is '+' (continuation) response...

            if self._match(Continuation, resp):
                self.continuation_response = self.mo.group('data')
                return None     # NB: indicates continuation

            raise self.abort("unexpected response: %r" % resp)

        typ = self.mo.group('type')
        typ = str(typ, self._encoding)
        dat = self.mo.group('data')
        if dat is None: dat = b''        # Null untagged response
        if dat2: dat = dat + b' ' + dat2

        # Is there a literal to come?

        while self._match(self.Literal, dat):

            # Read literal direct from connection.

            size = int(self.mo.group('size'))
            if __debug__:
                if self.debug >= 4:
                    self._mesg('read literal size %s' % size)
            data = self.read(size)

            # Store response with literal as tuple

            self._append_untagged(typ, (dat, data))

            # Read trailer - possibly containing another literal

            dat = self._get_line()

        self._append_untagged(typ, dat)

    # Bracketed response information?

    # A response code found in the data is additionally stored under its
    # own name; note that `typ` is rebound to that name from here on.
    if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat):
        typ = self.mo.group('type')
        typ = str(typ, self._encoding)
        self._append_untagged(typ, self.mo.group('data'))

    if __debug__:
        if self.debug >= 1 and typ in ('NO', 'BAD', 'BYE'):
            self._mesg('%s response: %r' % (typ, dat))

    return resp
- def _get_tagged_response(self, tag, expect_bye=False):
- while 1:
- result = self.tagged_commands[tag]
- if result is not None:
- del self.tagged_commands[tag]
- return result
- if expect_bye:
- typ = 'BYE'
- bye = self.untagged_responses.pop(typ, None)
- if bye is not None:
- # Server replies to the "LOGOUT" command with "BYE"
- return (typ, bye)
- # If we've seen a BYE at this point, the socket will be
- # closed, so report the BYE now.
- self._check_bye()
- # Some have reported "unexpected response" exceptions.
- # Note that ignoring them here causes loops.
- # Instead, send me details of the unexpected response and
- # I'll update the code in `_get_response()'.
- try:
- self._get_response()
- except self.abort as val:
- if __debug__:
- if self.debug >= 1:
- self.print_log()
- raise
- def _get_line(self):
- line = self.readline()
- if not line:
- raise self.abort('socket error: EOF')
- # Protocol mandates all lines terminated by CRLF
- if not line.endswith(b'\r\n'):
- raise self.abort('socket error: unterminated line: %r' % line)
- line = line[:-2]
- if __debug__:
- if self.debug >= 4:
- self._mesg('< %r' % line)
- else:
- self._log('< %r' % line)
- return line
- def _match(self, cre, s):
- # Run compiled regular expression match method on 's'.
- # Save result, return success.
- self.mo = cre.match(s)
- if __debug__:
- if self.mo is not None and self.debug >= 5:
- self._mesg("\tmatched %r => %r" % (cre.pattern, self.mo.groups()))
- return self.mo is not None
- def _new_tag(self):
- tag = self.tagpre + bytes(str(self.tagnum), self._encoding)
- self.tagnum = self.tagnum + 1
- self.tagged_commands[tag] = None
- return tag
- def _quote(self, arg):
- arg = arg.replace('\\', '\\\\')
- arg = arg.replace('"', '\\"')
- return '"' + arg + '"'
- def _simple_command(self, name, *args):
- return self._command_complete(name, self._command(name, *args))
- def _untagged_response(self, typ, dat, name):
- if typ == 'NO':
- return typ, dat
- if not name in self.untagged_responses:
- return typ, [None]
- data = self.untagged_responses.pop(name)
- if __debug__:
- if self.debug >= 5:
- self._mesg('untagged_responses[%s] => %s' % (name, data))
- return typ, data
- if __debug__:
- def _mesg(self, s, secs=None):
- if secs is None:
- secs = time.time()
- tm = time.strftime('%M:%S', time.localtime(secs))
- sys.stderr.write(' %s.%02d %s\n' % (tm, (secs*100)%100, s))
- sys.stderr.flush()
- def _dump_ur(self, dict):
- # Dump untagged responses (in `dict').
- l = dict.items()
- if not l: return
- t = '\n\t\t'
- l = map(lambda x:'%s: "%s"' % (x[0], x[1][0] and '" "'.join(x[1]) or ''), l)
- self._mesg('untagged responses dump:%s%s' % (t, t.join(l)))
- def _log(self, line):
- # Keep log of last `_cmd_log_len' interactions for debugging.
- self._cmd_log[self._cmd_log_idx] = (line, time.time())
- self._cmd_log_idx += 1
- if self._cmd_log_idx >= self._cmd_log_len:
- self._cmd_log_idx = 0
- def print_log(self):
- self._mesg('last %d IMAP4 interactions:' % len(self._cmd_log))
- i, n = self._cmd_log_idx, self._cmd_log_len
- while n:
- try:
- self._mesg(*self._cmd_log[i])
- except:
- pass
- i += 1
- if i >= self._cmd_log_len:
- i = 0
- n -= 1
if HAVE_SSL:

    class IMAP4_SSL(IMAP4):

        """IMAP4 client class over SSL connection

        Instantiate with: IMAP4_SSL([host[, port[, keyfile[, certfile[, ssl_context[, timeout=None]]]]]])

                host - host's name (default: localhost);
                port - port number (default: standard IMAP4 SSL port);
                keyfile - PEM formatted file that contains your private key (default: None);
                certfile - PEM formatted certificate chain file (default: None);
                ssl_context - a SSLContext object that contains your certificate chain
                              and private key (default: None)
                Note: if ssl_context is provided, then parameters keyfile or
                certfile should not be set otherwise ValueError is raised.
                timeout - socket timeout (default: None) If timeout is not given or is None,
                          the global default socket timeout is used

        for more documentation see the docstring of the parent class IMAP4.
        """

        def __init__(self, host='', port=IMAP4_SSL_PORT, keyfile=None,
                     certfile=None, ssl_context=None, timeout=None):
            have_context = ssl_context is not None
            if have_context and keyfile is not None:
                raise ValueError("ssl_context and keyfile arguments are mutually "
                                 "exclusive")
            if have_context and certfile is not None:
                raise ValueError("ssl_context and certfile arguments are mutually "
                                 "exclusive")

            if not (keyfile is None and certfile is None):
                import warnings
                warnings.warn("keyfile and certfile are deprecated, use a "
                              "custom ssl_context instead", DeprecationWarning, 2)
            self.keyfile = keyfile
            self.certfile = certfile
            if not have_context:
                # Build a context from the (possibly None) key and
                # certificate files.
                ssl_context = ssl._create_stdlib_context(certfile=certfile,
                                                         keyfile=keyfile)
            self.ssl_context = ssl_context
            IMAP4.__init__(self, host, port, timeout)

        def _create_socket(self, timeout):
            """Create the parent class's socket and wrap it with the SSL context."""
            plain_sock = IMAP4._create_socket(self, timeout)
            return self.ssl_context.wrap_socket(plain_sock,
                                                server_hostname=self.host)

        def open(self, host='', port=IMAP4_SSL_PORT, timeout=None):
            """Setup connection to remote server on "host:port".
                (default: localhost:standard IMAP4 SSL port).
            This connection will be used by the routines:
                read, readline, send, shutdown.
            """
            IMAP4.open(self, host, port, timeout)

    __all__.append("IMAP4_SSL")
class IMAP4_stream(IMAP4):

    """IMAP4 client class over a stream

    Instantiate with: IMAP4_stream(command)

            "command" - a string that can be passed to subprocess.Popen()

    for more documentation see the docstring of the parent class IMAP4.
    """

    def __init__(self, command):
        self.command = command
        IMAP4.__init__(self)

    def open(self, host=None, port=None, timeout=None):
        """Setup a stream connection.
        This connection will be used by the routines:
            read, readline, send, shutdown.
        """
        # For compatibility with parent class
        self.host = None
        self.port = None
        self.sock = None
        self.file = None
        # NOTE: the command is run through the shell, so it must never
        # be built from untrusted input.
        child = subprocess.Popen(self.command,
                                 bufsize=DEFAULT_BUFFER_SIZE,
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 shell=True, close_fds=True)
        self.process = child
        self.writefile = child.stdin
        self.readfile = child.stdout

    def read(self, size):
        """Read 'size' bytes from remote."""
        chunk = self.readfile.read(size)
        return chunk

    def readline(self):
        """Read line from remote."""
        line = self.readfile.readline()
        return line

    def send(self, data):
        """Send data to remote."""
        pipe = self.writefile
        pipe.write(data)
        pipe.flush()

    def shutdown(self):
        """Close I/O established in "open"."""
        for stream in (self.readfile, self.writefile):
            stream.close()
        self.process.wait()
- class _Authenticator:
- """Private class to provide en/decoding
- for base64-based authentication conversation.
- """
- def __init__(self, mechinst):
- self.mech = mechinst # Callable object to provide/process data
- def process(self, data):
- ret = self.mech(self.decode(data))
- if ret is None:
- return b'*' # Abort conversation
- return self.encode(ret)
- def encode(self, inp):
- #
- # Invoke binascii.b2a_base64 iteratively with
- # short even length buffers, strip the trailing
- # line feed from the result and append. "Even"
- # means a number that factors to both 6 and 8,
- # so when it gets to the end of the 8-bit input
- # there's no partial 6-bit output.
- #
- oup = b''
- if isinstance(inp, str):
- inp = inp.encode('utf-8')
- while inp:
- if len(inp) > 48:
- t = inp[:48]
- inp = inp[48:]
- else:
- t = inp
- inp = b''
- e = binascii.b2a_base64(t)
- if e:
- oup = oup + e[:-1]
- return oup
- def decode(self, inp):
- if not inp:
- return b''
- return binascii.a2b_base64(inp)
# Index 0 is an empty placeholder so that Months[n] is the abbreviated
# name of month number n (1-12).
Months = ['', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
          'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
# Maps the abbreviated month name, as bytes, to its month number.
Mon2num = {name.encode(): number
           for number, name in enumerate(Months[1:], start=1)}
def Internaldate2tuple(resp):
    """Parse an IMAP4 INTERNALDATE string.

    Return corresponding local time.  The return value is a
    time.struct_time tuple or None if the string has wrong format.
    """
    match = InternalDate.match(resp)
    if not match:
        return None

    field = match.group
    month = Mon2num[field('mon')]
    day, year, hour, minute, second, zone_hours, zone_minutes = (
        int(field(key))
        for key in ('day', 'year', 'hour', 'min', 'sec', 'zoneh', 'zonem'))

    # INTERNALDATE timezone must be subtracted to get UT
    offset = (zone_hours*60 + zone_minutes)*60
    if field('zonen') == b'-':
        offset = -offset

    fields = (year, month, day, hour, minute, second, -1, -1, -1)
    return time.localtime(calendar.timegm(fields) - offset)
def Int2AP(num):
    """Convert integer to A-P string representation."""
    alphabet = b'ABCDEFGHIJKLMNOP'
    remaining = int(abs(num))
    digits = []
    # Collect base-16 digits, least significant first.
    while remaining:
        remaining, index = divmod(remaining, 16)
        digits.append(alphabet[index:index + 1])
    return b''.join(reversed(digits))
def ParseFlags(resp):
    """Convert IMAP4 flags response to python tuple."""
    match = Flags.match(resp)
    if match:
        return tuple(match.group('flags').split())
    return ()
def Time2Internaldate(date_time):
    """Convert date_time to IMAP4 INTERNALDATE representation.

    Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'.  The
    date_time argument can be a number (int or float) representing
    seconds since epoch (as returned by time.time()), a 9-tuple
    representing local time, an instance of time.struct_time (as
    returned by time.localtime()), an aware datetime instance or a
    double-quoted string.  In the last case, it is assumed to already
    be in the correct format.

    Raises ValueError for a naive datetime and for any other argument,
    including a string that is empty or not double-quoted.
    """
    if isinstance(date_time, (int, float)):
        dt = datetime.fromtimestamp(date_time,
                                    timezone.utc).astimezone()
    elif isinstance(date_time, tuple):
        try:
            gmtoff = date_time.tm_gmtoff
        except AttributeError:
            # Plain tuple without a UTC offset: derive the offset from
            # the local timezone settings and the tuple's DST flag.
            if time.daylight:
                dst = date_time[8]
                if dst == -1:
                    dst = time.localtime(time.mktime(date_time))[8]
                gmtoff = -(time.timezone, time.altzone)[dst]
            else:
                gmtoff = -time.timezone
        delta = timedelta(seconds=gmtoff)
        dt = datetime(*date_time[:6], tzinfo=timezone(delta))
    elif isinstance(date_time, datetime):
        if date_time.tzinfo is None:
            raise ValueError("date_time must be aware")
        dt = date_time
    elif (isinstance(date_time, str)
          and date_time.startswith('"') and date_time.endswith('"')):
        # startswith/endswith (rather than indexing) so that an empty
        # string falls through to the ValueError below.
        return date_time        # Assume in correct format
    else:
        raise ValueError("date_time not of a known type")
    fmt = '"%d-{}-%Y %H:%M:%S %z"'.format(Months[dt.month])
    return dt.strftime(fmt)
if __name__ == '__main__':

    # To test: invoke either as 'python imaplib.py [IMAP4_server_hostname]'
    # or 'python imaplib.py -s "rsh IMAP4_server_hostname exec /etc/rimapd"'
    # to test the IMAP4_stream class

    import getopt, getpass

    try:
        optlist, args = getopt.getopt(sys.argv[1:], 'd:s:')
    except getopt.error as val:
        optlist, args = (), ()

    stream_command = None
    for opt,val in optlist:
        if opt == '-d':
            Debug = int(val)
        elif opt == '-s':
            stream_command = val
            if not args: args = (stream_command,)

    if not args: args = ('',)

    host = args[0]

    USER = getpass.getuser()
    PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost"))

    test_mesg = 'From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)sdata...%(lf)s' % {'user':USER, 'lf':'\n'}
    # Each entry is (method name, argument tuple), run in order.
    test_seq1 = (
    ('login', (USER, PASSWD)),
    ('create', ('/tmp/xxx 1',)),
    ('rename', ('/tmp/xxx 1', '/tmp/yyy')),
    ('CREATE', ('/tmp/yyz 2',)),
    ('append', ('/tmp/yyz 2', None, None, test_mesg)),
    ('list', ('/tmp', 'yy*')),
    ('select', ('/tmp/yyz 2',)),
    ('search', (None, 'SUBJECT', 'test')),
    ('fetch', ('1', '(FLAGS INTERNALDATE RFC822)')),
    ('store', ('1', 'FLAGS', r'(\Deleted)')),
    ('namespace', ()),
    ('expunge', ()),
    ('recent', ()),
    ('close', ()),
    )

    test_seq2 = (
    ('select', ()),
    ('response',('UIDVALIDITY',)),
    ('uid', ('SEARCH', 'ALL')),
    ('response', ('EXISTS',)),
    ('append', (None, None, None, test_mesg)),
    ('recent', ()),
    ('logout', ()),
    )

    def run(cmd, args):
        """Call method `cmd` of M with `args`, log it and return the data.

        Raises M.error when the server answers 'NO'.
        """
        M._mesg('%s %s' % (cmd, args))
        typ, dat = getattr(M, cmd)(*args)
        M._mesg('%s => %s %s' % (cmd, typ, dat))
        if typ == 'NO':
            # The response data is not an exception instance and so
            # cannot be raised directly; wrap it in the error class.
            raise M.error(dat[0])
        return dat

    try:
        if stream_command:
            M = IMAP4_stream(stream_command)
        else:
            M = IMAP4(host)
        if M.state == 'AUTH':
            test_seq1 = test_seq1[1:]   # Login not needed
        M._mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION)
        M._mesg('CAPABILITIES = %r' % (M.capabilities,))

        for cmd,args in test_seq1:
            run(cmd, args)

        for ml in run('list', ('/tmp/', 'yy%')):
            # The data may be [None] when no matching response was
            # received; there is then nothing to delete.
            if ml is None:
                continue
            # Response lines are bytes, so a bytes pattern is required.
            mo = re.match(rb'.*"([^"]+)"$', ml)
            if mo: path = mo.group(1)
            else: path = ml.split()[-1]
            run('delete', (path,))

        for cmd,args in test_seq2:
            dat = run(cmd, args)

            if (cmd,args) != ('uid', ('SEARCH', 'ALL')):
                continue

            uid = dat[-1].split()
            if not uid: continue
            # Decode the UID: '%s' % bytes would render as "b'...'".
            run('uid', ('FETCH', str(uid[-1], 'ascii'),
                    '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)'))

        print('\nAll tests OK.')

    except:
        # Report the failure, then let the original exception propagate.
        print('\nTests failed.')

        if not Debug:
            print('''
If you would like to see debugging output,
try: %s -d5
''' % sys.argv[0])

        raise
|