Source code for intermine.webservice

from __future__ import unicode_literals

from xml.dom import minidom
from contextlib import closing

try:
    from urlparse import urlparse
    from UserDict import DictMixin
    from urllib import urlopen
    from urllib import urlencode
except ImportError:
    from urllib.parse import urlparse
    from urllib.parse import urlencode
    from urllib.request import urlopen
    try:
        # The ABC aliases were removed from `collections` in Python 3.10.
        from collections.abc import MutableMapping as DictMixin
    except ImportError:
        from collections import MutableMapping as DictMixin

try:
    import simplejson as json # Prefer this as it is faster
except ImportError: # pragma: no cover
    try:
        import json
    except ImportError:
        raise ImportError("Could not find any JSON module to import - "
            + "please install simplejson or jsonlib to continue")

# Local intermine imports
from intermine.query import Query, Template
from intermine.model import Model, Attribute, Reference, Collection, Column
from intermine.lists.listmanager import ListManager
from intermine.errors import ServiceError, WebserviceError
from intermine.results import InterMineURLOpener, ResultIterator
from intermine import idresolution
from intermine.decorators import requires_version

"""
Webservice Interaction Routines for InterMine Webservices
=========================================================

Classes for dealing with communication with an InterMine
RESTful webservice.

"""

__author__ = "Alex Kalderimis"
__organization__ = "InterMine"
__license__ = "LGPL"
__contact__ = "dev@intermine.org"

class Registry(DictMixin):
    """
    A Class representing an InterMine registry.
    ===========================================

    Registries are web-services that mines can automatically register
    themselves with, and thus enable service discovery by clients.

    SYNOPSIS
    --------

    example::

        from intermine.webservice import Registry

        # Connect to the default registry service
        # at www.intermine.org/registry
        registry = Registry()

        # Find all the available mines:
        for name, mine in registry.items():
            print(name, mine.version)

        # Dict-like interface for accessing mines.
        flymine = registry["flymine"]

        # The mine object is a Service
        for gene in flymine.select("Gene.*").results():
            process(gene)

    This class is meant to aid with interoperation between
    mines by allowing them to discover one-another, and
    allow users to always have correct connection information.

    Mine names are matched case-insensitively. The registry is read-only:
    item assignment and deletion raise NotImplementedError.
    """

    MINES_PATH = "/mines.json"

    def __init__(self, registry_url="http://www.intermine.org/registry"):
        """
        Fetch the list of mines from the registry service.

        @param registry_url: the base url of the registry (optional)
        """
        self.registry_url = registry_url
        opener = InterMineURLOpener()
        # Close the response explicitly and decode it the same way
        # Service._get_json does before handing it to the JSON parser.
        with closing(opener.open(registry_url + Registry.MINES_PATH)) as resp:
            mine_data = json.loads(ensure_str(resp.read()))
        # Registered name -> raw mine record.
        self.__mine_dict = dict((mine["name"], mine) for mine in mine_data["mines"])
        # Lower-cased name -> registered name, for case-insensitive lookup.
        self.__synonyms = dict((name.lower(), name) for name in self.__mine_dict)
        # Lower-cased name -> Service, created lazily on first access.
        self.__mine_cache = {}

    @staticmethod
    def _lookup_key(name):
        """Return the lower-cased lookup key, or None if name is not string-like."""
        lower = getattr(name, "lower", None)
        return lower() if callable(lower) else None

    def __contains__(self, name):
        key = self._lookup_key(name)
        return key is not None and key in self.__synonyms

    def __getitem__(self, name):
        """
        Return the Service for the named mine, creating it on first access.

        @raise KeyError: if no mine of that name is registered
        """
        key = self._lookup_key(name)
        if key is None or key not in self.__synonyms:
            raise KeyError("Unknown mine: %s" % (name,))
        if key not in self.__mine_cache:
            record = self.__mine_dict[self.__synonyms[key]]
            self.__mine_cache[key] = Service(record["webServiceRoot"])
        return self.__mine_cache[key]

    def __setitem__(self, name, item):
        raise NotImplementedError("You cannot add items to a registry")

    def __delitem__(self, name):
        raise NotImplementedError("You cannot remove items from a registry")

    def __len__(self):
        return len(self.__mine_dict)

    def __iter__(self):
        return iter(self.__mine_dict)

    def keys(self):
        """Return the registered mine names as a list."""
        return list(self.__mine_dict)
def ensure_str(stringlike):
    """
    Coerce a string-like value to text.

    Anything exposing a ``decode`` method (eg. bytes read from a socket) is
    decoded as UTF-8; every other value is passed through ``str``.

    @param stringlike: the value to convert
    @return: the decoded or stringified value
    """
    if hasattr(stringlike, 'decode'):
        return stringlike.decode('utf8')
    else:
        return str(stringlike)
class Service(object):
    """
    A class representing connections to different InterMine WebServices
    ===================================================================

    The intermine.webservice.Service class is the main interface for the user.
    It will provide access to queries and templates, as well as doing the
    background task of fetching the data model, and actually requesting
    the query results.

    SYNOPSIS
    --------

    example::

        from intermine.webservice import Service
        service = Service("http://www.flymine.org/query/service")

        template = service.get_template("Gene_Pathways")
        for row in template.results(A={"value":"zen"}):
            do_something_with(row)
            ...

        query = service.new_query()
        query.add_view("Gene.symbol", "Gene.pathway.name")
        query.add_constraint("Gene", "LOOKUP", "zen")
        for row in query.results():
            do_something_with(row)
            ...

        new_list = service.create_list("some/file/with.ids", "Gene")
        list_on_server = service.get_list("On server")
        in_both = new_list & list_on_server
        in_both.name = "Intersection of these lists"
        for row in in_both:
            do_something_with(row)
            ...

    OVERVIEW
    --------
    The methods the user will be most concerned with are:
      - L{Service.new_query}: constructs a new query to query a service with
      - L{Service.get_template}: gets a template from the service
      - L{ListManager.create_list}: creates a new list on the service

    For list management information, see L{ListManager}.

    TERMINOLOGY
    -----------
    X{Query} is the term for an arbitrarily complex structured request for
    data from the webservice. The user is responsible for specifying the
    structure that determines what records are returned, and what information
    about each record is provided.

    X{Template} is the term for a predefined "Query", ie: one that has been
    written and saved on the webservice you will access. The definition
    of the query is already done, but the user may want to specify the
    values of the constraints that exist on the template. Templates are
    accessed by name, and while you can easily introspect templates, it is
    assumed you know what they do when you use them

    X{List} is a saved result set containing a set of objects previously
    identified in the database. Lists can be created and managed using this
    client library.

    @see: L{intermine.query}
    """

    QUERY_PATH = '/query/results'
    LIST_ENRICHMENT_PATH = '/list/enrichment'
    WIDGETS_PATH = '/widgets'
    SEARCH_PATH = '/search'
    QUERY_LIST_UPLOAD_PATH = '/query/tolist'
    QUERY_LIST_APPEND_PATH = '/query/append/tolist'
    MODEL_PATH = '/model'
    TEMPLATES_PATH = '/templates'
    TEMPLATEQUERY_PATH = '/template/results'
    LIST_PATH = '/lists'
    LIST_CREATION_PATH = '/lists'
    LIST_RENAME_PATH = '/lists/rename'
    LIST_APPENDING_PATH = '/lists/append'
    LIST_TAG_PATH = '/list/tags'
    SAVEDQUERY_PATH = '/savedqueries/xml'
    VERSION_PATH = '/version/ws'
    RELEASE_PATH = '/version/release'
    SCHEME = 'http://'
    SERVICE_RESOLUTION_PATH = "/check/"
    IDS_PATH = "/ids"
    USERS_PATH = "/users"

    def __init__(self, root, username=None, password=None, token=None,
                 prefetch_depth=1, prefetch_id_only=False):
        """
        Constructor
        ===========

        Construct a connection to a webservice::

            url = "http://www.flymine.org/query/service"

            # An unauthenticated connection - access to all public data
            service = Service(url)

            # An authenticated connection - access to private and public data
            service = Service(url, token="ABC123456")

        @param root: the root url of the webservice (required)
        @param username: your login name (optional)
        @param password: your password (required if a username is given)
        @param token: your API access token (optional - used in preference
                      to username and password)
        @param prefetch_depth: stored on the service as-is (default 1)
        @param prefetch_id_only: stored on the service as-is (default False)

        @raise ServiceError: if the version cannot be fetched and parsed
        @raise ValueError: if a username is supplied, but no password

        There are two alternative authentication systems supported by
        InterMine webservices. The first is username and password
        authentication, which is supported by all webservices. Newer
        webservices (version 6+) also support API access token
        authentication, which is the recommended system to use. Token access
        is more secure as you will never have to transmit your username or
        password, and the token can be easily changed or disabled without
        changing your webapp login details.
        """
        o = urlparse(root)
        if not o.scheme:
            root = "http://" + root
        if not root.endswith("/service"):
            root = root + "/service"

        self.root = root
        self.prefetch_depth = prefetch_depth
        self.prefetch_id_only = prefetch_id_only

        # Initialize empty cached data.
        self._templates = None
        self._model = None
        self._version = None
        self._release = None
        self._widgets = None
        self._list_manager = ListManager(self)
        self.__missing_method_name = None

        if token:
            # A token takes precedence over any username/password supplied.
            self.opener = InterMineURLOpener(token=token)
        elif username:
            if not password:
                raise ValueError("Username given, but no password supplied")
            self.opener = InterMineURLOpener((username, password))
        else:
            self.opener = InterMineURLOpener()

        # Fetching the version doubles as a check that the root url is usable.
        try:
            self.version
        except WebserviceError as e:
            raise ServiceError("Could not validate service - is the root url (%s) correct? %s" % (root, e))

        if token and self.version < 6:
            raise ServiceError("This service does not support API access token authentication")

        # Set up sugary aliases
        self.query = self.new_query

    # Delegated list methods
    LIST_MANAGER_METHODS = frozenset(["get_list", "get_all_lists",
        "get_all_list_names", "create_list", "get_list_count",
        "delete_lists", "l"])

    def list_manager(self):
        """
        Get a new ListManager to use with this service.
        ===============================================

        This method is primarily useful as a context manager
        when creating temporary lists, since on context exit all
        temporary lists will be cleaned up::

            with service.list_manager() as manager:
                temp_a = manager.create_list(file_a, "Gene")
                temp_b = manager.create_list(file_b, "Gene")
                for gene in (temp_a & temp_b):
                    print(gene.primaryIdentifier, "is in both")

        @rtype: ListManager
        """
        return ListManager(self)

    def __getattr__(self, name):
        # Only reached when normal lookup fails: forward the list methods
        # named in LIST_MANAGER_METHODS to this service's list manager.
        if name in self.LIST_MANAGER_METHODS:
            method = getattr(self._list_manager, name)
            return method
        raise AttributeError("Could not find " + name)

    def __del__(self):
        # On going out of scope, try and clean up.
        # Read from __dict__ so that an instance whose __init__ failed
        # before the manager was created does not raise here.
        manager = self.__dict__.get("_list_manager")
        if manager is None:
            return
        try:
            manager.delete_temporary_lists()
        except ReferenceError:
            pass

    @property
    def version(self):
        """
        Returns the webservice version
        ==============================

        The version specifies what capabilities a
        specific webservice provides.

        @raise ServiceError: if the version cannot be parsed

        @rtype: int
        """
        try:
            if self._version is None:
                try:
                    url = self.root + self.VERSION_PATH
                    with closing(self.opener.open(url)) as resp:
                        self._version = int(resp.read())
                except ValueError as e:
                    raise ServiceError("Could not parse a valid webservice version: " + str(e))
        except AttributeError as e:
            # An AttributeError escaping a property makes Python fall back
            # to __getattr__, which would report "Could not find version"
            # and hide the real cause; re-raise as a plain Exception.
            raise Exception(e)
        return self._version

    def resolve_service_path(self, variant):
        """Resolve the path to optional services"""
        url = self.root + self.SERVICE_RESOLUTION_PATH + variant
        with closing(self.opener.open(url)) as resp:
            return resp.read()

    @property
    def release(self):
        """
        Returns the datawarehouse release
        =================================

        Service.release S{->} string

        The release is an arbitrary string used to distinguish
        releases of the datawarehouse. This usually coincides
        with updates to the data contained within. While a string,
        releases usually sort in ascending order of recentness
        (eg: "release-26", "release-27", "release-28"). They can also
        have less machine readable meanings (eg: "beta")

        @rtype: string
        """
        if self._release is None:
            with closing(urlopen(self.root + self.RELEASE_PATH)) as resp:
                self._release = ensure_str(resp.read()).strip()
        return self._release

    def load_query(self, xml, root=None):
        """
        Construct a new Query object from its XML serialisation
        =======================================================

        Queries require access to the data model, so it is easiest to
        deserialise them through this factory method, which supplies
        this service's model.

        @param xml: the serialised query, passed to L{Query.from_xml}
        @param root: passed through to L{Query.from_xml} (optional)

        @return: L{intermine.query.Query}
        """
        return Query.from_xml(xml, self.model, root=root)

    def select(self, *columns, **kwargs):
        """
        Construct a new Query object with the given columns selected.
        =============================================================

        As new_query, except that instead of a root class, a list of
        output column expressions are passed instead.

        If an "xml" keyword argument is given, the query is loaded from
        that XML instead (see L{Service.load_query}).

        @return: L{intermine.query.Query}
        """
        if "xml" in kwargs:
            return self.load_query(kwargs["xml"])
        if len(columns) == 1:
            view = columns[0]
            if isinstance(view, Attribute):
                return Query(self.model, self).select("%s.%s" % (view.declared_in.name, view))
            if isinstance(view, Reference):
                return Query(self.model, self).select("%s.%s.*" % (view.declared_in.name, view))
            elif not isinstance(view, Column) and not str(view).endswith("*"):
                # A bare path that is not an attribute gets ".*" appended.
                path = self.model.make_path(view)
                if not path.is_attribute():
                    return Query(self.model, self).select(str(view) + ".*")
        return Query(self.model, self).select(*columns)

    new_query = select

    def get_template(self, name):
        """
        Returns a template of the given name
        ====================================

        Tries to retrieve a template of the given name
        from the webservice. If you are trying to fetch
        a private template (ie. one you made yourself
        and is not available to others) then you may need to authenticate

        @see: L{intermine.webservice.Service.__init__}

        @param name: the template's name
        @type name: string

        @raise ServiceError: if the template does not exist
        @raise QueryParseError: if the template cannot be parsed

        @return: L{intermine.query.Template}
        """
        try:
            t = self.templates[name]
        except KeyError:
            raise ServiceError("There is no template called '"
                + name + "' at this service")
        if not isinstance(t, Template):
            # Parse on first use, and cache the parsed template
            # in place of the XML string.
            t = Template.from_xml(t, self.model, self)
            self.templates[name] = t
        return t

    def _get_json(self, path, payload = None):
        """
        Request path (relative to the service root) and parse the JSON reply.

        @raise ServiceError: if the reply carries a non-null 'error' value
        """
        headers = {'Accept': 'application/json'}
        with closing(self.opener.open(self.root + path, payload, headers = headers)) as resp:
            data = json.loads(ensure_str(resp.read()))
        # A reply without an 'error' key is treated the same as error: null.
        if data.get('error') is not None:
            raise ServiceError(data['error'])
        return data

    def _get_xml(self, path):
        """Request path (relative to the service root) and parse the XML reply."""
        headers = {'Accept': 'application/xml'}
        with closing(self.opener.open(self.root + path, headers = headers)) as sock:
            return minidom.parse(sock)

    def search(self, term, **facets):
        """
        Perform an unstructured search by term
        =======================================

        This search method performs a search of all objects
        indexed by the service endpoint, returning results
        and facets for those results.

        @param term: The search term
        @param facets: The facets to search by (eg: Organism = 'H. sapiens')

        @return: (list, dict) The results, and a dictionary of faceting
                 information.
        """
        if hasattr(term, 'encode'):
            term = term.encode('utf8')
        params = [('q', term)]
        for facet, value in facets.items():
            if hasattr(value, 'encode'):
                value = value.encode('utf8')
            params.append(("facet_{0}".format(facet), value))
        payload = urlencode(params, doseq = True)
        resp = self._get_json(self.SEARCH_PATH, payload = payload)
        return (resp['results'], resp['facets'])

    @property
    def widgets(self):
        """
        The dictionary of widgets from the webservice
        ==============================================

        The set of widgets available to a service does not
        change between releases, so they are cached.
        If you are running a long running process, you may
        wish to periodically dump the cache by calling
        L{Service.flush}, or simply get a new Service object.

        @return: dict
        """
        if self._widgets is None:
            ws = self._get_json(self.WIDGETS_PATH)['widgets']
            self._widgets = dict((w['name'], w) for w in ws)
        return self._widgets

    def resolve_ids(self, data_type, identifiers, extra = '', case_sensitive = False, wildcards = False):
        """
        Submit an Identifier Resolution Job
        ===================================

        Request that a set of identifiers be resolved to objects in
        the data store.

        @param data_type: The type of these identifiers (eg. 'Gene')
        @type data_type: String

        @param identifiers: The ids to resolve (eg. ['eve', 'zen', 'pparg'])
        @type identifiers: iterable of string

        @param extra: A disambiguating value (eg. "Drosophila melanogaster")
        @type extra: String

        @param case_sensitive: Whether to treat IDs case sensitively.
        @type case_sensitive: Boolean

        @param wildcards: Whether or not to interpret wildcards (eg: "eve*")
        @type wildcards: Boolean

        @raise ServiceError: if the service is older than version 10, an
                             argument is missing, or the reply carries an
                             error or no job uid

        @return: {idresolution.Job} The job.
        """
        if self.version < 10:
            raise ServiceError("This feature requires API version 10+")
        if not data_type:
            raise ServiceError("No data-type supplied")
        if not identifiers:
            raise ServiceError("No identifiers supplied")

        data = json.dumps({
            "type": data_type,
            "identifiers": list(identifiers),
            "extra": extra,
            "caseSensitive": case_sensitive,
            "wildCards": wildcards
        })
        text = self.opener.post_content(self.root + self.IDS_PATH, data, InterMineURLOpener.JSON)
        ret = json.loads(text)
        if ret.get('error') is not None:
            raise ServiceError(ret['error'])
        if ret.get('uid') is None:
            # ret is a dict, so it must be stringified before concatenation.
            raise ServiceError("No uid found in " + str(ret))

        return idresolution.Job(self, ret['uid'])

    def flush(self):
        """
        Flushes any cached data.

        Temporary lists are deleted, and the list manager is replaced.
        """
        self._list_manager.delete_temporary_lists()
        self._list_manager = ListManager(self)
        self._templates = None
        self._model = None
        self._version = None
        self._release = None
        self._widgets = None

    @property
    def templates(self):
        """
        The dictionary of templates from the webservice
        ===============================================

        Service.templates S{->} dict(intermine.query.Template|string)

        For efficiency's sake, Templates are not parsed until
        they are required, and until then they are stored as XML
        strings. It is recommended that in most cases you would want
        to use L{Service.get_template}.

        You can use this property however to test for template existence
        though::

            if name in service.templates:
                template = service.get_template(name)

        @raise ServiceError: if two templates share a name

        @rtype: dict
        """
        if self._templates is None:
            templates = {}
            dom = self._get_xml(self.TEMPLATES_PATH)
            for e in dom.getElementsByTagName('template'):
                name = e.getAttribute('name')
                if name in templates:
                    raise ServiceError("Two templates with same name: " + name)
                else:
                    templates[name] = e.toxml()
            self._templates = templates
        return self._templates

    @property
    def model(self):
        """
        The data model for the webservice you are querying
        ==================================================

        Service.model S{->} L{intermine.model.Model}

        This is used when constructing queries to provide them
        with information on the structure of the data model
        they are accessing. You are very unlikely to want to
        access this object directly.

        @raise ModelParseError: if the model cannot be read

        @rtype: L{intermine.model.Model}
        """
        if self._model is None:
            model_url = self.root + self.MODEL_PATH
            self._model = Model(model_url, self)
        return self._model

    def get_results(self, path, params, rowformat, view, cld=None):
        """
        Return an Iterator over the rows of the results
        ===============================================

        This method is called internally by the query objects
        when they are called to get results. You will not
        normally need to call it directly

        @param path: The resource path (eg: "/query/results")
        @type path: string
        @param params: The query parameters for this request as a dictionary
        @type params: dict
        @param rowformat: One of "rr", "object", "count", "dict", "list",
                          "tsv", "csv", "jsonrows", "jsonobjects"
        @type rowformat: string
        @param view: The output columns
        @type view: list
        @param cld: passed through to the ResultIterator (optional)

        @raise WebserviceError: for failed requests

        @return: L{intermine.webservice.ResultIterator}
        """
        return ResultIterator(self, path, params, rowformat, view, cld)

    @requires_version(9)
    def register(self, username, password):
        """
        Register a new user with this service.
        =======================================

        @param username: the name to register
        @param password: the password for the new account

        @return: {Service} an authenticated service.
        """
        # urlencode only passes real bytes through untouched; any other
        # value is run through str() first, so encode to bytes, not bytearray.
        if hasattr(username, 'encode'):
            username = username.encode('utf8')
        if hasattr(password, 'encode'):
            password = password.encode('utf8')

        payload = urlencode({'name': username, 'password': password})
        # Registration goes through a fresh, unauthenticated connection.
        registrar = Service(self.root)
        resp = registrar._get_json(self.USERS_PATH, payload = payload)
        token = resp['user']['temporaryToken']
        return Service(self.root, token = token)

    @requires_version(16)
    def get_deregistration_token(self, validity = 300):
        """
        Request a token that authorises deleting the current user.
        ===========================================================

        @param validity: how long the token should remain valid; must be
                         between 1 and 86400 inclusive
        @raise ValueError: if validity is outside that range

        @return: the 'token' entry of the service's response
        """
        # NOTE(review): the bounds are 1..86400, which reads as seconds
        # (one second to one day) - confirm against the service API.
        if validity < 1 or validity > 24 * 60 * 60:
            raise ValueError("Validity not a reasonable value: 1s - 24hrs")
        params = urlencode({'validity': str(validity)})
        resp = self._get_json('/user/deregistration', payload = params)
        return resp['token']

    @requires_version(16)
    def deregister(self, deregistration_token):
        """
        Remove a User from the service
        ==============================

        @param deregistration_token: A token to prove you really want to do
                                     this; either the token value itself or
                                     an object with a 'uuid' entry

        @return: string All the user's data.
        """
        if 'uuid' in deregistration_token:
            deregistration_token = deregistration_token['uuid']

        path = self.root + '/user'
        params = {'deregistrationToken': deregistration_token, 'format': 'xml'}
        uri = path + '?' + urlencode(params)
        # Drop cached data and temporary lists before the account goes away.
        self.flush()
        userdata = self.opener.delete(uri)
        return userdata