12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001 |
- """Header value parser implementing various email-related RFC parsing rules.
- The parsing methods defined in this module implement various email related
- parsing rules. Principal among them is RFC 5322, which is the follow-on
- to RFC 2822 and primarily a clarification of the former. It also implements
- RFC 2047 encoded word decoding.
- RFC 5322 goes to considerable trouble to maintain backward compatibility with
- RFC 822 in the parse phase, while cleaning up the structure on the generation
- phase. This parser supports correct RFC 5322 generation by tagging white space
- as folding white space only when folding is allowed in the non-obsolete rule
- sets. Actually, the parser is even more generous when accepting input than RFC
- 5322 mandates, following the spirit of Postel's Law, which RFC 5322 encourages.
- Where possible deviations from the standard are annotated on the 'defects'
- attribute of tokens that deviate.
- The general structure of the parser follows RFC 5322, and uses its terminology
- where there is a direct correspondence. Where the implementation requires a
- somewhat different structure than that used by the formal grammar, new terms
- that mimic the closest existing terms are used. Thus, it really helps to have
- a copy of RFC 5322 handy when studying this code.
- Input to the parser is a string that has already been unfolded according to
- RFC 5322 rules. According to the RFC this unfolding is the very first step, and
- this parser leaves the unfolding step to a higher level message parser, which
- will have already detected the line breaks that need unfolding while
- determining the beginning and end of each header.
- The output of the parser is a TokenList object, which is a list subclass. A
- TokenList is a recursive data structure. The terminal nodes of the structure
- are Terminal objects, which are subclasses of str. These do not correspond
- directly to terminal objects in the formal grammar, but are instead more
- practical higher level combinations of true terminals.
- All TokenList and Terminal objects have a 'value' attribute, which produces the
- semantically meaningful value of that part of the parse subtree. The value of
- all whitespace tokens (no matter how many sub-tokens they may contain) is a
- single space, as per the RFC rules. This includes 'CFWS', which is herein
- included in the general class of whitespace tokens. There is one exception to
- the rule that whitespace tokens are collapsed into single spaces in values: in
- the value of a 'bare-quoted-string' (a quoted-string with no leading or
- trailing whitespace), any whitespace that appeared between the quotation marks
- is preserved in the returned value. Note that in all Terminal strings quoted
- pairs are turned into their unquoted values.
- All TokenList and Terminal objects also have a string value, which attempts to
- be a "canonical" representation of the RFC-compliant form of the substring that
- produced the parsed subtree, including minimal use of quoted pair quoting.
- Whitespace runs are not collapsed.
- Comment tokens also have a 'content' attribute providing the string found
- between the parens (including any nested comments) with whitespace preserved.
- All TokenList and Terminal objects have a 'defects' attribute which is a
- possibly empty list of all of the defects found while creating the token. Defects
- may appear on any token in the tree, and a composite list of all defects in the
- subtree is available through the 'all_defects' attribute of any node. (For
- Terminal nodes x.defects == x.all_defects.)
- Each object in a parse tree is called a 'token', and each has a 'token_type'
- attribute that gives the name from the RFC 5322 grammar that it represents.
- Not all RFC 5322 nodes are produced, and there is one non-RFC 5322 node that
- may be produced: 'ptext'. A 'ptext' is a string of printable ascii characters.
- It is returned in place of lists of (ctext/quoted-pair) and
- (qtext/quoted-pair).
- XXX: provide complete list of token types.
- """
- import re
- import sys
- import urllib # For urllib.parse.unquote
- from string import hexdigits
- from operator import itemgetter
- from email import _encoded_words as _ew
- from email import errors
- from email import utils
- #
- # Useful constants and functions
- #
# Character sets used by the parser to decide where a lexical element stops.
WSP = {' ', '\t'}
CFWS_LEADER = WSP | {'('}
SPECIALS = set(r'()<>@,:;.\"[]')
ATOM_ENDS = SPECIALS | WSP
DOT_ATOM_ENDS = ATOM_ENDS - {'.'}
# '.', '"', and '(' do not end phrases in order to support obs-phrase
PHRASE_ENDS = SPECIALS - {'.', '"', '('}
TSPECIALS = (SPECIALS | {'/', '?', '='}) - {'.'}
TOKEN_ENDS = TSPECIALS | WSP
ASPECIALS = TSPECIALS | {'*', "'", '%'}
ATTRIBUTE_ENDS = ASPECIALS | WSP
EXTENDED_ATTRIBUTE_ENDS = ATTRIBUTE_ENDS - {'%'}
def quote_string(value):
    """Return str(value) enclosed in double quotes.

    Backslashes are doubled first, then every double quote is preceded by a
    backslash, so the escaping backslashes themselves are not re-escaped.
    """
    escaped = str(value).replace('\\', '\\\\')
    escaped = escaped.replace('"', r'\"')
    return '"{}"'.format(escaped)
# Match a RFC 2047 word, looks like =?utf-8?q?someword?=
# The pattern is compiled in verbose mode, so the layout whitespace and the
# trailing '#' remarks inside it are not part of what is matched.
rfc2047_matcher = re.compile(
    r'''
    =\?       # literal =?
    [^?]*     # charset
    \?        # literal ?
    [qQbB]    # literal 'q' or 'b', case insensitive
    \?        # literal ?
    .*?       # encoded word
    \?=       # literal ?=
    ''',
    re.MULTILINE | re.VERBOSE,
)
- #
- # TokenList and its subclasses
- #
class TokenList(list):
    """List of child tokens forming one non-terminal node of a parse tree.

    Each instance carries its own 'defects' list.  Most of the properties
    below are computed by delegating to, or combining, the children.
    """

    token_type = None
    syntactic_break = True
    ew_combine_allowed = True

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        # Defects recorded against this node itself; children keep their own.
        self.defects = []

    def __str__(self):
        return ''.join(map(str, self))

    def __repr__(self):
        return '{}({})'.format(type(self).__name__, list.__repr__(self))

    @property
    def value(self):
        """Concatenation of the non-empty 'value' of every child."""
        return ''.join(tok.value for tok in self if tok.value)

    @property
    def all_defects(self):
        """This node's defects followed by those of the whole subtree."""
        # Concatenate rather than extend so self.defects is never mutated.
        collected = self.defects
        for tok in self:
            collected = collected + tok.all_defects
        return collected

    def startswith_fws(self):
        """Delegate to the first child."""
        head = self[0]
        return head.startswith_fws()

    @property
    def as_ew_allowed(self):
        """True if all top level tokens of this part may be RFC2047 encoded."""
        for part in self:
            if not part.as_ew_allowed:
                return False
        return True

    @property
    def comments(self):
        """Flat list of the 'comments' of every child, in order."""
        return [text for token in self for text in token.comments]

    def fold(self, *, policy):
        """Return the result of refolding this parse tree under *policy*."""
        return _refold_parse_tree(self, policy=policy)

    def pprint(self, indent=''):
        """Print the pretty-printed form of this subtree."""
        print(self.ppstr(indent=indent))

    def ppstr(self, indent=''):
        """Return the pretty-printed form of this subtree as one string."""
        return '\n'.join(self._pp(indent=indent))

    def _pp(self, indent=''):
        """Yield the lines of the pretty-printed form, one per iteration."""
        yield '{}{}/{}('.format(indent, type(self).__name__, self.token_type)
        child_indent = indent + ' '
        for token in self:
            if hasattr(token, '_pp'):
                yield from token._pp(child_indent)
            else:
                # Something that is not a token ended up in the list.
                yield (indent + ' !! invalid element in token '
                                'list: {!r}'.format(token))
        extra = ' Defects: {}'.format(self.defects) if self.defects else ''
        yield '{}){}'.format(indent, extra)
class WhiteSpaceTokenList(TokenList):
    """Token list whose semantic value is always a single space."""

    @property
    def value(self):
        return ' '

    @property
    def comments(self):
        """Content of each direct child whose token_type is 'comment'."""
        found = []
        for tok in self:
            if tok.token_type == 'comment':
                found.append(tok.content)
        return found
class UnstructuredTokenList(TokenList):
    """Token list tagged with token_type 'unstructured'."""

    token_type = 'unstructured'
class Phrase(TokenList):
    """Token list tagged with token_type 'phrase'."""

    token_type = 'phrase'
class Word(TokenList):
    """Token list tagged with token_type 'word'."""

    token_type = 'word'
class CFWSList(WhiteSpaceTokenList):
    """Whitespace token list tagged with token_type 'cfws'."""

    token_type = 'cfws'
class Atom(TokenList):
    """Token list tagged with token_type 'atom'."""

    token_type = 'atom'
class Token(TokenList):
    """Token list tagged with token_type 'token'."""

    token_type = 'token'
    encode_as_ew = False
class EncodedWord(TokenList):
    """Token list tagged with token_type 'encoded-word'.

    The cte, charset and lang class attributes default to None.
    """

    token_type = 'encoded-word'
    cte = None
    charset = None
    lang = None
class QuotedString(TokenList):
    """Token list tagged with token_type 'quoted-string'."""

    token_type = 'quoted-string'

    @property
    def content(self):
        """Value of the first 'bare-quoted-string' child, or None if absent."""
        return next(
            (tok.value for tok in self
             if tok.token_type == 'bare-quoted-string'),
            None)

    @property
    def quoted_value(self):
        """Join the children, using str() for the bare quoted string and
        'value' for every other child."""
        return ''.join(
            str(tok) if tok.token_type == 'bare-quoted-string' else tok.value
            for tok in self)

    @property
    def stripped_value(self):
        """Value of the first 'bare-quoted-string' child, or None if absent."""
        return next(
            (tok.value for tok in self
             if tok.token_type == 'bare-quoted-string'),
            None)
class BareQuotedString(QuotedString):
    """Quoted string tagged with token_type 'bare-quoted-string'."""

    token_type = 'bare-quoted-string'

    def __str__(self):
        # Re-apply the surrounding quotes and escaping to the joined children.
        return quote_string(''.join(map(str, self)))

    @property
    def value(self):
        """The children's string forms joined together, without quotes."""
        return ''.join(map(str, self))
class Comment(WhiteSpaceTokenList):
    """Whitespace token list tagged with token_type 'comment'."""

    token_type = 'comment'

    def __str__(self):
        return '(' + ''.join(self.quote(tok) for tok in self) + ')'

    def quote(self, value):
        """Return the string form of one child, escaped for use in a comment.

        Nested comments are returned as-is; for anything else backslashes and
        parentheses are backslash-escaped.
        """
        if value.token_type == 'comment':
            return str(value)
        text = str(value)
        # The backslash must be handled first so that the escapes added for
        # the parentheses are not themselves doubled.
        for char in ('\\', '(', ')'):
            text = text.replace(char, '\\' + char)
        return text

    @property
    def content(self):
        """The children's string forms joined together."""
        return ''.join(map(str, self))

    @property
    def comments(self):
        return [self.content]
class AddressList(TokenList):
    """Token list tagged with token_type 'address-list'."""

    token_type = 'address-list'

    @property
    def addresses(self):
        """The direct children whose token_type is 'address'."""
        return [tok for tok in self if tok.token_type == 'address']

    @property
    def mailboxes(self):
        """Concatenated 'mailboxes' of every 'address' child."""
        collected = []
        for tok in self:
            if tok.token_type == 'address':
                collected = collected + tok.mailboxes
        return collected

    @property
    def all_mailboxes(self):
        """Concatenated 'all_mailboxes' of every 'address' child."""
        collected = []
        for tok in self:
            if tok.token_type == 'address':
                collected = collected + tok.all_mailboxes
        return collected
class Address(TokenList):
    """Token list tagged with token_type 'address'.

    All properties inspect only the first child.
    """

    token_type = 'address'

    @property
    def display_name(self):
        """The first child's display name when it is a 'group', else None."""
        head = self[0]
        if head.token_type == 'group':
            return head.display_name
        return None

    @property
    def mailboxes(self):
        """Valid mailboxes only; an 'invalid-mailbox' child yields []."""
        head = self[0]
        kind = head.token_type
        if kind == 'mailbox':
            return [head]
        if kind == 'invalid-mailbox':
            return []
        return head.mailboxes

    @property
    def all_mailboxes(self):
        """Like 'mailboxes', but an 'invalid-mailbox' child is included."""
        head = self[0]
        if head.token_type in ('mailbox', 'invalid-mailbox'):
            return [head]
        return head.all_mailboxes
class MailboxList(TokenList):
    """Token list tagged with token_type 'mailbox-list'."""

    token_type = 'mailbox-list'

    @property
    def mailboxes(self):
        """The direct children whose token_type is 'mailbox'."""
        return [tok for tok in self if tok.token_type == 'mailbox']

    @property
    def all_mailboxes(self):
        """The direct children that are 'mailbox' or 'invalid-mailbox'."""
        wanted = ('mailbox', 'invalid-mailbox')
        return [tok for tok in self if tok.token_type in wanted]
class GroupList(TokenList):
    """Token list tagged with token_type 'group-list'."""

    token_type = 'group-list'

    @property
    def mailboxes(self):
        """The first child's mailboxes if it is a 'mailbox-list', else []."""
        if self and self[0].token_type == 'mailbox-list':
            return self[0].mailboxes
        return []

    @property
    def all_mailboxes(self):
        """The first child's all_mailboxes if it is a 'mailbox-list', else []."""
        if self and self[0].token_type == 'mailbox-list':
            return self[0].all_mailboxes
        return []
class Group(TokenList):
    """Token list tagged with token_type 'group'.

    The mailbox properties look at the child at index 2 and use it only when
    it is a 'group-list'; the display name comes from the first child.
    """

    token_type = "group"

    @property
    def mailboxes(self):
        members = self[2]
        if members.token_type == 'group-list':
            return members.mailboxes
        return []

    @property
    def all_mailboxes(self):
        members = self[2]
        if members.token_type == 'group-list':
            return members.all_mailboxes
        return []

    @property
    def display_name(self):
        return self[0].display_name
class NameAddr(TokenList):
    """Token list tagged with token_type 'name-addr'.

    Address related properties are delegated to the last child; the display
    name is taken from the first child when there is more than one child.
    """

    token_type = 'name-addr'

    @property
    def display_name(self):
        # A single child means there is no display name in front of it.
        return None if len(self) == 1 else self[0].display_name

    @property
    def local_part(self):
        tail = self[-1]
        return tail.local_part

    @property
    def domain(self):
        tail = self[-1]
        return tail.domain

    @property
    def route(self):
        tail = self[-1]
        return tail.route

    @property
    def addr_spec(self):
        tail = self[-1]
        return tail.addr_spec
class AngleAddr(TokenList):
    """Token list tagged with token_type 'angle-addr'."""

    token_type = 'angle-addr'

    @property
    def local_part(self):
        """local_part of the first 'addr-spec' child, or None if absent."""
        return next(
            (tok.local_part for tok in self if tok.token_type == 'addr-spec'),
            None)

    @property
    def domain(self):
        """domain of the first 'addr-spec' child, or None if absent."""
        return next(
            (tok.domain for tok in self if tok.token_type == 'addr-spec'),
            None)

    @property
    def route(self):
        """domains of the first 'obs-route' child, or None if absent."""
        return next(
            (tok.domains for tok in self if tok.token_type == 'obs-route'),
            None)

    @property
    def addr_spec(self):
        """addr_spec of the first 'addr-spec' child, or '<>' if absent.

        When that child's local part is falsy, the quoted local part is
        prepended to its addr_spec.
        """
        spec = next(
            (tok for tok in self if tok.token_type == 'addr-spec'), None)
        if spec is None:
            return '<>'
        if spec.local_part:
            return spec.addr_spec
        return quote_string(spec.local_part) + spec.addr_spec
class ObsRoute(TokenList):
    """Token list tagged with token_type 'obs-route'."""

    token_type = 'obs-route'

    @property
    def domains(self):
        """The 'domain' attribute of each child of token_type 'domain'."""
        found = []
        for tok in self:
            if tok.token_type == 'domain':
                found.append(tok.domain)
        return found
class Mailbox(TokenList):
    """Token list tagged with token_type 'mailbox'.

    Every property is answered by the first child.  display_name and route
    are only available when that child is a 'name-addr'; otherwise they are
    None.
    """

    token_type = 'mailbox'

    @property
    def display_name(self):
        head = self[0]
        if head.token_type == 'name-addr':
            return head.display_name
        return None

    @property
    def local_part(self):
        head = self[0]
        return head.local_part

    @property
    def domain(self):
        head = self[0]
        return head.domain

    @property
    def route(self):
        head = self[0]
        if head.token_type == 'name-addr':
            return head.route
        return None

    @property
    def addr_spec(self):
        head = self[0]
        return head.addr_spec
class InvalidMailbox(TokenList):
    """Token list tagged with token_type 'invalid-mailbox'.

    All of the mailbox properties evaluate to None.
    """

    token_type = 'invalid-mailbox'

    @property
    def display_name(self):
        return None

    # The remaining mailbox properties share the same always-None property.
    local_part = domain = route = addr_spec = display_name
class Domain(TokenList):
    """Token list tagged with token_type 'domain'."""

    token_type = 'domain'
    as_ew_allowed = False

    @property
    def domain(self):
        """The inherited value with all whitespace removed."""
        pieces = super().value.split()
        return ''.join(pieces)
class DotAtom(TokenList):
    """Token list tagged with token_type 'dot-atom'."""

    token_type = 'dot-atom'
class DotAtomText(TokenList):
    """Token list tagged with token_type 'dot-atom-text'."""

    token_type = 'dot-atom-text'
    as_ew_allowed = True
class NoFoldLiteral(TokenList):
    """Token list tagged with token_type 'no-fold-literal'."""

    token_type = 'no-fold-literal'
    as_ew_allowed = False
class AddrSpec(TokenList):
    """Token list tagged with token_type 'addr-spec'.

    With fewer than three children there is no domain part: 'domain' is None
    and 'value' is just the first child's value.
    """

    token_type = 'addr-spec'
    as_ew_allowed = False

    @property
    def local_part(self):
        return self[0].local_part

    @property
    def domain(self):
        return self[-1].domain if len(self) >= 3 else None

    @property
    def value(self):
        if len(self) < 3:
            return self[0].value
        # Drop the whitespace on either side of the middle child.
        left = self[0].value.rstrip()
        right = self[2].value.lstrip()
        return left + self[1].value + right

    @property
    def addr_spec(self):
        """Local part (quoted if needed), plus '@' and the domain if any."""
        # Quote the local part when it holds any character from
        # DOT_ATOM_ENDS.
        if set(self.local_part).isdisjoint(DOT_ATOM_ENDS):
            lp = self.local_part
        else:
            lp = quote_string(self.local_part)
        domain = self.domain
        if domain is None:
            return lp
        return lp + '@' + domain
class ObsLocalPart(TokenList):
    """Token list tagged with token_type 'obs-local-part'."""

    token_type = 'obs-local-part'
    as_ew_allowed = False
class DisplayName(Phrase):
    """Phrase tagged with token_type 'display-name'."""

    token_type = 'display-name'
    ew_combine_allowed = False

    @staticmethod
    def _edge_is_cfws(token, index):
        """Return True if *token* is a non-empty TokenList whose child at
        *index* (0 or -1) is a 'cfws' token.

        Terminal children are str subclasses: indexing one yields a plain
        one-character str that has no token_type attribute, so only
        TokenList children may be looked into.
        """
        return (isinstance(token, TokenList)
                and len(token) > 0
                and token[index].token_type == 'cfws')

    @property
    def display_name(self):
        """The phrase's value with leading and trailing CFWS stripped.

        CFWS is removed whether it is a direct first/last child or the
        first/last element nested inside the first/last child.
        """
        res = TokenList(self)
        if not res:
            return res.value
        first = res[0]
        if first.token_type == 'cfws':
            res.pop(0)
        elif self._edge_is_cfws(first, 0):
            res[0] = TokenList(first[1:])
        if not res:
            # The only child was CFWS; nothing is left to strip or return.
            return res.value
        # Re-read the last child: it may be the one just replaced above.
        last = res[-1]
        if last.token_type == 'cfws':
            res.pop()
        elif self._edge_is_cfws(last, -1):
            res[-1] = TokenList(last[:-1])
        return res.value

    @property
    def value(self):
        """The display name, quoted when required.

        Quoting is applied when this token has defects or contains a
        'quoted-string' child; a single space is kept on either side where
        the original had leading/trailing CFWS.  Otherwise the inherited
        value is returned.
        """
        quote = bool(self.defects) or any(
            tok.token_type == 'quoted-string' for tok in self)
        if len(self) != 0 and quote:
            pre = post = ''
            first, last = self[0], self[-1]
            if first.token_type == 'cfws' or self._edge_is_cfws(first, 0):
                pre = ' '
            if last.token_type == 'cfws' or self._edge_is_cfws(last, -1):
                post = ' '
            return pre + quote_string(self.display_name) + post
        return super().value
class LocalPart(TokenList):
    """Token list tagged with token_type 'local-part'."""

    token_type = 'local-part'
    as_ew_allowed = False

    @property
    def value(self):
        """quoted_value of a 'quoted-string' first child, else its value."""
        head = self[0]
        if head.token_type == "quoted-string":
            return head.quoted_value
        return head.value

    @property
    def local_part(self):
        """Value of the first child's tokens with CFWS removed from the
        front, the back, and around dots."""
        # Strip whitespace from front, back, and around dots.
        # A DOT sentinel is placed at each end so that the edges are treated
        # exactly like the neighbourhood of a real dot; both sentinels are
        # sliced off again before the value is computed.
        pieces = [DOT]
        prev = DOT
        prev_is_list = False
        for tok in self[0] + [DOT]:
            if tok.token_type == 'cfws':
                continue
            if (prev_is_list
                    and tok.token_type == 'dot'
                    and prev[-1].token_type == 'cfws'):
                # Previous piece ends in CFWS right before a dot: trim it.
                pieces[-1] = TokenList(prev[:-1])
            tok_is_list = isinstance(tok, TokenList)
            if (tok_is_list
                    and prev.token_type == 'dot'
                    and tok[0].token_type == 'cfws'):
                # This piece starts with CFWS right after a dot: trim it.
                pieces.append(TokenList(tok[1:]))
            else:
                pieces.append(tok)
            prev = pieces[-1]
            prev_is_list = tok_is_list
        return TokenList(pieces[1:-1]).value
class DomainLiteral(TokenList):
    """Token list tagged with token_type 'domain-literal'."""

    token_type = 'domain-literal'
    as_ew_allowed = False

    @property
    def domain(self):
        """The inherited value with all whitespace removed."""
        pieces = super().value.split()
        return ''.join(pieces)

    @property
    def ip(self):
        """Value of the first 'ptext' child, or None if there is none."""
        return next(
            (tok.value for tok in self if tok.token_type == 'ptext'), None)
class MIMEVersion(TokenList):
    """Token list tagged with token_type 'mime-version'.

    The major and minor class attributes default to None.
    """

    token_type = 'mime-version'
    major = None
    minor = None
class Parameter(TokenList):
    """Token list tagged with token_type 'parameter'."""

    token_type = 'parameter'
    sectioned = False
    extended = False
    charset = 'us-ascii'

    @property
    def section_number(self):
        """The section's number when 'sectioned' is set, otherwise 0."""
        # Because the first token, the attribute (name) eats CFWS, the second
        # token is always the section if there is one.
        if self.sectioned:
            return self[1].number
        return 0

    @property
    def param_value(self):
        """stripped_value of the first 'value' token found, else ''.

        A 'value' is looked for among the direct children and also nested
        inside a 'bare-quoted-string' of a 'quoted-string' child.
        """
        # This is part of the "handle quoted extended parameters" hack.
        for child in self:
            if child.token_type == 'value':
                return child.stripped_value
            if child.token_type != 'quoted-string':
                continue
            for quoted_part in child:
                if quoted_part.token_type != 'bare-quoted-string':
                    continue
                for inner in quoted_part:
                    if inner.token_type == 'value':
                        return inner.stripped_value
        return ''
class InvalidParameter(Parameter):
    """Parameter tagged with token_type 'invalid-parameter'."""

    token_type = 'invalid-parameter'
class Attribute(TokenList):
    """Token list tagged with token_type 'attribute'."""

    token_type = 'attribute'

    @property
    def stripped_value(self):
        """Value of the first child whose token_type ends with 'attrtext',
        or None if there is none."""
        return next(
            (tok.value for tok in self
             if tok.token_type.endswith('attrtext')),
            None)
class Section(TokenList):
    """Token list tagged with token_type 'section'; number defaults to None."""

    token_type = 'section'
    number = None
class Value(TokenList):
    """Token list tagged with token_type 'value'."""

    token_type = 'value'

    @property
    def stripped_value(self):
        """stripped_value of the leading content token when it has one.

        A leading 'cfws' child is skipped.  If the token then found is a
        quoted-string, attribute or extended-attribute, its stripped_value is
        returned; otherwise this token's own value is returned.
        """
        token = self[1] if self[0].token_type == 'cfws' else self[0]
        strippable = ('quoted-string', 'attribute', 'extended-attribute')
        if token.token_type.endswith(strippable):
            return token.stripped_value
        return self.value
class MimeParameters(TokenList):
    """Token list tagged with token_type 'mime-parameters'."""

    token_type = 'mime-parameters'
    syntactic_break = False

    @property
    def params(self):
        """Generate a (name, value) pair for each distinct parameter name.

        Children whose token_type ends in 'parameter' and whose first token
        is an 'attribute' are grouped by name; the sections of each group
        are ordered by section number and their values joined.  Defects are
        recorded on the offending parameter tokens as they are found.
        """
        # The RFC specifically states that the ordering of parameters is not
        # guaranteed and may be reordered by the transport layer. So we have
        # to assume the RFC 2231 pieces can come in any order. However, we
        # output them in the order that we first see a given name, which gives
        # us a stable __str__.
        grouped = {}  # Using order preserving dict from Python 3.7+
        for token in self:
            if not token.token_type.endswith('parameter'):
                continue
            attribute = token[0]
            if attribute.token_type != 'attribute':
                continue
            name = attribute.value.strip()
            grouped.setdefault(name, []).append((token.section_number, token))
        for name, parts in grouped.items():
            parts = sorted(parts, key=itemgetter(0))
            first_param = parts[0][1]
            charset = first_param.charset
            # Our arbitrary error recovery is to ignore duplicate parameters,
            # to use appearance order if there are duplicate rfc 2231 parts,
            # and to ignore gaps. This mimics the error recovery of get_param.
            if not first_param.extended and len(parts) > 1:
                second_number, second_param = parts[1]
                if second_number == 0:
                    second_param.defects.append(errors.InvalidHeaderDefect(
                        'duplicate parameter name; duplicate(s) ignored'))
                    parts = parts[:1]
                # Else assume the *0* was missing...note that this is different
                # from get_param, but we registered a defect for this earlier.
            value_parts = []
            expected = 0
            for section_number, param in parts:
                if section_number != expected:
                    # We could get fancier here and look for a complete
                    # duplicate extended parameter and ignore the second one
                    # seen. But we're not doing that. The old code didn't.
                    if not param.extended:
                        param.defects.append(errors.InvalidHeaderDefect(
                            'duplicate parameter name; duplicate ignored'))
                        continue
                    param.defects.append(errors.InvalidHeaderDefect(
                        "inconsistent RFC2231 parameter numbering"))
                expected += 1
                value = param.param_value
                if param.extended:
                    try:
                        value = urllib.parse.unquote_to_bytes(value)
                    except UnicodeEncodeError:
                        # source had surrogate escaped bytes. What we do now
                        # is a bit of an open question. I'm not sure this is
                        # the best choice, but it is what the old algorithm did
                        value = urllib.parse.unquote(value, encoding='latin-1')
                    else:
                        try:
                            value = value.decode(charset, 'surrogateescape')
                        except (LookupError, UnicodeEncodeError):
                            # XXX: there should really be a custom defect for
                            # unknown character set to make it easy to find,
                            # because otherwise unknown charset is a silent
                            # failure.
                            value = value.decode('us-ascii', 'surrogateescape')
                        if utils._has_surrogates(value):
                            param.defects.append(errors.UndecodableBytesDefect())
                value_parts.append(value)
            yield name, ''.join(value_parts)

    def __str__(self):
        # Parameters with an empty value are rendered as the bare name.
        rendered = [
            '{}={}'.format(name, quote_string(value)) if value else name
            for name, value in self.params
        ]
        joined = '; '.join(rendered)
        if not joined:
            return ''
        return ' ' + joined
class ParameterizedHeaderValue(TokenList):
    """Base for header values that may carry a 'mime-parameters' child."""

    # Set this false so that the value doesn't wind up on a new line even
    # if it and the parameters would fit there but not on the first line.
    syntactic_break = False

    @property
    def params(self):
        """params of the last 'mime-parameters' child, or {} if none."""
        # Walk from the end so the last matching child wins.
        for token in self[::-1]:
            if token.token_type != 'mime-parameters':
                continue
            return token.params
        return {}
class ContentType(ParameterizedHeaderValue):
    """Parameterized value tagged with token_type 'content-type'.

    maintype and subtype default to 'text' and 'plain'.
    """

    token_type = 'content-type'
    as_ew_allowed = False
    maintype = 'text'
    subtype = 'plain'
class ContentDisposition(ParameterizedHeaderValue):
    """Parameterized value tagged with token_type 'content-disposition'.

    content_disposition defaults to None.
    """

    token_type = 'content-disposition'
    as_ew_allowed = False
    content_disposition = None
class ContentTransferEncoding(TokenList):
    """Token list tagged with token_type 'content-transfer-encoding'.

    cte defaults to '7bit'.
    """

    token_type = 'content-transfer-encoding'
    as_ew_allowed = False
    cte = '7bit'
class HeaderLabel(TokenList):
    """Token list tagged with token_type 'header-label'."""

    token_type = 'header-label'
    as_ew_allowed = False
class MsgID(TokenList):
    """Token list tagged with token_type 'msg-id'."""

    token_type = 'msg-id'
    as_ew_allowed = False

    def fold(self, policy):
        """Return the unfolded string form followed by policy.linesep."""
        # message-id tokens may not be folded.
        return '{}{}'.format(self, policy.linesep)
class MessageID(MsgID):
    """MsgID tagged with token_type 'message-id'."""

    token_type = 'message-id'
class InvalidMessageID(MessageID):
    """MessageID tagged with token_type 'invalid-message-id'."""

    token_type = 'invalid-message-id'
class Header(TokenList):
    """Token list tagged with token_type 'header'."""

    token_type = 'header'
- #
- # Terminal classes and instances
- #
class Terminal(str):
    """A str subclass serving as a leaf node of a parse tree.

    Each instance carries a token_type and its own list of defects.
    """

    as_ew_allowed = True
    ew_combine_allowed = True
    syntactic_break = True

    def __new__(cls, value, token_type):
        obj = str.__new__(cls, value)
        obj.token_type = token_type
        obj.defects = []
        return obj

    def __repr__(self):
        return "{}({})".format(type(self).__name__, str.__repr__(self))

    def pprint(self):
        """Print 'ClassName/token_type' for this terminal."""
        print('{}/{}'.format(type(self).__name__, self.token_type))

    @property
    def all_defects(self):
        """A copy of this terminal's defects list."""
        return list(self.defects)

    def _pp(self, indent=''):
        """Return the pretty-printed form as a one-element list of lines."""
        suffix = ' {}'.format(self.defects) if self.defects else ''
        line = "{}{}/{}({}){}".format(
            indent,
            type(self).__name__,
            self.token_type,
            str.__repr__(self),
            suffix,
        )
        return [line]

    def pop_trailing_ws(self):
        # This terminates the recursion.
        return None

    @property
    def comments(self):
        return []

    def __getnewargs__(self):
        # Both constructor arguments are needed to rebuild the instance.
        return (str(self), self.token_type)
class WhiteSpaceTerminal(Terminal):
    """Terminal whose ``value`` is always a single space."""

    def startswith_fws(self):
        """Return True."""
        return True

    @property
    def value(self):
        """A single space, whatever the underlying text is."""
        return ' '
class ValueTerminal(Terminal):
    """Terminal whose ``value`` is the terminal itself."""

    def startswith_fws(self):
        """Return False."""
        return False

    @property
    def value(self):
        """The terminal object itself."""
        return self
class EWWhiteSpaceTerminal(WhiteSpaceTerminal):
    """Whitespace terminal whose ``value`` and ``str()`` are both empty."""

    def __str__(self):
        """Return the empty string."""
        return ''

    @property
    def value(self):
        """The empty string."""
        return ''
class _InvalidEwError(errors.HeaderParseError):
    """Invalid encoded word found while parsing headers.

    Being a ``HeaderParseError`` subclass, it is also caught by handlers
    for that base class.
    """
# Shared terminal instances for fixed punctuation tokens.
#
# XXX: these are module-level singletons, so a program that mutates one
# inside a parse tree also changes every other parse tree that uses it.
# They need to become classes that are instantiated per use.  Maybe there
# should be tests for that, too.
DOT = ValueTerminal('.', 'dot')
ListSeparator = ValueTerminal(',', 'list-separator')
RouteComponentMarker = ValueTerminal('@', 'route-component-marker')
- #
- # Parser
- #
- # Parse strings according to RFC822/2047/2822/5322 rules.
- #
- # This is a stateless parser. Each get_XXX function accepts a string and
- # returns either a Terminal or a TokenList representing the RFC object named
- # by the method and a string containing the remaining unparsed characters
- # from the input. Thus a parser method consumes the next syntactic construct
- # of a given type and returns a token representing the construct plus the
- # unparsed remainder of the input string.
- #
- # For example, if the first element of a structured header is a 'phrase',
- # then:
- #
- # phrase, value = get_phrase(value)
- #
- # returns the complete phrase from the start of the string value, plus any
- # characters left in the string after the phrase is removed.
def _make_non_end_matcher(end_chars):
    """Return a ``match`` callable for a leading run of non-*end_chars*."""
    return re.compile(r"[^{}]+".format(
        re.escape(''.join(end_chars)))).match


# Splits on runs of WSP characters; the runs are kept in the result because
# the pattern is a capturing group.
_wsp_splitter = re.compile(r'([{}]+)'.format(''.join(WSP))).split
# Finds every character in the ranges NUL..space plus DEL.
_non_printable_finder = re.compile(r"[\x00-\x20\x7F]").findall
_non_atom_end_matcher = _make_non_end_matcher(ATOM_ENDS)
_non_token_end_matcher = _make_non_end_matcher(TOKEN_ENDS)
_non_attribute_end_matcher = _make_non_end_matcher(ATTRIBUTE_ENDS)
_non_extended_attribute_end_matcher = _make_non_end_matcher(
    EXTENDED_ATTRIBUTE_ENDS)
def _validate_xtext(xtext):
    """If input token contains ASCII non-printables, register a defect.

    A ``NonPrintableDefect`` listing the offending characters is appended
    to ``xtext.defects`` when any are found.  Independently, an
    ``UndecodableBytesDefect`` is appended when the token contains
    surrogates.
    """
    offenders = _non_printable_finder(xtext)
    if offenders:
        xtext.defects.append(errors.NonPrintableDefect(offenders))
    if not utils._has_surrogates(xtext):
        return
    xtext.defects.append(errors.UndecodableBytesDefect(
        "Non-ASCII characters found in header token"))
def _get_ptext_to_endchars(value, endchars):
    """Scan printables/quoted-pairs until endchars and return unquoted ptext.

    This function turns a run of qcontent, ccontent-without-comments, or
    dtext-with-quoted-printables into a single string by unquoting any
    quoted printables.  Only the text before the first run of whitespace
    is scanned.

    Returns a 3-tuple ``(ptext, remaining_value, had_qp)``.  When *value*
    is empty or starts with whitespace, ``ptext`` is ``''`` and *value* is
    returned unchanged as the remainder (previously this raised
    UnboundLocalError).

    NOTE(review): ``had_qp`` is only set when an escaped backslash is seen,
    not for every quoted pair; kept as-is to preserve existing behaviour.
    """
    fragment, *remainder = _wsp_splitter(value, 1)
    vchars = []
    escape = False
    had_qp = False
    # Index of the first unconsumed character of ``fragment``.  Defaults to
    # "everything consumed", which is also correct for an empty fragment.
    end = len(fragment)
    for pos, char in enumerate(fragment):
        if char == '\\':
            if escape:
                escape = False
                had_qp = True
            else:
                # Start of a quoted pair: drop the backslash itself.
                escape = True
                continue
        if escape:
            # The escaped character is taken literally, even an endchar.
            escape = False
        elif char in endchars:
            end = pos
            break
        vchars.append(char)
    return ''.join(vchars), ''.join([fragment[end:]] + remainder), had_qp
def get_fws(value):
    """FWS = 1*WSP

    This isn't the RFC definition.  We're using fws to represent tokens where
    folding can be done, but when we are parsing the *un*folding has already
    been done so we don't need to watch out for CRLF.

    Returns ``(fws, rest)`` where ``fws`` is a WhiteSpaceTerminal holding the
    leading whitespace removed by ``str.lstrip`` and ``rest`` is the
    stripped remainder.
    """
    rest = value.lstrip()
    ws_len = len(value) - len(rest)
    return WhiteSpaceTerminal(value[:ws_len], 'fws'), rest
def get_encoded_word(value):
    """ encoded-word = "=?" charset "?" encoding "?" encoded-text "?="

    Returns ``(ew, rest)``.  The decoded text is stored in the EncodedWord
    as fws/vtext terminals, and ``ew.cte`` holds the complete input string.
    Raises HeaderParseError when *value* does not start with ``=?`` or has
    no closing ``?=``, and _InvalidEwError when decoding the candidate
    fails.
    """
    if not value.startswith('=?'):
        raise errors.HeaderParseError(
            "expected encoded word but found {}".format(value))
    tok, terminator, remstr = value[2:].partition('?=')
    if not terminator:
        raise errors.HeaderParseError(
            "expected encoded word but found {}".format(value))
    ew = EncodedWord()
    followed_by_escape = (len(remstr) > 1 and
                          remstr[0] in hexdigits and
                          remstr[1] in hexdigits and
                          tok.count('?') < 2)
    if followed_by_escape:
        # The ? after the CTE was followed by an encoded word escape (=XX).
        rest, _, remstr = remstr.partition('?=')
        tok = tok + '?=' + rest
    if len(tok.split()) > 1:
        ew.defects.append(errors.InvalidHeaderDefect(
            "whitespace inside encoded word"))
    ew.cte = value
    value = remstr
    try:
        text, charset, lang, defects = _ew.decode('=?' + tok + '?=')
    except (ValueError, KeyError):
        raise _InvalidEwError(
            "encoded word format invalid: '{}'".format(ew.cte))
    ew.charset = charset
    ew.lang = lang
    ew.defects.extend(defects)
    # Tokenise the decoded text into alternating whitespace and vtext runs.
    while text:
        if text[0] in WSP:
            ws, text = get_fws(text)
            ew.append(ws)
            continue
        chars, *tail = _wsp_splitter(text, 1)
        vtext = ValueTerminal(chars, 'vtext')
        _validate_xtext(vtext)
        ew.append(vtext)
        text = ''.join(tail)
    # Encoded words should be followed by a WS
    if value and value[0] not in WSP:
        ew.defects.append(errors.InvalidHeaderDefect(
            "missing trailing whitespace after encoded-word"))
    return ew, value
def get_unstructured(value):
    """unstructured = (*([FWS] vchar) *WSP) / obs-unstruct
       obs-unstruct = *((*LF *CR *(obs-utext) *LF *CR)) / FWS)
       obs-utext = %d0 / obs-NO-WS-CTL / LF / CR

       obs-NO-WS-CTL is control characters except WSP/CR/LF.

    So, basically, we have printable runs, plus control characters or nulls in
    the obsolete syntax, separated by whitespace.  Since RFC 2047 uses the
    obsolete syntax in its specification, but requires whitespace on either
    side of the encoded words, I can see no reason to need to separate the
    non-printable-non-whitespace from the printable runs if they occur, so we
    parse this into xtext tokens separated by WSP tokens.

    Because an 'unstructured' value must by definition constitute the entire
    value, this 'get' routine does not return a remaining value, only the
    parsed TokenList.

    """
    # XXX: but what about bare CR and LF?  They might signal the start or
    # end of an encoded word.  YAGNI for now, since our current parsers
    # will never send us strings with bare CR or LF.

    unstructured = UnstructuredTokenList()
    while value:
        if value[0] in WSP:
            ws, value = get_fws(value)
            unstructured.append(ws)
            continue
        valid_ew = True
        if value.startswith('=?'):
            try:
                ew, value = get_encoded_word(value)
            except _InvalidEwError:
                valid_ew = False
            except errors.HeaderParseError:
                # XXX: Need to figure out how to register defects when
                # appropriate here.
                pass
            else:
                preceded_by_ws = (not unstructured or
                                  unstructured[-1].token_type == 'fws')
                if not preceded_by_ws:
                    unstructured.defects.append(errors.InvalidHeaderDefect(
                        "missing whitespace before encoded word"))
                elif (len(unstructured) > 1 and
                        unstructured[-2].token_type == 'encoded-word'):
                    # Whitespace between two encoded words is not rendered.
                    unstructured[-1] = EWWhiteSpaceTerminal(
                        unstructured[-1], 'fws')
                unstructured.append(ew)
                continue
        tok, *remainder = _wsp_splitter(value, 1)
        # Split in the middle of an atom if there is a rfc2047 encoded word
        # which does not have WSP on both sides. The defect will be registered
        # the next time through the loop.
        # This needs to only be performed when the encoded word is valid;
        # otherwise, performing it on an invalid encoded word can cause
        # the parser to go in an infinite loop.
        if valid_ew and rfc2047_matcher.search(tok):
            tok, *remainder = value.partition('=?')
        vtext = ValueTerminal(tok, 'vtext')
        _validate_xtext(vtext)
        unstructured.append(vtext)
        value = ''.join(remainder)
    return unstructured
def get_qp_ctext(value):
    r"""ctext = <printable ascii except \ ( )>

    This is not the RFC ctext, since we are handling nested comments in comment
    and unquoting quoted-pairs here.  We allow anything except the '()'
    characters, but if we find any ASCII other than the RFC defined printable
    ASCII, a NonPrintableDefect is added to the token's defects list.  Since
    quoted pairs are converted to their unquoted values, what is returned is
    a 'ptext' token.  In this case it is a WhiteSpaceTerminal, so its value
    is ' '.

    Returns ``(ptext, rest)``.
    """
    text, rest, _had_qp = _get_ptext_to_endchars(value, '()')
    token = WhiteSpaceTerminal(text, 'ptext')
    _validate_xtext(token)
    return token, rest
def get_qcontent(value):
    """qcontent = qtext / quoted-pair

    We allow anything except the DQUOTE character, but if we find any ASCII
    other than the RFC defined printable ASCII, a NonPrintableDefect is
    added to the token's defects list.  Any quoted pairs are converted to their
    unquoted values, so what is returned is a 'ptext' token.  In this case it
    is a ValueTerminal.

    Returns ``(ptext, rest)``.
    """
    text, rest, _had_qp = _get_ptext_to_endchars(value, '"')
    token = ValueTerminal(text, 'ptext')
    _validate_xtext(token)
    return token, rest
def get_atext(value):
    """atext = <matches _atext_matcher>

    We allow any non-ATOM_ENDS in atext, but add an InvalidATextDefect to
    the token's defects list if we find non-atext characters.

    Returns ``(atext, rest)``; raises HeaderParseError when *value* does not
    start with at least one non-ATOM_ENDS character.
    """
    match = _non_atom_end_matcher(value)
    if match is None:
        raise errors.HeaderParseError(
            "expected atext but found '{}'".format(value))
    token = ValueTerminal(match.group(), 'atext')
    _validate_xtext(token)
    # The matcher is anchored at the start, so the match end is the length
    # of the consumed prefix.
    return token, value[match.end():]
def get_bare_quoted_string(value):
    """bare-quoted-string = DQUOTE *([FWS] qcontent) [FWS] DQUOTE

    A quoted-string without the leading or trailing white space.  Its
    value is the text between the quote marks, with whitespace
    preserved and quoted pairs decoded.

    Returns ``(bare_quoted_string, rest)``.  Raises HeaderParseError when
    *value* is empty or does not start with a double quote.  A missing
    closing quote is recorded as an InvalidHeaderDefect rather than raised.
    """
    # The emptiness check makes empty input a parse error instead of an
    # IndexError.
    if not value or value[0] != '"':
        raise errors.HeaderParseError(
            "expected '\"' but found '{}'".format(value))
    bare_quoted_string = BareQuotedString()
    value = value[1:]
    if value and value[0] == '"':
        # Empty quoted string: record an empty ptext token.
        token, value = get_qcontent(value)
        bare_quoted_string.append(token)
    while value and value[0] != '"':
        if value[0] in WSP:
            token, value = get_fws(value)
        elif value[:2] == '=?':
            valid_ew = False
            try:
                token, value = get_encoded_word(value)
                bare_quoted_string.defects.append(errors.InvalidHeaderDefect(
                    "encoded word inside quoted string"))
                valid_ew = True
            except errors.HeaderParseError:
                token, value = get_qcontent(value)
            # Collapse the whitespace between two encoded words that occur in a
            # bare-quoted-string.
            if valid_ew and len(bare_quoted_string) > 1:
                if (bare_quoted_string[-1].token_type == 'fws' and
                        bare_quoted_string[-2].token_type == 'encoded-word'):
                    bare_quoted_string[-1] = EWWhiteSpaceTerminal(
                        bare_quoted_string[-1], 'fws')
        else:
            token, value = get_qcontent(value)
        bare_quoted_string.append(token)
    if not value:
        bare_quoted_string.defects.append(errors.InvalidHeaderDefect(
            "end of header inside quoted string"))
        return bare_quoted_string, value
    # Drop the closing quote.
    return bare_quoted_string, value[1:]
def get_comment(value):
    """comment = "(" *([FWS] ccontent) [FWS] ")"
       ccontent = ctext / quoted-pair / comment

    We handle nested comments here, and quoted-pair in our qp-ctext routine.

    Returns ``(comment, rest)``.  Raises HeaderParseError when a non-empty
    *value* does not start with '('.  Running out of input before the
    closing ')' is recorded as an InvalidHeaderDefect.
    """
    if value and value[0] != '(':
        raise errors.HeaderParseError(
            "expected '(' but found '{}'".format(value))
    comment = Comment()
    value = value[1:]
    while value and value[0] != ")":
        lead = value[0]
        if lead in WSP:
            parse = get_fws
        elif lead == '(':
            # Nested comment.
            parse = get_comment
        else:
            parse = get_qp_ctext
        token, value = parse(value)
        comment.append(token)
    if value:
        # Drop the closing parenthesis.
        return comment, value[1:]
    comment.defects.append(errors.InvalidHeaderDefect(
        "end of header inside comment"))
    return comment, value
def get_cfws(value):
    """CFWS = (1*([FWS] comment) [FWS]) / FWS

    Consumes every leading whitespace run and comment and returns
    ``(cfws, rest)``.  The returned list is empty when *value* does not
    start with a CFWS_LEADER character.
    """
    cfws = CFWSList()
    while value and value[0] in CFWS_LEADER:
        parse = get_fws if value[0] in WSP else get_comment
        token, value = parse(value)
        cfws.append(token)
    return cfws, value
def get_quoted_string(value):
    """quoted-string = [CFWS] <bare-quoted-string> [CFWS]

    'bare-quoted-string' is an intermediate class defined by this
    parser and not by the RFC grammar.  It is the quoted string
    without any attached CFWS.

    Returns ``(quoted_string, rest)``.
    """
    quoted_string = QuotedString()
    # Optional leading CFWS.
    if value and value[0] in CFWS_LEADER:
        leading, value = get_cfws(value)
        quoted_string.append(leading)
    body, value = get_bare_quoted_string(value)
    quoted_string.append(body)
    # Optional trailing CFWS.
    if value and value[0] in CFWS_LEADER:
        trailing, value = get_cfws(value)
        quoted_string.append(trailing)
    return quoted_string, value
def get_atom(value):
    """atom = [CFWS] 1*atext [CFWS]

    An atom could be an rfc2047 encoded word.

    Returns ``(atom, rest)``; raises HeaderParseError when no atext (or
    encoded word) is found after the optional leading CFWS.
    """
    atom = Atom()
    if value and value[0] in CFWS_LEADER:
        leading, value = get_cfws(value)
        atom.append(leading)
    if value and value[0] in ATOM_ENDS:
        raise errors.HeaderParseError(
            "expected atom but found '{}'".format(value))
    if not value.startswith('=?'):
        core, value = get_atext(value)
    else:
        try:
            core, value = get_encoded_word(value)
        except errors.HeaderParseError:
            # XXX: need to figure out how to register defects when
            # appropriate here.
            core, value = get_atext(value)
    atom.append(core)
    if value and value[0] in CFWS_LEADER:
        trailing, value = get_cfws(value)
        atom.append(trailing)
    return atom, value
def get_dot_atom_text(value):
    """ dot-text = 1*atext *("." 1*atext)

    Returns ``(dot_atom_text, rest)``.  Raises HeaderParseError when the
    text does not start with atext or ends with a dot.
    """
    if not value or value[0] in ATOM_ENDS:
        raise errors.HeaderParseError("expected atom at a start of "
            "dot-atom-text but found '{}'".format(value))
    dot_atom_text = DotAtomText()
    while value and value[0] not in ATOM_ENDS:
        atext, value = get_atext(value)
        dot_atom_text.append(atext)
        if value[:1] == '.':
            dot_atom_text.append(DOT)
            value = value[1:]
    # A dot must be followed by more atext.
    if dot_atom_text[-1] is DOT:
        raise errors.HeaderParseError("expected atom at end of dot-atom-text "
            "but found '{}'".format('.'+value))
    return dot_atom_text, value
def get_dot_atom(value):
    """ dot-atom = [CFWS] dot-atom-text [CFWS]

    Any place we can have a dot atom, we could instead have an rfc2047 encoded
    word.

    Returns ``(dot_atom, rest)``.  Raises HeaderParseError when no
    dot-atom-text (or encoded word) is found, including for empty input.
    """
    dot_atom = DotAtom()
    # Guard against empty input so it is reported as a parse error by
    # get_dot_atom_text instead of raising IndexError here.
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        dot_atom.append(token)
    if value.startswith('=?'):
        try:
            token, value = get_encoded_word(value)
        except errors.HeaderParseError:
            # XXX: need to figure out how to register defects when
            # appropriate here.
            token, value = get_dot_atom_text(value)
    else:
        token, value = get_dot_atom_text(value)
    dot_atom.append(token)
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        dot_atom.append(token)
    return dot_atom, value
def get_word(value):
    """word = atom / quoted-string

    Either atom or quoted-string may start with CFWS.  We have to peel off this
    CFWS first to determine which type of word to parse.  Afterward we splice
    the leading CFWS, if any, into the parsed sub-token.

    If neither an atom or a quoted-string is found before the next special, a
    HeaderParseError is raised.  This includes empty input.

    The token returned is either an Atom or a QuotedString, as appropriate.
    This means the 'word' level of the formal grammar is not represented in the
    parse tree; this is because having that extra layer when manipulating the
    parse tree is more confusing than it is helpful.

    """
    # Guard against empty input so the "found nothing" error below is
    # reached instead of an IndexError.
    if value and value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
    else:
        leader = None
    if not value:
        raise errors.HeaderParseError(
            "Expected 'atom' or 'quoted-string' but found nothing.")
    if value[0] == '"':
        token, value = get_quoted_string(value)
    elif value[0] in SPECIALS:
        raise errors.HeaderParseError("Expected 'atom' or 'quoted-string' "
                                      "but found '{}'".format(value))
    else:
        token, value = get_atom(value)
    if leader is not None:
        # Splice the leading CFWS into the front of the parsed token.
        token[:0] = [leader]
    return token, value
def get_phrase(value):
    """ phrase = 1*word / obs-phrase
        obs-phrase = word *(word / "." / CFWS)

    This means a phrase can be a sequence of words, periods, and CFWS in any
    order as long as it starts with at least one word.  If anything other than
    words is detected, an ObsoleteHeaderDefect is added to the token's defect
    list.  We also accept a phrase that starts with CFWS followed by a dot;
    this is registered as an InvalidHeaderDefect, since it is not supported by
    even the obsolete grammar.

    Returns ``(phrase, rest)``; parsing stops at the first PHRASE_ENDS
    character.
    """
    phrase = Phrase()
    try:
        first, value = get_word(value)
        phrase.append(first)
    except errors.HeaderParseError:
        phrase.defects.append(errors.InvalidHeaderDefect(
            "phrase does not start with word"))
    while value and value[0] not in PHRASE_ENDS:
        if value[0] == '.':
            phrase.append(DOT)
            phrase.defects.append(errors.ObsoleteHeaderDefect(
                "period in 'phrase'"))
            value = value[1:]
            continue
        try:
            token, value = get_word(value)
        except errors.HeaderParseError:
            if value[0] not in CFWS_LEADER:
                raise
            # CFWS that is not attached to any word.
            token, value = get_cfws(value)
            phrase.defects.append(errors.ObsoleteHeaderDefect(
                "comment found without atom"))
        phrase.append(token)
    return phrase, value
- def get_local_part(value):
- """ local-part = dot-atom / quoted-string / obs-local-part
- """
- local_part = LocalPart()
- leader = None
- if value[0] in CFWS_LEADER:
- leader, value = get_cfws(value)
- if not value:
- raise errors.HeaderParseError(
- "expected local-part but found '{}'".format(value))
- try:
- token, value = get_dot_atom(value)
- except errors.HeaderParseError:
- try:
- token, value = get_word(value)
- except errors.HeaderParseError:
- if value[0] != '\\' and value[0] in PHRASE_ENDS:
- raise
- token = TokenList()
- if leader is not None:
- token[:0] = [leader]
- local_part.append(token)
- if value and (value[0]=='\\' or value[0] not in PHRASE_ENDS):
- obs_local_part, value = get_obs_local_part(str(local_part) + value)
- if obs_local_part.token_type == 'invalid-obs-local-part':
- local_part.defects.append(errors.InvalidHeaderDefect(
- "local-part is not dot-atom, quoted-string, or obs-local-part"))
- else:
- local_part.defects.append(errors.ObsoleteHeaderDefect(
- "local-part is not a dot-atom (contains CFWS)"))
- local_part[0] = obs_local_part
- try:
- local_part.value.encode('ascii')
- except UnicodeEncodeError:
- local_part.defects.append(errors.NonASCIILocalPartDefect(
- "local-part contains non-ASCII characters)"))
- return local_part, value
def get_obs_local_part(value):
    """ obs-local-part = word *("." word)

    Returns ``(obs_local_part, rest)``.  Grammar violations (repeated,
    leading or trailing dots, missing dots between words, stray
    backslashes) are recorded as InvalidHeaderDefects, and any defect
    changes the token type to 'invalid-obs-local-part'.

    Raises HeaderParseError when nothing could be parsed (for example
    empty input); previously this surfaced as an IndexError.
    """
    obs_local_part = ObsLocalPart()
    last_non_ws_was_dot = False
    while value and (value[0]=='\\' or value[0] not in PHRASE_ENDS):
        if value[0] == '.':
            if last_non_ws_was_dot:
                obs_local_part.defects.append(errors.InvalidHeaderDefect(
                    "invalid repeated '.'"))
            obs_local_part.append(DOT)
            last_non_ws_was_dot = True
            value = value[1:]
            continue
        elif value[0]=='\\':
            obs_local_part.append(ValueTerminal(value[0],
                                                'misplaced-special'))
            value = value[1:]
            obs_local_part.defects.append(errors.InvalidHeaderDefect(
                "'\\' character outside of quoted-string/ccontent"))
            last_non_ws_was_dot = False
            continue
        if obs_local_part and obs_local_part[-1].token_type != 'dot':
            obs_local_part.defects.append(errors.InvalidHeaderDefect(
                "missing '.' between words"))
        try:
            token, value = get_word(value)
            last_non_ws_was_dot = False
        except errors.HeaderParseError:
            if value[0] not in CFWS_LEADER:
                raise
            token, value = get_cfws(value)
        obs_local_part.append(token)
    if not obs_local_part:
        raise errors.HeaderParseError(
            "expected obs-local-part but found '{}'".format(value))
    # The length checks keep a lone CFWS token from indexing past the
    # ends of the list.
    has_neighbour = len(obs_local_part) > 1
    if (obs_local_part[0].token_type == 'dot' or
            (obs_local_part[0].token_type == 'cfws' and
             has_neighbour and
             obs_local_part[1].token_type == 'dot')):
        obs_local_part.defects.append(errors.InvalidHeaderDefect(
            "Invalid leading '.' in local part"))
    if (obs_local_part[-1].token_type == 'dot' or
            (obs_local_part[-1].token_type == 'cfws' and
             has_neighbour and
             obs_local_part[-2].token_type == 'dot')):
        obs_local_part.defects.append(errors.InvalidHeaderDefect(
            "Invalid trailing '.' in local part"))
    if obs_local_part.defects:
        obs_local_part.token_type = 'invalid-obs-local-part'
    return obs_local_part, value
def get_dtext(value):
    r""" dtext = <printable ascii except \ [ ]> / obs-dtext
        obs-dtext = obs-NO-WS-CTL / quoted-pair

    We allow anything except the excluded characters, but if we find any
    ASCII other than the RFC defined printable ASCII, a NonPrintableDefect is
    added to the token's defects list.  Quoted pairs are converted to their
    unquoted values, so what is returned is a ptext token, in this case a
    ValueTerminal.  If there were quoted-printables, an ObsoleteHeaderDefect is
    added to the returned token's defect list.

    Returns ``(ptext, rest)``.
    """
    text, rest, had_qp = _get_ptext_to_endchars(value, '[]')
    token = ValueTerminal(text, 'ptext')
    if had_qp:
        token.defects.append(errors.ObsoleteHeaderDefect(
            "quoted printable found in domain-literal"))
    _validate_xtext(token)
    return token, rest
def _check_for_early_dl_end(value, domain_literal):
    """Close *domain_literal* if the input ended before its ']'.

    Returns False when *value* is non-empty.  Otherwise registers an
    InvalidHeaderDefect on *domain_literal*, appends a synthetic
    'domain-literal-end' terminal and returns True.
    """
    if value:
        return False
    # The defect belongs in ``defects``; appending it to the token list
    # itself would put a non-token object into the parse tree.
    domain_literal.defects.append(errors.InvalidHeaderDefect(
        "end of input inside domain-literal"))
    domain_literal.append(ValueTerminal(']', 'domain-literal-end'))
    return True

def get_domain_literal(value):
    """ domain-literal = [CFWS] "[" *([FWS] dtext) [FWS] "]" [CFWS]

    Returns ``(domain_literal, rest)``.  Raises HeaderParseError when
    *value* is empty (or only CFWS), does not start with '[', or has
    something other than ']' after the dtext.  Input that ends inside the
    brackets is closed with a synthetic ']' and an InvalidHeaderDefect.
    """
    domain_literal = DomainLiteral()
    # Guard against empty input so it reaches the parse error below
    # instead of raising IndexError.
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        domain_literal.append(token)
    if not value:
        raise errors.HeaderParseError("expected domain-literal")
    if value[0] != '[':
        raise errors.HeaderParseError("expected '[' at start of domain-literal "
                "but found '{}'".format(value))
    value = value[1:]
    # Record the opening bracket before any early-end check so that it is
    # not lost when the input stops right after it.
    domain_literal.append(ValueTerminal('[', 'domain-literal-start'))
    if _check_for_early_dl_end(value, domain_literal):
        return domain_literal, value
    if value[0] in WSP:
        token, value = get_fws(value)
        domain_literal.append(token)
        # The input may consist of nothing but whitespace after '['.
        if _check_for_early_dl_end(value, domain_literal):
            return domain_literal, value
    token, value = get_dtext(value)
    domain_literal.append(token)
    if _check_for_early_dl_end(value, domain_literal):
        return domain_literal, value
    if value[0] in WSP:
        token, value = get_fws(value)
        domain_literal.append(token)
    if _check_for_early_dl_end(value, domain_literal):
        return domain_literal, value
    if value[0] != ']':
        raise errors.HeaderParseError("expected ']' at end of domain-literal "
                "but found '{}'".format(value))
    domain_literal.append(ValueTerminal(']', 'domain-literal-end'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        domain_literal.append(token)
    return domain_literal, value
def get_domain(value):
    """ domain = dot-atom / domain-literal / obs-domain
        obs-domain = atom *("." atom))

    Returns ``(domain, rest)``.  Raises HeaderParseError when *value* is
    empty (or only CFWS), when no domain can be parsed, or when the parsed
    domain is immediately followed by '@'.
    """
    domain = Domain()
    leader = None
    # Guard against empty input (e.g. an address ending in '@') so it
    # reaches the parse error below instead of raising IndexError.
    if value and value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
    if not value:
        raise errors.HeaderParseError(
            "expected domain but found '{}'".format(value))
    if value[0] == '[':
        token, value = get_domain_literal(value)
        if leader is not None:
            token[:0] = [leader]
        domain.append(token)
        return domain, value
    try:
        token, value = get_dot_atom(value)
    except errors.HeaderParseError:
        token, value = get_atom(value)
    if value and value[0] == '@':
        raise errors.HeaderParseError('Invalid Domain')
    if leader is not None:
        token[:0] = [leader]
    domain.append(token)
    if value and value[0] == '.':
        # Dots separated by CFWS: only legal in the obsolete grammar.
        domain.defects.append(errors.ObsoleteHeaderDefect(
            "domain is not a dot-atom (contains CFWS)"))
        if domain[0].token_type == 'dot-atom':
            # Flatten the dot-atom's children directly into the domain.
            domain[:] = domain[0]
        while value and value[0] == '.':
            domain.append(DOT)
            token, value = get_atom(value[1:])
            domain.append(token)
    return domain, value
def get_addr_spec(value):
    """ addr-spec = local-part "@" domain

    Returns ``(addr_spec, rest)``.  When the local part is not followed by
    '@', an InvalidHeaderDefect is registered and the token holds only the
    local part.
    """
    addr_spec = AddrSpec()
    local_part, value = get_local_part(value)
    addr_spec.append(local_part)
    if value and value[0] == '@':
        addr_spec.append(ValueTerminal('@', 'address-at-symbol'))
        domain, value = get_domain(value[1:])
        addr_spec.append(domain)
        return addr_spec, value
    addr_spec.defects.append(errors.InvalidHeaderDefect(
        "addr-spec local part with no domain"))
    return addr_spec, value
def get_obs_route(value):
    """ obs-route = obs-domain-list ":"
        obs-domain-list = *(CFWS / ",") "@" domain *("," [CFWS] ["@" domain])

        Returns an obs-route token with the appropriate sub-tokens (that is,
        there is no obs-domain-list in the parse tree).

    Returns ``(obs_route, rest)``.  Raises HeaderParseError when the first
    '@' is missing, when the input ends before the terminating ':', or
    when something other than ':' follows the domain list.
    """
    obs_route = ObsRoute()
    while value and (value[0]==',' or value[0] in CFWS_LEADER):
        if value[0] in CFWS_LEADER:
            token, value = get_cfws(value)
            obs_route.append(token)
        elif value[0] == ',':
            obs_route.append(ListSeparator)
            value = value[1:]
    if not value or value[0] != '@':
        raise errors.HeaderParseError(
            "expected obs-route domain but found '{}'".format(value))
    obs_route.append(RouteComponentMarker)
    token, value = get_domain(value[1:])
    obs_route.append(token)
    while value and value[0]==',':
        obs_route.append(ListSeparator)
        value = value[1:]
        if not value:
            break
        if value[0] in CFWS_LEADER:
            token, value = get_cfws(value)
            obs_route.append(token)
        # The CFWS may have been the last thing in the input; leave the
        # loop so the "end of header" error below is raised instead of an
        # IndexError.
        if not value:
            break
        if value[0] == '@':
            obs_route.append(RouteComponentMarker)
            token, value = get_domain(value[1:])
            obs_route.append(token)
    if not value:
        raise errors.HeaderParseError("end of header while parsing obs-route")
    if value[0] != ':':
        raise errors.HeaderParseError( "expected ':' marking end of "
            "obs-route but found '{}'".format(value))
    obs_route.append(ValueTerminal(':', 'end-of-obs-route-marker'))
    return obs_route, value[1:]
def get_angle_addr(value):
    """ angle-addr = [CFWS] "<" addr-spec ">" [CFWS] / obs-angle-addr
        obs-angle-addr = [CFWS] "<" obs-route addr-spec ">" [CFWS]

    Returns ``(angle_addr, rest)``.  Raises HeaderParseError when *value*
    is empty, does not start with '<' (after optional CFWS), ends right
    after the '<', or contains neither an addr-spec nor an obs-route.  A
    missing closing '>' and the null address '<>' are recorded as
    InvalidHeaderDefects.
    """
    angle_addr = AngleAddr()
    # Guard against empty input so it reaches the parse error below
    # instead of raising IndexError.
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        angle_addr.append(token)
    if not value or value[0] != '<':
        raise errors.HeaderParseError(
            "expected angle-addr but found '{}'".format(value))
    angle_addr.append(ValueTerminal('<', 'angle-addr-start'))
    value = value[1:]
    if not value:
        # Input ended right after '<'.
        raise errors.HeaderParseError(
            "expected addr-spec or obs-route but found '{}'".format(value))
    # Although it is not legal per RFC5322, SMTP uses '<>' in certain
    # circumstances.
    if value[0] == '>':
        angle_addr.append(ValueTerminal('>', 'angle-addr-end'))
        angle_addr.defects.append(errors.InvalidHeaderDefect(
            "null addr-spec in angle-addr"))
        value = value[1:]
        return angle_addr, value
    try:
        token, value = get_addr_spec(value)
    except errors.HeaderParseError:
        try:
            token, value = get_obs_route(value)
            angle_addr.defects.append(errors.ObsoleteHeaderDefect(
                "obsolete route specification in angle-addr"))
        except errors.HeaderParseError:
            raise errors.HeaderParseError(
                "expected addr-spec or obs-route but found '{}'".format(value))
        angle_addr.append(token)
        # The addr-spec proper follows the obsolete route.
        token, value = get_addr_spec(value)
    angle_addr.append(token)
    if value and value[0] == '>':
        value = value[1:]
    else:
        angle_addr.defects.append(errors.InvalidHeaderDefect(
            "missing trailing '>' on angle-addr"))
    # The end marker is appended even when it was missing from the input.
    angle_addr.append(ValueTerminal('>', 'angle-addr-end'))
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        angle_addr.append(token)
    return angle_addr, value
def get_display_name(value):
    """ display-name = phrase

    Because this is simply a name-rule, we don't return a display-name
    token containing a phrase, but rather a display-name token with
    the content of the phrase.

    Returns ``(display_name, rest)``; the token receives copies of the
    phrase's children and of its defects list.
    """
    phrase, value = get_phrase(value)
    display_name = DisplayName()
    display_name.extend(phrase[:])
    display_name.defects = phrase.defects[:]
    return display_name, value
def get_name_addr(value):
    """ name-addr = [display-name] angle-addr

    Returns ``(name_addr, rest)``.  Raises HeaderParseError when *value*
    is empty or only CFWS, when it starts with a PHRASE_ENDS character
    other than '<', or when nothing follows the display name.
    """
    name_addr = NameAddr()
    # Empty input is a parse error, not an IndexError.
    if not value:
        raise errors.HeaderParseError(
            "expected name-addr but found '{}'".format(value))
    # Both the optional display name and the angle-addr can start with cfws.
    leader = None
    if value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
        if not value:
            raise errors.HeaderParseError(
                "expected name-addr but found '{}'".format(leader))
    if value[0] != '<':
        if value[0] in PHRASE_ENDS:
            raise errors.HeaderParseError(
                "expected name-addr but found '{}'".format(value))
        token, value = get_display_name(value)
        if not value:
            raise errors.HeaderParseError(
                "expected name-addr but found '{}'".format(token))
        if leader is not None:
            # Attach the leading CFWS to the first word of the name.
            token[0][:0] = [leader]
            leader = None
        name_addr.append(token)
    token, value = get_angle_addr(value)
    if leader is not None:
        # No display name: the leading CFWS belongs to the angle-addr.
        token[:0] = [leader]
    name_addr.append(token)
    return name_addr, value
def get_mailbox(value):
    """ mailbox = name-addr / addr-spec

    Returns ``(mailbox, rest)``.  The token type becomes 'invalid-mailbox'
    when the parsed content carries any InvalidHeaderDefect.  Raises
    HeaderParseError when *value* parses as neither alternative.
    """
    # The only way to figure out if we are dealing with a name-addr or an
    # addr-spec is to try parsing each one.
    mailbox = Mailbox()
    try:
        content, value = get_name_addr(value)
    except errors.HeaderParseError:
        try:
            content, value = get_addr_spec(value)
        except errors.HeaderParseError:
            raise errors.HeaderParseError(
                "expected mailbox but found '{}'".format(value))
    for defect in content.all_defects:
        if isinstance(defect, errors.InvalidHeaderDefect):
            mailbox.token_type = 'invalid-mailbox'
            break
    mailbox.append(content)
    return mailbox, value
def get_invalid_mailbox(value, endchars):
    """ Read everything up to one of the chars in endchars.

    This is outside the formal grammar.  The InvalidMailbox TokenList that is
    returned acts like a Mailbox, but the data attributes are None.

    Returns ``(invalid_mailbox, rest)``.  PHRASE_ENDS characters that are
    not in *endchars* are stored as 'misplaced-special' terminals;
    everything else is parsed as phrases.
    """
    invalid_mailbox = InvalidMailbox()
    while value and value[0] not in endchars:
        if value[0] not in PHRASE_ENDS:
            token, value = get_phrase(value)
        else:
            token = ValueTerminal(value[0], 'misplaced-special')
            value = value[1:]
        invalid_mailbox.append(token)
    return invalid_mailbox, value
def get_mailbox_list(value):
    """ mailbox-list = (mailbox *("," mailbox)) / obs-mbox-list
        obs-mbox-list = *([CFWS] ",") mailbox *("," [mailbox / CFWS])

    For this routine we go outside the formal grammar in order to improve error
    handling.  We recognize the end of the mailbox list only at the end of the
    value or at a ';' (the group terminator).  This is so that we can turn
    invalid mailboxes into InvalidMailbox tokens and continue parsing any
    remaining valid mailboxes.  We also allow all mailbox entries to be null,
    and this condition is handled appropriately at a higher level.

    Returns ``(mailbox_list, rest)``.
    """
    mailbox_list = MailboxList()

    def append_invalid(leader, text):
        # Consume *text* up to the next ',' or ';' as an invalid mailbox,
        # record the defect and return the unparsed remainder.
        invalid, text = get_invalid_mailbox(text, ',;')
        if leader is not None:
            invalid[:0] = [leader]
        mailbox_list.append(invalid)
        mailbox_list.defects.append(errors.InvalidHeaderDefect(
            "invalid mailbox in mailbox-list"))
        return text

    while value and value[0] != ';':
        try:
            token, value = get_mailbox(value)
            mailbox_list.append(token)
        except errors.HeaderParseError:
            leader = None
            if value[0] in CFWS_LEADER:
                leader, value = get_cfws(value)
                if not value or value[0] in ',;':
                    # Only CFWS between separators: an empty element.
                    mailbox_list.append(leader)
                    mailbox_list.defects.append(errors.ObsoleteHeaderDefect(
                        "empty element in mailbox-list"))
                else:
                    value = append_invalid(leader, value)
            elif value[0] == ',':
                mailbox_list.defects.append(errors.ObsoleteHeaderDefect(
                    "empty element in mailbox-list"))
            else:
                value = append_invalid(leader, value)
        if value and value[0] not in ',;':
            # Crap after mailbox; treat it as an invalid mailbox.
            # The mailbox info will still be available.
            previous = mailbox_list[-1]
            previous.token_type = 'invalid-mailbox'
            junk, value = get_invalid_mailbox(value, ',;')
            previous.extend(junk)
            mailbox_list.defects.append(errors.InvalidHeaderDefect(
                "invalid mailbox in mailbox-list"))
        if value and value[0] == ',':
            mailbox_list.append(ListSeparator)
            value = value[1:]
    return mailbox_list, value
def get_group_list(value):
    """Parse a group-list; return ``(GroupList, remaining_value)``.

        group-list = mailbox-list / CFWS / obs-group-list
        obs-group-list = 1*([CFWS] ",") [CFWS]

    An empty *value* yields an empty GroupList carrying an
    InvalidHeaderDefect.  Parsing stops in front of the ';' group terminator,
    which is left in the returned remainder for the caller to consume.
    """
    group_list = GroupList()
    if not value:
        group_list.defects.append(errors.InvalidHeaderDefect(
            "end of header before group-list"))
        return group_list, value
    leader = None
    if value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
        if not value:
            # This should never happen in email parsing, since CFWS-only is a
            # legal alternative to group-list in a group, which is the only
            # place group-list appears.
            group_list.defects.append(errors.InvalidHeaderDefect(
                "end of header in group-list"))
            group_list.append(leader)
            return group_list, value
    if value[0] == ';':
        # Only append the leading CFWS when there actually was one; without
        # this guard a bare ';' would put None into the token list.
        if leader is not None:
            group_list.append(leader)
        return group_list, value
    token, value = get_mailbox_list(value)
    if len(token.all_mailboxes) == 0:
        # The mailbox-list held only separators/CFWS: flatten its elements
        # directly into the group-list and flag the obsolete syntax.
        if leader is not None:
            group_list.append(leader)
        group_list.extend(token)
        group_list.defects.append(errors.ObsoleteHeaderDefect(
            "group-list with empty entries"))
        return group_list, value
    if leader is not None:
        token[:0] = [leader]
    group_list.append(token)
    return group_list, value
def get_group(value):
    """Parse a group; return ``(Group, remaining_value)``.

        group = display-name ":" [group-list] ";" [CFWS]

    Raises HeaderParseError when the display name is not followed by ':' or
    when something other than ';' follows the group-list.  If the input ends
    before the ';', a defect is recorded and the terminator token is added
    anyway.
    """
    group = Group()
    display_name, value = get_display_name(value)
    if not (value and value[0] == ':'):
        raise errors.HeaderParseError("expected ':' at end of group "
            "display name but found '{}'".format(value))
    group.append(display_name)
    group.append(ValueTerminal(':', 'group-display-name-terminator'))
    value = value[1:]

    # Fast path: an empty group, i.e. "name:;".
    if value and value[0] == ';':
        group.append(ValueTerminal(';', 'group-terminator'))
        return group, value[1:]

    members, value = get_group_list(value)
    group.append(members)
    if value:
        if value[0] != ';':
            raise errors.HeaderParseError(
                "expected ';' at end of group but found {}".format(value))
    else:
        group.defects.append(errors.InvalidHeaderDefect(
            "end of header in group"))
    group.append(ValueTerminal(';', 'group-terminator'))
    value = value[1:]

    if value and value[0] in CFWS_LEADER:
        trailer, value = get_cfws(value)
        group.append(trailer)
    return group, value
def get_address(value):
    """Parse an address; return ``(Address, remaining_value)``.

        address = mailbox / group

    Note that, counter-intuitively, an address can be either a single address
    or a list of addresses (a group).  This is why the returned Address
    object has a 'mailboxes' attribute which treats a single address as a
    list of length one.  When you need to differentiate between the two
    cases, extract the single element, which is either a mailbox or a group
    token.

    Raises HeaderParseError if *value* starts with neither a group nor a
    mailbox.
    """
    # The formal grammar isn't very helpful here: mailbox and group,
    # especially with the obsolete forms, start off very similarly, and only
    # on reaching one of '@', '<' or ':' is it known which one it is.  So the
    # two parsers are simply tried in turn, group first and then mailbox.
    # Looking for a phrase and branching on the next character might be more
    # efficient, but that would be a premature optimization.
    try:
        parsed, remainder = get_group(value)
    except errors.HeaderParseError:
        try:
            parsed, remainder = get_mailbox(value)
        except errors.HeaderParseError:
            raise errors.HeaderParseError(
                "expected address but found '{}'".format(value))
    address = Address()
    address.append(parsed)
    return address, remainder
def get_address_list(value):
    """Parse an address-list; return ``(AddressList, remaining_value)``.

        address_list = (address *("," address)) / obs-addr-list
        obs-addr-list = *([CFWS] ",") address *("," [address / CFWS])

    This departs from the formal grammar by parsing all the way to the end
    of the input, on the assumption that the input consists entirely of an
    address-list.  That always holds in email parsing, and it lets invalid
    addresses be skipped so that later valid ones are still parsed.
    """
    addresses = AddressList()
    while value:
        try:
            token, value = get_address(value)
        except errors.HeaderParseError:
            if value[0] in CFWS_LEADER:
                leader, value = get_cfws(value)
                if value and value[0] != ',':
                    # Unparseable text after the CFWS: wrap it, with the
                    # CFWS in front, as an invalid mailbox in an Address.
                    token, value = get_invalid_mailbox(value, ',')
                    token[:0] = [leader]
                    addresses.append(Address([token]))
                    addresses.defects.append(errors.InvalidHeaderDefect(
                        "invalid address in address-list"))
                else:
                    # The list slot holds nothing but CFWS.
                    addresses.append(leader)
                    addresses.defects.append(errors.ObsoleteHeaderDefect(
                        "address-list entry with no content"))
            elif value[0] == ',':
                addresses.defects.append(errors.ObsoleteHeaderDefect(
                    "empty element in address-list"))
            else:
                token, value = get_invalid_mailbox(value, ',')
                addresses.append(Address([token]))
                addresses.defects.append(errors.InvalidHeaderDefect(
                    "invalid address in address-list"))
        else:
            addresses.append(token)

        if value and value[0] != ',':
            # Junk follows the address just added.  Re-tag its first element
            # as an invalid mailbox and absorb the junk into it; the mailbox
            # info parsed so far remains available on the token.
            first_element = addresses[-1][0]
            first_element.token_type = 'invalid-mailbox'
            junk, value = get_invalid_mailbox(value, ',')
            first_element.extend(junk)
            addresses.defects.append(errors.InvalidHeaderDefect(
                "invalid address in address-list"))

        if value:
            # Whatever remains must start with a ',' at this point.
            addresses.append(ValueTerminal(',', 'list-separator'))
            value = value[1:]
    return addresses, value
def get_no_fold_literal(value):
    """Parse a no-fold-literal; return ``(NoFoldLiteral, remaining_value)``.

        no-fold-literal = "[" *dtext "]"

    Raises HeaderParseError if *value* is empty, does not start with '[', or
    the dtext is not followed by the closing ']'.
    """
    if not value:
        raise errors.HeaderParseError(
            "expected no-fold-literal but found '{}'".format(value))
    if value[0] != '[':
        raise errors.HeaderParseError(
            "expected '[' at the start of no-fold-literal "
            "but found '{}'".format(value))
    literal = NoFoldLiteral()
    literal.append(ValueTerminal('[', 'no-fold-literal-start'))
    dtext, value = get_dtext(value[1:])
    literal.append(dtext)
    if not (value and value[0] == ']'):
        raise errors.HeaderParseError(
            "expected ']' at the end of no-fold-literal "
            "but found '{}'".format(value))
    literal.append(ValueTerminal(']', 'no-fold-literal-end'))
    return literal, value[1:]
def get_msg_id(value):
    """Parse a msg-id; return ``(MsgID, remaining_value)``.

        msg-id = [CFWS] "<" id-left '@' id-right ">" [CFWS]
        id-left = dot-atom-text / obs-id-left
        id-right = dot-atom-text / no-fold-literal / obs-id-right
        no-fold-literal = "[" *dtext "]"

    Raises HeaderParseError when the opening '<' is missing or when id-left
    or id-right cannot be parsed in any of the accepted forms.  A missing
    id-right or a missing closing '>' is only recorded as a defect.
    """
    msg_id = MsgID()
    if value and value[0] in CFWS_LEADER:
        leading_cfws, value = get_cfws(value)
        msg_id.append(leading_cfws)
    if not (value and value[0] == '<'):
        raise errors.HeaderParseError(
            "expected msg-id but found '{}'".format(value))
    msg_id.append(ValueTerminal('<', 'msg-id-start'))
    value = value[1:]

    # id-left: the modern form first, then the obsolete one.
    try:
        id_left, value = get_dot_atom_text(value)
    except errors.HeaderParseError:
        try:
            # obs-id-left is the same as the local-part of an addr-spec.
            id_left, value = get_obs_local_part(value)
            msg_id.defects.append(errors.ObsoleteHeaderDefect(
                "obsolete id-left in msg-id"))
        except errors.HeaderParseError:
            raise errors.HeaderParseError(
                "expected dot-atom-text or obs-id-left"
                " but found '{}'".format(value))
    msg_id.append(id_left)

    if not (value and value[0] == '@'):
        msg_id.defects.append(errors.InvalidHeaderDefect(
            "msg-id with no id-right"))
        # There is no id-right, but if the local part is directly followed
        # by '>' consume that too and return it along with the defect.
        if value and value[0] == '>':
            msg_id.append(ValueTerminal('>', 'msg-id-end'))
            value = value[1:]
        return msg_id, value
    msg_id.append(ValueTerminal('@', 'address-at-symbol'))
    value = value[1:]

    # id-right: dot-atom-text, then no-fold-literal, then the obsolete form.
    try:
        id_right, value = get_dot_atom_text(value)
    except errors.HeaderParseError:
        try:
            id_right, value = get_no_fold_literal(value)
        except errors.HeaderParseError:
            try:
                id_right, value = get_domain(value)
                msg_id.defects.append(errors.ObsoleteHeaderDefect(
                    "obsolete id-right in msg-id"))
            except errors.HeaderParseError:
                raise errors.HeaderParseError(
                    "expected dot-atom-text, no-fold-literal or obs-id-right"
                    " but found '{}'".format(value))
    msg_id.append(id_right)

    # The closing '>' token is always added, even when it was missing from
    # the input (in which case a defect records the repair).
    if value and value[0] == '>':
        value = value[1:]
    else:
        msg_id.defects.append(errors.InvalidHeaderDefect(
            "missing trailing '>' on msg-id"))
    msg_id.append(ValueTerminal('>', 'msg-id-end'))

    if value and value[0] in CFWS_LEADER:
        trailing_cfws, value = get_cfws(value)
        msg_id.append(trailing_cfws)
    return msg_id, value
def parse_message_id(value):
    """Parse a Message-ID field value and return the resulting token list.

        message-id = "Message-ID:" msg-id CRLF

    If the value cannot be parsed as a msg-id, it is re-parsed as
    unstructured text and returned as an InvalidMessageID carrying an
    InvalidHeaderDefect.  Otherwise a MessageID is returned; any text left
    over after the msg-id is reported as a defect.
    """
    try:
        msg_id, value = get_msg_id(value)
    except errors.HeaderParseError as ex:
        message_id = InvalidMessageID(get_unstructured(value))
        message_id.defects.append(
            errors.InvalidHeaderDefect("Invalid msg-id: {!r}".format(ex)))
        return message_id

    message_id = MessageID()
    message_id.append(msg_id)
    # Nothing should be left once a valid msg-id has been consumed.
    if value:
        message_id.defects.append(errors.InvalidHeaderDefect(
            "Unexpected {!r}".format(value)))
    return message_id
- #
- # XXX: As I begin to add additional header parsers, I'm realizing we probably
- # have two levels of parser routines: the get_XXX methods that get a token in
- # the grammar, and parse_XXX methods that parse an entire field value. So
- # get_address_list above should really be a parse_ method, as probably should
- # be get_unstructured.
- #
def parse_mime_version(value):
    """Parse a MIME-Version field value and return a MIMEVersion token list.

        mime-version = [CFWS] 1*digit [CFWS] "." [CFWS] 1*digit [CFWS]

    The ``major`` and ``minor`` attributes of the result are set only when
    the corresponding number consists solely of ASCII digits; every other
    problem is recorded as a defect and the offending text is kept in the
    parse tree as an 'xtext' terminal.  This function never raises for
    malformed input.
    """
    # The [CFWS] is implicit in the RFC 2045 BNF.
    # XXX: This routine is a bit verbose, should factor out a get_int method.

    def is_ascii_number(text):
        # str.isdigit() is deliberately not used: it also accepts characters
        # such as superscript digits, which int() refuses to convert, so a
        # ValueError would escape from the parser.
        return bool(text) and all(ch in '0123456789' for ch in text)

    mime_version = MIMEVersion()
    if not value:
        mime_version.defects.append(errors.HeaderMissingRequiredValue(
            "Missing MIME version number (eg: 1.0)"))
        return mime_version
    if value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        mime_version.append(token)
        if not value:
            mime_version.defects.append(errors.HeaderMissingRequiredValue(
                "Expected MIME version number but found only CFWS"))

    # Major number: everything up to the '.', CFWS, or end of input.
    end = 0
    while (end < len(value) and value[end] != '.'
            and value[end] not in CFWS_LEADER):
        end += 1
    digits, value = value[:end], value[end:]
    if is_ascii_number(digits):
        mime_version.major = int(digits)
        mime_version.append(ValueTerminal(digits, 'digits'))
    else:
        mime_version.defects.append(errors.InvalidHeaderDefect(
            "Expected MIME major version number but found {!r}".format(digits)))
        mime_version.append(ValueTerminal(digits, 'xtext'))
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        mime_version.append(token)

    if not value or value[0] != '.':
        # No separator: whatever is left is kept verbatim as xtext.
        if mime_version.major is not None:
            mime_version.defects.append(errors.InvalidHeaderDefect(
                "Incomplete MIME version; found only major number"))
        if value:
            mime_version.append(ValueTerminal(value, 'xtext'))
        return mime_version
    mime_version.append(ValueTerminal('.', 'version-separator'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        mime_version.append(token)
    if not value:
        if mime_version.major is not None:
            mime_version.defects.append(errors.InvalidHeaderDefect(
                "Incomplete MIME version; found only major number"))
        return mime_version

    # Minor number: everything up to CFWS or end of input.
    end = 0
    while end < len(value) and value[end] not in CFWS_LEADER:
        end += 1
    digits, value = value[:end], value[end:]
    if is_ascii_number(digits):
        mime_version.minor = int(digits)
        mime_version.append(ValueTerminal(digits, 'digits'))
    else:
        mime_version.defects.append(errors.InvalidHeaderDefect(
            "Expected MIME minor version number but found {!r}".format(digits)))
        mime_version.append(ValueTerminal(digits, 'xtext'))
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        mime_version.append(token)
    if value:
        mime_version.defects.append(errors.InvalidHeaderDefect(
            "Excess non-CFWS text after MIME version"))
        mime_version.append(ValueTerminal(value, 'xtext'))
    return mime_version
def get_invalid_parameter(value):
    """Consume everything up to the next ';'.

    This is outside the formal grammar.  Returns
    ``(InvalidParameter, remaining_value)``; the InvalidParameter TokenList
    acts like a Parameter, but its data attributes are None.
    """
    invalid_parameter = InvalidParameter()
    while value and value[0] != ';':
        if value[0] not in PHRASE_ENDS:
            piece, value = get_phrase(value)
        else:
            # A special character that cannot start a phrase: record it as
            # a single-character terminal and move on.
            piece = ValueTerminal(value[0], 'misplaced-special')
            value = value[1:]
        invalid_parameter.append(piece)
    return invalid_parameter, value
def get_ttext(value):
    """Parse a run of ttext; return ``(ValueTerminal, remaining_value)``.

        ttext = <matches _ttext_matcher>

    Any non-TOKEN_ENDS character is accepted, but defects are added to the
    token's defects list for non-ttext characters.  Defects are also
    registered for *any* non-printables even though the RFC doesn't exclude
    all of them, because we follow the spirit of RFC 5322.

    Raises HeaderParseError if nothing matches at the start of *value*.
    """
    match = _non_token_end_matcher(value)
    if not match:
        raise errors.HeaderParseError(
            "expected ttext but found '{}'".format(value))
    matched = match.group()
    terminal = ValueTerminal(matched, 'ttext')
    _validate_xtext(terminal)
    return terminal, value[len(matched):]
def get_token(value):
    """Parse a MIME token; return ``(Token, remaining_value)``.

        token = [CFWS] 1*ttext [CFWS]

    The RFC equivalent of ttext is any US-ASCII chars except space, ctls, or
    tspecials.  Tabs are excluded as well even though the RFC doesn't do so.
    The RFC implies the CFWS but is not explicit about it in the BNF.

    Raises HeaderParseError if no ttext is found after the optional CFWS.
    """
    result = Token()
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    if value and value[0] in TOKEN_ENDS:
        raise errors.HeaderParseError(
            "expected token but found '{}'".format(value))
    text, value = get_ttext(value)
    result.append(text)
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    return result, value
def get_attrtext(value):
    """Parse a run of attrtext; return ``(ValueTerminal, remaining_value)``.

        attrtext = 1*(any non-ATTRIBUTE_ENDS character)

    Any non-ATTRIBUTE_ENDS character is accepted, but defects are added to
    the token's defects list for non-attrtext characters.  Defects are also
    registered for *any* non-printables even though the RFC doesn't exclude
    all of them, because we follow the spirit of RFC 5322.

    Raises HeaderParseError if nothing matches at the start of *value*.
    """
    match = _non_attribute_end_matcher(value)
    if not match:
        raise errors.HeaderParseError(
            "expected attrtext but found {!r}".format(value))
    matched = match.group()
    terminal = ValueTerminal(matched, 'attrtext')
    _validate_xtext(terminal)
    return terminal, value[len(matched):]
def get_attribute(value):
    """Parse a parameter attribute; return ``(Attribute, remaining_value)``.

        [CFWS] 1*attrtext [CFWS]

    This version of the BNF makes the CFWS explicit, and as usual a value
    terminal holds the actual run of characters.  The RFC equivalent of
    attrtext is the token characters, with the subtraction of '*', "'", and
    '%'.  Tab is included in the excluded set just as it is for token.

    Raises HeaderParseError if no attrtext is found after the optional CFWS.
    """
    result = Attribute()
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    if value and value[0] in ATTRIBUTE_ENDS:
        raise errors.HeaderParseError(
            "expected token but found '{}'".format(value))
    text, value = get_attrtext(value)
    result.append(text)
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    return result, value
def get_extended_attrtext(value):
    """Parse extended attrtext; return ``(ValueTerminal, remaining_value)``.

        attrtext = 1*(any non-ATTRIBUTE_ENDS character plus '%')

    This is a special parsing routine so that a value that includes %
    escapes is obtained as a single string (which is decoded as a single
    string later).

    Raises HeaderParseError if nothing matches at the start of *value*.
    """
    match = _non_extended_attribute_end_matcher(value)
    if not match:
        raise errors.HeaderParseError(
            "expected extended attrtext but found {!r}".format(value))
    matched = match.group()
    terminal = ValueTerminal(matched, 'extended-attrtext')
    _validate_xtext(terminal)
    return terminal, value[len(matched):]
def get_extended_attribute(value):
    """Parse an extended attribute; return ``(Attribute, remaining_value)``.

        [CFWS] 1*extended_attrtext [CFWS]

    This is like the non-extended version except that % characters are
    allowed, so that an encoded value can be picked up as a single string.

    Raises HeaderParseError if no extended attrtext is found after the
    optional CFWS.
    """
    # XXX: should we have an ExtendedAttribute TokenList?
    result = Attribute()
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    if value and value[0] in EXTENDED_ATTRIBUTE_ENDS:
        raise errors.HeaderParseError(
            "expected token but found '{}'".format(value))
    text, value = get_extended_attrtext(value)
    result.append(text)
    if value and value[0] in CFWS_LEADER:
        cfws, value = get_cfws(value)
        result.append(cfws)
    return result, value
def get_section(value):
    """Parse a parameter section marker; return ``(Section, remaining_value)``.

        '*' digits

    The formal BNF is more complicated because leading 0s are not allowed.
    We check for that and add a defect.  We also assume no CFWS is allowed
    between the '*' and the digits, though the RFC is not crystal clear on
    that.  The caller should already have dealt with leading CFWS.

    Only ASCII digits are accepted as the section number.

    Raises HeaderParseError if *value* does not start with '*' followed by
    at least one ASCII digit.
    """
    # str.isdigit() is deliberately not used: it also accepts characters
    # such as superscript digits, which int() refuses to convert, so a
    # ValueError would escape instead of the HeaderParseError callers catch.
    ascii_digits = '0123456789'
    section = Section()
    if not value or value[0] != '*':
        raise errors.HeaderParseError("Expected section but found {}".format(
                                        value))
    section.append(ValueTerminal('*', 'section-marker'))
    value = value[1:]
    if not value or value[0] not in ascii_digits:
        raise errors.HeaderParseError("Expected section number but "
                                      "found {}".format(value))
    end = 0
    while end < len(value) and value[end] in ascii_digits:
        end += 1
    digits, value = value[:end], value[end:]
    if digits[0] == '0' and digits != '0':
        section.defects.append(errors.InvalidHeaderDefect(
            "section number has an invalid leading 0"))
    section.number = int(digits)
    section.append(ValueTerminal(digits, 'digits'))
    return section, value
def get_value(value):
    """Parse a parameter value; return ``(Value, remaining_value)``.

        quoted-string / attribute

    Leading CFWS, if any, is placed at the front of the parsed token.
    Raises HeaderParseError if *value* is empty or contains only CFWS.
    """
    if not value:
        raise errors.HeaderParseError("Expected value but found end of string")
    leader = None
    if value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
    if not value:
        raise errors.HeaderParseError("Expected value but found "
                                      "only {}".format(leader))
    # A double quote selects the quoted-string form; anything else is
    # parsed as an (extended) attribute.
    parse = get_quoted_string if value[0] == '"' else get_extended_attribute
    content, value = parse(value)
    if leader is not None:
        content[:0] = [leader]
    result = Value()
    result.append(content)
    return result, value
def get_parameter(value):
    """Parse one MIME parameter; return ``(Parameter, remaining_value)``.

        attribute [section] ["*"] [CFWS] "=" value

    The CFWS is implied by the RFC but not made explicit in the BNF.  This
    simplified form of the BNF from the RFC is made to conform with the RFC
    BNF through some extra checks.  We do it this way because it makes both
    error recovery and working with the resulting parse tree easier.

    A parameter that has a name but no value is returned with a defect.
    Raises HeaderParseError when the parameter is incomplete, is not
    followed by '=', or has malformed RFC 2231 charset/language delimiters.
    """
    # It is possible CFWS would also be implicitly allowed between the section
    # and the 'extended-attribute' marker (the '*') , but we've never seen that
    # in the wild and we will therefore ignore the possibility.
    param = Parameter()
    token, value = get_attribute(value)
    param.append(token)
    if not value or value[0] == ';':
        param.defects.append(errors.InvalidHeaderDefect("Parameter contains "
            "name ({}) but no value".format(token)))
        return param, value
    if value[0] == '*':
        try:
            token, value = get_section(value)
            param.sectioned = True
            param.append(token)
        except errors.HeaderParseError:
            # No section number; the '*' may still be the extended marker.
            pass
        if not value:
            raise errors.HeaderParseError("Incomplete parameter")
        if value[0] == '*':
            param.append(ValueTerminal('*', 'extended-parameter-marker'))
            value = value[1:]
            param.extended = True
    # value can be empty here when the input ended right after the extended
    # marker (e.g. "name*"); report that as a parse error rather than letting
    # an IndexError escape.
    if not value or value[0] != '=':
        raise errors.HeaderParseError("Parameter not followed by '='")
    param.append(ValueTerminal('=', 'parameter-separator'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        param.append(token)
    # 'remainder' is non-None only while the contents of a quoted string are
    # being parsed in place of 'value'; 'appendto' is the token list that
    # receives the value pieces (the parameter itself, or the emptied
    # bare-quoted-string inside the quoted string).
    remainder = None
    appendto = param
    if param.extended and value and value[0] == '"':
        # Now for some serious hackery to handle the common invalid case of
        # double quotes around an extended value.  We also accept (with defect)
        # a value marked as encoded that isn't really.
        qstring, remainder = get_quoted_string(value)
        inner_value = qstring.stripped_value
        semi_valid = False
        if param.section_number == 0:
            if inner_value and inner_value[0] == "'":
                semi_valid = True
            else:
                token, rest = get_attrtext(inner_value)
                if rest and rest[0] == "'":
                    semi_valid = True
        else:
            try:
                token, rest = get_extended_attrtext(inner_value)
            except errors.HeaderParseError:
                # Not extended attrtext, so not a quoted encoded value.
                pass
            else:
                if not rest:
                    semi_valid = True
        if semi_valid:
            param.defects.append(errors.InvalidHeaderDefect(
                "Quoted string value for extended parameter is invalid"))
            param.append(qstring)
            for t in qstring:
                if t.token_type == 'bare-quoted-string':
                    # Empty it out; the re-parsed pieces are appended to it.
                    t[:] = []
                    appendto = t
                    break
            value = inner_value
        else:
            remainder = None
            param.defects.append(errors.InvalidHeaderDefect(
                "Parameter marked as extended but appears to have a "
                "quoted string value that is non-encoded"))
    if value and value[0] == "'":
        # The charset is empty: the value starts directly with a delimiter.
        token = None
    else:
        token, value = get_value(value)
    if not param.extended or param.section_number > 0:
        if not value or value[0] != "'":
            # Ordinary (non-initial-extended) value: done.
            appendto.append(token)
            if remainder is not None:
                assert not value, value
                value = remainder
            return param, value
        param.defects.append(errors.InvalidHeaderDefect(
            "Apparent initial-extended-value but attribute "
            "was not marked as extended or was not initial section"))
    if not value:
        # Assume the charset/lang is missing and the token is the value.
        param.defects.append(errors.InvalidHeaderDefect(
            "Missing required charset/lang delimiters"))
        appendto.append(token)
        if remainder is None:
            return param, value
    else:
        if token is not None:
            # Use the first 'extended-attrtext' terminal as the charset, or
            # the last element of the token if there is none.
            for t in token:
                if t.token_type == 'extended-attrtext':
                    break
            # NOTE(review): the original code had the no-op comparison
            # ``t.token_type == 'attrtext'`` here, presumably meant as an
            # assignment.  It is left out so the terminal keeps its existing
            # token type and the resulting parse tree is unchanged.
            appendto.append(t)
            param.charset = t.value
        if value[0] != "'":
            raise errors.HeaderParseError("Expected RFC2231 char/lang encoding "
                                          "delimiter, but found {!r}".format(value))
        appendto.append(ValueTerminal("'", 'RFC2231-delimiter'))
        value = value[1:]
        if value and value[0] != "'":
            token, value = get_attrtext(value)
            appendto.append(token)
            param.lang = token.value
            if not value or value[0] != "'":
                raise errors.HeaderParseError("Expected RFC2231 char/lang encoding "
                                  "delimiter, but found {}".format(value))
        appendto.append(ValueTerminal("'", 'RFC2231-delimiter'))
        value = value[1:]
    if remainder is not None:
        # Treat the rest of value as bare quoted string content.
        v = Value()
        while value:
            if value[0] in WSP:
                token, value = get_fws(value)
            elif value[0] == '"':
                token = ValueTerminal('"', 'DQUOTE')
                value = value[1:]
            else:
                token, value = get_qcontent(value)
            v.append(token)
        token = v
    else:
        token, value = get_value(value)
    appendto.append(token)
    if remainder is not None:
        assert not value, value
        value = remainder
    return param, value
def parse_mime_parameters(value):
    """Parse a run of MIME parameters and return a MimeParameters token list.

        parameter *( ";" parameter )

    That BNF is meant to indicate this routine should only be called after
    finding and handling the leading ';'.  There is no corresponding rule in
    the formal RFC grammar, but it is more convenient for the set of
    parameters to be treated as its own TokenList.

    This is a 'parse' routine because it consumes the remaining value, but
    it would never be called to parse a full header.  Instead it is called
    to parse everything after the non-parameter value of a specific MIME
    header.
    """
    parameters = MimeParameters()
    while value:
        try:
            param, value = get_parameter(value)
        except errors.HeaderParseError:
            leader = None
            if value[0] in CFWS_LEADER:
                leader, value = get_cfws(value)
            if not value:
                # Only trailing CFWS was left.
                parameters.append(leader)
                return parameters
            if value[0] != ';':
                bad, value = get_invalid_parameter(value)
                if leader:
                    bad[:0] = [leader]
                parameters.append(bad)
                parameters.defects.append(errors.InvalidHeaderDefect(
                    "invalid parameter {!r}".format(bad)))
            else:
                if leader is not None:
                    parameters.append(leader)
                parameters.defects.append(errors.InvalidHeaderDefect(
                    "parameter entry with no content"))
        else:
            parameters.append(param)

        if value and value[0] != ';':
            # Junk after the otherwise valid parameter.  Mark it as
            # invalid, but it will have a value.
            last_param = parameters[-1]
            last_param.token_type = 'invalid-parameter'
            junk, value = get_invalid_parameter(value)
            last_param.extend(junk)
            parameters.defects.append(errors.InvalidHeaderDefect(
                "parameter with invalid trailing text {!r}".format(junk)))

        if value:
            # Must be a ';' at this point.
            parameters.append(ValueTerminal(';', 'parameter-separator'))
            value = value[1:]
    return parameters
def _find_mime_parameters(tokenlist, value):
    """Do our best to find the parameters in an invalid MIME header.

    Everything in front of the first ';' is appended to *tokenlist* as
    phrases and misplaced specials; if a ';' is found, the separator and the
    parameters parsed from the text after it are appended as well.  The
    function modifies *tokenlist* in place and returns None.
    """
    while value and value[0] != ';':
        if value[0] not in PHRASE_ENDS:
            piece, value = get_phrase(value)
        else:
            piece = ValueTerminal(value[0], 'misplaced-special')
            value = value[1:]
        tokenlist.append(piece)
    if value:
        tokenlist.append(ValueTerminal(';', 'parameter-separator'))
        tokenlist.append(parse_mime_parameters(value[1:]))
def parse_content_type_header(value):
    """Parse a Content-Type field value and return a ContentType token list.

        maintype "/" subtype *( ";" parameter )

    The maintype and subtype are tokens.  Theoretically they could be
    checked against the official IANA list + x-token, but we don't do that.
    Malformed input is reported through defects rather than exceptions.
    """
    ctype = ContentType()
    if not value:
        ctype.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content type specification"))
        return ctype

    try:
        main_token, value = get_token(value)
    except errors.HeaderParseError:
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Expected content maintype but found {!r}".format(value)))
        _find_mime_parameters(ctype, value)
        return ctype
    ctype.append(main_token)
    # XXX: If we really want to follow the formal grammar we should make
    # maintype and subtype specialized TokenLists here.  Probably not worth it.
    if not (value and value[0] == '/'):
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Invalid content type"))
        if value:
            _find_mime_parameters(ctype, value)
        return ctype
    ctype.maintype = main_token.value.strip().lower()
    ctype.append(ValueTerminal('/', 'content-type-separator'))
    value = value[1:]

    try:
        sub_token, value = get_token(value)
    except errors.HeaderParseError:
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Expected content subtype but found {!r}".format(value)))
        _find_mime_parameters(ctype, value)
        return ctype
    ctype.append(sub_token)
    ctype.subtype = sub_token.value.strip().lower()

    if not value:
        return ctype
    if value[0] == ';':
        ctype.append(ValueTerminal(';', 'parameter-separator'))
        ctype.append(parse_mime_parameters(value[1:]))
    else:
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Only parameters are valid after content type, but "
            "found {!r}".format(value)))
        # The RFC requires that a syntactically invalid content-type be
        # treated as text/plain.  Perhaps we should postel this, but we
        # should probably only do that if we were checking the subtype value
        # against IANA.
        del ctype.maintype, ctype.subtype
        _find_mime_parameters(ctype, value)
    return ctype
def parse_content_disposition_header(value):
    """Parse a Content-Disposition field value.

        disposition-type *( ";" parameter )

    Returns a ContentDisposition token list.  Malformed input is reported
    through defects rather than exceptions.
    """
    header = ContentDisposition()
    if not value:
        header.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content disposition"))
        return header

    try:
        disposition, value = get_token(value)
    except errors.HeaderParseError:
        header.defects.append(errors.InvalidHeaderDefect(
            "Expected content disposition but found {!r}".format(value)))
        _find_mime_parameters(header, value)
        return header
    header.append(disposition)
    header.content_disposition = disposition.value.strip().lower()

    if not value:
        return header
    if value[0] == ';':
        header.append(ValueTerminal(';', 'parameter-separator'))
        header.append(parse_mime_parameters(value[1:]))
    else:
        header.defects.append(errors.InvalidHeaderDefect(
            "Only parameters are valid after content disposition, but "
            "found {!r}".format(value)))
        _find_mime_parameters(header, value)
    return header
def parse_content_transfer_encoding_header(value):
    """Parse a Content-Transfer-Encoding field value.

        mechanism

    Returns a ContentTransferEncoding token list.  Anything after the
    mechanism token is kept in the parse tree and reported with a defect
    for each extra piece consumed.
    """
    # We should probably validate the values, since the list is fixed.
    header = ContentTransferEncoding()
    if not value:
        header.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content transfer encoding"))
        return header

    try:
        mechanism, value = get_token(value)
    except errors.HeaderParseError:
        header.defects.append(errors.InvalidHeaderDefect(
            "Expected content transfer encoding but found {!r}".format(value)))
    else:
        header.append(mechanism)
        header.cte = mechanism.value.strip().lower()

    while value:
        header.defects.append(errors.InvalidHeaderDefect(
            "Extra text after content transfer encoding"))
        if value[0] not in PHRASE_ENDS:
            extra, value = get_phrase(value)
        else:
            extra = ValueTerminal(value[0], 'misplaced-special')
            value = value[1:]
        header.append(extra)
    return header
- #
- # Header folding
- #
- # Header folding is complex, with lots of rules and corner cases. The
- # following code does its best to obey the rules and handle the corner
- # cases, but you can be sure there are a few bugs :)
- #
- # This folder generally canonicalizes as it goes, preferring the stringified
- # version of each token. The tokens contain information that supports the
- # folder, including which tokens can be encoded in which ways.
- #
- # Folded text is accumulated in a simple list of strings ('lines'), each
- # one of which should be less than policy.max_line_length ('maxlen').
- #
def _steal_trailing_WSP_if_exists(lines):
    """Remove and return a trailing WSP character from the last line.

    If *lines* is non-empty and its last entry ends with a character in WSP,
    that character is stripped from the entry (in place) and returned.
    Otherwise *lines* is left untouched and '' is returned.
    """
    if not (lines and lines[-1] and lines[-1][-1] in WSP):
        return ''
    last_line = lines[-1]
    lines[-1] = last_line[:-1]
    return last_line[-1]
def _refold_parse_tree(parse_tree, *, policy):
    """Return string of contents of parse_tree folded according to RFC rules.

    parse_tree is iterated to obtain the parts to fold.  policy supplies
    max_line_length, utf8 and linesep.  The folded lines are joined with
    policy.linesep, and the result also ends with policy.linesep.
    """
    # max_line_length 0/None means no limit, ie: infinitely long.
    maxlen = policy.max_line_length or sys.maxsize
    encoding = 'utf-8' if policy.utf8 else 'us-ascii'
    # Output lines accumulated so far; text is always added to lines[-1].
    lines = ['']
    # Passed to and returned by _fold_as_ew so that adjacent encoded words
    # can be combined; reset to None whenever that must not happen.
    last_ew = None
    # Count of currently open subtrees whose as_ew_allowed is false.
    wrap_as_ew_blocked = 0
    # Set when the part being processed has to be wrapped as an encoded word.
    want_encoding = False
    # Sentinel queued after the subparts of a part that disallows encoded
    # words; popping it marks the end of that subtree.
    end_ew_not_allowed = Terminal('', 'wrap_as_ew_blocked')
    # Work queue: parts are popped from the front, and the subparts of a
    # part that has to be split are pushed back onto the front.
    parts = list(parse_tree)
    while parts:
        part = parts.pop(0)
        if part is end_ew_not_allowed:
            wrap_as_ew_blocked -= 1
            continue
        tstr = str(part)
        if part.token_type == 'ptext' and set(tstr) & SPECIALS:
            # Encode if tstr contains special characters.
            want_encoding = True
        # Find out whether the text is representable in the target encoding;
        # if it is not, pick the charset to encode it with.
        try:
            tstr.encode(encoding)
            charset = encoding
        except UnicodeEncodeError:
            if any(isinstance(x, errors.UndecodableBytesDefect)
                   for x in part.all_defects):
                charset = 'unknown-8bit'
            else:
                # If policy.utf8 is false this should really be taken from a
                # 'charset' property on the policy.
                charset = 'utf-8'
            want_encoding = True
        if part.token_type == 'mime-parameters':
            # Mime parameter folding (using RFC2231) is extra special.
            _fold_mime_parameters(part, lines, maxlen, encoding)
            continue
        if want_encoding and not wrap_as_ew_blocked:
            if not part.as_ew_allowed:
                want_encoding = False
                last_ew = None
                if part.syntactic_break:
                    # Let the part fold itself, and drop the trailing linesep
                    # from the result.
                    encoded_part = part.fold(policy=policy)[:-len(policy.linesep)]
                    if policy.linesep not in encoded_part:
                        # It fits on a single line
                        if len(encoded_part) > maxlen - len(lines[-1]):
                            # But not on this one, so start a new one.
                            newline = _steal_trailing_WSP_if_exists(lines)
                            # XXX what if encoded_part has no leading FWS?
                            lines.append(newline)
                        lines[-1] += encoded_part
                        continue
                # Either this is not a major syntactic break, so we don't
                # want it on a line by itself even if it fits, or it
                # doesn't fit on a line by itself.  Either way, fall through
                # to unpacking the subparts and wrapping them.
            if not hasattr(part, 'encode'):
                # It's not a Terminal, do each piece individually.
                parts = list(part) + parts
            else:
                # It's a terminal, wrap it as an encoded word, possibly
                # combining it with previously encoded words if allowed.
                last_ew = _fold_as_ew(tstr, lines, maxlen, last_ew,
                                      part.ew_combine_allowed, charset)
            want_encoding = False
            continue
        # No encoding required (or encoded words are blocked): append the
        # text to the current line if it fits.
        if len(tstr) <= maxlen - len(lines[-1]):
            lines[-1] += tstr
            continue
        # This part is too long to fit.  The RFC wants us to break at
        # "major syntactic breaks", so unless we don't consider this
        # to be one, check if it will fit on the next line by itself.
        if (part.syntactic_break and
                len(tstr) + 1 <= maxlen):
            newline = _steal_trailing_WSP_if_exists(lines)
            # A new line is only started when it will begin with whitespace:
            # either WSP taken from the previous line or the part's own FWS.
            if newline or part.startswith_fws():
                lines.append(newline + tstr)
                last_ew = None
                continue
        if not hasattr(part, 'encode'):
            # It's not a terminal, try folding the subparts.
            newparts = list(part)
            if not part.as_ew_allowed:
                # Block encoded words until the sentinel queued after the
                # subparts is popped again.
                wrap_as_ew_blocked += 1
                newparts.append(end_ew_not_allowed)
            parts = newparts + parts
            continue
        if part.as_ew_allowed and not wrap_as_ew_blocked:
            # It doesn't need CTE encoding, but encode it anyway so we can
            # wrap it.
            parts.insert(0, part)
            want_encoding = True
            continue
        # We can't figure out how to wrap, it, so give up.
        newline = _steal_trailing_WSP_if_exists(lines)
        if newline or part.startswith_fws():
            lines.append(newline + tstr)
        else:
            # We can't fold it onto the next line either...
            lines[-1] += tstr
    return policy.linesep.join(lines) + policy.linesep
def _fold_as_ew(to_encode, lines, maxlen, last_ew, ew_combine_allowed, charset):
    """Fold string to_encode into lines as encoded word, combining if allowed.

    Return the new value for last_ew, or None if ew_combine_allowed is False.

    If there is already an encoded word in the last line of lines (indicated by
    a non-None value for last_ew) and ew_combine_allowed is true, decode the
    existing ew, combine it with to_encode, and re-encode.  Otherwise, encode
    to_encode.  In either case, split to_encode as necessary so that the
    encoded segments fit within maxlen.

    A single leading and a single trailing whitespace character of to_encode
    are emitted unencoded.  An empty or whitespace-only to_encode therefore
    produces no encoded word at all.

    'lines' is modified in place: text is appended to lines[-1] and new
    continuation lines (starting with a single blank) are appended as needed.

    Raise errors.HeaderParseError if maxlen cannot hold the encoded-word
    chrome for the chosen charset plus at least one more character.
    """
    if last_ew is not None and ew_combine_allowed:
        # Re-parse the existing encoded word together with the new text so
        # that both are folded as a single run of encoded words.
        to_encode = str(
            get_unstructured(lines[-1][last_ew:] + to_encode))
        lines[-1] = lines[-1][:last_ew]
    if to_encode and to_encode[0] in WSP:
        # We're joining this to non-encoded text, so don't encode
        # the leading blank.
        leading_wsp = to_encode[0]
        to_encode = to_encode[1:]
        if len(lines[-1]) == maxlen:
            lines.append(_steal_trailing_WSP_if_exists(lines))
        lines[-1] += leading_wsp
    trailing_wsp = ''
    # Guard against an empty string: to_encode may have been empty on entry
    # or may have consisted of nothing but the leading blank removed above.
    if to_encode and to_encode[-1] in WSP:
        # Likewise for the trailing space.
        trailing_wsp = to_encode[-1]
        to_encode = to_encode[:-1]
    new_last_ew = len(lines[-1]) if last_ew is None else last_ew

    encode_as = 'utf-8' if charset == 'us-ascii' else charset

    # The RFC2047 chrome takes up 7 characters plus the length
    # of the charset name.
    chrome_len = len(encode_as) + 7

    if (chrome_len + 1) >= maxlen:
        raise errors.HeaderParseError(
            "max_line_length is too small to fit an encoded word")

    while to_encode:
        remaining_space = maxlen - len(lines[-1])
        text_space = remaining_space - chrome_len
        if text_space <= 0:
            # Not even the chrome fits here; continue on a fresh line.
            lines.append(' ')
            continue

        to_encode_word = to_encode[:text_space]
        encoded_word = _ew.encode(to_encode_word, charset=encode_as)
        excess = len(encoded_word) - remaining_space
        while excess > 0:
            if len(to_encode_word) <= 1 and len(lines[-1]) <= 1:
                # A single character does not fit even on an (almost) empty
                # line, so a new continuation line cannot offer more room.
                # Emit the over-long word rather than producing empty
                # encoded words on new lines forever.
                break
            # The chunk to encode is at most text_space characters long, so
            # shrinking it by one at a time shouldn't take long.
            # NOTE(review): if the chunk shrinks to nothing on a partly
            # filled line, an empty encoded word is appended and folding
            # continues on the next line; kept as in the original.
            to_encode_word = to_encode_word[:-1]
            encoded_word = _ew.encode(to_encode_word, charset=encode_as)
            excess = len(encoded_word) - remaining_space
        lines[-1] += encoded_word
        to_encode = to_encode[len(to_encode_word):]

        if to_encode:
            lines.append(' ')
            new_last_ew = len(lines[-1])
    lines[-1] += trailing_wsp
    return new_last_ew if ew_combine_allowed else None
def _fold_mime_parameters(part, lines, maxlen, encoding):
    """Fold TokenList 'part' into the 'lines' list as mime parameters.

    Using the decoded list of parameters and values, format them according to
    the RFC rules, including using RFC2231 encoding if the value cannot be
    expressed in 'encoding' and/or the parameter+value is too long to fit
    within 'maxlen'.

    'part' must provide a 'params' iterable of (name, value) string pairs.
    'lines' is modified in place: parameters are appended to lines[-1] when
    they fit, otherwise new lines starting with a single blank are appended.
    Nothing is returned.

    When a value has to be split into several RFC2231 sections, every section
    carries at least one character of the value (except possibly a
    charset-only first section), so the split always terminates even if that
    makes a line longer than 'maxlen'.
    """
    # Special case for RFC2231 encoding: start from decoded values and use
    # RFC2231 encoding iff needed.
    #
    # Note that the 1 and 2s being added to the length calculations are
    # accounting for the possibly-needed spaces and semicolons we'll be adding.
    #
    for name, value in part.params:
        # XXX What if this ';' puts us over maxlen the first time through the
        # loop?  We should split the header value onto a newline in that case,
        # but to do that we need to recognize the need earlier or reparse the
        # header, so I'm going to ignore that bug for now.  It'll only put us
        # one character over.
        if not lines[-1].rstrip().endswith(';'):
            lines[-1] += ';'
        charset = encoding
        error_handler = 'strict'
        try:
            value.encode(encoding)
            encoding_required = False
        except UnicodeEncodeError:
            encoding_required = True
            if utils._has_surrogates(value):
                # Surrogates are percent-encoded via surrogateescape and
                # labelled with the 'unknown-8bit' charset.
                charset = 'unknown-8bit'
                error_handler = 'surrogateescape'
            else:
                charset = 'utf-8'
        if encoding_required:
            encoded_value = urllib.parse.quote(
                value, safe='', errors=error_handler)
            tstr = "{}*={}''{}".format(name, charset, encoded_value)
        else:
            tstr = '{}={}'.format(name, quote_string(value))
        if len(lines[-1]) + len(tstr) + 1 < maxlen:
            lines[-1] = lines[-1] + ' ' + tstr
            continue
        elif len(tstr) + 2 <= maxlen:
            lines.append(' ' + tstr)
            continue
        # We need multiple sections.  We are allowed to mix encoded and
        # non-encoded sections, but we aren't going to.  We'll encode them all.
        section = 0
        extra_chrome = charset + "''"
        while value:
            chrome_len = len(name) + len(str(section)) + 3 + len(extra_chrome)
            if maxlen <= chrome_len + 3:
                # We need room for the leading blank, the trailing semicolon,
                # and at least one character of the value.  If we don't
                # have that, we'd be stuck, so in that case fall back to
                # the RFC standard width.
                maxlen = 78
            maxchars = maxlen - chrome_len - 2
            # Find the longest prefix of value whose percent-encoded form
            # fits in maxchars.  One character may expand to several
            # "%XX" triplets, so the prefix can be much shorter than
            # maxchars.
            splitpoint = max(maxchars, 0)
            while splitpoint > 0:
                encoded_value = urllib.parse.quote(
                    value[:splitpoint], safe='', errors=error_handler)
                if len(encoded_value) <= maxchars:
                    break
                splitpoint -= 1
            else:
                # Not even a single character fits.
                if extra_chrome and maxchars >= 0:
                    # First section: emit only the charset chrome; the
                    # next section has more room because it doesn't
                    # repeat the charset.
                    encoded_value = ''
                else:
                    # Later sections never get more room, so force one
                    # character through (overshooting maxlen) instead of
                    # emitting empty sections without end.
                    splitpoint = 1
                    encoded_value = urllib.parse.quote(
                        value[:1], safe='', errors=error_handler)
            lines.append(" {}*{}*={}{}".format(
                name, section, extra_chrome, encoded_value))
            extra_chrome = ''
            section += 1
            value = value[splitpoint:]
            if value:
                lines[-1] += ';'
|