"""Helpers for querying an OBS API server (default: DEFAULTAPIURL) over HTTP.

The module exposes :class:`ObsApi`, a thin wrapper around ``requests`` that
parses the XML answers with ``lxml``, and :class:`FileInfo`, a parsed view of
a binary's ``fileinfo`` document.
"""
import requests
from requests.auth import HTTPBasicAuth
from lxml import etree
from collections import namedtuple

# osc is optional: when it is missing or its configuration cannot be loaded
# the client simply works without credentials.
try:
    import osc.conf as osc_conf
    osc_conf.get_config()
except Exception:
    osc_conf = None

# One entry of a package source listing.
LSItem = namedtuple('LSItem', 'name md5 size mtime')
# Attributes of the <directory> root element of a package source listing.
Directory = namedtuple('Directory', 'name rev vrev srcmd5')
# One <binary> entry of a build result listing.
Binary = namedtuple('Binary', 'filename size mtime')

DEFAULTAPIURL = 'https://api.opensuse.org'


class FileInfo(object):
    """Parsed representation of a ``fileinfo`` XML document.

    Every name in ``attributes`` becomes an instance attribute.  Scalar
    attributes hold the element text ('' when the element is missing or
    empty); ``provides`` and ``requires`` hold a list with the text of every
    matching element.
    """

    attributes = ['name', 'version', 'release', 'arch', 'summary',
                  'description', 'source', 'size', 'mtime',
                  'provides', 'requires',
                  ]

    # Shown for binary packages (files that name a source RPM).
    pkg_str_template = ("{filename}\n"
                        "Name : {name}\n"
                        "Version : {version}\n"
                        "Release : {release}\n"
                        "Architecture: {arch}\n"
                        "Size : {size}\n"
                        "Source RPM : {source}\n"
                        "Build Date : {mtime}\n"
                        "Summary : {summary}\n"
                        "Description :\n"
                        "{description}"
                        )

    # Shown for source packages (arch == 'src').
    src_str_template = ("{filename}\n"
                        "Name : {name}\n"
                        "Version : {version}\n"
                        "Release : {release}\n"
                        "Architecture: {arch}\n"
                        "Size : {size}\n"
                        "Build Date : {mtime}\n"
                        "Summary : {summary}\n"
                        "Description :\n"
                        "{description}"
                        )

    # Shown for anything that is neither of the above.
    file_str_template = ("{filename}\n"
                         "size : {size}\n"
                         "mtime : {mtime}"
                         )

    def __init__(self, xml):
        """Parse *xml* (str or bytes) and populate the attributes.

        Raises whatever ``etree.fromstring`` raises for malformed input.
        """
        finfo = etree.fromstring(xml)
        self.filename = finfo.get('filename')
        for attr in self.attributes:
            if attr in ('provides', 'requires'):
                # findall() returns [] when nothing matches, so no
                # exception handling is needed here.
                self.set_list_attr(attr, finfo.findall(attr))
                continue
            node = finfo.find(attr)
            text = node.text if node is not None else None
            # Missing element and empty element both map to ''.
            self.set_attr(attr, text or '')

    def __str__(self):
        """Return a multi-line summary; the layout depends on the file kind."""
        opts = dict(filename=self.filename,
                    name=self.name,
                    version=self.version,
                    release=self.release,
                    arch=self.arch,
                    size=self.size,
                    source=self.source,
                    mtime=self.mtime,
                    summary=self.summary,
                    description=self.description,
                    )
        if self.arch == 'src':
            template = self.src_str_template
        elif self.source:
            template = self.pkg_str_template
        else:
            template = self.file_str_template
        return template.format(**opts)

    def set_list_attr(self, attr, elements):
        """Store the text of every element in *elements* as a list on *attr*."""
        items = [element.text for element in elements]
        # The target object must be passed explicitly; without ``self`` the
        # call raises TypeError and the attribute is never set.
        setattr(self, attr, items)

    def set_attr(self, attr, value):
        """Store *value* on the instance under the name *attr*."""
        setattr(self, attr, value)

    @property
    def is_pkg(self):
        """True when the file reports an architecture."""
        return bool(self.arch)

    @property
    def is_src(self):
        """True when the file does not name a source RPM.

        NOTE(review): this is also True for files that report no
        architecture at all, whereas ``__str__`` identifies source packages
        by ``arch == 'src'``.  Confirm which definition callers expect.
        """
        return not self.source

    @property
    def is_debug_info(self):
        """True when the name ends in ``-debuginfo`` or ``-debugsource``."""
        return self.name.endswith(('-debuginfo', '-debugsource'))


class ObsApi(object):
    """Read-only client for a subset of the API at ``apiurl``."""

    def __init__(self, apiurl=None):
        """Create a client for *apiurl* (``DEFAULTAPIURL`` when omitted)."""
        self.apiurl = apiurl or DEFAULTAPIURL
        self.__get_auth()

    def __get_auth(self):
        """Set ``self.auth`` from the osc configuration, or to None.

        Credentials are used only when osc is available, knows this API URL
        and provides both a user name and a password.
        """
        conf = {}
        if osc_conf:
            try:
                conf = osc_conf.get_apiurl_api_host_options(self.apiurl)
            except Exception:
                # Unknown API URL or unusable configuration: stay anonymous.
                conf = {}
        user = conf.get('user')
        password = conf.get('pass')
        if user and password:
            self.auth = HTTPBasicAuth(user, password)
        else:
            self.auth = None

    def __api_get(self, api, payload=None):
        """GET the API path *api* and return the ``requests`` response.

        *payload* is sent as the query string.  The HTTP status is not
        checked; callers receive error responses unchanged.
        """
        # Callers pass paths with a leading '/'; normalise both sides so the
        # URL never contains an accidental '//'.
        url = '{0}/{1}'.format(self.apiurl.rstrip('/'), api.lstrip('/'))
        return requests.get(url, auth=self.auth, params=payload)

    def _api_get_xml(self, api, payload=None):
        """GET *api* and return the parsed root element of the answer."""
        r = self.__api_get(api, payload)
        # Parse the raw bytes: lxml rejects unicode strings that carry an
        # XML encoding declaration.
        return etree.fromstring(r.content)

    def get_package_meta(self, prj, pkg):
        """Return the body of the package's ``_meta`` document as text."""
        api = '/source/{}/{}/_meta'.format(prj, pkg)
        return self.__api_get(api).text

    def ls(self, prj=None, pkg=None):
        """List a package (*prj* and *pkg*) or a project (*prj* only).

        Returns None when no project is given.
        """
        if prj and pkg:
            return self.package_ls(prj, pkg)
        if prj:
            return self.project_ls(prj)
        return None

    def project_ls(self, prj):
        """Return the ``name`` of every entry listed under project *prj*."""
        d = self._api_get_xml('/source/{}'.format(prj))
        return [e.get('name') for e in d.findall('entry')]

    def package_ls(self, prj, pkg):
        """Return ``(Directory, [LSItem, ...])`` for the package sources."""
        d = self._api_get_xml('/source/{}/{}'.format(prj, pkg))
        directory = Directory(d.get('name'),
                              d.get('rev'),
                              d.get('vrev'),
                              d.get('srcmd5'),
                              )
        lsitems = [LSItem(item.get('name'),
                          item.get('md5'),
                          item.get('size'),
                          item.get('mtime'),
                          )
                   for item in d.findall('entry')]
        return (directory, lsitems)

    def get_binaries(self, prj, pkg, repo, arch):
        """Return a ``Binary`` for every ``<binary>`` of the build result."""
        api = '/build/{}/{}/{}/{}'.format(prj, repo, arch, pkg)
        blist = self._api_get_xml(api)
        return [Binary(filename=i.get('filename'),
                       size=i.get('size'),
                       mtime=i.get('mtime'))
                for i in blist.findall('binary')]

    def get_binary_fileinfo(self, prj, pkg, repo, arch, binary):
        """Return the ``FileInfo`` of one built *binary*."""
        payload = dict(view='fileinfo')
        api = '/build/{}/{}/{}/{}/{}'.format(prj, repo, arch, pkg, binary)
        r = self.__api_get(api, payload)
        # Bytes, for the same reason as in _api_get_xml().
        return FileInfo(r.content)

    def get_project_repos(self, prj):
        """Return the ``name`` of every entry listed under ``/build/<prj>``."""
        directory = self._api_get_xml('/build/{}'.format(prj))
        return [entry.get('name') for entry in directory.findall('entry')]

    def get_package_version(self, prj, pkg, repo, arch, full=True):
        """Return the version of the package's first ``.src.rpm`` binary.

        With *full* the result is ``version-release``, otherwise only the
        version.  Returns '' when the build result has no ``.src.rpm``.
        """
        for item in self.get_binaries(prj, pkg, repo, arch):
            if item.filename.endswith('.src.rpm'):
                # get_binary_fileinfo() already returns a parsed FileInfo,
                # so its attributes are read directly instead of re-parsing.
                finfo = self.get_binary_fileinfo(prj, pkg, repo, arch,
                                                 item.filename)
                break
        else:
            return ''
        if full:
            return '%s-%s' % (finfo.version, finfo.release)
        return finfo.version

    def get_build_config(self, prj, repo):
        """Return the ``_buildconfig`` of *prj*/*repo* as text."""
        api = '/build/{}/{}/_buildconfig'.format(prj, repo)
        return self.__api_get(api).text

    def get_vendor(self, prj, repo=None):
        """Return the value of the last ``%vendor`` line of the build config.

        When *repo* is omitted the first repository of the project is used.
        Returns None when the project has no repositories or the build
        config has no ``%vendor`` line.
        """
        if not repo:
            repos = self.get_project_repos(prj)
            if not repos:
                return None
            repo = repos[0]
        vendor = None
        for line in self.get_build_config(prj, repo).splitlines():
            stripped = line.strip()
            if stripped.startswith('%vendor '):
                # Split the stripped line so leading indentation cannot end
                # up in the result; no break, so the last match wins.
                vendor = stripped.split(None, 1)[1]
        return vendor