Skip to content

core

core

log module-attribute

# Module-level logger, named after this module.
log = getLogger(__name__)

Session

Session(
    user: User | None = None,
    prefix_url: str = settings.ITKDB_API_URL,
    save_auth: Path | str | None = None,
    cache: bool | object = True,
    expires_after: dict[str, int] | None = None,
    auth_expiry_threshold: int = 15,
)

Bases: Session

Lightweight wrapper around requests.Session with basic error-handling, auto-(re)authentication, and URI prefixing.

For more information, see python requests.

Attributes:

Name Type Description
STATUS_EXCEPTIONS dict

Mapping from status code to itkdb.exceptions

SUCCESS_STATUSES set

Set of status codes that are OK

auth callable
prefix_url str

The prefix for all non-absolute URIs

user User

The user object for authentication

Parameters:

Name Type Description Default
user User

A user object. Create one if not specified.

None
prefix_url str

The prefix url to use for all requests.

ITKDB_API_URL
save_auth Path | str | None

A file path to where to save authentication information.

None
cache bool | object

A cachecontrol.caches object to use as the cache. If True (the default), a cachecontrol.caches.file_cache.FileCache stored in ".webcache" is used. Set to False to disable the cache.

True
expires_after dict

The arguments are the same as the datetime.timedelta object. This will override or add the Expires header and override or set the Cache-Control header to public.

None
auth_expiry_threshold int

Number of seconds before token expiration at which to reauthenticate (see itkdb.core.User)

15

Added in version 0.4.7

  • auth_expiry_threshold to force reauthentication sooner
Source code in src/itkdb/core.py
def __init__(
    self,
    user: User | None = None,
    prefix_url: str = settings.ITKDB_API_URL,
    save_auth: Path | str | None = None,
    cache: bool | object = True,
    expires_after: dict[str, int] | None = None,
    auth_expiry_threshold: int = 15,
):
    """
    Configure the session: user agent, user, auth callable, URL prefix and caching.

    When no (truthy) user is passed, a User is built from save_auth and
    auth_expiry_threshold. A caching adapter is mounted on prefix_url only
    when a cache and/or an expiration heuristic has been configured.
    """
    super().__init__()
    self.headers.update({"User-Agent": f"itkdb/{__version__}"})

    # fall back to a freshly created user when none was supplied
    if not user:
        user = User(save_auth=save_auth, auth_expiry_threshold=auth_expiry_threshold)
    self.user: UserLike = user

    # authorize() is installed as this session's auth callable
    self.auth: Callable[[requests.PreparedRequest], requests.PreparedRequest] = (
        self.authorize
    )
    self.prefix_url: str = prefix_url
    # most recent response (updated by send())
    self._response: requests.Response | None = None

    adapter_kwargs: dict[str, Any] = {}
    if cache is True:
        # default: file-based cache in ".webcache"
        adapter_kwargs["cache"] = cachecontrol.caches.file_cache.FileCache(".webcache")
    elif cache:
        # caller supplied their own cache object
        adapter_kwargs["cache"] = cache

    # optional expiration heuristic for cached responses
    if expires_after and isinstance(expires_after, dict):
        adapter_kwargs["heuristic"] = ExpiresAfter(**expires_after)

    # nothing to cache and no heuristic: leave the default adapters alone
    if not adapter_kwargs:
        return

    super().mount(
        self.prefix_url,
        CacheControlAdapter(controller_class=CacheController, **adapter_kwargs),
    )

STATUS_EXCEPTIONS class-attribute

# Mapping from status code (looked up by name in `codes`) to the exception
# class associated with that status.
# NOTE(review): presumably consulted when checking responses -- confirm against _check_response.
STATUS_EXCEPTIONS: dict[int, type[ITkDBException]] = {
    codes["bad_gateway"]: ServerError,
    codes["bad_request"]: BadRequest,
    codes["conflict"]: Conflict,
    codes["found"]: Redirect,
    codes["forbidden"]: Forbidden,
    codes["gateway_timeout"]: ServerError,
    codes["internal_server_error"]: ServerError,
    codes["media_type"]: SpecialError,
    codes["not_found"]: NotFound,
    codes["request_entity_too_large"]: TooLarge,
    codes["service_unavailable"]: ServerError,
    codes["unauthorized"]: Forbidden,  # same exception class as "forbidden"
    codes[
        "unavailable_for_legal_reasons"
    ]: UnavailableForLegalReasons,
}

SUCCESS_STATUSES class-attribute

# Status codes that are OK: "created" and "ok", as named in `codes`.
SUCCESS_STATUSES: set[int] = {codes["created"], codes["ok"]}

auth instance-attribute

auth: Callable[[PreparedRequest], PreparedRequest] = (
    authorize
)

prefix_url instance-attribute

prefix_url: str = prefix_url

user instance-attribute

user: UserLike = (
    user
    if user
    else User(
        save_auth=save_auth,
        auth_expiry_threshold=auth_expiry_threshold,
    )
)

__call__

__call__(*args: Any, **kwargs: Any) -> requests.Response
Source code in src/itkdb/core.py
def __call__(self, *args: Any, **kwargs: Any) -> requests.Response:
    """
    Make the session callable.

    With exactly one positional argument, that argument is prepared with
    prepare_request() and sent with send() (keyword arguments go to send()).
    Otherwise everything is forwarded unchanged to request().
    """
    if len(args) != 1:
        return self.request(*args, **kwargs)

    (single_request,) = args
    prepared = self.prepare_request(single_request)
    return self.send(prepared, **kwargs)

authorize

authorize(
    req: requests.PreparedRequest,
) -> requests.PreparedRequest

Add authentication information to the request by updating the headers.

Source code in src/itkdb/core.py
def authorize(self, req: requests.PreparedRequest) -> requests.PreparedRequest:
    """
    Attach a bearer Authorization header to requests aimed at prefix_url.

    Requests whose URL does not start with prefix_url are returned untouched
    and do not trigger authentication.
    """
    if not req.url.startswith(self.prefix_url):  # type: ignore[union-attr]
        return req

    # make sure the user is authenticated before reading the bearer token
    self.user.authenticate()
    req.headers.update({"Authorization": f"Bearer {self.user.bearer:s}"})
    return req

prepare_request

prepare_request(
    request: requests.Request,
) -> requests.PreparedRequest
Source code in src/itkdb/core.py
def prepare_request(self, request: requests.Request) -> requests.PreparedRequest:
    """
    Normalize the request's URL in place, then let the parent class prepare it.
    """
    # NOTE(review): _normalize_url presumably applies prefix_url to non-absolute URLs -- confirm
    normalized = self._normalize_url(request.url)
    request.url = normalized
    return super().prepare_request(request)

request

request(
    method: str | bytes,
    url: str | bytes,
    *args: Any,
    **kwargs: Any
) -> requests.Response
Source code in src/itkdb/core.py
def request(
    self, method: str | bytes, url: str | bytes, *args: Any, **kwargs: Any
) -> requests.Response:
    """
    Normalize the URL, then delegate to the parent class's request().

    All other positional and keyword arguments are passed through unchanged.
    """
    # NOTE(review): _normalize_url presumably applies prefix_url to non-absolute URLs -- confirm
    normalized = self._normalize_url(url)
    return super().request(method, normalized, *args, **kwargs)

send

send(
    request: requests.PreparedRequest, **kwargs: Any
) -> requests.Response
Source code in src/itkdb/core.py
def send(
    self, request: requests.PreparedRequest, **kwargs: Any
) -> requests.Response:
    """
    Send a prepared request, remember the response, log it and check it.

    The response is stored on the session before it is checked, so it stays
    available even if the check raises.
    """
    result = super().send(request, **kwargs)
    # keep the latest response around
    self._response = result

    status = result.status_code
    size = result.headers.get("content-length")
    log.debug("Response: %s (%s bytes)", status, size)

    # NOTE(review): _check_response presumably raises for non-success statuses -- confirm
    self._check_response(result)
    return result

User

User(
    access_code1: str = settings.ITKDB_ACCESS_CODE1,
    access_code2: str = settings.ITKDB_ACCESS_CODE2,
    audience: str = settings.ITKDB_ACCESS_AUDIENCE,
    prefix_url: str = settings.ITKDB_AUTH_URL,
    jwt_options: dict[str, Any] | None = None,
    save_auth: Path | str | None = None,
    auth_expiry_threshold: int = 15,
)

Bases: UserLike

Class for managing user tokens and authentication flow.

Parameters:

Name Type Description Default
access_code1 str

ITkPD Access Code 1

ITKDB_ACCESS_CODE1
access_code2 str

ITkPD Access Code 2

ITKDB_ACCESS_CODE2
audience str

ITkPD OIDC Audience

ITKDB_ACCESS_AUDIENCE
prefix_url str

The prefix for all non-absolute URIs

ITKDB_AUTH_URL
jwt_options dict

Additional JWT options to pass through

None
save_auth Path | str | None

If set, save authentication information to the file path specified

None
auth_expiry_threshold int

Number of seconds before token expiration at which to reauthenticate

15

Changed in version 0.4.0

  • renamed accessCode1 / accessCode2 to access_code1 / access_code2

Added in version 0.4.7

  • auth_expiry_threshold to force reauthentication sooner
Source code in src/itkdb/core.py
def __init__(
    self,
    access_code1: str = settings.ITKDB_ACCESS_CODE1,
    access_code2: str = settings.ITKDB_ACCESS_CODE2,
    audience: str = settings.ITKDB_ACCESS_AUDIENCE,
    prefix_url: str = settings.ITKDB_AUTH_URL,
    jwt_options: dict[str, Any] | None = None,
    save_auth: Path | str | None = None,
    auth_expiry_threshold: int = 15,
):
    """
    Initialise token state and configuration, then call _load().

    NOTE(review): _load() presumably restores previously saved authentication
    from save_auth -- confirm.
    """
    # dedicated HTTP session (can be swapped out in tests)
    http = requests.Session()
    http.headers.update({"User-Agent": f"itkdb/{__version__}"})
    self._session = http

    # outcome of the most recent authenticate() request
    self._response: requests.Response | None = None
    self._status_code: int | None = None

    # JWKS kept for validation/verification
    self._jwks: dict[str, Any] | None = None

    # tokens, populated once authorization has happened
    self._access_token: str | None = None
    self._raw_id_token: str | None = None
    self._id_token: dict[str, Any] | None = None

    # configuration supplied by the caller
    self._access_code1: str = access_code1
    self._access_code2: str = access_code2
    self._audience: str = audience
    self._prefix_url: str = prefix_url

    # start from the default leeway; caller-provided options override it
    merged_options: dict[str, Any] = {"leeway": int(settings.ITKDB_LEEWAY)}
    if jwt_options:
        merged_options.update(jwt_options)
    self._jwt_options: dict[str, Any] = merged_options

    # serialization/persistence
    if save_auth:
        self._save_auth: Path | None = Path(save_auth)
    else:
        self._save_auth = None
    self.auth_expiry_threshold: int = auth_expiry_threshold
    self._load()

access_code1 property

access_code1: str

The first access code.

access_code2 property

access_code2: str

The second access code.

access_token property

access_token: str | None

The opaque access token for the user.

auth_expiry_threshold instance-attribute

auth_expiry_threshold: int = auth_expiry_threshold

bearer property

bearer: str

The bearer token for the user.

expires_at property

expires_at: int

The Epoch Unix Timestamp that the user session expires at.

expires_in property

expires_in: int

The time until expiration in seconds.

id_token property

id_token: dict[str, str | list[str] | int]

The parsed JWT identity token for the user.

identity property

identity: str

The identity for the user in the ITk Production Database.

name property

name: str

The name for the user.

__repr__

__repr__() -> str
Source code in src/itkdb/core.py
def __repr__(self) -> str:
    """
    Return ``ClassName(name=<name>, expires_in=<seconds>s)``.
    """
    cls_name = self.__class__.__name__
    who = self.name
    remaining = self.expires_in
    return f"{cls_name:s}(name={who:s}, expires_in={remaining:d}s)"

authenticate

authenticate() -> bool

Authenticate the current user if not already authenticated.

If the current user session is expired, this will attempt to reauthenticate.

Source code in src/itkdb/core.py
def authenticate(self) -> bool:
    """
    Authenticate the current user if not already authenticated.

    If the current user session is expired, this will attempt to reauthenticate.

    Returns:
        True once the user is authenticated with a non-expired session.

    Raises:
        exceptions.ResponseException: if the token request does not leave the
            user authenticated (see is_authenticated), including when the
            response body is not valid JSON.
    """
    # if not expired, do nothing
    if self.is_authenticated():
        if not self.is_expired():
            return True
        log.warning("User session is expired. Creating a new one.")

    # session-less request
    response = self._session.post(
        requests.compat.urljoin(self._prefix_url, "grantToken"),
        json={
            "grant_type": "password",
            "accessCode1": self._access_code1,
            "accessCode2": self._access_code2,
            "scope": settings.ITKDB_ACCESS_SCOPE,
        },
    )
    self._response = response
    self._status_code = response.status_code

    # Decode the body once. A body that is not JSON (e.g. an error page) is
    # treated as carrying no tokens, so the failure surfaces below as a
    # ResponseException like every other failed authentication instead of
    # escaping as a decoding error.
    try:
        payload = response.json()
    except ValueError:
        log.debug("Authentication response body is not valid JSON.")
        payload = {}

    self._access_token = payload.get("access_token")
    self._raw_id_token = payload.get("id_token")
    # drop any previously parsed id token before re-parsing
    self._id_token = None

    # handle parsing the id token
    self._parse_id_token()

    if not self.is_authenticated():
        raise exceptions.ResponseException(self._response)

    self._dump()
    return True

is_authenticated

is_authenticated() -> bool

Whether current user is authenticated.

Source code in src/itkdb/core.py
def is_authenticated(self) -> bool:
    """
    Whether current user is authenticated.

    True only when the last stored status code is OK and both an access
    token and a raw id token are present.
    """
    if self._status_code != codes["ok"]:
        return False
    return bool(self._access_token and self._raw_id_token)

is_expired

is_expired() -> bool

Whether current user session is expired given the expiration threshold.

Source code in src/itkdb/core.py
def is_expired(self) -> bool:
    """
    Whether current user session is expired given the expiration threshold.

    The session counts as expired unless more than auth_expiry_threshold
    seconds remain before expiration.
    """
    remaining = self.expires_in
    still_valid = remaining > self.auth_expiry_threshold
    return not still_valid

UserBearer

UserBearer(bearer: str = settings.ITKDB_AUDREYTWO_API_KEY)

Bases: UserLike

Class for managing bearer tokens.

Parameters:

Name Type Description Default
bearer str

Bearer token

ITKDB_AUDREYTWO_API_KEY
prefix_url str

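The prefix for all non-absolute URIs. Note: the constructor signature shown for this class accepts only bearer; it does not define a prefix_url parameter.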

required

Added in version 0.6.0

Source code in src/itkdb/core.py
def __init__(
    self,
    bearer: str = settings.ITKDB_AUDREYTWO_API_KEY,
):
    """
    Store the bearer token and set up an HTTP session with the itkdb user agent.
    """
    # dedicated HTTP session (can be swapped out in tests)
    http = requests.Session()
    agent_header = {"User-Agent": f"itkdb/{__version__}"}
    http.headers.update(agent_header)
    self._session = http

    # placeholders for the last authentication call
    self._response: requests.Response | None = None
    self._status_code: int | None = None

    # the token handed in by the caller
    self._bearer: str = bearer

bearer property

bearer: str

The bearer token.

__repr__

__repr__() -> str
Source code in src/itkdb/core.py
def __repr__(self) -> str:
    """
    Return ``ClassName(bearer=<token>)``.
    """
    # NOTE(review): this embeds the raw bearer token in the repr; consider
    # masking it so the token cannot end up in logs or tracebacks.
    cls_name = self.__class__.__name__
    token = self.bearer
    return f"{cls_name:s}(bearer={token})"

authenticate

authenticate() -> bool
Source code in src/itkdb/core.py
def authenticate(self) -> bool:
    """
    Always report success; this method performs no request or other work.
    """
    return True