# -*- coding: utf-8 -*-
#
'''Convenience layer over the OBS source and build API clients.

``ObsApi`` bundles an ``ObsSourceApi`` and an ``ObsBuildApi`` instance and
turns their XML responses into plain Python values (lists, namedtuples and
``FileInfo`` objects).
'''
from io import BytesIO
from collections import namedtuple
from datetime import datetime

from lxml import etree

from obsapi.sourceapi import ObsSourceApi
from obsapi.buildapi import ObsBuildApi
from obsapi.formatter import Formatter

# Lightweight records built from XML attributes of API responses.
LSItem = namedtuple('LSItem', 'name md5 size mtime')
Directory = namedtuple('Directory', 'name rev vrev srcmd5')
SourceInfo = namedtuple('SourceInfo', 'package rev vrev srcmd5 verifymd5')
Binary = namedtuple('Binary', 'filename size mtime')

DEFAULTAPIURL = 'https://api.opensuse.org'

# Placeholder element usable wherever "no element" needs a stand-in.
# The document must be well-formed: an empty string cannot be parsed and
# would make the whole module fail at import time.
emptyxml = '<empty/>'
EMPTYTREE = etree.fromstring(emptyxml)
EMPTY = EMPTYTREE.xpath('.')[0]


class FileInfo(object):
    '''Attribute-style view of a ``fileinfo`` XML document.

    Child elements of the document root are exposed as attributes: an
    attribute lookup returns the text of the matching child, a list of
    texts when several children share the tag, and an empty list when no
    child matches.
    '''

    # NOTE(review): the fragments below are concatenated without any
    # separator; presumably ``Formatter`` takes care of the layout -
    # confirm against obsapi.formatter.
    file_str_template = ("{filename}"
                         "size : {size}"
                         "mtime : {mtime}"
                         )

    pkg_str_template = ("{filename}"
                        "Name : {name}"
                        "Version : {version}"
                        "Release : {release}"
                        "Architecture: {arch}"
                        "Size : {size}"
                        "Source RPM : {source}"
                        "Build Date : {mtime}"
                        "Summary : {summary}"
                        "Description :"
                        "{description}"
                        "Provides :"
                        "{provides}"
                        )

    src_str_template = ("{filename}"
                        "Name : {name}"
                        "Version : {version}"
                        "Release : {release}"
                        "Architecture: {arch}"
                        "Size : {size}"
                        "Build Date : {mtime}"
                        "Summary : {summary}"
                        "Description :"
                        "{description}"
                        )

    def __init__(self, xml):
        '''Parse *xml* (a ``fileinfo`` document) and set up the formatter.'''
        self.finfo = etree.fromstring(xml)
        self.filename = self.finfo.get('filename', None)
        self.formatter = Formatter(self)

    @property
    def info(self):
        '''Return a dict mapping each child tag to its text.

        Tags occurring once map to a single value, repeated tags map to a
        list of values.  The ``filename`` attribute of the root element is
        included under the ``'filename'`` key.
        '''
        collected = {'filename': [self.filename]}
        for item in self.finfo:
            collected.setdefault(item.tag, []).append(item.text)
        return {tag: values[0] if len(values) == 1 else values
                for tag, values in collected.items()}

    def __str__(self):
        return self.formatter.render()

    def __getattr__(self, attr):
        '''Resolve unknown attributes from the child elements of the XML.

        Returns the element text for a single match, otherwise the list of
        texts (empty when nothing matches).
        '''
        # ``finfo`` itself must never be resolved through the XML (it would
        # recurse while the instance is only partially initialised), and
        # protocol probes such as ``__deepcopy__`` must fail normally rather
        # than receive a bogus empty list.
        if attr == 'finfo' or (attr.startswith('__') and attr.endswith('__')):
            raise AttributeError(attr)
        try:
            values = [value.text for value in self.finfo.findall(attr)]
        except Exception:
            # Best effort: an unusable lookup behaves like an empty element.
            values = [None]
        if len(values) == 1:
            return values[0]
        return values

    @property
    def datetime(self):
        '''Return ``mtime`` as a ``datetime`` object, or None if unset.'''
        if self.mtime:
            return datetime.fromtimestamp(int(self.mtime))
        return None

    @property
    def xml(self):
        '''Return the serialized XML document.'''
        return etree.tostring(self.finfo)

    @property
    def is_pkg(self):
        '''True when the document carries a non-empty ``arch`` element.'''
        return bool(self.arch)

    @property
    def is_src(self):
        '''True when the document has no (non-empty) ``source`` element.'''
        return not self.source

    @property
    def is_debug_info(self):
        '''True when ``name`` ends in ``-debuginfo`` or ``-debugsource``.'''
        name = self.name
        # ``name`` may be None, an empty list (no element) or a list of
        # several texts; none of those can be a debug package name.
        if not name or isinstance(name, list):
            return False
        return name.endswith(('-debuginfo', '-debugsource'))


class ObsApi(object):
    '''Facade combining the source and build API clients.'''

    default_xml = ''

    def __init__(self, apiurl=None):
        '''Create both API clients for *apiurl* (default: DEFAULTAPIURL).'''
        self.apiurl = apiurl or DEFAULTAPIURL
        self.source = ObsSourceApi(self.apiurl)
        self.build = ObsBuildApi(self.apiurl)
        self.source.retries = 3
        self.source.verify_ssl = True
        self.build.retries = 3
        self.build.verify_ssl = True
        self.lastapi = None

    def __xml2etree(self, xml):
        '''Parse *xml* (text or bytes) and return its root element.'''
        # Only text needs encoding; bytes are handed to the parser as-is.
        if not isinstance(xml, bytes):
            xml = xml.encode('utf-8')
        parser = etree.XMLParser(remove_blank_text=True)
        return etree.parse(BytesIO(xml), parser).xpath('.')[0]

    def get_meta(self, prj, pkg=None):
        '''Return the meta document of a project or package.'''
        return self.source.get_meta(prj, pkg=pkg)

    def put_meta(self, prj, pkg=None, xml=None):
        '''Store *xml* as the meta document of a project or package.'''
        return self.source.put_meta(prj, pkg=pkg, xml=xml)

    def ls(self, prj=None, pkg=None, repo=None, arch=None):
        '''List binaries, package files or project packages.

        Dispatches on the arguments given: *repo* and *arch* select
        ``binaries_ls``, *prj* and *pkg* select ``package_ls``, *prj* alone
        selects ``project_ls``.  Returns None when nothing applies.
        '''
        if repo and arch:
            return self.binaries_ls(prj, pkg, repo, arch)
        if prj and pkg:
            return self.package_ls(prj, pkg)
        if prj:
            return self.project_ls(prj)
        return None

    def project_ls(self, prj):
        '''Return the names of the ``entry`` elements listed for *prj*.'''
        xml = self.source.get(prj)
        if not xml:
            # Same convention as binaries_ls: no response, no entries.
            return []
        listing = etree.fromstring(xml)
        return [entry.get('name') for entry in listing.findall('entry')]

    def package_ls(self, prj, pkg):
        '''Return ``(Directory, [LSItem, ...])`` for a package.

        When the API yields no document, a ``Directory`` of Nones and an
        empty list are returned.
        '''
        xml = self.source.get(prj, pkg=pkg)
        if xml is None:
            return (Directory(None, None, None, None), [])
        listing = etree.fromstring(xml)
        directory = Directory(listing.get('name'),
                              listing.get('rev'),
                              listing.get('vrev'),
                              listing.get('srcmd5'))
        lsitems = [LSItem(entry.get('name'),
                          entry.get('md5'),
                          entry.get('size'),
                          entry.get('mtime'))
                   for entry in listing.findall('entry')]
        return (directory, lsitems)

    def get_source_info(self, prj, pkg, rev=None):
        '''Return a ``SourceInfo`` built from the ``view=info`` response.'''
        if rev is None:
            xml = self.source.get(prj, pkg=pkg, view='info')
        else:
            xml = self.source.get(prj, pkg=pkg, view='info', rev=rev)
        tree = etree.fromstring(xml)
        return SourceInfo(tree.get('package'),
                          tree.get('rev'),
                          tree.get('vrev'),
                          tree.get('srcmd5'),
                          tree.get('verifymd5'))

    def binaries_ls(self, prj, pkg, repo, arch):
        '''Return a list of ``Binary`` records (empty if no response).'''
        xml = self.build.get(prj, repo, arch, pkg)
        if not xml:
            return []
        blist = etree.fromstring(xml)
        return [Binary(filename=item.get('filename'),
                       size=item.get('size'),
                       mtime=item.get('mtime'))
                for item in blist.findall('binary')]

    def get_binary_fileinfo(self, prj, pkg, repo, arch, binary):
        '''Return a ``FileInfo`` for *binary* (``view=fileinfo``).'''
        xml = self.build.get(prj, repo, arch, pkg, binary, view='fileinfo')
        return FileInfo(xml)

    def get_project_repos(self, prj):
        '''Return the names of the ``entry`` elements of the build listing.'''
        xml = self.build.get(prj)
        if not xml:
            # Same convention as binaries_ls: no response, no entries.
            return []
        directory = etree.fromstring(xml)
        return [entry.get('name') for entry in directory.findall('entry')]

    def get_package_version(self, prj, pkg, repo, arch, full=True):
        '''Return the version of the package's source RPM.

        The first binary whose filename ends in ``.src.rpm`` is inspected.
        With *full* the result is ``version-release``, otherwise only the
        version.  An empty string is returned when no source RPM or no
        version is found.
        '''
        finfo = None
        for item in self.binaries_ls(prj, pkg, repo, arch):
            if item.filename and item.filename.endswith('.src.rpm'):
                finfo = self.get_binary_fileinfo(prj, pkg, repo, arch,
                                                 item.filename)
                break
        if finfo is None:
            return ''
        # get_binary_fileinfo() already returns a parsed FileInfo; read the
        # elements from its tree instead of parsing the object again.
        version = finfo.finfo.findtext('version')
        release = finfo.finfo.findtext('release')
        if version is None:
            return ''
        if full and release is not None:
            return '%s-%s' % (version, release)
        return version

    def get_build_config(self, prj, repo):
        '''Return the build configuration of *repo* in *prj*.'''
        return self.build.get_buildconfig(prj, repo)

    def get_build_info(self, prj, pkg, repo, arch):
        '''Return the build info document for the given build target.'''
        return self.build.get_buildinfo(prj, pkg, repo, arch)

    def get_nothing_provides(self, prj, pkg, repo, arch):
        '''Return the "nothing provides" entries of an unresolvable build.

        The text of the ``error`` element of the build info is expected to
        look like ``unresolvable: nothing provides X, nothing provides Y``;
        the part following ``nothing provides`` of each entry is returned.
        '''
        xml = self.get_build_info(prj, pkg, repo, arch)
        if not xml:
            return []
        # The message lives in the element text, not in the element itself.
        error_text = etree.fromstring(xml).findtext('error')
        if not error_text or 'unresolvable:' not in error_text:
            return []
        marker = 'nothing provides'
        nothing_provides = []
        for entry in error_text.split('unresolvable:', 1)[1].split(','):
            entry = entry.strip()
            if entry.startswith(marker):
                nothing_provides.append(entry[len(marker):].strip())
        return nothing_provides

    def get_vendor(self, prj, repo=None):
        '''
        Attempt to get the value of the %vendor macro if exists
        Search build configs from all repos. Take first occurrence.
        '''
        if repo is not None:
            repos = [repo]
        else:
            repos = self.get_project_repos(prj)
        vendor = None
        for name in repos:
            config = self.get_build_config(prj, name) or ''
            for line in config.splitlines():
                # Split the stripped line: leading indentation must not end
                # up as part of the value.
                stripped = line.strip()
                if stripped.startswith('%vendor '):
                    vendor = stripped.split(' ', 1)[1].strip()
            # The first repo that defines the macro wins.
            if vendor is not None:
                break
        return vendor

    def locked(self, prj, pkg=None):
        '''Return True when the meta document contains a ``lock`` element.'''
        meta = self.__xml2etree(self.get_meta(prj, pkg))
        return meta.find('lock') is not None

    def lock(self, prj, pkg=None):
        '''Add ``<lock><enable/></lock>`` to the meta and upload it.

        Returns the result of ``put_meta``, or ``'Already Locked'`` when a
        ``lock`` element is already present.  Raises ``Exception`` when the
        meta root is neither ``project`` nor ``package``.
        '''
        meta = self.__xml2etree(self.get_meta(prj, pkg))
        if meta.tag not in ('project', 'package'):
            if pkg:
                raise Exception('Failed to lock package. %s/%s' % (prj, pkg))
            raise Exception('Failed to lock project. %s' % prj)
        # Check the document already fetched instead of requesting the
        # meta a second time through locked().
        if meta.find('lock') is not None:
            return 'Already Locked'
        lock = etree.SubElement(meta, 'lock')
        etree.SubElement(lock, 'enable')
        return self.put_meta(prj, pkg, etree.tostring(meta))

    def unlock(self, prj, pkg=None, comment=None):
        '''Post the ``unlock`` command if the project/package is locked.

        *comment* is sent along with the command and defaults to
        ``"Unlock"``.  Returns None when nothing was locked, otherwise the
        result of the post call.
        '''
        if not self.locked(prj, pkg):
            return None
        comment = comment or "Unlock"
        return self.source.post(prj, pkg, cmd="unlock", comment=comment)