# Source code for medacy.pipeline_components.metamap.metamap
"""
A utility class to Metamap medical text documents.
Metamap a file and utilize its output, or manipulate stored metamap output.
"""
import subprocess
import xmltodict
import json
import tempfile, os, warnings
from ...tools.unicode_to_ascii import UNICODE_TO_ASCII
[docs]class MetaMap:
def __init__(self, metamap_path=None, cache_output = False, cache_directory = None, convert_ascii=True):
"""
A python wrapper for metamap that includes built in caching of metamap output.
:param cache_output: Whether to cache output as it run through metamap, will by default store in a
temp directory tmp/medacy*/
:param cache_directory: alternatively, specify a directory to cache metamapped files to
:param metamap_path: The location of the metamap executable.
(ex. /home/share/programs/metamap/2016/public_mm/bin/metamap)
"""
if metamap_path is None:
raise ValueError("metamap_path is not set. Insure Metamap is running and a path to the metamap executable is being given (ex. metamap/2016/public_mm/bin/metamap)")
if cache_output:
if cache_directory is None: #set cache directory to tmp directory, creating if not exists
tmp = tempfile.gettempdir()
files = [filename for filename in os.listdir(tmp) if filename.startswith("medacy")]
if files:
cache_directory = os.path.join(tmp,files[0])
else:
tmp_dir = tempfile.mkdtemp(prefix="medacy")
cache_directory = os.path.join(tmp, tmp_dir)
self.cache_directory = cache_directory
self.metamap_path = metamap_path
self.convert_ascii = convert_ascii
[docs] def map_file(self, file_to_map, max_prune_depth=10):
"""
Maps a given document from a file_path and returns a formatted dict
:param file_to_map: the path of the file that will be metamapped
:param max_prune_depth: set to larger for better results. See metamap specs about pruning depth.
:return:
"""
self.recent_file = file_to_map
if self.cache_directory is not None: #look up file if exists, otherwise continue metamapping
cached_file_path = os.path.join(
self.cache_directory,
os.path.splitext(os.path.basename(file_to_map))[0] + ".metamapped"
)
if os.path.exists(cached_file_path):
print(cached_file_path)
return self.load(cached_file_path)
try:
with open(file_to_map, 'r') as file:
contents = file.read()
except:
raise FileNotFoundError("Error opening file while attempting to map: %s" % file_to_map)
metamap_dict = self._run_metamap('--XMLf --blanklines 0 --silent --prune %i' % max_prune_depth, contents)
if self.cache_directory is not None:
with open(cached_file_path, 'w') as mapped_file:
try:
#print("Writing to", os.path.join(self.cache_directory, file_name))
mapped_file.write(json.dumps(metamap_dict))
except Exception as e:
mapped_file.write(str(e))
return metamap_dict
[docs] def map_text(self, text, max_prune_depth=10):
#TODO add caching here as in map_file
#An example of this cachine is available in the map_file
self.metamap_dict = self._run_metamap('--XMLf --blanklines 0 --silent --prune %i' % max_prune_depth, text)
return self.metamap_dict
[docs] def map_corpus(self, documents, directory=None, n_job=-1):
"""
Metamaps a large amount of files quickly by forking processes and utilizing multiple cores
:param documents: an array of documents to map
:param directory: location to map all files
:param n_job: number of cores to utilize at once while mapping - this may use a large amount of memory
:return:
"""
raise NotImplementedError() #TODO implement utilizing code for parallel process mapper from n2c2
[docs] def _run_metamap(self, args, document):
"""
Runs metamap through bash and feeds in appropriate arguments
:param args: arguments to feed into metamap
:param document: the raw text to be metamapped
:return:
"""
if self.convert_ascii:
document, ascii_diff = self._convert_to_ascii(document)
bashCommand = 'bash %s %s' % (self.metamap_path, args)
process = subprocess.Popen(bashCommand, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate(input=bytes(document, 'UTF-8'))
output = str(output.decode('utf-8'))
xml = ""
for line in output.split("\n")[1:]:
if 'DOCTYPE' not in line and 'xml' not in line:
xml += line+'\n'
xml = "<metamap>\n" + xml + "</metamap>" # surround in single root tag - hacky.
xml = '<?xml version="1.0" encoding="UTF-8"?>\n<!DOCTYPE MMOs PUBLIC "-//NLM//DTD MetaMap Machine Output//EN" "http://metamap.nlm.nih.gov/DTD/MMOtoXML_v5.dtd">\n'+xml
if output is None:
raise Exception("An error occured while using metamap: %s" % error)
metamap_dict = xmltodict.parse(xml)
if self.convert_ascii:
document, metamap_dict = self._restore_from_ascii(document, ascii_diff, metamap_dict)
return metamap_dict
[docs] def _item_generator(self, json_input, lookup_key):
if isinstance(json_input, dict):
for k, v in json_input.items():
if k == lookup_key:
yield v
else:
yield from self._item_generator(v, lookup_key)
elif isinstance(json_input, list):
for item in json_input:
yield from self._item_generator(item, lookup_key)
[docs] def extract_mapped_terms(self, metamap_dict):
"""
Extracts an array of term dictionaries from metamap_dict
:param metamap_dict: A dictionary containing the metamap output
:return: an array of mapped_terms
"""
if metamap_dict['metamap'] is None:
warnings.warn("Metamap output is none for a file in the pipeline. Exiting.")
return
utterances = metamap_dict['metamap']['MMOs']['MMO']['Utterances']['Utterance']
mapped_terms = []
mapped_terms = list(self._item_generator(metamap_dict, 'Candidate'))
all_terms = []
for term in mapped_terms:
if isinstance(term, dict):
all_terms.append(term)
if isinstance(term, list):
all_terms = all_terms + term
return all_terms
[docs] def mapped_terms_to_spacy_ann(self, mapped_terms, entity_label=None):
"""
Transforms an array of mapped_terms in a spacy annotation object. Label for each annotation
defaults to first semantic type in semantic_type array
:param mapped_terms: an array of mapped terms
:param label: the label to assign to each annotation, defaults to first semantic type of mapped_term
:return: a annotation formatted to spacy's specifications
"""
annotations = {'entities': {}}
count = 0
for term in mapped_terms:
for span in self.get_span_by_term(term): #if a single entity corresonds to a disjunct span
entity_start, entity_end = span
if entity_label is None:
annotations['entities'][count] = (entity_start, entity_end, self.get_semantic_types_by_term(term)[0])
else:
annotations['entities'][count] = (entity_start, entity_end, entity_label)
count+=1
return annotations
[docs] def get_term_by_semantic_type(self, mapped_terms, include=[], exclude=None):
"""
Returns Metamapped utterances that all contain a given set of semantic types found in include
:param mapped_terms: An array of candidate dictionaries
:return: the dictionaries that contain a term with all the semantic types in semantic_types
"""
if exclude is not None:
intersection = set(include).intersection(exclude)
if intersection:
raise Exception("Include and exclude overlap with the following semantic types: "+", ".join(intersection))
matches = []
for term in mapped_terms:
found_types = []
if int(term['SemTypes']['@Count']) == 0:
continue
if int(term['SemTypes']['@Count']) == 1:
found_types.append(term['SemTypes']['SemType'])
if int(term['SemTypes']['@Count']) > 1:
found_types = term['SemTypes']['SemType']
if exclude is not None and set(exclude).issubset(set(found_types)):
continue
if set(include).issubset(set(found_types)):
matches.append(term)
return matches
[docs] def get_span_by_term(self,term):
"""
Takes a given utterance dictionary (term) and extracts out the character indices of the utterance
:param term: The full dictionary corresponding to a metamap term
:return: the span of the referenced term in the document
"""
if int(term['ConceptPIs']['@Count']) == 1:
start = int(term['ConceptPIs']['ConceptPI']['StartPos'])
length = int(term['ConceptPIs']['ConceptPI']['Length'])
return [(start, start+length)]
spans = []
for span in term['ConceptPIs']['ConceptPI']:
start = int(span['StartPos'])
length = int(span['Length'])
spans.append((start, start+length))
return spans
[docs] def get_semantic_types_by_term(self, term):
"""
Returns an array of the semantic types of a given term
:param term:
:return:
"""
if int(term['SemTypes']['@Count']) == 1:
return [term['SemTypes']['SemType']]
return term['SemTypes']['SemType']
def __call__(self, file_path):
"""
Metamaps a file and returns an array of mapped terms from the file
:param file_path:
:return: array of mapped_terms
"""
metamap_dict = self.map_file(file_path)
return self.extract_mapped_terms(metamap_dict)
[docs] def _convert_to_ascii(self, text):
"""Takes in a text string and converts it to ASCII,
keeping track of each character change
The changes are recorded in a list of objects, each object
detailing the original non-ASCII character and the starting
index and length of the replacement in the new string (keys
``original``, ``start``, and ``length``, respectively).
Args:
text (string): The text to be converted
Returns:
tuple: tuple containing:
**text** (*string*): The converted text
**diff** (*list*): Record of all ASCII conversions
"""
diff = list()
offset = 0
for i, char in enumerate(text):
if char in UNICODE_TO_ASCII and UNICODE_TO_ASCII[char] is not char:
ascii = UNICODE_TO_ASCII[char]
text = text[:i+offset] + ascii + text[i+1+offset:]
diff.append({
'start': i+offset,
'length': len(ascii),
'original': char
})
offset += len(ascii) - len(char)
return text, diff
[docs] def _restore_from_ascii(self, text, diff, metamap_dict):
"""Takes in non-ascii text and the list of changes made to it from the `convert()` function,
as well as a dictionary of metamap taggings, converts the text back to its original state
and updates the character spans in the metamap dict to match
Arguments:
text (string): Output of ``_convert_to_ascii()``
diff (list): Output of ``_convert_to_ascii()``
metamap_dict (dict): Dictionary of metamap information obtained from ``text``
Returns:
tuple: tuple containing:
**text** (*string*): The input with all of the changes listed in ``diff`` reversed
**metamap_dict** (*dict*): The input with all of its character spans updated to reflect the changes to ``text``
"""
offset = 0
for conv in diff: # Go through each recorded change to undo it & update metamap character spans accordingly
conv_start = conv['start'] + offset
conv_end = conv_start + conv['length']-1 # Ending index of converted span, INCLUSIVE
# Undo the change to the text (restore ascii characters)
text = text[:conv_start] + conv['original'] + text[conv_end+1:]
delta = len(conv['original']) - conv['length']
offset += delta
# Check each metamap entry and update its character spans to reflect this change
for mapping in metamap_dict['metamap']['MMOs']['MMO']['Utterances']['Utterance']['Phrases']['Phrase']['Mappings']['Mapping']:
for candidate in mapping['MappingCandidates']['Candidate']:
match_start = int(candidate['ConceptPIs']['ConceptPI']['StartPos'])
match_length = int(candidate['ConceptPIs']['ConceptPI']['Length'])
match_end = match_start + match_length-1
if match_start == conv_start and match_end == conv_end: # If match is equal to conversion (a [conversion] and some text)
# print("Perfect match")
match_length += delta
elif match_start < conv_start and match_end < conv_end: # If match intersects conversion on left ([a con]version and some text)
# print("Left intersect")
match_length += delta + conv_start
elif conv_start < match_start and conv_end < match_end: # If match intersects conversion on right (a conver[sion and som]e text)
# print("Right intersect ")
if conv_end + delta < match_start:
print(match_end, conv_end)
match_start = conv_end + delta + 1
match_length = match_end - conv_end
else:
match_length += delta
elif conv_end < match_start: # If match is totally to the right of the conversion (a conversion and a [match])
# print("Full right")
match_start += delta
else: # If match is totally to right of conversion, no action needed (a [match] and a conversion)
# print("Full left")
pass
# Update metamap entry with new indices
candidate['MatchedWords']['MatchedWord'] = text[match_start:match_end+1]
candidate['ConceptPIs']['ConceptPI']['StartPos'] = str(match_start)
candidate['ConceptPIs']['ConceptPI']['Length'] = str(match_length)
return text, metamap_dict