import re import pytest import os from . import test_dir from obsapi import ObsApi from lxml import etree try: from urllib.parse import urlparse from urllib.parse import parse_qs except Exception: from urlparse import urlparse from urlparse import parse_qs apiurl = 'https://api.opensuse.org' api = ObsApi(apiurl=apiurl) prj = 'home:sbahling:obsapi:test' pkg = 'suse-hello-1.0' repo = 'SLE_12_SP3' arch = 'x86_64' prj_meta_re = re.compile(r'/source/.*/_meta') pkg_meta_re = re.compile(r'/source/.*/.*/_meta') @pytest.fixture def spec_file_1(): with open(os.path.join(test_dir, 'spec_file_1.spec'), 'rb') as f: return f.read() @pytest.fixture def spec_file_2(): with open(os.path.join(test_dir, 'spec_file_2.spec'), 'rb') as f: return f.read() @pytest.fixture def spec_file_3(): with open(os.path.join(test_dir, 'spec_file_3.spec'), 'rb') as f: return f.read() @pytest.fixture def project_config(): with open(os.path.join(test_dir, 'project_config.txt')) as f: return f.read() @pytest.fixture def binary_fileinfo(): with open(os.path.join(test_dir, 'binary_fileinfo.xml'), 'rb') as f: return f.read() @pytest.fixture def source_info(): return """ suse-hello.spec """ @pytest.fixture def repo_directory(): return """ """ @pytest.fixture def project_directory(): return """ """ @pytest.fixture def package_directory(): return """ """ @pytest.fixture def binarylist(): return """ """ @pytest.fixture def project_meta(): return """ Test project for obsapi unit tests x86_64 x86_64 """ @pytest.fixture def locked_project_meta(): return """ Test project for obsapi unit tests x86_64 x86_64 """ @pytest.fixture def package_meta(): return """ Example from Kernel Module Packages Manual """ @pytest.fixture def locked_package_meta(): return """ Example from Kernel Module Packages Manual """ def test_get_project_meta(requests_mock): matcher = prj_meta_re text = '' requests_mock.get(matcher, text=text) response = api.get_meta(prj=prj) assert response == text def test_get_package_meta(requests_mock): matcher = prj_meta_re text = '' requests_mock.get(matcher, text=text) response = api.get_meta(prj=prj, pkg=pkg).strip() assert response == text def test_ls_prj(requests_mock): matcher = re.compile('/source/.*') text = '' requests_mock.get(matcher, text=text) assert api.ls(prj=prj) == ['package'] def test_ls_pkg(requests_mock, package_directory): matcher = re.compile('/source/.*/.*') requests_mock.get(matcher, text=package_directory) directory, listing = api.ls(prj=prj, pkg=pkg) dir_re = re.compile('(name=".+?").*(rev=".+?").*(vrev=".+?").*(srcmd5=".+?")') expected_directory = ' '.join(dir_re.findall(package_directory)[0]) directory = 'name="{0.name}" rev="{0.rev}" vrev="{0.vrev}" srcmd5="{0.srcmd5}"'.format(directory) assert directory == expected_directory entry_re = re.compile('(name=".+?").*(md5=".+?").*(size=".+?").*(mtime=".+?")') expected_entries = [' '.join(entry) for entry in entry_re.findall(package_directory)] entries = ['name="{0.name}" md5="{0.md5}" size="{0.size}" mtime="{0.mtime}"'.format(entry) for entry in listing] for item in expected_entries: assert item in entries entries.remove(item) assert entries == [] def binarylist_test(xml, listing): entry_re = re.compile('(filename=".+?").*(size=".+?").*(mtime=".+?")') expected_entries = [' '.join(entry) for entry in entry_re.findall(xml)] entries = ['filename="{0.filename}" size="{0.size}" mtime="{0.mtime}"'.format(entry) for entry in listing] for item in expected_entries: assert item in entries entries.remove(item) assert entries == [] def test_ls_binaries(requests_mock, binarylist): matcher = re.compile('/build/.*/.*/.*/.*') requests_mock.get(matcher, text=binarylist) listing = api.ls(prj=prj, pkg=pkg, repo=repo, arch=arch) binarylist_test(binarylist, listing) def test_binaries_ls(requests_mock, binarylist): matcher = re.compile('/build/.*/.*/.*/.*') requests_mock.get(matcher, text=binarylist) listing = api.binaries_ls(prj=prj, pkg=pkg, repo=repo, arch=arch) binarylist_test(binarylist, listing) def test_binaries_ls_project(requests_mock, binarylist): matcher = re.compile('/build/.*/.*/.*/_repository') requests_mock.get(matcher, text=binarylist) listing = api.binaries_ls(prj=prj, pkg=None, repo=repo, arch=arch) binarylist_test(binarylist, listing) def test_get_project_repos(requests_mock, repo_directory): matcher = re.compile('/build/.*') requests_mock.get(matcher, text=repo_directory) repos = api.get_project_repos(prj) entry_re = re.compile('name="(.+?)"') expected_entries = [entry for entry in entry_re.findall(repo_directory)] for entry in expected_entries: assert entry in repos repos.remove(entry) assert repos == [] @pytest.mark.parametrize('repository', (None, 'SLE_12_SP3')) def test_get_vendor(requests_mock, repo_directory, project_config, repository): matcher = re.compile('/build/.*') requests_mock.get(matcher, text=repo_directory) matcher = re.compile('/build/.*/.*/_buildconfig') requests_mock.get(matcher, text=project_config) vendor_re = re.compile(r'.*%vendor (.+?)\n') expected_vendor = vendor_re.findall(project_config)[-1] if repository is None: vendor = api.get_vendor(prj=prj) else: vendor = api.get_vendor(prj=prj, repo=repository) assert vendor == expected_vendor @pytest.mark.parametrize('rev', (None, '1')) def test_source_info(requests_mock, source_info, rev): if rev is None: matcher = re.compile('/source/.*/.*?view=info') requests_mock.get(matcher, text=source_info) sinfo = api.get_source_info(prj, pkg) else: matcher = re.compile('/source/.*/.*?(&?view=info|&?rev=1){2}') requests_mock.get(matcher, text=source_info) sinfo = api.get_source_info(prj, pkg, rev) expected_sinfo = etree.fromstring(source_info) assert sinfo.package == expected_sinfo.get('package', None) assert sinfo.rev == expected_sinfo.get('rev', None) assert sinfo.vrev == expected_sinfo.get('vrev', None) assert sinfo.srcmd5 == expected_sinfo.get('srcmd5', None) assert sinfo.verifymd5 == expected_sinfo.get('verifymd5', None) def test_get_binary_fileinfo(requests_mock, binary_fileinfo): matcher = re.compile('/build/.*/.*/.*/.*/.*?view=fileinfo') requests_mock.get(matcher, content=binary_fileinfo) binary = 'suse-hello-1.0-1.1.src.rpm' expected_bfinfo = etree.fromstring(binary_fileinfo) bfinfo = api.get_binary_fileinfo(prj, pkg, repo, arch, binary) assert bfinfo.name == expected_bfinfo.find('name').text assert bfinfo.size == expected_bfinfo.find('size').text assert bfinfo.mtime == expected_bfinfo.find('mtime').text assert bfinfo.arch == expected_bfinfo.find('arch').text assert bfinfo.version == expected_bfinfo.find('version').text assert bfinfo.release == expected_bfinfo.find('release').text assert bfinfo.summary == expected_bfinfo.find('summary').text assert bfinfo.description == expected_bfinfo.find('description').text assert bfinfo.provides == [p.text for p in expected_bfinfo.findall('provides')] assert bfinfo.requires == [p.text for p in expected_bfinfo.findall('requires')] def test_get_package_version(requests_mock, binarylist, binary_fileinfo): matcher = re.compile('/build/.*/.*/.*/.*') requests_mock.get(matcher, text=binarylist) matcher = re.compile('/build/.*/.*/.*/.*/.*?view=fileinfo') requests_mock.get(matcher, content=binary_fileinfo) root = etree.fromstring(binary_fileinfo.decode('utf-8')) expected_version = root.find('version').text expected_release = root.find('release').text full_version = api.get_package_version(prj, pkg, repo, arch) assert full_version == '{}-{}'.format(expected_version, expected_release) version = api.get_package_version(prj, pkg, repo, arch, full=False) assert version == expected_version def test_get_project_flags(requests_mock, project_meta): matcher = prj_meta_re requests_mock.get(matcher, text=project_meta) repoflags = api.get_repo_flags(prj) assert repoflags.flag_types == set() def test_get_package_repo_flags(requests_mock, package_meta): matcher = pkg_meta_re requests_mock.get(matcher, text=package_meta) repoflags = api.get_repo_flags(prj, pkg) assert repoflags.flag_types == {'build', 'debuginfo', 'useforbuild'} meta = etree.fromstring(package_meta) for element in meta: for sub in element: if sub.tag in ['enable', 'disable']: flag = element.tag status = sub.tag repository = sub.get('repository', None) arch = sub.get('arch', None) match = False for flag in repoflags.get_flag(flag): if flag.status == status and flag.repository == repository and flag.arch == arch: match = True assert match @pytest.mark.parametrize('target', ('project', 'package')) @pytest.mark.parametrize('comment', (None, 'Comment')) def test_lock(requests_mock, target, comment): if comment is None: message = 'Lock' else: message = comment if target == 'project': matcher = re.compile(r'/source/.*?(&?cmd=lock|&?comment=%s){2}' % message.replace(' ', '\\+')) else: matcher = re.compile(r'/source/.*/.*?(&?cmd=lock|&?comment=%s){2}' % message.replace(' ', '\\+')) requests_mock.post(matcher) if comment is None: if target == 'project': response = api.lock(prj) else: response = api.lock(prj, pkg) else: if target == 'project': response = api.lock(prj, comment=comment) else: response = api.lock(prj, pkg, comment=comment) assert response.status_code == 200 @pytest.mark.parametrize('target', ('project', 'package')) @pytest.mark.parametrize('comment', (None, 'comment')) def test_unlock(requests_mock, target, comment): if comment is None: message = 'Unlock' else: message = comment if target == 'project': matcher = re.compile(r'/source/.*?(&?cmd=unlock|&?comment=%s){2}' % message.replace(' ', '\\+')) else: matcher = re.compile(r'/source/.*/.*?(&?cmd=unlock|&?comment=%s){2}' % message.replace(' ', '\\+')) requests_mock.post(matcher) if comment is None: if target == 'project': response = api.unlock(prj) else: response = api.unlock(prj, pkg) else: if target == 'project': response = api.unlock(prj, comment=comment) else: response = api.unlock(prj, pkg, comment=comment) assert response.status_code == 200 def test_project_locked(requests_mock, project_meta, locked_project_meta): matcher = prj_meta_re requests_mock.get(matcher, text=locked_project_meta) assert api.locked(prj) is True requests_mock.get(matcher, text=project_meta) assert api.locked(prj) is False def test_package_locked(requests_mock, package_meta, locked_package_meta): matcher = re.compile(r'/source/.*/.*/_meta') requests_mock.get(matcher, text=locked_package_meta) assert api.locked(prj, pkg) is True requests_mock.get(matcher, text=package_meta) assert api.locked(prj, pkg) is False @pytest.mark.parametrize('target', ('project', 'package')) def test_get_users(requests_mock, target, project_meta, package_meta): if target == 'project': matcher = prj_meta_re requests_mock.get(matcher, text=project_meta) users = api.get_users(prj) meta = etree.fromstring(project_meta) else: matcher = pkg_meta_re requests_mock.get(matcher, text=package_meta) users = api.get_users(prj, pkg) meta = etree.fromstring(package_meta) expected_users = meta.findall('person') matches = 0 for expected_user in expected_users: for user in users: if user.userid == expected_user.get('userid') and user.role == expected_user.get('role'): matches += 1 assert len(expected_users) == len(users) == matches @pytest.mark.parametrize('target', ('project', 'package')) def test_get_groups(requests_mock, target, project_meta, package_meta): if target == 'project': matcher = prj_meta_re requests_mock.get(matcher, text=project_meta) groups = api.get_groups(prj) meta = etree.fromstring(project_meta) else: matcher = pkg_meta_re requests_mock.get(matcher, text=package_meta) groups = api.get_groups(prj, pkg) meta = etree.fromstring(package_meta) expected_groups = meta.findall('group') matches = 0 for expected_group in expected_groups: for group in groups: if group.groupid == expected_group.get('groupid') and group.role == expected_group.get('role'): matches += 1 assert len(expected_groups) == len(groups) == matches def test_get_spec_files(requests_mock, package_directory, spec_file_1): matcher = re.compile('/source/.*/.*') requests_mock.get(matcher, text=package_directory) matcher = re.compile('/source/.*/.*/.*') requests_mock.get(matcher, content=spec_file_1) for specfile in api.get_spec_files(prj, pkg): assert specfile == spec_file_1 def test_get_project_spec_files(requests_mock, project_directory, package_directory, spec_file_1, spec_file_2, spec_file_3): matcher = re.compile(r'/source/.*') requests_mock.get(matcher, text=project_directory) matcher = re.compile(r'/source/.*/.*') requests_mock.get(matcher, text=package_directory) matcher = re.compile(r'/source/.*/package-1/.*.spec') requests_mock.get(matcher, content=spec_file_1) matcher = re.compile(r'/source/.*/package-2/.*.spec') requests_mock.get(matcher, content=spec_file_2) matcher = re.compile(r'/source/.*/package-3/.*.spec') requests_mock.get(matcher, content=spec_file_3) expected_specfiles = [spec_file_1, spec_file_2, spec_file_3] specfiles = api.get_spec_files(prj) assert list(specfiles) == expected_specfiles def check_url_query_params(url, params): query = parse_qs(urlparse(url).query) params = {k: v for k, v in params.items() if v is not None} assert query.keys() == params.keys() for key, values in query.items(): assert len(values) == 1 assert params.get(key, None) == query.get(key, [])[0] # POST /source/?cmd=set_flag&repository=:opt&arch=:opt&flag=flag&status=status # POST /source//?cmd=set_flag&repository=:opt&arch=:opt&flag=flag&status=status @pytest.mark.parametrize('target', ('project', 'package')) @pytest.mark.parametrize('flag', ('build', 'publish', 'useforbuild', 'debuginfo')) @pytest.mark.parametrize('status', ('enable', 'disable')) @pytest.mark.parametrize('repository', (None, 'SLE_12_SP5'), ids=('no-repo', 'repo')) @pytest.mark.parametrize('arch', (None, 'x86_64'), ids=('no-arch', 'arch')) def test_set_flag(requests_mock, target, flag, status, repository, arch): if target == 'project': matcher = re.compile(r'/source/[^/]+\?') else: matcher = re.compile(r'/source/.+/[^/]*\?') requests_mock.post(matcher) flag = dict(flag=flag, status=status, repository=repository, arch=arch) if target == 'project': response = api.set_flag(flag, prj) else: response = api.set_flag(flag, prj, pkg) assert response.status_code == 200 url = requests_mock.request_history[0].url flag['cmd'] = 'set_flag' check_url_query_params(url, flag) # POST /source/?cmd=remove_flag&repository=:opt&arch=:opt&flag=flag # POST /source//?cmd=remove_flag&repository=:opt&arch=:opt&flag=flag @pytest.mark.parametrize('target', ('project', 'package')) @pytest.mark.parametrize('flag', ('build', 'publish', 'useforbuild', 'debuginfo')) @pytest.mark.parametrize('status', ('enable', 'disable')) @pytest.mark.parametrize('repository', (None, 'SLE_12_SP5'), ids=('no-repo', 'repo')) @pytest.mark.parametrize('arch', (None, 'x86_64'), ids=('no-arch', 'arch')) def test_remove_flag(requests_mock, target, flag, status, repository, arch): if target == 'project': matcher = re.compile(r'/source/[^/]+\?') else: matcher = re.compile(r'/source/.+/[^/]*\?') requests_mock.post(matcher) flag = dict(flag=flag, status=status, repository=repository, arch=arch) if target == 'project': response = api.remove_flag(flag, prj) else: response = api.remove_flag(flag, prj, pkg) assert response.status_code == 200 url = requests_mock.request_history[0].url flag['cmd'] = 'remove_flag' check_url_query_params(url, flag)