# Copyright 2011-2014 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from threading import enumerate as activethreads
from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO
from binascii import hexlify
from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in, readall
def _pithos_hash(block, blockhash):
h = newhashlib(blockhash)
h.update(block.rstrip('\x00'))
return h.hexdigest()
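# Illustrative sketch (not part of the original module): the Pithos block hash
# ignores trailing NUL padding, so a short final block hashes the same with or
# without zero-fill. 'sha256' is an assumed example algorithm; the real value
# comes from the container's x-container-block-hash header.
#
#   _pithos_hash('data\x00\x00\x00', 'sha256') == _pithos_hash('data', 'sha256')
#   # -> True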
def _range_up(start, end, max_value, a_range):
"""
:param start: (int) the window bottom
:param end: (int) the window top
:param max_value: (int) maximum accepted value
:param a_range: (str) a range string in the form X[,X'[,X''[...]]]
where X: x|x-y|-x where x < y and x, y natural numbers
:returns: (str) a range string cut-off for the start-end range
an empty response means this window is out of range
"""
assert start >= 0, '_range_up called w. start(%s) < 0' % start
assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
end, start)
assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
max_value, end)
if not a_range:
return '%s-%s' % (start, end)
selected = []
for some_range in a_range.split(','):
v0, sep, v1 = some_range.partition('-')
if v0:
v0 = int(v0)
if sep:
v1 = int(v1)
if v1 < start or v0 > end or v1 < v0:
continue
v0 = v0 if v0 > start else start
v1 = v1 if v1 < end else end
selected.append('%s-%s' % (v0, v1))
elif v0 < start:
continue
else:
v1 = v0 if v0 <= end else end
selected.append('%s-%s' % (start, v1))
else:
v1 = int(v1)
if max_value - v1 > end:
continue
v0 = (max_value - v1) if max_value - v1 > start else start
selected.append('%s-%s' % (v0, end))
return ','.join(selected)
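# Illustrative sketches of _range_up (values assumed, behavior taken from the
# code above): the requested range string is clipped to the current window,
# and an empty result means the window lies outside the requested range.
#
#   _range_up(0, 127, 1000, None)          # -> '0-127'   (no range requested)
#   _range_up(128, 255, 1000, '100-200')   # -> '128-200' (clipped to window)
#   _range_up(512, 639, 1000, '100-200')   # -> ''        (window out of range)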
class PithosClient(PithosRestClient):
"""Synnefo Pithos+ API client"""
def __init__(self, endpoint_url, token, account=None, container=None):
super(PithosClient, self).__init__(
endpoint_url, token, account, container)
def create_container(
self,
container=None, sizelimit=None, versioning=None, metadata=None,
project_id=None, **kwargs):
"""
:param container: (str) if not given, self.container is used instead
:param sizelimit: (int) container total size limit in bytes
:param versioning: (str) 'auto' or any other versioning policy supported
by the server
:param metadata: (dict) Custom user-defined metadata of the form
{ 'name1': 'value1', 'name2': 'value2', ... }
:returns: (dict) response headers
"""
cnt_back_up = self.container
try:
self.container = container or cnt_back_up
r = self.container_put(
quota=sizelimit, versioning=versioning, metadata=metadata,
project_id=project_id, **kwargs)
return r.headers
finally:
self.container = cnt_back_up
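# Minimal usage sketch (endpoint, token and names are assumed, shown for
# illustration only): create a versioned container with a 5GB size limit.
#
#   from kamaki.clients.pithos import PithosClient
#   client = PithosClient(PITHOS_URL, TOKEN, account=USER_UUID)
#   headers = client.create_container(
#       container='backups', sizelimit=5 * 1024 ** 3, versioning='auto',
#       metadata={'purpose': 'nightly backups'})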
def purge_container(self, container=None):
"""Delete an empty container and destroy associated blocks"""
cnt_back_up = self.container
try:
self.container = container or cnt_back_up
r = self.container_delete(until=unicode(time()))
finally:
self.container = cnt_back_up
return r.headers
def upload_object_unchunked(
self, obj, f,
withHashFile=False,
size=None,
etag=None,
content_encoding=None,
content_disposition=None,
content_type=None,
sharing=None,
public=None):
"""
:param obj: (str) remote object path
:param f: open file descriptor
:param withHashFile: (bool)
:param size: (int) size of data to upload
:param etag: (str)
:param content_encoding: (str)
:param content_disposition: (str)
:param content_type: (str)
:param sharing: {'read':[user and/or grp names],
'write':[usr and/or grp names]}
:param public: (bool)
:returns: (dict) created object metadata
"""
self._assert_container()
if withHashFile:
data = f.read()
try:
import json
data = json.dumps(json.loads(data))
except ValueError:
raise ClientError('"%s" is not JSON-formatted' % f.name, 1)
except SyntaxError:
msg = '"%s" is not a valid hashmap file' % f.name
raise ClientError(msg, 1)
f = StringIO(data)
else:
data = readall(f, size) if size else f.read()
r = self.object_put(
obj,
data=data,
etag=etag,
content_encoding=content_encoding,
content_disposition=content_disposition,
content_type=content_type,
permissions=sharing,
public=public,
success=201)
return r.headers
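# Usage sketch (names assumed, client is a PithosClient with a container set):
# single-request upload of a small file; for large files prefer upload_object,
# which uploads blocks in parallel.
#
#   with open('notes.txt', 'rb') as f:
#       client.upload_object_unchunked(
#           'docs/notes.txt', f, content_type='text/plain')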
def create_object_by_manifestation(
self, obj,
etag=None,
content_encoding=None,
content_disposition=None,
content_type=None,
sharing=None,
public=None):
"""
:param obj: (str) remote object path
:param etag: (str)
:param content_encoding: (str)
:param content_disposition: (str)
:param content_type: (str)
:param sharing: {'read':[user and/or grp names],
'write':[usr and/or grp names]}
:param public: (bool)
:returns: (dict) created object metadata
"""
self._assert_container()
r = self.object_put(
obj,
content_length=0,
etag=etag,
content_encoding=content_encoding,
content_disposition=content_disposition,
content_type=content_type,
permissions=sharing,
public=public,
manifest='%s/%s' % (self.container, obj))
return r.headers
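# Usage sketch (names assumed): the created object is a zero-length manifest;
# reading it is expected to concatenate the objects stored under the
# '<container>/<obj>' prefix (e.g. parts uploaded as 'video.mpg/0001', ...).
#
#   client.create_object_by_manifestation(
#       'video.mpg', content_type='video/mpeg')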
# upload_* auxiliary methods
def _put_block_async(self, data, hash):
event = SilentEvent(method=self._put_block, data=data, hash=hash)
event.start()
return event
def _put_block(self, data, hash):
r = self.container_post(
update=True,
content_type='application/octet-stream',
content_length=len(data),
data=data,
format='json')
assert r.json[0] == hash, 'Local hash does not match server'
def _get_file_block_info(self, fileobj, size=None, cache=None):
"""
:param fileobj: (file descriptor) source
:param size: (int) size of data to upload from source
:param cache: (dict) if provided, cache container info response to
avoid redundant calls
"""
if isinstance(cache, dict):
try:
meta = cache[self.container]
except KeyError:
meta = self.get_container_info()
cache[self.container] = meta
else:
meta = self.get_container_info()
blocksize = int(meta['x-container-block-size'])
blockhash = meta['x-container-block-hash']
size = size if size is not None else fstat(fileobj.fileno()).st_size
nblocks = 1 + (size - 1) // blocksize
return (blocksize, blockhash, size, nblocks)
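# Worked example of the block arithmetic above (illustrative numbers): with a
# 4MB container block size, a 10MB file needs 1 + (10MB - 1) // 4MB = 3 blocks,
# while a 0-byte file yields 1 + (-1) // 4MB = 0 blocks.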
def _create_object_or_get_missing_hashes(
self, obj, json,
size=None,
format='json',
hashmap=True,
content_type=None,
if_etag_match=None,
if_etag_not_match=None,
content_encoding=None,
content_disposition=None,
permissions=None,
public=None,
success=(201, 409)):
r = self.object_put(
obj,
format='json',
hashmap=True,
content_type=content_type,
json=json,
if_etag_match=if_etag_match,
if_etag_not_match=if_etag_not_match,
content_encoding=content_encoding,
content_disposition=content_disposition,
permissions=permissions,
public=public,
success=success)
return (None if r.status_code == 201 else r.json), r.headers
def _calculate_blocks_for_upload(
self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
hash_cb=None):
offset = 0
if hash_cb:
hash_gen = hash_cb(nblocks)
hash_gen.next()
for i in xrange(nblocks):
block = readall(fileobj, min(blocksize, size - offset))
bytes = len(block)
if bytes <= 0:
break
hash = _pithos_hash(block, blockhash)
hashes.append(hash)
hmap[hash] = (offset, bytes)
offset += bytes
if hash_cb:
hash_gen.next()
msg = ('Failed to calculate uploading blocks: '
'read bytes(%s) != requested size (%s)' % (offset, size))
assert offset == size, msg
def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
"""upload missing blocks asynchronously"""
self._init_thread_limit()
flying = []
failures = []
for hash in missing:
offset, bytes = hmap[hash]
fileobj.seek(offset)
data = readall(fileobj, bytes)
r = self._put_block_async(data, hash)
flying.append(r)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
if thread.exception:
failures.append(thread)
if isinstance(
thread.exception,
ClientError) and thread.exception.status == 502:
self.POOLSIZE = self._thread_limit
elif thread.isAlive():
flying.append(thread)
elif upload_gen:
try:
upload_gen.next()
except:
pass
flying = unfinished
for thread in flying:
thread.join()
if thread.exception:
failures.append(thread)
elif upload_gen:
try:
upload_gen.next()
except:
pass
return [failure.kwargs['hash'] for failure in failures]
def upload_object(
self, obj, f,
size=None,
hash_cb=None,
upload_cb=None,
etag=None,
if_etag_match=None,
if_not_exist=None,
content_encoding=None,
content_disposition=None,
content_type=None,
sharing=None,
public=None,
container_info_cache=None):
"""Upload an object using multiple connections (threads)
:param obj: (str) remote object path
:param f: open file descriptor (rb)
:param hash_cb: optional progress.bar object for calculating hashes
:param upload_cb: optional progress.bar object for uploading
:param etag: (str)
:param if_etag_match: (str) value for the If-Match header at file
creation
:param if_not_exist: (bool) If true, the file is uploaded ONLY if it
does not already exist remotely, otherwise the operation fails. This
covers the case where an object with the same path is created while
the object is being uploaded.
:param content_encoding: (str)
:param content_disposition: (str)
:param content_type: (str)
:param sharing: {'read':[user and/or grp names],
'write':[usr and/or grp names]}
:param public: (bool)
:param container_info_cache: (dict) if given, avoid redundant calls to
server for container info (block size and hash information)
"""
self._assert_container()
block_info = (
blocksize, blockhash, size, nblocks) = self._get_file_block_info(
f, size, container_info_cache)
(hashes, hmap, offset) = ([], {}, 0)
content_type = content_type or 'application/octet-stream'
self._calculate_blocks_for_upload(
*block_info,
hashes=hashes,
hmap=hmap,
fileobj=f,
hash_cb=hash_cb)
hashmap = dict(bytes=size, hashes=hashes)
missing, obj_headers = self._create_object_or_get_missing_hashes(
obj, hashmap,
content_type=content_type,
size=size,
if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
content_encoding=content_encoding,
content_disposition=content_disposition,
permissions=sharing,
public=public)
if missing is None:
return obj_headers
if upload_cb:
upload_gen = upload_cb(len(hashmap['hashes']))
for i in range(len(hashmap['hashes']) + 1 - len(missing)):
try:
upload_gen.next()
except:
sendlog.debug('Progress bar failure')
break
else:
upload_gen = None
retries = 7
while retries:
sendlog.info('%s blocks missing' % len(missing))
num_of_blocks = len(missing)
missing = self._upload_missing_blocks(
missing, hmap, f, upload_gen)
if missing:
if num_of_blocks == len(missing):
retries -= 1
else:
num_of_blocks = len(missing)
else:
break
if missing:
try:
details = ['%s' % thread.exception for thread in missing]
except Exception:
details = ['Also, failed to read thread exceptions']
raise ClientError(
'%s blocks failed to upload' % len(missing),
details=details)
r = self.object_put(
obj,
format='json',
hashmap=True,
content_type=content_type,
content_encoding=content_encoding,
if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
etag=etag,
json=hashmap,
permissions=sharing,
public=public,
success=201)
return r.headers
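# Usage sketch (names assumed, client is a PithosClient with a container set):
# threaded upload with a shared container-info cache, which avoids repeated
# container HEAD requests when uploading many objects to the same container.
#
#   cache = {}
#   with open('backup.tar', 'rb') as f:
#       headers = client.upload_object(
#           'backups/backup.tar', f,
#           content_type='application/x-tar',
#           container_info_cache=cache)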
def upload_from_string(
self, obj, input_str,
hash_cb=None,
upload_cb=None,
etag=None,
if_etag_match=None,
if_not_exist=None,
content_encoding=None,
content_disposition=None,
content_type=None,
sharing=None,
public=None,
container_info_cache=None):
"""Upload an object using multiple connections (threads)
:param obj: (str) remote object path
:param input_str: (str) upload content
:param hash_cb: optional progress.bar object for calculating hashes
:param upload_cb: optional progress.bar object for uploading
:param etag: (str)
:param if_etag_match: (str) value for the If-Match header at file
creation
:param if_not_exist: (bool) If true, the file is uploaded ONLY if it
does not already exist remotely, otherwise the operation fails. This
covers the case where an object with the same path is created while
the object is being uploaded.
:param content_encoding: (str)
:param content_disposition: (str)
:param content_type: (str)
:param sharing: {'read':[user and/or grp names],
'write':[usr and/or grp names]}
:param public: (bool)
:param container_info_cache: (dict) if given, avoid redundant calls to
server for container info (block size and hash information)
"""
self._assert_container()
blocksize, blockhash, size, nblocks = self._get_file_block_info(
fileobj=None, size=len(input_str), cache=container_info_cache)
(hashes, hmap, offset) = ([], {}, 0)
if not content_type:
content_type = 'application/octet-stream'
hashes = []
hmap = {}
for blockid in range(nblocks):
start = blockid * blocksize
block = input_str[start: (start + blocksize)]
hashes.append(_pithos_hash(block, blockhash))
hmap[hashes[blockid]] = (start, block)
hashmap = dict(bytes=size, hashes=hashes)
missing, obj_headers = self._create_object_or_get_missing_hashes(
obj, hashmap,
content_type=content_type,
size=size,
if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
content_encoding=content_encoding,
content_disposition=content_disposition,
permissions=sharing,
public=public)
if missing is None:
return obj_headers
num_of_missing = len(missing)
if upload_cb:
self.progress_bar_gen = upload_cb(nblocks)
for i in range(nblocks + 1 - num_of_missing):
self._cb_next()
tries = 7
old_failures = 0
try:
while tries and missing:
flying = []
failures = []
for hash in missing:
offset, block = hmap[hash]
bird = self._put_block_async(block, hash)
flying.append(bird)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
if thread.exception:
failures.append(thread.kwargs['hash'])
if thread.isAlive():
flying.append(thread)
else:
self._cb_next()
flying = unfinished
for thread in flying:
thread.join()
if thread.exception:
failures.append(thread.kwargs['hash'])
self._cb_next()
missing = failures
if missing and len(missing) == old_failures:
tries -= 1
old_failures = len(missing)
if missing:
raise ClientError('%s blocks failed to upload' % len(missing))
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
thread.join()
raise
self._cb_next()
r = self.object_put(
obj,
format='json',
hashmap=True,
content_type=content_type,
content_encoding=content_encoding,
if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
etag=etag,
json=hashmap,
permissions=sharing,
public=public,
success=201)
return r.headers
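# Usage sketch (names assumed): upload in-memory content; blocks are hashed
# locally and only the missing ones are sent to the server.
#
#   report = '\n'.join(log_lines)
#   client.upload_from_string('reports/today.txt', report,
#                             content_type='text/plain')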
# download_* auxiliary methods
def _get_remote_blocks_info(self, obj, **restargs):
#retrieve object hashmap
myrange = restargs.pop('data_range', None)
hashmap = self.get_object_hashmap(obj, **restargs)
restargs['data_range'] = myrange
blocksize = int(hashmap['block_size'])
blockhash = hashmap['block_hash']
total_size = hashmap['bytes']
#assert total_size/blocksize + 1 == len(hashmap['hashes'])
map_dict = {}
for i, h in enumerate(hashmap['hashes']):
# the same hash may correspond to more than one block; keep every index
if h in map_dict:
map_dict[h].append(i)
else:
map_dict[h] = [i]
return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
def _dump_blocks_sync(
self, obj, remote_hashes, blocksize, total_size, dst, crange,
**args):
if not total_size:
return
for blockid, blockhash in enumerate(remote_hashes):
if blockhash:
start = blocksize * blockid
is_last = start + blocksize > total_size
end = (total_size - 1) if is_last else (start + blocksize - 1)
data_range = _range_up(start, end, total_size, crange)
if not data_range:
self._cb_next()
continue
args['data_range'] = 'bytes=%s' % data_range
r = self.object_get(obj, success=(200, 206), **args)
self._cb_next()
dst.write(r.content)
dst.flush()
def _get_block_async(self, obj, **args):
event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
event.start()
return event
def _hash_from_file(self, fp, start, size, blockhash):
fp.seek(start)
block = readall(fp, size)
h = newhashlib(blockhash)
h.update(block.strip('\x00'))
return hexlify(h.digest())
def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
"""Write the results of the asynchronous (threaded) REST calls to a file
:param offset: offset to subtract from each block position when writing
- e.g. if the requested range is 10-100, every block is written to
normal_position - 10
"""
for key, g in flying.items():
if g.isAlive():
continue
if g.exception:
raise g.exception
block = g.value.content
for block_start in blockids[key]:
local_file.seek(block_start + offset)
local_file.write(block)
self._cb_next()
flying.pop(key)
blockids.pop(key)
local_file.flush()
def _dump_blocks_async(
self, obj, remote_hashes, blocksize, total_size, local_file,
blockhash=None, resume=False, filerange=None, **restargs):
file_size = fstat(local_file.fileno()).st_size if resume else 0
flying = dict()
blockid_dict = dict()
offset = 0
self._init_thread_limit()
for block_hash, blockids in remote_hashes.items():
blockids = [blk * blocksize for blk in blockids]
unsaved = [blk for blk in blockids if not (
blk < file_size and block_hash == self._hash_from_file(
local_file, blk, blocksize, blockhash))]
self._cb_next(len(blockids) - len(unsaved))
if unsaved:
key = unsaved[0]
self._watch_thread_limit(flying.values())
self._thread2file(
flying, blockid_dict, local_file, offset,
**restargs)
end = total_size - 1 if (
key + blocksize > total_size) else key + blocksize - 1
if end < key:
self._cb_next()
continue
data_range = _range_up(key, end, total_size, filerange)
if not data_range:
self._cb_next()
continue
restargs[
'async_headers'] = {'Range': 'bytes=%s' % data_range}
flying[key] = self._get_block_async(obj, **restargs)
blockid_dict[key] = unsaved
for thread in flying.values():
thread.join()
self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
def download_object(
self, obj, dst,
download_cb=None,
version=None,
resume=False,
range_str=None,
if_match=None,
if_none_match=None,
if_modified_since=None,
if_unmodified_since=None):
"""Download an object (multiple connections, random blocks)
:param obj: (str) remote object path
:param dst: open file descriptor (wb+)
:param download_cb: optional progress.bar object for downloading
:param version: (str) file version
:param resume: (bool) if set, preserve already downloaded file parts
:param range_str: (str) from, to are file positions (int) in bytes
:param if_match: (str)
:param if_none_match: (str)
:param if_modified_since: (str) formatted date
:param if_unmodified_since: (str) formatted date"""
restargs = dict(
version=version,
data_range=None if range_str is None else 'bytes=%s' % range_str,
if_match=if_match,
if_none_match=if_none_match,
if_modified_since=if_modified_since,
if_unmodified_since=if_unmodified_since)
(
blocksize,
blockhash,
total_size,
hash_list,
remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
assert total_size >= 0
if download_cb:
self.progress_bar_gen = download_cb(len(hash_list))
self._cb_next()
if dst.isatty():
self._dump_blocks_sync(
obj,
hash_list,
blocksize,
total_size,
dst,
range_str,
**restargs)
else:
self._dump_blocks_async(
obj,
remote_hashes,
blocksize,
total_size,
dst,
blockhash,
resume,
range_str,
**restargs)
if not range_str:
dst.truncate(total_size)
self._complete_cb()
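# Usage sketch (names assumed): download to a local file; with a non-tty
# destination the blocks are fetched in parallel, and resume=True keeps
# already-downloaded parts that still match the remote block hashes.
#
#   with open('backup.tar', 'wb+') as dst:
#       client.download_object('backups/backup.tar', dst, resume=True)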
def download_to_string(
self, obj,
download_cb=None,
version=None,
range_str=None,
if_match=None,
if_none_match=None,
if_modified_since=None,
if_unmodified_since=None):
"""Download an object to a string (multiple connections). This method
uses threads for http requests, but stores all content in memory.
:param obj: (str) remote object path
:param download_cb: optional progress.bar object for downloading
:param version: (str) file version
:param range_str: (str) from, to are file positions (int) in bytes
:param if_match: (str)
:param if_none_match: (str)
:param if_modified_since: (str) formatted date
:param if_unmodified_since: (str) formatted date
:returns: (str) the whole object contents
"""
restargs = dict(
version=version,
data_range=None if range_str is None else 'bytes=%s' % range_str,
if_match=if_match,
if_none_match=if_none_match,
if_modified_since=if_modified_since,
if_unmodified_since=if_unmodified_since)
(
blocksize,
blockhash,
total_size,
hash_list,
remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
assert total_size >= 0
if download_cb:
self.progress_bar_gen = download_cb(len(hash_list))
self._cb_next()
num_of_blocks = len(remote_hashes)
ret = [''] * num_of_blocks
self._init_thread_limit()
flying = dict()
try:
for blockid, blockhash in enumerate(remote_hashes):
start = blocksize * blockid
is_last = start + blocksize > total_size
end = (total_size - 1) if is_last else (start + blocksize - 1)
data_range_str = _range_up(start, end, end, range_str)
if data_range_str:
self._watch_thread_limit(flying.values())
restargs['data_range'] = 'bytes=%s' % data_range_str
flying[blockid] = self._get_block_async(obj, **restargs)
for runid, thread in flying.items():
if (blockid + 1) == num_of_blocks:
thread.join()
elif thread.isAlive():
continue
if thread.exception:
raise thread.exception
ret[runid] = thread.value.content
self._cb_next()
flying.pop(runid)
return ''.join(ret)
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
thread.join()
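# Usage sketch (names assumed): fetch a whole object, or just a byte range,
# into memory.
#
#   text = client.download_to_string('docs/notes.txt')
#   first_kb = client.download_to_string('docs/notes.txt', range_str='0-1023')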
#Command Progress Bar method
def _cb_next(self, step=1):
if hasattr(self, 'progress_bar_gen'):
try:
for i in xrange(step):
self.progress_bar_gen.next()
except:
pass
def _complete_cb(self):
while True:
try:
self.progress_bar_gen.next()
except:
break
def get_object_hashmap(
self, obj,
version=None,
if_match=None,
if_none_match=None,
if_modified_since=None,
if_unmodified_since=None):
"""
:param obj: (str) remote object path
:param if_match: (str)
:param if_none_match: (str)
:param if_modified_since: (str) formatted date
:param if_unmodified_since: (str) formatted date
:returns: (list)
"""
try:
r = self.object_get(
obj,
hashmap=True,
version=version,
if_etag_match=if_match,
if_etag_not_match=if_none_match,
if_modified_since=if_modified_since,
if_unmodified_since=if_unmodified_since)
except ClientError as err:
if err.status == 304 or err.status == 412:
return {}
raise
return r.json
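# Usage sketch (names assumed): the returned dict carries the keys consumed by
# the download helpers above: 'block_size', 'block_hash', 'bytes' and 'hashes'.
#
#   hashmap = client.get_object_hashmap('backups/backup.tar')
#   nblocks = len(hashmap['hashes'])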
def set_account_group(self, group, usernames):
"""
:param group: (str)
:param usernames: (list)
"""
r = self.account_post(update=True, groups={group: usernames})
return r
def del_account_group(self, group):
"""
:param group: (str)
"""
self.account_post(update=True, groups={group: []})
def get_account_info(self, until=None):
"""
:param until: (str) formatted date
:returns: (dict)
"""
r = self.account_head(until=until)
if r.status_code == 401:
raise ClientError("No authorization", status=401)
return r.headers
def get_account_quota(self):
"""
:returns: (dict)
"""
return filter_in(
self.get_account_info(),
'X-Account-Policy-Quota',
exactMatch=True)
#def get_account_versioning(self):
# """
# :returns: (dict)
# """
# return filter_in(
# self.get_account_info(),
# 'X-Account-Policy-Versioning',
# exactMatch=True)
def get_account_group(self):
"""
:returns: (dict)
"""
return filter_in(self.get_account_info(), 'X-Account-Group-')
def list_containers(self):
"""
:returns: (dict)
"""
r = self.account_get()
return r.json
def del_container(self, until=None, delimiter=None):
"""
:param until: (str) formatted date
:param delimiter: (str) if '/', the container contents are deleted as
well (the container is emptied first)
:raises ClientError: 404 Container does not exist
:raises ClientError: 409 Container is not empty
"""
self._assert_container()
r = self.container_delete(
until=until,
delimiter=delimiter,
success=(204, 404, 409))
if r.status_code == 404:
raise ClientError(
'Container "%s" does not exist' % self.container,
r.status_code)
elif r.status_code == 409:
raise ClientError(
'Container "%s" is not empty' % self.container,
r.status_code)
return r.headers
def get_container_versioning(self, container=None):
"""
:param container: (str)
:returns: (dict)
"""
cnt_back_up = self.container
try:
self.container = container or cnt_back_up
return filter_in(
self.get_container_info(),
'X-Container-Policy-Versioning')
finally:
self.container = cnt_back_up
def get_container_limit(self, container=None):
"""
:param container: (str)
:returns: (dict)
"""
cnt_back_up = self.container
try:
self.container = container or cnt_back_up
return filter_in(
self.get_container_info(),
'X-Container-Policy-Quota')
finally:
self.container = cnt_back_up
def get_container_info(self, container=None, until=None):
"""
:param until: (str) formatted date
:returns: (dict)
:raises ClientError: 404 Container not found
"""
bck_cont = self.container
try:
self.container = container or bck_cont
self._assert_container()
r = self.container_head(until=until)
except ClientError as err:
err.details.append('for container %s' % self.container)
raise err
finally:
self.container = bck_cont
return r.headers
def set_container_limit(self, limit):
"""
:param limit: (int)
"""
r = self.container_post(update=True, quota=limit)
return r.headers
def set_container_versioning(self, versioning):
"""
:param versioning: (str)
"""
r = self.container_post(update=True, versioning=versioning)
return r.headers
def del_object(self, obj, until=None, delimiter=None):
"""
:param obj: (str) remote object path
:param until: (str) formatted date
:param delimiter: (str)
"""
self._assert_container()
r = self.object_delete(obj, until=until, delimiter=delimiter)
return r.headers
def publish_object(self, obj):
"""
:param obj: (str) remote object path
:returns: (str) access url
"""
self.object_post(obj, update=True, public=True)
info = self.get_object_info(obj)
return info['x-object-public']
def unpublish_object(self, obj):
"""
:param obj: (str) remote object path
"""
r = self.object_post(obj, update=True, public=False)
return r.headers
def get_object_info(self, obj, version=None):
"""
:param obj: (str) remote object path
:param version: (str)
:returns: (dict)
"""
try:
r = self.object_head(obj, version=version)
return r.headers
except ClientError as ce:
if ce.status == 404:
raise ClientError('Object %s not found' % obj, status=404)
raise
def get_object_sharing(self, obj):
"""
:param obj: (str) remote object path
:returns: (dict)
"""
r = filter_in(
self.get_object_info(obj),
'X-Object-Sharing',
exactMatch=True)
reply = {}
if len(r) > 0:
perms = r['x-object-sharing'].split(';')
for perm in perms:
try:
perm.index('=')
except ValueError:
raise ClientError('Incorrect reply format')
(key, val) = perm.strip().split('=')
reply[key] = val
return reply
def set_object_sharing(
self, obj,
read_permission=False, write_permission=False):
"""Give read/write permissions to an object.
:param obj: (str) remote object path
:param read_permission: (list - bool) users and user groups that get
read permission for this object - False means all previous read
permissions will be removed
:param write_permission: (list - bool) of users and user groups to get
write permission for this object - False means all previous write
permissions will be removed
:returns: (dict) response headers
"""
perms = dict(read=read_permission or '', write=write_permission or '')
r = self.object_post(obj, update=True, permissions=perms)
return r.headers
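# Usage sketch (user names assumed): grant read access to two users and write
# access to a third; calling del_object_sharing removes all permissions.
#
#   client.set_object_sharing(
#       'docs/notes.txt',
#       read_permission=['user1@example.com', 'user2@example.com'],
#       write_permission=['user3@example.com'])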
def del_object_sharing(self, obj):
"""
:param obj: (str) remote object path
"""
return self.set_object_sharing(obj)
def append_object(self, obj, source_file, upload_cb=None):
"""
:param obj: (str) remote object path
:param source_file: open file descriptor
:param upload_cb: progress.bar object for uploading
"""
self._assert_container()
meta = self.get_container_info()
blocksize = int(meta['x-container-block-size'])
filesize = fstat(source_file.fileno()).st_size
nblocks = 1 + (filesize - 1) // blocksize
offset = 0
headers = {}
if upload_cb:
self.progress_bar_gen = upload_cb(nblocks)
self._cb_next()
flying = {}
self._init_thread_limit()
try:
for i in range(nblocks):
block = source_file.read(min(blocksize, filesize - offset))
offset += len(block)
self._watch_thread_limit(flying.values())
unfinished = {}
flying[i] = SilentEvent(
method=self.object_post,
obj=obj,
update=True,
content_range='bytes */*',
content_type='application/octet-stream',
content_length=len(block),
data=block)
flying[i].start()
for key, thread in flying.items():
if thread.isAlive():
if i < (nblocks - 1):
unfinished[key] = thread
continue
thread.join()
if thread.exception:
raise thread.exception
headers[key] = thread.value.headers
self._cb_next()
flying = unfinished
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
thread.join()
finally:
from time import sleep
sleep(2 * len(activethreads()))
self._cb_next()
return headers.values()
def truncate_object(self, obj, upto_bytes):
"""
:param obj: (str) remote object path
:param upto_bytes: max number of bytes to leave on file
:returns: (dict) response headers
"""
ctype = self.get_object_info(obj)['content-type']
r = self.object_post(
obj,
update=True,
content_range='bytes 0-%s/*' % upto_bytes,
content_type=ctype,
object_bytes=upto_bytes,
source_object=path4url(self.container, obj))
return r.headers
def overwrite_object(
self, obj, start, end, source_file,
source_version=None, upload_cb=None):
"""Overwrite a part of an object from local source file
ATTENTION: content_type must always be application/octet-stream
:param obj: (str) remote object path
:param start: (int) position in bytes to start overwriting from
:param end: (int) position in bytes to stop overwriting at
:param source_file: open file descriptor
:param upload_cb: progress.bar object for uploading
"""
self._assert_container()
r = self.get_object_info(obj, version=source_version)
rf_size = int(r['content-length'])
start, end = int(start), int(end)
assert rf_size >= start, 'Range start %s exceeds file size %s' % (
start, rf_size)
assert rf_size >= end, 'Range end %s exceeds file size %s' % (
end, rf_size)
meta = self.get_container_info()
blocksize = int(meta['x-container-block-size'])
filesize = fstat(source_file.fileno()).st_size
datasize = end - start + 1
nblocks = 1 + (datasize - 1) // blocksize
offset = 0
if upload_cb:
self.progress_bar_gen = upload_cb(nblocks)
self._cb_next()
headers = []
for i in range(nblocks):
read_size = min(blocksize, filesize - offset, datasize - offset)
block = source_file.read(read_size)
r = self.object_post(
obj,
update=True,
content_type='application/octet-stream',
content_length=len(block),
content_range='bytes %s-%s/*' % (
start + offset,
start + offset + len(block) - 1),
source_version=source_version,
data=block)
headers.append(dict(r.headers))
offset += len(block)
self._cb_next()
self._cb_next()
return headers
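# Usage sketch (names assumed): overwrite bytes 0-1023 of a remote object with
# the first 1KB of a local file; content is always sent as octet-stream.
#
#   with open('patch.bin', 'rb') as src:
#       client.overwrite_object('backups/backup.tar', 0, 1023, src)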
def copy_object(
self, src_container, src_object, dst_container,
dst_object=None,
source_version=None,
source_account=None,
public=False,
content_type=None,
delimiter=None):
"""
:param src_container: (str) source container
:param src_object: (str) source object path
:param dst_container: (str) destination container
:param dst_object: (str) destination object path
:param source_version: (str) source object version
:param source_account: (str) account to copy from
:param public: (bool)
:param content_type: (str)
:param delimiter: (str)
:returns: (dict) response headers
"""
self._assert_account()
self.container = dst_container
src_path = path4url(src_container, src_object)
r = self.object_put(
dst_object or src_object,
success=201,
copy_from=src_path,
content_length=0,
source_version=source_version,
source_account=source_account,
public=public,
content_type=content_type,
delimiter=delimiter)
return r.headers
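# Usage sketch (names assumed): server-side copy between containers of the
# same account; move_object below works the same way but removes the source.
#
#   client.copy_object(
#       src_container='backups', src_object='backup.tar',
#       dst_container='archive', dst_object='2014/backup.tar')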
def move_object(
self, src_container, src_object, dst_container,
dst_object=False,
source_account=None,
source_version=None,
public=False,
content_type=None,
delimiter=None):
"""
:param src_container: (str) source container
:param src_object: (str) source object path
:param dst_container: (str) destination container
:param dst_object: (str) destination object path
:param source_account: (str) account to move from
:param source_version: (str) source object version
:param public: (bool)
:param content_type: (str)
:param delimiter: (str)
:returns: (dict) response headers
"""
self._assert_account()
self.container = dst_container
dst_object = dst_object or src_object
src_path = path4url(src_container, src_object)
r = self.object_put(
dst_object,
success=201,
move_from=src_path,
content_length=0,
source_account=source_account,
source_version=source_version,
public=public,
content_type=content_type,
delimiter=delimiter)
return r.headers
def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
"""Get accounts that share with self.account
:param limit: (str)
:param marker: (str)
:returns: (dict)
"""
self._assert_account()
self.set_param('format', 'json')
self.set_param('limit', limit, iff=limit is not None)
self.set_param('marker', marker, iff=marker is not None)
path = ''
success = kwargs.pop('success', (200, 204))
r = self.get(path, *args, success=success, **kwargs)
return r.json
def get_object_versionlist(self, obj):
"""
:param obj: (str) remote object path
:returns: (list)
"""
self._assert_container()
r = self.object_get(obj, format='json', version='list')
return r.json['versions']
def reassign_container(self, project_id):
r = self.container_post(project_id=project_id)
return r.headers