# -*- coding: utf-8 -*- # import re from lxml import etree from collections import namedtuple from obsapi import helpers from obsapi.sourceapi import ObsSourceApi from obsapi.buildapi import ObsBuildApi from obsapi.repoflags import RepoFlags from obsapi.fileinfo import FileInfo LSItem = namedtuple('LSItem', 'name md5 size mtime') Directory = namedtuple('Directory', 'name rev vrev srcmd5') SourceInfo = namedtuple('SourceInfo', 'package rev vrev srcmd5 verifymd5') Binary = namedtuple('Binary', 'filename size mtime') User = namedtuple('User', 'userid role') Group = namedtuple('Group', 'groupid role') REGEX_NS = 'http://exslt.org/regular-expressions' DEFAULTAPIURL = 'https://api.opensuse.org' emptyxml = '' EMPTYTREE = etree.fromstring(emptyxml) EMPTY = EMPTYTREE.xpath('.')[0] xml2etree = helpers.xml2etree as_string = helpers.as_string class ObsApi(object): default_xml = '' def __init__(self, apiurl=None): self.apiurl = apiurl or DEFAULTAPIURL self.source = ObsSourceApi(self.apiurl) self.build = ObsBuildApi(self.apiurl) self.source.retries = 3 self.source.verify_ssl = True self.build.retries = 3 self.build.verify_ssl = True self.lastapi = None def get_meta(self, prj, pkg=None): return self.source.get_meta(prj, pkg=pkg) def put_meta(self, prj, pkg=None, xml=None): return self.source.put_meta(prj, pkg=pkg, xml=xml) def ls(self, prj=None, pkg=None, repo=None, arch=None): if repo and arch: return self.binaries_ls(prj, pkg, repo, arch) if prj and pkg: return self.package_ls(prj, pkg) return self.project_ls(prj) def project_ls(self, prj): xml = self.source.get(prj) if xml is None: return [] d = xml2etree(xml) lsitems = [e.get('name') for e in d.findall('entry')] return lsitems def package_ls(self, prj, pkg): xml = self.source.get(prj, pkg=pkg) if xml is None: return (Directory(None, None, None, None), []) d = xml2etree(xml) directory = Directory(d.get('name'), d.get('rev'), d.get('vrev'), d.get('srcmd5') ) lsitems = [] for item in d.findall('entry'): lsitems.append(LSItem(item.get('name'), item.get('md5'), item.get('size'), item.get('mtime') ) ) return (directory, lsitems) def get_source_info(self, prj, pkg, rev=None): if rev is None: xml = self.source.get(prj, pkg=pkg, view='info') else: xml = self.source.get(prj, pkg=pkg, view='info', rev=rev) tree = etree.fromstring(xml) sinfo = SourceInfo(tree.get('package'), tree.get('rev'), tree.get('vrev'), tree.get('srcmd5'), tree.get('verifymd5') ) return sinfo def binaries_ls(self, prj, pkg, repo, arch): binaries = [] if pkg is None: xml = self.build.get_repository(prj, repo, arch) else: xml = self.build.get(prj, repo, arch, pkg) if xml: blist = etree.fromstring(xml) binaries = [Binary(filename=i.get('filename'), size=i.get('size'), mtime=i.get('mtime')) for i in blist.findall('binary')] return binaries def get_binary_fileinfo(self, prj, pkg, repo, arch, binary): xml = self.build.get(prj, repo, arch, pkg, binary, view='fileinfo') return FileInfo(xml) def get_project_repos(self, prj): xml = self.build.get(prj) directory = etree.fromstring(xml) entries = [] for entry in directory.findall('entry'): entries.append(entry.get('name')) return entries def get_package_version(self, prj, pkg, repo, arch, full=True): binaries = self.binaries_ls(prj, pkg, repo, arch) finfo = None for item in binaries: if item.filename.endswith('.src.rpm'): finfo = self.get_binary_fileinfo(prj, pkg, repo, arch, item.filename ) break if finfo: version = finfo.version if full: version = '%s-%s' % (version, finfo.release) else: version = '' return version def get_package_source_version(self, prj, pkg, full=True): spec_version_re = re.compile(r'Version:\s*(.+?)[\s]') spec_release_re = re.compile(r'Release:\s*(.+?)[\s]') spec_content = '' spec_file = None spec_files = self.get_spec_files(prj, pkg, filenames=True) if len(list(spec_files)) == 1: spec_file = spec_files.next else: for sf in self.get_spec_files(prj, pkg, filenames=True): if sf.startswith(pkg): spec_file = sf if not spec_file: return '' spec_content = as_string(self.source.get(prj, pkg, spec_file)) try: version = spec_version_re.search(spec_content).group(1) release = spec_release_re.search(spec_content).group(1) if full: version = '%s-%s' % (version, release) except Exception: version = '' return version def get_build_config(self, prj, repo): return self.build.get_buildconfig(prj, repo) def get_build_info(self, prj, pkg, repo, arch): return self.build.get_buildinfo(prj, pkg, repo, arch) def get_nothing_provides(self, prj, pkg, repo, arch): tree = etree.fromstring(self.get_build_info(prj, pkg, repo, arch)) errors = (tree.xpath('error') or [EMPTY])[0] nothing_provides = [] if errors is None or 'unresolvable' not in errors: return nothing_provides for unresolvable in [u.strip() for u in errors.split('unresolvable:')[1].split(',')]: if unresolvable.startswith('nothing provides'): nothing_provides.append(unresolvable.split( 'nothing provides ')[1]) return nothing_provides def get_vendor(self, prj, repo=None): """ Attempt to get the value of the %vendor macro if exists Search build configs from all repos. Take first occurance. """ if repo is not None: repos = [repo] else: repos = self.get_project_repos(prj) vendor_re = re.compile(r'.*vendor (.+?)\n') vendor = None for repo in repos: prj_config = self.get_build_config(prj, repo) matches = vendor_re.findall(prj_config) if matches: vendor = matches[-1] break return vendor def get_repo_flags(self, prj, pkg=None): return RepoFlags(self.get_meta(prj, pkg)) def locked(self, prj, pkg=None): meta = xml2etree(self.get_meta(prj, pkg)) if meta.find('lock') is not None: return True return False def lock(self, prj, pkg=None, comment=None): if comment is None: comment = 'Lock' return self.source.post(prj, pkg, cmd='lock', comment=comment) def unlock(self, prj, pkg=None, comment=None): if comment is None: comment = 'Unlock' return self.source.post(prj, pkg, cmd='unlock', comment=comment) def get_users(self, prj, pkg=None, role=None): xml = self.get_meta(prj, pkg) if xml is None: return None if role is None: role = '.*' # regex to match all meta = xml2etree(xml) persons = meta.xpath('.//person[re:test(@role, role)]', namespaces={'re': REGEX_NS}) return [User(userid=p.get('userid'), role=p.get('role')) for p in persons] def get_groups(self, prj, pkg=None, role=None): xml = self.get_meta(prj, pkg) if xml is None: return None if role is None: role = '.*' # regex to match all meta = xml2etree(xml) groups = meta.xpath('.//group[re:test(@role, role)]', namespaces={'re': REGEX_NS}) return [Group(groupid=g.get('groupid'), role=g.get('role')) for g in groups] def add_user(self, user, prj, pkg=None): try: userid = user.userid except Exception as e: raise Exception(e) try: role = user.role except Exception as e: raise Exception(e) xml = self.get_meta(prj, pkg) if xml is None: raise Exception('unable to retrieve project meta') meta = xml2etree(xml) etree.SubElement(meta, 'person', userid=userid, role=role) return self.put_meta(prj, pkg, etree.tostring(meta)) def add_group(self, group, prj, pkg=None): try: groupid = group.groupid except Exception as e: raise Exception(e) try: role = group.role except Exception as e: raise Exception(e) xml = self.get_meta(prj, pkg) if xml is None: raise Exception('unable to retrieve project meta') meta = xml2etree(xml) etree.SubElement(meta, 'group', groupid=groupid, role=role) return self.put_meta(prj, pkg, etree.tostring(meta)) def get_spec_files(self, prj, pkg=None, filenames=False): if pkg is not None: pkgs = [pkg] else: pkgs = self.ls(prj) for pkg in pkgs: d, flist = self.ls(prj, pkg) for srcfile in flist: if srcfile.name.endswith('.spec'): if filenames: yield srcfile.name else: yield self.source.get(prj, pkg, srcfile.name) def __flag_to_params(self, flag): try: flag_type = getattr(flag, 'flag', flag.get('flag', None)) if flag_type is None: raise Exception('Invalid flag type: None') except AttributeError: raise Exception('flag type %s missing get function' % type(flag)) status = getattr(flag, 'status', flag.get('status', None)) if status not in ['enable', 'disable']: raise ValueError('flag status expected "enable" or "disable" got %s' % status) return dict(flag=flag_type, status=status, repository=getattr(flag, 'repository', flag.get('repository', None)), arch=getattr(flag, 'arch', flag.get('arch', None)), ) def parse_status_code(self, xml): status_code = etree.fromstring(xml) code = status_code.get('code', None) summary = status_code.find('summary') if summary is not None: summary = summary.text return (code, summary) def __check_status(self, r): if r.status_code == 400: code, summary = self.parse_status_code(r.text) msg = '{}: {}'.format(code.replace('_', ' ').capitalize(), summary) raise Exception(msg) return r def set_flag(self, flag, prj, pkg=None): params = self.__flag_to_params(flag) params['cmd'] = 'set_flag' r = self.source.post(prj, pkg, **params) try: return self.__check_status(r) except Exception as e: raise e def remove_flag(self, flag, prj, pkg=None): params = self.__flag_to_params(flag) params['cmd'] = 'remove_flag' r = self.source.post(prj, pkg, **params) try: return self.__check_status(r) except Exception as e: raise e