import re
import pytest
import os
from . import test_dir
from obsapi import ObsApi
from lxml import etree
try:
from urllib.parse import urlparse
from urllib.parse import parse_qs
except Exception:
from urlparse import urlparse
from urlparse import parse_qs
apiurl = 'https://api.opensuse.org'
api = ObsApi(apiurl=apiurl)
prj = 'home:sbahling:obsapi:test'
pkg = 'suse-hello-1.0'
repo = 'SLE_12_SP3'
arch = 'x86_64'
prj_meta_re = re.compile(r'/source/.*/_meta')
pkg_meta_re = re.compile(r'/source/.*/.*/_meta')
@pytest.fixture
def spec_file_1():
with open(os.path.join(test_dir, 'spec_file_1.spec'), 'rb') as f:
return f.read()
@pytest.fixture
def spec_file_2():
with open(os.path.join(test_dir, 'spec_file_2.spec'), 'rb') as f:
return f.read()
@pytest.fixture
def spec_file_3():
with open(os.path.join(test_dir, 'spec_file_3.spec'), 'rb') as f:
return f.read()
@pytest.fixture
def project_config():
with open(os.path.join(test_dir, 'project_config.txt')) as f:
return f.read()
@pytest.fixture
def binary_fileinfo():
with open(os.path.join(test_dir, 'binary_fileinfo.xml'), 'rb') as f:
return f.read()
@pytest.fixture
def source_info():
return """
suse-hello.spec
"""
@pytest.fixture
def repo_directory():
return """
"""
@pytest.fixture
def project_directory():
return """
"""
@pytest.fixture
def package_directory():
return """
"""
@pytest.fixture
def binarylist():
return """
"""
@pytest.fixture
def project_meta():
return """
Test project for obsapi unit tests
x86_64
x86_64
"""
@pytest.fixture
def locked_project_meta():
return """
Test project for obsapi unit tests
x86_64
x86_64
"""
@pytest.fixture
def package_meta():
return """
Example from Kernel Module Packages Manual
"""
@pytest.fixture
def locked_package_meta():
return """
Example from Kernel Module Packages Manual
"""
def test_get_project_meta(requests_mock):
matcher = prj_meta_re
text = ''
requests_mock.get(matcher, text=text)
response = api.get_meta(prj=prj)
assert response == text
def test_get_package_meta(requests_mock):
matcher = prj_meta_re
text = ''
requests_mock.get(matcher, text=text)
response = api.get_meta(prj=prj, pkg=pkg).strip()
assert response == text
def test_ls_prj(requests_mock):
matcher = re.compile('/source/.*')
text = ''
requests_mock.get(matcher, text=text)
assert api.ls(prj=prj) == ['package']
def test_ls_pkg(requests_mock, package_directory):
matcher = re.compile('/source/.*/.*')
requests_mock.get(matcher, text=package_directory)
directory, listing = api.ls(prj=prj, pkg=pkg)
dir_re = re.compile('(name=".+?").*(rev=".+?").*(vrev=".+?").*(srcmd5=".+?")')
expected_directory = ' '.join(dir_re.findall(package_directory)[0])
directory = 'name="{0.name}" rev="{0.rev}" vrev="{0.vrev}" srcmd5="{0.srcmd5}"'.format(directory)
assert directory == expected_directory
entry_re = re.compile('(name=".+?").*(md5=".+?").*(size=".+?").*(mtime=".+?")')
expected_entries = [' '.join(entry) for entry in entry_re.findall(package_directory)]
entries = ['name="{0.name}" md5="{0.md5}" size="{0.size}" mtime="{0.mtime}"'.format(entry) for entry in listing]
for item in expected_entries:
assert item in entries
entries.remove(item)
assert entries == []
def binarylist_test(xml, listing):
entry_re = re.compile('(filename=".+?").*(size=".+?").*(mtime=".+?")')
expected_entries = [' '.join(entry) for entry in entry_re.findall(xml)]
entries = ['filename="{0.filename}" size="{0.size}" mtime="{0.mtime}"'.format(entry) for entry in listing]
for item in expected_entries:
assert item in entries
entries.remove(item)
assert entries == []
def test_ls_binaries(requests_mock, binarylist):
matcher = re.compile('/build/.*/.*/.*/.*')
requests_mock.get(matcher, text=binarylist)
listing = api.ls(prj=prj, pkg=pkg, repo=repo, arch=arch)
binarylist_test(binarylist, listing)
def test_binaries_ls(requests_mock, binarylist):
matcher = re.compile('/build/.*/.*/.*/.*')
requests_mock.get(matcher, text=binarylist)
listing = api.binaries_ls(prj=prj, pkg=pkg, repo=repo, arch=arch)
binarylist_test(binarylist, listing)
def test_binaries_ls_project(requests_mock, binarylist):
matcher = re.compile('/build/.*/.*/.*/_repository')
requests_mock.get(matcher, text=binarylist)
listing = api.binaries_ls(prj=prj, pkg=None, repo=repo, arch=arch)
binarylist_test(binarylist, listing)
def test_get_project_repos(requests_mock, repo_directory):
matcher = re.compile('/build/.*')
requests_mock.get(matcher, text=repo_directory)
repos = api.get_project_repos(prj)
entry_re = re.compile('name="(.+?)"')
expected_entries = [entry for entry in entry_re.findall(repo_directory)]
for entry in expected_entries:
assert entry in repos
repos.remove(entry)
assert repos == []
@pytest.mark.parametrize('repository', (None, 'SLE_12_SP3'))
def test_get_vendor(requests_mock, repo_directory, project_config, repository):
matcher = re.compile('/build/.*')
requests_mock.get(matcher, text=repo_directory)
matcher = re.compile('/build/.*/.*/_buildconfig')
requests_mock.get(matcher, text=project_config)
vendor_re = re.compile(r'.*%vendor (.+?)\n')
expected_vendor = vendor_re.findall(project_config)[-1]
if repository is None:
vendor = api.get_vendor(prj=prj)
else:
vendor = api.get_vendor(prj=prj, repo=repository)
assert vendor == expected_vendor
@pytest.mark.parametrize('rev', (None, '1'))
def test_source_info(requests_mock, source_info, rev):
if rev is None:
matcher = re.compile('/source/.*/.*?view=info')
requests_mock.get(matcher, text=source_info)
sinfo = api.get_source_info(prj, pkg)
else:
matcher = re.compile('/source/.*/.*?(&?view=info|&?rev=1){2}')
requests_mock.get(matcher, text=source_info)
sinfo = api.get_source_info(prj, pkg, rev)
expected_sinfo = etree.fromstring(source_info)
assert sinfo.package == expected_sinfo.get('package', None)
assert sinfo.rev == expected_sinfo.get('rev', None)
assert sinfo.vrev == expected_sinfo.get('vrev', None)
assert sinfo.srcmd5 == expected_sinfo.get('srcmd5', None)
assert sinfo.verifymd5 == expected_sinfo.get('verifymd5', None)
def test_get_binary_fileinfo(requests_mock, binary_fileinfo):
matcher = re.compile('/build/.*/.*/.*/.*/.*?view=fileinfo')
requests_mock.get(matcher, content=binary_fileinfo)
binary = 'suse-hello-1.0-1.1.src.rpm'
expected_bfinfo = etree.fromstring(binary_fileinfo)
bfinfo = api.get_binary_fileinfo(prj, pkg, repo, arch, binary)
assert bfinfo.name == expected_bfinfo.find('name').text
assert bfinfo.size == expected_bfinfo.find('size').text
assert bfinfo.mtime == expected_bfinfo.find('mtime').text
assert bfinfo.arch == expected_bfinfo.find('arch').text
assert bfinfo.version == expected_bfinfo.find('version').text
assert bfinfo.release == expected_bfinfo.find('release').text
assert bfinfo.summary == expected_bfinfo.find('summary').text
assert bfinfo.description == expected_bfinfo.find('description').text
assert bfinfo.provides == [p.text for p in expected_bfinfo.findall('provides')]
assert bfinfo.requires == [p.text for p in expected_bfinfo.findall('requires')]
def test_get_package_version(requests_mock, binarylist, binary_fileinfo):
matcher = re.compile('/build/.*/.*/.*/.*')
requests_mock.get(matcher, text=binarylist)
matcher = re.compile('/build/.*/.*/.*/.*/.*?view=fileinfo')
requests_mock.get(matcher, content=binary_fileinfo)
root = etree.fromstring(binary_fileinfo.decode('utf-8'))
expected_version = root.find('version').text
expected_release = root.find('release').text
full_version = api.get_package_version(prj, pkg, repo, arch)
assert full_version == '{}-{}'.format(expected_version, expected_release)
version = api.get_package_version(prj, pkg, repo, arch, full=False)
assert version == expected_version
def test_get_project_flags(requests_mock, project_meta):
matcher = prj_meta_re
requests_mock.get(matcher, text=project_meta)
repoflags = api.get_repo_flags(prj)
assert repoflags.flag_types == set()
def test_get_package_repo_flags(requests_mock, package_meta):
matcher = pkg_meta_re
requests_mock.get(matcher, text=package_meta)
repoflags = api.get_repo_flags(prj, pkg)
assert repoflags.flag_types == {'build', 'debuginfo', 'useforbuild'}
meta = etree.fromstring(package_meta)
for element in meta:
for sub in element:
if sub.tag in ['enable', 'disable']:
flag = element.tag
status = sub.tag
repository = sub.get('repository', None)
arch = sub.get('arch', None)
match = False
for flag in repoflags.get_flag(flag):
if flag.status == status and flag.repository == repository and flag.arch == arch:
match = True
assert match
@pytest.mark.parametrize('target', ('project', 'package'))
@pytest.mark.parametrize('comment', (None, 'Comment'))
def test_lock(requests_mock, target, comment):
if comment is None:
message = 'Lock'
else:
message = comment
if target == 'project':
matcher = re.compile(r'/source/.*?(&?cmd=lock|&?comment=%s){2}' % message.replace(' ', '\\+'))
else:
matcher = re.compile(r'/source/.*/.*?(&?cmd=lock|&?comment=%s){2}' % message.replace(' ', '\\+'))
requests_mock.post(matcher)
if comment is None:
if target == 'project':
response = api.lock(prj)
else:
response = api.lock(prj, pkg)
else:
if target == 'project':
response = api.lock(prj, comment=comment)
else:
response = api.lock(prj, pkg, comment=comment)
assert response.status_code == 200
@pytest.mark.parametrize('target', ('project', 'package'))
@pytest.mark.parametrize('comment', (None, 'comment'))
def test_unlock(requests_mock, target, comment):
if comment is None:
message = 'Unlock'
else:
message = comment
if target == 'project':
matcher = re.compile(r'/source/.*?(&?cmd=unlock|&?comment=%s){2}' % message.replace(' ', '\\+'))
else:
matcher = re.compile(r'/source/.*/.*?(&?cmd=unlock|&?comment=%s){2}' % message.replace(' ', '\\+'))
requests_mock.post(matcher)
if comment is None:
if target == 'project':
response = api.unlock(prj)
else:
response = api.unlock(prj, pkg)
else:
if target == 'project':
response = api.unlock(prj, comment=comment)
else:
response = api.unlock(prj, pkg, comment=comment)
assert response.status_code == 200
def test_project_locked(requests_mock, project_meta, locked_project_meta):
matcher = prj_meta_re
requests_mock.get(matcher, text=locked_project_meta)
assert api.locked(prj) is True
requests_mock.get(matcher, text=project_meta)
assert api.locked(prj) is False
def test_package_locked(requests_mock, package_meta, locked_package_meta):
matcher = re.compile(r'/source/.*/.*/_meta')
requests_mock.get(matcher, text=locked_package_meta)
assert api.locked(prj, pkg) is True
requests_mock.get(matcher, text=package_meta)
assert api.locked(prj, pkg) is False
@pytest.mark.parametrize('target', ('project', 'package'))
def test_get_users(requests_mock, target, project_meta, package_meta):
if target == 'project':
matcher = prj_meta_re
requests_mock.get(matcher, text=project_meta)
users = api.get_users(prj)
meta = etree.fromstring(project_meta)
else:
matcher = pkg_meta_re
requests_mock.get(matcher, text=package_meta)
users = api.get_users(prj, pkg)
meta = etree.fromstring(package_meta)
expected_users = meta.findall('person')
matches = 0
for expected_user in expected_users:
for user in users:
if user.userid == expected_user.get('userid') and user.role == expected_user.get('role'):
matches += 1
assert len(expected_users) == len(users) == matches
@pytest.mark.parametrize('target', ('project', 'package'))
def test_get_groups(requests_mock, target, project_meta, package_meta):
if target == 'project':
matcher = prj_meta_re
requests_mock.get(matcher, text=project_meta)
groups = api.get_groups(prj)
meta = etree.fromstring(project_meta)
else:
matcher = pkg_meta_re
requests_mock.get(matcher, text=package_meta)
groups = api.get_groups(prj, pkg)
meta = etree.fromstring(package_meta)
expected_groups = meta.findall('group')
matches = 0
for expected_group in expected_groups:
for group in groups:
if group.groupid == expected_group.get('groupid') and group.role == expected_group.get('role'):
matches += 1
assert len(expected_groups) == len(groups) == matches
def test_get_spec_files(requests_mock, package_directory, spec_file_1):
matcher = re.compile('/source/.*/.*')
requests_mock.get(matcher, text=package_directory)
matcher = re.compile('/source/.*/.*/.*')
requests_mock.get(matcher, content=spec_file_1)
for specfile in api.get_spec_files(prj, pkg):
assert specfile == spec_file_1
def test_get_project_spec_files(requests_mock, project_directory, package_directory, spec_file_1, spec_file_2, spec_file_3):
matcher = re.compile(r'/source/.*')
requests_mock.get(matcher, text=project_directory)
matcher = re.compile(r'/source/.*/.*')
requests_mock.get(matcher, text=package_directory)
matcher = re.compile(r'/source/.*/package-1/.*.spec')
requests_mock.get(matcher, content=spec_file_1)
matcher = re.compile(r'/source/.*/package-2/.*.spec')
requests_mock.get(matcher, content=spec_file_2)
matcher = re.compile(r'/source/.*/package-3/.*.spec')
requests_mock.get(matcher, content=spec_file_3)
expected_specfiles = [spec_file_1, spec_file_2, spec_file_3]
specfiles = api.get_spec_files(prj)
assert list(specfiles) == expected_specfiles
def check_url_query_params(url, params):
query = parse_qs(urlparse(url).query)
params = {k: v for k, v in params.items() if v is not None}
assert query.keys() == params.keys()
for key, values in query.items():
assert len(values) == 1
assert params.get(key, None) == query.get(key, [])[0]
# POST /source/?cmd=set_flag&repository=:opt&arch=:opt&flag=flag&status=status
# POST /source//?cmd=set_flag&repository=:opt&arch=:opt&flag=flag&status=status
@pytest.mark.parametrize('target', ('project', 'package'))
@pytest.mark.parametrize('flag', ('build', 'publish', 'useforbuild', 'debuginfo'))
@pytest.mark.parametrize('status', ('enable', 'disable'))
@pytest.mark.parametrize('repository', (None, 'SLE_12_SP5'), ids=('no-repo', 'repo'))
@pytest.mark.parametrize('arch', (None, 'x86_64'), ids=('no-arch', 'arch'))
def test_set_flag(requests_mock, target, flag, status, repository, arch):
if target == 'project':
matcher = re.compile(r'/source/[^/]+\?')
else:
matcher = re.compile(r'/source/.+/[^/]*\?')
requests_mock.post(matcher)
flag = dict(flag=flag,
status=status,
repository=repository,
arch=arch)
if target == 'project':
response = api.set_flag(flag, prj)
else:
response = api.set_flag(flag, prj, pkg)
assert response.status_code == 200
url = requests_mock.request_history[0].url
flag['cmd'] = 'set_flag'
check_url_query_params(url, flag)
# POST /source/?cmd=remove_flag&repository=:opt&arch=:opt&flag=flag
# POST /source//?cmd=remove_flag&repository=:opt&arch=:opt&flag=flag
@pytest.mark.parametrize('target', ('project', 'package'))
@pytest.mark.parametrize('flag', ('build', 'publish', 'useforbuild', 'debuginfo'))
@pytest.mark.parametrize('status', ('enable', 'disable'))
@pytest.mark.parametrize('repository', (None, 'SLE_12_SP5'), ids=('no-repo', 'repo'))
@pytest.mark.parametrize('arch', (None, 'x86_64'), ids=('no-arch', 'arch'))
def test_remove_flag(requests_mock, target, flag, status, repository, arch):
if target == 'project':
matcher = re.compile(r'/source/[^/]+\?')
else:
matcher = re.compile(r'/source/.+/[^/]*\?')
requests_mock.post(matcher)
flag = dict(flag=flag,
status=status,
repository=repository,
arch=arch)
if target == 'project':
response = api.remove_flag(flag, prj)
else:
response = api.remove_flag(flag, prj, pkg)
assert response.status_code == 200
url = requests_mock.request_history[0].url
flag['cmd'] = 'remove_flag'
check_url_query_params(url, flag)