123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- import xml.sax
- import xml.sax.handler
# Event-type tags. Each queued event payload is a tuple whose first item is
# one of these strings.

# Document boundaries.
START_DOCUMENT = "START_DOCUMENT"
END_DOCUMENT = "END_DOCUMENT"

# Element boundaries.
START_ELEMENT = "START_ELEMENT"
END_ELEMENT = "END_ELEMENT"

# Character content.
CHARACTERS = "CHARACTERS"
IGNORABLE_WHITESPACE = "IGNORABLE_WHITESPACE"

# Other markup.
COMMENT = "COMMENT"
PROCESSING_INSTRUCTION = "PROCESSING_INSTRUCTION"
class PullDOM(xml.sax.ContentHandler):
    """SAX content handler that queues parse events in a singly linked list.

    Every list cell has the shape ``[payload, next_cell]``.  ``firstEvent``
    is a sentinel head cell (payload ``None``); ``lastEvent`` is the cell
    that the next event will be linked onto.  Payloads are
    ``(event_type, node)`` tuples, where ``event_type`` is one of the
    module-level event constants.

    Comments and processing instructions that arrive before the document
    object exists are parked in ``pending_events`` and turned into nodes by
    ``buildDocument``.
    """

    _locator = None
    document = None

    def __init__(self, documentFactory=None):
        """Set up the empty event queue, element stack and namespace state.

        ``documentFactory`` must offer ``createDocument(uri, tagname, None)``;
        when it is None, ``startDocument`` substitutes minidom's
        implementation.
        """
        from xml.dom import XML_NAMESPACE

        self.documentFactory = documentFactory

        head = [None, None]
        self.firstEvent = head
        self.lastEvent = head

        stack = []
        self.elementStack = stack
        self.push = stack.append
        try:
            self.pop = stack.pop
        except AttributeError:
            # use class' pop instead
            pass

        # contains uri -> prefix dicts
        contexts = [{XML_NAMESPACE: 'xml'}]
        self._ns_contexts = contexts
        self._current_context = contexts[-1]
        self.pending_events = []

    def _append_event(self, payload):
        # Link a new cell holding *payload* after the tail and make it the
        # new tail.
        cell = [payload, None]
        self.lastEvent[1] = cell
        self.lastEvent = cell

    def pop(self):
        """Remove and return the top of the element stack."""
        stack = self.elementStack
        top = stack[-1]
        del stack[-1]
        return top

    def setDocumentLocator(self, locator):
        """Remember the locator supplied by the parser."""
        self._locator = locator

    def startPrefixMapping(self, prefix, uri):
        """Record a namespace declaration and open a new prefix context."""
        try:
            declared = self._xmlns_attrs
        except AttributeError:
            declared = self._xmlns_attrs = []
        declared.append((prefix or 'xmlns', uri))
        # Save a snapshot to restore in endPrefixMapping, then update the
        # live context in place.
        self._ns_contexts.append(self._current_context.copy())
        self._current_context[uri] = prefix or None

    def endPrefixMapping(self, prefix):
        """Restore the namespace context saved by startPrefixMapping."""
        self._current_context = self._ns_contexts.pop()

    def startElementNS(self, name, tagName , attrs):
        """Create a namespace-aware element node and queue START_ELEMENT.

        ``name`` is a ``(uri, localname)`` pair.  The first element seen
        triggers ``buildDocument``.  Namespace declarations collected by
        ``startPrefixMapping`` are injected into ``attrs`` so they become
        ``xmlns`` attributes on the node.
        """
        # Retrieve xml namespace declaration attributes.
        xmlns_uri = 'http://www.w3.org/2000/xmlns/'
        declared = getattr(self, '_xmlns_attrs', None)
        if declared is not None:
            for decl_name, decl_value in declared:
                attrs._attrs[(xmlns_uri, decl_name)] = decl_value
            self._xmlns_attrs = []

        uri, localname = name
        if uri:
            # When using namespaces, the reader may or may not
            # provide us with the original name. If not, create
            # *a* valid tagName from the current context.
            if tagName is None:
                prefix = self._current_context[uri]
                tagName = (prefix + ":" + localname) if prefix else localname
            if self.document:
                node = self.document.createElementNS(uri, tagName)
            else:
                node = self.buildDocument(uri, tagName)
        elif self.document:
            # When the tagname is not prefixed, it just appears as
            # localname
            node = self.document.createElement(localname)
        else:
            node = self.buildDocument(None, localname)

        for (attr_uri, attr_local), attr_value in attrs.items():
            if attr_uri == xmlns_uri:
                if attr_local == 'xmlns':
                    qname = attr_local
                else:
                    qname = 'xmlns:' + attr_local
            elif attr_uri:
                prefix = self._current_context[attr_uri]
                qname = (prefix + ":" + attr_local) if prefix else attr_local
            else:
                # Attribute without a namespace.
                attr = self.document.createAttribute(attr_local)
                node.setAttributeNode(attr)
                attr.value = attr_value
                continue
            attr = self.document.createAttributeNS(attr_uri, qname)
            node.setAttributeNodeNS(attr)
            attr.value = attr_value

        self._append_event((START_ELEMENT, node))
        self.push(node)

    def endElementNS(self, name, tagName):
        """Queue END_ELEMENT for the node popped off the element stack."""
        self._append_event((END_ELEMENT, self.pop()))

    def startElement(self, name, attrs):
        """Create a plain (non-namespace) element node and queue START_ELEMENT."""
        if self.document:
            node = self.document.createElement(name)
        else:
            node = self.buildDocument(None, name)

        for attr_name, attr_value in attrs.items():
            attr = self.document.createAttribute(attr_name)
            attr.value = attr_value
            node.setAttributeNode(attr)

        self._append_event((START_ELEMENT, node))
        self.push(node)

    def endElement(self, name):
        """Queue END_ELEMENT for the node popped off the element stack."""
        self._append_event((END_ELEMENT, self.pop()))

    def comment(self, s):
        """Queue a COMMENT event, or park it until the document exists."""
        if not self.document:
            self.pending_events.append([(COMMENT, s), None])
            return
        self._append_event((COMMENT, self.document.createComment(s)))

    def processingInstruction(self, target, data):
        """Queue a PROCESSING_INSTRUCTION event, or park it until the
        document exists."""
        if not self.document:
            self.pending_events.append(
                [(PROCESSING_INSTRUCTION, target, data), None])
            return
        node = self.document.createProcessingInstruction(target, data)
        self._append_event((PROCESSING_INSTRUCTION, node))

    def ignorableWhitespace(self, chars):
        """Queue an IGNORABLE_WHITESPACE event carrying a new text node."""
        self._append_event(
            (IGNORABLE_WHITESPACE, self.document.createTextNode(chars)))

    def characters(self, chars):
        """Queue a CHARACTERS event carrying a new text node."""
        self._append_event((CHARACTERS, self.document.createTextNode(chars)))

    def startDocument(self):
        """Fall back to minidom's DOM implementation if no factory was given."""
        if self.documentFactory is not None:
            return
        import xml.dom.minidom
        self.documentFactory = xml.dom.minidom.Document.implementation

    def buildDocument(self, uri, tagname):
        """Create the document, queue START_DOCUMENT, and flush parked events.

        Returns the document's first child, which callers use as the node
        for the element that triggered the build.
        """
        # Can't do that in startDocument, since we need the tagname
        # XXX: obtain DocumentType
        doc = self.documentFactory.createDocument(uri, tagname, None)
        self.document = doc
        self._append_event((START_DOCUMENT, doc))
        self.push(doc)

        # Put everything we have seen so far into the document.  The parked
        # cells are reused: their payload is rewritten in place to hold a
        # real node, then the cell itself is linked into the queue.
        for cell in self.pending_events:
            kind = cell[0][0]
            if kind == PROCESSING_INSTRUCTION:
                _, target, data = cell[0]
                pi_node = self.document.createProcessingInstruction(
                    target, data)
                cell[0] = (PROCESSING_INSTRUCTION, pi_node)
            elif kind == COMMENT:
                comment_node = self.document.createComment(cell[0][1])
                cell[0] = (COMMENT, comment_node)
            else:
                raise AssertionError("Unknown pending event ", kind)
            self.lastEvent[1] = cell
            self.lastEvent = cell
        self.pending_events = None
        return doc.firstChild

    def endDocument(self):
        """Queue END_DOCUMENT and pop the element stack."""
        # Note: the tail pointer is intentionally left where it is here; only
        # the link from the current tail is written.
        self.lastEvent[1] = [(END_DOCUMENT, self.document), None]
        self.pop()

    def clear(self):
        """clear(): Explicitly release parsing structures"""
        self.document = None
class ErrorHandler:
    """Error handler that prints warnings and re-raises everything else."""

    def warning(self, exception):
        """Print the warning; parsing is not interrupted."""
        print(exception)

    def error(self, exception):
        """Raise the exception that was passed in."""
        raise exception

    def fatalError(self, exception):
        """Raise the exception that was passed in."""
        raise exception
class DOMEventStream:
    """Iterator yielding ``(event_type, node)`` pairs from a SAX parse.

    The stream feeds ``bufsize``-sized chunks of ``stream`` to ``parser`` on
    demand and hands out the events that the attached ``PullDOM`` handler
    queued.  If the parser has no ``feed`` method, the whole input is parsed
    on the first request and the queued events are then replayed.
    """

    # True once parser.close() has been called for the current handler, so
    # that further requests do not close the parser a second time.
    _parser_closed = False

    def __init__(self, stream, parser, bufsize):
        """Store the inputs, select the event source, and attach a handler.

        stream  -- object with ``read(size)`` (or whatever ``parser.parse``
                   accepts, for parsers without ``feed``)
        parser  -- SAX parser; incremental if it has ``feed``
        bufsize -- number of characters/bytes read per chunk
        """
        self.stream = stream
        self.parser = parser
        self.bufsize = bufsize
        if not hasattr(self.parser, 'feed'):
            # Not an incremental parser: fall back to parsing in one go.
            self.getEvent = self._slurp
        self.reset()

    def reset(self):
        """Install a fresh PullDOM as the parser's content handler."""
        self.pulldom = PullDOM()
        self._parser_closed = False
        # This content handler relies on namespace support
        self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
        self.parser.setContentHandler(self.pulldom)

    def __next__(self):
        """Return the next event or raise StopIteration when none is left."""
        rc = self.getEvent()
        if rc:
            return rc
        raise StopIteration

    def __iter__(self):
        return self

    def expandNode(self, node):
        """Consume events up to the end of *node*, building its subtree.

        Every node delivered by a non-END_ELEMENT event is appended to the
        innermost open element.  Returns as soon as an event carrying *node*
        itself is seen, or when the events run out.
        """
        event = self.getEvent()
        parents = [node]
        while event:
            token, cur_node = event
            if cur_node is node:
                return
            if token != END_ELEMENT:
                parents[-1].appendChild(cur_node)
            if token == START_ELEMENT:
                parents.append(cur_node)
            elif token == END_ELEMENT:
                del parents[-1]
            event = self.getEvent()

    def _pop_event(self):
        # Unlink and return the payload of the first queued event, or None
        # when the queue is empty.
        head = self.pulldom.firstEvent
        cell = head[1]
        if not cell:
            return None
        head[1] = cell[1]
        return cell[0]

    def getEvent(self):
        """Return the next event, feeding the parser as needed.

        Returns None once the input is exhausted and every queued event,
        including END_DOCUMENT, has been delivered.
        """
        # use IncrementalParser interface, so we get the desired
        # pull effect
        pulldom = self.pulldom
        if not pulldom.firstEvent[1]:
            # Queue drained: new events must be linked from the head again.
            pulldom.lastEvent = pulldom.firstEvent
        while not pulldom.firstEvent[1]:
            buf = self.stream.read(self.bufsize)
            if not buf:
                if self._parser_closed:
                    return None
                # Closing flushes the parser and lets the handler queue
                # END_DOCUMENT; fall through so that event is delivered
                # instead of being dropped.
                self.parser.close()
                self._parser_closed = True
                break
            self.parser.feed(buf)
        return self._pop_event()

    def _slurp(self):
        """ Fallback replacement for getEvent() using the
            standard SAX2 interface, which means we slurp the
            SAX events into memory (no performance gain, but
            we are compatible to all SAX parsers).
        """
        self.parser.parse(self.stream)
        self.getEvent = self._emit
        return self._emit()

    def _emit(self):
        """ Fallback replacement for getEvent() that emits
            the events that _slurp() read previously.

            Returns None when no events remain, so iteration ends with
            StopIteration rather than failing on the empty queue.
        """
        return self._pop_event()

    def clear(self):
        """clear(): Explicitly release parsing objects"""
        self.pulldom.clear()
        del self.pulldom
        self.parser = None
        self.stream = None
class SAX2DOM(PullDOM):
    """PullDOM that additionally links the created nodes into a DOM tree.

    After the base class has created a node and queued its event, the node
    is appended to the innermost open node on ``elementStack``.
    """

    def buildDocument(self, uri, tagname):
        """Build the document and attach processing instructions seen
        before the root element.

        The base class turns parked events into nodes but does not put them
        into the tree; here the processing-instruction nodes are inserted
        ahead of the root element, preserving their original order.
        """
        parked = self.pending_events
        root = PullDOM.buildDocument(self, uri, tagname)
        # The base class rewrote each parked cell's payload in place to
        # (event_type, node).
        for cell in parked:
            kind, node = cell[0]
            if kind == PROCESSING_INSTRUCTION:
                self.document.insertBefore(node, root)
        return root

    def startElementNS(self, name, tagName , attrs):
        """Create the element and append it to its parent on the stack."""
        PullDOM.startElementNS(self, name, tagName, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def startElement(self, name, attrs):
        """Create the element and append it to its parent on the stack."""
        PullDOM.startElement(self, name, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def processingInstruction(self, target, data):
        """Create the processing instruction and append it to the open node.

        Before the document exists the base class only parks the event, so
        there is no node to attach yet; buildDocument attaches it later.
        """
        deferred = not self.document
        PullDOM.processingInstruction(self, target, data)
        if deferred:
            return
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def ignorableWhitespace(self, chars):
        """Create the whitespace text node and append it to the open node."""
        PullDOM.ignorableWhitespace(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def characters(self, chars):
        """Create the text node and append it to the open node."""
        PullDOM.characters(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)
# Chunk size used by parse() when the caller passes no bufsize:
# 16384 - 20 = 16364.
default_bufsize = 2 ** 14 - 20
def parse(stream_or_string, parser=None, bufsize=None):
    """Return a DOMEventStream over a file name or an open stream.

    stream_or_string -- a ``str`` is treated as a file name and opened in
                        binary mode; anything else is used as the stream
    parser           -- SAX parser to use; a default one is created if falsy
    bufsize          -- chunk size; ``default_bufsize`` when None
    """
    chunk_size = default_bufsize if bufsize is None else bufsize
    source = (open(stream_or_string, 'rb')
              if isinstance(stream_or_string, str)
              else stream_or_string)
    sax_parser = parser if parser else xml.sax.make_parser()
    return DOMEventStream(source, sax_parser, chunk_size)
def parseString(string, parser=None):
    """Return a DOMEventStream over an in-memory XML document.

    string -- the document, as ``str`` or as ``bytes``/``bytearray``
    parser -- SAX parser to use; a default one is created if falsy

    The buffer size is the length of the input, so the whole document is
    handed to the parser in a single read.
    """
    from io import BytesIO, StringIO

    bufsize = len(string)
    # StringIO rejects bytes, so binary input needs a binary buffer.
    if isinstance(string, (bytes, bytearray)):
        buf = BytesIO(string)
    else:
        buf = StringIO(string)
    if not parser:
        parser = xml.sax.make_parser()
    return DOMEventStream(buf, parser, bufsize)
|