# Source code for pilosa.client

# Copyright 2017 Pilosa Corp.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGE.
#

import json
import logging
import re
import threading

import urllib3

from .exceptions import PilosaError, PilosaURIError, IndexExistsError, FrameExistsError
from .imports import batch_bits
from .internal import public_pb2 as internal
from .orm import TimeQuantum, Schema, CacheType
from .response import QueryResponse
from .version import VERSION

# Public names exported by this module.
__all__ = ("Client", "Cluster", "URI")

# Module-wide logger, registered under the name "pilosa".
_LOGGER = logging.getLogger("pilosa")
# Upper bound on how many hosts a single HTTP request is attempted against
# before the client gives up.
_MAX_HOSTS = 10


class Client(object):
    """Pilosa HTTP client

    This client uses Pilosa's http+protobuf API.

    Usage:

    ::

        import pilosa

        # Create a Client instance
        client = pilosa.Client()

        # Create an Index instance
        index = pilosa.Index("repository")

        stargazer = index.frame("stargazer")
        response = client.query(stargazer.bitmap(5))

        # Act on the result
        print(response.result)

    * See `Pilosa API Reference <https://www.pilosa.com/docs/api-reference/>`_.
    * See `Query Language <https://www.pilosa.com/docs/query-language/>`_.
    """

    # How __http_request treats the server response:
    #   __NO_RESPONSE            -- check the status code, return None on success
    #   __RAW_RESPONSE           -- return the response without checking the status
    #   __ERROR_CHECKED_RESPONSE -- check the status code, return the response
    __NO_RESPONSE, __RAW_RESPONSE, __ERROR_CHECKED_RESPONSE = range(3)

    def __init__(self, cluster_or_uri=None, connect_timeout=30000, socket_timeout=300000,
                 pool_size_per_route=10, pool_size_total=100, retry_count=3):
        """Creates a client; no connection is made until the first request.

        :param cluster_or_uri: a ``Cluster`` (copied), a ``URI``, an address
            string, or ``None`` for the default URI
        :param connect_timeout: connection timeout in milliseconds
        :param socket_timeout: read timeout in milliseconds
        :param int pool_size_per_route: maximum connections kept per pool
        :param int pool_size_total: total connection budget; together with
            ``pool_size_per_route`` it determines the number of pools
        :param retry_count: retry setting handed to urllib3
        :raises pilosa.PilosaError: if ``cluster_or_uri`` has an unsupported type
        """
        if cluster_or_uri is None:
            self.cluster = Cluster(URI())
        elif isinstance(cluster_or_uri, Cluster):
            # work on a private copy so host bookkeeping does not leak to the caller
            self.cluster = cluster_or_uri.copy()
        elif isinstance(cluster_or_uri, URI):
            self.cluster = Cluster(cluster_or_uri)
        elif isinstance(cluster_or_uri, str):
            self.cluster = Cluster(URI.address(cluster_or_uri))
        else:
            raise PilosaError("Invalid cluster_or_uri: %s" % cluster_or_uri)

        # urllib3 expects seconds; the public parameters are in milliseconds
        self.connect_timeout = connect_timeout / 1000.0
        self.socket_timeout = socket_timeout / 1000.0
        self.pool_size_per_route = pool_size_per_route
        self.pool_size_total = pool_size_total
        self.retry_count = retry_count
        self.__current_host = None
        self.__client = None

    def query(self, query, columns=False, exclude_bits=False, exclude_attrs=False):
        """Runs the given query against the server with the given options.

        :param pilosa.PqlQuery query: a PqlQuery object with a non-null index
        :param bool columns: Enables returning column data from bitmap queries
        :param bool exclude_bits: Disables returning bits from bitmap queries
        :param bool exclude_attrs: Disables returning attributes from bitmap queries
        :return: Pilosa response
        :rtype: pilosa.response.QueryResponse
        :raises pilosa.PilosaError: if the response carries an error message
        """
        request = _QueryRequest(query.serialize(),
                                columns=columns,
                                exclude_bits=exclude_bits,
                                exclude_attrs=exclude_attrs)
        data = bytearray(request.to_protobuf())
        path = "/index/%s/query" % query.index.name
        # The raw response is requested because errors are reported inside
        # the protobuf payload and surfaced below.
        response = self.__http_request("POST", path, data, Client.__RAW_RESPONSE)
        query_response = QueryResponse._from_protobuf(response.data)
        if query_response.error_message:
            raise PilosaError(query_response.error_message)
        return query_response

    def create_index(self, index):
        """Creates an index on the server using the given Index object.

        If the index has a time quantum other than ``TimeQuantum.NONE``,
        it is set with a follow-up request.

        :param pilosa.Index index:
        :raises pilosa.IndexExistsError: if there already is a index with the given name
        """
        data = json.dumps({
            "options": {"columnLabel": index.column_label}
        })
        path = "/index/%s" % index.name
        self.__http_request("POST", path, data=data)
        if index.time_quantum != TimeQuantum.NONE:
            self.__patch_index_time_quantum(index)

    def delete_index(self, index):
        """Deletes the given index on the server.

        :param pilosa.Index index:
        :raises pilosa.PilosaError: if the index does not exist
        """
        path = "/index/%s" % index.name
        self.__http_request("DELETE", path)

    def create_frame(self, frame):
        """Creates a frame on the server using the given Frame object.

        :param pilosa.Frame frame:
        :raises pilosa.FrameExistsError: if there already is a frame with the given name
        """
        data = frame._get_options_string()
        path = "/index/%s/frame/%s" % (frame.index.name, frame.name)
        self.__http_request("POST", path, data=data)

    def delete_frame(self, frame):
        """Deletes the given frame on the server.

        :param pilosa.Frame frame:
        :raises pilosa.PilosaError: if the frame does not exist
        """
        path = "/index/%s/frame/%s" % (frame.index.name, frame.name)
        self.__http_request("DELETE", path)

    def ensure_index(self, index):
        """Creates an index on the server if it does not exist.

        :param pilosa.Index index:
        """
        try:
            self.create_index(index)
        except IndexExistsError:
            pass

    def ensure_frame(self, frame):
        """Creates a frame on the server if it does not exist.

        :param pilosa.Frame frame:
        """
        try:
            self.create_frame(frame)
        except FrameExistsError:
            pass

    def status(self):
        """Fetches ``/status`` from the server.

        :return: the ``"status"`` member of the decoded JSON response
        :raises pilosa.PilosaError: if the server answers with an error status
        """
        response = self.__http_request("GET", "/status",
                                       client_response=Client.__ERROR_CHECKED_RESPONSE)
        return json.loads(response.data.decode('utf-8'))["status"]

    def schema(self):
        """Builds a Schema from the indexes and frames the server reports.

        Only the first entry of the ``"Nodes"`` list in the status is read.
        When the status lists no nodes an empty schema is returned.

        :return: schema describing the server side indexes and frames
        :rtype: pilosa.Schema
        """
        status = self.status()
        schema = Schema()
        nodes = status.get("Nodes") or []
        if not nodes:
            return schema
        for index_info in nodes[0].get("Indexes", []):
            meta = index_info["Meta"]
            options = {
                "column_label": meta["ColumnLabel"],
                "time_quantum": TimeQuantum(meta.get("TimeQuantum", "")),
            }
            index = schema.index(index_info["Name"], **options)
            for frame_info in index_info.get("Frames", []):
                meta = frame_info["Meta"]
                options = {
                    "row_label": meta["RowLabel"],
                    "cache_size": meta["CacheSize"],
                    "cache_type": CacheType(meta["CacheType"]),
                    "inverse_enabled": meta.get("InverseEnabled", False),
                    "time_quantum": TimeQuantum(meta.get("TimeQuantum", "")),
                }
                index.frame(frame_info["Name"], **options)
        return schema

    def sync_schema(self, schema):
        """Synchronizes the given schema with the server, in both directions.

        Indexes and frames present only in ``schema`` are created on the
        server; indexes and frames present only on the server are added to
        ``schema`` in place.

        :param pilosa.Schema schema: local schema; modified in place
        """
        server_schema = self.schema()

        # find out local - remote schema
        diff_schema = schema._diff(server_schema)
        # create the indexes and frames which don't exist on the server side
        for index_name, index in diff_schema._indexes.items():
            if index_name not in server_schema._indexes:
                self.ensure_index(index)
            for frame in index._frames.values():
                self.ensure_frame(frame)

        # find out remote - local schema
        diff_schema = server_schema._diff(schema)
        for index_name, index in diff_schema._indexes.items():
            local_index = schema._indexes.get(index_name)
            if local_index is None:
                schema._indexes[index_name] = index
            else:
                for frame_name, frame in index._frames.items():
                    local_index._frames[frame_name] = frame

    def import_frame(self, frame, bit_reader, batch_size=100000):
        """Imports a frame using the given bit reader

        The bits are grouped by ``batch_bits`` into ``(slice, bits)``
        batches and each batch is imported separately.

        :param frame: frame to import into
        :param bit_reader: source of bits, handed to ``batch_bits``
        :param batch_size: batch size handed to ``batch_bits``
        """
        index_name = frame.index.name
        frame_name = frame.name
        for slice_num, bits in batch_bits(bit_reader, batch_size):
            self._import_bits(index_name, frame_name, slice_num, bits)

    def _import_bits(self, index_name, frame_name, slice, bits):
        """Sends one batch of bits to every node that owns the given slice.

        ``bits`` is sorted in place.
        """
        # sort by row_id then by column_id
        bits.sort(key=lambda bit: (bit.row_id, bit.column_id))
        nodes = self._fetch_fragment_nodes(index_name, slice)
        # the request does not depend on the node, build it once
        import_request = _ImportRequest(index_name, frame_name, slice, bits)
        for node in nodes:
            # each owning node is addressed directly through its own client
            client = Client(URI.address(node["host"]))
            client._import_node(import_request)

    def _fetch_fragment_nodes(self, index_name, slice):
        """Returns the decoded JSON of ``/fragment/nodes`` for the given index and slice."""
        path = "/fragment/nodes?slice=%d&index=%s" % (slice, index_name)
        response = self.__http_request("GET", path,
                                       client_response=Client.__ERROR_CHECKED_RESPONSE)
        content = response.data.decode('utf-8')
        return json.loads(content)

    def _import_node(self, import_request):
        """Posts the serialized import request to ``/import``."""
        data = import_request.to_protobuf()
        # NOTE(review): the raw response is requested and then discarded, so
        # a failed import is not raised here -- confirm this is intended.
        self.__http_request("POST", "/import", data=data,
                            client_response=Client.__RAW_RESPONSE)

    def __patch_index_time_quantum(self, index):
        """Sets the time quantum of the given index on the server."""
        path = "/index/%s/time-quantum" % index.name
        # json.dumps guarantees a well-formed body (quoting and escaping)
        data = json.dumps({"timeQuantum": str(index.time_quantum)})
        self.__http_request("PATCH", path, data=data)

    def __http_request(self, method, path, data=None, client_response=0):
        """Sends a request to the current host, failing over to other hosts.

        A host which cannot be reached is removed from the cluster and the
        request is retried on the next one, ``_MAX_HOSTS`` times at most.

        :param str method: HTTP method
        :param str path: request path, appended to the host address
        :param data: request body
        :param int client_response: one of the ``__*_RESPONSE`` constants
        :return: the response, or ``None`` for ``__NO_RESPONSE``
        :raises pilosa.PilosaError: if every attempt failed or the server
            answered with an error status (a recognized error body raises
            the matching subclass instead)
        """
        if not self.__client:
            self.__connect()
        # try at most 10 non-failed hosts; protect against broken cluster.remove_host
        for _ in range(_MAX_HOSTS):
            uri = "%s%s" % (self.__get_address(), path)
            try:
                _LOGGER.debug("Request: %s %s %s", method, uri,
                              "[binary]" if client_response == Client.__RAW_RESPONSE else data)
                response = self.__client.request(method, uri, body=data)
                break
            except urllib3.exceptions.MaxRetryError as e:
                self.cluster.remove_host(self.__current_host)
                _LOGGER.warning("Removed %s from the cluster due to %s",
                                self.__current_host, str(e))
                # forces __get_address to pick another host on the next round
                self.__current_host = None
        else:
            raise PilosaError("Tried %s hosts, still failing" % _MAX_HOSTS)

        if client_response == Client.__RAW_RESPONSE:
            return response

        if 200 <= response.status < 300:
            return None if client_response == Client.__NO_RESPONSE else response
        content = response.data.decode('utf-8')
        ex = self.__RECOGNIZED_ERRORS.get(content)
        if ex is not None:
            raise ex
        raise PilosaError("Server error (%d): %s" % (response.status, content))

    def __get_address(self):
        """Returns the normalized address of the current host, picking one if needed."""
        if self.__current_host is None:
            self.__current_host = self.cluster.get_host()
            _LOGGER.debug("Current host set: %s", self.__current_host)
        return self.__current_host._normalize()

    def __connect(self):
        """Creates the urllib3 pool manager used for all requests."""
        # the number of pools must be a whole number and at least 1
        num_pools = max(1, int(self.pool_size_total // self.pool_size_per_route))
        headers = {
            'Content-Type': 'application/x-protobuf',
            'Accept': 'application/x-protobuf',
            'User-Agent': 'python-pilosa/' + VERSION,
        }
        timeout = urllib3.Timeout(connect=self.connect_timeout, read=self.socket_timeout)
        client = urllib3.PoolManager(num_pools=num_pools, maxsize=self.pool_size_per_route,
                                     block=True, headers=headers, timeout=timeout,
                                     retries=self.retry_count)
        self.__client = client

    # Exact error bodies which are mapped to specific exception types.
    __RECOGNIZED_ERRORS = {
        "index already exists\n": IndexExistsError,
        "frame already exists\n": FrameExistsError,
    }
class URI:
    """Represents a Pilosa URI

    A Pilosa URI consists of three parts:

    * Scheme: Protocol of the URI. Default: ``http``
    * Host: Hostname or IP URI. Default: ``localhost``
    * Port: Port of the URI. Default ``10101``

    All parts of the URI are optional. The following are equivalent:

    * ``http://localhost:10101``
    * ``http://localhost``
    * ``http://:10101``
    * ``localhost:10101``
    * ``localhost``
    * ``:10101``

    :param str scheme: is the scheme of the Pilosa Server. Currently only ``http`` is supported
    :param str host: is the hostname or IP address of the Pilosa server
    :param int port: is the port of the Pilosa server
    """

    # groups: 2 = scheme, 3 = host, 5 = port; every part is optional
    __PATTERN = re.compile("^(([+a-z]+)://)?([0-9a-z.-]+)?(:([0-9]+))?$")

    def __init__(self, scheme="http", host="localhost", port=10101):
        self.scheme = scheme
        self.host = host
        self.port = port

    @classmethod
    def address(cls, address):
        """Creates a URI from an address.

        Parts missing from the address keep their default values.

        :param str address: of the form ``${SCHEME}://${HOST}:${PORT}``
        :return: a Pilosa URI
        :rtype: pilosa.URI
        :raises pilosa.PilosaURIError: if the address cannot be parsed
        """
        uri = cls()
        uri._parse(address)
        return uri

    def _normalize(self):
        """Returns the URI as a string, with the scheme cut at the first ``+``."""
        # "http+protobuf" -> "http"; a scheme without "+" is kept as it is
        scheme = self.scheme.split("+", 1)[0]
        return "%s://%s:%s" % (scheme, self.host, self.port)

    def _parse(self, address):
        """Overwrites the parts of this URI which are present in ``address``.

        :raises pilosa.PilosaURIError: if the address does not match the pattern
        """
        m = self.__PATTERN.search(address)
        if not m:
            raise PilosaURIError("Not a Pilosa URI")
        scheme = m.group(2)
        if scheme:
            self.scheme = scheme
        host = m.group(3)
        if host:
            self.host = host
        port = m.group(5)
        if port:
            self.port = int(port)

    def __str__(self):
        return "%s://%s:%s" % (self.scheme, self.host, self.port)

    def __repr__(self):
        return "<URI %s>" % self

    def __eq__(self, other):
        if self is other:
            return True
        if other is None or not isinstance(other, self.__class__):
            return False
        return self.scheme == other.scheme and \
            self.host == other.host and \
            self.port == other.port

    def __ne__(self, other):
        # Python 2 does not derive != from ==
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ alone makes instances unhashable on Python 3;
        # hash the same fields equality compares so equal URIs hash alike.
        return hash((self.scheme, self.host, self.port))
class Cluster:
    """Contains hosts in a Pilosa cluster.

    Every host is stored in ``hosts`` as a ``(uri, available)`` pair.

    :param hosts: URIs of hosts. Leaving out hosts creates the default cluster
    """

    def __init__(self, *hosts):
        """Returns the cluster with the given hosts"""
        self.__lock = threading.RLock()
        self.__next_index = 0
        # all hosts start out as available
        self.hosts = [(uri, True) for uri in hosts]

    def add_host(self, uri):
        """Makes a host available.

        A host which is already known is flagged as available again,
        an unknown host is appended.

        :param pilosa.URI uri:
        """
        with self.__lock:
            position = None
            for i, (known, _) in enumerate(self.hosts):
                if known == uri:
                    position = i
                    break
            if position is None:
                self.hosts.append((uri, True))
            else:
                self.hosts[position] = (self.hosts[position][0], True)

    def remove_host(self, uri):
        """Makes a host unavailable.

        The host stays in the list, only its availability flag is cleared.

        :param pilosa.URI uri:
        """
        with self.__lock:
            for position, (known, _) in enumerate(self.hosts):
                if uri == known:
                    self.hosts[position] = (known, False)

    def get_host(self):
        """Returns the next host in the cluster.

        If no host is available, all hosts are flagged as available again
        before the error is raised.

        :return: next host
        :rtype: pilosa.URI
        :raises pilosa.PilosaError: if there is no available host
        """
        for candidate, available in self.hosts:
            if available:
                return candidate
        self._reset()
        raise PilosaError("There are no available hosts")

    def copy(self):
        """Returns a new cluster holding a copy of the host list."""
        duplicate = Cluster()
        duplicate.hosts = list(self.hosts)
        return duplicate

    def _reset(self):
        """Flags every host as available."""
        with self.__lock:
            self.hosts = [(known, True) for known, _ in self.hosts]
class _QueryRequest: def __init__(self, query, columns=False, exclude_bits=False, exclude_attrs=False): self.query = query self.columns = columns self.exclude_bits = exclude_bits self.exclude_attrs = exclude_attrs def to_protobuf(self): qr = internal.QueryRequest() qr.Query = self.query qr.ColumnAttrs = self.columns qr.ExcludeBits = self.exclude_bits qr.ExcludeAttrs = self.exclude_attrs return qr.SerializeToString() class _ImportRequest: def __init__(self, index_name, frame_name, slice, bits): self.index_name = index_name self.frame_name = frame_name self.slice = slice self.bits = bits def to_protobuf(self): import_request = internal.ImportRequest() import_request.Index = self.index_name import_request.Frame = self.frame_name import_request.Slice = self.slice row_ids = import_request.RowIDs column_ids = import_request.ColumnIDs timestamps = import_request.Timestamps for bit in self.bits: row_ids.append(bit.row_id) column_ids.append(bit.column_id) timestamps.append(bit.timestamp) return import_request.SerializeToString()