1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# -*- coding: utf-8 -*-
#
import requests
from requests.auth import HTTPBasicAuth
try:
import osc.conf as osc_conf
osc_conf.get_config()
except:
osc_conf = None
DEFAULTAPIURL = 'https://api.opensuse.org'
class ObsHttpApi(object):
default_xml = '<None/>'
rootapi = '/'
def __init__(self, apiurl=None):
self.apiurl = apiurl or DEFAULTAPIURL
self.__get_auth()
self._response = None
self.retries = 3
self.verify_ssl = True
def __get_auth(self):
conf = {}
if osc_conf:
try:
conf = osc_conf.get_apiurl_api_host_options(self.apiurl)
except:
pass
user = conf.get('user', None)
password = conf.get('pass', None)
if user and password:
self.auth = HTTPBasicAuth(user, password)
else:
self.auth = None
def __api_get(self, api, params=None):
def try_get():
url = '{0}{1}{2}'.format(self.apiurl, self.rootapi, api)
print(url)
r = requests.get(url,
auth=self.auth,
params=params,
verify=self.verify_ssl)
self._response = r
return r
for attempt in range(self.retries):
r = try_get()
if self.success:
return r
print("Failed: %s" % self.response)
print("retry: %s" % (attempt + 1))
if self.retries == 0:
r = try_get()
return r
def __api_put(self, api, data):
url = '{0}{1}{2}'.format(self.apiurl, self.rootapi, api)
r = requests.put(url,
auth=self.auth,
data=data,
verify=self.verify_ssl)
self._response = r
return r
@property
def response(self):
'''Return requests response from last api query'''
return self._response
@property
def success(self):
'''Return True if last api query was successful, else
return False'''
return self._response.status_code == requests.codes.ok
def __api(self, *args):
args = [elem for elem in args if elem]
return '/'.join(['{}'] * len(args)).format(*args)
def get(self, *args, **params):
api = self.__api(*args)
r = self.__api_get(api, params=params)
if not self.success:
print self.response
return self.default_xml
return r.text
def put(self, args, data):
api = self.__api(*args)
r = self.__api_put(api, data)
return r
|