gnumed-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gnumed-devel] simple HL7 parser


From: Ian Haywood
Subject: [Gnumed-devel] simple HL7 parser
Date: Sat, 17 Sep 2005 22:19:39 +1000
User-agent: Mozilla Thunderbird 1.0.6 (X11/20050912)

This is a very simple HL7 parser in Python.
This is to demonstrate one possible way to represent
the relationship between various segments: by a hierarchy of python 
dictionaries and
lists, which can then be easily accessed in python and then transformed to YAML 
(and maybe
some other markup I suppose)

It works by a "rules file" which maps the segment and field number to a path in 
the
hierarchy. The paths have one or two asterisks, these are the "pivot points" 
where
subsequent values on the same path are inserted into a python list. The first 
one for
repeating segments, the second for repeating fields.

I enclose a heavily simplified HL7 path report based on common blood tests,
and a corresponding rules file.

The script, when run, is a HL7-to-YAML filter, the name of rules file as a 
command-line parameter.

Ian
# an experimental HL7 parser

import datetime, ydump

def set (tree, path, mode, value):
    """
    @param tree: the logical parse tree
    @param path: the path
    @param: mode: 0= search first pivot, 1=search second pivot, 2= use first 
pivot point, 3= use second pivot
    @param value: the value to set to
    """
    in_tree = tree
    treelist = path.split ('/')
    for i in treelist[:-1]:
        if i:
            if i[0] == '*':
                i = i[1:]
                if mode == 2:
                    in_tree[i].append ({})
                if mode == 3:
                    mode = 2
            try:
                in_tree = in_tree[i]
            except KeyError:
                in_tree[i] = [{}]
                in_tree = in_tree[i]
            in_tree = in_tree[-1]
    i = treelist[-1]
    if i[0] == '*':
        i = i[1:]
        if mode == 2:
            if type (in_tree[i]) != types.ListType:
                in_tree[i] = [in_tree[i]]
            in_tree[i].append (value)
    else:
        if in_tree.has_key (i):
            set (tree, path, mode+2, value)
        else:
            in_tree[i] = value


def parse_rules (rulefile):
    """
    Read in the rules file.
    Format is
    <segment> <field> [<component> <subcomponent>]] type path

    types supported so far are ST 9string), NI (numeric) IN (integer) DT (date) 
TS (timestamp) TI (time)
    """
    ret = []
    for i in rulefile.xreadlines ():
         i = i.split ()
         segment = i[0]
         field = int (i[1])
         typ = i[-2]
         path = i[-1]
         if len (i) == 6:
             subcomponent = int (i[3])
         else:
             subcomponent = 0
         if len (i) > 4:
             component = int (i[2])
         else:
             component = 0
         ret.append ((segment, field, component, subcomponent, typ, path))
    return ret

def parse_hl7 (message, rulefile):
    """
    @param message the HL7 message as a string
    @param rulefile a Python file object containin the rulefile

    @return an alternating hierarchy of dictionaries and lists representing the
    structure of the HL7 message
    """
    tree = {}
    rules = parse_rules (rulefile)
    global fieldsep
    global repsep
    global compsep 
    global subcompsep
    global escsep
    subcompep = '&'
    escsep = '\\'
    for segment in message.split ('\n'):
        segname = segment[:3]
        if segname in ['MSG', 'FHS', 'BHS']:
            fieldsep = segment[3]
        fields = segment.split (fieldsep)
        if segname in ['MSG', 'FHS', 'BHS']:
            repsep = fields[1][0]
            compsep = fields[1][1]
            if len (fields[1]) > 2:
                subcompsep = fields[1][2]
            if len (fields[1]) > 3:
                escsep = fields[1][3]
        for i in xrange (1, len (fields)):
            j = fields[i].split (repsep)
            parse_field (rules, tree, segname, i, 0, j[0])
            if len (j) > 1:
                for k in j[1:]:
                    parse_field (rules, tree, segname, i, 1, k)
    return tree

def parse_field (rules, tree, segname, fieldno, pivot, f):
    f = f.split (compsep)
    for i in range (0, len (f)):
        j = f[i].split (subcompsep)
        for k in range (0, len (j)):
            for segment, field, component, subcomponent, typ, path in rules:
                if segment == segname and field == fieldno and component == i 
and subcomponent == k:
                    set (tree, path, pivot, globals ()['parse_%s' % typ] (j[k]))

def parse_ST (hl7):
    """
    This produces plain-text output, not all control sequences are parsed
    """
    hl7.replace (escsep + 'F' + escsep, fieldsep)
    hl7.replace (escsep + 'S' + escsep, compsep)
    hl7.replace (escsep + 'T' + escsep, subcompsep)
    hl7.replace (escsep + 'R' + escsep, repsep)
    hl7.replace (escsep + '.br' + escsep, '\n')
    hl7.replace (escsep + '.sp' + escsep, '\r')
    hl7.replace (escsep + 'E' + escsep, escsep)
    return hl7
# FIXME: we need to support HTML output here

def parse_TI (hl7):
    s = hl7.split ('+')
    if len (s) > 1:
        tz = datetime.time (-int (s[1][0:1], -int (s[1][2:3]))) # hours ahead 
of GMT, so we must subract to get to GMT
    else:
        s = s[0].split('-')
        if len (s) > 1:
            tz = datetime.timedelta (hours=int (s[1][0:1], minutes=int 
(s[1][2:3])))
        else:
            tz = datetime.timedelta (0) # no timezone provided, we have little 
choice but to presume GMT
    hl7 = s[0]
    hour = int (hl7[0:1])
    if len (hl7) >= 4:
        minute = int (hl7[2:3])
        if len (hl7) > 4:
            second = float (hl7[4:])
    return datetime.time (hour, minute, second) + tz

def parse_DT (hl7):
    year = int (hl7[0:3])
    if len (hl7) >= 6:
        month = int (hl7[4:5])
        if len (hl7) == 8:
            day = int (hl7[6:7])
        else:
            day = 1
    else:
        day = 1
        month = 1
    return datetime.date (year, month, day)

def parse_TS (hl7):
    if len (hl7) > 8:
        return datetime.datetime.combine (parse_DT (self, hl7[0:7]),parse_TI 
(self, hl7[8:]))
    else:
        return parse_DT (self, hl7)

def parse_NI (hl7):
    return float (hl7)

def parse_IN (hl7):
    return int (hl7)


if __name__ == '__main__':
    # we run as a UNIX filter
    # example command: python hl7.py hl7.rules < example.hl7 | more
    import sys
    ydump.dumpToFile (sys.stdout, parse_hl7 (sys.stdin.read (), file 
(sys.argv[1])))
        
MSG 2           ST /*message/title
MSG 3           ST /*message/origin
PID 1           ST /message/*patient/name
PID 2 0         ST /message/*patient/*address/street
PID 2 1         ST /message/*patient/*address/town
OBR 1           ST /message/patient/*result/type
OBR 2           ST /message/patient/*result/status
OBX 2           NI /message/patient/result/*values/number
OBX 1           ST /message/patient/result/*values/code
MSG|~^&\|Example Message|Pathology
PID|Ian Haywood|1 Example St^Anytown~2 Example St^Somwhere Else|
OBR|Urea and Electrolytes|Normal
OBX|Na|145
OBX|K|3.6
OBX|Urea|24.5
OBX|Creatinine|0.07
PID|Cilla Haywood|1 Example St^Anytown
OBR|Full Blood Examination|Normal
OBX|Hb|12.2
OBX|WCC|9.6
OBX|Platelet|123
OBR|Strongyloides Serum Antibody|Abnormal
OBX|Strongyloides titre|128

Attachment: signature.asc
Description: OpenPGP digital signature


reply via email to

[Prev in Thread] Current Thread [Next in Thread]