core
core ¶
Session ¶
Session(
user: User | None = None,
prefix_url: str = settings.ITKDB_API_URL,
save_auth: Path | str | None = None,
cache: bool | object = True,
expires_after: dict[str, int] | None = None,
auth_expiry_threshold: int = 15,
)
Bases: Session
Lightweight wrapper around requests.Session
with basic error-handling, auto-(re)authentication, and URI prefixing.
For more information, see python requests.
Attributes:
Name | Type | Description |
---|---|---|
STATUS_EXCEPTIONS | dict | Mapping from status code to itkdb.exceptions |
SUCCESS_STATUSES | dict | List of status codes that are OK |
auth | callable | |
prefix_url | str | The prefix for all non-absolute URIs |
user | User | The user object for authentication |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user | User | A user object. Create one if not specified. | None |
prefix_url | str | The prefix url to use for all requests. | ITKDB_API_URL |
save_auth | Path | str | None | A file path to where to save authentication information. | None |
cache | str | A CacheControl.caches object for cache (default: cachecontrol.caches.file_cache.FileCache). Set to False to disable cache. | True |
expires_after | dict | The arguments are the same as the datetime.timedelta object. This will override or add the Expires header and override or set the Cache-Control header to public. | None |
auth_expiry_threshold | int | Number of seconds until token expiration to do a reauthentication (see itkdb.core.User) | 15 |
Added in version 0.4.7
auth_expiry_threshold
to force reauthentication sooner
Source code in src/itkdb/core.py
def __init__(
self,
user: User | None = None,
prefix_url: str = settings.ITKDB_API_URL,
save_auth: Path | str | None = None,
cache: bool | object = True,
expires_after: dict[str, int] | None = None,
auth_expiry_threshold: int = 15,
):
super().__init__()
self.headers.update({"User-Agent": f"itkdb/{__version__}"})
self.user: UserLike = (
user
if user
else User(save_auth=save_auth, auth_expiry_threshold=auth_expiry_threshold)
)
self.auth: Callable[[requests.PreparedRequest], requests.PreparedRequest] = (
self.authorize
)
self.prefix_url: str = prefix_url
# store last call
self._response: requests.Response | None = None
cache_options = {}
if cache:
cache = (
cachecontrol.caches.file_cache.FileCache(".webcache")
if cache is True
else cache
)
cache_options.update({"cache": cache})
# handle expirations for cache
if expires_after and isinstance(expires_after, dict):
cache_options.update({"heuristic": ExpiresAfter(**expires_after)})
if cache_options:
# add caching
super().mount(
self.prefix_url,
CacheControlAdapter(controller_class=CacheController, **cache_options),
)
STATUS_EXCEPTIONS class-attribute
¶
STATUS_EXCEPTIONS: dict[int, type[ITkDBException]] = {
codes["bad_gateway"]: ServerError,
codes["bad_request"]: BadRequest,
codes["conflict"]: Conflict,
codes["found"]: Redirect,
codes["forbidden"]: Forbidden,
codes["gateway_timeout"]: ServerError,
codes["internal_server_error"]: ServerError,
codes["media_type"]: SpecialError,
codes["not_found"]: NotFound,
codes["request_entity_too_large"]: TooLarge,
codes["service_unavailable"]: ServerError,
codes["unauthorized"]: Forbidden,
codes[
"unavailable_for_legal_reasons"
]: UnavailableForLegalReasons,
}
user instance-attribute
¶
user: UserLike = (
user
if user
else User(
save_auth=save_auth,
auth_expiry_threshold=auth_expiry_threshold,
)
)
__call__ ¶
__call__(*args: Any, **kwargs: Any) -> requests.Response
Source code in src/itkdb/core.py
def __call__(self, *args: Any, **kwargs: Any) -> requests.Response:
if len(args) == 1:
return self.send(self.prepare_request(*args), **kwargs)
return self.request(*args, **kwargs)
authorize ¶
authorize(
req: requests.PreparedRequest,
) -> requests.PreparedRequest
Add authentication information to the request by updating the headers.
Source code in src/itkdb/core.py
def authorize(self, req: requests.PreparedRequest) -> requests.PreparedRequest:
"""
Add authentication information to the request by updating the headers.
"""
if req.url.startswith(self.prefix_url): # type: ignore[union-attr]
self.user.authenticate()
req.headers.update({"Authorization": f"Bearer {self.user.bearer:s}"})
return req
prepare_request ¶
prepare_request(
request: requests.Request,
) -> requests.PreparedRequest
Source code in src/itkdb/core.py
def prepare_request(self, request: requests.Request) -> requests.PreparedRequest:
request.url = self._normalize_url(request.url)
return super().prepare_request(request)
request ¶
request(
method: str | bytes,
url: str | bytes,
*args: Any,
**kwargs: Any
) -> requests.Response
Source code in src/itkdb/core.py
def request(
self, method: str | bytes, url: str | bytes, *args: Any, **kwargs: Any
) -> requests.Response:
url = self._normalize_url(url)
return super().request(method, url, *args, **kwargs)
send ¶
send(
request: requests.PreparedRequest, **kwargs: Any
) -> requests.Response
Source code in src/itkdb/core.py
def send(
self, request: requests.PreparedRequest, **kwargs: Any
) -> requests.Response:
response = super().send(request, **kwargs)
self._response = response
log.debug(
"Response: %s (%s bytes)",
response.status_code,
response.headers.get("content-length"),
)
self._check_response(response)
return response
User ¶
User(
access_code1: str = settings.ITKDB_ACCESS_CODE1,
access_code2: str = settings.ITKDB_ACCESS_CODE2,
audience: str = settings.ITKDB_ACCESS_AUDIENCE,
prefix_url: str = settings.ITKDB_AUTH_URL,
jwt_options: dict[str, Any] | None = None,
save_auth: Path | str | None = None,
auth_expiry_threshold: int = 15,
)
Bases: UserLike
Class for managing user tokens and authentication flow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
access_code1 | str | ITkPD Access Code 1 | ITKDB_ACCESS_CODE1 |
access_code2 | str | ITkPD Access Code 2 | ITKDB_ACCESS_CODE2 |
audience | str | ITkPD OIDC Audience | ITKDB_ACCESS_AUDIENCE |
prefix_url | str | The prefix for all non-absolute URIs | ITKDB_AUTH_URL |
jwt_options | dict | Additional JWT options to pass through | None |
save_auth | Path | str | None | If set, save authentication information to the file path specified | None |
auth_expiry_threshold | int | Number of seconds until token expiration to do a reauthentication | 15 |
Changed in version 0.4.0
- renamed
accessCode1
/accessCode2
toaccess_code1
/access_code2
Added in version 0.4.7
auth_expiry_threshold
to force reauthentication sooner
Source code in src/itkdb/core.py
def __init__(
self,
access_code1: str = settings.ITKDB_ACCESS_CODE1,
access_code2: str = settings.ITKDB_ACCESS_CODE2,
audience: str = settings.ITKDB_ACCESS_AUDIENCE,
prefix_url: str = settings.ITKDB_AUTH_URL,
jwt_options: dict[str, Any] | None = None,
save_auth: Path | str | None = None,
auth_expiry_threshold: int = 15,
):
# session handling (for injection in tests)
self._session = requests.Session()
self._session.headers.update({"User-Agent": f"itkdb/{__version__}"})
# store last call to authenticate
self._response: requests.Response | None = None
self._status_code: int | None = None
# store jwks for validation/verification
self._jwks: dict[str, Any] | None = None
# store information after authorization occurs
self._access_token: str | None = None
self._raw_id_token: str | None = None
self._id_token: dict[str, Any] | None = None
# initialization configuration
self._access_code1: str = access_code1
self._access_code2: str = access_code2
self._audience: str = audience
self._prefix_url: str = prefix_url
# update jwt_options if provided
self._jwt_options: dict[str, Any] = {
"leeway": int(settings.ITKDB_LEEWAY)
} # **jwt_options, python3 only
self._jwt_options.update(jwt_options or {})
# serialization/persistence
self._save_auth: Path | None = Path(save_auth) if save_auth else None
self.auth_expiry_threshold: int = auth_expiry_threshold
self._load()
id_token property
¶
id_token: dict[str, str | list[str] | int]
The parsed JWT identity token for the user.
__repr__ ¶
__repr__() -> str
Source code in src/itkdb/core.py
def __repr__(self) -> str:
return f"{self.__class__.__name__:s}(name={self.name:s}, expires_in={self.expires_in:d}s)"
authenticate ¶
authenticate() -> bool
Authenticate the current user if not already authenticated.
If the current user session is expired, this will attempt to reauthenticate.
Source code in src/itkdb/core.py
def authenticate(self) -> bool:
"""
Authenticate the current user if not already authenticated.
If the current user session is expired, this will attempt to reauthenticate.
"""
# if not expired, do nothing
if self.is_authenticated():
if not self.is_expired():
return True
log.warning("User session is expired. Creating a new one.")
# session-less request
response = self._session.post(
requests.compat.urljoin(self._prefix_url, "grantToken"),
json={
"grant_type": "password",
"accessCode1": self._access_code1,
"accessCode2": self._access_code2,
"scope": settings.ITKDB_ACCESS_SCOPE,
},
)
self._response = response
self._status_code = response.status_code
self._access_token = response.json().get("access_token")
self._raw_id_token = response.json().get("id_token")
self._id_token = None
# handle parsing the id token
self._parse_id_token()
if not self.is_authenticated():
raise exceptions.ResponseException(self._response)
self._dump()
return True
is_authenticated ¶
is_authenticated() -> bool
Whether current user is authenticated.
Source code in src/itkdb/core.py
def is_authenticated(self) -> bool:
"""
Whether current user is authenticated.
"""
return bool(
self._status_code == codes["ok"]
and self._access_token
and self._raw_id_token
)
is_expired ¶
is_expired() -> bool
Whether current user session is expired given the expiration threshold.
Source code in src/itkdb/core.py
def is_expired(self) -> bool:
"""
Whether current user session is expired given the expiration threshold.
"""
return not self.expires_in > self.auth_expiry_threshold
UserBearer ¶
UserBearer(bearer: str = settings.ITKDB_AUDREYTWO_API_KEY)
Bases: UserLike
Class for managing bearer tokens.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bearer | str | Bearer token | ITKDB_AUDREYTWO_API_KEY |
prefix_url | str | The prefix for all non-absolute URIs | required |
Added in version 0.6.0
Source code in src/itkdb/core.py
def __init__(
self,
bearer: str = settings.ITKDB_AUDREYTWO_API_KEY,
):
# session handling (for injection in tests)
self._session = requests.Session()
self._session.headers.update({"User-Agent": f"itkdb/{__version__}"})
# store last call to authenticate
self._response: requests.Response | None = None
self._status_code: int | None = None
# initialization configuration
self._bearer: str = bearer