itkdb
itkdb ¶
Client ¶
Client(
use_eos=False,
pagination_history=False,
**session_kwargs
)
Bases: Session
The top-level user-facing client for interacting with the ITk Production Database API.
Changed in version 0.4.0: added the `use_eos` argument.
Changed in version 0.4.6: added the `pagination_history` argument.
Source code in src/itkdb/client.py
def __init__(self, use_eos=False, pagination_history=False, **session_kwargs):
    """
    Record the client options and initialise the underlying session.

    Args:
        use_eos: stored on the instance as ``_use_eos``.
        pagination_history: stored on the instance as ``_pagination_history``.
        **session_kwargs: forwarded unchanged to the parent initialiser.
    """
    self._pagination_history = pagination_history
    self._use_eos = use_eos
    super().__init__(**session_kwargs)
delete_from_eos ¶
delete_from_eos(response, **_) -> None
requests response hook function to delete a file from eos.
Source code in src/itkdb/client.py
def delete_from_eos(self, response, **_) -> None:
    """
    requests response hook function to delete a file from eos.

    The hook inspects the JSON body of ``response``. When it describes an
    EOS-type attachment and carries a token, a ``DELETE`` request is issued
    against the attachment URL through pycurl, authenticated with that token.

    Args:
        response: the response handed to the hook by requests.
        **_: any additional hook keyword arguments; ignored.

    Returns:
        ``response`` itself when it carries an HTTP error status, otherwise
        ``None``. On success the synthesized EOS response is attached to
        ``response`` as ``response.eos_response``.

    Raises:
        exceptions.ResponseException: if the EOS request does not answer
            with status code 204.
    """
    try:
        response.raise_for_status()
    except HTTPError:
        log.warning("Something went wrong with deleting the attachment.")
        return response

    data = response.json()

    # do nothing if it's not an EOS-type attachment
    # or if betamax is being used (no need to run the cURL for EOS)
    if (
        data["attachment"]["type"] != "eos"
        or response.connection.__class__.__name__ == "BetamaxAdapter"
    ):
        return None

    if "token" not in data:
        log.warning(
            "It seems there is no token, so we are not deleting this from EOS."
        )
        return None

    log.info(
        "It looks like you're deleting an attachment from ITk PD that is stored on EOS, I will try to delete it from EOS for you."
    )

    headers = {
        "Authorization": f"Bearer {data['token']}",
        "User-Agent": f"itkdb/{__version__}",
    }

    buffer_header = BytesIO()
    buffer_body = BytesIO()

    curl = pycurl.Curl()
    # Always release the curl handle, even when setopt()/perform() raises.
    try:
        curl.setopt(curl.URL, data["attachment"]["url"])
        curl.setopt(curl.FOLLOWLOCATION, True)
        curl.setopt(curl.CUSTOMREQUEST, "DELETE")
        curl.setopt(
            curl.HTTPHEADER,
            [f'{capwords(k, "-")}: {v}' for k, v in headers.items()],
        )
        curl.setopt(curl.CAINFO, str((itkdb_data / "CERN_chain.pem").resolve()))
        curl.setopt(curl.HEADERFUNCTION, buffer_header.write)
        curl.setopt(curl.WRITEFUNCTION, buffer_body.write)
        curl.perform()
    finally:
        curl.close()

    resp_header = buffer_header.getvalue().decode()
    resp_body = buffer_body.getvalue().decode()

    # Redirects are followed, so the raw header dump may hold several
    # responses; every status line ("HTTP...") opens a new block.
    header_blocks = []
    for item in resp_header.strip().split("\r\n"):
        if item.startswith("HTTP"):
            header_blocks.append([item])
        elif item:
            header_blocks[-1].append(item)

    eos_response = Response()
    # The status code is the second token of the last block's status line.
    eos_response.status_code = int(header_blocks[-1][0].split()[1])
    eos_response.request = Request(
        method="DELETE",
        url=data["attachment"]["url"],
        headers=headers,
    )

    if eos_response.status_code != 204:
        additional_message = f" - I was not able to delete the file from EOS. Please report the above information to developers.\r\n\r\n{resp_body}\r\n\r\n"
        # Append every header block, each followed by a dashed separator.
        additional_message += "".join(
            "\r\n".join(header_block) + "\r\n" + "-" * 10 + "\r\n"
            for header_block in header_blocks
        )
        raise exceptions.ResponseException(
            eos_response, additional_message=additional_message
        )

    response.eos_response = eos_response
    return None
get ¶
get(url, **kwargs)
Source code in src/itkdb/client.py
def get(self, url, **kwargs):
    """
    Issue a GET request, adjusting the options for ``cern.ch`` hosts.

    When the network location of ``url`` contains ``.cern.ch``, the request
    is sent with ``verify`` pointing at the bundled ``CERN_chain.pem``, with
    ``stream=True``, and with a ``transfer-encoding: chunked`` header.

    Args:
        url: the URL to request.
        **kwargs: forwarded to the parent ``get``. A ``headers`` mapping
            supplied by the caller is copied before being modified, so the
            caller's object is left untouched.

    Returns:
        Whatever the parent ``get`` returns.
    """
    is_cern_url = ".cern.ch" in urlparse(url).netloc
    # is_binary_data = "uu-app-binarystore/getBinaryData" in url

    if is_cern_url:
        log.info(
            "Identified a cern.ch request, will attach CERN SSL chain to request by overriding `verify`."
        )
        kwargs["verify"] = itkdb_data / "CERN_chain.pem"

        # getBinaryData does not handle chunked requests
        # if is_cern_url or is_binary_data:
        log.info(
            "Identified a request that potentially downloads larger amounts of data, will execute chunked requests (stream=True)."
        )
        kwargs["stream"] = True
        # Copy instead of mutating the caller's mapping in place; this also
        # tolerates an explicit ``headers=None``.
        headers = dict(kwargs.get("headers") or {})
        headers["transfer-encoding"] = "chunked"
        kwargs["headers"] = headers

    return super().get(url, **kwargs)
prepare_request ¶
prepare_request(request)
Source code in src/itkdb/client.py
def prepare_request(self, request):
    """
    Normalize the request URL and run the request handler before preparing.

    Args:
        request: the request object to prepare; its ``url`` attribute is
            rewritten in place with the normalized URL.

    Returns:
        The result of the parent ``prepare_request``.
    """
    normalized_url = self._normalize_url(request.url)
    request.url = normalized_url
    # The handler sees the request only after its URL has been normalized.
    self._request_handler(request)
    prepared = super().prepare_request(request)
    return prepared
request ¶
request(method, url, *args, **kwargs)
Source code in src/itkdb/client.py
def request(self, method, url, *args, **kwargs):
    """
    Send a request and pass the result through the response handler.

    Args:
        method: HTTP method, forwarded as-is.
        url: target URL, forwarded as-is.
        *args: extra positional arguments, forwarded as-is.
        **kwargs: extra keyword arguments. ``limit`` is removed from them
            (default ``-1``) and stored on ``self.limit`` instead of being
            forwarded.

    Returns:
        The return value of ``self._response_handler`` for the response.
    """
    limit = kwargs.pop("limit", -1)
    self.limit = limit
    # Method lookup deliberately starts *after* ``Session`` in the MRO.
    raw_response = super(Session, self).request(method, url, *args, **kwargs)
    return self._response_handler(raw_response)
upload_to_eos ¶
upload_to_eos(response, eos_file_details=None, **_) -> None
requests response hook function to upload a file to eos.
Source code in src/itkdb/client.py
def upload_to_eos(self, response, eos_file_details=None, **_) -> None:
    """
    requests response hook function to upload a file to eos.

    The JSON body of ``response`` is expected to provide a ``token`` and a
    ``url``; the file described by ``eos_file_details`` is then uploaded to
    that URL through pycurl, authenticated with the token.

    Args:
        response: the response handed to the hook by requests.
        eos_file_details: a ``(fname, fpointer, ftype, fheaders)`` tuple
            describing the file to upload (see ``_request_handler``).
        **_: any additional hook keyword arguments; ignored.

    Returns:
        ``response`` itself when it carries an HTTP error status, otherwise
        ``None``. On success the synthesized EOS response is attached to
        ``response`` as ``response.eos_response``.

    Raises:
        TypeError: if ``eos_file_details`` is not provided.
        exceptions.ResponseException: if the EOS request does not answer
            with status code 201.
    """
    try:
        response.raise_for_status()
    except HTTPError:
        log.warning("Something went wrong with uploading to EOS.")
        return response

    # Only announce success once the token request is known to have succeeded.
    log.info("I was able to get a token to upload to EOS. Let me upload.")

    # do nothing if betamax is being used (no need to run the cURL for EOS)
    if response.connection.__class__.__name__ == "BetamaxAdapter":
        return None

    # see _request_handler for this information
    if eos_file_details is None:
        # Same exception type as the bare unpacking would raise, but with a
        # message that names the actual problem.
        msg = "eos_file_details must be provided as (fname, fpointer, ftype, fheaders) to upload to EOS"
        raise TypeError(msg)
    fname, fpointer, ftype, fheaders = eos_file_details

    token_request = response.json()
    # NOTE(review): this logs the whole token response, bearer token
    # included, at INFO level - consider redacting it.
    log.info(token_request)

    headers = {
        "Authorization": f"Bearer {token_request['token']}",
        "User-Agent": f"itkdb/{__version__}",
        "Content-Type": ftype,
        **fheaders,
    }

    buffer_header = BytesIO()
    buffer_body = BytesIO()

    curl = pycurl.Curl()
    # Always release the curl handle, even when setopt()/perform() raises.
    try:
        curl.setopt(curl.URL, token_request["url"])
        curl.setopt(curl.FOLLOWLOCATION, True)
        curl.setopt(curl.UPLOAD, True)
        curl.setopt(
            curl.HTTPHEADER,
            [f'{capwords(k, "-")}: {v}' for k, v in headers.items()],
        )
        curl.setopt(curl.CAINFO, str((itkdb_data / "CERN_chain.pem").resolve()))
        curl.setopt(curl.READDATA, fpointer)
        curl.setopt(curl.INFILESIZE_LARGE, utils.get_filesize(fname, fpointer))
        curl.setopt(curl.HEADERFUNCTION, buffer_header.write)
        curl.setopt(curl.WRITEFUNCTION, buffer_body.write)
        curl.setopt(curl.SEEKFUNCTION, fpointer.seek)
        curl.perform()
    finally:
        curl.close()

    resp_header = buffer_header.getvalue().decode()
    resp_body = buffer_body.getvalue().decode()

    # Redirects are followed, so the raw header dump may hold several
    # responses; every status line ("HTTP...") opens a new block.
    header_blocks = []
    for item in resp_header.strip().split("\r\n"):
        if item.startswith("HTTP"):
            header_blocks.append([item])
        elif item:
            header_blocks[-1].append(item)

    eos_response = Response()
    # The status code is the second token of the last block's status line.
    eos_response.status_code = int(header_blocks[-1][0].split()[1])
    eos_response.request = Request(
        method="PUT",
        url=token_request["url"],
        headers=headers,
        files={"file": (fname, fpointer, ftype)},
    )

    if eos_response.status_code != 201:
        additional_message = f" - I was not able to upload file to EOS. Please report the above information to developers.\r\n\r\n{resp_body}\r\n\r\n"
        # Append every header block, each followed by a dashed separator.
        additional_message += "".join(
            "\r\n".join(header_block) + "\r\n" + "-" * 10 + "\r\n"
            for header_block in header_blocks
        )
        raise exceptions.ResponseException(
            eos_response, additional_message=additional_message
        )

    response.eos_response = eos_response
    return None