Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(uploads): add all copyrights related endpoints, refactor test files #111

Merged
merged 1 commit into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 74 additions & 45 deletions fossology/obj.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,15 @@ class Findings(object):
:param kwargs: handle any other finding information provided by the fossology instance
:type scanner: list
:type conclusion: list
:type copyright: list
:type kwargs: key word argument
"""

def __init__(
self,
scanner,
conclusion,
copyright=None,
scanner: list,
conclusion: list = None,
copyright: list = None,
**kwargs,
):
self.scanner = scanner
Expand Down Expand Up @@ -454,12 +455,12 @@ class PermGroups(object):
:param perm: the permission
:param group_pk: the id of the group
:param group_name: the name of the group
:type perm: string
:type perm: str
:type group_pk: str
:type group_name: str
"""

def __init__(self, perm, group_pk, group_name):
def __init__(self, perm: str, group_pk: str, group_name: str):
self.perm = Permission(perm)
self.group_pk = group_pk
self.group_name = group_name
Expand All @@ -480,12 +481,12 @@ class UploadPermGroups(object):
:param publicPerm: the public permission of the group
:param permGroups: array of permGroup objects for the upload
:param kwargs: handle any other folder information provided by the fossology instance
:type publicPerm: Permission
:type publicPerm: str
:type permGroups: array
:type kwargs: key word argument
"""

def __init__(self, publicPerm, permGroups, **kwargs):
def __init__(self, publicPerm: str, permGroups: list, **kwargs):
self.publicPerm = Permission(publicPerm)
self.permGroups = list()
for perm in permGroups:
Expand Down Expand Up @@ -607,44 +608,6 @@ def from_json(cls, json_dict):
return cls(**json_dict)


class Licenses(object):

"""FOSSology file license findings.

Represents a FOSSology licenses response.

:param filePath: the path of the file in the specified upload
:param findings: the license findings in that file
:param kwargs: handle any other license information provided by the fossology instance
:type filePath: string
:type findings: Findings
:type kwargs: key word argument
"""

def __init__(
self,
filePath,
findings=None,
**kwargs,
):
self.filepath = filePath
if findings:
self.findings = Findings.from_json(findings)
else:
self.findings = findings
self.additional_info = kwargs

def __str__(self):
if self.findings.conclusion:
return f"File {self.filepath} has {len(self.findings.conclusion)} concluded licenses"
else:
return f"File {self.filepath} doesn't have any concluded license yet"

@classmethod
def from_json(cls, json_dict):
return cls(**json_dict)


class Hash(object):

"""FOSSology hash.
Expand Down Expand Up @@ -790,6 +753,72 @@ def from_json(cls, json_dict):
return cls(**json_dict)


class UploadCopyrights(object):

"""Copyright findings in a FOSSology upload

Represents copyright matches of a FOSSology upload.

:param copyright: the copyright
:param filePath: relative file path
:param kwargs: handle any other information provided by the FOSSology instance
:type copyright: str
:type filePath: list
:type kwargs: key word argument
"""

def __init__(
self,
copyright: str,
filePath: list[str],
**kwargs,
):
self.copyright = copyright
self.filepath = list()
for path in filePath:
self.filepath.append(path)
self.additional_info = kwargs

def __str__(self):
return f"Copyright {self.copyright} was found in {len(self.filepath)} files."

@classmethod
def from_json(cls, json_dict):
return cls(**json_dict)


class UploadLicenses(object):

"""FOSSology upload licenses.

Represents licenses and copyright matches of a FOSSology upload.

:param filePath: relative file path
:param findings: the licenses and copyrights findings
:param kwargs: handle any other information provided by the fossology instance
:type filePath: str
:type findings: Findings
:type kwargs: key word argument
"""

def __init__(
self,
filePath: str,
findings: dict,
**kwargs,
):
self.filepath = filePath
self.findings = Findings.from_json(findings)
self.additional_info = kwargs

def __str__(self):
return f"File {self.filepath} has {len(self.findings.conclusion)} license and {len(self.findings.copyright)}matches"

@classmethod
def from_json(cls, json_dict):
return cls(**json_dict)


class Summary(object):

"""FOSSology upload summary.
Expand Down
89 changes: 66 additions & 23 deletions fossology/uploads.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import re
import time
from typing import Tuple
from typing import List, Optional, Tuple

import requests
from tenacity import TryAgain, retry, retry_if_exception_type, stop_after_attempt
Expand All @@ -14,10 +14,11 @@
ClearingStatus,
Folder,
Group,
Licenses,
Permission,
Summary,
Upload,
UploadCopyrights,
UploadLicenses,
UploadPermGroups,
User,
get_options,
Expand Down Expand Up @@ -358,38 +359,46 @@ def upload_summary(self, upload: Upload, group=None):

@retry(retry=retry_if_exception_type(TryAgain), stop=stop_after_attempt(3))
def upload_licenses(
self, upload: Upload, group: str = None, agent=None, containers=False
):
self,
upload: Upload,
group: str = None,
agent: List[str] = None,
containers: bool = False,
license: bool = True,
copyright: bool = False,
) -> Optional[List[UploadLicenses]]:
"""Get clearing information about an upload

API Endpoint: GET /uploads/{id}/licenses

The response does not generate Python objects yet, the plain JSON data is simply returned.

:param upload: the upload to gather data from
:param group: the group name to chose while accessing the upload (default: None)
:param agent: the license agents to use (e.g. "nomos,monk,ninka,ojo,reportImport", default: "nomos")
:param containers: wether to show containers or not (default: False)
:param group: the group name to chose while accessing the upload (default: None)
:param license: wether to expose license matches (default: True)
:param copyright: wether to expose copyright matches (default: False)
:type upload: Upload
:type group: string
:type agent: string
:type containers: boolean
:type group: string
:return: the list of licenses findings for the specified agent
:rtype: list of Licenses
:type license: boolean
:type copyright: boolean
:return: the list of UploadLicenses for the specified agent
:rtype: list of UploadLicenses
:raises FossologyApiError: if the REST call failed
:raises AuthorizationError: if the user can't access the group
"""
headers = {}
params = {}
headers = {}
if group:
headers["groupName"] = group
params = {
"containers": containers,
"license": license,
"copyright": copyright,
}
if agent:
params["agent"] = agent
else:
params["agent"] = agent = "nomos"
if containers:
params["containers"] = "true"

headers = {}
if group:
headers["groupName"] = group

Expand All @@ -401,7 +410,7 @@ def upload_licenses(
all_licenses = []
scanned_files = response.json()
for file_with_findings in scanned_files:
file_licenses = Licenses.from_json(file_with_findings)
file_licenses = UploadLicenses.from_json(file_with_findings)
all_licenses.append(file_licenses)
return all_licenses

Expand All @@ -410,18 +419,52 @@ def upload_licenses(
raise AuthorizationError(description, response)

elif response.status_code == 412:
description = f"Unable to get licenses from {agent} for {upload.uploadname} (id={upload.id})"
description = f"The agent {agent} has not been scheduled for upload {upload.uploadname} (id={upload.id})"
raise FossologyApiError(description, response)

elif response.status_code == 503:
logger.debug(
f"Unpack agent for {upload.uploadname} (id={upload.id}) didn't start yet"
)
logger.debug("The ununpack agent or queried agents have not started yet.")
time.sleep(3)
raise TryAgain

else:
description = f"API error while returning license findings for upload {upload.uploadname} (id={upload.id})"
raise FossologyApiError(description, response)

@retry(retry=retry_if_exception_type(TryAgain), stop=stop_after_attempt(3))
def upload_copyrights(
self,
upload: Upload,
):
"""Get copyright matches from an upload

API Endpoint: GET /uploads/{id}/copyrights

:param upload: the upload to gather data from
:type upload: Upload
:return: the list of copyrights findings
:rtype: list of Licenses
:raises FossologyApiError: if the REST call failed
"""
response = self.session.get(f"{self.api}/uploads/{upload.id}/copyrights")

if response.status_code == 200:
all_copyrights = []
for copyright in response.json():
all_copyrights.append(UploadCopyrights.from_json(copyright))
return all_copyrights

elif response.status_code == 412:
description = f"The agent has not been scheduled for upload {upload.uploadname} (id={upload.id})"
raise FossologyApiError(description, response)

elif response.status_code == 503:
logger.debug("The ununpack agent or queried agents have not started yet.")
time.sleep(3)
raise TryAgain

else:
description = f"No licenses for upload {upload.uploadname} (id={upload.id})"
description = f"API error while returning copyright findings for upload {upload.uploadname} (id={upload.id})"
raise FossologyApiError(description, response)

def delete_upload(self, upload, group=None):
Expand Down
10 changes: 10 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ def foss_user_agents() -> Dict:
}


@pytest.fixture()
def fake_hash():
return {
"sha1": secrets.token_hex(16),
"md5": secrets.token_hex(nbytes=16),
"sha256": secrets.token_hex(56),
"size": secrets.randbelow(512),
}


@pytest.fixture(scope="session")
def foss_user(foss_user_agents) -> Dict:
return {
Expand Down
Loading
Loading