from requests import Session
from urllib.parse import quote
from numbers import Number
from collections.abc import Sequence, Mapping
import json
import asnake.configurator as conf
import asnake.logging as logging
# Module-level logger shared by every client in this module. It stays None
# until the first ASnakeClient is constructed, which assigns it.
log = None # initialized on first client init
class ASnakeAuthError(Exception):
    '''Raised when authorizing against ArchivesSpace fails.'''
class ASnakeWeirdReturnError(Exception):
    '''Raised when a response has a shape the client does not know how to handle.'''
def listlike_seq(seq):
    '''Determine if a thing is a list-like (sequence of values) sequence that's not string-like.

    :param seq: any object
    :returns: True for sequences such as lists and tuples; False for str,
        bytes, bytearray, mappings, and anything that is not a Sequence'''
    # bytearray is a Sequence too, but like str/bytes it is a single value, not a list of values
    return isinstance(seq, Sequence) and not isinstance(seq, (str, bytes, bytearray, Mapping))
def http_meth_factory(meth):
    '''Utility method for producing HTTP proxy methods for ASnakeProxyMethods mixin class.

    Urls are prefixed with the value of baseurl from the client's ASnakeConfig. Arguments are
    passed unaltered to the matching requests.Session method, except that a mapping passed as
    ``params`` has '[]' appended to the names of its list-valued entries.

    :param meth: name of the requests.Session method to proxy (e.g. 'get')
    :returns: a function taking (self, url, *args, **kwargs) and returning the response'''
    def http_method(self, url, *args, **kwargs):
        # aspace uses the PHP convention where array-typed form values use names with '[]' appended
        params = kwargs.get('params')
        # params may legitimately be None or a non-mapping; only mappings are rewritten
        if isinstance(params, Mapping):
            kwargs['params'] = {
                (k + '[]' if listlike_seq(v) and not k.endswith('[]') else k): v
                for k, v in params.items()
            }

        full_url = "/".join([self.config['baseurl'].rstrip("/"), url.lstrip("/")])
        result = getattr(self.session, meth)(full_url, *args, **kwargs)

        # on a 403, re-authorize and retry once when retry_with_auth is configured
        if result.status_code == 403 and self.config['retry_with_auth']:
            self.authorize()
            result = getattr(self.session, meth)(full_url, *args, **kwargs)

        log.debug("proxied http method", method=meth.upper(), url=full_url, status=result.status_code)
        return result

    return http_method
class ASnakeProxyMethods(type):
    '''Metaclass that installs proxy methods for the get, post, head, put, delete
    and options methods of :class:`requests.Session` on each class it creates.

    A verb method defined directly in the class body is left untouched.'''
    def __init__(cls, name, parents, dct):
        super().__init__(name, parents, dct)
        for meth in ('get', 'post', 'head', 'put', 'delete', 'options',):
            # respect an explicit definition in the class body instead of clobbering it
            if meth in dct:
                continue
            fn = http_meth_factory(meth)
            fn.__name__ = meth
            # keep introspection output pointing at the class rather than the factory closure
            fn.__qualname__ = '{}.{}'.format(cls.__qualname__, meth)
            fn.__doc__ = '''Proxied :meth:`requests.Session.{}` method from :class:`requests.Session`'''.format(meth)
            setattr(cls, meth, fn)
class ASnakeClient(metaclass=ASnakeProxyMethods):
    '''ArchivesSnake Web Client

    The HTTP verb methods used below (e.g. ``self.get``) are installed by the
    ASnakeProxyMethods metaclass.'''

    def __init__(self, **config):
        '''Build a client from an ASnakeConfig plus keyword overrides.

        :param config: values applied on top of the loaded ASnakeConfig;
            ``config_file``, when present, is passed to ASnakeConfig'''
        global log

        if 'config_file' in config:
            self.config = conf.ASnakeConfig(config['config_file'])
        else:
            self.config = conf.ASnakeConfig()

        self.config.update(config)

        # Only a subset of logging config can be supported in config
        # For more complex setups (configuring output format, say),
        # configure logs in Python code prior to loading
        #
        # Properties supported are:
        #     filename, filemode, level, and default_config
        # Default config can be any of the default configurations exposed in logging
        if not log:
            if not logging.already_configured and 'logging_config' in self.config:
                if 'default_config' in self.config['logging_config']:
                    default_logging_config = logging.configurations.get(
                        self.config['logging_config']['default_config'])
                    # removed so it is not passed a second time via ** below
                    del self.config['logging_config']['default_config']
                else:
                    default_logging_config = None

                logging.setup_logging(config = default_logging_config,
                                      **self.config['logging_config'])

            log = logging.get_logger(__name__)

        # a session attribute that already exists is reused rather than replaced
        if not hasattr(self, 'session'): self.session = Session()
        self.session.headers.update({'Accept': 'application/json',
                                     'User-Agent': 'ArchivesSnake/0.1'})
        log.debug("client created")

    def authorize(self, username=None, password=None):
        '''Authorizes the client against the configured archivesspace instance.

        Parses the JSON response, and stores the returned session token in the session.headers for future requests.
        Asks for a "non-expiring" session, which isn't truly immortal, just long-lived.

        :param username: login name; the configured ``username`` is used when falsy
        :param password: password; the configured ``password`` is used when falsy
        :returns: the value of the ``session`` field of the login response
        :raises ASnakeAuthError: when the login request does not return status 200'''
        username = username or self.config['username']
        password = password or self.config['password']

        log.debug("authorizing against ArchivesSpace", user=username)

        # Only the path is built from the username (baseurl is never run through
        # str.format), and safe='' keeps a '/' in the name inside one path segment.
        login_url = "/".join([self.config['baseurl'].rstrip("/"),
                              'users/{}/login'.format(quote(username, safe=''))])
        resp = self.session.post(
            login_url,
            data={"password": password, "expiring": False}
        )

        if resp.status_code != 200:
            log.debug("authorization failure", status=resp.status_code)
            raise ASnakeAuthError("Failed to authorize ASnake with status: {}".format(resp.status_code))

        session_token = json.loads(resp.text)['session']
        self.session.headers['X-ArchivesSpace-Session'] = session_token
        # NOTE(review): this writes the live session token to the debug log
        log.debug("authorization success", session_token=session_token)
        return session_token

    def get_paged(self, url, *args, page_size=100, **kwargs):
        '''get list of json objects from urls of paged items

        This is a generator: requests are made lazily as results are consumed.

        :param url: route to fetch, passed to ``self.get``
        :param args: accepted but not forwarded to the underlying requests
        :param page_size: value sent as the ``page_size`` query parameter
        :param kwargs: forwarded to ``self.get`` on every request; a ``params``
            mapping is merged into the paging parameters
        :raises ASnakeWeirdReturnError: when the response is neither a paged
            object, a list of objects, nor a list of numeric ids'''
        # copy so the caller's mapping is never mutated; tolerate params=None
        params = dict(kwargs.pop('params', None) or {})

        # special-cased bc all_ids doesn't work on repositories index route
        if url in {"/repositories", "repositories"}:
            params.pop('all_ids', None)

        params.update(page_size=page_size, page=1)
        current_json = self.get(url, params=params, **kwargs).json()

        # Regular paged object
        if hasattr(current_json, 'keys') and \
           {'results', 'this_page', 'last_page'} <= set(current_json.keys()):
            while current_json['this_page'] <= current_json['last_page']:
                yield from current_json['results']
                if current_json['this_page'] == current_json['last_page']: break
                params['page'] += 1
                # kwargs must accompany every page request, not just the first
                current_json = self.get(url, params=params, **kwargs).json()
        # routes that just return a list, or ids, i.e. queries with all_ids param
        elif isinstance(current_json, list):
            # e.g. repositories
            if current_json:
                if hasattr(current_json[0], 'keys'):
                    yield from current_json
                elif isinstance(current_json[0], Number):
                    # rstrip avoids a doubled slash when url ends with '/'
                    base = url.rstrip("/")
                    for i in current_json:
                        yield self.get("/".join([base, str(i)]), **kwargs).json()
                else:
                    raise ASnakeWeirdReturnError("get_paged doesn't know how to handle {}".format(current_json))
        else:
            raise ASnakeWeirdReturnError("get_paged doesn't know how to handle {}".format(current_json))