Source code for qarnot.secrets

"""Secrets prototype"""

# Copyright 2017 Qarnot computing
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=W0613


from typing import Dict, List

from . import get_url, raise_on_error, raise_on_secrets_specific_error


[docs] class SecretAccessRightBySecret(object): """ Secret to be made available to a task, described by exact match on its key property. """
[docs] def __init__(self, key: str): """The SecretAccessRightBySecret constructor. :param key: the exact key of the secret. :type key: `str` """ self._key = key
def __str__(self) -> str: return "Secrets access by key \"{}\".".format(self._key) def __repr__(self) -> str: return self._key def __eq__(self, other) -> bool: if other is None or not isinstance(other, self.__class__): return False return self._key == other._key
[docs] def to_json(self) -> Dict[str, str]: """Get a SecretAccessRightBySecret ready to be json packed. :return: the json representation of a secret accessible by exact match. :rtype: Dict[str,str] """ return {"key": self._key}
[docs] class SecretAccessRightByPrefix(object): """ Secrets to be made available to a task, by prefix match on its prefix property. """
[docs] def __init__(self, prefix: str): """The SecretAccessRightByPrefix constructor. :param key: the prefix of the secrets' keys. :type key: `str` """ self._prefix = prefix
def __str__(self) -> str: return "Secrets access by prefix \"{}\".".format(self._prefix) def __repr__(self) -> str: return self._prefix def __eq__(self, other) -> bool: if other is None or not isinstance(other, self.__class__): return False return self._prefix == other._prefix
[docs] def to_json(self) -> Dict[str, str]: """Get a SecretAccessRightByPrefix ready to be json packed. :return: the json representation of a secret accessible by prefix match. :rtype: Dict[str, str] """ return {"prefix": self._prefix}
[docs] class SecretsAccessRights(object): """ Description of all the secrets a task will have access to when running. """
[docs] def __init__(self, by_secret: List[SecretAccessRightBySecret] = None, by_prefix: List[SecretAccessRightByPrefix] = None): """The SecretsAccessRights constructor :param by_secret: the list of secrets the task will have access to, described using an exact key match :type by_secret: `List[~qarnot.secrets.SecretAccessRightBySecret]` :param by_prefix: the list of secrets the task will have access to, described using a prefix key match :type by_prefix: `List[~qarnot.secrets.SecretAccessRightByPrefix]` """ self._by_secret: List[SecretAccessRightBySecret] = by_secret or [] self._by_prefix: List[SecretAccessRightByPrefix] = by_prefix or []
[docs] def to_json(self) -> Dict[str, List[Dict[str, str]]]: """Get a SecretsAccessRights ready to be json packed. :return: the json representation of the secrets a task will have access to when running. :rtype: Dict[str, List[str]] """ result: Dict[str, List[Dict[str, str]]] = { "bySecret": [by_secret.to_json() for by_secret in self._by_secret] or [], "byPrefix": [by_prefix.to_json() for by_prefix in self._by_prefix] or [], } return result
[docs] @classmethod def from_json(cls, json: Dict[str, List[Dict[str, str]]]): """Create a SecretsAccessRights from a json representation :param json: the json to use to create the SecretsAccessRights object. :type json: `Dict[str, Any]` :returns: The created :class:`~qarnot.secrets.SecretsAccessRights`. """ by_secret, by_prefix = None, None if "bySecret" in json: by_secret = [SecretAccessRightBySecret(secret.get("key")) for secret in json.get("bySecret")] if "byPrefix" in json: by_prefix = [SecretAccessRightByPrefix(secret.get("prefix")) for secret in json.get("byPrefix")] return SecretsAccessRights(by_secret=by_secret, by_prefix=by_prefix)
[docs] def add_secret_by_key(self, key: str): """Add `key` as an available secret to the task. :param key: Key to exactly match secrets on. :type key: `str` """ self._by_secret.append(SecretAccessRightBySecret(key)) return self
[docs] def add_secrets_by_keys(self, keys: List[str]): """Add multiple keys as available secrets to the task. :param key: Keys to exactly match secrets on. :type key: `List[str]` """ self._by_secret.extend(SecretAccessRightBySecret(key) for key in keys) return self
[docs] def add_secret_by_prefix(self, prefix: str): """Add all secrets starting with `prefix` as available secrets to the task. :param prefix: Prefix to match secrets against. :type prefix: `str` """ self._by_prefix.append(SecretAccessRightByPrefix(prefix)) return self
[docs] def add_secrets_by_prefixes(self, prefixes: List[str]): """Add all secrets starting with any of the `prefixes` as available secrets to the task. :param prefixes: Prefixes to match secrets against. :type prefixes: `List[str]` """ self._by_prefix.extend(SecretAccessRightByPrefix(prefix) for prefix in prefixes) return self
def __bool__(self): return abs(len(self._by_secret)) + abs(len(self._by_prefix)) > 0
[docs] class Secrets(object): """ Client used to interact with the Qarnot secrets API. """
[docs] def __init__(self, connection): """The Secrets constructor. :param connection: the cluster one where secrets are retrieved. :type connection: `qarnot.connection.Connection` """ self._connection = connection
def _get_secret_raw(self, key: str): """Retrieves the value of the secret with key `key`. :param key: the key of the secret :type key: `str` :rtype: `requests.Response` :raises ~qarnot.exceptions.SecretNotFoundException: Secret was not found. :raises ~qarnot.exceptions.UnauthorizedException: Unauthorized. :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details """ key = key.strip('/') response = self._connection._get(get_url('secrets data', secret_key=key)) raise_on_secrets_specific_error(response) raise_on_error(response) return response
[docs] def get_secret(self, key: str) -> str: """Retrieves the value of the secret with key `key` and parses it to a string. :param key: the key of the secret :type key: `str` :rtype: str :raises ~qarnot.exceptions.SecretNotFoundException: Secret was not found. :raises ~qarnot.exceptions.UnauthorizedException: Unauthorized. :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details """ raw_secret = self._get_secret_raw(key) return raw_secret.json().get("value")
def _create_secret_raw(self, key: str, value: str): """Creates a secret with key `key` and value `value`. :param key: the key of the secret :type key: `str` :param value: the value of the secret :type value: `str` :rtype: `requests.Response` :raises ~qarnot.exceptions.SecretConflictException: Secret with this key already exists. :raises ~qarnot.exceptions.UnauthorizedException: Unauthorized. :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details """ key = key.strip('/') response = self._connection._put(get_url('secrets data', secret_key=key), json={"Value": value}) raise_on_secrets_specific_error(response) raise_on_error(response) return response
[docs] def create_secret(self, key: str, value: str) -> str: """Creates a secret with key `key` and value `value`. Returns back the key. :param key: the key of the secret :type key: `str` :param value: the value of the secret :type value: `str` :rtype: `str` :raises ~qarnot.exceptions.SecretConflictException: Secret with this key already exists. :raises ~qarnot.exceptions.UnauthorizedException: Unauthorized. :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details """ key = key.strip('/') _ = self._create_secret_raw(key, value) return key
[docs] def update_secret(self, key: str, value: str) -> None: """Updates secret with key `key` and sets its value to `value`. :param key: the key of the secret :type key: `str` :param value: the new value of the secret :type value: `str` :raises ~qarnot.exceptions.UnauthorizedException: Unauthorized. :raises ~qarnot.exceptions.SecretNotFoundException: The secret was not found. :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details """ key = key.strip('/') response = self._connection._patch(get_url('secrets data', secret_key=key), json={"Value": value}) raise_on_secrets_specific_error(response) raise_on_error(response)
[docs] def delete_secret(self, key: str) -> None: """Deletes secret with key `key`. :param key: the key of the secret :type key: `str` :raises ~qarnot.exceptions.UnauthorizedException: Unauthorized. :raises ~qarnot.exceptions.SecretNotFoundException: The secret was not found. :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details """ key = key.strip('/') response = self._connection._delete(get_url('secrets data', secret_key=key)) raise_on_secrets_specific_error(response) raise_on_error(response)
[docs] def list_secrets(self, prefix: str, recursive: bool = False) -> List[str]: """Lists all the secrets starting with `prefix` When not using recursive mode, only keys and folders directly under `prefix` are returned. For example, listing with a prefix of "prefix" will return "prefix/a" but won't return "prefix/a/b". Folders can be identified by a trailing "/", for example "prefix/nested/". When in recursive mode, only the secrets are returned, not the folders. :param prefix: the prefix :type prefix: `str` :param recursive: lists secrets recursively or not (defaults to `False`) :type recursive: `bool` :rtype: `List[str]` :raises ~qarnot.exceptions.UnauthorizedException: Unauthorized. :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details """ if not recursive: return self._list_secrets_once(prefix) results: List[str] = [] pending: List[str] = [prefix] while pending: key = pending.pop() keys = self._list_secrets_once(key) results.extend(k for k in keys if not k.endswith('/')) pending.extend(k for k in keys if k.endswith('/')) return results
def _list_secrets_once(self, prefix: str) -> List[str]: prefix = prefix.strip('/') response = self._connection._get(get_url('secrets search', secret_prefix=prefix)) raise_on_secrets_specific_error(response) raise_on_error(response) return ["{}/{}".format(prefix, key) if prefix else key for key in response.json().get("keys")]