Skip to content

itkdb

itkdb

__version__ module-attribute

__version__ = '0.4.1.dev3'

Client

Client(use_eos = False, **session_kwargs)

Bases: Session

The top-level user-facing client for interacting with the ITk Production Database API.

Changed in version 0.4.0

  • added use_eos argument
Source code in itkdb/client.py
def __init__(self, use_eos=False, **session_kwargs):
    """
    Create the client and remember whether EOS should be used.

    Args:
        use_eos: flag indicating whether to use eos for uploading
            attachments; stored on the instance as ``_use_eos``.
        **session_kwargs: forwarded unchanged to the parent class initializer.
    """
    # set the flag before delegating, so it already exists while the parent
    # initializer runs
    self._use_eos = use_eos
    super().__init__(**session_kwargs)

limit class-attribute

limit = -1

use_eos property

use_eos

Flag indicating whether to use EOS for uploading attachments.

get

get(url, **kwargs)
Source code in itkdb/client.py
def get(self, url, **kwargs):
    """
    Send a GET request, adjusting the options for urls hosted under cern.ch.

    For a url whose network location contains ``.cern.ch`` the request is
    changed in three ways before it is handed to the parent class:

      - ``verify`` is overridden with the bundled ``CERN_chain.pem``,
      - ``stream`` is forced to ``True``,
      - a ``transfer-encoding: chunked`` header is added.

    Any other url is passed through untouched.

    Args:
        url: the url to request.
        **kwargs: keyword arguments forwarded to the parent ``get``. A
            ``headers`` mapping supplied by the caller is copied, never
            modified in place; ``headers=None`` is treated as no headers.

    Returns:
        Whatever the parent class's ``get`` returns.
    """
    if ".cern.ch" in urlparse(url).netloc:
        log.info(
            "Identified a cern.ch request, will attach CERN SSL chain to request by overriding `verify`."
        )
        kwargs["verify"] = itkdb_data / "CERN_chain.pem"

        # getBinaryData does not handle chunked requests, which is why only
        # cern.ch urls (and not binary-store urls) are streamed here
        log.info(
            "Identified a request that potentially downloads larger amounts of data, will execute chunked requests (stream=True)."
        )
        kwargs["stream"] = True

        # work on a copy: the caller's mapping must not pick up our extra
        # header, and an explicit headers=None must not crash the assignment
        caller_headers = kwargs.get("headers")
        headers = {} if caller_headers is None else caller_headers.copy()
        headers["transfer-encoding"] = "chunked"
        kwargs["headers"] = headers
    return super().get(url, **kwargs)

prepare_request

prepare_request(request)
Source code in itkdb/client.py
def prepare_request(self, request):
    """
    Normalize the request url, run the client's request handler, then let the
    parent class prepare the request.

    Args:
        request: the request object to prepare; its ``url`` attribute is
            overwritten in place with the normalized url.

    Returns:
        Whatever the parent class's ``prepare_request`` returns for the
        modified request.
    """
    # rewrite the url first, so the handler and the parent both see the
    # normalized form
    request.url = self._normalize_url(request.url)
    # called for its effect on ``request``; the return value is not used here
    self._request_handler(request)
    return super().prepare_request(request)

request

request(method, url, *args, **kwargs)
Source code in itkdb/client.py
def request(self, method, url, *args, **kwargs):
    """
    Issue a request and pass the result through the client's response handler.

    The client-only ``limit`` keyword is removed from ``kwargs`` and stored on
    ``self.limit`` (``-1`` when it was not given); everything else is
    forwarded.

    Args:
        method: HTTP method name.
        url: the url to request.
        *args: extra positional arguments, forwarded unchanged.
        **kwargs: extra keyword arguments, forwarded unchanged apart from
            ``limit``.

    Returns:
        The value returned by ``self._response_handler`` for the response.
    """
    # ``limit`` must not travel further down with the other keyword arguments
    self.limit = kwargs.pop("limit") if "limit" in kwargs else -1

    # super(Session, self) deliberately starts the lookup *after* Session in
    # the MRO, i.e. Session's own ``request`` is bypassed
    return self._response_handler(
        super(Session, self).request(method, url, *args, **kwargs)
    )

upload_to_eos

upload_to_eos(
    response, eos_file_details=None, **_
) -> None

A `requests` response hook that uploads a file to EOS.

Source code in itkdb/client.py
def upload_to_eos(self, response, eos_file_details=None, **_) -> None:
    """
    requests response hook function to upload a file to eos.

    ``response`` is the answer to the token request this hook is attached to.
    Its JSON body must provide a ``token`` and a ``url``; the file described
    by ``eos_file_details`` is then uploaded to that url with pycurl, using
    the token as a bearer credential and the bundled ``CERN_chain.pem`` as CA
    bundle.

    Args:
        response: the response of the token request.
        eos_file_details: a ``(fname, fpointer, ftype, fheaders)`` tuple
            describing the file to upload (see ``_request_handler``). It is
            unpacked unconditionally once the token request succeeded, so the
            ``None`` default only works when the token request failed.
        **_: any further keyword arguments are ignored.

    Returns:
        ``response`` unchanged when the token request itself failed (nothing
        is uploaded in that case); otherwise ``None``, after the outcome of
        the upload has been attached as ``response.eos_response``.
        NOTE(review): the ``-> None`` annotation does not cover the early
        ``return response``; it is kept to leave the signature untouched.

    Raises:
        exceptions.ResponseException: if the upload finished with any status
            code other than 201. The message contains the response body and
            every received header block.
    """
    log.info("I was able to get a token to upload to EOS. Let me upload.")
    try:
        response.raise_for_status()
    except HTTPError:
        log.warning("Something went wrong with uploading to EOS.")
        return response

    # see _request_handler for this information
    fname, fpointer, ftype, fheaders = eos_file_details

    token_request = response.json()

    headers = {
        "Authorization": f"Bearer {token_request['token']}",
        "User-Agent": f"itkdb/{__version__}",
        "Content-Type": ftype,
        **fheaders,
    }

    buffer_header = BytesIO()
    buffer_body = BytesIO()

    curl = pycurl.Curl()
    try:
        curl.setopt(curl.URL, token_request["url"])
        curl.setopt(curl.FOLLOWLOCATION, True)
        curl.setopt(curl.UPLOAD, True)
        curl.setopt(
            curl.HTTPHEADER,
            [f'{capwords(k, "-")}: {v}' for k, v in headers.items()],
        )
        curl.setopt(curl.CAINFO, str((itkdb_data / "CERN_chain.pem").resolve()))
        curl.setopt(curl.READDATA, fpointer)
        curl.setopt(curl.INFILESIZE_LARGE, utils.get_filesize(fname, fpointer))
        curl.setopt(curl.HEADERFUNCTION, buffer_header.write)
        curl.setopt(curl.WRITEFUNCTION, buffer_body.write)
        curl.perform()
    finally:
        # release the handle even when configuring or performing the transfer
        # raises (e.g. pycurl.error on a network failure)
        curl.close()

    # HTTP header lines are ISO-8859-1; decoding them that way cannot fail,
    # whereas a strict UTF-8 decode raises on any non-ASCII header byte
    resp_header = buffer_header.getvalue().decode("iso-8859-1")

    # Redirects are followed, so several responses may have been received.
    # Group the header lines per response: every status line ("HTTP...")
    # opens a new block.
    header_blocks = []
    for item in resp_header.strip().split("\r\n"):
        if item.startswith("HTTP"):
            header_blocks.append([item])
        elif item:
            header_blocks[-1].append(item)

    eos_response = Response()
    # the status line of the last block belongs to the final response
    eos_response.status_code = int(header_blocks[-1][0].split()[1])
    eos_response.request = Request(
        method="PUT",
        url=token_request["url"],
        headers=headers,
        files={"file": (fname, fpointer, ftype)},
    )

    if eos_response.status_code != 201:
        # The body is only needed for diagnostics; decode it leniently so an
        # unexpected encoding cannot mask the real error (or, as before,
        # abort an upload that actually succeeded).
        resp_body = buffer_body.getvalue().decode(errors="replace")
        separator = "\r\n" + "-" * 10 + "\r\n"
        additional_message = f"  - I was not able to upload file to EOS. Please report the above information to developers.\r\n\r\n{resp_body}\r\n\r\n"
        additional_message += "".join(
            "\r\n".join(header_block) + separator for header_block in header_blocks
        )
        raise exceptions.ResponseException(
            eos_response, additional_message=additional_message
        )
    response.eos_response = eos_response
    return None

Last update: April 19, 2023