# -*- coding: utf-8 -*-
#
import re
from io import BytesIO
from lxml import etree
from collections import namedtuple
from datetime import datetime
from obsapi.sourceapi import ObsSourceApi
from obsapi.buildapi import ObsBuildApi
from obsapi.formatter import Formatter
from obsapi.repoflags import RepoFlags
# Lightweight, immutable record types handed back by the ObsApi query methods.
LSItem = namedtuple('LSItem', 'name md5 size mtime')
Directory = namedtuple('Directory', 'name rev vrev srcmd5')
SourceInfo = namedtuple('SourceInfo', 'package rev vrev srcmd5 verifymd5')
Binary = namedtuple('Binary', 'filename size mtime')
User = namedtuple('User', 'userid role')
Group = namedtuple('Group', 'groupid role')

# Namespace URI bound to the ``re:`` prefix in XPath queries (EXSLT regex).
REGEX_NS = 'http://exslt.org/regular-expressions'

# API endpoint used when the caller does not supply one.
DEFAULTAPIURL = 'https://api.opensuse.org'

# Placeholder document used as a stand-in element.  It has to be a
# well-formed XML document: parsing an empty string raises at import time.
# NOTE(review): the element name is arbitrary -- confirm nothing depends on it.
emptyxml = '<empty/>'
EMPTYTREE = etree.fromstring(emptyxml)
EMPTY = EMPTYTREE.xpath('.')[0]
class FileInfo(object):
    """Wrapper around a ``fileinfo`` XML document describing one binary.

    The document is parsed once in ``__init__``.  Child elements of the root
    are exposed as attributes through ``__getattr__``: an element that occurs
    once yields its text, anything else yields a list of texts (an empty list
    when the element is absent).
    """

    # NOTE(review): the template fragments below are concatenated exactly as
    # written, i.e. without any separator between the fields.  Confirm against
    # Formatter whether line breaks were intended.
    file_str_template = ('{filename}'
                         'size : {size}'
                         'mtime : {mtime}'
                         )
    pkg_str_template = ('{filename}'
                        'Name : {name}'
                        'Version : {version}'
                        'Release : {release}'
                        'Architecture: {arch}'
                        'Size : {size}'
                        'Source RPM : {source}'
                        'Build Date : {mtime}'
                        'Summary : {summary}'
                        'Description :'
                        '{description}'
                        'Provides :'
                        '{provides}'
                        )
    src_str_template = ('{filename}'
                        'Name : {name}'
                        'Version : {version}'
                        'Release : {release}'
                        'Architecture: {arch}'
                        'Size : {size}'
                        'Build Date : {mtime}'
                        'Summary : {summary}'
                        'Description :'
                        '{description}'
                        )

    def __init__(self, xml):
        """Parse *xml* and remember the root element and its ``filename``."""
        self.finfo = etree.fromstring(xml)
        self.filename = self.finfo.get('filename', None)
        self.formatter = Formatter(self)

    @property
    def info(self):
        """Return a dict mapping each child tag to its text.

        Tags that occur once map to a single value, repeated tags map to a
        list of values.  The ``filename`` attribute is included as well.
        """
        collected = {'filename': [self.filename]}
        for node in self.finfo:
            collected.setdefault(node.tag, []).append(node.text)
        return dict((tag, texts[0] if len(texts) == 1 else texts)
                    for tag, texts in collected.items())

    def __str__(self):
        return self.formatter.render()

    def __getattr__(self, attr):
        """Resolve unknown attributes as child elements of the document.

        Only invoked when normal attribute lookup fails.  Private and dunder
        names are never XML lookups; answering them would make protocol
        probes such as ``hasattr(obj, '__deepcopy__')`` succeed with a bogus
        value.  ``finfo`` is refused too, otherwise accessing it before
        ``__init__`` ran would recurse into this method.

        Returns the element text if exactly one element matches, otherwise
        the list of texts; ``None`` if the lookup itself fails.
        """
        if attr.startswith('_') or attr == 'finfo':
            raise AttributeError(attr)
        try:
            values = [node.text for node in self.finfo.findall(attr)]
        except Exception:
            # Deliberate best effort: a failed lookup reads as "no value".
            return None
        if len(values) == 1:
            return values[0]
        return values

    @property
    def datetime(self):
        """Return ``mtime`` as a local ``datetime``, or None when unset."""
        mtime = self.mtime
        if not mtime:
            return None
        return datetime.fromtimestamp(int(mtime))

    @property
    def xml(self):
        """Return the serialized document."""
        return etree.tostring(self.finfo)

    @property
    def is_pkg(self):
        """True when the document carries a non-empty ``arch`` element."""
        return bool(self.arch)

    @property
    def is_src(self):
        """True when the document has no (non-empty) ``source`` element."""
        return not self.source

    @property
    def is_debug_info(self):
        """True when ``name`` ends in ``-debuginfo`` or ``-debugsource``."""
        name = self.name
        # ``name`` is None for an empty element and a list when the element
        # is missing or repeated; none of those is a debug package name.
        if not name or isinstance(name, list):
            return False
        return name.endswith(('-debuginfo', '-debugsource'))
class ObsApi(object):
    """Convenience layer on top of the source and build API clients.

    ``self.source`` (ObsSourceApi) and ``self.build`` (ObsBuildApi) perform
    the actual requests; the methods here turn their XML answers into plain
    Python values and named tuples.
    """

    default_xml = ''

    # Pattern used by get_vendor(); compiled once instead of on every call.
    _VENDOR_RE = re.compile(r'.*vendor (.+?)\n')

    def __init__(self, apiurl=None):
        """Create both API clients for *apiurl* (default: DEFAULTAPIURL)."""
        self.apiurl = apiurl or DEFAULTAPIURL
        self.source = ObsSourceApi(self.apiurl)
        self.build = ObsBuildApi(self.apiurl)
        for client in (self.source, self.build):
            client.retries = 3
            client.verify_ssl = True
        self.lastapi = None

    def __xml2etree(self, xml):
        """Parse *xml* (text or bytes) and return the root element.

        Whitespace-only text nodes are dropped by the parser.
        """
        if not isinstance(xml, bytes):
            xml = xml.encode('utf-8')
        parser = etree.XMLParser(remove_blank_text=True)
        return etree.parse(BytesIO(xml), parser).getroot()

    def get_meta(self, prj, pkg=None):
        """Return the meta document of a project, or of a package in it."""
        return self.source.get_meta(prj, pkg=pkg)

    def put_meta(self, prj, pkg=None, xml=None):
        """Store *xml* as the meta document of a project or package."""
        return self.source.put_meta(prj, pkg=pkg, xml=xml)

    def ls(self, prj=None, pkg=None, repo=None, arch=None):
        """List binaries, package files or packages, depending on arguments.

        * *repo* and *arch* given -> binaries_ls()
        * *prj* and *pkg* given   -> package_ls()
        * otherwise               -> project_ls()
        """
        if repo and arch:
            return self.binaries_ls(prj, pkg, repo, arch)
        if prj and pkg:
            return self.package_ls(prj, pkg)
        return self.project_ls(prj)

    def project_ls(self, prj):
        """Return the entry names listed for *prj* ([] without an answer)."""
        xml = self.source.get(prj)
        if xml is None:
            return []
        listing = self.__xml2etree(xml)
        return [entry.get('name') for entry in listing.findall('entry')]

    def package_ls(self, prj, pkg):
        """Return ``(Directory, [LSItem, ...])`` for the files of a package.

        Without an answer from the server the Directory holds only ``None``
        values and the list is empty.
        """
        xml = self.source.get(prj, pkg=pkg)
        if xml is None:
            return (Directory(None, None, None, None), [])
        listing = etree.fromstring(xml)
        directory = Directory(*[listing.get(key)
                                for key in ('name', 'rev', 'vrev', 'srcmd5')])
        files = [LSItem(entry.get('name'),
                        entry.get('md5'),
                        entry.get('size'),
                        entry.get('mtime'))
                 for entry in listing.findall('entry')]
        return (directory, files)

    def get_source_info(self, prj, pkg, rev=None):
        """Return a SourceInfo for *pkg*, optionally at revision *rev*."""
        query = {'pkg': pkg, 'view': 'info'}
        if rev is not None:
            # Only send the parameter when a revision was actually requested.
            query['rev'] = rev
        tree = etree.fromstring(self.source.get(prj, **query))
        return SourceInfo(tree.get('package'),
                          tree.get('rev'),
                          tree.get('vrev'),
                          tree.get('srcmd5'),
                          tree.get('verifymd5'))

    def binaries_ls(self, prj, pkg, repo, arch):
        """Return the Binary records of a package, or of the whole repository
        when *pkg* is None.  An empty answer yields an empty list.
        """
        if pkg is None:
            xml = self.build.get_repository(prj, repo, arch)
        else:
            xml = self.build.get(prj, repo, arch, pkg)
        if not xml:
            return []
        return [Binary(filename=node.get('filename'),
                       size=node.get('size'),
                       mtime=node.get('mtime'))
                for node in etree.fromstring(xml).findall('binary')]

    def get_binary_fileinfo(self, prj, pkg, repo, arch, binary):
        """Return a FileInfo built from the ``fileinfo`` view of *binary*."""
        xml = self.build.get(prj, repo, arch, pkg, binary, view='fileinfo')
        return FileInfo(xml)

    def get_project_repos(self, prj):
        """Return the entry names the build API lists for *prj*."""
        listing = etree.fromstring(self.build.get(prj))
        return [entry.get('name') for entry in listing.findall('entry')]

    def get_package_version(self, prj, pkg, repo, arch, full=True):
        """Return the version of the first ``.src.rpm`` built for a package.

        With *full* (the default) the result is ``version-release``,
        otherwise only the version.  An empty string is returned when no
        source rpm is listed or it carries no version.
        """
        binaries = self.binaries_ls(prj, pkg, repo, arch)
        src_rpm = next((item.filename for item in binaries
                        if item.filename.endswith('.src.rpm')), None)
        if src_rpm is None:
            return ''
        # get_binary_fileinfo() already returns a parsed FileInfo; its
        # attributes give the element texts, so it must not be parsed again.
        finfo = self.get_binary_fileinfo(prj, pkg, repo, arch, src_rpm)
        version = finfo.version
        if not version:
            return ''
        if full:
            version = '%s-%s' % (version, finfo.release)
        return version

    def get_build_config(self, prj, repo):
        """Return the build configuration of *repo* in *prj*."""
        return self.build.get_buildconfig(prj, repo)

    def get_build_info(self, prj, pkg, repo, arch):
        """Return the build info document of a package."""
        return self.build.get_buildinfo(prj, pkg, repo, arch)

    def get_nothing_provides(self, prj, pkg, repo, arch):
        """Return what the build info reports as ``nothing provides ...``.

        The text of the first ``error`` child is inspected.  Everything after
        ``unresolvable:`` is split on commas, and each item starting with
        ``nothing provides `` contributes the text following that prefix.
        """
        tree = etree.fromstring(self.get_build_info(prj, pkg, repo, arch))
        # Work on the element *text*; the element itself supports neither
        # substring tests nor split().
        error_text = tree.findtext('error') or ''
        if 'unresolvable:' not in error_text:
            return []
        prefix = 'nothing provides '
        details = error_text.split('unresolvable:')[1]
        return [item.split(prefix)[1]
                for item in (part.strip() for part in details.split(','))
                if item.startswith(prefix)]

    def get_vendor(self, prj, repo=None):
        """ Attempt to get the value of the %vendor macro if exists
        Search the build config of *repo*, or of all repos of the project.
        The first config containing a match is used; within that config the
        last occurrence wins.  Returns None when nothing matches.
        """
        repos = [repo] if repo is not None else self.get_project_repos(prj)
        for name in repos:
            prj_config = self.get_build_config(prj, name)
            if not prj_config:
                # Nothing to search in; try the next repository.
                continue
            matches = self._VENDOR_RE.findall(prj_config)
            if matches:
                return matches[-1]
        return None

    def get_repo_flags(self, flag_type, prj, pkg=None):
        """Return a RepoFlags built from the project/package meta."""
        return RepoFlags(flag_type, self.get_meta(prj, pkg))

    def locked(self, prj, pkg=None):
        """Return True when the meta document contains a ``lock`` element."""
        meta = self.__xml2etree(self.get_meta(prj, pkg))
        return meta.find('lock') is not None

    def lock(self, prj, pkg=None, comment=None):
        """Lock a project or package; does nothing if already locked."""
        if self.locked(prj, pkg):
            return
        if comment is None:
            comment = 'Lock'
        return self.source.post(prj, pkg, cmd='lock', comment=comment)

    def unlock(self, prj, pkg=None, comment=None):
        """Unlock a project or package; does nothing if it is not locked."""
        if not self.locked(prj, pkg):
            return
        if comment is None:
            comment = 'Unlock'
        return self.source.post(prj, pkg, cmd='unlock', comment=comment)

    def _find_by_role(self, tag, prj, pkg, role):
        """Return the *tag* elements of the meta whose role matches *role*.

        *role* is a regular expression; None matches every role.  Returns
        None when no meta document is available.
        """
        xml = self.get_meta(prj, pkg)
        if xml is None:
            return None
        if role is None:
            role = '.*'  # regex to match all
        meta = self.__xml2etree(xml)
        # ``$role`` is an XPath variable bound through the keyword argument.
        # A bare ``role`` would be read as a child-element node-set and the
        # filter would never take effect.
        return meta.xpath('.//%s[re:test(@role, $role)]' % tag,
                          namespaces={'re': REGEX_NS},
                          role=role)

    def get_users(self, prj, pkg=None, role=None):
        """Return the ``person`` entries of the meta as User tuples.

        *role* is an optional regular expression restricting the roles.
        Returns None when no meta document is available.
        """
        persons = self._find_by_role('person', prj, pkg, role)
        if persons is None:
            return None
        return [User(userid=p.get('userid'), role=p.get('role'))
                for p in persons]

    def get_groups(self, prj, pkg=None, role=None):
        """Return the ``group`` entries of the meta as Group tuples.

        *role* is an optional regular expression restricting the roles.
        Returns None when no meta document is available.
        """
        groups = self._find_by_role('group', prj, pkg, role)
        if groups is None:
            return None
        return [Group(groupid=g.get('groupid'), role=g.get('role'))
                for g in groups]

    def _add_member(self, tag, id_attr, member_id, role, prj, pkg):
        """Append a ``<tag id_attr=... role=...>`` element and store the meta.

        Raises Exception when the meta document cannot be retrieved.
        """
        xml = self.get_meta(prj, pkg)
        if xml is None:
            raise Exception('unable to retrieve project meta')
        meta = self.__xml2etree(xml)
        node = etree.SubElement(meta, tag)
        node.set(id_attr, member_id)
        node.set('role', role)
        return self.put_meta(prj, pkg, etree.tostring(meta))

    def add_user(self, user, prj, pkg=None):
        """Add *user* (an object with ``userid`` and ``role``) to the meta.

        Raises AttributeError when *user* lacks one of these attributes and
        Exception when the meta document cannot be retrieved.
        """
        return self._add_member('person', 'userid',
                                user.userid, user.role, prj, pkg)

    def add_group(self, group, prj, pkg=None):
        """Add *group* (an object with ``groupid`` and ``role``) to the meta.

        Raises AttributeError when *group* lacks one of these attributes and
        Exception when the meta document cannot be retrieved.
        """
        return self._add_member('group', 'groupid',
                                group.groupid, group.role, prj, pkg)

    def get_spec_files(self, prj, pkg=None):
        """Yield the content of every ``*.spec`` file of *pkg*, or of all
        packages listed for *prj* when *pkg* is None.
        """
        packages = [pkg] if pkg is not None else self.ls(prj)
        for package in packages:
            _directory, files = self.ls(prj, package)
            for srcfile in files:
                if srcfile.name.endswith('.spec'):
                    yield self.source.get(prj, package, srcfile.name)