# -*- coding: utf-8 -*-
#
import requests
import re
from obsapi.logger import logger
from requests.auth import HTTPBasicAuth
from .null import Null

# osc is optional: it is only used to look up stored credentials.  Any
# failure while importing it simply disables that lookup.
try:
    import osc.conf as osc_conf
except Exception:
    osc_conf = None

DEFAULTAPIURL = 'https://api.opensuse.org'

# Bound ``match`` of the URL validation pattern: returns a match object for
# a well-formed http(s)/ftp(s) URL and None otherwise.
url_validate = re.compile(r'^(?:http|ftp)s?://'  # http:// or https://
                          r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
                          r'localhost|'  # localhost...
                          r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
                          r'(?::\d+)?'  # optional port
                          r'(?:/?|[/?]\S+)$', re.IGNORECASE).match


class ObsHttpApi(object):
    '''Thin HTTP client around ``requests`` for an API rooted at ``apiurl``.

    Every request stores its ``requests`` response on the instance so that
    ``response`` and ``success`` describe the most recent call.
    '''

    default_xml = ''
    # Path inserted between ``apiurl`` and the endpoint path of each request.
    rootapi = '/'

    def __init__(self, apiurl=None):
        '''Create a client for ``apiurl`` (``DEFAULTAPIURL`` when omitted).

        Raises ``Exception`` if a given ``apiurl`` is not a valid URL.
        '''
        self._apiurl = DEFAULTAPIURL
        # Only validate an explicitly supplied URL; running None through the
        # setter would raise instead of falling back to the default.
        if apiurl is not None:
            self.apiurl = apiurl
        # Per-apiurl cache of auth objects (None when no credentials found).
        self._auth = {}
        self._response = Null()
        # Number of GET attempts; values below 1 still perform one attempt.
        self.retries = 0
        self.verify_ssl = True

    @property
    def __auth(self):
        '''Auth object for the current ``apiurl``, looked up once and cached.'''
        # Membership (not truthiness) decides a cache hit, because a cached
        # None means "no credentials available" and must not trigger a
        # fresh configuration read on every request.
        try:
            return self._auth[self.apiurl]
        except KeyError:
            return self.__get_auth()

    def __get_auth(self):
        '''Read credentials from the osc configuration and cache the result.

        Returns an ``HTTPBasicAuth`` when both user and password are
        configured for the current ``apiurl``, otherwise None.
        '''
        conf = {}
        if osc_conf:
            # Best effort: a missing or broken osc configuration only means
            # requests go out unauthenticated.
            try:
                osc_conf.get_config()
                conf = osc_conf.get_apiurl_api_host_options(self.apiurl)
            except Exception as e:
                logger.debug(e)

        user = conf.get('user', None)
        password = conf.get('pass', None)
        if user and password:
            self._auth[self.apiurl] = HTTPBasicAuth(user, password)
        else:
            self._auth[self.apiurl] = None
        return self._auth[self.apiurl]

    def __api_get(self, api, params=None):
        '''Send a GET request, repeating it until success or attempts run out.

        ``params`` may contain a ``stream`` key; it is removed from the query
        parameters and forwarded to ``requests.get`` as the ``stream`` flag.
        Returns the response of the last attempt.
        '''
        url = '{0}{1}{2}'.format(self.apiurl, self.rootapi, api)
        # Work on a copy: params may be None and the caller's dict must not
        # lose its 'stream' entry.
        query = dict(params) if params else {}
        stream = query.pop('stream', False)

        # Always perform at least one request, even for retries <= 0.
        attempts = max(1, self.retries)
        for _ in range(attempts):
            response = requests.get(url, auth=self.__auth, params=query,
                                    stream=stream, verify=self.verify_ssl)
            self._response = response
            if self.success:
                break
        return response

    def __api_put(self, api, data, params=None):
        '''Send a PUT request with ``data`` as body and return the response.'''
        url = '{0}{1}{2}'.format(self.apiurl, self.rootapi, api)
        r = requests.put(url, auth=self.__auth, data=data,
                         params=params, verify=self.verify_ssl)
        self._response = r
        return r

    def __api_post(self, api, data, params=None):
        '''Send a POST request with ``data`` as body and return the response.'''
        url = '{0}{1}{2}'.format(self.apiurl, self.rootapi, api)
        r = requests.post(url, auth=self.__auth, data=data,
                          params=params, verify=self.verify_ssl)
        self._response = r
        return r

    @property
    def apiurl(self):
        '''Base URL that every request is sent to.'''
        return self._apiurl

    @apiurl.setter
    def apiurl(self, url):
        # Reject anything the validation pattern does not accept.
        if url_validate(url) is None:
            raise Exception('Invalid URL: {}'.format(url))
        self._apiurl = url

    @property
    def response(self):
        '''Return requests response from last api query'''
        return self._response

    @property
    def success(self):
        '''Return True if last api query was successful, else return False'''
        return self._response.status_code == requests.codes.ok

    def __api(self, *args):
        '''Join the non-empty path components with "/" into an endpoint path.'''
        return '/'.join('{}'.format(elem) for elem in args if elem)

    def get(self, *args, **params):
        '''GET the endpoint built from ``args`` with ``params`` as query.

        Special keyword arguments:
          binary_get -- return raw content instead of decoded text.
          stream     -- stream the download; combined with ``binary_get``
                        the response object itself is returned.

        Returns None when the request was not successful.
        '''
        binary_get = params.pop('binary_get', False)
        api = self.__api(*args)
        stream = params.get('stream', False)
        r = self.__api_get(api, params=params)
        if not self.success:
            return None
        if binary_get:
            if stream:
                return r
            return r.content
        return r.text

    def put(self, *args, **kwargs):
        '''PUT to the endpoint built from ``args``.

        The ``data`` keyword is sent as the request body; the remaining
        keywords become query parameters.  Returns the response object.
        '''
        data = kwargs.pop('data', None)
        api = self.__api(*args)
        r = self.__api_put(api, data=data, params=kwargs)
        return r

    def post(self, *args, **kwargs):
        '''POST to the endpoint built from ``args``.

        The ``data`` keyword is sent as the request body; the remaining
        keywords become query parameters.  Returns the response object.
        '''
        data = kwargs.pop('data', None)
        api = self.__api(*args)
        r = self.__api_post(api, data=data, params=kwargs)
        return r