"""Module describing a connection."""
# Copyright 2017 Qarnot computing
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from logging import Logger
import sys
from typing import Dict, Iterable, Iterator, List, Optional
from qarnot.helper import Log
from . import get_url, raise_on_error, __version__ # type: ignore
from .hardware_constraint import HardwareConstraint, CpuModelHardware
from .task import Task, BulkTaskResponse
from .pool import Pool
from .paginate import PaginateResponse, OffsetResponse
from .bucket import Bucket
from .job import Job
from ._filter import create_pool_filter, create_task_filter, create_job_filter
from ._retry import with_retry
from .exceptions import (QarnotGenericException, BucketStorageUnavailableException, MissingProfileException,
MissingTaskException, MissingPoolException, MissingJobException)
from ._util import get_error_message_from_http_response
import requests
import warnings
import os
import boto3
import concurrent.futures
import botocore
import deprecation
from json import dumps as json_dumps
import urllib3
import configparser as config
#########
# class #
#########
[docs]
class Connection(object):
"""Represents the couple cluster/user to which submit tasks.
"""
[docs]
def __init__(self, fileconf=None, client_token=None,
             cluster_url=None, cluster_unsafe=False, cluster_timeout=None,
             storage_url=None, storage_unsafe=False,
             retry_count=5, retry_wait=1.0,
             cluster_custom_certificate=None, storage_custom_certificate=None,
             sanitize_bucket_paths=True, show_bucket_warnings=True, logger: Optional[Logger] = None):
    """Create a connection to a cluster with given config file, options or environment variables.

    Settings are resolved in this order: ``fileconf`` (when given) or the
    explicit keyword arguments, then environment variables for anything
    still missing, then built-in defaults.

    Available environment variables are
    `QARNOT_CLUSTER_URL`, `QARNOT_STORAGE_URL`, `QARNOT_CLUSTER_UNSAFE`,
    `QARNOT_CLUSTER_TIMEOUT` and `QARNOT_CLIENT_TOKEN`.

    :param fileconf: path to a qarnot configuration file or a corresponding dict
    :type fileconf: str or dict
    :param str client_token: API Token
    :param str cluster_url: (optional) Cluster url.
    :param bool cluster_unsafe: (optional) Disable certificate check
    :param int cluster_timeout: (optional) Timeout value for every request
    :param str storage_url: (optional) Storage service url.
    :param bool storage_unsafe: (optional) Disable certificate check
    :param int retry_count: (optional) ConnectionError retry count. Default to 5.
    :param float retry_wait: (optional) Retry on error wait time, progressive.
      (wait * (retry_count - retry_num)). Default to 1s
    :param str cluster_custom_certificate: (optional) certificate path used to verify
      the cluster; ignored when `cluster_unsafe` is set
    :param str storage_custom_certificate: (optional) certificate path used to verify
      the storage service; ignored when `storage_unsafe` is set
    :param bool sanitize_bucket_paths: (optional) Flag to automatically sanitize bucket paths
      (remove extra slashes). Default to true
    :param bool show_bucket_warnings: (optional) Flag to show warnings of bucket paths
      sanitization. Default to true
    :param logger: (optional) logger used for both the standard and the error output;
      when omitted, one logger on stdout and one on stderr are created
    :type logger: :class:`logging.Logger`

    :raises ~qarnot.exceptions.QarnotGenericException: no token could be found

    Configuration sample:

    .. code-block:: ini

        [cluster]
        # url of the REST API
        url=https://localhost
        # No SSL verification ?
        unsafe=False
        [client]
        # auth string of the client
        token=login
        [storage]
        url=https://storage
        unsafe=False
    """
    self.logger = logger if logger is not None else Log.get_logger_for_stream(sys.stdout, "stdout")
    # Separate stderr logger to avoid a breaking change of task stderr logs.
    self.logger_stderr = logger if logger is not None else Log.get_logger_for_stream(sys.stderr, "stderr")
    self._version = "qarnot-sdk-python/" + __version__
    self._http = requests.session()
    self._retry_count = retry_count
    self._retry_wait = retry_wait
    self._sanitize_bucket_paths = sanitize_bucket_paths
    self._show_bucket_warnings = show_bucket_warnings

    if fileconf is not None:
        self.storage = None
        if isinstance(fileconf, dict):
            warnings.warn("Dict config should be replaced by constructor explicit arguments.")
            self.cluster = None
            if fileconf.get('cluster_url'):
                self.cluster = fileconf.get('cluster_url')
            if fileconf.get('storage_url'):
                self.storage = fileconf.get('storage_url')
            auth = fileconf.get('client_auth')
            # A dict without 'cluster_timeout' means "no timeout"; int(None)
            # used to raise TypeError here.
            dict_timeout = fileconf.get('cluster_timeout')
            self.timeout: Optional[int] = int(dict_timeout) if dict_timeout is not None else None
            if fileconf.get('cluster_unsafe'):
                self._http.verify = False
            elif fileconf.get('cluster_custom_certificate'):
                self._http.verify = fileconf.get('cluster_custom_certificate')
        else:
            cfg = config.ConfigParser()
            with open(fileconf, "r", encoding="utf-8") as cfg_file:
                cfg.read_string(cfg_file.read())

            self.cluster = None
            if cfg.has_option('cluster', 'url'):
                self.cluster = cfg.get('cluster', 'url')
            if cfg.has_option('storage', 'url'):
                self.storage = cfg.get('storage', 'url')

            if cfg.has_option('client', 'token'):
                auth = cfg.get('client', 'token')
            elif cfg.has_option('client', 'auth'):
                warnings.warn('auth is deprecated, use token instead.')
                auth = cfg.get('client', 'auth')
            else:
                auth = None

            self.timeout = None
            if cfg.has_option('cluster', 'timeout'):
                self.timeout = cfg.getint('cluster', 'timeout')

            if cfg.has_option('cluster', 'unsafe') \
                    and cfg.getboolean('cluster', 'unsafe'):
                self._http.verify = False
            elif cfg.has_option('cluster', 'custom_certificate'):
                self._http.verify = cfg.get('cluster', 'custom_certificate')

            if cfg.has_option('storage', 'unsafe') \
                    and cfg.getboolean('storage', 'unsafe'):
                storage_unsafe = True
            if cfg.has_option('storage', 'custom_certificate'):
                storage_custom_certificate = cfg.get('storage', 'custom_certificate')
    else:
        self.cluster = cluster_url
        self.timeout = cluster_timeout
        self._http.verify = not cluster_unsafe
        if not cluster_unsafe and cluster_custom_certificate:
            self._http.verify = cluster_custom_certificate
        self.storage = storage_url
        auth = client_token

    # Environment variables fill in whatever is still undefined.
    if self.cluster is None:
        self.cluster = os.getenv("QARNOT_CLUSTER_URL")
    if self.storage is None:
        self.storage = os.getenv("QARNOT_STORAGE_URL")
    if auth is None:
        auth = os.getenv("QARNOT_CLIENT_TOKEN")

    env_unsafe = os.getenv("QARNOT_CLUSTER_UNSAFE")
    if env_unsafe is not None:
        if env_unsafe in ["true", "True", "1"]:
            self._http.verify = False
        elif not self._http.verify:
            # Re-enable verification, but keep an already configured
            # custom certificate path instead of overwriting it with True.
            self._http.verify = True

    env_timeout = os.getenv("QARNOT_CLUSTER_TIMEOUT")
    if env_timeout is not None:
        self.timeout = int(env_timeout)

    # Decided only once the environment has been applied, so that unsafe
    # mode enabled through QARNOT_CLUSTER_UNSAFE also silences the warnings.
    if not self._http.verify:
        urllib3.disable_warnings()

    if auth is None:
        raise QarnotGenericException("Token is mandatory.")
    self._http.headers.update({"Authorization": auth})
    self._http.headers.update({"User-Agent": self._version})

    if self.cluster is None:
        self.cluster = "https://api.qarnot.com"

    api_settings = self._get(get_url("settings")).json()

    if self.storage is None:
        self.storage = api_settings.get("storage", "https://storage.qarnot.com")

    if self.storage is None:  # api_settings["storage"] is None
        self._s3client = None
        self._s3resource = None
        return

    user = self.user_info
    session = boto3.session.Session()
    conf = botocore.config.Config(user_agent=self._version, request_checksum_calculation="when_required")

    # Either a boolean (verify or not) or the path of a certificate bundle.
    should_verify_or_certificate_path = True
    if storage_unsafe:
        should_verify_or_certificate_path = not storage_unsafe
    elif storage_custom_certificate is not None:
        should_verify_or_certificate_path = storage_custom_certificate

    self._s3client = session.client(service_name='s3',
                                    aws_access_key_id=user.email,
                                    aws_secret_access_key=auth,
                                    verify=should_verify_or_certificate_path,
                                    endpoint_url=self.storage,
                                    config=conf)
    self._s3resource = session.resource(service_name='s3',
                                        aws_access_key_id=user.email,
                                        aws_secret_access_key=auth,
                                        verify=should_verify_or_certificate_path,
                                        endpoint_url=self.storage,
                                        config=conf)
@with_retry
def _get(self, url, **kwargs):
    """Send a GET request to the cluster.

    :param str url:
      path of the resource, relative to the cluster url
    :rtype: :class:`requests.Response`
    :returns: The response to the given request.

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials

    .. note:: Additional keyword arguments are forwarded to the ``get``
      method of the underlying requests session.
    """
    target = self.cluster + url
    return self._http.get(target, timeout=self.timeout, **kwargs)
@with_retry
def _patch(self, url, json=None, **kwargs):
    """Send a PATCH request to the cluster.

    :param str url: path of the resource, relative to the cluster url
    :param json: the data to serialize as JSON and send as the request body
    :rtype: :class:`requests.Response`
    :returns: The response to the given request.

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials

    .. note:: Additional keyword arguments are forwarded to the ``patch``
      method of the underlying requests session.
    """
    request_options = Connection._prepare_json_payload(json, **kwargs)
    target = self.cluster + url
    return self._http.patch(target, timeout=self.timeout, **request_options)
@with_retry
def _post(self, url, json=None, **kwargs):
    """Send a POST request to the cluster.

    :param str url: path of the resource, relative to the cluster url
    :param json: the data to serialize as JSON and send as the request body
    :rtype: :class:`requests.Response`
    :returns: The response to the given request.

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials

    .. note:: Additional keyword arguments are forwarded to the ``post``
      method of the underlying requests session.
    """
    request_options = Connection._prepare_json_payload(json, **kwargs)
    target = self.cluster + url
    return self._http.post(target, timeout=self.timeout, **request_options)
@with_retry
def _delete(self, url, **kwargs):
    """Send a DELETE request to the cluster.

    :param str url: path of the resource, relative to the cluster url
    :rtype: :class:`requests.Response`
    :returns: The response to the given request.

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials

    .. note:: Additional keyword arguments are forwarded to the ``delete``
      method of the underlying requests session.
    """
    target = self.cluster + url
    return self._http.delete(target, timeout=self.timeout, **kwargs)
@with_retry
def _put(self, url, json=None, **kwargs):
    """Send a PUT request to the cluster.

    :param str url: path of the resource, relative to the cluster url
    :param json: the data to serialize as JSON and send as the request body
    :rtype: :class:`requests.Response`
    :returns: The response to the given request.

    .. note:: Additional keyword arguments are forwarded to the ``put``
      method of the underlying requests session.
    """
    request_options = Connection._prepare_json_payload(json, **kwargs)
    target = self.cluster + url
    return self._http.put(target, timeout=self.timeout, **request_options)
@staticmethod
def _prepare_json_payload(json, **kwargs):
if json is None:
return kwargs
if 'headers' not in kwargs:
kwargs['headers'] = dict()
kwargs['headers']['Content-Type'] = 'application/json'
kwargs['data'] = json_dumps(json)
return kwargs
@property
def s3client(self):
"""Pre-configured s3 client object.
:rtype: list(:class:`S3.Client`)
:returns: A list of ObjectSummary resources
"""
return self._s3client
@property
def s3resource(self):
"""Pre-configured s3 resource object.
:rtype: list(:class:`S3.ServiceResource`)
:returns: A list of ObjectSummary resources
"""
return self._s3resource
@property
def user_info(self):
    """Get information of the current user on the cluster.

    Every access sends a new request to the cluster.

    :rtype: :class:`UserInfo`
    :returns: Requested information.

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    response = self._get(get_url('user'))
    raise_on_error(response)
    return UserInfo(response.json())
[docs]
def buckets(self):
    """Get the list of buckets.

    :rtype: list(:class:`~qarnot.bucket.Bucket`).
    :returns: One bucket object per bucket listed by the storage service

    :raises ~qarnot.exceptions.BucketStorageUnavailableException: the bucket storage engine is not available
    """
    if self._s3client is None:
        raise BucketStorageUnavailableException()
    found = []
    for remote_bucket in self._s3resource.buckets.all():
        # create=False: wrap the listed bucket without asking for its creation.
        found.append(Bucket(self, remote_bucket.name, create=False))
    return found
[docs]
@deprecation.deprecated(deprecated_in="2.5.0", removed_in="3.0",
                        current_version=__version__,  # type: ignore
                        details="Use the all_pools function instead")
def pools(self, summary=True, tags_intersect=None, tags=None):
    """Get the list of pools stored on this cluster for this user.

    The whole result of :func:`all_pools` is loaded into a list.
    If tags and tags_intersect are set, the connection will only return the
    pools with tag intersect values.

    :param bool summary: only get the summaries.
    :param tags_intersect: Desired filtering tags, all of them
    :type tags_intersect: list of `str`, optional
    :param tags: Desired filtering tags, any of them
    :type tags: list of `str`, optional
    :rtype: List of :class:`~qarnot.pool.Pool`.
    :returns: Pools stored on the cluster owned by the user.

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details

    .. deprecated:: 2.5.0
      This function can be inefficient, Use :func:`all_pools` instead.
    """
    pool_iterator = self.all_pools(summary=summary, tags=tags, tags_intersect=tags_intersect)
    return list(pool_iterator)
[docs]
@deprecation.deprecated(deprecated_in="2.5.0", removed_in="3.0",
                        current_version=__version__,  # type: ignore
                        details="Use the all_tasks function instead")
def tasks(self, tags=None, summary=True, tags_intersect=None):
    """Get the list of tasks stored on this cluster for this user.

    The whole result of :func:`all_tasks` is loaded into a list.
    If tags and tags_intersect are set, the connection will only return the
    tasks with tag_intersect values.

    :param tags: Desired filtering tags, any of them
    :type tags: list of `str`, optional
    :param bool summary: only get the summaries.
    :param tags_intersect: Desired filtering tags, all of them
    :type tags_intersect: list of `str`, optional
    :rtype: List of :class:`~qarnot.task.Task`.
    :returns: Tasks stored on the cluster owned by the user.

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details

    .. deprecated:: 2.5.0
      This function can be inefficient, Use :func:`all_tasks` instead.
    """
    task_iterator = self.all_tasks(summary=summary, tags=tags, tags_intersect=tags_intersect)
    return list(task_iterator)
[docs]
@deprecation.deprecated(deprecated_in="2.5.0", removed_in="3.0",
                        current_version=__version__,  # type: ignore
                        details="Use the all_jobs function instead")
def jobs(self, tags=None, tags_intersect=None):
    """Get the list of jobs stored on this cluster for this user.

    The whole result of :func:`all_jobs` is loaded into a list.
    If tags and tags_intersect are set, the connection will only return the
    jobs with tag intersect values.

    :param tags: Desired filtering tags, any of them
    :type tags: list of `str`, optional
    :param tags_intersect: Desired filtering tags, the jobs must have all of them
    :type tags_intersect: list of `str`, optional
    :rtype: list
    :returns: Jobs stored on the cluster owned by the user.

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details

    .. deprecated:: 2.5.0
      This function can be inefficient, Use :func:`all_jobs` instead.
    """
    job_iterator = self.all_jobs(tags=tags, tags_intersect=tags_intersect)
    return list(job_iterator)
[docs]
def all_pools(self, summary: bool = True, tags: List[str] = None, tags_intersect: List[str] = None):
"""Get an iterator of all the pools.
if tags and tags_intersect are set, the connection will only return the pools with tag intersect values.
:param bool summary: only get the summaries.
:param tags_intersect: Desired filtering tags, all of them
:type tags_intersect: list of `str`, optional
:param tags: Desired filtering tags, any of them
:type tags: list of `str`, optional
:rtype: List of :class:`~qarnot.pool.Pool`.
:returns: Pools stored on the cluster owned by the user.
:raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
:raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
"""
return self._all_pages(self.pools_page, summary=summary, tags=tags, tags_intersect=tags_intersect)
[docs]
def all_tasks(self, summary: bool = True, tags: List[str] = None, tags_intersect: List[str] = None):
"""Get an iterator of all the tasks.
if tags and tags_intersect are set, the connection will only return the tasks with tag_intersect values.
:param tags: Desired filtering tags, any of them
:type tags: list of `str`, optional
:param bool summary: only get the summaries.
:param tags_intersect: Desired filtering tags, all of them
:type tags_intersect: list of `str`, optional
:rtype: List of :class:`~qarnot.task.Task`.
:returns: Tasks stored on the cluster owned by the user.
:raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
:raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
"""
return self._all_pages(self.tasks_page, summary=summary, tags=tags, tags_intersect=tags_intersect)
[docs]
def all_jobs(self, tags: List[str] = None, tags_intersect: List[str] = None) -> Iterator[Iterable]:
"""Get an iterator of all the jobs.
if tags and tags_intersect are set, the connection will only return the jobs with tag intersect values.
:param tags: Desired filtering tags, any of them
:type tags: list(`str`), optional
:param tags_intersect: Desired filtering tags, the jobs must have all of them
:type tags_intersect: list of `str`, optional
:raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
:raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
"""
return self._all_pages(self.jobs_page, tags=tags, tags_intersect=tags_intersect)
def _all_pages(self, page_function, **kwargs) -> Iterator[Iterable]:
"""Return all the pages of an object.
:param page_function: the call function used to retrieve a paginate list.
:type page_function: Function
:yield: An object
:rtype: Iterator[Iterable]
"""
next_token = None
is_truncated = True
while is_truncated:
page = page_function(token=next_token, **kwargs)
next_token = page.next_token
is_truncated = page.is_truncated and next_token is not None
for task in page.page_data:
yield task
[docs]
def all_hardware_constraints(self) -> Iterator[Iterable]:
"""Get all the hardware constraints.
:raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
:raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
"""
return self._all_offset_pages(self.hardware_constraints_page)
def _all_offset_pages(self, page_function, **kwargs) -> Iterator[Iterable]:
"""Return all the offset pages of an object.
:param page_function: the call function used to retrieve an offset page list.
:type page_function: Function
:yield: An object
:rtype: Iterator[Iterable]
"""
next_offset = 0
is_truncated = True
while is_truncated:
page = page_function(offset=next_offset, **kwargs)
next_offset = page.offset + page.limit
is_truncated = page.total > next_offset
for data in page.page_data:
yield data
def _paginate_request(self, filters: Dict, token: Optional[str], maximum: int) -> Dict:
"""A paginate request creator.
:return: The json request to be call.
:rtype: Dict
"""
return {
"filter": filters,
"token": token,
"maximumResults": maximum
}
def _offset_request(self, limit: int, offset: int) -> Dict:
"""An offset request creator.
:return: The dictionary of query parameters to add to request.
:rtype: Dict
"""
return {
"limit": limit,
"offset": offset
}
def _offset_call(self, url, params) -> Dict:
    """Send a GET request with query parameters and return the decoded body.

    :param url: path of the resource, relative to the cluster url
    :param params: query parameters of the request
    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
    :return: The JSON-decoded response body
    :rtype: Dict
    """
    resp = self._get(url, params=params)
    raise_on_error(resp)
    body = resp.json()
    return body
def _page_call(self, url, request) -> Dict:
    """Send a POST request with a JSON body and return the decoded body.

    :param url: path of the resource, relative to the cluster url
    :param request: object sent as the JSON body of the request
    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
    :return: The JSON-decoded response body
    :rtype: Dict
    """
    resp = self._post(url, request)
    raise_on_error(resp)
    body = resp.json()
    return body
[docs]
def pools_page(self, token: Optional[str] = None, maximum: Optional[int] = None, summary: bool = True, tags: List = None, tags_intersect: List = None) -> PaginateResponse:
    """Retrieve one page of pools.

    :param token: the paginate token of the page to retrieve, defaults to None
    :type token: `str`, optional
    :param maximum: sent as the ``maximumResults`` field of the request, defaults to None
    :type maximum: `int`, optional
    :param summary: retrieve a full pool details if False or a pool summary if True, defaults to True
    :type summary: `bool`, optional
    :param tags: Desired filtering tags, any of them, defaults to None
    :type tags: list of `str`, optional
    :param tags_intersect: use a tag exclusive filter, defaults to None
    :type tags_intersect: list of `str` , optional
    :return: A paginate object
    :rtype: :class:`~qarnot.paginate.PaginateResponse`
    """
    filters = create_pool_filter(tags=tags, tags_intersect=tags_intersect)
    # The summaries endpoint is only used for unfiltered summary requests.
    if summary and filters is None:
        url = get_url('paginate pools summaries')
    else:
        url = get_url('paginate pools')
    request = self._paginate_request(filters, token, maximum)
    result = self._page_call(url, request)
    pools = []
    for raw_pool in result.get("data"):
        pools.append(Pool.from_json(self, raw_pool, summary))
    return PaginateResponse(
        token=result.get("token", token),
        next_token=result.get("nextToken"),
        is_truncated=result.get("isTruncated"),
        page_data=pools,
    )
[docs]
def tasks_page(self, token: Optional[str] = None, maximum: Optional[int] = None, summary: bool = True, tags: List = None, tags_intersect: List = None) -> PaginateResponse:
    """Retrieve one page of tasks.

    :param token: the paginate token of the page to retrieve, defaults to None
    :type token: `str`, optional
    :param maximum: sent as the ``maximumResults`` field of the request, defaults to None
    :type maximum: `int`, optional
    :param summary: retrieve a full task details if False or a task summary if True, defaults to True
    :type summary: `bool`, optional
    :param tags: Desired filtering tags, any of them, defaults to None
    :type tags: list of `str`, optional
    :param tags_intersect: use a tag exclusive filter, defaults to None
    :type tags_intersect: list of `str`, optional
    :return: A paginate object
    :rtype: :class:`~qarnot.paginate.PaginateResponse`
    """
    filters = create_task_filter(tags=tags, tags_intersect=tags_intersect)
    # The summaries endpoint is only used for unfiltered summary requests.
    if summary and filters is None:
        url = get_url('paginate tasks summaries')
    else:
        url = get_url('paginate tasks')
    request = self._paginate_request(filters, token, maximum)
    result = self._page_call(url, request)
    tasks = []
    for raw_task in result.get("data"):
        tasks.append(Task.from_json(self, raw_task, summary))
    return PaginateResponse(
        token=result.get("token", token),
        next_token=result.get("nextToken"),
        is_truncated=result.get("isTruncated"),
        page_data=tasks,
    )
[docs]
def jobs_page(self, token: Optional[str] = None, maximum: Optional[int] = None, tags: List = None, tags_intersect: List = None) -> PaginateResponse:
    """Retrieve one page of jobs.

    :param token: the paginate token of the page to retrieve, defaults to None
    :type token: `str`, optional
    :param maximum: sent as the ``maximumResults`` field of the request, defaults to None
    :type maximum: `int`, optional
    :param tags: Desired filtering tags, any of them, defaults to None
    :type tags: list of `str`, optional
    :param tags_intersect: use a tag exclusive filter, defaults to None
    :type tags_intersect: list of `str`
    :return: A paginate object
    :rtype: :class:`~qarnot.paginate.PaginateResponse`
    """
    filters = create_job_filter(tags=tags, tags_intersect=tags_intersect)
    request = self._paginate_request(filters, token, maximum)
    result = self._page_call(get_url('paginate jobs'), request)
    jobs = []
    for raw_job in result.get("data"):
        jobs.append(Job.from_json(self, raw_job))
    return PaginateResponse(
        token=result.get("token", token),
        next_token=result.get("nextToken"),
        is_truncated=result.get("isTruncated"),
        page_data=jobs,
    )
[docs]
def hardware_constraints_page(self, limit: Optional[int] = 50, offset: Optional[int] = 0) -> OffsetResponse:
    """Retrieve one offset-delimited page of hardware constraints.

    :param limit: limit the number of displayed constraints, defaults to 50
    :type limit: `int`, optional
    :param offset: the number of constraints ignored in the response, defaults to 0
    :type offset: `int`, optional
    :return: An offset object
    :rtype: :class:`~qarnot.paginate.OffsetResponse`
    """
    query = self._offset_request(limit, offset)
    result = self._offset_call(get_url('hardware constraints'), query)
    constraints = []
    for raw_constraint in result.get("data"):
        constraints.append(HardwareConstraint.from_json(raw_constraint))
    # total, limit and offset are the values reported by the API response.
    return OffsetResponse(
        total=result.get("total"),
        limit=result.get("limit"),
        offset=result.get("offset"),
        page_data=constraints,
    )
[docs]
def search_cpu_model_constraints(self, cpu_model: str) -> List[CpuModelHardware]:
    """Search the CPU model hardware constraints matching a search term.

    These constraints are ordered by relevancy.

    :param str cpu_model: search term to match available cpu constraints.
    :return: A list of CPU model hardware constraints ordered by relevancy.
    :rtype: list(:class:`~qarnot.hardware_constraint.CpuModelHardware`)
    """
    query = {'cpuModel': cpu_model}
    resp = self._get(get_url('cpu model constraints search'), params=query)
    raise_on_error(resp)
    matches = []
    for raw_constraint in resp.json():
        matches.append(HardwareConstraint.from_json(raw_constraint))
    return matches
[docs]
def retrieve_pool(self, uuid):
    """Retrieve a :class:`~qarnot.pool.Pool` from its uuid

    :param str uuid: Desired pool uuid
    :rtype: :class:`~qarnot.pool.Pool`
    :returns: Existing pool defined by the given uuid

    :raises ~qarnot.exceptions.MissingPoolException: pool does not exist
    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    resp = self._get(get_url('pool update', uuid=uuid))
    # A 404 is reported with the dedicated "missing" exception, before the
    # generic error handling.
    pool_missing = resp.status_code == 404
    if pool_missing:
        raise MissingPoolException(get_error_message_from_http_response(resp))
    raise_on_error(resp)
    pool_json = resp.json()
    return Pool.from_json(self, pool_json)
[docs]
def retrieve_task(self, uuid):
    """Retrieve a :class:`~qarnot.task.Task` from its uuid

    :param str uuid: Desired task uuid
    :rtype: :class:`~qarnot.task.Task`
    :returns: Existing task defined by the given uuid

    :raises ~qarnot.exceptions.MissingTaskException: task does not exist
    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    resp = self._get(get_url('task update', uuid=uuid))
    # A 404 is reported with the dedicated "missing" exception, before the
    # generic error handling.
    task_missing = resp.status_code == 404
    if task_missing:
        raise MissingTaskException(get_error_message_from_http_response(resp))
    raise_on_error(resp)
    task_json = resp.json()
    return Task.from_json(self, task_json)
[docs]
def retrieve_job(self, uuid):
    """Retrieve a :class:`~qarnot.job.Job` from its uuid

    :param str uuid: Desired job uuid
    :rtype: :class:`~qarnot.job.Job`
    :returns: Existing job defined by the given uuid

    :raises ~qarnot.exceptions.MissingJobException: job does not exist
    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    resp = self._get(get_url('job update', uuid=uuid))
    # A 404 is reported with the dedicated "missing" exception, before the
    # generic error handling.
    job_missing = resp.status_code == 404
    if job_missing:
        raise MissingJobException(get_error_message_from_http_response(resp))
    raise_on_error(resp)
    job_json = resp.json()
    return Job.from_json(self, job_json)
[docs]
def retrieve_or_create_bucket(self, uuid):
    """Retrieve a :class:`~qarnot.bucket.Bucket` from its description, or create a new one.

    :param str uuid: the bucket uuid (name)
    :rtype: :class:`~qarnot.bucket.Bucket`
    :returns: Existing or newly created bucket defined by the given name

    :raises ~qarnot.exceptions.BucketStorageUnavailableException: the bucket storage engine is not available
    """
    storage_available = self._s3client is not None
    if not storage_available:
        raise BucketStorageUnavailableException()
    bucket = Bucket(self, uuid)
    return bucket
[docs]
def retrieve_bucket(self, uuid):
    """Retrieve a :class:`~qarnot.bucket.Bucket` from its uuid (name)

    :param str uuid: Desired bucket uuid (name)
    :rtype: :class:`~qarnot.bucket.Bucket`
    :returns: Existing bucket defined by the given uuid (name)

    :raises ~qarnot.exceptions.BucketStorageUnavailableException: the bucket storage engine is not available
    :raises: botocore.exceptions.ClientError: Bucket does not exist, or invalid credentials
    """
    s3_client = self._s3client
    if s3_client is None:
        raise BucketStorageUnavailableException()
    # Probe the bucket first; the bucket object is then built with
    # create=False.
    s3_client.head_bucket(Bucket=uuid)
    return Bucket(self, uuid, create=False)
[docs]
def create_pool(self, name, profile, instancecount=1, shortname=None):
    """Create a new :class:`~qarnot.pool.Pool`.

    :param str name: given name of the pool
    :param str profile: which profile to use with this pool
    :param instancecount: number of instances to run for the pool
    :type instancecount: int
    :param str shortname: optional unique friendly shortname of the pool
    :rtype: :class:`~qarnot.pool.Pool`
    :returns: The created :class:`~qarnot.pool.Pool`.

    .. note:: See available profiles with :meth:`profiles`.
    """
    new_pool = Pool(self, name, profile, instancecount, shortname)
    return new_pool
[docs]
def create_elastic_pool(self, name, profile, minimum_total_slots=0, maximum_total_slots=1, minimum_idle_slots=0, minimum_idle_time_seconds=0, resize_factor=1, resize_period=90, shortname=None):
    """Create a new elastic :class:`~qarnot.pool.Pool`.

    The pool is built from its name, profile and shortname, then configured
    through its ``setup_elastic`` method.

    :param str name: given name of the pool
    :param str profile: which profile to use with this pool
    :param int minimum_total_slots: minimum number of instances to run for the pool
    :param int maximum_total_slots: maximum number of instances to run for the pool
    :param int minimum_idle_slots: the number of instances that can be idle before considering shrinking the pool
    :param int minimum_idle_time_seconds: the number of seconds before considering shrinking the pool
    :param float resize_factor: the speed with which we grow the pool to meet the demand
    :param int resize_period: the time between the load checks that decide if the pool grows or shrinks
    :param str shortname: optional unique friendly shortname of the pool
    :rtype: :class:`~qarnot.pool.Pool`
    :returns: The created :class:`~qarnot.pool.Pool`.

    .. note:: See available profiles with :meth:`profiles`.
    """
    elastic_settings = (
        minimum_total_slots,
        maximum_total_slots,
        minimum_idle_slots,
        minimum_idle_time_seconds,
        resize_factor,
        resize_period,
    )
    elastic_pool = Pool(self, name, profile, shortname=shortname)
    elastic_pool.setup_elastic(*elastic_settings)
    return elastic_pool
[docs]
def create_task(self, name, profile_or_pool=None, instancecount_or_range=1, shortname=None, job=None):
    """Create a new :class:`~qarnot.task.Task`.

    :param str name: given name of the task
    :param profile_or_pool: which profile to use with this task, or which Pool to run task,
      or which job to attach it to.

      .. warning:: The :class:`~qarnot.pool.Pool` or :class:`~qarnot.job.Job` need to be
        already submitted to the api with :meth:`.Pool.submit` or :meth:`.Job.submit`
        in order to be correctly linked to the task.
    :type profile_or_pool: str or :class:`~qarnot.pool.Pool` or None
    :param instancecount_or_range: number of instances, or ranges on which to run task. Defaults to 1.
    :type instancecount_or_range: int or str
    :param str shortname: optional unique friendly shortname of the task
    :param job: which job to attach the task to
    :type job: :class:`~qarnot.job.Job`
    :rtype: :class:`~qarnot.task.Task`
    :returns: The created :class:`~qarnot.task.Task`.

    .. note:: See available profiles with :meth:`profiles`.
    """
    new_task = Task(self, name, profile_or_pool, instancecount_or_range, shortname, job)
    return new_task
[docs]
def submit_tasks(self, tasks):
    """Submit a list of :class:`~qarnot.task.Task` in a single request.

    :param tasks: the tasks to submit
    :type tasks: List of :class:`~qarnot.task.Task`.

    :raises ~qarnot.exceptions.QarnotGenericException: API general error, or at least
      one task was refused; see message for details

    .. note:: Will ensure all added files are on the resource bucket
      regardless of their uploading mode.
    """
    for task in tasks:
        task._pre_submit()
    payload = [task._to_json() for task in tasks]

    responses = self._post(get_url('tasks'), json=payload)
    if responses.status_code == 503:
        raise QarnotGenericException("Service Unavailable")
    raise_on_error(responses)

    bulk_responses = [BulkTaskResponse(raw) for raw in responses.json()]
    failures = []
    # The contract with the API is that the response list and the request list should be in the same order
    for index, bulk_response in enumerate(bulk_responses):
        if bulk_response.is_success():
            submitted = tasks[index]
            submitted._uuid = bulk_response.uuid
            submitted._post_submit()
        else:
            failures.append("[{0}] : {1}, {2}\n".format(tasks[index], bulk_response.status_code, bulk_response.message))

    # Successful tasks are finalized above even when others failed; the
    # failures are then reported together.
    error_message = "".join(failures)
    if error_message:
        raise QarnotGenericException(error_message)
[docs]
def profiles_names(self):
    """Get list of profiles names available on the cluster.

    :rtype: list of `str`
    :returns: The JSON-decoded body of the profiles listing.

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    resp = self._get(get_url('profiles'))
    raise_on_error(resp)
    names = resp.json()
    return names
[docs]
def profile_details(self, profile_name):
    """Fetch one profile of the cluster by name.

    :param str profile_name: name of the profile to look up
    :rtype: :class:`Profile` or None
    :returns: the profile built from the response body, or ``None`` when
      the API answers 404 for this name

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    details_response = self._get(get_url('profile details', profile=profile_name))
    profile_found = details_response.status_code != 404
    if profile_found:
        raise_on_error(details_response)
        return Profile(details_response.json())
    # An unknown profile is not an error here: the caller gets None.
    return None
[docs]
def profiles(self):
    """Fetch every profile available on the cluster.

    The names come from :meth:`profiles_names`; the details of each one are
    then retrieved through :meth:`profile_details` on a pool of at most 10
    worker threads. Names for which no profile was returned are left out.

    :rtype: list of :class:`Profile`

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool_executor:
        fetched = pool_executor.map(self.profile_details, self.profiles_names())
        # Results keep the order of the names; missing profiles come back as None.
        return [profile for profile in fetched if profile is not None]
[docs]
def retrieve_profile(self, name):
    """Fetch the details of a profile, failing when it does not exist.

    :param str name: name of the profile to retrieve
    :rtype: :class:`Profile`
    :returns: the profile built from the response body

    :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
    :raises ~qarnot.exceptions.MissingProfileException: profile not found
    :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
    """
    profile_response = self._get(get_url('profile details', profile=name))
    if profile_response.status_code == 404:
        # A 404 is turned into a dedicated exception carrying the API message.
        not_found_message = get_error_message_from_http_response(profile_response)
        raise MissingProfileException(not_found_message)
    raise_on_error(profile_response)
    profile_info = profile_response.json()
    return Profile(profile_info)
[docs]
def create_bucket(self, name):
    """Build a :class:`~qarnot.bucket.Bucket` bound to this connection.

    If the bucket already exist, retrieve the existing bucket.

    :param str name: bucket name
    :rtype: :class:`~qarnot.bucket.Bucket`
    :returns: The created or existing :class:`~qarnot.bucket.Bucket`.
    """
    bucket = Bucket(self, name)
    return bucket
[docs]
def create_job(self, name, pool=None, shortname=None, useDependencies=False):
    """Build a new :class:`~qarnot.job.Job` bound to this connection.

    :param name: given name of the job
    :type name: :class:`str`
    :param pool: which Pool to submit the job in.

      .. warning:: The :class:`~qarnot.pool.Pool` needs to be already submitted to the api with :meth:`.Pool.submit` in order to be correctly linked to the job.
    :type pool: :class:`~qarnot.pool.Pool` or None
    :param shortname: userfriendly job name
    :type shortname: :class:`str`
    :param useDependencies: allow dependencies between tasks in this job
    :type useDependencies: :class:`bool`
    :rtype: :class:`~qarnot.job.Job`
    :returns: The created :class:`~qarnot.job.Job`.
    """
    job = Job(self, name, pool, shortname, useDependencies)
    return job
###################
# utility Classes #
###################
[docs]
class UserInfo(object):
    """Information about a qarnot user."""

    def __init__(self, info):
        """Copy the user information out of a decoded API payload.

        :param dict info: mapping holding the user information. Missing
          entries become ``''`` for the email, ``-1`` for ``bucketCount``,
          ``runningInstanceCount`` and ``runningCoreCount``, and ``None``
          for everything else.
        """
        self.email = info.get('email', '')
        """:type: :class:`str`

        User email address."""
        self.max_bucket = info.get('maxBucket')
        """:type: :class:`int`

        Maximum number of buckets allowed (resource and result buckets)."""
        self.bucket_count = info.get('bucketCount', -1)
        """:type: :class:`int`

        Number of buckets owned by the user."""
        self.quota_bytes_bucket = info.get('quotaBytesBucket')
        """:type: :class:`int`

        Total storage space allowed for the user's buckets (in Bytes)."""
        self.used_quota_bytes_bucket = info.get('usedQuotaBytesBucket')
        """:type: :class:`int`

        Total storage space used by the user's buckets (in Bytes)."""
        self.task_count = info.get('taskCount')
        """:type: :class:`int`

        Total number of tasks belonging to the user."""
        self.max_task = info.get('maxTask')
        """:type: :class:`int`

        Maximum number of tasks the user is allowed to create."""
        self.running_task_count = info.get('runningTaskCount')
        """:type: :class:`int`

        Number of tasks currently in 'Submitted' state."""
        self.max_running_task = info.get('maxRunningTask')
        """:type: :class:`int`

        Maximum number of running tasks."""
        self.max_instances = info.get('maxInstances')
        """:type: :class:`int`

        Maximum number of instances."""
        # The overall core limit used to be read from 'maxFlexCores', the key
        # already used for max_flex_cores below. Prefer 'maxCores' and keep
        # the former key as a fallback for payloads that do not carry it.
        self.max_cores = info.get('maxCores', info.get('maxFlexCores'))
        """:type: :class:`int`

        Maximum number of cores."""
        self.max_pool = info.get('maxPool')
        """:type: :class:`int`

        Maximum number of pool the user is allowed to create."""
        self.pool_count = info.get('poolCount')
        """:type: :class:`int`

        Total number of pools belonging to the user."""
        self.max_running_pool = info.get('maxRunningPool')
        """:type: :class:`int`

        Maximum number of running pools the user is allowed to create."""
        self.running_pool_count = info.get('runningPoolCount')
        """:type: :class:`int`

        Number of pools currently submitted or running."""
        self.running_instance_count = info.get('runningInstanceCount', -1)
        """:type: :class:`int`

        Number of Instances currently submitted or running."""
        self.running_core_count = info.get('runningCoreCount', -1)
        """:type: :class:`int`

        Number of cores currently submitted or running."""
        self.max_flex_instances = info.get('maxFlexInstances')
        """:type: :class:`int`

        Maximum number of instances simultaneously used with Flex scheduling plan."""
        self.max_flex_cores = info.get('maxFlexCores')
        """:type: :class:`int`

        Maximum number of cores simultaneously used with Flex scheduling plan."""
        self.max_on_demand_instances = info.get('maxOnDemandInstances')
        """:type: :class:`int`

        Maximum number of instances simultaneously used with OnDemand scheduling plan."""
        self.max_on_demand_cores = info.get('maxOnDemandCores')
        """:type: :class:`int`

        Maximum number of cores simultaneously used with OnDemand scheduling plan."""
[docs]
class Profile(object):
    """Information about a profile."""

    def __init__(self, info):
        """Copy the profile description out of a decoded API payload.

        :param dict info: mapping holding the profile ``name`` and its
          ``constants``, each constant being a mapping with a ``name`` and
          a ``value`` entry
        """
        self.name = info.get('name', '')
        """:type: :class:`str`

        Name of the profile."""
        # `or []` also covers a payload where the key is present but null,
        # which would otherwise fail when iterated.
        raw_constants = info.get('constants') or []
        self.constants = tuple((cst.get('name'), cst.get('value'))
                               for cst in raw_constants)
        """:type: tuple of (:class:`str`, :class:`str`)

        Couples (name, value) representing constants for this profile
        and their default values."""

    def __repr__(self):
        """Return a debug representation showing the name and the constants."""
        return 'Profile(name=%s, constants=%r)' % (self.name, self.constants)