1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
# -*- coding: utf-8 -*-
#
import requests
from log import logger
from requests.auth import HTTPBasicAuth
from null import Null
try:
import osc.conf as osc_conf
except Exception as e:
osc_conf = None
DEFAULTAPIURL = 'https://api.opensuse.org'
class ObsHttpApi(object):
default_xml = '<None/>'
rootapi = '/'
def __init__(self, apiurl=None):
self.apiurl = apiurl or DEFAULTAPIURL
self._auth = {}
self._response = Null()
self.retries = 3
self.verify_ssl = True
@property
def __auth(self):
return self._auth.get(self.apiurl, self.__get_auth())
def __get_auth(self):
conf = {}
if osc_conf:
try:
osc_conf.get_config()
conf = osc_conf.get_apiurl_api_host_options(self.apiurl)
except Exception as e:
logger.debug(e)
pass
user = conf.get('user', None)
password = conf.get('pass', None)
if user and password:
self._auth[self.apiurl] = HTTPBasicAuth(user, password)
else:
self._auth[self.apiurl] = None
return self._auth[self.apiurl]
def __api_get(self, api, params=None):
url = '{0}{1}{2}'.format(self.apiurl, self.rootapi, api)
def try_get():
r = requests.get(url,
auth=self.__auth,
params=params,
verify=self.verify_ssl)
self._response = r
return r
for attempt in range(self.retries):
r = try_get()
if self.success:
return r
print("Failed: %s" % self.response)
print("retry: %s" % (attempt + 1))
if self.retries == 0:
r = try_get()
return r
def __api_put(self, api, data, params=None):
url = '{0}{1}{2}'.format(self.apiurl, self.rootapi, api)
r = requests.put(url,
auth=self.__auth,
data=data,
params=params,
verify=self.verify_ssl)
self._response = r
return r
def __api_post(self, api, data, params=None):
url = '{0}{1}{2}'.format(self.apiurl, self.rootapi, api)
r = requests.post(url,
auth=self.__auth,
data=data,
params=params,
verify=self.verify_ssl)
self._response = r
return r
@property
def response(self):
'''Return requests response from last api query'''
return self._response
@property
def success(self):
'''Return True if last api query was successful, else
return False'''
return self._response.status_code == requests.codes.ok
def __api(self, *args):
args = [elem for elem in args if elem]
return '/'.join(['{}'] * len(args)).format(*args)
def get(self, *args, **params):
api = self.__api(*args)
r = self.__api_get(api, params=params)
if not self.success:
return self.default_xml
return r.text
def put(self, *args, **kwargs):
data = kwargs.pop('data', None)
api = self.__api(*args)
r = self.__api_put(api, data=data, params=kwargs)
return r
def post(self, *args, **kwargs):
data = kwargs.pop('data', None)
api = self.__api(*args)
r = self.__api_post(api, data=data, params=kwargs)
return r
|