Parsing An Industrial XML File
Posted: March 2, 2011 | Author: drknucklehead | Filed under: Automated Meter Reading, Python, Web-Related | Tags: lxml, Python, XML

Attached is code to parse the XML files that we will soon receive from our Automated Meter Reading (AMR) vendor. We bill our water accounts from daily reads, which we receive seven days a week. Hourly reads will help our water department work with customers to figure out their water usage. In other words, hourly reads serve as diagnostic information showing when water was used during a twenty-four-hour period.
Each search tag in the Python program is the name of an actual element in the XML file (leaf elements, I believe). Using those names as dictionary keys ties the dictionary to the XML file, which in this case I believe is a good thing: someone looking at the XML file on its own can see where each dictionary entry came from.
This example uses lxml, which as far as I can tell is the Python community's XML parsing library of choice.
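Before the full listing, here is a minimal sketch of the tag-to-key idea described above. It swaps in the standard library's xml.etree.ElementTree for lxml so that it runs without extra packages; its iterparse is used the same way here. The sample document, its namespace URI, and its values are made up for illustration, and local_name is a hypothetical helper, not part of the parser.

```python
import io
import xml.etree.ElementTree as ET

# Hypothetical sample: the namespace URI and the values are invented.
SAMPLE = b"""<?xml version="1.0"?>
<export xmlns="http://example.com/amr">
  <exportGuid>abc-123</exportGuid>
  <intervalLengthInSeconds>3600</intervalLengthInSeconds>
</export>"""


def local_name(tag):
    """Return the tag without its '{namespace}' prefix, if it has one."""
    return tag.rpartition('}')[2]


header = {}
# iterparse yields (event, element) pairs; by default only "end" events.
for _event, elem in ET.iterparse(io.BytesIO(SAMPLE)):
    name = local_name(elem.tag)
    if name in ('exportGuid', 'intervalLengthInSeconds'):
        # The element name itself becomes the dictionary key.
        header[name] = elem.text

print(header)
# {'exportGuid': 'abc-123', 'intervalLengthInSeconds': '3600'}
```

Because the keys are the element names, a value in the dictionary can be traced straight back to the XML file.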
import re

from lxml import etree

# Header elements that are copied into the dictionary under their own names.
HEADER_TAGS = ('exportGuid', 'exportDateTime', 'collectionSystemType',
               'collectionSystemID', 'startTime', 'endTime',
               'intervalLengthInSeconds')


def parseHrNs(qname):
    """Pull the namespace away from an element tag, for easier searching.

    Returns (namespace, element_name); namespace is None if there is none.
    """
    match = re.search(r'^\{(.+)\}(.+)$', qname)
    if match:
        return match.groups()
    return None, qname


def parseHrXmlDoc(fnam):
    se_read = []
    current_ch = 0
    current_endpoint = 0
    hrDict = {}

    try:
        context = etree.iterparse(fnam)
    except (OSError, etree.LxmlError):
        # The file could not be opened; return the empty dictionary.
        return hrDict

    # These searches are roughly in order of appearance in the XML document.
    # The channel number is found after the ID, so a new ID is the place to
    # write out the previous endpoint's data before caching the new ID.
    for action, elem in context:
        nspace, search_tag = parseHrNs(elem.tag)
        # print("%s: %s" % (search_tag, elem.text))
        if search_tag in HEADER_TAGS:
            hrDict[search_tag] = elem.text
        elif 'ID' == search_tag:
            # There really are empty ID elements, so skip them.
            if not elem.text:
                continue
            # se_read is empty on the first ID (which should happen only
            # once), so nothing is stored then. After that, a new ID means
            # the reads cached for the previous endpoint are complete and
            # must go into the dictionary before we take the new ID.
            if se_read:
                # Endpoint IDs are not unique if the endpoint is dual-port,
                # so build a unique key from the cached ID and channel.
                # elem.text has not been taken yet, which is the intent.
                ert_ch_text_key = str(current_endpoint) + '-' + str(current_ch)
                hrDict[ert_ch_text_key] = se_read
                se_read = []
            current_endpoint = elem.text
        elif 'channelID' == search_tag:
            # The channel must be preserved, even though we have already
            # seen the ID; it is used before the next ID is assigned.
            current_ch = elem.text
        elif 'value' == search_tag:
            se_read.append(elem.text)

    # Store the last endpoint's reads; no further ID arrives to trigger it.
    if se_read:
        hrDict[str(current_endpoint) + '-' + str(current_ch)] = se_read

    return hrDict
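The dual-port keying is the least obvious part of the parser, so here is a stripped-down sketch of just that step. It uses the standard library's xml.etree.ElementTree rather than lxml so that it runs on a stock Python install. Only the tag names ID, channelID, and value come from the parser; the nesting and the values in SAMPLE are assumptions about what the vendor file might look like, and group_reads is a hypothetical name. The sketch also stores the last endpoint's reads after the loop, since no later ID arrives to trigger that.

```python
import io
import xml.etree.ElementTree as ET

# Hypothetical layout and values; the real vendor file may nest differently.
SAMPLE = b"""<export>
  <endpoint><ID>1001</ID><channelID>1</channelID>
    <value>5</value><value>7</value></endpoint>
  <endpoint><ID>1001</ID><channelID>2</channelID>
    <value>0</value><value>3</value></endpoint>
</export>"""


def group_reads(source):
    """Group <value> texts under an 'endpoint-channel' key."""
    reads = {}
    endpoint, channel, values = None, None, []
    for _event, elem in ET.iterparse(source):
        tag = elem.tag.rpartition('}')[2]  # drop any '{namespace}' prefix
        if tag == 'ID' and elem.text:
            # A new ID closes out the previous endpoint's reads.
            if values:
                reads[f"{endpoint}-{channel}"] = values
                values = []
            endpoint = elem.text
        elif tag == 'channelID':
            channel = elem.text
        elif tag == 'value':
            values.append(elem.text)
    # The final endpoint has no following ID, so store its reads here.
    if values:
        reads[f"{endpoint}-{channel}"] = values
    return reads


result = group_reads(io.BytesIO(SAMPLE))
print(result)
# {'1001-1': ['5', '7'], '1001-2': ['0', '3']}
```

Both ports of the same endpoint keep their own list of reads because the channel number is part of the key.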