Parsing An Industrial XML File

Attached is code to parse the XML files that we will soon receive from our Automated Meter Reading (AMR) vendor. We bill our water accounts from daily reads, which we receive seven days a week. Hourly reads will help our water department work with customers to understand their water usage. In other words, hourly reads serve as diagnostic information showing when water was used during a twenty-four-hour period.

Each of the search tags in the Python program, which I believe are XML leaves, is an actual element name from the XML file. Using these names as dictionary keys ties the dictionary to the XML file, which in this case I believe is a good thing: anyone looking at the XML file on its own can see where each dictionary entry came from.

This example uses lxml, which, as far as I can tell from the Python community, is the XML parsing library of choice.
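Namespace-qualified tags are reported by the parser in the form `{namespace-uri}localname`, which is why the code below strips the namespace before comparing tag names. Here is a minimal sketch of that behaviour. It uses the standard library's `xml.etree.ElementTree.iterparse` in place of lxml, only so that it runs without extra packages; both report tags in the same form. The namespace URI, the `export` root element, and the values are made up for illustration, and `split_tag` and `local_tags` are hypothetical helper names.

```python
import io
import xml.etree.ElementTree as ET

# Hypothetical sample: the namespace URI, root element and values are invented.
SAMPLE = b"""<?xml version="1.0"?>
<export xmlns="urn:example:amr">
  <exportGuid>abc-123</exportGuid>
  <intervalLengthInSeconds>3600</intervalLengthInSeconds>
</export>"""


def split_tag(qname):
    """Split '{namespace}name' into (namespace, name); namespace is None if absent."""
    if qname.startswith('{'):
        namespace, _, name = qname[1:].partition('}')
        return namespace, name
    return None, qname


def local_tags(xml_bytes):
    """Return (namespace, local name, text) for each element, in 'end' event order."""
    seen = []
    # With no events argument, iterparse reports only 'end' events, so
    # elem.text is complete by the time each element is seen.
    for _event, elem in ET.iterparse(io.BytesIO(xml_bytes)):
        namespace, name = split_tag(elem.tag)
        seen.append((namespace, name, elem.text))
    return seen


if __name__ == '__main__':
    for namespace, name, text in local_tags(SAMPLE):
        print(namespace, name, repr(text))
```

Child elements end before their parent, so the root element is the last one reported.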

"""
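Pulls a namespace away from an element tag. This is for easier searching.
"""

import re

from lxml import etree


def parseHrNs(qname):
    # Namespace-qualified tags look like '{namespace}element_name'.
    match = re.search(r'^\{(.+)\}(.+)$', qname)

    if match:
        namespace, element_name = match.groups()
    else:
        # No namespace on this tag.
        namespace = None
        element_name = qname

    return namespace, element_name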

def parseHrXmlDoc(fnam):
    """
    Parses the hourly-read XML file named fnam into a dictionary.

    Header fields are stored under their XML tag names. Reads are stored as
    lists under 'endpoint-channel' keys. Returns an empty dictionary if the
    file cannot be opened.
    """

    # Header tags that are copied into the dictionary under their own names.
    header_tags = ('exportGuid', 'exportDateTime', 'collectionSystemType',
                   'collectionSystemID', 'startTime', 'endTime',
                   'intervalLengthInSeconds')

    se_read = []

    current_ch = 0
    current_endpoint = 0

    hrDict = {}

    try:
        context = etree.iterparse(fnam)

    except (OSError, etree.LxmlError):
        return hrDict

    # These searches are roughly in order of appearance in the XML document.
    #
    # Channel number found after ID.
    #
    # ID is the place to write the last data before assigning new ID.
    #
    # Check on se_read provides initialization step, so we don't assign
    # rubbish at the beginning.

    for action, elem in context:
        nspace, search_tag = parseHrNs(elem.tag)

        #print("%s: %s" % (search_tag, elem.text))

        if search_tag in header_tags:
            hrDict[search_tag] = elem.text

        elif 'ID' == search_tag:
            # There really are null elements, so skip them.
            if not elem.text:
                continue

            # se_read is an empty list on initialization, so the first ID is
            # just cached in current_endpoint. That should only happen once.
            #
            # After initialization, this is already-visited logic. We've
            # already seen and cached the ID in current_endpoint and the
            # channel in current_ch. The fact we're here means we've seen a
            # new ID, so the prior data must be put in the dictionary before
            # the new ID is assigned.
            if se_read:
                # Endpoint IDs are not unique for the dictionary if they are
                # dual-port, so we create a unique key from the endpoint ID
                # and the channel number. Both are still the cached values;
                # elem.text has not been taken yet, which is our intent.
                ert_ch_text_key = str(current_endpoint) + '-' + str(current_ch)

                hrDict[ert_ch_text_key] = se_read

                se_read = []

            current_endpoint = elem.text

        elif 'channelID' == search_tag:
            # Channel must be preserved, even though we've already seen ID.
            # current_ch will be used before the next ID is assigned.
            current_ch = elem.text

        elif 'value' == search_tag:
            se_read.append(elem.text)

    # No further ID follows the last endpoint, so its reads are stored here.
    if se_read:
        ert_ch_text_key = str(current_endpoint) + '-' + str(current_ch)

        hrDict[ert_ch_text_key] = se_read

    return hrDict
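
To try the grouping logic without a vendor file, here is a compact sketch of the same idea: cache the current ID and channelID, collect value elements, and store the list under an endpoint-channel key when the next ID arrives. This sketch also stores the last group once the input ends, since no further ID arrives to trigger it. It uses the standard library's ElementTree rather than lxml so that it is self-contained. Everything about the sample document except the ID, channelID and value tag names (the nesting, the `export` and `endpoint` elements, the namespace, and all values) is an assumption made up for illustration, and the real AMR export may well differ. `group_reads` is a hypothetical helper name.

```python
import io
import xml.etree.ElementTree as ET

# Invented sample: one dual-port endpoint reporting on channels 1 and 2.
# Only the ID, channelID and value tag names come from the parser above.
SAMPLE = b"""<export xmlns="urn:example:amr">
  <endpoint><ID>1001</ID><channelID>1</channelID>
    <value>5</value><value>7</value></endpoint>
  <endpoint><ID>1001</ID><channelID>2</channelID>
    <value>0</value><value>3</value></endpoint>
</export>"""


def group_reads(xml_bytes):
    """Group value texts into lists keyed by 'endpoint-channel'."""
    reads = {}
    endpoint = channel = None
    values = []

    for _event, elem in ET.iterparse(io.BytesIO(xml_bytes)):
        tag = elem.tag.rpartition('}')[2]  # drop '{namespace}' if present

        if tag == 'ID' and elem.text:
            # A new ID means the cached endpoint and channel are complete.
            if values:
                reads[f'{endpoint}-{channel}'] = values
                values = []
            endpoint = elem.text

        elif tag == 'channelID':
            channel = elem.text

        elif tag == 'value':
            values.append(elem.text)

    # Store the final group, which no later ID will flush.
    if values:
        reads[f'{endpoint}-{channel}'] = values

    return reads


if __name__ == '__main__':
    print(group_reads(SAMPLE))
```

With this sample, the two channels of endpoint 1001 end up under the separate keys `1001-1` and `1001-2`, which is the reason for the combined key.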
