Skip to content

core

core

log module-attribute

log = logging.getLogger(__name__)

Session

Session(
    user: User = None,
    prefix_url=settings.ITKDB_SITE_URL,
    save_auth: Path | str | None = None,
    cache: bool = True,
    expires_after=None,
)

Bases: requests.Session

Lightweight wrapper around requests.Session with basic error-handling, auto-(re)authentication, and URI prefixing.

For more information, see python requests.

Attributes:

Name Type Description
STATUS_EXCEPTIONS dict

Mapping from status code to itkdb.exceptions

SUCCESS_STATUSES set

Set of status codes that are OK

auth callable

Call itkdb.core.Session.authorize

prefix_url str

The prefix for all non-absolute URIs

user itkdb.core.User

The user object for authentication

Parameters:

Name Type Description Default
user itkdb.core.User

A user object. Create one if not specified.

None
prefix_url str

The prefix url to use for all requests.

settings.ITKDB_SITE_URL
save_auth pathlib.Path | str | None

A file path to where to save authentication information.

None
cache bool | CacheControl.caches object

A CacheControl.caches object to use as the cache. Set to True (the default) to use cachecontrol.caches.file_cache.FileCache(".webcache"). Set to False to disable cache.

True
expires_after dict

The arguments are the same as the datetime.timedelta object. This will override or add the Expires header and override or set the Cache-Control header to public.

None
Source code in itkdb/core.py
def __init__(
    self,
    user: User = None,
    prefix_url=settings.ITKDB_SITE_URL,
    save_auth: Path | str | None = None,
    cache: bool = True,
    expires_after=None,
):
    """
    Set up the session: user agent, user, auth hook, URL prefix and caching.

    ``user`` falls back to ``User(save_auth=save_auth)`` when it is not given
    (or is falsy). ``cache=True`` selects ``FileCache(".webcache")``; any
    other truthy value is used as the cache object itself. A non-empty
    ``expires_after`` dict is expanded into an ``ExpiresAfter`` heuristic.
    When at least one of these options is configured, a
    ``CacheControlAdapter`` is mounted on ``prefix_url``.
    """
    super().__init__()
    self.headers.update({"User-Agent": f"itkdb/{__version__}"})
    self.user = user or User(save_auth=save_auth)
    self.auth = self.authorize
    self.prefix_url = prefix_url
    # most recent response seen by this session
    self._response = None

    adapter_kwargs = {}
    if cache is True:
        # default cache backend
        adapter_kwargs["cache"] = cachecontrol.caches.file_cache.FileCache(
            ".webcache"
        )
    elif cache:
        # caller supplied their own cache object
        adapter_kwargs["cache"] = cache

    # expiration heuristic for cached responses
    if expires_after and isinstance(expires_after, dict):
        adapter_kwargs["heuristic"] = ExpiresAfter(**expires_after)

    if adapter_kwargs:
        # only mount the caching adapter when something was configured
        adapter = CacheControlAdapter(
            controller_class=CacheController, **adapter_kwargs
        )
        super().mount(self.prefix_url, adapter)

STATUS_EXCEPTIONS class-attribute

# Maps HTTP status codes (looked up by name in ``codes``) to the exception
# class from ``exceptions`` associated with that status.
# NOTE(review): presumably consulted when checking responses -- confirm
# against _check_response.
STATUS_EXCEPTIONS = {
    # bad_gateway, gateway_timeout, internal_server_error and
    # service_unavailable all share exceptions.ServerError
    codes["bad_gateway"]: exceptions.ServerError,
    codes["bad_request"]: exceptions.BadRequest,
    codes["conflict"]: exceptions.Conflict,
    codes["found"]: exceptions.Redirect,
    codes["forbidden"]: exceptions.Forbidden,
    codes["gateway_timeout"]: exceptions.ServerError,
    codes["internal_server_error"]: exceptions.ServerError,
    codes["media_type"]: exceptions.SpecialError,
    codes["not_found"]: exceptions.NotFound,
    codes["request_entity_too_large"]: exceptions.TooLarge,
    codes["service_unavailable"]: exceptions.ServerError,
    # unauthorized is reported with the same exception class as forbidden
    codes["unauthorized"]: exceptions.Forbidden,
    codes[
        "unavailable_for_legal_reasons"
    ]: exceptions.UnavailableForLegalReasons,
}

SUCCESS_STATUSES class-attribute

SUCCESS_STATUSES = {codes['created'], codes['ok']}

auth instance-attribute

auth = self.authorize

prefix_url instance-attribute

prefix_url = prefix_url

user instance-attribute

user = user if user else User(save_auth=save_auth)

__call__

__call__(*args, **kwargs)
Source code in itkdb/core.py
def __call__(self, *args, **kwargs):
    """
    Dispatch a direct call on the session.

    Exactly one positional argument is handed to ``prepare_request`` and the
    result is passed to ``send`` together with the keyword arguments. Any
    other number of positional arguments is forwarded, unchanged, to
    ``request``.
    """
    if len(args) != 1:
        return self.request(*args, **kwargs)

    (single_request,) = args
    prepared = self.prepare_request(single_request)
    return self.send(prepared, **kwargs)

authorize

authorize(req)

Add authentication information to the request by updating the headers.

Source code in itkdb/core.py
def authorize(self, req):
    """
    Add authentication information to the request by updating the headers.

    Only requests whose URL starts with ``settings.ITKDB_SITE_URL`` are
    modified: the user is authenticated first, then an ``Authorization``
    bearer header is set. Any other request is returned untouched.
    """
    targets_site = req.url.startswith(settings.ITKDB_SITE_URL)
    if not targets_site:
        return req

    # make sure the user holds a usable token before reading it
    self.user.authenticate()
    bearer_header = {"Authorization": f"Bearer {self.user.bearer:s}"}
    req.headers.update(bearer_header)
    return req

prepare_request

prepare_request(request)
Source code in itkdb/core.py
def prepare_request(self, request):
    """
    Prepare ``request`` after passing its URL through ``_normalize_url``.

    The request object is modified in place (its ``url`` is replaced) and
    then handed to the parent class' ``prepare_request``.
    """
    normalized_url = self._normalize_url(request.url)
    request.url = normalized_url
    return super().prepare_request(request)

request

request(method, url, *args, **kwargs)
Source code in itkdb/core.py
def request(self, method, url, *args, **kwargs):
    """
    Issue a request after passing ``url`` through ``_normalize_url``.

    All other arguments are forwarded unchanged to the parent class'
    ``request``.
    """
    normalized_url = self._normalize_url(url)
    return super().request(method, normalized_url, *args, **kwargs)

send

send(request, **kwargs)
Source code in itkdb/core.py
def send(self, request, **kwargs):
    """
    Send ``request`` through the parent class, then record and check the reply.

    The response is stored on ``self._response``, its status code and
    ``content-length`` header are logged at debug level, and it is passed to
    ``_check_response`` before being returned.
    """
    reply = super().send(request, **kwargs)
    # remember the most recent response
    self._response = reply

    status = reply.status_code
    size = reply.headers.get("content-length")
    log.debug("Response: %s (%s bytes)", status, size)

    self._check_response(reply)
    return reply

User

User(
    access_code1=settings.ITKDB_ACCESS_CODE1,
    access_code2=settings.ITKDB_ACCESS_CODE2,
    audience=settings.ITKDB_ACCESS_AUDIENCE,
    prefix_url=settings.ITKDB_AUTH_URL,
    jwt_options=None,
    save_auth: Path | str | None = None,
)

Class for managing user tokens and authentication flow.

Parameters:

Name Type Description Default
access_code1 str

ITkPD Access Code 1

settings.ITKDB_ACCESS_CODE1
access_code2 str

ITkPD Access Code 2

settings.ITKDB_ACCESS_CODE2
audience str

ITkPD OIDC Audience

settings.ITKDB_ACCESS_AUDIENCE
prefix_url str

The prefix for all non-absolute URIs

settings.ITKDB_AUTH_URL
jwt_options dict

Additional JWT options to pass through

None
save_auth pathlib.Path | str | None

If set, save authentication information to the file path specified

None

Changed in version 0.4.0

  • renamed accessCode1 / accessCode2 to access_code1 / access_code2
Source code in itkdb/core.py
def __init__(
    self,
    access_code1=settings.ITKDB_ACCESS_CODE1,
    access_code2=settings.ITKDB_ACCESS_CODE2,
    audience=settings.ITKDB_ACCESS_AUDIENCE,
    prefix_url=settings.ITKDB_AUTH_URL,
    jwt_options=None,
    save_auth: Path | str | None = None,
):
    """
    Initialise an unauthenticated user and then call ``_load``.

    Stores the access codes, audience and auth URL prefix, builds the JWT
    options (a ``leeway`` taken from ``settings.ITKDB_LEEWAY``, updated with
    any ``jwt_options`` given), and converts ``save_auth`` to a ``Path`` when
    it is set.
    """
    # dedicated HTTP session (kept on the instance for injection in tests)
    http = requests.Session()
    http.headers.update({"User-Agent": f"itkdb/{__version__}"})
    self._session = http

    # result of the last call to authenticate
    self._response = None
    self._status_code = None

    # jwks for validation/verification
    self._jwks = None

    # token information, filled in after authorization occurs
    self._access_token = None
    self._raw_id_token = None
    self._id_token = None

    # initialization configuration
    self._access_code1 = access_code1
    self._access_code2 = access_code2
    self._audience = audience
    self._prefix_url = prefix_url

    # start from the default leeway, then apply caller-provided options
    jwt_settings = {"leeway": int(settings.ITKDB_LEEWAY)}
    if jwt_options:
        jwt_settings.update(jwt_options)
    self._jwt_options = jwt_settings

    # serialization/persistence
    self._save_auth: Path | None = Path(save_auth) if save_auth else None
    self._load()

access_code1 property

access_code1: str

The first access code.

access_code2 property

access_code2: str

The second access code.

access_token property

access_token: str

The opaque access token for the user.

bearer property

bearer: str

The bearer token for the user.

expires_at property

expires_at: int

The Epoch Unix Timestamp that the user session expires at.

expires_in property

expires_in: int

The time until expiration in seconds.

id_token property

id_token: dict[str, str | list[str] | int]

The parsed JWT identity token for the user.

identity property

identity: str

The identity for the user in the ITk Production Database.

name property

name: str

The name for the user.

__repr__

__repr__()
Source code in itkdb/core.py
def __repr__(self):
    """
    Return ``ClassName(name=<name>, expires_in=<seconds>s)`` for this user.
    """
    class_name = self.__class__.__name__
    user_name = self.name
    seconds_left = self.expires_in
    return f"{class_name:s}(name={user_name:s}, expires_in={seconds_left:d}s)"

authenticate

authenticate() -> bool

Authenticate the current user if not already authenticated.

If the current user session is expired, this will attempt to reauthenticate.

Source code in itkdb/core.py
def authenticate(self) -> bool:
    """
    Authenticate the current user if not already authenticated.

    If the current user session is expired, this will attempt to reauthenticate.

    Returns:
        bool: always ``True``; failure is reported by raising.

    Raises:
        itkdb.exceptions.ResponseException: if the token request does not
            leave the user authenticated (including when the reply body is
            not a JSON object).
    """
    # if authenticated and not expired, do nothing
    if self.is_authenticated():
        if not self.is_expired():
            return True
        log.warning("User session is expired. Creating a new one.")

    # session-less request
    response = self._session.post(
        requests.compat.urljoin(self._prefix_url, "grantToken"),
        json={
            "grant_type": "password",
            "accessCode1": self._access_code1,
            "accessCode2": self._access_code2,
            "scope": settings.ITKDB_ACCESS_SCOPE,
        },
    )
    self._response = response
    self._status_code = response.status_code

    # Decode the body once. An error reply is not guaranteed to be a JSON
    # object (e.g. an HTML error page); treat such a body as carrying no
    # tokens so the failure surfaces below as a ResponseException rather
    # than as a decoding error.
    try:
        payload = response.json()
    except ValueError:
        payload = {}
    if not isinstance(payload, dict):
        payload = {}

    self._access_token = payload.get("access_token")
    self._raw_id_token = payload.get("id_token")
    # drop any previously parsed token before re-parsing
    self._id_token = None

    # handle parsing the id token
    self._parse_id_token()

    if not self.is_authenticated():
        raise exceptions.ResponseException(self._response)

    self._dump()
    return True

is_authenticated

is_authenticated() -> bool

Whether current user is authenticated.

Source code in itkdb/core.py
def is_authenticated(self) -> bool:
    """
    Whether current user is authenticated.

    True only when the stored status code equals ``codes["ok"]`` and both
    the access token and the raw id token are set (truthy).
    """
    if self._status_code != codes["ok"]:
        return False
    has_tokens = self._access_token and self._raw_id_token
    return bool(has_tokens)

is_expired

is_expired() -> bool

Whether current user session is expired.

Source code in itkdb/core.py
def is_expired(self) -> bool:
    """
    Whether current user session is expired.

    The session counts as expired unless ``expires_in`` is greater than zero.
    """
    seconds_left = self.expires_in
    if seconds_left > 0:
        return False
    return True

Last update: April 19, 2023