core
core ¶
Session ¶
Session(
user: User = None,
prefix_url=settings.ITKDB_SITE_URL,
save_auth: Path | str | None = None,
cache: bool = True,
expires_after=None,
)
Bases: requests.Session
Lightweight wrapper around requests.Session
with basic error-handling, auto-(re)authentication, and URI prefixing.
For more information, see python requests.
Attributes:
Name | Type | Description |
---|---|---|
STATUS_EXCEPTIONS | dict | Mapping from status code to itkdb.exceptions |
SUCCESS_STATUSES | dict | List of status codes that are OK |
auth | callable | |
prefix_url | str | The prefix for all non-absolute URIs |
user | itkdb.core.User | The user object for authentication |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user | itkdb.core.User | A user object. Create one if not specified. | None |
prefix_url | str | The prefix url to use for all requests. | settings.ITKDB_SITE_URL |
save_auth | pathlib.Path | str | None | A file path to where to save authentication information. | None |
cache | bool or CacheControl.caches object | Set to True to use the default cache (cachecontrol.caches.file_cache.FileCache), or pass a CacheControl.caches object to use that instead. Set to False to disable cache. | True |
expires_after | dict | The arguments are the same as the datetime.timedelta object. This will override or add the Expires header and override or set the Cache-Control header to public. | None |
Source code in itkdb/core.py
def __init__(
    self,
    user: User = None,
    prefix_url=settings.ITKDB_SITE_URL,
    save_auth: Path | str | None = None,
    cache: bool = True,
    expires_after=None,
):
    """
    Build a session that identifies itself as itkdb, authorizes through a
    user object, and optionally caches responses for the prefix URL.

    Args:
        user: The user to authenticate with. When not given (or falsy), a
            new ``User`` is created with ``save_auth`` passed through.
        prefix_url: The prefix stored on the session and the URL prefix
            the caching adapter is mounted on.
        save_auth: Forwarded to ``User`` only when a user has to be created.
        cache: ``True`` selects a ``FileCache`` stored in ``.webcache``;
            any other truthy value is handed to the adapter as the cache;
            a falsy value adds no cache option.
        expires_after: A non-empty dict of keyword arguments for
            ``ExpiresAfter``; anything else is ignored.
    """
    super().__init__()
    self.headers.update({"User-Agent": f"itkdb/{__version__}"})
    self.user = user or User(save_auth=save_auth)
    self.auth = self.authorize
    self.prefix_url = prefix_url
    # most recent response seen by this session
    self._response = None

    adapter_kwargs = {}

    if cache:
        backend = cache
        if backend is True:
            backend = cachecontrol.caches.file_cache.FileCache(".webcache")
        adapter_kwargs["cache"] = backend

    # expiration heuristic is configured independently of the cache backend
    if expires_after and isinstance(expires_after, dict):
        adapter_kwargs["heuristic"] = ExpiresAfter(**expires_after)

    # nothing to configure: leave the default adapters in place
    if not adapter_kwargs:
        return

    caching_adapter = CacheControlAdapter(
        controller_class=CacheController, **adapter_kwargs
    )
    super().mount(self.prefix_url, caching_adapter)
STATUS_EXCEPTIONS class-attribute
¶
# Lookup table from HTTP status codes (resolved by name through ``codes``)
# to the itkdb exception class associated with each of them.
# NOTE(review): presumably consulted when a response is checked after
# ``send`` -- the consumer is not visible in this excerpt; confirm.
STATUS_EXCEPTIONS = {
# gateway/server-side failures all share exceptions.ServerError
codes["bad_gateway"]: exceptions.ServerError,
codes["bad_request"]: exceptions.BadRequest,
codes["conflict"]: exceptions.Conflict,
codes["found"]: exceptions.Redirect,
codes["forbidden"]: exceptions.Forbidden,
codes["gateway_timeout"]: exceptions.ServerError,
codes["internal_server_error"]: exceptions.ServerError,
codes["media_type"]: exceptions.SpecialError,
codes["not_found"]: exceptions.NotFound,
codes["request_entity_too_large"]: exceptions.TooLarge,
codes["service_unavailable"]: exceptions.ServerError,
# "unauthorized" is reported with the same exception as "forbidden"
codes["unauthorized"]: exceptions.Forbidden,
codes[
"unavailable_for_legal_reasons"
]: exceptions.UnavailableForLegalReasons,
}
__call__ ¶
__call__(*args, **kwargs)
Source code in itkdb/core.py
def __call__(self, *args, **kwargs):
    """
    Dispatch a call on the session object itself.

    With exactly one positional argument, that argument is prepared via
    ``prepare_request`` and sent via ``send`` (keyword arguments go to
    ``send``). With any other number of positional arguments, everything
    is forwarded unchanged to ``request``.
    """
    if len(args) != 1:
        return self.request(*args, **kwargs)

    (unprepared,) = args
    prepared = self.prepare_request(unprepared)
    return self.send(prepared, **kwargs)
authorize ¶
authorize(req)
Add authentication information to the request by updating the headers.
Source code in itkdb/core.py
def authorize(self, req):
    """
    Attach a bearer token to requests aimed at the ITk site URL.

    Requests whose URL does not start with ``settings.ITKDB_SITE_URL`` are
    returned untouched. Otherwise the user is (re)authenticated first and
    the ``Authorization`` header is set from the user's bearer token.
    """
    targets_site = req.url.startswith(settings.ITKDB_SITE_URL)
    if not targets_site:
        return req

    self.user.authenticate()
    bearer_header = {"Authorization": f"Bearer {self.user.bearer:s}"}
    req.headers.update(bearer_header)
    return req
prepare_request ¶
prepare_request(request)
Source code in itkdb/core.py
def prepare_request(self, request):
    """
    Normalize the request's URL in place, then defer to the parent class
    to build the prepared request.
    """
    normalized_url = self._normalize_url(request.url)
    request.url = normalized_url
    prepared = super().prepare_request(request)
    return prepared
request ¶
request(method, url, *args, **kwargs)
Source code in itkdb/core.py
def request(self, method, url, *args, **kwargs):
    """
    Issue a request through the parent class after passing ``url`` through
    ``_normalize_url``; all other arguments are forwarded unchanged.
    """
    return super().request(method, self._normalize_url(url), *args, **kwargs)
send ¶
send(request, **kwargs)
Source code in itkdb/core.py
def send(self, request, **kwargs):
    """
    Send a request through the parent class, remember the response as the
    session's last response, log its status and size, and run it through
    ``_check_response`` before handing it back to the caller.
    """
    result = super().send(request, **kwargs)
    # keep the latest response around for later inspection
    self._response = result

    status = result.status_code
    size = result.headers.get("content-length")
    log.debug("Response: %s (%s bytes)", status, size)

    self._check_response(result)
    return result
User ¶
User(
access_code1=settings.ITKDB_ACCESS_CODE1,
access_code2=settings.ITKDB_ACCESS_CODE2,
audience=settings.ITKDB_ACCESS_AUDIENCE,
prefix_url=settings.ITKDB_AUTH_URL,
jwt_options=None,
save_auth: Path | str | None = None,
)
Class for managing user tokens and authentication flow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
access_code1 | str | ITkPD Access Code 1 | settings.ITKDB_ACCESS_CODE1 |
access_code2 | str | ITkPD Access Code 2 | settings.ITKDB_ACCESS_CODE2 |
audience | str | ITkPD OIDC Audience | settings.ITKDB_ACCESS_AUDIENCE |
prefix_url | str | The prefix for all non-absolute URIs | settings.ITKDB_AUTH_URL |
jwt_options | dict | Additional JWT options to pass through | None |
save_auth | pathlib.Path | str | None | If set, save authentication information to the file path specified | None |
Changed in version 0.4.0
- renamed accessCode1/accessCode2 to access_code1/access_code2
Source code in itkdb/core.py
def __init__(
    self,
    access_code1=settings.ITKDB_ACCESS_CODE1,
    access_code2=settings.ITKDB_ACCESS_CODE2,
    audience=settings.ITKDB_ACCESS_AUDIENCE,
    prefix_url=settings.ITKDB_AUTH_URL,
    jwt_options=None,
    save_auth: Path | str | None = None,
):
    """
    Record the credentials and options used for authentication, reset all
    token state, and finally call ``_load``.

    Args:
        access_code1: First access code sent when requesting a token.
        access_code2: Second access code sent when requesting a token.
        audience: Stored as the audience for this user.
        prefix_url: Base URL of the authentication service.
        jwt_options: Extra JWT options; merged over the default ``leeway``
            taken from ``settings.ITKDB_LEEWAY``.
        save_auth: File path stored (as a ``Path``) for persistence;
            a falsy value stores ``None``.
    """
    # dedicated HTTP session for authentication (kept swappable for tests)
    auth_session = requests.Session()
    auth_session.headers.update({"User-Agent": f"itkdb/{__version__}"})
    self._session = auth_session

    # outcome of the latest authentication call
    self._response = None
    self._status_code = None

    # key set used for validation/verification
    self._jwks = None

    # tokens, filled in once authentication has happened
    self._access_token = None
    self._raw_id_token = None
    self._id_token = None

    # configuration supplied by the caller
    self._access_code1 = access_code1
    self._access_code2 = access_code2
    self._audience = audience
    self._prefix_url = prefix_url

    # defaults first, then caller-provided overrides
    merged_options = {"leeway": int(settings.ITKDB_LEEWAY)}
    merged_options.update(jwt_options or {})
    self._jwt_options = merged_options

    # persistence target, then load through ``_load``
    self._save_auth: Path | None = Path(save_auth) if save_auth else None
    self._load()
id_token property
¶
id_token: dict[str, str | list[str] | int]
The parsed JWT identity token for the user.
__repr__ ¶
__repr__()
Source code in itkdb/core.py
def __repr__(self):
    """
    Return ``ClassName(name=<name>, expires_in=<seconds>s)`` for debugging.
    """
    class_name = self.__class__.__name__
    return f"{class_name:s}(name={self.name:s}, expires_in={self.expires_in:d}s)"
authenticate ¶
authenticate() -> bool
Authenticate the current user if not already authenticated.
If the current user session is expired, this will attempt to reauthenticate.
Source code in itkdb/core.py
def authenticate(self) -> bool:
    """
    Authenticate the current user if not already authenticated.

    If the current user session is expired, this will attempt to
    reauthenticate.

    Returns:
        ``True`` once the user holds a valid, unexpired authentication.

    Raises:
        exceptions.ResponseException: If the user is still not
            authenticated after the token request, including when the
            server answers with a body that is not a JSON object.
    """
    # if not expired, do nothing
    if self.is_authenticated():
        if not self.is_expired():
            return True
        log.warning("User session is expired. Creating a new one.")

    # session-less request
    response = self._session.post(
        requests.compat.urljoin(self._prefix_url, "grantToken"),
        json={
            "grant_type": "password",
            "accessCode1": self._access_code1,
            "accessCode2": self._access_code2,
            "scope": settings.ITKDB_ACCESS_SCOPE,
        },
    )
    self._response = response
    self._status_code = response.status_code

    # Decode the body once. A failed request may return something that is
    # not JSON (or not a JSON object); treat that as "no tokens" so the
    # failure surfaces as ResponseException below instead of a decoding
    # error or AttributeError raised from here.
    try:
        payload = response.json()
    except ValueError:
        payload = None
    if not isinstance(payload, dict):
        payload = {}

    self._access_token = payload.get("access_token")
    self._raw_id_token = payload.get("id_token")
    self._id_token = None

    # handle parsing the id token
    self._parse_id_token()

    if not self.is_authenticated():
        raise exceptions.ResponseException(self._response)

    self._dump()
    return True
is_authenticated ¶
is_authenticated() -> bool
Whether current user is authenticated.
Source code in itkdb/core.py
def is_authenticated(self) -> bool:
    """
    Report whether the user currently counts as authenticated.

    That requires the stored status code to equal ``codes["ok"]`` and both
    the access token and the raw id token to be set (truthy).
    """
    if self._status_code != codes["ok"]:
        return False
    has_tokens = self._access_token and self._raw_id_token
    return bool(has_tokens)
is_expired ¶
is_expired() -> bool
Whether current user session is expired.
Source code in itkdb/core.py
def is_expired(self) -> bool:
    """
    Report whether the user session has run out of time.

    The session is expired when ``expires_in`` is not greater than zero.
    """
    remaining = self.expires_in
    if remaining > 0:
        return False
    return True