import os
import subprocess
import random
import time
import logging
import requests
from cellmaps_hierarchyeval.exceptions import CellmapshierarchyevalError
logger = logging.getLogger(__name__)
[docs]
class Hierarchy(object):
"""
Represents an assembly of proteins in a Hierarchy
"""
def __init__(self, hierarchy=None, interactome=None,
ndex_username=None, ndex_password=None):
"""
Constructor
:param hierarchy: Hierarchy
:type hierarchy: :py:class:`~ndex2.cx2.CX2Network`
:param interactome: Parent interactome
:type interactome: :py:class:`~ndex2.cx2.CX2Network`
:param ndex_username: NDEx username to use when connecting
to NDEx to obtain interactomes from hierarchy
:type ndex_username: str
:param ndex_password: NDEx password to use when connecting
to NDEx to obtain interactomes from hierarchy
:type ndex_password: str
"""
self._hierarchy = hierarchy
self._interactome = interactome
self._ndex_username = ndex_username
self._ndex_password = ndex_password
[docs]
def get_next_assembly(self):
"""
Generator that gets next assembly in hierarchy
:return:
:rtype: :py:class:`~cellmaps_hierarchyeval.assembly.Assembly`
"""
raise NotImplementedError('not done yet')
# will iterate across hierarchy CX2Network and create an assembly
# object which contains list of gene names and needed info to link
# back to this hierarchy node
# for node_id, node_data in self._hierarchy.get_nodes().items():
#yield X
[docs]
class Assembly(object):
"""
Represents assembly in a hierarchy
"""
def __init__(self, node_id=None, gene_names=None):
"""
Constructor
:param node_id: Id of hierarchy node
:type node_id: int
:param gene_names: list of gene names
:type gene_names: list
"""
self._node_id = node_id
self._gene_names = gene_names
[docs]
def get_assembly_name(self):
"""
Gets name of assembly
:return:
"""
return None
[docs]
def set_assembly_name(self):
"""
Sets assembly name
:return:
"""
pass
[docs]
def get_node_id(self):
"""
Gets node id
:return:
"""
return self._node_id
[docs]
def get_gene_names(self):
"""
Gets gene names
:return:
"""
return self._gene_names
[docs]
class GenesetAgent(object):
"""
Represents a Gene set analysis agent
whose job is to consume a list of gene names
and return a term name, confidence score, and
analysis
"""
GENE_SET_TOKEN = 'GENE_SET'
def __init__(self, attribute_name_prefix=None):
"""
Constructor
"""
self._attribute_name_prefix = attribute_name_prefix
[docs]
def annotate_gene_set(self, gene_names=None):
"""
Should be implemented by subclasses
:param gene_names: gene symbols
:type gene_names:
:return:
:rtype: tuple
"""
raise NotImplementedError('Subclasses should implement')
[docs]
def get_attribute_name_prefix(self):
"""
Gets suggested attribute name prefix
:return:
:rtype: str
"""
return self._attribute_name_prefix
[docs]
class FakeGeneSetAgent(GenesetAgent):
"""
Fake geneset agent that generates random numbers for values
"""
def __init__(self, random_seed=None, attribute_name_prefix=None):
"""
Constructor
:param random_seed:
"""
super().__init__(attribute_name_prefix=attribute_name_prefix)
random.seed(random_seed)
if self._attribute_name_prefix is None:
self._attribute_name_prefix = 'fake_' + str(random.random()) + '::'
[docs]
def annotate_gene_set(self, gene_names=None):
"""
:param gene_names:
:return:
"""
return 'Fake ' +\
str(random.randint(0, 1000)), random.random(), 'Fake full text' +\
str(random.randint(0, 1000))
[docs]
class OllamaCommandLineGeneSetAgent(GenesetAgent):
"""
Runs
"""
DEFAULT_PROMPT_FILE = 'default_prompt.txt'
def __init__(self, prompt=None, model='llama2:latest',
ollama_binary='/usr/local/bin/ollama',
attribute_name_prefix=None):
"""
Constructor
:param prompt: Prompt to pass to LLM put @@GENE_SET@@
into prompt to denote where gene set
should be inserted. If ``None`` default
internal prompt is used
:type prompt: str
"""
super().__init__(attribute_name_prefix=attribute_name_prefix)
if prompt is None:
logger.debug('Using default prompt')
self._prompt = OllamaCommandLineGeneSetAgent.get_default_prompt()
else:
self._prompt = prompt
self._model = model
self._ollama_binary = ollama_binary
if self._attribute_name_prefix is None:
self._attribute_name_prefix = 'ollama_' + str(self._model) + '::'
[docs]
def get_prompt(self):
"""
Gets prompt used by this agent
:return:
"""
return self._prompt
[docs]
@staticmethod
def get_default_prompt():
"""
Gets default prompt stored with this package
:return:
:rtype: str
"""
with open(os.path.join(os.path.dirname(__file__),
OllamaCommandLineGeneSetAgent.DEFAULT_PROMPT_FILE),
'r') as f:
return f.read()
def _run_cmd(self, cmd, cwd=None, timeout=360):
"""
Runs command as a command line process
:param cmd: command to run
:type cmd: list
:param cwd: current working directory
:type cwd: str
:param timeout: timeout in seconds before killing process
:type timeout: int or float
:raises CellMapsProvenanceError: If **raise_on_error** passed
into constructor is ``True`` and
process times out before completing
:return: (return code, standard out, standard error)
:rtype: tuple
"""
logger.debug('Running command under ' + str(cwd) +
' path: ' + str(cmd))
p = subprocess.Popen(cmd, cwd=cwd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
out, err = p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
logger.warning('Timeout reached. Killing process')
p.kill()
out, err = p.communicate()
raise CellmapshierarchyevalError('Process timed out. '
'exit code: ' +
str(p.returncode) +
' stdout: ' + str(out) +
' stderr: ' + str(err))
# Removing ending new line if value is not None
if out is not None:
out = out.rstrip()
return p.returncode, out, err
def _update_prompt_with_gene_set(self, gene_names=None):
"""
Updates prompt inserting gene names
:param gene_names:
:type gene_names: list
:return: prompt with gene names inserted
:rtype: str
"""
return self._prompt.format(GENE_SET=','.join(gene_names))
[docs]
def annotate_gene_set(self, gene_names=None):
"""
Using prompt passed in via constructor, this call
invokes the LLM specified by **model** set in constructor
:param gene_names: Genes to analyze
:type gene_names: list
:raises CellmapshierarchyevalError: If LLM failed to run
:return: ('process name (score)', full output from LLM)
:rtype: tuple
"""
updated_prompt = self._update_prompt_with_gene_set(gene_names=gene_names)
e_code, out, err = self._run_cmd([self._ollama_binary, 'run',
self._model,
updated_prompt])
if e_code != 0:
raise CellmapshierarchyevalError('Received non zero exit code + ' +
str(e_code) +
' calling ' +
str(self._ollama_binary) +
'\nstdout: ' + str(out) +
'stderr\n' + str(err))
process_name = None
confidence = None
if out is not None:
for line in out.split('\n'):
if line.startswith('Process: '):
process_name = line[line.index(':')+2:]
if line.startswith('Confidence Score: '):
confidence = line[line.index(':')+2:]
else:
logger.info('LLM output is None')
return process_name, confidence, out
[docs]
class OllamaRestServiceGenesetAgent(GenesetAgent):
"""
Calls LLM via REST service. Derived from ServerModel_LLM in
https://github.com/idekerlab/agent_evaluation llm.py
"""
def __init__(self, prompt=None, model='llama2:latest',
username=None, password=None,
rest_url=None, temperature=0, max_tokens=1000, seed=42,
attribute_name_prefix=None,
max_retries=5, timeout=120, retry_wait=10):
"""
Constructor
:param prompt: Prompt to send to LLM
:type prompt: str
:param model: Name of model
:type model: str
:param username: Username to send via Basic Auth to service
:type username: str
:param password: Password to send via Basic Auth to service
:type password: str
:param rest_url: URL for service, should end with api/generate
:type rest_url: str
:param temperature:
:param max_tokens:
:param seed:
:param attribute_name_prefix:
:param max_retries: Number of times to retry failed query
:type max_retries: int
:param timeout: Time in seconds to wait for response from service
:type timeout: int or float
:param retry_wait: Time in seconds to wait between retries for failed
query
:type retry_wait: int or float
"""
super().__init__(attribute_name_prefix=attribute_name_prefix)
if prompt is None:
logger.debug('Using default prompt')
self._prompt = OllamaCommandLineGeneSetAgent.get_default_prompt()
else:
self._prompt = prompt
self._model = model
self._username = username
self._password = password
self._temperature = temperature
self._seed = seed
self._max_tokens = max_tokens
self._rest_url = rest_url
self._max_retries = max_retries
self._timeout = timeout
self._retry_wait = retry_wait
if self._attribute_name_prefix is None:
self._attribute_name_prefix = 'ollama_' + str(self._model) + '::'
[docs]
def get_prompt(self):
"""
Gets prompt used by this agent
:return:
"""
return self._prompt
def _update_prompt_with_gene_set(self, gene_names=None):
"""
Updates prompt inserting gene names
:param gene_names:
:type gene_names: list
:return: prompt with gene names inserted
:rtype: str
"""
return self._prompt.format(GENE_SET=','.join(gene_names))
def _get_query(self, gene_names=None):
"""
Gets query for rest service
:return:
:rtype: dict
"""
updated_prompt = self._update_prompt_with_gene_set(gene_names=gene_names)
query = {
"model": self._model,
"prompt": updated_prompt,
"stream": False,
"options": {
"seed": self._seed,
"temperature": self._temperature,
"num_predict": self._max_tokens
}
}
return query
def _get_auth_creds(self):
"""
If user and password are set in constructor
return them as a tuple otherwise just return
None
:return: (user as str, password as str) or None
:rtype: tuple
"""
if self._username is not None or self._password is not None:
return self._username, self._password
return None
def _query_service(self, query=None):
"""
Query the service
:param query:
:type query: dict
:return: (response from LLM as str, error message as str or None)
:rtype: tuple
"""
retries = 0
backoff_time = self._retry_wait
auth_creds = self._get_auth_creds()
while retries < self._max_retries:
try:
response = requests.post(self._rest_url, json=query,
timeout=self._timeout,
auth=auth_creds)
# Check if the request was successful
if response.status_code == 200:
# return the response
return response.json()['response'], None
elif response.status_code in [500, 502, 503, 504]:
logger.info(response.text)
logger.error('Encountering server issue ' + str(response.status_code) +
'. Retrying in ' + str(backoff_time) + ' seconds')
time.sleep(self._retry_wait)
retries += 1
backoff_time *= 2
else:
logger.info(response.text)
error_message = 'The request failed with status code: ' + str(response.status_code)
logger.error(error_message)
return None, error_message
except requests.exceptions.RequestException as e:
logger.error(response.text)
logger.error('status code: ' + str(response.status_code))
logger.error('The request failed with an exception: ' + str(e) +
' Retrying in ' + str(backoff_time) + ' seconds')
time.sleep(backoff_time)
retries += 1
backoff_time *= 2 # Double the backoff time for the next retry
except Exception as e:
logger.error('An unexpected error occurred: ' + str(e))
return None, str(e)
return None, "Error: Max retries exceeded, last response error was: " + str(response.status_code)
[docs]
def annotate_gene_set(self, gene_names=None):
"""
Using prompt passed in via constructor, this call
invokes the LLM specified by **model** set in constructor
:param gene_names: Genes to analyze
:type gene_names: list
:raises CellmapshierarchyevalError: If LLM failed to run
:return: ('process name (score)', full output from LLM)
:rtype: tuple
"""
query = self._get_query(gene_names=gene_names)
out, err_mesage = self._query_service(query=query)
if err_mesage is not None:
raise CellmapshierarchyevalError('Error running LLM: ' + str(err_mesage))
process_name = None
confidence = None
if out is not None:
for line in out.split('\n'):
if line.startswith('Process: '):
process_name = line[line.index(':')+2:]
if line.startswith('Confidence Score: '):
confidence = line[line.index(':')+2:]
else:
logger.info('LLM output is None')
return process_name, confidence, out