The code below shows a sample integration of PainChek with a Resident Care System (RCS). The same code fetches clients records from the RCS (using a REST API) and reflects those changes in PainChek by:
-
Adding or updating PainChek resident(patient) records
-
Adding Site records (a site being a location where a resident lives)
-
Adding and updating admission records (that link a resident to a site and includes the specific bed/room/ward(wing) where the resident is located)
Note
-
The RCS acts as the master system for the client details and PainChek as the slave system
-
The sample code demonstrates the synchronisation of resident data only. It does not include any logic to send PainChek assessments to the RCS.
-
The code is written in the Python programming language
Note
This code is proof-of-concept code only and should be used as a guide only. You will need to add additional data integrity checks and functionality in order to build a production-ready system.
To assist with the understanding of the sample code, the following block shows a typical response from the RCS when client records are requested from it (using the RCS API):
[{ "Title": "Mr", "FirstName": "Graeme", "MiddleName": "Alfred", "Surname": "Smith", "PreferredName": "Graeme", "BirthDate": "1937-08-06T00:00:00", "AdmissionDate": "2010-09-05T00:00:00", "GenderDesc": "Male", "LocationName": "Kookaburra Lodge", "WingName": "Bethel - Eden", "RoomName": "004", "BedName": "", "PersonId": 1794, "LastUpdatedDate": "2019-03-03T23:51:56.787Z" }, { "Title": "Mrs", "FirstName": "Angela", "MiddleName": "", "Surname": "Jones", "PreferredName": "", "BirthDate": "1946-05-20T00:00:00", "AdmissionDate": "2014-06-06T00:00:00", "GenderDesc": "Female", "LocationName": "Wattle Hill", "WingName": "Bethel - Eden", "RoomName": "017", "BedName": "", "PersonId": 1798, "LastUpdatedDate": "2019-01-30T04:45:04.137Z" }]
The sample code uses a configuration file to record credentials and other configuration data:
-
rcs - Settings for the RCS
-
painchek - Settings for PainChek
-
whitelisted_sites - This is a list of the site names that are relevant to PainChek. Only residents at these sites should be synchronised.
{ "rcs": { "base_url": "http://0.0.0.0/api/", "key": "thesecretkey" }, "painchek": { "base_url": "https://ap.ua.painchek.com/api/", "client_id": "rcs", "client_secret": "somelongsecretkey", "username": "rcs@painchek.com", "password": "apassword" }, "whitelisted_sites": ["Wattle Hill", "Koala Cove"] }
The sample code uses a cache (saved to disk between runs) to cache three key data sets:
-
assessment_update_stamp - Records the update stamp of the last processed assessment (so that the script knows which assessments to synchronise on its next run)
-
avatars - Records the URL for the avatar of each RCS resident. This is to allow the sync script to detect changes to the existing avatar (so that the new avatar can be synced with PainChek).
-
resident_update_stamp - This records the update time of the most recently updated resident record that has been synchronised. We pass this value to the RCS to fetch the records that have been updated since the last synchronisation run.
A sample of the cached data is shown below:
{ "assessment_update_stamp": "12345", "avatars": { "1794": "person/headshot/1794.20190304105156787.png", "1798": "person/headshot/1798.20180214151617477.png", "1800": "person/headshot/1800.20180214150915163.png", "1801": "person/headshot/1801.20181113162553730.png", "1839": "person/headshot/1839.20181114111210267.png" }, "resident_update_stamp": "2019-04-12T05:03:30.567Z" }
"" "This sample code shows how a third party Resident Care System (RCS) can be integrated with PainChek to keepPainChek resident records up-to-date. The RCS is considered the master system and changes made to records in it are reflected into PainChek (c) 2019 PainChek Ltd" "" import loggingimport requestsimport jsonimport osimport time from pck_utilities import APIError, LogicError, get_mandatory_str, get_mandatory_value, get_optional_str, \Config, Cache, Counts, Loggerfrom pck_integration import PainChek # Cache fieldsASSESSMENT_UPDATE_STAMP = 'assessment_update_stamp' RESIDENT_UPDATE_STAMP = 'resident_update_stamp' AVATARS_URLS = 'avatars' class RCS: "" "Class to access the RCS API" "" resident_update_stamp_field = 'resident_update_stamp' assessment_update_stamp_field = 'assessment_update_stamp' def __init__(self, config, logger): "" "Initialise an instance of an RCS object:param config: A dictionary containing- base_url: base URL for accessing the RCS API- key: client id for API Access:param logger: Optional logger object for writing debug messages" "" self.base_url = get_mandatory_str(config, 'base_url') self.key = get_mandatory_str(config, 'key') self.logger = logger def full_url(self, relative_url): "" "Given a partial, relative URL, return a full URL for a call to the RCSInclude the API key as a parameter:param relative_url: the URL to invoke (without the base/root):return: full URL (or None in the case relative URL is not supplied)" "" if relative_url: url = f '{self.base_url}{relative_url}?x-api-key={self.key}' else :url = None return url def call_api(self, method, relative_url, time_since_last = None, payload = None): "" "Calls the RCS API:param method: the call type - post, get, etc:param relative_url: the URL to invoke (without the base/root):param payload: data when posting:param time_since_last: Only records update since this date/time are returned. If not supplied, all recordsare returned.Applies to 'gets' only.:return: the response text from the API call" "" url = self.full_url(relative_url) if time_since_last: if method != 'get': raise LogicError(f 'time_since_last only supported for a get: {method}') url += f '&changedSinceDateTimeUtc={time_since_last}' if self.logger: self.logger.debug(f 'URL: {url}, payload:{payload}') if (payload is None) and(method in ('post', 'patch', 'put')): raise LogicError(f 'json data required for method: {method}') if method == 'get': response = requests.get(url) elif method == 'post': response = requests.post(url, json = payload) else :raise LogicError(f "Unsupported method: {method}") # logger.debug(response.text) if response.status_code not in (200, 201): description = f "Failed to {method} to {url}" if self.logger: self.logger.critical(description) raise APIError(description, url, payload, response) return json.loads(response.text) @staticmethoddef rcs_id(rcs_resident): "" "Return the id of a resident:param rcs_resident: The resident record as returned by the RCS:return: The resident id" "" return get_mandatory_str(rcs_resident, 'PersonId') @staticmethoddef client_status(rcs_resident): "" "Return the Client Status Type Id of a resident:param rcs_resident: The resident record as returned by the RCS:return: The resident id" "" return get_mandatory_str(rcs_resident, 'ClientStatusTypeId') @staticmethoddef headshot_url(rcs_resident): "" "Return the (partial) URL for the residents headshot image:param rcs_resident: The resident record as returned by the RCS:return: The URL (or None)" "" return get_optional_str(rcs_resident, 'HeadShotUrl') def get_painchek_patient(self, rcs_resident): "" "Returns a patient record in a format suitable for processing by PainChek:param rcs_resident: The resident record as returned by the RCS:return: A PainChek formatted resident record" "" patient = {} patient['external_id'] = self.rcs_id(rcs_resident) patient['first_name'] = get_mandatory_str(rcs_resident, 'FirstName') patient['last_name'] = get_mandatory_str(rcs_resident, 'LastName') patient['nickname'] = get_optional_str(rcs_resident, 'PreferredName') rcs_gender = get_optional_str(rcs_resident, 'GenderDesc', 'n/a') if rcs_gender == 'Male': patient['gender'] = 'm' elif rcs_gender == 'Female': patient['gender'] = 'f' else :patient['gender'] = 'x' # Return the RCS dob in PainChek format - '1945-04-19T00:00:00'-- > '1945-04-19' # ToDo: Remove defaulting of DOB and handle situation properly...patient['birth_date'] = get_optional_str(rcs_resident, 'BirthDate', '1900-01-01') patient['birth_date'] = patient['birth_date'][0: 10] # See if the resident is archived.# The status of the resident: # Current = 1-- > Treat as active PainChek resident # Departed = 2-- > Treat as archived PainChek resident # Waiting = 3-- > Ignore this resident for the time beingif self.client_status(rcs_resident) == '2': archive_reason = 'Other' else :archive_reason = None patient['deleted_reason'] = archive_reason if self.logger: self.logger.debug(f 'Patient record: {patient}') return patient def get_painchek_admission(self, rcs_resident): "" "Returns an admission record in a format suitable for processing by PainChek.:param rcs_resident: The resident record as returned by the RCS:return: A PainChek formatted admission record" "" admission = {} admission['patient_external_id'] = self.rcs_id(rcs_resident) admission['site_external_id'] = self.get_site_id(get_mandatory_str(rcs_resident, 'LocationName')) admission['bed'] = get_optional_str(rcs_resident, 'BedName', 'unknown') admission['room'] = get_optional_str(rcs_resident, 'RoomName', 'unknown') admission['ward'] = get_optional_str(rcs_resident, 'WingName', 'unknown') if self.logger: self.logger.debug(f 'Admission record: {admission}') return admission @staticmethoddef get_site_id(site_name): "" "Given a site name, return a site id, which is the lowercase version of the name, withunderscores replacing spaces:param site_name: Name of site:return: If od site" "" return site_name.lower().replace(' ', '_') def get_painchek_site(self, site_name): "" "Returns a site record in a format suitable for processing by PainChek:param site_name: the name of the site as identified by the RCS:return:" "" # print(facility) site = {} site['external_id'] = self.get_site_id(site_name) site['type'] = 2 # 2 = Care Homesite['name'] = site_namesite['phone'] = 'unknown' site['email'] = 'unknown@unknown.com' site['address'] = 'unknown' site['postcode'] = 'unknown' site['city'] = 'unknown' site['postcode'] = 'unknown' site['country'] = 'AU' if self.logger: self.logger.debug(f 'site record: {site}') return site @staticmethoddef is_whitelisted(whitelist, resident): "" "Check the whitelist to see if the site of the supplied resident should be processedIf the whitelist is empty, all sites are assume to be whitelisted:param whitelist: A list of sites:param resident: The RCS resident record to check:return: boolean indicating if the site is white-listed or not" "" if not whitelist: return True elif len(whitelist) == 0: return True else :site_name = get_mandatory_str(resident, 'LocationName') return site_name in whitelist def process_rcs_residents_using_update_stamp(config, cache, logger): "" "Fetch a list of updated resident records from the RCS and reflect the changes into PainChek This function would be called regularly to keep the RCS and PainChek in sync The process relies on a cached last updated stamp to determine which are the new/updated records that requireprocessing If this function finds no last updated value in the cache, it assumes that it is doing an initial sync (aka amigration and it will soft-match the RCS and PainChek resident records)...:param config: A dictionary containing the settings required to access the RCS and PainChek:param cache: The caching object (to persist data between runs):param logger: The logging object for debugging:return:" "" # Track the sites already processedsites = [] # Record statisticscounts = Counts(logger) rcs_config = config.get('rcs', mandatory = True) painchek_config = config.get('painchek', mandatory = True) whitelist = config.get('whitelisted_sites') rcs = RCS(rcs_config, logger) painchek = PainChek(painchek_config, logger) # Get the update stamp for the residentcache_last_updated = cache.get(RESIDENT_UPDATE_STAMP) logger.warning(f 'Processing residents. Processing changes since "update stamp" {cache_last_updated}...') # Invoke the RCS APIrcs_residents = rcs.call_api('get', 'client', cache_last_updated) # Process each resident returned from the RCSfor rcs_resident in rcs_residents: counts.increment() # Get some basic details about the residentrcs_id = rcs.rcs_id(rcs_resident) first_name = get_mandatory_str(rcs_resident, 'FirstName') last_name = get_mandatory_str(rcs_resident, 'LastName') site_name = get_mandatory_str(rcs_resident, 'LocationName') status = rcs.client_status(rcs_resident) headshot_url = rcs.headshot_url(rcs_resident) logger.info(f 'Record: {counts.get()}, Id: {rcs_id}, ' + f 'FirstName: {first_name}, LastName: {last_name}, ' + f 'SiteName: {site_name}, status: {status}') # logger.debug(str(rcs_residents)) # Ensure the patient is located in a Whitelisted siteif not rcs.is_whitelisted(whitelist, rcs_resident): counts.increment('not whitelisted') logger.debug('Skipping non-whitelisted resident') continue # The status of the resident: # Current = 1-- > Treat as active PainChek resident # Departed = 2-- > Treat as archived PainChek resident # Waiting = 3-- > Ignore this resident for the time beingif status == '3': counts.increment('Waiting residents') continue elif status not in ['1', '2']: raise LogicError(f 'Unexpected status: {status}') # Step 1 - Process the Site # Check the site to see if we should process itif site_name not in sites: logger.info(f ' Processing site: {site_name}') # Create a site record in thePainChek formatpainchek_site = rcs.get_painchek_site(site_name) # print(painchek_site) # Update PainChekresponse = painchek.create_or_update_site(painchek_site) counts.process_integration_response('sites', response) sites.append(site_name) # Step 2 - Process the Residentpainchek_patient = rcs.get_painchek_patient(rcs_resident) # See if the RCS avatar has been updatedstale = cache.is_stale(AVATARS_URLS, rcs_id, headshot_url) if stale: painchek_patient['image_from_url'] = rcs.full_url(headshot_url) logger.debug(f 'Avatar updated: {headshot_url}') cache.set(AVATARS_URLS, rcs_id, headshot_url) else :logger.debug(f 'Avatar unchanged: {headshot_url}') response = painchek.create_or_update_patient(painchek_patient) counts.process_integration_response('residents', response) # Save the cache, but only once the resident is updated in PainChekif stale: cache.persist() # Step 3 - Process the Admissionpainchek_admission = rcs.get_painchek_admission(rcs_resident) response = painchek.create_or_update_admission(painchek_admission) counts.process_integration_response('admissions', response) # Fetch the Date / Time record was last touched and use it to update cache_last_updatedlast_updated = get_mandatory_value(rcs_resident, 'LastUpdatedDate') if (cache_last_updated is None) or(last_updated > cache_last_updated): logger.debug(f 'Updating update stamp from {cache_last_updated} to {last_updated}') cache_last_updated = last_updated # Update the cache with the new update stampcache.set(RESIDENT_UPDATE_STAMP, value = cache_last_updated) # Display a summary of what we didlogger.warning(counts.display()) def process_painchek_assessments_using_timestamp(config, cache, logger): "" "Fetch a list of new assessments records from PainChek and reflect the changes into the RCS.This function would be called regularly to keep the RCS and PainChek in sync.:param config: A dictionary containing the settings required to access the RCS and PainChek:param cache: The caching object (to persist data between runs):param logger: The logging object for debugging:return: None" "" counts = Counts(logger) rcs_config = config.get('rcs', mandatory = True) painchek_config = config.get('painchek', mandatory = True) rcs = RCS(rcs_config, logger) painchek = PainChek(painchek_config, logger) cache_update_stamp = cache.get_integer(ASSESSMENT_UPDATE_STAMP, default = '0') logger.warning(f 'Processing assessments. Processing changes since "update stamp" {cache_update_stamp}...') assessments = painchek.get_list_from_api(f 'assessments/?sync={cache_update_stamp}') for assessment in assessments: counts.increment() uuid = get_mandatory_str(assessment, 'uuid') logger.info(f 'Record: {counts.get()}, assessment: {uuid}') # Fetch the update stamp value for the assessment and use it to update cached_update_stampupdate_stamp = get_mandatory_value(assessment, 'update_stamp') if update_stamp > cache_update_stamp: logger.debug(f 'Updating update stamp from {cache_update_stamp} to {update_stamp}') cache_update_stamp = update_stamp # print(json.dumps(assessment)) note = painchek.get_assessment_note(assessment) # ToDo: Call the RCS # print(f 'Notes: {note}') # Update the cache with the new update stampcache.set(ASSESSMENT_UPDATE_STAMP, value = cache_update_stamp, persist = True) # Display a summary of what we didlogger.warning(counts.display()) def main(): start_time = time.time() # Folder for logging and for the cachebase_path = os.path.dirname(os.path.abspath(__file__)) # Start the logger for debugging / tracinglogger = Logger(base_path, 'rcs') # Disable "info" enabled by default for urllib3, used by the requests librarylogging.getLogger("requests.packages.urllib3.connectionpool").setLevel(logging.WARNING) # Get the configuration for the runconfig = Config(base_path, logger = logger) # Get the cache for the runcache = Cache(base_path, logger = logger) # Folder for logging and for the cacheprocess_rcs_residents_using_update_stamp(config, cache, logger) process_painchek_assessments_using_timestamp(config, cache, logger) logger.info(f 'Processing time: {int(time.time() - start_time)} seconds') if __name__ == "__main__": main()
"" "Module containing core PainChek integration logic. Key logic is containined in class PainChek, which provides a number of methods to access the PainChek API. (c) PainChek Ltd 2019" "" import jsonimport requestsimport os from pck_utilities import APIError, LogicError, get_mandatory_value, get_optional_str def domains(): "" "Return a list of PainChek domains:return: domains" "" return ['face', 'voice', 'movement', 'behaviour', 'activity', 'body'] def descriptors(domain): "" "Return a list of descriptors for the given domain:param domain: PainChek domain:return: list of descriptors" "" if domain == 'face': return ['brow_lowering', 'cheek_raising', 'tightening_eyelids', 'wrinkling_nose', 'raising_upper_lip', 'pulling_corner_lip', 'mouth_stretch', 'parting_lips', 'closing_eyes'] elif domain == 'voice': return ['noisy_sounds', 'requesting_help', 'groaning', 'moaning', 'crying', 'screaming', 'loudtalk', 'howling', 'sighing'] elif domain == 'movement': return ['altered_random_movement', 'restlessness', 'freezing', 'guarding_touching', 'moving_away', 'abnormal_movement', 'pacing_wandering'] elif domain == 'behaviour': return ['introvert', 'verbally_offensive', 'aggressive', 'fear_or_dislike', 'inappropriate_behaviour', 'confused', 'distressed'] elif domain == 'activity': return ['resisting_care', 'prolonged_resting', 'altered_sleep', 'altered_routine'] elif domain == 'body': return ['profuse_sweating', 'pale_flushed', 'feverish_cold', 'rapid_breathing', 'painful_injuries', 'painful_conditions'] else :return None class PainChek: def __init__(self, config, logger): "" "Initialise an instance of a PainChek object:param config: Dictionary containing- base_url: base URL for accessing the PainChek API- client_id: client id for API Access- client_secret: client secret for API access- username: PainChek user name- password: PainChek user password:param logger: Optional logger for writing debug messages" "" self.token = Noneself.base_url = get_mandatory_value(config, 'base_url') self.client_id = get_mandatory_value(config, 'client_id') self.client_secret = get_mandatory_value(config, 'client_secret') self.username = get_mandatory_value(config, 'username') self.password = get_mandatory_value(config, 'password') self.logger = logger def get_token(self): "" "Returns a PainChek token, using the credentials supplied when the object was created:return: a PainChek Token" "" if self.token: if self.logger: self.logger.debug('Using existing PainChek token') return self.token url = f '{self.base_url}o/token/' payload = { 'grant_type': 'password', 'client_id': self.client_id, 'client_secret': self.client_secret, 'username': self.username, 'password': self.password } response = requests.request("POST", url, data = payload) response_text = json.loads(response.text) if (response.status_code == 200) and('access_token' in response_text): self.token = response_text['access_token'] if self.logger: self.logger.debug(f 'Fetched new PainChek token: {self.token}') return self.token else :if self.logger: self.logger.info(response.text) raise APIError(f 'Failed to get token ({response.status_code})', url, payload, response_text) def call_api(self, method, relative_url, payload = None, pathname = None): url = f '{self.base_url}{relative_url}' if self.logger: if payload and('image_encoded' in payload) and(payload['image_encoded']): tmp_payload = payload.copy() new_image = tmp_payload['image_encoded'][0: 50] tmp_payload['image_encoded'] = f '{new_image}...' else :tmp_payload = payload self.logger.debug(f "URL: {url}, payload:{tmp_payload}, pathname: {pathname}") headers = { 'Authorization': f 'Bearer {self.get_token()}' } if (payload is None) and(pathname is None) and(method in ("post", "path", "put")): raise LogicError(f "json data required for method: {method}") if method == 'get': response = requests.get(url, headers = headers) elif method == 'post': response = requests.post(url, json = payload, headers = headers) elif method == 'patch': response = requests.patch(url, json = payload, headers = headers) elif method == 'put': files = None if pathname: files = { 'image': open(pathname, 'rb') } response = requests.put(url, json = payload, headers = headers, files = files) elif method == 'delete': response = requests.delete(url, headers = headers) else :raise LogicError(f "Unsupported method: {method}") # logger.debug(response.text) if response.status_code not in (200, 201, 204): description = f 'Failed to {method} to {url} ({pathname})' if self.logger: self.logger.critical(description) raise APIError(description, url, payload, response.text) return json.loads(response.text) def get_painchek_api(self, relative_url): return self.call_api(method = 'get', relative_url = relative_url) def put_painchek_api(self, relative_url, payload): return self.call_api(method = "put", relative_url = relative_url, payload = payload) def post_painchek_api(self, relative_url, payload): return self.call_api(method = "post", relative_url = relative_url, payload = payload) def create_or_update_patient(self, payload): response = self.call_api('post', 'integration/patients/', payload) # print(response) return response def create_or_update_site(self, payload): response = self.call_api('post', 'integration/sites/', payload) # print(response) return response def create_or_update_admission(self, payload): response = self.call_api('post', 'integration/admissions/', payload) # print(response) return response def get_patient(self, patient_id): response = self.call_api('get', f 'patients/{patient_id}') # print(response) return response def get_list_from_api(self, relative_url): "" "Calls the PainChek API and returns a list of objects fetched from the url In the case the PainChek returns many pages of data, they are consolidated into a single:return: List of records" "" data = [] page = 0 while True: page += 1 # Form the url to fetch the right page of dataif '?' in relative_url: page_url = f '{relative_url}&' else :page_url = f '{relative_url}?' page_url = f '{page_url}page_size=50&page={page}' if self.logger: self.logger.debug('page URL: ' + page_url) # Get the page of dataresponse = self.call_api(method = 'get', relative_url = page_url) # logger.debug("response: " + str(response)) if 'results' not in response: raise APIError('results not found in response', page_url, None, response) data += response['results'] # If there 's no next page, we' re doneif not response['next']: break # Need to draw the line somewhereif page > 100: raise LogicError(f 'Too many pages of results returned. Increase number of allowable pages: {page}') # print("data: " + str(data)) return data @staticmethoddef add_to_string(s, value, prefix = None, quotes = None): "" "Appends value to s, adding a ', ' before the value if string already contains data:param s: string to add the value to:param value: value to add:param prefix: optional prefix to add before value. Only added if value is not None/empty string:param quotes: optional quotes to place around the value:return: s, with value appended" "" # logger.info(f 'Adding "{value}" to "{s}"') # check value and bail if not data is suppliedif not value: value = '' else :value = str(value).strip() if value == '': # logger.info(f 'Returned s: "{s}"') return s # add quotesif quotes: value = f '{quotes}{value}{quotes}' # add prefixif prefix: value = f '{prefix}{value}' # Check and cleanup sif not s: s = '' else :s = str(s).strip() if s == '': # logger.info(f 'Returned value: "{value}"') return value else :# logger.info(f 'Added "{value}" to "{s}"') return f '{s}, {value}' def get_assessment_note(self, assessment): "" "Given a PainChek assessment, returns a progress note. The note has three sections:* Overview - A summary of the pain level detected* Descriptors - A list of each descriptor noted* Comments - A list of the comments entered by the user (it any):param assessment: A PainChek assessment as returned by the PainChek API:return: Assessment note" "" domains = {} domains['face'] = { 'brow_lowering': 'Brow lowering', 'cheek_raising': 'Cheek raising', 'tightening_eyelids': 'Tightening of eyelids', 'wrinkling_nose': 'Wrinkling of nose', 'raising_upper_lip': 'Raising of upper lip', 'pulling_corner_lip': 'Pulling at corner lip', 'mouth_stretch': 'Horizontal mouth stretch', 'parting_lips': 'Parting lips', 'closing_eyes': 'Closing eyes' } domains['voice'] = { 'noisy_sounds': 'Noisy pain sounds', 'requesting_help': 'Requesting help repeatedly', 'groaning': 'Groaning', 'moaning': 'Moaning', 'crying': 'Crying', 'screaming': 'Screaming', 'loudtalk': 'Loudtalk', 'howling': 'Howling', 'sighing': 'Sighing' } domains['movement'] = { 'altered_random_movement': 'Altered or random movement', 'restlessness': 'Restlessness', 'freezing': 'Freezing', 'guarding_touching': 'Guarding/touching body part', 'moving_away': 'Moving away', 'abnormal_movement': 'Abnormal sitting/standing/walking', 'pacing_wandering': 'Pacing/wandering' } domains['behaviour'] = { 'introvert': 'Introvert or altered behaviour', 'verbally_offensive': 'Verbally offensive', 'aggressive': 'Aggressive', 'fear_or_dislike': 'Fear or extreme dislike', 'inappropriate_behaviour': 'Inappropriate behaviour', 'confused': 'Confused', 'distressed': 'Distressed' } domains['activity'] = { 'resisting_care': 'Resisting care', 'prolonged_resting': 'Prolonged resting', 'altered_sleep': 'Altered sleep cycle', 'altered_routine': 'Altered routines' } domains['body'] = { 'profuse_sweating': 'Profuse sweating', 'pale_flushed': 'Pale/flushed (red-faced)', 'feverish_cold': 'Feverish/cold', 'rapid_breathing': 'Rapid breathing', 'painful_injuries': 'Painful injuries', 'painful_conditions': 'Painful medical conditions' } # Build up list of descriptors and notesdescriptor_note = Nonecomment_note = None # Get the relevant assessment datadata = get_mandatory_value(assessment, 'data') if self.logger: self.logger.debug(f 'data: {data}') # Fetch the main domain fieldsassessment = get_mandatory_value(data, 'assessment') if self.logger: self.logger.debug(f 'assessment: {assessment}') pain_score = get_mandatory_value(assessment, 'pain_score') pain_level = get_mandatory_value(assessment, 'pain_level') # Iterate though each of the 6 PainChek domainsfor domain in domains: if self.logger: self.logger.debug(f 'Domain: {domain}') domain_data = get_mandatory_value(data, domain) # Fetch and return the comment, if anycomment = get_optional_str(domain_data, 'comment') comment_note = self.add_to_string(comment_note, comment, f '{domain}: ', '"') # Iterate though each of the descriptors in the domainfor key, value in domains[domain].items(): if self.logger: self.logger.debug(f 'key: {key}, descriptor: {value}') assessment_descriptors = get_mandatory_value(domain_data, 'descriptors') # print(f 'assessment_descriptors: {type(assessment_descriptors)}') # print(f 'key: {type(key)}') descriptor = get_mandatory_value(assessment_descriptors, key) if descriptor: if self.logger: self.logger.debug(f 'Descriptor was set: {domain}, {key}, {value}') descriptor_note = self.add_to_string(descriptor_note, value) else :if self.logger: self.logger.debug(f 'Descriptor was not set: {domain}, {key}, {value}') note = f 'A PainChek assessment was performed and indicated a pain level '\ f 'of {pain_level} (score: {pain_score}).\n\n' if descriptor_note: note += f 'The following descriptors were set: {descriptor_note}.' else :note += 'No descriptors were set.' if comment_note: note += '\n\nComments: ' + comment_note if self.logger: self.logger.debug(f 'Note: {note}') return note
"" "Module containing help functions and classes. No core PainChek integration logic is contained in this mode, but PainChek integration scriptswill rely on these functions (c) PainChek Ltd 2019" "" import loggingfrom datetime import datetimeimport jsonimport os # Helper functions def get_optional_value(model, key, default = None, logger = None): "" "Extract the value for " key " from " model ":param model: dictionary to extract the value from:param key: key to extract from the dictionary:param default: default value to return if key is not found:param logger: logging object:return: the value for the key" "" if key in model: value = model[key] if isinstance(value, str): value = value.strip() if value == "": value = None else :value = None if value is None: value = default if logger: logger.debug(f "key: {key}, value: {value}") return value def get_optional_str(model, key, default = None, logger = None): "" "Extract the value for " key " from " model " as a string:param model: dictionary to extract the value from:param key: key to extract from the dictionary:param default: default value to return if key is not found:param logger: logging object:return: the value for the key as a string (or None)" "" value = get_optional_value(model, key, default, logger) if value: value = str(value) return value def get_mandatory_value(model, key, logger = None): "" "Extract the value for " key " from " model ".Raises an exception if key is not found in the model.:param model: dictionary to extract the value from:param key: key to extract from the dictionary:param logger: logging object:return: the value for the key" "" value = get_optional_value(model, key, logger) if value is None: raise LogicError(f 'key "{key}" is missing from model "{model}"') return value def get_mandatory_str(model, key, logger = None): "" "Extract the value for " key " from " model " as a string.Raises an exception if key is not found in the model.:param model: dictionary to extract the value from:param key: key to extract from the dictionary:param logger: logging object:return: the value for the key as a string (or None)" "" value = get_mandatory_value(model, key, logger) if value is not None: value = str(value) return value class Config: "" "Class to manage a read-only configuration file" "" def __init__(self, path, filename = 'config.json', logger = None): "" "Initialise an instance of a Config object:param path: File system path to config file:param filename: name of config file:param logger: Optional logger object for writing debug messages" "" self.data = {} self.pathname = f '{path}/{filename}' self.logger = loggerself.load() def clear(self): "" "Clears the content of the config:return: Nothing" "" if self.logger: self.logger.debug('config: cleared') self.data = {} def load(self): "" "Load the config from file:return: None" "" if self.logger: self.logger.debug('config: loading') if os.path.exists(self.pathname): with open(self.pathname, 'r') as config_file: try: self.data = json.loads(config_file.read()) except json.decoder.JSONDecodeError: if self.logger: self.logger.debug('config: load failed, file contents ignored and will be replaced') if self.logger: self.logger.debug(f 'Config file data: {self.data}') # Ensure cache data object is properly formattedif self.data is None: self.clear() def get(self, key, sub_key = None, default = None, mandatory = False): "" "Fetches the item 'key' from the config file.We can use 'sub_key' to indicate 'key' is a dictionary and we are retrieving a value from that dictionary:param key: the type of item to return from the config:param sub_key: id of item to return from the 'key' dictionary:param default: default value for the item:param mandatory: indicates if the key (or sub_key) are expected to exist or not:return: the value from the config, or None" "" value = None if key in self.data: if sub_key is None: value = self.data[key] if self.logger: self.logger.debug(f "config: getting [{key}]: {value}") else :if sub_key in self.data[key]: value = self.data[key][sub_key] if self.logger: self.logger.debug(f "config: getting [{key}][{sub_key}]: {value}") else :if mandatory: raise LogicError(f '"{key}[{sub_key}]" is missing from data "{self.data}"') if self.logger: self.logger.debug(f "config: missing [{key}][{sub_key}]") else :if mandatory: raise LogicError(f '"{key}" is missing from data "{self.data}"') if self.logger: self.logger.debug(f "config: missing {key}") # Assign the default value if nothing foundif value is None: value = default return value def get_integer(self, key, sub_key = None, default = None, mandatory = False): "" "Returns the cached value for the assessment update stamp.i.e. the update stamp value of the last assessment processed by this module:return: Assessment last update stamp value" "" value = self.get(key, sub_key, default, mandatory) try: return int(value) except TypeError: return 0 def is_stale(self, key, sub_key = None, current = None): "" "Determines if the value for key (or key['sub_key']) is stale - i.e. if the stored value is different tothe supplied value:param key: the type of item to return from the config:param sub_key: id of item to return from the 'key' dictionary:param current: the current value for the item" "" return self.get(key, sub_key) != current class Cache(Config): "" "Class to manage a read/write cache of data persisted between runs Can cache single values, or a dictionary of values if a key is specified Extends the read-only Config class" "" def __init__(self, path, filename = 'cache.json', logger = None): "" "Initialise an instance of a Cache object:param path: File system path to cache file:param logger: Optional logger object for writing debug messages" "" super().__init__(path, filename, logger) def persist(self): "" "Writes the cache to disk:return: Nothing" "" if self.logger: self.logger.debug("cache: persisting") with open(self.pathname, 'w') as cache_file: json.dump(self.data, cache_file, sort_keys = True, indent = 2) def set(self, key, sub_key = None, value = None, persist = False): "" "Saves key to the cache.We can use 'sub_key' to indicate 'key' is a dictionary and we are saving a value in that dictionary.Creates the items in the cache if they do not exist:param key: the type of item to write to the cache:param sub_key: id of specific item to save to the 'key' dictionary:param value: the items' value:param persist: Indicates if Cache should be written to disk after it is updated:return: None" "" if value is not None: value = str(value) if sub_key is None: if self.logger: self.logger.debug(f 'cache: setting [{key}]: {value}') self.data[key] = value else :# print(self.data) if key not in self.data: if self.logger: self.logger.debug(f 'cache: created new cache: {key}') self.data[key] = {} # print(cache) if self.logger: self.logger.debug(f 'cache: setting [{key}][{sub_key}]: {value}') self.data[key][sub_key] = value if persist: self.persist() # Class for Capturing API exceptionsclass APIError(Exception): "" "Class for Capturing API exceptions" "" def __init__(self, description, url, payload, response): "" "Initialise an APIError object:param description: An description of the error:param url: The URL invoked:param payload: The data sent to the API:param response: The response received from the API" "" self.description = descriptionself.url = urlself.payload = payloadself.response = response class LogicError(Exception): "" "Class used to raise general logic errors" "" def __init__(self, description): "" "" Initialise an LogicError object: param description: An description of the error: param description: "" "self.description = description class Logger:" "" Class for logging issues to both the console and to file "" "def __init__(self, path, log_name, console_level=logging.INFO, file_level=logging.DEBUG):" "" Initialise a Logger object Logging levels are used as follows: -DEBUG - Verbose logging - INFO - Calls to APIs are recorded - WARNING - Import messages, such as the resident being processed - CRITICAL - Failures: param path: Folder for log file: param log_name: base name of log file: param console_level: console logging level: param file_level: file logging level "" "self.logger = logging.getLogger()self.logger.setLevel(logging.DEBUG) # create file handler which logs even debug messageslog_pathname = f'{path}/{log_name}.{datetime.today().strftime(" % Y. % m ")}.log'fh = logging.FileHandler(log_pathname)fh.setLevel(file_level) # create console handler with a higher log levelch = logging.StreamHandler()ch.setLevel(console_level) # create formatter and add it to the handlersformatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')ch.setFormatter(formatter)fh.setFormatter(formatter) # remove current handlersfor h in list(self.logger.handlers):self.logger.removeHandler(h) # add the handlers to loggerself.logger.addHandler(ch)self.logger.addHandler(fh) print(f'Logging to {log_pathname} (console: {console_level}, file: {file_level})') def debug(self, msg):self.logger.debug(msg) def info(self, msg):self.logger.info(msg) def warning(self, msg):self.logger.warning(msg) def critical(self, msg):self.logger.critical(msg) class Counts:" "" Simple class to maintain a dictionary of counts, with a key, hard coded counter of "total" that isupdated or returned unless a specific counter is asked for "" "def __init__(self, logger=None):" "" Initialise an instance of a Counts object: param logger: Optional logger for writing debug messages "" "self.data = {}self.data['total'] = 0self.logger = logger def increment(self, counter='total'):" "" Increments a count, defaulting to updating the 'total' counter is one is not supplied: param counter: specific counter to increment: return :Current count "" "if counter not in self.data:self.data[counter] = 0 self.data[counter] += 1 if self.logger:self.logger.debug(f" Count { counter }: { self.data[counter] } ") return self.data[counter] def get(self, counter='total'):" "" Return the current value for a counter, defaulting to updating the 'total' counter is one is not supplied: param counter: counter: specific counter to return :return :current value for the counter "" "if counter not in self.data:return 0else:return self.data[counter] def display(self):" "" Return all current counters and their values as CSV string: return :as above "" "s = '' for counter, count in self.data.items(): if s != '':s += ', ' s += f'{counter}: {count}' return s def process_integration_response(self, name, response):" "" Processes a response from a call to a PainChek integration API endpoint and updates the appropriate counter: param name: Name of counter: param response: Response from API: return :"" " if 'status' not in response:raise LogicError(f'Status not found in response: {response}') status = response['status'] if status == 'record_created':msg = f'{name} created' elif status == 'record_updated':msg = f'{name} updated' elif status == 'no_change':msg = f'{name} unchanged' else:raise LogicError(f'Unexpected response: {status}') if self.logger:self.logger.info(f' {msg}') self.increment(msg)