"""Module for bucket object."""
# Copyright 2017 Qarnot computing
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import hashlib
import io
import itertools
import os
import posixpath
import shutil
from itertools import groupby
from operator import attrgetter
from typing import Optional

import deprecation
from boto3.s3.transfer import TransferConfig

from . import __version__, _util
from .exceptions import BucketStorageUnavailableException, MissingBucketException
from .storage import Storage
from .advanced_bucket import Filtering, ResourcesTransformation
# Maximum size in bytes before switching to multipart upload.
AWS_UPLOAD_MAX_SIZE = 8 * 1024 * 1024
# Size of each part when uploading in parts.
AWS_UPLOAD_PART_SIZE = 8 * 1024 * 1024
s3_multipart_config = TransferConfig(
multipart_threshold=AWS_UPLOAD_MAX_SIZE,
multipart_chunksize=AWS_UPLOAD_PART_SIZE,
max_concurrency=10,
num_download_attempts=10,
)
class Bucket(Storage): # pylint: disable=W0223
"""Represents a resource/result bucket.
This class is the interface to manage resources or results from a
:class:`~qarnot.bucket.Bucket`.
:raises ~qarnot.exceptions.BucketStorageUnavailableException: the bucket storage engine is not available
.. note::
A :class:`Bucket` must be created with
:meth:`.Connection.create_bucket`
or retrieved with :meth:`.Connection.buckets`, :meth:`.Connection.retrieve_bucket`,
or :meth:`.Connection.retrieve_or_create_bucket`.
.. note::
Paths given as 'remote' arguments,
(or as path arguments for :func:`Bucket.directory`)
**must** be valid unix-like paths.
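
example (a minimal sketch; ``conn`` is assumed to be an already configured
:class:`~qarnot.connection.Connection` and the bucket name is a placeholder):

.. code-block:: python

    bucket = conn.retrieve_or_create_bucket("my-input-bucket")
    bucket.add_file("data/input.txt")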
"""
def __init__(self, connection, name, create=True, filtering: Filtering = None, resources_transformation: ResourcesTransformation = None, cacheTTLSec: Optional[int] = None):
super().__init__()
if connection.s3client is None:
raise BucketStorageUnavailableException()
self._connection = connection
self._uuid = name
self._filtering: Optional[Filtering] = filtering or Filtering()
self._resources_transformation: Optional[ResourcesTransformation] = \
resources_transformation or ResourcesTransformation()
self._cache_ttl_sec: Optional[int] = cacheTTLSec
if (self._connection._sanitize_bucket_paths):
self._filtering.sanitize_filter_paths(self._connection._show_bucket_warnings)
self._resources_transformation.sanitize_transformation_paths(self._connection._show_bucket_warnings)
if create:
self._connection.s3client.create_bucket(Bucket=name)
def to_json(self):
"""Get a dict ready to be json packed from this bucket."""
as_json_dict = dict()
as_json_dict['bucketName'] = self._uuid
as_json_dict['filtering'] = None if self._filtering is None else self._filtering.to_json()
as_json_dict['resourcesTransformation'] = (None if self._resources_transformation is None
else self._resources_transformation.to_json())
as_json_dict['cacheTTLSec'] = self._cache_ttl_sec
return as_json_dict
@classmethod
def from_json(cls, connection, json_bucket):
"""Create a Bucket object from a json advance bucket.
:param ~qarnot.connection.Connection connection: the cluster connection
:param dict json_bucket: Dictionary representing the bucket
:returns: The created :class:`~qarnot.bucket.Bucket`.
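
example (a minimal sketch; ``conn`` is assumed to be an existing
:class:`~qarnot.connection.Connection` and the dictionary is a hypothetical bucket description):

.. code-block:: python

    bucket = Bucket.from_json(conn, {"bucketName": "my-bucket", "cacheTTLSec": 3600})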
"""
filtering = None
if "filtering" in json_bucket and json_bucket.get('filtering'):
filtering = Filtering.from_json(json_bucket.get('filtering'))
resource_transformation = None
if "resourcesTransformation" in json_bucket and json_bucket.get('resourcesTransformation'):
resource_transformation = ResourcesTransformation.from_json(json_bucket.get('resourcesTransformation'))
bucket = Bucket(connection, json_bucket.get('bucketName'), create=False, filtering=filtering, resources_transformation=resource_transformation, cacheTTLSec=json_bucket.get('cacheTTLSec'))
return bucket
def with_filtering(self, filtering):
"""Create a new Bucket object from the given bucket with a specific filtering.
examples:
.. code-block:: python
filtered_bucket = bucket.with_filtering(BucketPrefixFiltering("prefix1"))
.. code-block:: python
other_filtered_bucket = Bucket(connection, "name", False).with_filtering(BucketPrefixFiltering("prefix1"))
:param ~qarnot.advanced_bucket.AbstractFiltering filtering: Filtering to add to the bucket.
:returns: The created :class:`~qarnot.bucket.Bucket`.
"""
bucket_copy = Bucket(self._connection, self._uuid,
create=False, filtering=Filtering(), resources_transformation=self._resources_transformation)
bucket_copy._filtering.append(filtering)
return bucket_copy
def with_cache_ttl(self, ttl: int):
"""Create a new Bucket object from the given bucket with a specific cache ttl (in seconds).
examples:
.. code-block:: python
new_bucket = bucket.with_cache_ttl(2592000)
.. code-block:: python
new_bucket = Bucket(connection, "name", False).with_cache_ttl(2592000)
:param int ttl: Time to live for the bucket resource cache.
:returns: The created :class:`~qarnot.bucket.Bucket`.
"""
bucket_copy = Bucket(self._connection, self._uuid,
create=False, filtering=self._filtering, resources_transformation=self._resources_transformation, cacheTTLSec=ttl)
return bucket_copy
@classmethod
def _retrieve(cls, connection, bucket_uuid):
"""Retrieve information of a bucket on a cluster.
:param ~qarnot.connection.Connection connection: the cluster
to get the bucket from
:param str bucket_uuid: the UUID of the bucket to retrieve
:rtype: :class:`~qarnot.bucket.Bucket`
:returns: The retrieved bucket.
:raises ~qarnot.exceptions.BucketStorageUnavailableException: the bucket storage engine is not available
"""
return connection.retrieve_bucket(uuid=bucket_uuid)
def delete(self):
""" Delete the bucket represented by this :class:`Bucket`."""
n = 1000  # maximum number of objects per delete request
try:
bucket = self._connection.s3resource.Bucket(self._uuid)
versioned_bucket = self._connection.s3resource.BucketVersioning(self._uuid)
if versioned_bucket.status is None:
objectlist = list(bucket.objects.all())
listofobjectlist = [[{'Key': x.key} for x in objectlist[i:i + n]] for i in range(0, len(objectlist), n)]
else:
objectlist = list(bucket.object_versions.all())
listofobjectlist = [[{'Key': x.key, 'VersionId': x.id} for x in objectlist[i:i + n]] for i in range(0, len(objectlist), n)]
for item in listofobjectlist:
bucket.delete_objects(
Delete={
'Objects': item
}
)
self._connection.s3client.delete_bucket(Bucket=self._uuid)
except self._connection.s3resource.meta.client.exceptions.NoSuchBucket as err:
raise MissingBucketException("Cannot delete {}. Bucket not found.".format(err.response.get('Error').get('BucketName'))) from err
def list_files(self):
"""List files in the bucket
:rtype: list(:class:`S3.ObjectSummary`)
:returns: A list of ObjectSummary resources
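
example (a minimal sketch; iterates over standard ``boto3`` ``ObjectSummary`` attributes):

.. code-block:: python

    for summary in bucket.list_files():
        print(summary.key, summary.size)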
"""
try:
bucket = self._connection.s3resource.Bucket(self._uuid)
return [b for b in bucket.objects.all() if b.key is not None]
except self._connection.s3resource.meta.client.exceptions.NoSuchBucket as err:
raise MissingBucketException("Cannot list files. Bucket {} not found.".format(err.response.get('Error').get('BucketName'))) from err
def directory(self, directory=''):
"""List files in a directory of the bucket according to prefix.
:rtype: list(:class:`S3.ObjectSummary`)
:returns: A list of ObjectSummary resources
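
example (a minimal sketch; ``results/`` is a placeholder prefix):

.. code-block:: python

    for summary in bucket.directory("results/"):
        print(summary.key)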
"""
bucket = self._connection.s3resource.Bucket(self._uuid)
return bucket.objects.filter(Prefix=directory)
def sync_remote_to_local(self, local_directoy, remote_directory=None):
"""Synchronize a remote directory to a local directory.
:param str local_directoy: The local directory to use for synchronization
:param str remote_directory: path of the directory on remote node (defaults to whole bucket)
.. warning::
Distant changes are reflected on the local filesystem, a file not present on the
bucket but in the local directory might be deleted from the local filesystem.
.. note::
The following parameters are used to determine whether
synchronization is required :
* name
* size
* sha1sum
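
example (a minimal sketch; paths are placeholders):

.. code-block:: python

    # download everything stored under "outputs/" in the bucket into ./results
    bucket.sync_remote_to_local("results", "outputs")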
"""
def get_key_for_local(remote_key: str) -> str:
if remote_directory:
return removeprefix(remote_key, remote_directory).lstrip('/')
return remote_key.lstrip('/')
def removeprefix(target_str: str, prefix: str) -> str:
if target_str.startswith(prefix):
return target_str[len(prefix):]
else:
return target_str[:]
try:
if remote_directory:
entries = self.directory(remote_directory)
else:
entries = self.list_files()
list_files_only = [x for x in entries if not x.key.endswith('/')]
list_directories_only = [x for x in entries if x.key.endswith('/')]
except self._connection.s3resource.meta.client.exceptions.NoSuchBucket as err:
raise MissingBucketException("Cannot synchronize. Bucket {} not found.".format(err.response.get('Error').get('BucketName'))) from err
for directory in list_directories_only:
if not os.path.isdir(os.path.join(local_directoy, get_key_for_local(directory.key))):
os.makedirs(os.path.join(local_directoy, get_key_for_local(directory.key)), exist_ok=True)
for _, dupes in groupby(sorted(list_files_only, key=attrgetter('e_tag')), attrgetter('e_tag')):
file_info = next(dupes)
first_file = os.path.join(local_directoy, get_key_for_local(file_info.key))
self.get_file(file_info.get()['Body'], local=first_file) # avoids making a useless HEAD request
for dupe in dupes:
local = os.path.join(local_directoy, get_key_for_local(dupe.key))
directory = os.path.dirname(local)
if not os.path.exists(directory):
os.makedirs(directory)
if os.path.abspath(os.path.realpath(local)) != os.path.abspath(os.path.realpath(first_file)):
shutil.copy(first_file, local)
def sync_directory(self, directory, verbose=False, remote=None):
"""Synchronize a local directory with the remote buckets.
:param str directory: The local directory to use for synchronization
:param bool verbose: Print information about synchronization operations
:param str remote: path of the directory on remote node (defaults to *local*)
.. warning::
Local changes are reflected on the server, a file present on the
bucket but not in the local directory will be deleted from the bucket.
A file present in the directory but not in the bucket will be uploaded.
.. note::
The following parameters are used to determine whether
synchronization is required :
* name
* size
* sha1sum
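
example (a minimal sketch; paths are placeholders):

.. code-block:: python

    # mirror ./inputs into the bucket under the "inputs" prefix
    bucket.sync_directory("inputs", verbose=True, remote="inputs")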
"""
if not directory.endswith(os.sep):
directory += os.sep
filesdict = {}
for root, _, files in os.walk(directory):
root = _util.decode(root)
files = list(map(_util.decode, files))
for file_ in files:
filepath = os.path.join(root, file_)
name = filepath[len(directory):]
name = name.replace(os.sep, '/')
filesdict[name] = filepath
self.sync_files(filesdict, verbose, remote)
def sync_files(self, files, verbose=False, remote=None):
"""Synchronize files with the remote buckets.
:param dict files: Dictionary of synchronized files
:param bool verbose: Print information about synchronization operations
:param str remote: path of the directory on remote node (defaults to *local*)
:raises ~qarnot.exceptions.MissingBucketException: the bucket is not on the server
Dictionary key is the remote file path while value is the local file
path.
.. warning::
Local changes are reflected on the server, a file present on the
bucket but
not in the local directory will be deleted from the bucket.
A file present in the directory but not in the bucket will be uploaded.
.. note::
The following parameters are used to determine whether
synchronization is required :
* name
* size
* sha1sum
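
example (a minimal sketch; the remote keys and local paths are placeholders):

.. code-block:: python

    bucket.sync_files({
        "inputs/data.csv": "/tmp/project/data.csv",
        "inputs/params.json": "/tmp/project/params.json",
    }, verbose=True)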
"""
class Comparable(object):
def __init__(self, name_, e_tag, filepath_):
self.name = name_
self.e_tag = e_tag
self.filepath = filepath_
def __repr__(self):
return "Name {0}, ETag {1}".format(self.name, self.e_tag)
def __eq__(self, other):
return self.name == other.name and self.e_tag == other.e_tag
def __hash__(self):
return hash(self.name) ^ hash(self.e_tag)
def aws_md5sum(sourcepath):
# Mirror the ETag S3 reports for an uploaded object: a plain MD5 for small
# files, or the multipart form "<md5 of concatenated part md5s>-<part count>"
# for files above the multipart threshold.
if os.stat(sourcepath).st_size < AWS_UPLOAD_MAX_SIZE:
hash_md5 = hashlib.md5()
with open(sourcepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return "\"{0}\"".format(hash_md5.hexdigest())
else:
md5s = []
with open(sourcepath, 'rb') as fp:
while True:
data = fp.read(AWS_UPLOAD_PART_SIZE)
if not data:
break
md5s.append(hashlib.md5(data))
digests = b"".join(m.digest() for m in md5s)
new_md5 = hashlib.md5(digests)
return "\"{0}-{1}\"".format(new_md5.hexdigest(), len(md5s))
def localtocomparable(name_, filepath_, remote):
if remote is not None:
name_ = os.path.join(remote, name_.lstrip('/'))
return Comparable(name_.replace(os.sep, '/'), aws_md5sum(filepath_), filepath_)
def objectsummarytocomparable(object_):
return Comparable(object_.key, object_.e_tag, None)
try:
localfiles = set()
if self._connection._sanitize_bucket_paths:
remote = _util.get_sanitized_bucket_path(remote, self._connection._show_bucket_warnings)
for name, filepath in files.items():
localfiles.add(localtocomparable(name.replace(os.path.sep, '/'), filepath, remote))
remotefiles = set(map(objectsummarytocomparable, self.list_files()))
adds = localfiles - remotefiles
removes = remotefiles - localfiles
seen_tags = set() # To avoid copying the same objects multiple times when renaming
for file_ in removes:
if remote is not None and not file_.name.startswith(remote):
continue
renames = (x for x in adds if x.e_tag not in seen_tags and x.e_tag == file_.e_tag
and all(rem.name != x.name for rem in remotefiles))
for dup in renames:
if verbose:
self._connection.logger.info("Copy %s to %s" % (file_.name, dup.name))
self.copy_file(file_.name, dup.name)
if verbose:
self._connection.logger.info("Remove: %s" % file_.name)
self.delete_file(file_.name)
seen_tags.add(file_.e_tag)
remotefiles = set(map(objectsummarytocomparable, self.list_files()))
sadds = sorted(adds, key=lambda x: x.e_tag)
groupedadds = (list(g) for _, g in itertools.groupby(sadds, lambda x: x.e_tag))
for entry in groupedadds:
try:
rem = next(x for x in remotefiles if x.e_tag == entry[0].e_tag)
if rem.name == entry[0].name:
continue
if verbose:
self._connection.logger.info("Copy %s to %s" % (rem.name, entry[0].name))
self.copy_file(rem.name, entry[0].name)
except StopIteration:
if verbose:
self._connection.logger.info("Upload: %s -> %s" % (entry[0].filepath, entry[0].name))
self.add_file(entry[0].filepath, entry[0].name)
for link in entry[1:]: # duplicate files
if verbose:
self._connection.logger.info("Copy %s to %s" % (entry[0].name, link.name))
self.copy_file(entry[0].name, link.name)
except self._connection.s3resource.meta.client.exceptions.NoSuchBucket as err:
raise MissingBucketException("Cannot sync files. Bucket {} not found.".format(err.response.get('Error').get('BucketName'))) from err
def add_string(self, string, remote):
"""Add a string on the storage.
:param str string: the string to add
:param str remote: name of the remote file
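
example (a minimal sketch; the remote name is a placeholder):

.. code-block:: python

    bucket.add_string("alpha=1\nbeta=2\n", "inputs/params.txt")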
"""
self.add_file(io.BytesIO(bytes(string, 'utf-8')), remote)
@_util.copy_docs(Storage.add_file)
def add_file(self, local_or_file, remote=None):
tobeclosed = False
if self._connection._sanitize_bucket_paths:
remote = _util.get_sanitized_bucket_path(remote, self._connection._show_bucket_warnings)
if _util.is_string(local_or_file):
file_ = open(local_or_file, 'rb')
tobeclosed = True
else:
file_ = local_or_file
dest = remote or os.path.basename(file_.name)
try:
self._connection.s3client.upload_fileobj(file_, self._uuid, dest, Config=s3_multipart_config)
except self._connection.s3resource.meta.client.exceptions.NoSuchBucket as err:
raise MissingBucketException("Cannot add string. Bucket {} not found.".format(err.response.get('Error').get('BucketName'))) from err
finally:
if tobeclosed:
file_.close()
@_util.copy_docs(Storage.get_all_files)
def get_all_files(self, output_dir, progress=None):
self.sync_remote_to_local(output_dir, None)
@_util.copy_docs(Storage.get_file)
def get_file(self, remote, local=None, progress=None):
return super(Bucket, self).get_file(remote, local, progress)
@_util.copy_docs(Storage.add_directory)
def add_directory(self, local, remote=""):
if not os.path.isdir(local):
raise IOError("Not a valid directory")
if self._connection._sanitize_bucket_paths:
remote = _util.get_sanitized_bucket_path(remote, self._connection._show_bucket_warnings)
if remote and not remote.endswith('/'):
remote += '/'
for dirpath, _, files in os.walk(local):
dirpath = _util.decode(dirpath)
files = list(map(_util.decode, files))
remote_loc = dirpath.replace(local, remote, 1)
for filename in files:
self.add_file(os.path.join(dirpath, filename),
posixpath.join(remote_loc, filename))
@_util.copy_docs(Storage.copy_file)
def copy_file(self, source, dest):
try:
copy_source = {
'Bucket': self._uuid,
'Key': source
}
return self._connection.s3client.copy_object(CopySource=copy_source, Bucket=self._uuid, Key=dest)
except self._connection.s3resource.meta.client.exceptions.NoSuchBucket as err:
raise MissingBucketException("Cannot copy file {} to {} from bucket {}. Bucket not found.".format(source, dest, err.response.get('Error').get('BucketName'))) from err
@deprecation.deprecated(deprecated_in="2.6.0", removed_in="3.0",
current_version=__version__, # type: ignore
details="Legacy function")
@_util.copy_docs(Storage.flush)
def flush(self):
pass
@deprecation.deprecated(deprecated_in="2.6.0", removed_in="3.0",
current_version=__version__, # type: ignore
details="Legacy function")
@_util.copy_docs(Storage.update)
def update(self, flush=False):
pass
def _download_file(self, remote, local, progress=None):
with open(local, 'wb') as data:
if hasattr(remote, 'read'):
shutil.copyfileobj(remote, data)
else:
try:
self._connection.s3client.download_fileobj(self._uuid, remote, data)
except self._connection.s3resource.meta.client.exceptions.NoSuchBucket as err:
raise MissingBucketException("Cannot download file {} from bucket {}. Bucket not found.".format(remote, err.response.get('Error').get('BucketName'))) from err
return local
@_util.copy_docs(Storage.delete_file)
def delete_file(self, remote):
try:
if self._connection._sanitize_bucket_paths:
remote = _util.get_sanitized_bucket_path(remote, self._connection._show_bucket_warnings)
self._connection.s3client.delete_object(Bucket=self._uuid, Key=remote)
except self._connection.s3resource.meta.client.exceptions.NoSuchBucket as err:
raise MissingBucketException("Cannot delete file {} from bucket {}. Bucket not found.".format(remote, err.response.get('Error').get('BucketName'))) from err
@property
def uuid(self):
""" Bucket identifier"""
return self._uuid
@property
def description(self):
""" Bucket identifier"""
return self._uuid