# -*- coding: utf-8 -*-
#
import requests
from requests.auth import HTTPBasicAuth
from io import BytesIO
from lxml import etree
from collections import namedtuple
from datetime import datetime
from obsapi.formatter import Formatter
try:
import osc.conf as osc_conf
osc_conf.get_config()
except:
osc_conf = None
LSItem = namedtuple('LSItem', 'name md5 size mtime')
Directory = namedtuple('Directory', 'name rev vrev srcmd5')
SourceInfo = namedtuple('SourceInfo', 'package rev vrev srcmd5 verifymd5')
Binary = namedtuple('Binary', 'filename size mtime')
DEFAULTAPIURL = 'https://api.opensuse.org'
emptyxml = ''
EMPTYTREE = etree.fromstring(emptyxml)
EMPTY = EMPTYTREE.xpath('.')[0]
class FileInfo(object):
file_str_template = ("{filename}"
"size : {size}"
"mtime : {mtime}"
)
pkg_str_template = ("{filename}"
"Name : {name}"
"Version : {version}"
"Release : {release}"
"Architecture: {arch}"
"Size : {size}"
"Source RPM : {source}"
"Build Date : {mtime}"
"Summary : {summary}"
"Description :"
"{description}"
"Provides :"
"{provides}"
)
src_str_template = ("{filename}"
"Name : {name}"
"Version : {version}"
"Release : {release}"
"Architecture: {arch}"
"Size : {size}"
"Build Date : {mtime}"
"Summary : {summary}"
"Description :"
"{description}"
)
def __init__(self, xml):
self.finfo = etree.fromstring(xml)
self.filename = self.finfo.get('filename', None)
self.formatter = Formatter(self)
@property
def info(self):
info = {'filename': [self.filename]}
for item in self.finfo:
info.setdefault(item.tag, []).append(item.text)
for item, value in info.items():
if len(value) == 1:
info[item] = value[0]
# info[item] = '\n'.join(value)
return info
def __str__(self):
return self.formatter.render()
def __getattr__(self, attr):
try:
values = [value.text for value in self.finfo.findall(attr)]
except:
values = [None]
if len(values) == 1:
return values[0]
else:
return values
@property
def datetime(self):
if self.mtime:
return datetime.fromtimestamp(int(self.mtime))
@property
def xml(self):
return etree.tostring(self.finfo)
@property
def is_pkg(self):
if self.arch:
return True
else:
return False
@property
def is_src(self):
if self.source:
return False
else:
return True
@property
def is_debug_info(self):
if '-' in self.name:
if self.name.split('-')[-1] in ('debuginfo', 'debugsource'):
return True
return False
class ObsApi(object):
default_xml = ''
def __init__(self, apiurl=None):
self.apiurl = apiurl or DEFAULTAPIURL
self.__get_auth()
self._response = None
self.retries = 3
self.verify_ssl = True
def __get_auth(self):
conf = {}
if osc_conf:
try:
conf = osc_conf.get_apiurl_api_host_options(self.apiurl)
except:
pass
user = conf.get('user', None)
password = conf.get('pass', None)
if user and password:
self.auth = HTTPBasicAuth(user, password)
else:
self.auth = None
def __api_get(self, api, payload=None):
def try_get():
url = '{0}/{1}'.format(self.apiurl, api)
r = requests.get(url,
auth=self.auth,
params=payload,
verify=self.verify_ssl)
self._response = r
return r
for attempt in range(self.retries):
r = try_get()
if self.success:
return r
print("Failed: %s" % self.response)
print("retry: %s" % (attempt + 1))
if self.retries == 0:
r = try_get()
return r
def __api_put(self, api, data):
url = '{0}/{1}'.format(self.apiurl, api)
r = requests.put(url,
auth=self.auth,
data=data,
verify=self.verify_ssl)
self._response = r
return r
def __xml2etree(self, xml):
xml = xml.encode('utf-8')
parser = etree.XMLParser(remove_blank_text=True)
return etree.parse(BytesIO(xml), parser).xpath('.')[0]
@property
def response(self):
'''Return requests response from last api query'''
return self._response
@property
def success(self):
'''Return True if last api query was successful, else
return False'''
return self._response.status_code == requests.codes.ok
def get_xml(self, api, payload=None):
r = self.__api_get(api, payload)
if not self.success:
print self.response
return self.default_xml
return r.text
def put_xml(self, api, data):
r = self.__api_put(api, data)
return r
def get_meta(self, prj, pkg=None):
if pkg is not None:
api = '/source/{}/{}/_meta'.format(prj, pkg)
else:
api = '/source/{}/_meta'.format(prj)
return self.get_xml(api)
def put_meta(self, prj, pkg=None, xml=None):
if pkg is not None:
api = '/source/{}/{}/_meta'.format(prj, pkg)
else:
api = '/source/{}/_meta'.format(prj)
return self.put_xml(api, data=xml)
def get_project_meta(self, prj):
return self.get_meta(prj)
def put_project_meta(self, prj, xml):
return self.put_meta(prj, xml)
def get_package_meta(self, prj, pkg):
return self.get_meta(prj, pkg)
def put_package_meta(self, prj, pkg, xml):
return self.put_meta(prj, pkg, xml)
def ls(self, prj=None, pkg=None, repo=None, arch=None):
if repo and arch:
return self.binaries_ls(prj, pkg, repo, arch)
if prj and pkg:
return self.package_ls(prj, pkg)
if prj:
return self.project_ls(prj)
return None
def project_ls(self, prj):
api = '/source/{}'.format(prj)
xml = self.get_xml(api)
d = etree.fromstring(xml)
lsitems = [e.get('name') for e in d.findall('entry')]
return lsitems
def package_ls(self, prj, pkg):
api = '/source/{}/{}'.format(prj, pkg)
xml = self.get_xml(api)
d = etree.fromstring(xml)
directory = Directory(d.get('name'),
d.get('rev'),
d.get('vrev'),
d.get('srcmd5')
)
lsitems = []
for item in d.findall('entry'):
lsitems.append(LSItem(item.get('name'),
item.get('md5'),
item.get('size'),
item.get('mtime')
)
)
return (directory, lsitems)
def get_source_info(self, prj, pkg):
api = '/source/{}/{}'.format(prj, pkg)
query = {'view': 'info'}
xml = self.get_xml(api, payload=query)
tree = etree.fromstring(xml)
sinfo = SourceInfo(tree.get('package'),
tree.get('rev'),
tree.get('vrev'),
tree.get('srcmd5'),
tree.get('verifymd5')
)
return sinfo
def binaries_ls(self, prj, pkg, repo, arch):
api = '/build/{}/{}/{}/{}'.format(prj, repo, arch, pkg)
xml = self.get_xml(api)
blist = etree.fromstring(xml)
binaries = [Binary(filename=i.get('filename'),
size=i.get('size'),
mtime=i.get('mtime'))
for i in blist.findall('binary')]
return binaries
def get_binary_fileinfo(self, prj, pkg, repo, arch, binary):
payload = dict(view='fileinfo')
api = '/build/{}/{}/{}/{}/{}'.format(prj, repo, arch, pkg, binary)
xml = self.get_xml(api, payload)
return FileInfo(xml)
def get_project_repos(self, prj):
api = '/build/{}'.format(prj)
xml = self.get_xml(api)
directory = etree.fromstring(xml)
entries = []
for entry in directory.findall('entry'):
entries.append(entry.get('name'))
return entries
def get_package_version(self, prj, pkg, repo, arch, full=True):
binaries = self.get_binaries(prj, pkg, repo, arch)
r_finfo = ''
for item in binaries:
if item.filename.endswith('.src.rpm'):
r_finfo = self.get_binary_fileinfo(prj,
pkg,
repo,
arch,
item.filename
)
break
if r_finfo:
finfo = etree.fromstring(r_finfo)
version = finfo.find('version').text
release = finfo.find('release').text
if full:
version = '%s-%s' % (version, release)
else:
version = ''
return version
def get_build_config(self, prj, repo):
api = '/build/{}/{}/_buildconfig'.format(prj, repo)
return self.get_xml(api)
def get_build_info(self, prj, pkg, repo, arch):
api = '/build/{}/{}/{}/{}/_buildinfo'.format(prj, repo, arch, pkg)
return self.get_xml(api)
def get_nothing_provides(self, prj, pkg, repo, arch):
tree = etree.fromstring(self.get_build_info(prj, pkg, repo, arch))
errors = (tree.xpath('error') or [EMPTY])[0]
nothing_provides = []
if errors is None or 'unresolvable' not in errors:
return nothing_provides
for unresolvable in [u.strip() for
u in errors.split('unresolvable:')[1].split(',')]:
if unresolvable.startswith('nothing provides'):
nothing_provides.append(unresolvable.split(
'nothing provides ')[1])
return nothing_provides
def get_vendor(self, prj, repo=None):
''' Attempt to get the value of the %vendor macro if exists
Search build configs from all repos. Take first occurance.
'''
project_repos = self.get_project_repos(prj)
repos = [repo] or project_repos
vendor = None
for repo in repos:
for line in self.get_build_config(prj, repo).splitlines():
if line.strip().startswith('%vendor '):
vendor = line.split(' ', 1)[1]
# We take the first occurance
if vendor is not None:
break
return vendor
def lock(self, prj, pkg=None):
if pkg is None:
xml = self.get_project_meta(prj)
put = self.put_project_meta
else:
xml = self.get_package_meta(prj, pkg)
put = self.put_package_meta
meta = self.__xml2etree(xml)
if meta.tag not in ['project', 'package']:
if pkg:
raise Exception('Failed to lock package. %s/%s' % (prj, pkg))
else:
raise Exception('Failed to lock project. %s' % prj)
if meta.find('lock') is not None:
return 'Already Locked'
lock = self.__xml2etree('')
meta.append(lock)
if pkg:
r = put(prj, pkg, etree.tostring(meta))
else:
r = put(prj, etree.tostring(meta))
return r