# -*- coding: utf-8 -*-
#
"""Convenience layer over ``ObsSourceApi`` and ``ObsBuildApi``.

Provides ``ObsApi`` (listing, meta, flags, users/groups, build info) and
``FileInfo`` (a wrapper around a binary's ``fileinfo`` XML).
"""
import re
from io import BytesIO
from lxml import etree
from collections import namedtuple
from datetime import datetime
from obsapi.sourceapi import ObsSourceApi
from obsapi.buildapi import ObsBuildApi
from obsapi.formatter import Formatter
from obsapi.repoflags import RepoFlags

LSItem = namedtuple('LSItem', 'name md5 size mtime')
Directory = namedtuple('Directory', 'name rev vrev srcmd5')
SourceInfo = namedtuple('SourceInfo', 'package rev vrev srcmd5 verifymd5')
Binary = namedtuple('Binary', 'filename size mtime')
User = namedtuple('User', 'userid role')
Group = namedtuple('Group', 'groupid role')

REGEX_NS = 'http://exslt.org/regular-expressions'
DEFAULTAPIURL = 'https://api.opensuse.org'

# Must be a well-formed document: etree.fromstring('') raises, which would
# make the whole module fail at import time.
emptyxml = '<empty/>'
EMPTYTREE = etree.fromstring(emptyxml)
EMPTY = EMPTYTREE.xpath('.')[0]


class FileInfo(object):
    """Wrapper around a ``fileinfo`` XML document.

    Child elements of the document root are exposed as attributes through
    ``__getattr__``; see that method for the exact return conventions.
    """

    # Templates are plain class attributes; this class never reads them
    # itself (presumably consumed by Formatter -- verify there).
    file_str_template = ('{filename}'
                         'size : {size}'
                         'mtime : {mtime}'
                         )

    pkg_str_template = ('{filename}'
                        'Name : {name}'
                        'Version : {version}'
                        'Release : {release}'
                        'Architecture: {arch}'
                        'Size : {size}'
                        'Source RPM : {source}'
                        'Build Date : {mtime}'
                        'Summary : {summary}'
                        'Description :'
                        '{description}'
                        'Provides :'
                        '{provides}'
                        )

    src_str_template = ('{filename}'
                        'Name : {name}'
                        'Version : {version}'
                        'Release : {release}'
                        'Architecture: {arch}'
                        'Size : {size}'
                        'Build Date : {mtime}'
                        'Summary : {summary}'
                        'Description :'
                        '{description}'
                        )

    def __init__(self, xml):
        """Parse ``xml`` and remember the root's ``filename`` attribute."""
        self.finfo = etree.fromstring(xml)
        self.filename = self.finfo.get('filename', None)
        self.formatter = Formatter(self)

    @property
    def info(self):
        """Return a dict mapping each child tag to its text.

        Tags that occur once map to a single value; tags that occur several
        times map to a list of values. ``filename`` is seeded from the root
        element's attribute.
        """
        collected = {'filename': [self.filename]}
        for item in self.finfo:
            collected.setdefault(item.tag, []).append(item.text)
        return {
            key: (values[0] if len(values) == 1 else values)
            for key, values in collected.items()
        }

    def __str__(self):
        return self.formatter.render()

    def __getattr__(self, attr):
        """Resolve unknown attributes as child elements of the document.

        Returns the text of the single matching child, a list of texts when
        there are zero or several matches (so a missing tag yields ``[]``),
        or ``None`` when the lookup itself raises.
        """
        try:
            values = [value.text for value in self.finfo.findall(attr)]
        except Exception:
            values = [None]
        if len(values) == 1:
            return values[0]
        return values

    @property
    def datetime(self):
        """Return ``mtime`` as a local ``datetime``, or None when unset."""
        mtime = self.mtime
        if mtime:
            return datetime.fromtimestamp(int(mtime))
        return None

    @property
    def xml(self):
        """Return the serialized document."""
        return etree.tostring(self.finfo)

    @property
    def is_pkg(self):
        """True when the document carries a non-empty ``arch`` value."""
        return bool(self.arch)

    @property
    def is_src(self):
        """True when the document carries no ``source`` value."""
        return not self.source

    @property
    def is_debug_info(self):
        """True when ``name`` ends in ``-debuginfo`` or ``-debugsource``."""
        name = self.name
        # ``name`` is None/[] when the tag is missing; treat that as "no".
        if name and '-' in name:
            return name.split('-')[-1] in ('debuginfo', 'debugsource')
        return False


class ObsApi(object):
    """Facade combining the source and build API clients for one API URL."""

    default_xml = ''

    def __init__(self, apiurl=None):
        """Create both clients; ``apiurl`` defaults to ``DEFAULTAPIURL``."""
        self.apiurl = apiurl or DEFAULTAPIURL
        self.source = ObsSourceApi(self.apiurl)
        self.build = ObsBuildApi(self.apiurl)
        self.source.retries = 3
        self.source.verify_ssl = True
        self.build.retries = 3
        self.build.verify_ssl = True
        self.lastapi = None

    def __xml2etree(self, xml):
        """Parse an XML text string into its root element.

        The text is encoded to UTF-8 bytes before parsing and blank text
        nodes are dropped.
        """
        xml = xml.encode('utf-8')
        parser = etree.XMLParser(remove_blank_text=True)
        return etree.parse(BytesIO(xml), parser).xpath('.')[0]

    def get_meta(self, prj, pkg=None):
        """Return the meta of a project, or of a package when ``pkg`` is set."""
        return self.source.get_meta(prj, pkg=pkg)

    def put_meta(self, prj, pkg=None, xml=None):
        """Upload ``xml`` as the meta of a project or package."""
        return self.source.put_meta(prj, pkg=pkg, xml=xml)

    def ls(self, prj=None, pkg=None, repo=None, arch=None):
        """Dispatch to the binaries, package or project listing.

        ``repo`` and ``arch`` together select ``binaries_ls``; otherwise
        ``prj`` and ``pkg`` select ``package_ls``; otherwise ``project_ls``.
        """
        if repo and arch:
            return self.binaries_ls(prj, pkg, repo, arch)
        if prj and pkg:
            return self.package_ls(prj, pkg)
        return self.project_ls(prj)

    def project_ls(self, prj):
        """Return the ``name`` of every ``entry`` in the project listing."""
        xml = self.source.get(prj)
        if xml is None:
            return []
        listing = self.__xml2etree(xml)
        return [entry.get('name') for entry in listing.findall('entry')]

    def package_ls(self, prj, pkg):
        """Return ``(Directory, [LSItem, ...])`` for a package.

        When no listing is available the Directory has all-None fields and
        the item list is empty.
        """
        xml = self.source.get(prj, pkg=pkg)
        if xml is None:
            return (Directory(None, None, None, None), [])
        listing = etree.fromstring(xml)
        directory = Directory(listing.get('name'),
                              listing.get('rev'),
                              listing.get('vrev'),
                              listing.get('srcmd5'))
        lsitems = [LSItem(entry.get('name'),
                          entry.get('md5'),
                          entry.get('size'),
                          entry.get('mtime'))
                   for entry in listing.findall('entry')]
        return (directory, lsitems)

    def get_source_info(self, prj, pkg, rev=None):
        """Return a ``SourceInfo`` built from the package's ``info`` view.

        ``rev`` is only forwarded to the source API when it is not None.
        """
        params = {'pkg': pkg, 'view': 'info'}
        if rev is not None:
            params['rev'] = rev
        tree = etree.fromstring(self.source.get(prj, **params))
        return SourceInfo(tree.get('package'),
                          tree.get('rev'),
                          tree.get('vrev'),
                          tree.get('srcmd5'),
                          tree.get('verifymd5'))

    def binaries_ls(self, prj, pkg, repo, arch):
        """Return the ``Binary`` entries of a repository or of one package.

        Returns an empty list when the API yields no document.
        """
        if pkg is None:
            xml = self.build.get_repository(prj, repo, arch)
        else:
            xml = self.build.get(prj, repo, arch, pkg)
        if not xml:
            return []
        blist = etree.fromstring(xml)
        return [Binary(filename=item.get('filename'),
                       size=item.get('size'),
                       mtime=item.get('mtime'))
                for item in blist.findall('binary')]

    def get_binary_fileinfo(self, prj, pkg, repo, arch, binary):
        """Return a ``FileInfo`` for the ``fileinfo`` view of ``binary``."""
        xml = self.build.get(prj, repo, arch, pkg, binary, view='fileinfo')
        return FileInfo(xml)

    def get_project_repos(self, prj):
        """Return the ``name`` of every ``entry`` in the build listing."""
        directory = etree.fromstring(self.build.get(prj))
        return [entry.get('name') for entry in directory.findall('entry')]

    def get_package_version(self, prj, pkg, repo, arch, full=True):
        """Return the version recorded in the package's first ``.src.rpm``.

        With ``full`` the result is ``version-release``; otherwise only the
        version. Returns '' when no source rpm (or no version) is found.
        """
        finfo = None
        for item in self.binaries_ls(prj, pkg, repo, arch):
            if item.filename.endswith('.src.rpm'):
                finfo = self.get_binary_fileinfo(prj, pkg, repo, arch,
                                                 item.filename)
                break
        if finfo is None:
            return ''
        # get_binary_fileinfo returns a FileInfo, not XML text; read the
        # tree it already parsed instead of re-parsing the object.
        tree = finfo.finfo
        version = tree.findtext('version')
        if version is None:
            return ''
        if full:
            release = tree.findtext('release')
            if release is not None:
                version = '%s-%s' % (version, release)
        return version

    def get_build_config(self, prj, repo):
        """Return the build config of ``repo`` in ``prj``."""
        return self.build.get_buildconfig(prj, repo)

    def get_build_info(self, prj, pkg, repo, arch):
        """Return the build info of a package for ``repo``/``arch``."""
        return self.build.get_buildinfo(prj, pkg, repo, arch)

    def get_nothing_provides(self, prj, pkg, repo, arch):
        """Return the names listed as "nothing provides" in the build info.

        Reads the text of the first ``error`` element, takes the part after
        ``unresolvable:``, splits it on commas and keeps what follows the
        ``nothing provides `` prefix of each matching item. Returns an empty
        list when there is no such error text.
        """
        tree = etree.fromstring(self.get_build_info(prj, pkg, repo, arch))
        error = (tree.xpath('error') or [EMPTY])[0]
        # Work on the element's text: the element itself is neither
        # searchable with ``in`` for substrings nor splittable.
        message = error.text
        nothing_provides = []
        if not message or 'unresolvable:' not in message:
            return nothing_provides
        prefix = 'nothing provides '
        for unresolvable in message.split('unresolvable:')[1].split(','):
            unresolvable = unresolvable.strip()
            if unresolvable.startswith(prefix):
                nothing_provides.append(unresolvable.split(prefix)[1])
        return nothing_provides

    def get_vendor(self, prj, repo=None):
        """
        Attempt to get the value of the %vendor macro if it exists.

        Searches the build config of ``repo``, or of every project repo
        when ``repo`` is None, and stops at the first config containing a
        match. Within that config the last match wins. Returns None when
        nothing matches.
        """
        if repo is not None:
            repos = [repo]
        else:
            repos = self.get_project_repos(prj)
        vendor_re = re.compile(r'.*vendor (.+?)\n')
        for repo_name in repos:
            prj_config = self.get_build_config(prj, repo_name)
            if not prj_config:
                # Nothing to search in an empty/missing config.
                continue
            matches = vendor_re.findall(prj_config)
            if matches:
                return matches[-1]
        return None

    def get_repo_flags(self, prj, pkg=None):
        """Return a ``RepoFlags`` built from the project/package meta."""
        return RepoFlags(self.get_meta(prj, pkg))

    def locked(self, prj, pkg=None):
        """Return True when the meta contains a ``lock`` element."""
        meta = self.__xml2etree(self.get_meta(prj, pkg))
        return meta.find('lock') is not None

    def lock(self, prj, pkg=None, comment=None):
        """Post the ``lock`` command; ``comment`` defaults to 'Lock'."""
        if comment is None:
            comment = 'Lock'
        return self.source.post(prj, pkg, cmd='lock', comment=comment)

    def unlock(self, prj, pkg=None, comment=None):
        """Post the ``unlock`` command; ``comment`` defaults to 'Unlock'."""
        if comment is None:
            comment = 'Unlock'
        return self.source.post(prj, pkg, cmd='unlock', comment=comment)

    def __role_members(self, prj, pkg, tag, role):
        """Return the ``tag`` elements of the meta whose role matches.

        ``role`` is a regular expression (None matches every role).
        Returns None when the meta cannot be retrieved.
        """
        xml = self.get_meta(prj, pkg)
        if xml is None:
            return None
        if role is None:
            role = '.*'  # regex to match all
        meta = self.__xml2etree(xml)
        # ``$role`` binds the Python value as an XPath variable; a bare
        # ``role`` would be read as a child-element test and filter nothing.
        return meta.xpath('.//%s[re:test(@role, $role)]' % tag,
                          namespaces={'re': REGEX_NS},
                          role=role)

    def get_users(self, prj, pkg=None, role=None):
        """Return ``User`` tuples for ``person`` entries matching ``role``.

        Returns None when the meta cannot be retrieved.
        """
        persons = self.__role_members(prj, pkg, 'person', role)
        if persons is None:
            return None
        return [User(userid=p.get('userid'), role=p.get('role'))
                for p in persons]

    def get_groups(self, prj, pkg=None, role=None):
        """Return ``Group`` tuples for ``group`` entries matching ``role``.

        Returns None when the meta cannot be retrieved.
        """
        groups = self.__role_members(prj, pkg, 'group', role)
        if groups is None:
            return None
        return [Group(groupid=g.get('groupid'), role=g.get('role'))
                for g in groups]

    def add_user(self, user, prj, pkg=None):
        """Append a ``person`` element for ``user`` and upload the meta.

        ``user`` must expose ``userid`` and ``role``. Raises Exception when
        either is missing or when the meta cannot be retrieved.
        """
        try:
            userid = user.userid
            role = user.role
        except Exception as e:
            raise Exception(e)
        xml = self.get_meta(prj, pkg)
        if xml is None:
            raise Exception('unable to retrieve project meta')
        meta = self.__xml2etree(xml)
        etree.SubElement(meta, 'person', userid=userid, role=role)
        return self.put_meta(prj, pkg, etree.tostring(meta))

    def add_group(self, group, prj, pkg=None):
        """Append a ``group`` element for ``group`` and upload the meta.

        ``group`` must expose ``groupid`` and ``role``. Raises Exception
        when either is missing or when the meta cannot be retrieved.
        """
        try:
            groupid = group.groupid
            role = group.role
        except Exception as e:
            raise Exception(e)
        xml = self.get_meta(prj, pkg)
        if xml is None:
            raise Exception('unable to retrieve project meta')
        meta = self.__xml2etree(xml)
        etree.SubElement(meta, 'group', groupid=groupid, role=role)
        return self.put_meta(prj, pkg, etree.tostring(meta))

    def get_spec_files(self, prj, pkg=None):
        """Yield the content of every ``.spec`` file.

        Covers only ``pkg`` when given, otherwise every package listed for
        the project.
        """
        if pkg is not None:
            pkgs = [pkg]
        else:
            pkgs = self.ls(prj)
        for pkg_name in pkgs:
            _directory, flist = self.ls(prj, pkg_name)
            for srcfile in flist:
                if srcfile.name.endswith('.spec'):
                    yield self.source.get(prj, pkg_name, srcfile.name)

    def __flag_to_params(self, flag):
        """Translate a flag object into keyword parameters for a post.

        ``flag`` must provide a ``get`` method; each value is taken from the
        attribute of the same name when present, else from ``get``.
        Raises Exception for a missing ``get`` or a None flag type, and
        ValueError for a status other than 'enable' or 'disable'.
        """
        try:
            # The ``get`` fallback is evaluated eagerly, so an object
            # without ``get`` fails here even if it has the attribute.
            flag_type = getattr(flag, 'flag', flag.get('flag', None))
        except AttributeError:
            raise Exception('flag type %s missing get function' % type(flag))
        if flag_type is None:
            raise Exception('Invalid flag type: None')
        status = getattr(flag, 'status', flag.get('status', None))
        if status not in ['enable', 'disable']:
            raise ValueError('flag status expected "enable" or "disable" got %s' % status)
        return dict(flag=flag_type,
                    status=status,
                    repository=getattr(flag, 'repository',
                                       flag.get('repository', None)),
                    arch=getattr(flag, 'arch', flag.get('arch', None)),
                    )

    def parse_status_code(self, xml):
        """Return ``(code, summary)`` from a status document.

        ``code`` is the root's ``code`` attribute and ``summary`` the text
        of its ``summary`` child; either is None when absent.
        """
        status_code = etree.fromstring(xml)
        code = status_code.get('code', None)
        summary = status_code.find('summary')
        if summary is not None:
            summary = summary.text
        return (code, summary)

    def __check_status(self, r):
        """Return ``r`` unchanged, or raise Exception on HTTP status 400.

        The message is built from the code and summary parsed out of the
        response body.
        """
        if r.status_code == 400:
            code, summary = self.parse_status_code(r.text)
            # ``code`` is None when the attribute is absent.
            label = (code or '').replace('_', ' ').capitalize()
            raise Exception('{}: {}'.format(label, summary))
        return r

    def set_flag(self, flag, prj, pkg=None):
        """Post ``set_flag`` for ``flag``; raises on a 400 response."""
        params = self.__flag_to_params(flag)
        params['cmd'] = 'set_flag'
        r = self.source.post(prj, pkg, **params)
        return self.__check_status(r)

    def remove_flag(self, flag, prj, pkg=None):
        """Post ``remove_flag`` for ``flag``; raises on a 400 response."""
        params = self.__flag_to_params(flag)
        params['cmd'] = 'remove_flag'
        r = self.source.post(prj, pkg, **params)
        return self.__check_status(r)