# -*- coding: utf-8 -*-
"""GNUmed patient objects.

This is a patient object intended to let a useful client-side
API crystallize from actual use in true XP fashion.
"""
#============================================================
__author__ = "K.Hilbert "
__license__ = "GPL"

# std lib
import sys
import os.path
import time
import re as regex
import datetime as pyDT
import codecs
import thread
import threading
import logging
import io
from xml.etree import ElementTree as etree


# GNUmed
if __name__ == '__main__':
    logging.basicConfig(level = logging.DEBUG)
    sys.path.insert(0, '../../')
from Gnumed.pycommon import gmExceptions
from Gnumed.pycommon import gmDispatcher
from Gnumed.pycommon import gmBorg
from Gnumed.pycommon import gmI18N
if __name__ == '__main__':
    gmI18N.activate_locale()
    gmI18N.install_domain()
from Gnumed.pycommon import gmNull
from Gnumed.pycommon import gmBusinessDBObject
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmDateTime
from Gnumed.pycommon import gmMatchProvider
from Gnumed.pycommon import gmLog2
from Gnumed.pycommon import gmHooks

from Gnumed.business import gmDemographicRecord
from Gnumed.business import gmClinicalRecord
from Gnumed.business import gmXdtMappings
from Gnumed.business import gmProviderInbox
from Gnumed.business import gmExportArea
from Gnumed.business import gmBilling
from Gnumed.business.gmDocuments import cDocumentFolder
from Gnumed.business.gmChartPulling import tui_chart_puller


_log = logging.getLogger('gm.person')

__gender_list = None
__gender_idx = None

__gender2salutation_map = None
__gender2string_map = None

#============================================================
# Header written at the top of the SQL script generated by
# cIdentity.assimilate_identity(). Every informational line is a
# SQL comment, so each one must sit on a line of its own.
_MERGE_SCRIPT_HEADER = u"""-- GNUmed patient merge script
-- created: %(date)s
-- patient to keep : #%(pat2keep)s
-- patient to merge: #%(pat2del)s
--
-- You can EASILY cause mangled data by uncritically applying this script, so ...
-- ... BE POSITIVELY SURE YOU UNDERSTAND THE FULL EXTENT OF WHAT IT DOES !


--set default_transaction_read_only to off;

BEGIN;
"""

#============================================================
def external_id_exists(pk_issuer, value):
    """Return the number of identities linked to external ID <value> of type <pk_issuer>.

    The count is truthy if at least one such link exists.
    """
    cmd = u'SELECT COUNT(1) FROM dem.lnk_identity2ext_id WHERE fk_origin = %(issuer)s AND external_id = %(val)s'
    args = {'issuer': pk_issuer, 'val': value}
    rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = False)
    return rows[0][0]

#============================================================
def person_exists(lastnames, dob, firstnames=None):
    """Return the number of persons matching last name, day of birth and, optionally, first name.

    <firstnames>, if given and not blank, is matched case-insensitively
    at the start of a word of the stored first names.
    """
    args = {
        'last': lastnames,
        'dob': dob
    }
    where_parts = [
        u"lastnames = %(last)s",
        u"dem.date_trunc_utc('day', dob) = dem.date_trunc_utc('day', %(dob)s)"
    ]
    if firstnames is not None:
        if firstnames.strip() != u'':
            #where_parts.append(u"position(%(first)s in firstnames) = 1")
            # NOTE(review): <firstnames> is used as a regular expression
            # fragment here, so regex metacharacters in it are NOT escaped
            where_parts.append(u"firstnames ~* %(first)s")
            args['first'] = u'\\m' + firstnames
    cmd = u"""SELECT COUNT(1) FROM dem.v_basic_person WHERE %s""" % u' AND '.join(where_parts)
    rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = False)
    return rows[0][0]

#============================================================
# FIXME: make this work as a mapping type, too
class cDTO_person(object):
    """Data transfer object holding demographics of a person from an external source."""

    def __init__(self):
        self.identity = None
        self.external_ids = []
        self.comm_channels = []
        self.addresses = []

        self.firstnames = None
        self.lastnames = None
        self.title = None
        self.gender = None
        self.dob = None
        self.dob_is_estimated = False
        self.source = self.__class__.__name__
    #--------------------------------------------------------
    # external API
    #--------------------------------------------------------
    def keys(self):
        """Return the names of the core demographic fields."""
        return 'firstnames lastnames dob gender title'.split()
    #--------------------------------------------------------
    def delete_from_source(self):
        """Hook for removing this person from its source, a no-op here."""
        pass
    #--------------------------------------------------------
    def _count_name_matches(self):
        """Count rows in dem.v_person_names exactly matching name and, if set, day of birth and gender."""
        where_snippets = [
            u'firstnames = %(first)s',
            u'lastnames = %(last)s'
        ]
        args = {
            'first': self.firstnames,
            'last': self.lastnames
        }
        if self.dob is not None:
            where_snippets.append(u"dem.date_trunc_utc('day'::text, dob) = dem.date_trunc_utc('day'::text, %(dob)s)")
            args['dob'] = self.dob.replace(hour = 23, minute = 59, second = 59)
        if self.gender is not None:
            where_snippets.append('gender = %(sex)s')
            args['sex'] = self.gender
        cmd = u'SELECT count(1) FROM dem.v_person_names WHERE %s' % ' AND '.join(where_snippets)
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = False)
        return rows[0][0]
    #--------------------------------------------------------
    def is_unique(self):
        """True if exactly one name row in the database matches this DTO."""
        return self._count_name_matches() == 1

    is_unique = property(is_unique, lambda x:x)
    #--------------------------------------------------------
    def exists(self):
        """True if at least one name row in the database matches this DTO."""
        return self._count_name_matches() > 0

    exists = property(exists, lambda x:x)
    #--------------------------------------------------------
    def get_candidate_identities(self, can_create=False):
        """Generate generic queries.

        - not locale dependant
        - data -> firstnames, lastnames, dob, gender

        shall we mogrify name parts ? probably not as external
        sources should know what they do

        finds by inactive name, too, but then shows
        the corresponding active name ;-)

        Returns list of matching identities (may be empty)
        or None if it was told to create an identity but couldn't.
        """
        where_snippets = []
        args = {}

        where_snippets.append(u'lower(firstnames) = lower(%(first)s)')
        args['first'] = self.firstnames

        where_snippets.append(u'lower(lastnames) = lower(%(last)s)')
        args['last'] = self.lastnames

        if self.dob is not None:
            where_snippets.append(u"dem.date_trunc_utc('day'::text, dob) = dem.date_trunc_utc('day'::text, %(dob)s)")
            args['dob'] = self.dob.replace(hour = 23, minute = 59, second = 59)

        if self.gender is not None:
            where_snippets.append(u'lower(gender) = lower(%(sex)s)')
            args['sex'] = self.gender

        cmd = u"""
            SELECT *, '%s' AS match_type
            FROM dem.v_basic_person
            WHERE
                pk_identity IN (
                    SELECT pk_identity FROM dem.v_person_names WHERE %s
                )
            ORDER BY lastnames, firstnames, dob""" % (
            _('external patient source (name, gender, date of birth)'),
            ' AND '.join(where_snippets)
        )

        try:
            rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx=True)
        except Exception:
            # best effort: a failing search is logged and treated as "no match",
            # but KeyboardInterrupt/SystemExit are no longer swallowed
            _log.error(u'cannot get candidate identities for dto "%s"' % self)
            _log.exception('query %s' % cmd)
            rows = []

        if len(rows) == 0:
            _log.debug('no candidate identity matches found')
            if not can_create:
                return []
            ident = self.import_into_database()
            if ident is None:
                return None
            identities = [ident]
        else:
            identities = [ cIdentity(row = {'pk_field': 'pk_identity', 'data': row, 'idx': idx}) for row in rows ]

        return identities
    #--------------------------------------------------------
    def import_into_database(self):
        """Imports self into the database.

        Returns the newly created identity, or None if it could not be
        created. Failures to attach external IDs, comm channels or
        addresses are logged and otherwise ignored.
        """
        self.identity = create_identity (
            firstnames = self.firstnames,
            lastnames = self.lastnames,
            gender = self.gender,
            dob = self.dob
        )

        if self.identity is None:
            return None

        if self.dob_is_estimated:
            self.identity['dob_is_estimated'] = True
        if self.title is not None:
            self.identity['title'] = self.title
        self.identity.save()

        for ext_id in self.external_ids:
            try:
                self.identity.add_external_id (
                    type_name = ext_id['name'],
                    value = ext_id['value'],
                    issuer = ext_id['issuer'],
                    comment = ext_id['comment']
                )
            except StandardError:
                _log.exception('cannot import from external data source')
                gmLog2.log_stack_trace()

        for comm in self.comm_channels:
            try:
                self.identity.link_comm_channel (
                    comm_medium = comm['channel'],
                    url = comm['url']
                )
            except StandardError:
                _log.exception('cannot import from external data source')
                gmLog2.log_stack_trace()

        for adr in self.addresses:
            try:
                self.identity.link_address (
                    adr_type = adr['type'],
                    number = adr['number'],
                    subunit = adr['subunit'],
                    street = adr['street'],
                    postcode = adr['zip'],
                    urb = adr['urb'],
                    region_code = adr['region_code'],
                    country_code = adr['country_code']
                )
            except StandardError:
                _log.exception('cannot import from external data source')
                gmLog2.log_stack_trace()

        return self.identity
    #--------------------------------------------------------
    def import_extra_data(self, *args, **kwargs):
        """Hook for importing source specific extra data, a no-op here."""
        pass
    #--------------------------------------------------------
    def remember_external_id(self, name=None, value=None, issuer=None, comment=None):
        """Queue an external ID for import, silently skipping blank values.

        Raises ValueError if <name> or <issuer> is blank.
        """
        value = value.strip()
        if value == u'':
            return
        name = name.strip()
        if name == u'':
            raise ValueError(_('<name> cannot be empty'))
        issuer = issuer.strip()
        if issuer == u'':
            raise ValueError(_('<issuer> cannot be empty'))
        self.external_ids.append({'name': name, 'value': value, 'issuer': issuer, 'comment': comment})
    #--------------------------------------------------------
    def remember_comm_channel(self, channel=None, url=None):
        """Queue a communication channel for import, silently skipping blank URLs.

        Raises ValueError if <channel> is blank.
        """
        url = url.strip()
        if url == u'':
            return
        channel = channel.strip()
        if channel == u'':
            raise ValueError(_('<channel> cannot be empty'))
        self.comm_channels.append({'channel': channel, 'url': url})
    #--------------------------------------------------------
    def remember_address(self, number=None, street=None, urb=None, region_code=None, zip=None, country_code=None, adr_type=None, subunit=None):
        """Queue an address for import.

        Raises ValueError if number, street, urb, zip or country code
        is blank. A missing or blank region code is stored as u'??'.
        """
        number = number.strip()
        if number == u'':
            raise ValueError(_('<number> cannot be empty'))
        street = street.strip()
        if street == u'':
            raise ValueError(_('<street> cannot be empty'))
        urb = urb.strip()
        if urb == u'':
            raise ValueError(_('<urb> cannot be empty'))
        zip = zip.strip()
        if zip == u'':
            raise ValueError(_('<zip> cannot be empty'))
        country_code = country_code.strip()
        if country_code == u'':
            raise ValueError(_('<country_code> cannot be empty'))
        if region_code is not None:
            region_code = region_code.strip()
        if region_code in [None, u'']:
            region_code = u'??'
        self.addresses.append ({
            u'type': adr_type,
            u'number': number,
            u'subunit': subunit,
            u'street': street,
            u'zip': zip,
            u'urb': urb,
            u'region_code': region_code,
            u'country_code': country_code
        })
    #--------------------------------------------------------
    # customizing behaviour
    #--------------------------------------------------------
    def __str__(self):
        return u'<%s (%s) @ %s: %s %s (%s) %s>' % (
            self.__class__.__name__,
            self.source,
            id(self),
            self.firstnames,
            self.lastnames,
            self.gender,
            self.dob
        )
    #--------------------------------------------------------
    def __setattr__(self, attr, val):
        """Do some sanity checks on self.* access.

        - gender: must be None or match one of the first four fields
          of a known gender row, and is normalized to that row's tag
        - dob: must be None or a timezone-aware datetime.datetime
        """
        if attr == 'gender':
            if val is None:
                object.__setattr__(self, attr, val)
                return
            glist, idx = get_gender_list()
            for gender in glist:
                if str(val) in [gender[0], gender[1], gender[2], gender[3]]:
                    val = gender[idx['tag']]
                    object.__setattr__(self, attr, val)
                    return
            raise ValueError('invalid gender: [%s]' % val)

        if attr == 'dob':
            if val is not None:
                if not isinstance(val, pyDT.datetime):
                    raise TypeError('invalid type for DOB (must be datetime.datetime): %s [%s]' % (type(val), val))
                if val.tzinfo is None:
                    raise ValueError('datetime.datetime instance is lacking a time zone: [%s]' % val.isoformat())

        object.__setattr__(self, attr, val)
        return
    #--------------------------------------------------------
    def __getitem__(self, attr):
        return getattr(self, attr)

#============================================================
class cPersonName(gmBusinessDBObject.cBusinessDBObject):
    """One name (row of dem.v_person_names) of a person."""

    _cmd_fetch_payload = u"SELECT * FROM dem.v_person_names WHERE pk_name = %s"
    _cmds_store_payload = [
        u"""UPDATE dem.names SET
                active = FALSE
            WHERE
                %(active_name)s IS TRUE             -- act only when needed and only
                    AND
                id_identity = %(pk_identity)s       -- on names of this identity
                    AND
                active IS TRUE                      -- which are active
                    AND
                id != %(pk_name)s                   -- but NOT *this* name
            """,
        u"""update dem.names set
                active = %(active_name)s,
                preferred = %(preferred)s,
                comment = %(comment)s
            where
                id = %(pk_name)s and
                id_identity = %(pk_identity)s and   -- belt and suspenders
                xmin = %(xmin_name)s""",
        u"""select xmin as xmin_name from dem.names where id = %(pk_name)s"""
    ]
    _updatable_fields = ['active_name', 'preferred', 'comment']
    #--------------------------------------------------------
    def __setitem__(self, attribute, value):
        if attribute == 'active_name':
            # cannot *directly* deactivate a name, only indirectly
            # by activating another one
            # FIXME: should be done at DB level
            if self._payload[self._idx['active_name']] is True:
                return
        gmBusinessDBObject.cBusinessDBObject.__setitem__(self, attribute, value)
    #--------------------------------------------------------
    def _get_description(self):
        """Format as "last, title first 'nick'", using the gender salutation if there is no title."""
        return '%(last)s, %(title)s %(first)s%(nick)s' % {
            'last': self._payload[self._idx['lastnames']],
            'title': gmTools.coalesce (
                self._payload[self._idx['title']],
                map_gender2salutation(self._payload[self._idx['gender']])
            ),
            'first': self._payload[self._idx['firstnames']],
            'nick': gmTools.coalesce(self._payload[self._idx['preferred']], u'', u" '%s'", u'%s')
        }

    description = property(_get_description, lambda x:x)

#============================================================
class cIdentity(gmBusinessDBObject.cBusinessDBObject):
    """A person (row of dem.v_basic_person)."""

    _cmd_fetch_payload = u"SELECT * FROM dem.v_basic_person WHERE pk_identity = %s"
    _cmds_store_payload = [
        u"""UPDATE dem.identity SET
                gender = %(gender)s,
                dob = %(dob)s,
                dob_is_estimated = %(dob_is_estimated)s,
                tob = %(tob)s,
                cob = gm.nullify_empty_string(%(cob)s),
                title = gm.nullify_empty_string(%(title)s),
                fk_marital_status = %(pk_marital_status)s,
                pupic = gm.nullify_empty_string(%(pupic)s),
                deceased = %(deceased)s,
                emergency_contact = gm.nullify_empty_string(%(emergency_contact)s),
                fk_emergency_contact = %(pk_emergency_contact)s,
                fk_primary_provider = %(pk_primary_provider)s,
                comment = gm.nullify_empty_string(%(comment)s)
            WHERE
                pk = %(pk_identity)s and
                xmin = %(xmin_identity)s
            RETURNING
                xmin AS xmin_identity"""
    ]
    _updatable_fields = [
        "title",
        "dob",
        "tob",
        "cob",
        "gender",
        "pk_marital_status",
        "pupic",
        'deceased',
        'emergency_contact',
        'pk_emergency_contact',
        'pk_primary_provider',
        'comment',
        'dob_is_estimated'
    ]
    #--------------------------------------------------------
    def _get_ID(self):
        return self._payload[self._idx['pk_identity']]

    def _set_ID(self, value):
        raise AttributeError('setting ID of identity is not allowed')

    ID = property(_get_ID, _set_ID)
    #--------------------------------------------------------
    def __setitem__(self, attribute, value):
        """Set a field, validating <dob>.

        <dob> must be None or a timezone-aware datetime.datetime.
        Setting a DOB equal (to the second) to the current one is a no-op.

        Raises ValueError on a naive datetime and TypeError on any
        other non-None type.
        """
        if attribute == 'dob':
            if value is not None:
                if isinstance(value, pyDT.datetime):
                    if value.tzinfo is None:
                        raise ValueError('datetime.datetime instance is lacking a time zone: [%s]' % value.isoformat())
                else:
                    raise TypeError('[%s]: type [%s] (%s) invalid for attribute [dob], must be datetime.datetime or None' % (self.__class__.__name__, type(value), value))

                # compare DOB at seconds level
                if self._payload[self._idx['dob']] is not None:
                    old_dob = gmDateTime.pydt_strftime (
                        self._payload[self._idx['dob']],
                        format = '%Y %m %d %H %M %S',
                        accuracy = gmDateTime.acc_seconds
                    )
                    new_dob = gmDateTime.pydt_strftime (
                        value,
                        format = '%Y %m %d %H %M %S',
                        accuracy = gmDateTime.acc_seconds
                    )
                    if new_dob == old_dob:
                        return

        gmBusinessDBObject.cBusinessDBObject.__setitem__(self, attribute, value)
    #--------------------------------------------------------
    def cleanup(self):
        pass
    #--------------------------------------------------------
    def _get_is_patient(self):
        return identity_is_patient(self._payload[self._idx['pk_identity']])

    def _set_is_patient(self, turn_into_patient):
        # only turning an identity INTO a patient is supported
        if turn_into_patient:
            return turn_identity_into_patient(self._payload[self._idx['pk_identity']])
        return False

    is_patient = property(_get_is_patient, _set_is_patient)
    #--------------------------------------------------------
    def _get_as_patient(self):
        return cPatient(self._payload[self._idx['pk_identity']])

    as_patient = property(_get_as_patient, lambda x:x)
    #--------------------------------------------------------
    def _get_staff_id(self):
        """Return the dem.staff primary key for this identity, or None if not staff."""
        cmd = u"SELECT pk FROM dem.staff WHERE fk_identity = %(pk)s"
        args = {'pk': self._payload[self._idx['pk_identity']]}
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = False)
        if len(rows) == 0:
            return None
        return rows[0][0]

    staff_id = property(_get_staff_id, lambda x:x)
    #--------------------------------------------------------
    # identity API
    #--------------------------------------------------------
    def _get_gender_symbol(self):
        return map_gender2symbol[self._payload[self._idx[u'gender']]]

    gender_symbol = property(_get_gender_symbol, lambda x:x)
    #--------------------------------------------------------
    def _get_gender_string(self):
        return map_gender2string(gender = self._payload[self._idx['gender']])

    gender_string = property(_get_gender_string, lambda x:x)
    #--------------------------------------------------------
    def _get_gender_list(self):
        gender_list, tmp = get_gender_list()
        return gender_list

    gender_list = property(_get_gender_list, lambda x:x)
    #--------------------------------------------------------
    def get_active_name(self):
        """Return the active name, or None (after logging an error) if there is none."""
        names = self.get_names(active_only = True)
        if len(names) == 0:
            _log.error('cannot retrieve active name for patient [%s]', self._payload[self._idx['pk_identity']])
            return None
        return names[0]

    active_name = property(get_active_name, lambda x:x)
    #--------------------------------------------------------
    def get_names(self, active_only=False, exclude_active=False):
        """Return the names of this identity as a list of cPersonName, active name first."""
        args = {'pk_pat': self._payload[self._idx['pk_identity']]}
        where_parts = [u'pk_identity = %(pk_pat)s']
        if active_only:
            where_parts.append(u'active_name is True')
        if exclude_active:
            where_parts.append(u'active_name is False')
        cmd = u"""
            SELECT *
            FROM dem.v_person_names
            WHERE %s
            ORDER BY active_name DESC, lastnames, firstnames
        """ % u' AND '.join(where_parts)
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)

        if len(rows) == 0:
            # no names registered for patient
            return []

        names = [ cPersonName(row = {'idx': idx, 'data': r, 'pk_field': 'pk_name'}) for r in rows ]
        return names
    #--------------------------------------------------------
    def get_description_gender(self):
        """Format as "last, title first 'nick' (gender symbol)"."""
        return _(u'%(last)s,%(title)s %(first)s%(nick)s (%(sex)s)') % {
            'last': self._payload[self._idx['lastnames']],
            'title': gmTools.coalesce(self._payload[self._idx['title']], u'', u' %s'),
            'first': self._payload[self._idx['firstnames']],
            'nick': gmTools.coalesce(self._payload[self._idx['preferred']], u'', u" '%s'"),
            'sex': self.gender_symbol
        }
    #--------------------------------------------------------
    def get_description(self):
        """Format as "last, title first 'nick'"."""
        return _(u'%(last)s,%(title)s %(first)s%(nick)s') % {
            'last': self._payload[self._idx['lastnames']],
            'title': gmTools.coalesce(self._payload[self._idx['title']], u'', u' %s'),
            'first': self._payload[self._idx['firstnames']],
            'nick': gmTools.coalesce(self._payload[self._idx['preferred']], u'', u" '%s'")
        }
    #--------------------------------------------------------
    def add_name(self, firstnames, lastnames, active=True):
        """Add a name.

        @param firstnames The first names.
        @param lastnames The last names.
        @param active When True, the new name will become the active one (hence setting other names to inactive)
        @type active A types.BooleanType instance
        """
        name = create_name(self.ID, firstnames, lastnames, active)
        if active:
            self.refetch_payload()
        return name
    #--------------------------------------------------------
    def delete_name(self, name=None):
        """Delete <name> (a mapping with key 'pk_name') from this identity."""
        cmd = u"delete from dem.names where id = %(name)s and id_identity = %(pat)s"
        args = {'name': name['pk_name'], 'pat': self.ID}
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
        # can't have been the active name as that would raise an
        # exception (since no active name would be left) so no
        # data refetch needed
    #--------------------------------------------------------
    def set_nickname(self, nickname=None):
        """
        Set the nickname. Setting the nickname only makes sense for the currently
        active name.
        @param nickname The preferred/nick/warrior name to set.
        """
        if self._payload[self._idx['preferred']] == nickname:
            return True
        gmPG2.run_rw_queries(queries = [{'cmd': u"SELECT dem.set_nickname(%s, %s)", 'args': [self.ID, nickname]}])
        # setting nickname doesn't change dem.identity, so other fields
        # of dem.v_basic_person do not get changed as a consequence of
        # setting the nickname, hence locally setting nickname matches
        # in-database reality
        self._payload[self._idx['preferred']] = nickname
        #self.refetch_payload()
        return True
    #--------------------------------------------------------
    def get_tags(self, order_by=None):
        """Return the tags of this identity as a list of cIdentityTag.

        NOTE(review): <order_by> is spliced verbatim into the SQL, so
        callers must only pass trusted column lists.
        """
        if order_by is None:
            order_by = u''
        else:
            order_by = u'ORDER BY %s' % order_by

        cmd = gmDemographicRecord._SQL_get_identity_tags % (u'pk_identity = %%(pat)s %s' % order_by)
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': {u'pat': self.ID}}], get_col_idx = True)

        return [ gmDemographicRecord.cIdentityTag(row = {'data': r, 'idx': idx, 'pk_field': 'pk_identity_tag'}) for r in rows ]

    tags = property(get_tags, lambda x:x)
    #--------------------------------------------------------
    def add_tag(self, tag):
        """Link tag <tag> (a primary key) to this identity, returning the existing link if already tagged."""
        args = {
            u'tag': tag,
            u'identity': self.ID
        }

        # already exists ?
        cmd = u"SELECT pk FROM dem.identity_tag WHERE fk_tag = %(tag)s AND fk_identity = %(identity)s"
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = False)
        if len(rows) > 0:
            return gmDemographicRecord.cIdentityTag(aPK_obj = rows[0]['pk'])

        # no, add
        cmd = u"""
            INSERT INTO dem.identity_tag (
                fk_tag,
                fk_identity
            ) VALUES (
                %(tag)s,
                %(identity)s
            )
            RETURNING pk
        """
        rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], return_data = True, get_col_idx = False)
        return gmDemographicRecord.cIdentityTag(aPK_obj = rows[0]['pk'])
    #--------------------------------------------------------
    def remove_tag(self, tag):
        """Delete the identity/tag link with primary key <tag>."""
        cmd = u"DELETE FROM dem.identity_tag WHERE pk = %(pk)s"
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': {'pk': tag}}])
    #--------------------------------------------------------
    # external ID API
    #
    # since external IDs are not treated as first class
    # citizens (classes in their own right, that is), we
    # handle them *entirely* within cIdentity, also they
    # only make sense with one single person (like names)
    # and are not reused (like addresses), so they are
    # truly added/deleted, not just linked/unlinked
    #--------------------------------------------------------
    def add_external_id(self, type_name=None, value=None, issuer=None, comment=None, pk_type=None):
        """Adds an external ID to the patient.

        creates ID type if necessary

        If the ID already exists and <comment> is not yet part of the
        stored comment, <comment> is appended to it.
        """
        # check for existing ID
        if pk_type is not None:
            cmd = u"""
                select * from dem.v_external_ids4identity where
                pk_identity = %(pat)s and
                pk_type = %(pk_type)s and
                value = %(val)s"""
        else:
            # by type/value/issuer
            if issuer is None:
                cmd = u"""
                    select * from dem.v_external_ids4identity where
                    pk_identity = %(pat)s and
                    name = %(name)s and
                    value = %(val)s"""
            else:
                cmd = u"""
                    select * from dem.v_external_ids4identity where
                    pk_identity = %(pat)s and
                    name = %(name)s and
                    value = %(val)s and
                    issuer = %(issuer)s"""
        args = {
            'pat': self.ID,
            'name': type_name,
            'val': value,
            'issuer': issuer,
            'pk_type': pk_type
        }
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

        # create new ID if not found
        if len(rows) == 0:
            args = {
                'pat': self.ID,
                'val': value,
                'type_name': type_name,
                'pk_type': pk_type,
                'issuer': issuer,
                'comment': comment
            }
            if pk_type is None:
                cmd = u"""insert into dem.lnk_identity2ext_id (external_id, fk_origin, comment, id_identity) values (
                    %(val)s,
                    (select dem.add_external_id_type(%(type_name)s, %(issuer)s)),
                    %(comment)s,
                    %(pat)s
                )"""
            else:
                cmd = u"""insert into dem.lnk_identity2ext_id (external_id, fk_origin, comment, id_identity) values (
                    %(val)s,
                    %(pk_type)s,
                    %(comment)s,
                    %(pat)s
                )"""
            gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

        # or update comment of existing ID
        else:
            row = rows[0]
            if comment is not None:
                new_part = comment.strip()
                # comment not already there ?
                if new_part not in gmTools.coalesce(row['comment'], ''):
                    comment = '%s%s' % (gmTools.coalesce(row['comment'], '', '%s // '), new_part)
                    cmd = u"update dem.lnk_identity2ext_id set comment = %(comment)s where id=%(pk)s"
                    args = {'comment': comment, 'pk': row['pk_id']}
                    gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
    #--------------------------------------------------------
    def update_external_id(self, pk_id=None, type=None, value=None, issuer=None, comment=None):
        """Edits an existing external ID.

        Creates ID type if necessary.
        """
        cmd = u"""
            UPDATE dem.lnk_identity2ext_id SET
                fk_origin = (SELECT dem.add_external_id_type(%(type)s, %(issuer)s)),
                external_id = %(value)s,
                comment = gm.nullify_empty_string(%(comment)s)
            WHERE
                id = %(pk)s
        """
        args = {'pk': pk_id, 'value': value, 'type': type, 'issuer': issuer, 'comment': comment}
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
    #--------------------------------------------------------
    def get_external_ids(self, id_type=None, issuer=None):
        """Return the external ID rows of this identity, optionally filtered by type name and/or issuer."""
        where_parts = ['pk_identity = %(pat)s']
        args = {'pat': self.ID}

        if id_type is not None:
            where_parts.append(u'name = %(name)s')
            args['name'] = id_type.strip()

        if issuer is not None:
            where_parts.append(u'issuer = %(issuer)s')
            args['issuer'] = issuer.strip()

        cmd = u"SELECT * FROM dem.v_external_ids4identity WHERE %s" % ' AND '.join(where_parts)
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

        return rows

    external_ids = property(get_external_ids, lambda x:x)
    #--------------------------------------------------------
    def delete_external_id(self, pk_ext_id=None):
        """Delete the external ID with primary key <pk_ext_id> from this identity."""
        cmd = u"""
            delete from dem.lnk_identity2ext_id
            where id_identity = %(pat)s and id = %(pk)s"""
        args = {'pat': self.ID, 'pk': pk_ext_id}
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
    #--------------------------------------------------------
    def assimilate_identity(self, other_identity=None, link_obj=None):
        """Merge another identity into this one.

        Keep this one. Delete other one.

        Before touching the database the queries are written to a
        SQL script file (ending in ROLLBACK) for documentation.

        Returns a tuple (success, error message or None).
        """
        if other_identity.ID == self.ID:
            return True, None

        curr_pat = gmCurrentPatient()
        if curr_pat.connected:
            if other_identity.ID == curr_pat.ID:
                return False, _('Cannot merge active patient into another patient.')

        queries = []
        args = {'pat2del': other_identity.ID, 'pat2keep': self.ID}

        # merge allergy state
        queries.append ({
            'cmd': u"""
                UPDATE clin.allergy_state SET
                    has_allergy = greatest (
                        (SELECT has_allergy FROM clin.v_pat_allergy_state WHERE pk_patient = %(pat2del)s),
                        (SELECT has_allergy FROM clin.v_pat_allergy_state WHERE pk_patient = %(pat2keep)s)
                    ),
                    -- perhaps use least() to play it safe and make it appear longer ago than it might have been, actually ?
                    last_confirmed = greatest (
                        (SELECT last_confirmed FROM clin.v_pat_allergy_state WHERE pk_patient = %(pat2del)s),
                        (SELECT last_confirmed FROM clin.v_pat_allergy_state WHERE pk_patient = %(pat2keep)s)
                    )
                WHERE
                    pk = (SELECT pk_allergy_state FROM clin.v_pat_allergy_state WHERE pk_patient = %(pat2keep)s)
            """,
            'args': args
        })
        # delete old allergy state
        queries.append ({
            'cmd': u'delete from clin.allergy_state where pk = (select pk_allergy_state from clin.v_pat_allergy_state where pk_patient = %(pat2del)s)',
            'args': args
        })

        # merge patient proxy
        queries.append ({
            'cmd': u"""
                UPDATE clin.patient SET
                    edc = coalesce (
                        edc,
                        (SELECT edc FROM clin.patient WHERE fk_identity = %(pat2del)s)
                    )
                WHERE
                    fk_identity = %(pat2keep)s
            """,
            'args': args
        })

        # transfer names
        # 1) disambiguate names in old pat
        queries.append ({
            'cmd': u"""
                UPDATE dem.names d_n1 SET
                    lastnames = lastnames || ' (%s %s)'
                WHERE
                    d_n1.id_identity = %%(pat2del)s
                        AND
                    EXISTS (
                        SELECT 1 FROM dem.names d_n2
                        WHERE
                            d_n2.id_identity = %%(pat2keep)s
                                AND
                            d_n2.lastnames = d_n1.lastnames
                                AND
                            d_n2.firstnames = d_n1.firstnames
                    )""" % (_('assimilated'), gmDateTime.pydt_strftime(gmDateTime.pydt_now_here())),
            'args': args
        })
        # 2) move inactive ones (but beware of dupes)
        queries.append ({
            'cmd': u"""
                UPDATE dem.names SET
                    id_identity = %(pat2keep)s
                WHERE
                    id_identity = %(pat2del)s
                        AND
                    active IS false""",
            'args': args
        })
        # 3) copy active ones
        queries.append ({
            'cmd': u"""
                INSERT INTO dem.names (
                    id_identity, active, lastnames, firstnames, preferred, comment
                ) SELECT
                    %(pat2keep)s, false, lastnames, firstnames, preferred, comment
                FROM dem.names d_n
                WHERE
                    d_n.id_identity = %(pat2del)s
                        AND
                    d_n.active IS true""",
            'args': args
        })

        # disambiguate potential dupes
        # - same-url comm channels
        # (the outer table must be aliased: an unqualified "url" inside
        #  the subquery would resolve to the subquery's own table and
        #  make the comparison always true)
        queries.append ({
            'cmd': u"""
                UPDATE dem.lnk_identity2comm d_li2c_del
                SET url = url || ' (%s %s)'
                WHERE
                    d_li2c_del.fk_identity = %%(pat2del)s
                        AND
                    EXISTS (
                        SELECT 1 FROM dem.lnk_identity2comm d_li2c
                        WHERE d_li2c.fk_identity = %%(pat2keep)s AND d_li2c.url = d_li2c_del.url
                    )
            """ % (_('merged'), gmDateTime.pydt_strftime(gmDateTime.pydt_now_here())),
            'args': args
        })
        # - same-value external IDs (outer table aliased for the same reason)
        queries.append ({
            'cmd': u"""
                UPDATE dem.lnk_identity2ext_id d_li2e_del
                SET external_id = external_id || ' (%s %s)'
                WHERE
                    d_li2e_del.id_identity = %%(pat2del)s
                        AND
                    EXISTS (
                        SELECT 1 FROM dem.lnk_identity2ext_id d_li2e
                        WHERE
                            d_li2e.id_identity = %%(pat2keep)s
                                AND
                            d_li2e.external_id = d_li2e_del.external_id
                                AND
                            d_li2e.fk_origin = d_li2e_del.fk_origin
                    )
            """ % (_('merged'), gmDateTime.pydt_strftime(gmDateTime.pydt_now_here())),
            'args': args
        })
        # - same addresses
        queries.append ({
            'cmd': u"""
                DELETE FROM dem.lnk_person_org_address
                WHERE
                    id_identity = %(pat2del)s
                        AND
                    id_address IN (
                        SELECT id_address FROM dem.lnk_person_org_address d_lpoa
                        WHERE d_lpoa.id_identity = %(pat2keep)s
                    )
            """,
            'args': args
        })

        # find FKs pointing to dem.identity.pk
        FKs = gmPG2.get_foreign_keys2column (
            schema = u'dem',
            table = u'identity',
            column = u'pk'
        )
        # find FKs pointing to clin.patient.fk_identity
        FKs.extend (gmPG2.get_foreign_keys2column (
            schema = u'clin',
            table = u'patient',
            column = u'fk_identity'
        ))

        # generate UPDATEs
        cmd_template = u'UPDATE %s SET %s = %%(pat2keep)s WHERE %s = %%(pat2del)s'
        for FK in FKs:
            # names and the patient proxy have been dealt with explicitly above
            if FK['referencing_table'] in [u'dem.names', u'clin.patient']:
                continue
            queries.append ({
                'cmd': cmd_template % (FK['referencing_table'], FK['referencing_column'], FK['referencing_column']),
                'args': args
            })

        # delete old patient proxy
        queries.append ({
            'cmd': u'DELETE FROM clin.patient WHERE fk_identity = %(pat2del)s',
            'args': args
        })

        # remove old identity entry
        queries.append ({
            'cmd': u'delete from dem.identity where pk = %(pat2del)s',
            'args': args
        })

        script_name = gmTools.get_unique_filename(prefix = u'gm-assimilate-%(pat2del)s-into-%(pat2keep)s-' % args, suffix = u'.sql')
        _log.warning('identity [%s] is about to assimilate identity [%s], SQL script [%s]', self.ID, other_identity.ID, script_name)

        args['date'] = gmDateTime.pydt_strftime(gmDateTime.pydt_now_here(), '%Y %B %d %H:%M')
        # explicit encoding: translated strings and localized month names
        # may be non-ASCII; "with" closes the file even if writing fails
        with io.open(script_name, 'wt', encoding = 'utf8') as script:
            script.write(_MERGE_SCRIPT_HEADER % args)
            for query in queries:
                script.write((query['cmd'].lstrip()) % args)
                script.write(u';\n')
            script.write(u'\nROLLBACK;\n')
            script.write(u'--COMMIT;\n')

        gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, end_tx = True)

        self.add_external_id (
            type_name = u'merged GNUmed identity primary key',
            value = u'GNUmed::pk::%s' % other_identity.ID,
            issuer = u'GNUmed'
        )

        return True, None
    #--------------------------------------------------------
    #--------------------------------------------------------
    def put_on_waiting_list(self, urgency=0, comment=None, zone=None):
        """Append this identity to the end of the waiting list."""
        cmd = u"""
            insert into clin.waiting_list (fk_patient, urgency, comment, area, list_position)
            values (
                %(pat)s,
                %(urg)s,
                %(cmt)s,
                %(area)s,
                (select coalesce((max(list_position) + 1), 1) from clin.waiting_list)
            )"""
        args = {'pat': self.ID, 'urg': urgency, 'cmt': comment, 'area': zone}
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], verbose = True)
    #--------------------------------------------------------
    def get_waiting_list_entry(self):
        """Return the waiting list rows for this identity."""
        cmd = u"""SELECT * FROM clin.v_waiting_list WHERE pk_identity = %(pat)s"""
        args = {'pat': self.ID}
        rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        return rows

    waiting_list_entries = property(get_waiting_list_entry, lambda x:x)
    #--------------------------------------------------------
    def _get_export_area(self):
        return gmExportArea.cExportArea(self.ID)

    export_area = property(_get_export_area, lambda x:x)
    #--------------------------------------------------------
    def export_as_gdt(self, filename=None, encoding='iso-8859-15', external_id_type=None):
        """Write the demographics of this identity into a GDT file.

        filename: target file, a unique name is generated if None
        encoding: encoding of the file, also recorded inside it
        external_id_type: if given, the first external ID of this
            type (if any) rather than the primary key is written
            as patient ID (field 3000)

        Returns the name of the file written.
        """
        def _gdt_line(field_id, content):
            # a GDT line is <3 digits length><4 digits field ID><content><CR><LF>,
            # the length covers the entire line, hence 3 + 4 + 2 = 9 plus content
            content = u'%s' % content
            return u'%03d%s%s\r\n' % (9 + len(content), field_id, content)

        if filename is None:
            filename = gmTools.get_unique_filename (
                prefix = u'gm-patient2gdt-',
                suffix = u'.gdt'
            )

        if external_id_type is None:
            ext_ids = []
        else:
            ext_ids = self.get_external_ids(id_type = external_id_type)

        dob = self._payload[self._idx['dob']].strftime('%d%m%Y')

        with codecs.open (
            filename = filename,
            mode = 'wb',
            encoding = encoding,
            errors = 'strict'
        ) as gdt_file:
            gdt_file.write(_gdt_line(u'8000', u'6301'))
            gdt_file.write(_gdt_line(u'9218', u'2.10'))
            if external_id_type is None:
                gdt_file.write(_gdt_line(u'3000', self.ID))
            else:
                if len(ext_ids) > 0:
                    gdt_file.write(_gdt_line(u'3000', ext_ids[0]['value']))
            gdt_file.write(_gdt_line(u'3101', self._payload[self._idx['lastnames']]))
            gdt_file.write(_gdt_line(u'3102', self._payload[self._idx['firstnames']]))
            gdt_file.write(_gdt_line(u'3103', dob))
            gdt_file.write(_gdt_line(u'3110', gmXdtMappings.map_gender_gm2xdt[self._payload[self._idx['gender']]]))
            gdt_file.write(_gdt_line(u'6330', 'GNUmed::9206::encoding'))
            gdt_file.write(_gdt_line(u'6331', encoding))
            if external_id_type is None:
                gdt_file.write(_gdt_line(u'6332', u'GNUmed::3000::source'))
                gdt_file.write(_gdt_line(u'6333', u'internal'))
            else:
                if len(ext_ids) > 0:
                    gdt_file.write(_gdt_line(u'6332', u'GNUmed::3000::source'))
                    gdt_file.write(_gdt_line(u'6333', external_id_type))

        return filename
    #--------------------------------------------------------
    def export_as_xml_linuxmednews(self, filename=None):
        """Write the demographics of this identity into an XML file.

        Name, DOB, gender, first home address and home/work/mobile
        phone are exported. Phone numbers flagged as confidential
        are left empty.

        Returns the name of the file written.
        """
        if filename is None:
            filename = gmTools.get_unique_filename (
                prefix = u'gm-LinuxMedNews_demographics-',
                suffix = u'.xml'
            )

        dob_format = '%Y-%m-%d'
        pat = etree.Element(u'patient')

        first = etree.SubElement(pat, u'firstname')
        first.text = gmTools.coalesce(self._payload[self._idx['firstnames']], u'')

        last = etree.SubElement(pat, u'lastname')
        last.text = gmTools.coalesce(self._payload[self._idx['lastnames']], u'')

        middle = etree.SubElement(pat, u'middlename')
        middle.set(u'comment', _('preferred name/call name/...'))
        middle.text = gmTools.coalesce(self._payload[self._idx['preferred']], u'')

        pref = etree.SubElement(pat, u'name_prefix')
        pref.text = gmTools.coalesce(self._payload[self._idx['title']], u'')

        suff = etree.SubElement(pat, u'name_suffix')
        suff.text = u''

        dob = etree.SubElement(pat, u'DOB')
        dob.set(u'format', dob_format)
        dob.text = gmDateTime.pydt_strftime(self._payload[self._idx['dob']], dob_format, encoding = 'utf8', accuracy = gmDateTime.acc_days, none_str = u'')

        gender = etree.SubElement(pat, u'gender')
        gender.set(u'comment', self.gender_string)
        if self._payload[self._idx['gender']] is None:
            gender.text = u''
        else:
            gender.text = map_gender2mf[self._payload[self._idx['gender']]]

        home = etree.SubElement(pat, u'home_address')
        adrs = self.get_addresses(address_type = u'home')
        if len(adrs) > 0:
            adr = adrs[0]
            city = etree.SubElement(home, u'city')
            city.set(u'comment', gmTools.coalesce(adr['suburb'], u''))
            city.text = gmTools.coalesce(adr['urb'], u'')

            state = etree.SubElement(home, u'state')
            state.set(u'comment', gmTools.coalesce(adr['l10n_state'], u''))
            state.text = gmTools.coalesce(adr['code_state'], u'')

            zipcode = etree.SubElement(home, u'postal_code')
            zipcode.text = gmTools.coalesce(adr['postcode'], u'')

            street = etree.SubElement(home, u'street')
            street.set(u'comment', gmTools.coalesce(adr['notes_street'], u''))
            street.text = gmTools.coalesce(adr['street'], u'')

            no = etree.SubElement(home, u'number')
            no.set(u'subunit', gmTools.coalesce(adr['subunit'], u''))
            no.set(u'comment', gmTools.coalesce(adr['notes_subunit'], u''))
            no.text = gmTools.coalesce(adr['number'], u'')

            country = etree.SubElement(home, u'country')
            country.set(u'comment', adr['l10n_country'])
            country.text = gmTools.coalesce(adr['code_country'], u'')

        # one element per phone type, filled from the first matching
        # comm channel unless that channel is confidential
        phone_types = [
            (u'home_phone', u'homephone'),
            (u'work_phone', u'workphone'),
            (u'cell_phone', u'mobile')
        ]
        for tag, medium in phone_types:
            phone = etree.SubElement(pat, tag)
            rec = self.get_comm_channels(comm_medium = medium)
            if len(rec) > 0:
                if not rec[0]['is_confidential']:
                    phone.set(u'comment', gmTools.coalesce(rec[0]['comment'], u''))
                    phone.text = rec[0]['url']

        tree = etree.ElementTree(pat)
        tree.write(filename, encoding = u'UTF-8')

        return filename
    #--------------------------------------------------------
    def export_as_vcard(self, filename=None):
        # http://vobject.skyhouseconsulting.com/usage.html
        # http://en.wikipedia.org/wiki/VCard
        # http://svn.osafoundation.org/vobject/trunk/vobject/vcard.py
        # http://www.ietf.org/rfc/rfc2426.txt

        dob_format = '%Y%m%d'

        import vobject

        vc = vobject.vCard()
        vc.add(u'kind')
        vc.kind.value = u'individual'

        vc.add(u'fn')
        vc.fn.value = self.get_description()
        vc.add(u'n')
        vc.n.value = vobject.vcard.Name(family = self._payload[self._idx['lastnames']], given = self._payload[self._idx['firstnames']])
        vc.add(u'nickname')
        vc.nickname.value = gmTools.coalesce(self._payload[self._idx['preferred']], u'')
        vc.add(u'title')
vc.title.value = gmTools.coalesce(self._payload[self._idx['title']], u'') vc.add(u'gender') # FIXME: dont know how to add gender_string after ';' vc.gender.value = map_gender2vcard[self._payload[self._idx['gender']]]#, self.gender_string vc.add(u'bday') vc.bday.value = gmDateTime.pydt_strftime(self._payload[self._idx['dob']], dob_format, encoding = 'utf8', accuracy = gmDateTime.acc_days, none_str = u'') channels = self.get_comm_channels(comm_medium = u'homephone') if len(channels) > 0: if not channels[0]['is_confidential']: vc.add(u'tel') vc.tel.value = channels[0]['url'] vc.tel.type_param = u'HOME' channels = self.get_comm_channels(comm_medium = u'workphone') if len(channels) > 0: if not channels[0]['is_confidential']: vc.add(u'tel') vc.tel.value = channels[0]['url'] vc.tel.type_param = u'WORK' channels = self.get_comm_channels(comm_medium = u'mobile') if len(channels) > 0: if not channels[0]['is_confidential']: vc.add(u'tel') vc.tel.value = channels[0]['url'] vc.tel.type_param = u'CELL' channels = self.get_comm_channels(comm_medium = u'fax') if len(channels) > 0: if not channels[0]['is_confidential']: vc.add(u'tel') vc.tel.value = channels[0]['url'] vc.tel.type_param = u'FAX' channels = self.get_comm_channels(comm_medium = u'email') if len(channels) > 0: if not channels[0]['is_confidential']: vc.add(u'email') vc.tel.value = channels[0]['url'] vc.tel.type_param = u'INTERNET' channels = self.get_comm_channels(comm_medium = u'web') if len(channels) > 0: if not channels[0]['is_confidential']: vc.add(u'url') vc.tel.value = channels[0]['url'] vc.tel.type_param = u'INTERNET' adrs = self.get_addresses(address_type = u'home') if len(adrs) > 0: home_adr = adrs[0] vc.add(u'adr') vc.adr.type_param = u'HOME' vc.adr.value = vobject.vcard.Address() vc_adr = vc.adr.value vc_adr.extended = gmTools.coalesce(home_adr['subunit'], u'') vc_adr.street = gmTools.coalesce(home_adr['street'], u'', u'%s ') + gmTools.coalesce(home_adr['number'], u'') vc_adr.region = 
gmTools.coalesce(home_adr['l10n_state'], u'') vc_adr.code = gmTools.coalesce(home_adr['postcode'], u'') vc_adr.city = gmTools.coalesce(home_adr['urb'], u'') vc_adr.country = gmTools.coalesce(home_adr['l10n_country'], u'') #photo (base64) if filename is None: filename = gmTools.get_unique_filename ( prefix = u'gm-patient2vcard-', suffix = u'.vcf' ) vcf = codecs.open(filename, mode = 'wb', encoding = 'utf8') vcf.write(vc.serialize().decode('utf-8')) vcf.close() return filename #-------------------------------------------------------- # occupations API #-------------------------------------------------------- def get_occupations(self): return gmDemographicRecord.get_occupations(pk_identity = self.pk_obj) #-------------------------------------------------------- def link_occupation(self, occupation=None, activities=None): """Link an occupation with a patient, creating the occupation if it does not exists. @param occupation The name of the occupation to link the patient to. """ if (activities is None) and (occupation is None): return True occupation = occupation.strip() if len(occupation) == 0: return True if activities is not None: activities = activities.strip() args = {'act': activities, 'pat_id': self.pk_obj, 'job': occupation} cmd = u"select activities from dem.v_person_jobs where pk_identity = %(pat_id)s and l10n_occupation = _(%(job)s)" rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) queries = [] if len(rows) == 0: queries.append ({ 'cmd': u"INSERT INTO dem.lnk_job2person (fk_identity, fk_occupation, activities) VALUES (%(pat_id)s, dem.create_occupation(%(job)s), %(act)s)", 'args': args }) else: if rows[0]['activities'] != activities: queries.append ({ 'cmd': u"update dem.lnk_job2person set activities=%(act)s where fk_identity=%(pat_id)s and fk_occupation=(select id from dem.occupation where _(name) = _(%(job)s))", 'args': args }) rows, idx = gmPG2.run_rw_queries(queries = queries) return True 
#-------------------------------------------------------- def unlink_occupation(self, occupation=None): if occupation is None: return True occupation = occupation.strip() cmd = u"delete from dem.lnk_job2person where fk_identity=%(pk)s and fk_occupation in (select id from dem.occupation where _(name) = _(%(job)s))" rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': {'pk': self.pk_obj, 'job': occupation}}]) return True #-------------------------------------------------------- # comms API #-------------------------------------------------------- def get_comm_channels(self, comm_medium=None): cmd = u"select * from dem.v_person_comms where pk_identity = %s" rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': [self.pk_obj]}], get_col_idx = True) filtered = rows if comm_medium is not None: filtered = [] for row in rows: if row['comm_type'] == comm_medium: filtered.append(row) return [ gmDemographicRecord.cCommChannel(row = { 'pk_field': 'pk_lnk_identity2comm', 'data': r, 'idx': idx }) for r in filtered ] comm_channels = property(get_comm_channels, lambda x:x) #-------------------------------------------------------- def link_comm_channel(self, comm_medium=None, url=None, is_confidential=False, pk_channel_type=None): """Link a communication medium with a patient. @param comm_medium The name of the communication medium. @param url The communication resource locator. @type url A types.StringType instance. @param is_confidential Wether the data must be treated as confidential. @type is_confidential A types.BooleanType instance. 
""" comm_channel = gmDemographicRecord.create_comm_channel ( comm_medium = comm_medium, url = url, is_confidential = is_confidential, pk_channel_type = pk_channel_type, pk_identity = self.pk_obj ) return comm_channel #-------------------------------------------------------- def unlink_comm_channel(self, comm_channel=None): gmDemographicRecord.delete_comm_channel ( pk = comm_channel['pk_lnk_identity2comm'], pk_patient = self.pk_obj ) #-------------------------------------------------------- # contacts API #-------------------------------------------------------- def get_addresses(self, address_type=None): cmd = u"SELECT * FROM dem.v_pat_addresses WHERE pk_identity = %(pat)s" args = {'pat': self.pk_obj} if address_type is not None: cmd = cmd + u" AND address_type = %(typ)s" args['typ'] = address_type rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True) return [ gmDemographicRecord.cPatientAddress(row = {'idx': idx, 'data': r, 'pk_field': 'pk_address'}) for r in rows ] #-------------------------------------------------------- def link_address(self, number=None, street=None, postcode=None, urb=None, region_code=None, country_code=None, subunit=None, suburb=None, id_type=None, address=None, adr_type=None): """Link an address with a patient, creating the address if it does not exists. @param id_type The primary key of the address type. """ if address is None: address = gmDemographicRecord.create_address ( country_code = country_code, region_code = region_code, urb = urb, suburb = suburb, postcode = postcode, street = street, number = number, subunit = subunit ) if address is None: return None # already linked ? 
cmd = u"SELECT id_address FROM dem.lnk_person_org_address WHERE id_identity = %(pat)s AND id_address = %(adr)s" args = {'pat': self.pk_obj, 'adr': address['pk_address']} rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) # no, link to person if len(rows) == 0: args = {'id': self.pk_obj, 'adr': address['pk_address'], 'type': id_type} cmd = u""" INSERT INTO dem.lnk_person_org_address(id_identity, id_address) VALUES (%(id)s, %(adr)s) RETURNING id_address""" rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], return_data = True) linked_adr = gmDemographicRecord.cPatientAddress(aPK_obj = rows[0]['id_address']) # possibly change type if id_type is None: if adr_type is not None: id_type = gmDemographicRecord.create_address_type(address_type = adr_type) if id_type is not None: linked_adr['pk_address_type'] = id_type linked_adr.save() return linked_adr #---------------------------------------------------------------------- def unlink_address(self, address=None, pk_address=None): """Remove an address from the patient. The address itself stays in the database. The address can be either cAdress or cPatientAdress. 
""" if pk_address is None: args = {'person': self.pk_obj, 'adr': address['pk_address']} else: args = {'person': self.pk_obj, 'adr': pk_address} cmd = u""" DELETE FROM dem.lnk_person_org_address WHERE dem.lnk_person_org_address.id_identity = %(person)s AND dem.lnk_person_org_address.id_address = %(adr)s AND NOT EXISTS(SELECT 1 FROM bill.bill WHERE fk_receiver_address = dem.lnk_person_org_address.id) """ gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}]) #---------------------------------------------------------------------- # bills API #---------------------------------------------------------------------- def get_bills(self, order_by=None, pk_patient=None): return gmBilling.get_bills ( order_by = order_by, pk_patient = self.pk_obj ) bills = property(get_bills, lambda x:x) #---------------------------------------------------------------------- # relatives API #---------------------------------------------------------------------- def get_relatives(self): cmd = u""" select t.description, vbp.pk_identity as id, title, firstnames, lastnames, dob, cob, gender, pupic, pk_marital_status, marital_status, xmin_identity, preferred from dem.v_basic_person vbp, dem.relation_types t, dem.lnk_person2relative l where ( l.id_identity = %(pk)s and vbp.pk_identity = l.id_relative and t.id = l.id_relation_type ) or ( l.id_relative = %(pk)s and vbp.pk_identity = l.id_identity and t.inverse = l.id_relation_type )""" rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': {'pk': self.pk_obj}}]) if len(rows) == 0: return [] return [(row[0], cIdentity(row = {'data': row[1:], 'idx':idx, 'pk_field': 'pk'})) for row in rows] #-------------------------------------------------------- def link_new_relative(self, rel_type = 'parent'): # create new relative id_new_relative = create_dummy_identity() relative = cIdentity(aPK_obj=id_new_relative) # pre-fill with data from ourselves # relative.copy_addresses(self) relative.add_name( '**?**', self.get_names()['lastnames']) # and 
link the two if self._ext_cache.has_key('relatives'): del self._ext_cache['relatives'] cmd = u""" insert into dem.lnk_person2relative ( id_identity, id_relative, id_relation_type ) values ( %s, %s, (select id from dem.relation_types where description = %s) )""" rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': [self.ID, id_new_relative, rel_type ]}]) return True #---------------------------------------------------------------------- def delete_relative(self, relation): # unlink only, don't delete relative itself self.set_relative(None, relation) #-------------------------------------------------------- def _get_emergency_contact_from_database(self): if self._payload[self._idx['pk_emergency_contact']] is None: return None return cIdentity(aPK_obj = self._payload[self._idx['pk_emergency_contact']]) emergency_contact_in_database = property(_get_emergency_contact_from_database, lambda x:x) #---------------------------------------------------------------------- # age/dob related #---------------------------------------------------------------------- def get_formatted_dob(self, format='%Y %b %d', encoding=None, none_string=None): return gmDateTime.format_dob ( self._payload[self._idx['dob']], format = format, encoding = encoding, none_string = none_string, dob_is_estimated = self._payload[self._idx['dob_is_estimated']] ) #---------------------------------------------------------------------- def get_medical_age(self): dob = self['dob'] if dob is None: return u'??' 
if dob > gmDateTime.pydt_now_here(): return _('invalid age: DOB in the future') death = self['deceased'] if death is None: return u'%s%s' % ( gmTools.bool2subst ( self._payload[self._idx['dob_is_estimated']], gmTools.u_almost_equal_to, u'' ), gmDateTime.format_apparent_age_medically ( age = gmDateTime.calculate_apparent_age(start = dob) ) ) if dob > death: return _('invalid age: DOB after death') return u'%s%s%s' % ( gmTools.u_latin_cross, gmTools.bool2subst ( self._payload[self._idx['dob_is_estimated']], gmTools.u_almost_equal_to, u'' ), gmDateTime.format_apparent_age_medically ( age = gmDateTime.calculate_apparent_age ( start = dob, end = self['deceased'] ) ) ) #---------------------------------------------------------------------- def dob_in_range(self, min_distance=u'1 week', max_distance=u'1 week'): if self['dob'] is None: return False cmd = u'select dem.dob_is_in_range(%(dob)s, %(min)s, %(max)s)' rows, idx = gmPG2.run_ro_queries ( queries = [{ 'cmd': cmd, 'args': {'dob': self['dob'], 'min': min_distance, 'max': max_distance} }] ) return rows[0][0] #---------------------------------------------------------------------- def current_birthday_passed(self): if self['dob'] is None: return None now = gmDateTime.pydt_now_here() if now.month < self['dob'].month: return False if now.month > self['dob'].month: return True # DOB is this month if now.day < self['dob'].day: return False if now.day > self['dob'].day: return True return None #---------------------------------------------------------------------- def _get_birthday_this_year(self): if self['dob'] is None: return None now = gmDateTime.pydt_now_here() return gmDateTime.pydt_replace ( dt = self['dob'], year = now.year, strict = False ) birthday_this_year = property(_get_birthday_this_year, lambda x:x) #---------------------------------------------------------------------- # practice related #---------------------------------------------------------------------- def get_last_encounter(self): cmd = u'select * from 
clin.v_most_recent_encounters where pk_patient=%s' rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': [self._payload[self._idx['pk_identity']]]}]) if len(rows) > 0: return rows[0] else: return None #-------------------------------------------------------- def get_messages(self, order_by=None): return gmProviderInbox.get_inbox_messages(pk_patient = self._payload[self._idx['pk_identity']], order_by = order_by) messages = property(get_messages, lambda x:x) #-------------------------------------------------------- def _get_overdue_messages(self): return gmProviderInbox.get_overdue_messages(pk_patient = self._payload[self._idx['pk_identity']]) overdue_messages = property(_get_overdue_messages, lambda x:x) #-------------------------------------------------------- def delete_message(self, pk=None): return gmProviderInbox.delete_inbox_message(inbox_message = pk) #-------------------------------------------------------- def _get_dynamic_hints(self, include_suppressed_needing_invalidation=False): return gmProviderInbox.get_hints_for_patient ( pk_identity = self._payload[self._idx['pk_identity']], include_suppressed_needing_invalidation = include_suppressed_needing_invalidation ) dynamic_hints = property(_get_dynamic_hints, lambda x:x) #-------------------------------------------------------- def _get_primary_provider(self): if self._payload[self._idx['pk_primary_provider']] is None: return None from Gnumed.business import gmStaff return gmStaff.cStaff(aPK_obj = self._payload[self._idx['pk_primary_provider']]) primary_provider = property(_get_primary_provider, lambda x:x) #---------------------------------------------------------------------- # convenience #---------------------------------------------------------------------- def get_dirname(self): """Format patient demographics into patient specific path name fragment.""" return (u'%s-%s%s-%s' % ( self._payload[self._idx['lastnames']].replace(u' ', u'_'), self._payload[self._idx['firstnames']].replace(u' ', 
u'_'), gmTools.coalesce(self._payload[self._idx['preferred']], u'', template_initial = u'-(%s)').replace(u' ', u'_'), self.get_formatted_dob(format = '%Y-%m-%d', encoding = gmI18N.get_encoding()) )).replace ( u"'", u"" ).replace ( u'"', u'' ).replace ( u'/', u'_' ).replace ( u'\\', u'_' ).replace ( u'~', u'' ).replace ( u'|', u'_' ).replace ( u'*', u'' ).replace ( u'\u2248', u'' # "approximately", having been added by dob_is_estimated ) dirname = property(get_dirname, lambda x:x) #============================================================ def identity_is_patient(pk_identity): cmd = u'SELECT 1 FROM clin.patient WHERE fk_identity = %(pk_pat)s' args = {'pk_pat': pk_identity} rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = False) if len(rows) == 0: return False return True #------------------------------------------------------------ def turn_identity_into_patient(pk_identity): cmd = u""" INSERT INTO clin.patient (fk_identity) SELECT %(pk_ident)s WHERE NOT EXISTS ( SELECT 1 FROM clin.patient c_p WHERE fk_identity = %(pk_ident)s )""" args = {u'pk_ident': pk_identity} queries = [{'cmd': cmd, 'args': args}] gmPG2.run_rw_queries(queries = queries) return True #============================================================ # helper functions #------------------------------------------------------------ #_spin_on_emr_access = None # #def set_emr_access_spinner(func=None): # if not callable(func): # _log.error('[%] not callable, not setting _spin_on_emr_access', func) # return False # # _log.debug('setting _spin_on_emr_access to [%s]', func) # # global _spin_on_emr_access # _spin_on_emr_access = func # #------------------------------------------------------------ _pull_chart = tui_chart_puller def set_chart_puller(chart_puller): if not callable(chart_puller): raise TypeError('chart puller <%s> not callable' % chart_puller) global _pull_chart _pull_chart = chart_puller _log.debug('setting chart puller to <%s>', chart_puller) _yield = 
lambda x:x def set_yielder(yielder): if not callable(yielder): raise TypeError('yielder <%s> is not callable' % yielder) global _yield _yield = yielder _log.debug('setting yielder to <%s>', yielder) #============================================================ class cPatient(cIdentity): """Represents a person which is a patient. - a specializing subclass of cIdentity turning it into a patient - its use is to cache subobjects like EMR and document folder """ def __init__(self, aPK_obj=None, row=None): cIdentity.__init__(self, aPK_obj = aPK_obj, row = row) self.__emr_access_lock = threading.Lock() self.__emr = None self.__doc_folder = None #-------------------------------------------------------- def cleanup(self): """Do cleanups before dying. - note that this may be called in a thread """ if self.__emr is not None: self.__emr.cleanup() if self.__doc_folder is not None: self.__doc_folder.cleanup() cIdentity.cleanup(self) #---------------------------------------------------------------- def ensure_has_allergy_state(self, pk_encounter=None): from Gnumed.business.gmAllergy import ensure_has_allergy_state ensure_has_allergy_state(encounter = pk_encounter) return True #---------------------------------------------------------- #---------------------------------------------------------- # def get_emr(self, allow_user_interaction=True): # if not self.__emr_access_lock.acquire(False): # # maybe something slow is happening on the machine # _log.debug('failed to acquire EMR access lock, sleeping for 500ms') # time.sleep(0.5) # if not self.__emr_access_lock.acquire(False): # _log.error('still failed to acquire EMR access lock, aborting') # raise AttributeError('cannot lock access to EMR') # # if self.__emr is None: # self.__emr = gmClinicalRecord.cClinicalRecord(aPKey = self._payload[self._idx['pk_identity']], allow_user_interaction = allow_user_interaction) # self.__emr.gender = self['gender'] # self.__emr.dob = self['dob'] # # self.__emr_access_lock.release() # return 
self.__emr def get_emr(self, allow_user_interaction=True): _log.debug('accessing EMR for identity [%s] (thread %s)', self._payload[self._idx['pk_identity']], thread.get_ident()) if not self.__emr_access_lock.acquire(False): got_lock = False for idx in range(100): _yield() time.sleep(0.1) _yield() if self.__emr_access_lock.acquire(False): got_lock = True break if not got_lock: _log.error('still failed to acquire EMR access lock, aborting (thread %s)', thread.get_ident()) raise AttributeError('cannot lock access to EMR for identity [%s]', self._payload[self._idx['pk_identity']]) # # maybe something slow is happening on the machine # _log.debug('failed to acquire EMR access lock, sleeping for 500ms (thread %s)', thread.get_ident()) # time.sleep(0.5) # if not self.__emr_access_lock.acquire(False): # _log.error('still failed to acquire EMR access lock, aborting (thread %s)', thread.get_ident()) # raise AttributeError('cannot lock access to EMR') if self.__emr is None: _log.debug('pulling chart for identity [%s] (thread %s)', self._payload[self._idx['pk_identity']], thread.get_ident()) #emr = _pull_chart(self._payload[self._idx['pk_identity']]) emr = _pull_chart(self) if emr is None: # user aborted pulling chart return None self.__emr = emr _log.debug('returning EMR for identity [%s] (thread %s)', self._payload[self._idx['pk_identity']], thread.get_ident()) self.__emr_access_lock.release() return self.__emr emr = property(get_emr, lambda x:x) #---------------------------------------------------------- def get_document_folder(self): if self.__doc_folder is None: self.__doc_folder = cDocumentFolder(aPKey = self._payload[self._idx['pk_identity']]) return self.__doc_folder document_folder = property(get_document_folder, lambda x:x) #============================================================ class gmCurrentPatient(gmBorg.cBorg): """Patient Borg to hold currently active patient. There may be many instances of this but they all share state. 
The sequence of events when changing the active patient: 1) Registered callbacks are run. Those are run synchronously. If a callback returns False or throws an exception the patient switch is aborted. Callback code can rely on the patient still being active and to not go away until it returns. It is not passed any arguments and must return False or True. 2) Signal "pre_patient_unselection" is sent. This does not wait for nor check results. 3) the current patient is unset (gmNull.cNull) 4) Signal "current_patient_unset" is sent At this point resetting GUI fields to empty should be done. The active patient is not there anymore. This does not wait for nor check results. 5) The current patient is set to the new value. The new patient can also remain gmNull.cNull in case the calling code explicitely unset the current patient. 6) Signal "post_patient_selection" is sent. Code listening to this signal can assume that the new patient is already active. """ def __init__(self, patient=None, forced_reload=False): """Change or get currently active patient. 
patient: * None: get currently active patient * -1: unset currently active patient * cPatient instance: set active patient if possible """ # make sure we do have a patient pointer try: self.patient except AttributeError: self.patient = gmNull.cNull() self.__register_interests() # set initial lock state, # this lock protects against activating another patient # when we are controlled from a remote application self.__lock_depth = 0 # initialize callback state self.__callbacks_before_switching_away_from_patient = [] # user wants copy of current patient if patient is None: return None # do nothing if patient is locked if self.locked: _log.error('patient [%s] is locked, cannot change to [%s]' % (self.patient['pk_identity'], patient)) return None # user wants to explicitly unset current patient if patient == -1: _log.debug('explicitly unsetting current patient') if not self.__run_callbacks_before_switching_away_from_patient(): _log.error('not unsetting current patient, at least one pre-change callback failed') return None self.__send_pre_unselection_notification() self.patient.cleanup() self.patient = gmNull.cNull() self.__send_unselection_notification() # give it some time time.sleep(0.5) self.__send_selection_notification() return None # must be cPatient instance, then if not isinstance(patient, cPatient): _log.error('cannot set active patient to [%s], must be either None, -1 or cPatient instance' % str(patient)) raise TypeError, 'gmPerson.gmCurrentPatient.__init__(): must be None, -1 or cPatient instance but is: %s' % str(patient) # same ID, no change needed if (self.patient['pk_identity'] == patient['pk_identity']) and not forced_reload: return None # user wants different patient _log.info('patient change [%s] -> [%s] requested', self.patient['pk_identity'], patient['pk_identity']) if not self.__run_callbacks_before_switching_away_from_patient(): _log.error('not changing current patient, at least one pre-change callback failed') return None # everything seems swell 
self.__send_pre_unselection_notification() self.patient.cleanup() self.patient = gmNull.cNull() self.__send_unselection_notification() # give it some time time.sleep(0.5) self.patient = patient # for good measure ... # however, actually we want to get rid of that self.patient.get_emr() self.__send_selection_notification() return None #-------------------------------------------------------- def __register_interests(self): gmDispatcher.connect(signal = u'gm_table_mod', receiver = self._on_database_signal) #-------------------------------------------------------- def _on_database_signal(self, **kwds): # we don't have a patient: don't process signals if isinstance(self.patient, gmNull.cNull): return True # we only care about identity and name changes if kwds['table'] not in [u'dem.identity', u'dem.names']: return True # signal is not about our patient: ignore signal if int(kwds['pk_identity']) != self.patient.ID: return True if kwds['table'] == u'dem.identity': # we don't care about newly INSERTed or DELETEd patients if kwds['operation'] != 'UPDATE': return True self.patient.refetch_payload() return True #-------------------------------------------------------- # external API #-------------------------------------------------------- def register_before_switching_from_patient_callback(self, callback=None): # callbacks are run synchronously before # switching *away* from the current patient, # if a callback returns false the current # patient will not be switched away from, # callbacks will not be passed any arguments if not callable(callback): raise TypeError(u'callback [%s] not callable' % callback) self.__callbacks_before_switching_away_from_patient.append(callback) #-------------------------------------------------------- def _get_connected(self): return (not isinstance(self.patient, gmNull.cNull)) connected = property(_get_connected, lambda x:x) #-------------------------------------------------------- def _get_locked(self): return (self.__lock_depth > 0) def 
_set_locked(self, locked): if locked: self.__lock_depth = self.__lock_depth + 1 gmDispatcher.send(signal = 'patient_locked', sender = self.__class__.__name__) else: if self.__lock_depth == 0: _log.error('lock/unlock imbalance, tried to refcount lock depth below 0') return else: self.__lock_depth = self.__lock_depth - 1 gmDispatcher.send(signal = 'patient_unlocked', sender = self.__class__.__name__) locked = property(_get_locked, _set_locked) #-------------------------------------------------------- def force_unlock(self): _log.info('forced patient unlock at lock depth [%s]' % self.__lock_depth) self.__lock_depth = 0 gmDispatcher.send(signal = 'patient_unlocked', sender = self.__class__.__name__) #-------------------------------------------------------- # patient change handling #-------------------------------------------------------- def __run_callbacks_before_switching_away_from_patient(self): if isinstance(self.patient, gmNull.cNull): return True for call_back in self.__callbacks_before_switching_away_from_patient: try: successful = call_back() except: _log.exception('callback [%s] failed', call_back) print "*** pre-change callback failed ***" print type(call_back) print call_back return False if not successful: _log.error('callback [%s] returned False', call_back) return False return True #-------------------------------------------------------- def __send_pre_unselection_notification(self): """Sends signal when current patient is about to be unset. This does NOT wait for signal handlers to complete. """ kwargs = { 'signal': u'pre_patient_unselection', 'sender': self.__class__.__name__, 'pk_identity': self.patient['pk_identity'] } gmDispatcher.send(**kwargs) #-------------------------------------------------------- def __send_unselection_notification(self): """Sends signal when the previously active patient has been unset during a change of active patient. This is the time to initialize GUI fields to empty values. 
This does NOT wait for signal handlers to complete. """ kwargs = { 'signal': u'current_patient_unset', 'sender': self.__class__.__name__ } gmDispatcher.send(**kwargs) #-------------------------------------------------------- def __send_selection_notification(self): """Sends signal when another patient has actually been made active.""" kwargs = { 'signal': u'post_patient_selection', 'sender': self.__class__.__name__, 'pk_identity': self.patient['pk_identity'] } gmDispatcher.send(**kwargs) #-------------------------------------------------------- # __getattr__ handling #-------------------------------------------------------- def __getattr__(self, attribute): # override __getattr__ here, not __getattribute__ because # the former is used _after_ ordinary attribute lookup # failed while the latter is applied _before_ ordinary # lookup (and is easy to drive into infinite recursion), # this is also why subsequent access to self.patient # simply returns the .patient member value :-) if attribute == 'patient': raise AttributeError if isinstance(self.patient, gmNull.cNull): _log.error("[%s]: cannot getattr(%s, '%s'), patient attribute not connected to a patient", self, self.patient, attribute) raise AttributeError("[%s]: cannot getattr(%s, '%s'), patient attribute not connected to a patient" % (self, self.patient, attribute)) return getattr(self.patient, attribute) #-------------------------------------------------------- # __get/setitem__ handling #-------------------------------------------------------- def __getitem__(self, attribute = None): """Return any attribute if known how to retrieve it by proxy. 
""" return self.patient[attribute] #-------------------------------------------------------- def __setitem__(self, attribute, value): self.patient[attribute] = value #============================================================ # match providers #============================================================ class cMatchProvider_Provider(gmMatchProvider.cMatchProvider_SQL2): def __init__(self): gmMatchProvider.cMatchProvider_SQL2.__init__( self, queries = [ u"""SELECT pk_staff AS data, short_alias || ' (' || coalesce(title, '') || ' ' || firstnames || ' ' || lastnames || ')' AS list_label, short_alias || ' (' || coalesce(title, '') || ' ' || firstnames || ' ' || lastnames || ')' AS field_label FROM dem.v_staff WHERE is_active AND ( short_alias %(fragment_condition)s OR firstnames %(fragment_condition)s OR lastnames %(fragment_condition)s OR db_user %(fragment_condition)s ) """ ] ) self.setThresholds(1, 2, 3) #============================================================ # convenience functions #============================================================ def create_name(pk_person, firstnames, lastnames, active=False): queries = [{ 'cmd': u"select dem.add_name(%s, %s, %s, %s)", 'args': [pk_person, firstnames, lastnames, active] }] rows, idx = gmPG2.run_rw_queries(queries=queries, return_data=True) name = cPersonName(aPK_obj = rows[0][0]) return name #============================================================ def create_identity(gender=None, dob=None, lastnames=None, firstnames=None): cmd1 = u"""INSERT INTO dem.identity (gender, dob) VALUES (%s, %s)""" cmd2 = u""" INSERT INTO dem.names ( id_identity, lastnames, firstnames ) VALUES ( currval('dem.identity_pk_seq'), coalesce(%s, 'xxxDEFAULTxxx'), coalesce(%s, 'xxxDEFAULTxxx') ) RETURNING id_identity""" # cmd2 = u"select dem.add_name(currval('dem.identity_pk_seq')::integer, coalesce(%s, 'xxxDEFAULTxxx'), coalesce(%s, 'xxxDEFAULTxxx'), True)" rows, idx = gmPG2.run_rw_queries ( queries = [ {'cmd': cmd1, 'args': [gender, 
dob]}, {'cmd': cmd2, 'args': [lastnames, firstnames]} #{'cmd': cmd2, 'args': [firstnames, lastnames]} ], return_data = True ) ident = cIdentity(aPK_obj = rows[0][0]) gmHooks.run_hook_script(hook = u'post_person_creation') return ident #============================================================ def create_dummy_identity(): cmd = u"INSERT INTO dem.identity(gender) VALUES (NULL::text) RETURNING pk" rows, idx = gmPG2.run_rw_queries ( queries = [{'cmd': cmd}], return_data = True ) return gmDemographicRecord.cIdentity(aPK_obj = rows[0][0]) #============================================================ def identity_exists(pk_identity): cmd = u'SELECT EXISTS(SELECT 1 FROM dem.identity where pk = %(pk)s)' args = {'pk': pk_identity} rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) return rows[0][0] #============================================================ def set_active_patient(patient=None, forced_reload=False): """Set active patient. If patient is -1 the active patient will be UNset. """ if isinstance(patient, gmCurrentPatient): return True if isinstance(patient, cPatient): pat = patient elif isinstance(patient, cIdentity): pat = pat.as_patient elif patient == -1: pat = patient else: # maybe integer ? success, pk = gmTools.input2int(initial = patient, minval = 1) if not success: raise ValueError(' must be either -1, >0, or a cPatient, cIdentity or gmCurrentPatient instance, is: %s' % patient) # but also valid patient ID ? 
try: pat = cPatient(aPK_obj = pk) except: _log.exception('identity [%s] not found' % patient) return False # attempt to switch try: gmCurrentPatient(patient = pat, forced_reload = forced_reload) except: _log.exception('error changing active patient to [%s]' % patient) return False return True #============================================================ # gender related #------------------------------------------------------------ def get_gender_list(): """Retrieves the list of known genders from the database.""" global __gender_idx global __gender_list if __gender_list is None: cmd = u"SELECT tag, l10n_tag, label, l10n_label, sort_weight FROM dem.v_gender_labels ORDER BY sort_weight DESC" __gender_list, __gender_idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx = True) _log.debug(u'genders in database: %s' % __gender_list) return (__gender_list, __gender_idx) #------------------------------------------------------------ map_gender2mf = { 'm': u'm', 'f': u'f', 'tf': u'f', 'tm': u'm', 'h': u'mf' } # https://tools.ietf.org/html/rfc6350#section-6.2.7 # M F O N U map_gender2vcard = { u'm': u'M', u'f': u'F', u'tf': u'F', u'tm': u'M', u'h': u'O', None: u'U' } #------------------------------------------------------------ # maps GNUmed related i18n-aware gender specifiers to a unicode symbol map_gender2symbol = { u'm': u'\u2642', u'f': u'\u2640', u'tf': u'\u26A5\u2640', # 'tf': u'\u2642\u2640-\u2640', u'tm': u'\u26A5\u2642', # 'tm': u'\u2642\u2640-\u2642', u'h': u'\u26A5', # 'h': u'\u2642\u2640', None: u'?\u26A5?' 
} #------------------------------------------------------------ def map_gender2string(gender=None): """Maps GNUmed related i18n-aware gender specifiers to a human-readable string.""" global __gender2string_map if __gender2string_map is None: genders, idx = get_gender_list() __gender2string_map = { u'm': _(u'male'), u'f': _(u'female'), u'tf': u'', u'tm': u'', u'h': u'', None: _(u'unknown gender') } for g in genders: __gender2string_map[g[idx['l10n_tag']]] = g[idx['l10n_label']] __gender2string_map[g[idx['tag']]] = g[idx['l10n_label']] _log.debug(u'gender -> string mapping: %s' % __gender2string_map) return __gender2string_map[gender] #------------------------------------------------------------ def map_gender2salutation(gender=None): """Maps GNUmed related i18n-aware gender specifiers to a human-readable salutation.""" global __gender2salutation_map if __gender2salutation_map is None: genders, idx = get_gender_list() __gender2salutation_map = { 'm': _('Mr'), 'f': _('Mrs'), 'tf': u'', 'tm': u'', 'h': u'', None: u'' } for g in genders: __gender2salutation_map[g[idx['l10n_tag']]] = __gender2salutation_map[g[idx['tag']]] __gender2salutation_map[g[idx['label']]] = __gender2salutation_map[g[idx['tag']]] __gender2salutation_map[g[idx['l10n_label']]] = __gender2salutation_map[g[idx['tag']]] _log.debug(u'gender -> salutation mapping: %s' % __gender2salutation_map) return __gender2salutation_map[gender] #------------------------------------------------------------ def map_firstnames2gender(firstnames=None): """Try getting the gender for the given first name.""" if firstnames is None: return None rows, idx = gmPG2.run_ro_queries(queries = [{ 'cmd': u"SELECT gender FROM dem.name_gender_map WHERE name ILIKE %(fn)s LIMIT 1", 'args': {'fn': firstnames} }]) if len(rows) == 0: return None return rows[0][0] #============================================================ def get_person_IDs(): cmd = u'SELECT pk FROM dem.identity' rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], 
get_col_idx = False) return [ r[0] for r in rows ] #============================================================ def get_persons_from_pks(pks=None): return [ cIdentity(aPK_obj = pk) for pk in pks ] #============================================================ def get_person_from_xdt(filename=None, encoding=None, dob_format=None): from Gnumed.business import gmXdtObjects return gmXdtObjects.read_person_from_xdt(filename=filename, encoding=encoding, dob_format=dob_format) #============================================================ def get_persons_from_pracsoft_file(filename=None, encoding='ascii'): from Gnumed.business import gmPracSoftAU return gmPracSoftAU.read_persons_from_pracsoft_file(filename=filename, encoding=encoding) #============================================================ # main/testing #============================================================ if __name__ == '__main__': if len(sys.argv) == 1: sys.exit() if sys.argv[1] != 'test': sys.exit() import datetime gmI18N.activate_locale() gmI18N.install_domain() gmDateTime.init() #-------------------------------------------------------- def test_set_active_pat(): ident = cIdentity(1) print "setting active patient with", ident set_active_patient(patient=ident) patient = cPatient(12) print "setting active patient with", patient set_active_patient(patient=patient) pat = gmCurrentPatient() print pat['dob'] #pat['dob'] = 'test' # staff = cStaff() # print "setting active patient with", staff # set_active_patient(patient=staff) print "setting active patient with -1" set_active_patient(patient=-1) #-------------------------------------------------------- def test_dto_person(): dto = cDTO_person() dto.firstnames = 'Sepp' dto.lastnames = 'Herberger' dto.gender = 'male' dto.dob = pyDT.datetime.now(tz=gmDateTime.gmCurrentLocalTimezone) print dto print dto['firstnames'] print dto['lastnames'] print dto['gender'] print dto['dob'] for key in dto.keys(): print key 
#-------------------------------------------------------- def test_identity(): # create patient print '\n\nCreating identity...' new_identity = create_identity(gender='m', dob='2005-01-01', lastnames='test lastnames', firstnames='test firstnames') print 'Identity created: %s' % new_identity print '\nSetting title and gender...' new_identity['title'] = 'test title'; new_identity['gender'] = 'f'; new_identity.save_payload() print 'Refetching identity from db: %s' % cIdentity(aPK_obj=new_identity['pk_identity']) print '\nGetting all names...' for a_name in new_identity.get_names(): print a_name print 'Active name: %s' % (new_identity.get_active_name()) print 'Setting nickname...' new_identity.set_nickname(nickname='test nickname') print 'Refetching all names...' for a_name in new_identity.get_names(): print a_name print 'Active name: %s' % (new_identity.get_active_name()) print '\nIdentity occupations: %s' % new_identity['occupations'] print 'Creating identity occupation...' new_identity.link_occupation('test occupation') print 'Identity occupations: %s' % new_identity['occupations'] print '\nIdentity addresses: %s' % new_identity.get_addresses() print 'Creating identity address...' # make sure the state exists in the backend new_identity.link_address ( number = 'test 1234', street = 'test street', postcode = 'test postcode', urb = 'test urb', region_code = u'SN', country_code = u'DE' ) print 'Identity addresses: %s' % new_identity.get_addresses() print '\nIdentity communications: %s' % new_identity.get_comm_channels() print 'Creating identity communication...' 
new_identity.link_comm_channel('homephone', '1234566') print 'Identity communications: %s' % new_identity.get_comm_channels() #-------------------------------------------------------- def test_name(): for pk in range(1,16): name = cPersonName(aPK_obj=pk) print name.description print ' ', name #-------------------------------------------------------- def test_gender_list(): genders, idx = get_gender_list() print "\n\nRetrieving gender enum (tag, label, weight):" for gender in genders: print "%s, %s, %s" % (gender[idx['tag']], gender[idx['l10n_label']], gender[idx['sort_weight']]) #-------------------------------------------------------- def test_export_area(): person = cIdentity(aPK_obj = 12) print person print person.export_area print person.export_area.items #-------------------------------------------------------- def test_ext_id(): person = cIdentity(aPK_obj = 9) print person.get_external_ids(id_type=u'Fachgebiet', issuer=u'Ă„rztekammer') #print person.get_external_ids() #-------------------------------------------------------- def test_vcf(): person = cIdentity(aPK_obj = 12) print person.export_as_vcard() #-------------------------------------------------------- def test_current_patient(): pat = gmCurrentPatient() print "pat.get_emr()", pat.get_emr() #-------------------------------------------------------- #test_dto_person() #test_identity() #test_set_active_pat() #test_search_by_dto() #test_name() #test_gender_list() #map_gender2salutation('m') # module functions #comms = get_comm_list() #print "\n\nRetrieving communication media enum (id, description): %s" % comms #test_export_area() #test_ext_id() #test_vcf() test_current_patient() #============================================================