Source code for globus_sdk.transport.requests

from __future__ import annotations

import contextlib
import logging
import pathlib
import time
import typing as t

import requests

from globus_sdk import __version__, config, exc
from globus_sdk.authorizers import GlobusAuthorizer
from globus_sdk.transport.encoders import (
    FormRequestEncoder,
    JSONRequestEncoder,
    RequestEncoder,
)

from ._clientinfo import GlobusClientInfo
from .caller_info import RequestCallerInfo
from .retry import RetryContext
from .retry_check_runner import RetryCheckRunner
from .retry_config import RetryConfig

log = logging.getLogger(__name__)


[docs] class RequestsTransport: """ The RequestsTransport handles HTTP request sending and retries. It receives raw request information from a client class, and then performs the following steps - encode the data in a prepared request - repeatedly send the request until no retry is requested by the configured hooks - return the last response or reraise the last exception If the maximum number of retries is reached, the final response or exception will be returned or raised. :param verify_ssl: Explicitly enable or disable SSL verification, or configure the path to a CA certificate bundle to use for SSL verification :param http_timeout: Explicitly set an HTTP timeout value in seconds. This parameter defaults to 60s but can be set via the ``GLOBUS_SDK_HTTP_TIMEOUT`` environment variable. Any value set via this parameter takes precedence over the environment variable. :ivar dict[str, str] headers: The headers which are sent on every request. These may be augmented by the transport when sending requests. """ #: default maximum number of retries DEFAULT_MAX_RETRIES = 5 #: the encoders are a mapping of encoding names to encoder objects encoders: dict[str, RequestEncoder] = { "text": RequestEncoder(), "json": JSONRequestEncoder(), "form": FormRequestEncoder(), } BASE_USER_AGENT = f"globus-sdk-py-{__version__}" def __init__( self, verify_ssl: bool | str | pathlib.Path | None = None, http_timeout: float | None = None, ) -> None: self.session = requests.Session() self.verify_ssl = config.get_ssl_verify(verify_ssl) self.http_timeout = config.get_http_timeout(http_timeout) self._user_agent = self.BASE_USER_AGENT self.globus_client_info: GlobusClientInfo = GlobusClientInfo( update_callback=self._handle_clientinfo_update ) self.headers: dict[str, str] = { "Accept": "application/json", "User-Agent": self.user_agent, "X-Globus-Client-Info": self.globus_client_info.format(), }
[docs] def close(self) -> None: """ Closes all resources owned by the transport, primarily the underlying network session. """ self.session.close()
@property def user_agent(self) -> str: return self._user_agent @user_agent.setter def user_agent(self, value: str) -> None: """ Set the ``user_agent`` and update the ``User-Agent`` header in ``headers``. :param value: The new user-agent string to set (after the base user-agent) """ self._user_agent = f"{self.BASE_USER_AGENT}/{value}" self.headers["User-Agent"] = self._user_agent def _handle_clientinfo_update( self, info: GlobusClientInfo, # pylint: disable=unused-argument ) -> None: """ When the attached ``GlobusClientInfo`` is updated, write it back into ``headers``. If the client info is cleared, it will be removed from the headers. """ formatted = self.globus_client_info.format() if formatted: self.headers["X-Globus-Client-Info"] = formatted else: # discard the element, so that this can be invoked multiple times self.headers.pop("X-Globus-Client-Info", None)
[docs] @contextlib.contextmanager def tune( self, *, verify_ssl: bool | str | pathlib.Path | None = None, http_timeout: float | None = None, ) -> t.Iterator[None]: """ Temporarily adjust some of the request sending settings of the transport. This method works as a context manager, and will reset settings to their original values after it exits. :param verify_ssl: Explicitly enable or disable SSL verification, or configure the path to a CA certificate bundle to use for SSL verification :param http_timeout: Explicitly set an HTTP timeout value in seconds **Example Usage** This can be used with any client class to temporarily set values in the context of one or more HTTP requests. To increase the HTTP request timeout from the default of 60 to 120 seconds, >>> client = ... # any client class >>> with client.transport.tune(http_timeout=120): >>> foo = client.get_foo() See also: :meth:`RetryConfig.tune`. """ saved_settings = ( self.verify_ssl, self.http_timeout, ) if verify_ssl is not None: if isinstance(verify_ssl, bool): self.verify_ssl = verify_ssl else: self.verify_ssl = str(verify_ssl) if http_timeout is not None: self.http_timeout = http_timeout yield ( self.verify_ssl, self.http_timeout, ) = saved_settings
def _encode( self, method: str, url: str, query_params: dict[str, t.Any] | None = None, data: dict[str, t.Any] | list[t.Any] | str | bytes | None = None, headers: dict[str, str] | None = None, encoding: str | None = None, ) -> requests.Request: if headers: headers = {**self.headers, **headers} else: headers = self.headers if encoding is None: if isinstance(data, (bytes, str)): encoding = "text" else: encoding = "json" if encoding not in self.encoders: raise ValueError( f"Unknown encoding '{encoding}' is not supported by this transport." ) return self.encoders[encoding].encode(method, url, query_params, data, headers) def _set_authz_header( self, authorizer: GlobusAuthorizer | None, req: requests.Request ) -> None: if authorizer: authz_header = authorizer.get_authorization_header() if authz_header: req.headers["Authorization"] = authz_header else: req.headers.pop("Authorization", None) # remove any possible value def _retry_sleep(self, retry_config: RetryConfig, ctx: RetryContext) -> None: """ Given a retry context, compute the amount of time to sleep and sleep that much This is always the minimum of the backoff (run on the context) and the ``max_sleep``. :param ctx: The context object which describes the state of the request and the retries which may already have been attempted. """ sleep_period = min(retry_config.backoff(ctx), retry_config.max_sleep) log.debug( "request retry_sleep(%s) [max=%s]", sleep_period, retry_config.max_sleep, ) time.sleep(sleep_period)
[docs] def request( self, method: str, url: str, *, caller_info: RequestCallerInfo, query_params: dict[str, t.Any] | None = None, data: dict[str, t.Any] | list[t.Any] | str | bytes | None = None, headers: dict[str, str] | None = None, encoding: str | None = None, allow_redirects: bool = True, stream: bool = False, ) -> requests.Response: """ Send an HTTP request :param url: URL for the request :param method: HTTP request method, as an all caps string :param caller_info: Contextual information about the caller of the request, including the authorizer and retry configuration. :param query_params: Parameters to be encoded as a query string :param headers: HTTP headers to add to the request :param data: Data to send as the request body. May pass through encoding. :param encoding: A way to encode request data. "json", "form", and "text" are all valid values. Custom encodings can be used only if they are registered with the transport. By default, strings get "text" behavior and all other objects get "json". :param allow_redirects: Follow Location headers on redirect response automatically. Defaults to ``True`` :param stream: Do not immediately download the response content. Defaults to ``False`` :return: ``requests.Response`` object """ log.debug("starting request for %s", url) resp: requests.Response | None = None req = self._encode(method, url, query_params, data, headers, encoding) retry_config = caller_info.retry_config checker = RetryCheckRunner(caller_info.retry_config.checks) log.debug("transport request state initialized") for attempt in range(retry_config.max_retries + 1): log.debug("transport request retry cycle. attempt=%d", attempt) # add Authorization header, or (if it's a NullAuthorizer) possibly # explicitly remove the Authorization header # done fresh for each request, to handle potential for refreshed credentials self._set_authz_header(caller_info.authorizer, req) ctx = RetryContext(attempt, caller_info=caller_info) try: log.debug("request about to send") resp = ctx.response = self.session.send( req.prepare(), timeout=self.http_timeout, verify=self.verify_ssl, allow_redirects=allow_redirects, stream=stream, ) except requests.RequestException as err: log.debug("request hit error (RequestException)") ctx.exception = err if attempt >= retry_config.max_retries or not checker.should_retry(ctx): log.warning("request done (fail, error)") raise exc.convert_request_exception(err) log.debug("request may retry (should-retry=true)") else: log.debug("request success, still check should-retry") if not checker.should_retry(ctx): log.debug("request done (success)") return resp log.debug("request may retry, will check attempts") # the request will be retried, so sleep... if attempt < retry_config.max_retries: log.debug("under attempt limit, will sleep") self._retry_sleep(retry_config, ctx) if resp is None: raise ValueError("Somehow, retries ended without a response") log.warning("request reached max retries, done (fail, response)") return resp