Pushed new branch.

This commit is contained in:
Daniele Maglie 2024-01-15 13:32:24 +01:00
commit f3ed518b65
107 changed files with 6285 additions and 0 deletions

View file

@ -0,0 +1,73 @@
import pytest
import requests
import secrets
import sys
import os
import time
sys.path.append('../')
from base import reporter_authentication, authenticated_post_request
class TestReporterCreateTicket:
ticket_wait_time = 76
ticket_parameters = {
'dda_id': '2326485749e94573bf5724ff5006f30c',
'description': '__MOCK_TICKET__',
'forensic_evidence': {
'hash': {}
},
'fqdn': [
'mock-website.com',
'google.com'
],
'ipv4': [
'9.8.7.6',
'1.1.1.1'
],
'ipv6': [
'2001:db8:3333:4444:5555:6666:7777:8888'
]
}
@pytest.fixture(scope = "function", autouse = True)
def setup_method(self, reporter_authentication):
self.access_token, self.refresh_token = reporter_authentication
def test_create_and_remove_ticket(self):
self.ticket_parameters['forensic_evidence']['hash'] = {
'sha256': secrets.token_hex(32)
}
create_response = authenticated_post_request('/api/v1/ticket/create', self.access_token, self.ticket_parameters)
assert create_response.status_code == 200
assert create_response.json()['status'] == 'success'
time.sleep(1)
remove_response = authenticated_post_request('/api/v1/ticket/remove', self.access_token, {
'ticket_id': create_response.json()['data']['ticket_id']
})
assert remove_response.status_code == 200
assert remove_response.json()['status'] == 'success'
time.sleep(1)
def test_create_real_ticket(self):
self.ticket_parameters['forensic_evidence']['hash'] = {
'sha256': secrets.token_hex(32)
}
response = authenticated_post_request('/api/v1/ticket/create', self.access_token, self.ticket_parameters)
assert response.status_code == 200
assert response.json()['status'] == 'success'
print(f"Waiting for the ticket to change status ({self.ticket_wait_time}s)")
time.sleep(self.ticket_wait_time)

View file

@ -0,0 +1,39 @@
import pytest
import requests
import sys
import os
sys.path.append('../')
from base import provider_authentication, authenticated_get_request
class TestProviderRetrieveTicketItems:
@pytest.fixture(scope = "function", autouse = True)
def setup_method(self, provider_authentication):
self.access_token, self.refresh_token = provider_authentication
def test_fqdn_get_all(self):
response = authenticated_get_request('/api/v1/fqdn/get/all', self.access_token)
assert response.status_code == 200
assert response.json()['status'] == 'success'
def test_fqdn_get_all_txt(self):
response = authenticated_get_request('/api/v1/fqdn/get/all/txt', self.access_token)
assert response.status_code == 200
assert response.headers.get('Content-Type', '').lower() == 'text/plain; charset=utf-8'
def test_ipv4_get_all(self):
response = authenticated_get_request('/api/v1/fqdn/get/all', self.access_token)
assert response.status_code == 200
assert response.json()['status'] == 'success'
def test_ipv4_get_all_txt(self):
response = authenticated_get_request('/api/v1/fqdn/get/all/txt', self.access_token)
assert response.status_code == 200
assert response.headers.get('Content-Type', '').lower() == 'text/plain; charset=utf-8'

View file

@ -0,0 +1,39 @@
import pytest
import requests
import sys
import os
sys.path.append('../')
from base import provider_authentication, authenticated_post_request
class TestProviderSetTicketItems:
@pytest.fixture(scope = "function", autouse = True)
def setup_method(self, provider_authentication):
self.access_token, self.refresh_token = provider_authentication
def test_set_processed_non_existent(self):
response = authenticated_post_request('/api/v1/ticket/item/set/processed', self.access_token, {
'value': '1.2.3.4'
})
assert response.status_code == 400
assert response.json()['status'] == 'error'
def test_set_processed_fqdn(self):
response = authenticated_post_request('/api/v1/ticket/item/set/processed', self.access_token, {
'value': 'mock-website.com'
})
assert response.status_code == 200
assert response.json()['status'] == 'success'
def test_set_processed_ipv4(self):
response = authenticated_post_request('/api/v1/ticket/item/set/processed', self.access_token, {
'value': '9.8.7.6'
})
assert response.status_code == 200
assert response.json()['status'] == 'success'

78
tests/base.py Normal file
View file

@ -0,0 +1,78 @@
from piracyshield_component.config import Config
import pytest
import os
import requests
application_config = Config('api').get('general')
URL = f"http://127.0.0.1:{application_config['port']}"
def get_request(endpoint: str):
return requests.get(f'{URL}{endpoint}')
def post_request(endpoint: str, data: dict):
return requests.post(f'{URL}{endpoint}', json = data)
def authenticated_get_request(endpoint: str, access_token: str):
return requests.get(f'{URL}{endpoint}', headers = {
'Authorization': f'Bearer {access_token}'
})
def authenticated_post_request(endpoint: str, access_token: str, data: dict):
return requests.post(f'{URL}{endpoint}', headers = {
'Authorization': f'Bearer {access_token}'
}, json = data)
def authenticate(email, password):
"""
General authentication utility.
"""
response = requests.post(f'{URL}/api/v1/authentication/login', json = {
'email': email,
'password': password
})
response_json = response.json()
access_token = response_json['data']['access_token']
refresh_token = response_json['data']['refresh_token']
assert access_token is not None
assert refresh_token is not None
return [ access_token, refresh_token ]
@pytest.fixture
def internal_authentication():
"""
Passes parameters to authenticate a provider account.
"""
return authenticate(
email = os.environ.get('PIRACYSHIELD_MOCK_INTERNAL_EMAIL'),
password = os.environ.get('PIRACYSHIELD_MOCK_INTERNAL_PASSWORD')
)
@pytest.fixture
def reporter_authentication():
"""
Passes parameters to authenticate a provider account.
"""
return authenticate(
email = os.environ.get('PIRACYSHIELD_MOCK_REPORTER_EMAIL'),
password = os.environ.get('PIRACYSHIELD_MOCK_REPORTER_PASSWORD')
)
@pytest.fixture
def provider_authentication():
"""
Passes parameters to authenticate a provider account.
"""
return authenticate(
email = os.environ.get('PIRACYSHIELD_MOCK_PROVIDER_EMAIL'),
password = os.environ.get('PIRACYSHIELD_MOCK_PROVIDER_PASSWORD')
)

4
tests/bench_params.txt Normal file
View file

@ -0,0 +1,4 @@
{
"email": "test@test.com",
"password": "testing123"
}

10
tests/mock_storage.py Normal file
View file

@ -0,0 +1,10 @@
from piracyshield_data_storage.blob.drivers.azure import AzureBlobStorage
import os
mock_storage = AzureBlobStorage(
connection_string = os.environ.get("PIRACYSHIELD_MOCK_STORAGE_CONNECTION_STRING"),
container_name = os.environ.get("PIRACYSHIELD_MOCK_STORAGE_CONTAINER_NAME")
)
mock_storage.create_container()

15
tests/test_0001_health.py Normal file
View file

@ -0,0 +1,15 @@
import pytest
import requests
from base import get_request
class TestGeneral:
def test_ping(self):
"""
Check the API availability.
"""
response = get_request('/api/v1/ping')
assert response.status_code == 200

60
tests/test_0002_errors.py Normal file
View file

@ -0,0 +1,60 @@
import pytest
import requests
from base import URL
class TestErrors:
def test_route_not_found(self):
"""
Test a non existent route.
"""
response = requests.get(f'{URL}/fake/route')
assert response.status_code == 404
def test_method_not_allowed(self):
"""
Test an existing route with the wrong method.
"""
response = requests.get(f'{URL}/api/v1/account/guest/create')
assert response.status_code == 405
def test_create_user(self):
"""
Test a non authorized account creation request.
"""
response = requests.post(f'{URL}/api/v1/account/guest/create', {'name': 'test'})
assert response.status_code == 401
def test_missing_authentication_parameter(self):
"""
Miss a parameter.
"""
response = requests.post(f'{URL}/api/v1/authentication/login', {'email': 'test@fake.com'})
assert response.status_code == 400
def test_fake_authentication(self):
"""
Test using a non existent e-mail.
"""
response = requests.post(f'{URL}/api/v1/authentication/login', {'email': 'test@fake.com', 'password': 'very_fake'})
assert response.status_code == 400
def test_bad_format_refresh_token(self):
"""
Try to refresh a bad token.
"""
response = requests.post(f'{URL}/api/v1/authentication/refresh', {'refresh_token': 'non valid token'})
assert response.status_code == 400

View file

@ -0,0 +1,39 @@
import pytest
import os
import requests
from base import internal_authentication, post_request, authenticated_get_request
class TestAuthentication:
access_token = None
refresh_token = None
@pytest.fixture(scope = "function", autouse = True)
def setup_method(self, internal_authentication):
self.access_token, self.refresh_token = internal_authentication
def test_token_refresh(self):
"""
Test authentication refresh token.
"""
response = post_request('/api/v1/authentication/refresh', {
'refresh_token': self.refresh_token
})
response_json = response.json()
self.access_token = response_json['data']['access_token']
assert response.status_code == 200
def test_logout(self):
"""
Test authentication logout.
"""
response = authenticated_get_request('/api/v1/authentication/logout', self.access_token)
assert response.status_code == 200