Source code for umbr_api.add

#!/usr/bin/env python3
"""API call to add a record via Umbrella Enforcement API.

Note:
    When posting data to the Security Platform API, the following steps are
    taken before the domain appears in a customer's block list. The optional
    parameter "disableDstSafeguards" can be used to bypass parts of this
    process as outlined in the Generic Event Format Field Descriptions.
    The domain acceptance process is outlined from start to finish here:

        1. An external source identifies malicious activity occurring when
        a user visits a particular URL. This source could be a third party
        vendor’s data feed, an entry in one of your security logs or something
        identified as malicious on a security related website.

        2. The event is sent to the Umbrella Security Platform API via
        a POST request, following the steps and syntax outlined earlier in
        this documentation.

        3. Before the domain included API POST event is added to the specified
        Umbrella customer’s block list, the following checks are performed:

            a) Does the domain already exist in the Umbrella Security global
               block list under one of the Security Categories?

            b) Is the domain considered benign, or safe, under the Cisco
               Umbrella Investigate?

            c) Is the status of the domain uncategorized?

            d) Is the domain already present on the customer’s allow list
               within the organization?

        4. If the domain is then added to the customer’s domain list, then
        any domains in that list will be blocked in accordance with that
        customer’s Umbrella policy security settings.

References:
    https://docs.umbrella.com/developer/enforcement-api/events2/
    https://docs.umbrella.com/developer/enforcement-api/domain-acceptance-process2/

"""

import json
from datetime import datetime
from urllib.parse import urlparse

from logzero import logger

import umbr_api
from umbr_api._http_requests import send_post
from umbr_api.credentials import get_key


[docs]def add(domain=None, url=None, key=None, bypass=False): """Add domain name to block list.""" assert domain and url assert isinstance(domain, str) assert isinstance(url, str) if domain != url: if url[0:7] == "http://" or url[0:8] == "https://": check_url = urlparse(url).hostname else: check_url = urlparse("http://" + url).hostname if domain != check_url: logger.warning( "Domain name part from URL doesn't match DNS domain name" ) logger.warning("DNS domain name: %s", domain) logger.warning("URL domain name: %s", check_url) key = get_key(key=key) response = None time_str = (datetime.utcnow()).isoformat(sep="T") + "Z" api_uri = ( "https://s-platform.api.opendns.com/1.0/events?customerKey=" + key ) if bypass: bypass_str = "true" else: bypass_str = "false" block_request_txt = """ {{ "alertTime": "{alertTime}", "deviceId": "ba6a59f4-e692-4724-ba36-c28132a761de", "deviceVersion": "{deviceVersion}", "dstDomain": "{dstDomain}", "dstUrl": "{dstUrl}", "eventTime": "{eventTime}", "protocolVersion": "1.0a", "providerName": "Security Platform", "disableDstSafeguards": {disableDstSafeguards} }}""".format( alertTime=time_str, deviceVersion=umbr_api.__version__, dstDomain=domain, dstUrl=url, eventTime=time_str, disableDstSafeguards=bypass_str, ) response = send_post( api_uri, data=block_request_txt, headers={"Content-Type": "application/json"}, ) format_response(response) return response
[docs]def format_response(response): """Format results.""" if response.status_code == 202: print("OK") else: print("Error") try: json_response = json.loads(response.text) for key, value in json_response.items(): logger.error("%s %s", key, value) except KeyError as msg: print("Get abnormal code while adding:", response.status_code) logger.exception(msg)
[docs]def main(test_key=None): """Test if executed directly.""" result = add( domain="www.example.com", url="https://www.example.com/test", key=test_key, ) assert result.status_code == 202 result = add(domain="www.example.com", url="www.example.com", key=test_key) assert result.status_code == 202
if __name__ == "__main__": main()