Skip to content

eos

eos

HAS_PYCURL module-attribute

HAS_PYCURL = True

imp_err_msg module-attribute

imp_err_msg = "There is an error importing pycurl. Please see the documentation for some hints.\n\n  https://itkdb.docs.cern.ch/0.6/meta/faq/"

log module-attribute

log = getLogger(__name__)

delete

delete(eos_token, eos_url) -> Response

Function for deleting from EOS, by wrapping pyCURL appropriately.

Parameters:

Name Type Description Default
eos_token str

EOS token

required
eos_url str

Path on EOS to upload file to

required

Added in version 0.5.0

Source code in src/itkdb/eos.py
def delete(eos_token, eos_url) -> Response:
    """
    Function for deleting from EOS, by wrapping pyCURL appropriately.

    Args:
        eos_token (str): EOS token
        eos_url (str): Path on EOS to upload file to

    !!! note "Added in version 0.5.0"
    """

    headers = {
        "Authorization": f"Bearer {eos_token}",
        "User-Agent": f"itkdb/{__version__}",
    }

    buffer_header = BytesIO()
    buffer_body = BytesIO()

    curl = pycurl.Curl()
    curl.setopt(curl.URL, eos_url)
    curl.setopt(curl.FOLLOWLOCATION, True)
    curl.setopt(curl.CUSTOMREQUEST, "DELETE")
    curl.setopt(
        curl.HTTPHEADER, [f'{capwords(k, "-")}: {v}' for k, v in headers.items()]
    )
    curl.setopt(curl.CAINFO, str((itkdb_data / "CERN_chain.pem").resolve()))
    curl.setopt(curl.HEADERFUNCTION, buffer_header.write)
    curl.setopt(curl.WRITEFUNCTION, buffer_body.write)
    curl.perform()
    curl.close()

    resp_header = buffer_header.getvalue().decode()
    resp_body = buffer_body.getvalue().decode()

    header_blocks = []
    for item in resp_header.strip().split("\r\n"):
        if item.startswith("HTTP"):
            header_blocks.append([item])
        elif item:
            header_blocks[-1].append(item)

    eos_response = Response()
    eos_response.status_code = int(header_blocks[-1][0].split()[1])
    eos_response.request = Request(
        method="DELETE",
        url=eos_url,
        headers=headers,
    )

    additional_message = f"  - I was not able to delete the file from EOS. Please report the above information to developers.\r\n\r\n{resp_body}\r\n\r\n"
    if eos_response.status_code != 204:
        for header_block in header_blocks:
            additional_message += "\r\n".join(header_block)
            additional_message += "\r\n" + "-" * 10 + "\r\n"
        raise exceptions.ResponseException(
            eos_response, additional_message=additional_message
        )

    return eos_response

put

put(eos_token, eos_url, eos_file_details=None) -> Response

Function for uploading to EOS, by wrapping pyCURL appropriately.

Parameters:

Name Type Description Default
eos_token str

EOS token

required
eos_url str

Path on EOS to upload file to

required
eos_file_details tuple or None

Details on the file being uploaded: (fname, fpointer, ftype, fheaders)

None

Added in version 0.5.0

Source code in src/itkdb/eos.py
def put(eos_token, eos_url, eos_file_details=None) -> Response:
    """
    Function for uploading to EOS, by wrapping pyCURL appropriately.

    Args:
        eos_token (str): EOS token
        eos_url (str): Path on EOS to upload file to
        eos_file_details (tuple or None): Details on the file being uploaded: (fname, fpointer, ftype, fheaders)

    !!! note "Added in version 0.5.0"
    """

    # see _request_handler for this information
    fname, fpointer, ftype, fheaders = eos_file_details

    headers = {
        "Authorization": f"Bearer {eos_token}",
        "User-Agent": f"itkdb/{__version__}",
        "Content-Type": ftype,
        **fheaders,
    }

    buffer_header = BytesIO()
    buffer_body = BytesIO()

    curl = pycurl.Curl()
    curl.setopt(curl.URL, eos_url)
    curl.setopt(curl.FOLLOWLOCATION, True)
    curl.setopt(curl.UPLOAD, True)
    curl.setopt(
        curl.HTTPHEADER, [f'{capwords(k, "-")}: {v}' for k, v in headers.items()]
    )
    curl.setopt(curl.CAINFO, str((itkdb_data / "CERN_chain.pem").resolve()))
    curl.setopt(curl.READDATA, fpointer)
    curl.setopt(curl.INFILESIZE_LARGE, utils.get_filesize(fname, fpointer))
    curl.setopt(curl.HEADERFUNCTION, buffer_header.write)
    curl.setopt(curl.WRITEFUNCTION, buffer_body.write)
    curl.setopt(curl.SEEKFUNCTION, fpointer.seek)
    curl.perform()
    curl.close()

    resp_header = buffer_header.getvalue().decode()
    resp_body = buffer_body.getvalue().decode()

    header_blocks = []
    for item in resp_header.strip().split("\r\n"):
        if item.startswith("HTTP"):
            header_blocks.append([item])
        elif item:
            header_blocks[-1].append(item)

    eos_response = Response()
    eos_response.status_code = int(header_blocks[-1][0].split()[1])
    eos_response.request = Request(
        method="PUT",
        url=eos_url,
        headers=headers,
        files={"file": (fname, fpointer, ftype)},
    )

    additional_message = f"  - I was not able to upload file to EOS. Please report the above information to developers.\r\n\r\n{resp_body}\r\n\r\n"
    if eos_response.status_code != 201:
        for header_block in header_blocks:
            additional_message += "\r\n".join(header_block)
            additional_message += "\r\n" + "-" * 10 + "\r\n"
        raise exceptions.ResponseException(
            eos_response, additional_message=additional_message
        )

    return eos_response