# -*- coding: utf-8 -*-
#
from io import BytesIO
from lxml import etree
from collections import namedtuple
from datetime import datetime
from obsapi.sourceapi import ObsSourceApi
from obsapi.buildapi import ObsBuildApi
from obsapi.formatter import Formatter
# Lightweight records handed back by the ObsApi helpers below.
# LSItem: attributes of one <entry> in a package source listing.
LSItem = namedtuple('LSItem', 'name md5 size mtime')
# Directory: attributes of the <directory> root of a package source listing.
Directory = namedtuple('Directory', 'name rev vrev srcmd5')
# SourceInfo: attributes of the root element returned by the 'info' view.
SourceInfo = namedtuple('SourceInfo', 'package rev vrev srcmd5 verifymd5')
# Binary: attributes of one <binary> element of a build result listing.
Binary = namedtuple('Binary', 'filename size mtime')
# User / Group: one <person> / <group> element of a project or package meta.
User = namedtuple('User', 'userid role')
Group = namedtuple('Group', 'groupid role')

# Namespace URI of the EXSLT regular-expression functions; bound to the
# ``re:`` prefix in the XPath queries of get_users() / get_groups().
REGEX_NS = "http://exslt.org/regular-expressions"
DEFAULTAPIURL = 'https://api.opensuse.org'

# Placeholder document used as a stand-in when an expected element is
# missing.  It must be well-formed XML: parsing an empty string raises at
# import time and would make the whole module unusable.
emptyxml = '<empty/>'
EMPTYTREE = etree.fromstring(emptyxml)
# fromstring() already returns the root element, which is exactly what
# ``EMPTYTREE.xpath('.')[0]`` evaluates to; its ``.text`` is None.
EMPTY = EMPTYTREE
class FileInfo(object):
    """Read-only view on the ``fileinfo`` XML document of one binary.

    The document is parsed once in ``__init__``.  Every direct child
    element of the root can then be read as an attribute of the instance
    (``finfo.version``, ``finfo.provides`` ...) through ``__getattr__``.
    ``str(instance)`` delegates to the ``Formatter`` created in
    ``__init__``.
    """

    # NOTE(review): adjacent string literals are concatenated without any
    # separator, so each template below is a single line of text.  Confirm
    # whether Formatter is expected to insert line breaks or whether
    # newlines are missing from these literals.
    file_str_template = ("{filename}"
                         "size : {size}"
                         "mtime : {mtime}"
                         )
    pkg_str_template = ("{filename}"
                        "Name : {name}"
                        "Version : {version}"
                        "Release : {release}"
                        "Architecture: {arch}"
                        "Size : {size}"
                        "Source RPM : {source}"
                        "Build Date : {mtime}"
                        "Summary : {summary}"
                        "Description :"
                        "{description}"
                        "Provides :"
                        "{provides}"
                        )
    src_str_template = ("{filename}"
                        "Name : {name}"
                        "Version : {version}"
                        "Release : {release}"
                        "Architecture: {arch}"
                        "Size : {size}"
                        "Build Date : {mtime}"
                        "Summary : {summary}"
                        "Description :"
                        "{description}"
                        )

    def __init__(self, xml):
        """Parse *xml* and remember the root element's ``filename``.

        :param xml: serialized fileinfo document accepted by
                    ``etree.fromstring``.
        """
        self.finfo = etree.fromstring(xml)
        self.filename = self.finfo.get('filename', None)
        self.formatter = Formatter(self)

    @property
    def info(self):
        """Return the document as a dict keyed by child tag name.

        Tags that occur once map to their text, tags that occur several
        times map to a list of texts.  ``filename`` (the root attribute)
        is always present.
        """
        collected = {'filename': [self.filename]}
        for child in self.finfo:
            collected.setdefault(child.tag, []).append(child.text)
        # Collapse single-element lists to the bare value.
        return dict((tag, texts[0] if len(texts) == 1 else texts)
                    for tag, texts in collected.items())

    def __str__(self):
        return self.formatter.render()

    def __getattr__(self, attr):
        """Resolve unknown attributes against the child elements.

        Returns the text of the child named *attr* when there is exactly
        one, otherwise the list of texts (empty when there is none).

        :raises AttributeError: for ``finfo`` itself and for dunder names.
            Protocol probes such as ``copy`` / ``pickle`` looking up
            ``__deepcopy__`` must see a missing attribute rather than an
            empty list, and a not-yet-set ``finfo`` must not make this
            method call itself recursively.
        """
        if attr == 'finfo' or (attr.startswith('__') and attr.endswith('__')):
            raise AttributeError(attr)
        try:
            values = [node.text for node in self.finfo.findall(attr)]
        except Exception:
            # Deliberate best effort: a failed lookup reads as "no value".
            values = [None]
        if len(values) == 1:
            return values[0]
        return values

    @property
    def datetime(self):
        """Return ``mtime`` as a local ``datetime``, or None when unset."""
        mtime = self.mtime
        if not mtime:
            return None
        return datetime.fromtimestamp(int(mtime))

    @property
    def xml(self):
        """Return the serialized form of the parsed document."""
        return etree.tostring(self.finfo)

    @property
    def is_pkg(self):
        """True when the document carries a non-empty ``arch`` element."""
        return bool(self.arch)

    @property
    def is_src(self):
        """True when the document has no (non-empty) ``source`` element."""
        return not self.source

    @property
    def is_debug_info(self):
        """True when ``name`` ends in ``-debuginfo`` or ``-debugsource``.

        A missing or empty ``name`` element yields False instead of
        raising.
        """
        name = self.name
        if not isinstance(name, str):
            return False
        return name.endswith(('-debuginfo', '-debugsource'))
class ObsApi(object):
    """Convenience facade combining the source and the build API clients.

    ``self.source`` (``ObsSourceApi``) and ``self.build`` (``ObsBuildApi``)
    perform the actual requests; the methods here parse the returned XML
    into plain Python values and namedtuples.
    """

    # Not referenced by any method of this class.
    default_xml = ''

    def __init__(self, apiurl=None):
        """Create both API clients for *apiurl* (default: DEFAULTAPIURL)."""
        self.apiurl = apiurl or DEFAULTAPIURL
        self.source = ObsSourceApi(self.apiurl)
        self.build = ObsBuildApi(self.apiurl)
        for client in (self.source, self.build):
            client.retries = 3
            client.verify_ssl = True
        self.lastapi = None

    def __xml2etree(self, xml):
        """Parse *xml* (text or bytes) and return its root element.

        Text is encoded to UTF-8 before parsing; bytes are used as they
        are.  Whitespace-only text nodes are dropped by the parser.
        """
        if not isinstance(xml, bytes):
            xml = xml.encode('utf-8')
        parser = etree.XMLParser(remove_blank_text=True)
        return etree.fromstring(xml, parser)

    def get_meta(self, prj, pkg=None):
        """Return the meta of project *prj*, or of package *pkg* in it."""
        return self.source.get_meta(prj, pkg=pkg)

    def put_meta(self, prj, pkg=None, xml=None):
        """Store *xml* as the meta of *prj*, or of package *pkg* in it."""
        return self.source.put_meta(prj, pkg=pkg, xml=xml)

    def ls(self, prj=None, pkg=None, repo=None, arch=None):
        """List binaries, package sources or project contents.

        Dispatches on the arguments given: *repo* and *arch* select
        ``binaries_ls``, otherwise *prj* and *pkg* select ``package_ls``,
        otherwise ``project_ls`` is used.
        """
        if repo and arch:
            return self.binaries_ls(prj, pkg, repo, arch)
        if prj and pkg:
            return self.package_ls(prj, pkg)
        return self.project_ls(prj)

    def project_ls(self, prj):
        """Return the ``name`` of every entry of *prj* ([] on no data)."""
        xml = self.source.get(prj)
        if xml is None:
            return []
        root = self.__xml2etree(xml)
        return [entry.get('name') for entry in root.findall('entry')]

    def package_ls(self, prj, pkg):
        """Return ``(Directory, [LSItem, ...])`` for package *pkg*.

        When no listing is returned the Directory holds only None values
        and the item list is empty.
        """
        xml = self.source.get(prj, pkg=pkg)
        if xml is None:
            return (Directory(None, None, None, None), [])
        root = etree.fromstring(xml)
        directory = Directory(root.get('name'),
                              root.get('rev'),
                              root.get('vrev'),
                              root.get('srcmd5'))
        lsitems = [LSItem(entry.get('name'),
                          entry.get('md5'),
                          entry.get('size'),
                          entry.get('mtime'))
                   for entry in root.findall('entry')]
        return (directory, lsitems)

    def get_source_info(self, prj, pkg, rev=None):
        """Return the SourceInfo of *pkg*, optionally at revision *rev*.

        When no document is returned, a SourceInfo holding only None
        values is handed back (mirroring ``package_ls``) instead of
        failing inside the XML parser.
        """
        params = {'view': 'info'}
        if rev is not None:
            params['rev'] = rev
        xml = self.source.get(prj, pkg=pkg, **params)
        if xml is None:
            return SourceInfo(None, None, None, None, None)
        root = etree.fromstring(xml)
        return SourceInfo(root.get('package'),
                          root.get('rev'),
                          root.get('vrev'),
                          root.get('srcmd5'),
                          root.get('verifymd5'))

    def binaries_ls(self, prj, pkg, repo, arch):
        """Return a list of Binary records ([] when nothing is returned)."""
        xml = self.build.get(prj, repo, arch, pkg)
        if not xml:
            return []
        return [Binary(filename=node.get('filename'),
                       size=node.get('size'),
                       mtime=node.get('mtime'))
                for node in etree.fromstring(xml).findall('binary')]

    def get_binary_fileinfo(self, prj, pkg, repo, arch, binary):
        """Return a FileInfo built from the 'fileinfo' view of *binary*."""
        xml = self.build.get(prj, repo, arch, pkg, binary, view='fileinfo')
        return FileInfo(xml)

    def get_project_repos(self, prj):
        """Return the entry names of the build listing of *prj*.

        An empty list is returned when no listing is available.
        """
        xml = self.build.get(prj)
        if not xml:
            return []
        return [entry.get('name')
                for entry in etree.fromstring(xml).findall('entry')]

    def get_package_version(self, prj, pkg, repo, arch, full=True):
        """Return the version of the first ``.src.rpm`` built for *pkg*.

        :param full: when true the result is ``version-release``,
                     otherwise only ``version``.
        :returns: the version string, or '' when there is no source rpm
                  or its fileinfo carries no version.
        """
        finfo = None
        for item in self.binaries_ls(prj, pkg, repo, arch):
            if item.filename.endswith('.src.rpm'):
                finfo = self.get_binary_fileinfo(prj, pkg, repo, arch,
                                                 item.filename)
                break
        if finfo is None:
            return ''
        # get_binary_fileinfo() returns a FileInfo object, not XML text:
        # read the child elements through its attribute interface rather
        # than feeding the object to the XML parser.
        version = finfo.version
        if not version:
            return ''
        release = finfo.release
        if full and release:
            version = '%s-%s' % (version, release)
        return version

    def get_build_config(self, prj, repo):
        """Return the build configuration of *repo* in *prj*."""
        return self.build.get_buildconfig(prj, repo)

    def get_build_info(self, prj, pkg, repo, arch):
        """Return the build info document of *pkg*."""
        return self.build.get_buildinfo(prj, pkg, repo, arch)

    def get_nothing_provides(self, prj, pkg, repo, arch):
        """Return the capabilities reported as 'nothing provides ...'.

        Reads the text of the ``error`` element of the build info and
        extracts, from the comma separated list after ``unresolvable:``,
        every item that starts with ``nothing provides``.
        """
        xml = self.get_build_info(prj, pkg, repo, arch)
        if not xml:
            return []
        # The message is the element's *text*; testing membership on the
        # element object itself never matches.
        errors = etree.fromstring(xml).findtext('error')
        if not errors or 'unresolvable:' not in errors:
            return []
        details = errors.partition('unresolvable:')[2]
        prefix = 'nothing provides '
        nothing_provides = []
        for detail in details.split(','):
            detail = detail.strip()
            if detail.startswith(prefix):
                nothing_provides.append(detail[len(prefix):])
        return nothing_provides

    def get_vendor(self, prj, repo=None):
        """Attempt to get the value of the %vendor macro if it exists.

        Searches the build config of *repo*, or of every repository of
        *prj* when *repo* is None, and returns the first occurrence.

        :returns: the macro value, or None when no config defines it.
        """
        if repo is not None:
            repos = [repo]
        else:
            repos = self.get_project_repos(prj)
        for name in repos:
            config = self.get_build_config(prj, name) or ''
            for line in config.splitlines():
                stripped = line.strip()
                if stripped.startswith('%vendor '):
                    # Return right away so that the first hit wins; split
                    # the stripped line so indentation cannot leak into
                    # the value.
                    return stripped.split(' ', 1)[1].strip()
        return None

    def locked(self, prj, pkg=None):
        """Return True when the meta contains a ``lock`` element."""
        meta = self.__xml2etree(self.get_meta(prj, pkg))
        return meta.find('lock') is not None

    def lock(self, prj, pkg=None):
        """Add ``<lock><enable/></lock>`` to the meta and store it.

        :returns: 'Already Locked' when a lock element is present,
                  otherwise the result of ``put_meta``.
        :raises Exception: when the meta root is neither ``project`` nor
                           ``package``.
        """
        meta = self.__xml2etree(self.get_meta(prj, pkg))
        if meta.tag not in ('project', 'package'):
            if pkg:
                raise Exception('Failed to lock package. %s/%s' % (prj, pkg))
            raise Exception('Failed to lock project. %s' % prj)
        # Check the document already fetched instead of requesting the
        # meta a second time through locked().
        if meta.find('lock') is not None:
            return 'Already Locked'
        lock = etree.SubElement(meta, 'lock')
        etree.SubElement(lock, 'enable')
        return self.put_meta(prj, pkg, etree.tostring(meta))

    def unlock(self, prj, pkg=None, comment=None):
        """Post the ``unlock`` command when the target is locked.

        :param comment: comment sent with the command; defaults to
                        "Unlock".
        """
        if not self.locked(prj, pkg):
            return
        self.source.post(prj, pkg, cmd="unlock", comment=comment or "Unlock")

    def _find_by_role(self, prj, pkg, tag, role):
        """Return the *tag* elements of the meta whose role matches *role*.

        *role* is a regular expression (None matches every role).  It is
        handed to XPath as the variable ``$role``; a bare ``role`` inside
        the expression would be read as a child-element test and match
        everything.  Returns None when no meta is available.
        """
        xml = self.get_meta(prj, pkg)
        if xml is None:
            return None
        if role is None:
            role = '.*'  # regex to match all
        meta = self.__xml2etree(xml)
        return meta.xpath('.//%s[re:test(@role, $role)]' % tag,
                          namespaces={'re': REGEX_NS},
                          role=role)

    def get_users(self, prj, pkg=None, role=None):
        """Return the User records of the meta, filtered by *role* regex."""
        persons = self._find_by_role(prj, pkg, 'person', role)
        if persons is None:
            return None
        return [User(userid=p.get('userid'), role=p.get('role'))
                for p in persons]

    def get_groups(self, prj, pkg=None, role=None):
        """Return the Group records of the meta, filtered by *role* regex."""
        groups = self._find_by_role(prj, pkg, 'group', role)
        if groups is None:
            return None
        return [Group(groupid=g.get('groupid'), role=g.get('role'))
                for g in groups]

    def _add_meta_entry(self, prj, pkg, tag, **attrib):
        """Append a *tag* element with *attrib* to the meta and store it."""
        xml = self.get_meta(prj, pkg)
        if xml is None:
            raise Exception('unable to retrieve project meta')
        meta = self.__xml2etree(xml)
        etree.SubElement(meta, tag, **attrib)
        return self.put_meta(prj, pkg, etree.tostring(meta))

    def add_user(self, user, prj, pkg=None):
        """Append a ``person`` element for *user* to the meta.

        :param user: object with ``userid`` and ``role`` attributes
                     (e.g. a User record).
        :raises AttributeError: when *user* lacks one of the attributes.
        :raises Exception: when the meta cannot be retrieved.
        """
        return self._add_meta_entry(prj, pkg, 'person',
                                    userid=user.userid, role=user.role)

    def add_group(self, group, prj, pkg=None):
        """Append a ``group`` element for *group* to the meta.

        :param group: object with ``groupid`` and ``role`` attributes
                      (e.g. a Group record).
        :raises AttributeError: when *group* lacks one of the attributes.
        :raises Exception: when the meta cannot be retrieved.
        """
        return self._add_meta_entry(prj, pkg, 'group',
                                    groupid=group.groupid, role=group.role)