APIs & Developers

Switch-API Python Package (switch-api · PyPI)

Overview

A Python package for interacting with the Switch Automation platform: it provides the ability to retrieve and ingest data, as well as to register and deploy Tasks that automate processes to ingest, transfer, and/or analyze data.

Installing the Package

The switch-api Python Package is distributed via pypi.org and can be found at: https://pypi.org/project/switch-api/

The package can be installed with the following pip command:

pip install switch-api

Importing the Package

When importing the switch-api package, we typically use the alias 'sw' as demonstrated below:

import switch_api as sw

Initializing Authorization

Authorization is managed by initializing an api_inputs object within your Python script. On execution, a login page opens in your web browser where you log in with your Switch Platform credentials. The resulting api_inputs object is then passed as a parameter to each function within the Switch API, providing the authorization for those calls. Deployed Tasks within Task Insights are passed valid api_inputs whenever they are triggered.

First, you need to locate the Portfolio Id (aka Api Project Id) of the portfolio that you want to interact with. You can do this by logging into the Switch Platform and navigating to Configure / Portfolios.  

From Configure / Portfolios, locate the desired Portfolio in the grid, then right-click its row and select Edit from the menu that appears.

From the Edit page that appears, the Portfolio Id can be copied from the bottom left of the page as seen below:

You can then initialize the api_inputs using the api_project_id as follows:

api_project_id = '7037e961-bad1-4ebb-8590-9d6cd3ea4ad4'
api_inputs = sw.initialize(api_project_id=api_project_id)

Dataset Module

A module for interacting with Data Sets from the Switch Automation Platform. Provides the ability to list Data Sets and their folder structure as well as retrieve Data Sets with the option to pass parameters.

The simplest way to navigate through and identify Data Sets is from the Switch Platform. Once you’ve identified a Data Set that you want to access from the Switch API, you’ll need its dataset_id, which can be found by opening the Data Set in the Switch Platform and clicking the ‘Copy dataset Id’ button near the top right of the page.

You can then request the data from the Data Set as follows:

import switch_api as sw

api_project_id = '7037e961-bad1-4ebb-8590-9d6cd3ea4ad4'
api_inputs = sw.initialize(api_project_id=api_project_id)

dataset_id = 'c7cc3182-bdfc-47f1-90c8-710e4962876a'
params = [
    {'name': 'site', 'value': '82794e05-1f7b-4a3a-a6d8-e206444c1c1e', 'type': 'String'},
    {'name': 'equipType', 'value': 'Zone Sensor', 'type': 'String'},
    {'name': 'pivotGrouping', 'value': 'DeviceName', 'type': 'Keyword'},
]

response, df = sw.dataset.dataset.get_data(api_inputs=api_inputs, dataset_id=dataset_id, parameters=params)

Integration Overview

This section discusses the overarching structure of an integration; subsequent sections detail the specific methods used to implement integrations.

Integrations generally fall into one of two categories: Data Feed and API. Data Feed integrations involve the delivery of data through one of the supported protocols (Email, FTP, API Endpoint Upload), followed by processing and ingestion into the Switch Automation platform. API integrations involve requesting data from an external API on a predefined schedule, again followed by processing and ingestion into the Switch Automation platform. Often, the data being ingested (e.g., Readings, Tags, Workorders) must be associated with existing Assets (e.g., Sites, or Sensors/Points), which requires retrieving those Assets from the Switch Automation platform and joining them to the externally sourced data. In some cases, new Assets need to be created so that novel data has something to be associated with. Once the associations have been made and any further data processing is complete, the data can be ingested into the Switch Automation platform.
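As a rough illustration, the sketch below shows the typical shape of an API-category integration using the Integration Module documented in the following sections: retrieve existing assets, join them to externally sourced readings, and upsert the result. The external dataframe, the join columns (MeterCode/DeviceCode), and the availability of particular asset columns are illustrative assumptions; they depend on your portfolio's register and data source.

import pandas as pd
import switch_api as sw

# Authorize against the target portfolio (see "Initializing Authorization").
api_inputs = sw.initialize(api_project_id='7037e961-bad1-4ebb-8590-9d6cd3ea4ad4')

# 1. Retrieve existing assets from the Switch platform.
#    The columns available for joining (e.g. DeviceCode) depend on your portfolio's register.
sensors_df = sw.integration.get_device_sensors(api_inputs=api_inputs, top_count=1000)

# 2. Externally sourced readings - here a hypothetical dataframe that would normally
#    come from a 3rd party API or a delivered file.
external_df = pd.DataFrame({
    'MeterCode': ['M-001', 'M-002'],
    'Timestamp': pd.to_datetime(['2023-01-01 00:00', '2023-01-01 00:15']),
    'Value': [10.2, 11.4],
})

# 3. Associate the external data with existing assets (join column is illustrative only).
joined_df = external_df.merge(sensors_df, left_on='MeterCode', right_on='DeviceCode', how='inner')

# 4. Ingest the readings. upsert_timeseries() requires InstallationId, ObjectPropertyId,
#    Timestamp and Value columns (see the Integration Module reference below).
readings_df = joined_df[['InstallationId', 'ObjectPropertyId', 'Timestamp', 'Value']]
response, response_df = sw.integration.upsert_timeseries(df=readings_df, api_inputs=api_inputs)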

Integration Module

A module for integrating data with the Switch Automation Platform. Provides the ability to create, update, and retrieve assets (i.e., devices, sensors, sites, workorders, tags, metadata); ingest and retrieve sensor readings; and ingest and retrieve data from Azure Data Explorer (ADX).

Methods for Retrieving Assets and Data:

def get_sites(api_inputs: ApiInputs, include_tag_groups: Union[list, bool] = False, sql_where_clause: str = None, top_count: int = None): """Retrieve site information from backend SQL database. Parameters ---------- api_inputs : ApiInputs Object returned by initialize() function. include_tag_groups : Union[list, bool], default = False If False, no tag groups are included. If True, all tag groups will be returned. Else, if list, the Tag Groups in the list are retrieved as columns. sql_where_clause : str, default = None Optional `WHERE` clause in SQL syntax. Use field names only and do not include the "WHERE". top_count: int, default = None For use during testing to limit the number of records returned. Returns ------- df : pandas.DataFrame """ def get_device_sensors(api_inputs: ApiInputs, include_tag_groups: Union[list, bool] = False, include_metadata_keys: Union[list, bool] = False, sql_where_clause: str = None, top_count: int = None): """Retrieve device and sensor information from backend SQL database. Optionally include all or a subset of tag groups and/or metadata keys depending on the configuration of the `include_tag_groups` and `include_metadata_keys` parameters. Whilst testing, there is the option to limit the number of records returned via the `top_count` parameter. If this parameter is not set, then the function will return all records. Parameters ---------- api_inputs : ApiInputs Object returned by initialize() function. include_tag_groups : Union[list, bool], default = False If False, no tag groups are included. If True, all tag groups will be returned. Else, if list, the Tag Groups in the list are retrieved as columns. include_metadata_keys : Union[list, bool], default = False If False, no metadata keys are included. If True, all metadata keys will be returned. Else, if list, the metadata keys in the list are retrieved as columns. sql_where_clause : str, optional optional `WHERE` clause in SQL syntax. top_count: int, default = None For use during testing to limit the number of records returned. Returns ------- df : pandas.DataFrame """ def get_data(query_text, api_inputs: ApiInputs, query_language: QUERY_LANGUAGE = "sql"): """Retrieve data from Azure Data Explorer (ADX) database, which mirrors certain data from the backend SQL database, for the portfolio associated with the provided api_inputs. Parameters ---------- query_text : str SQL or KQL (Kusto) statement used to retrieve data. api_inputs : ApiInputs Object returned by initialize() function. query_language : QUERY_LANGUAGE, optional The query language the query_text is written in (Default value = 'sql'). 'kql' is also supported Returns ------- df : pandas.DataFrame """
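A brief usage sketch of these retrieval methods, assuming the Integration Module is exposed as sw.integration (consistent with the sw.integration.get_timezones() reference elsewhere in this section). The tag group name, WHERE clause field, and ADX table name are illustrative only.

import switch_api as sw

api_inputs = sw.initialize(api_project_id='7037e961-bad1-4ebb-8590-9d6cd3ea4ad4')

# Retrieve sites, including all tag groups as columns, limited to 50 rows while testing.
sites_df = sw.integration.get_sites(api_inputs=api_inputs, include_tag_groups=True, top_count=50)

# Retrieve device/sensor records for a single equipment class, with selected tag groups.
sensors_df = sw.integration.get_device_sensors(
    api_inputs=api_inputs,
    include_tag_groups=['Location'],              # tag group name is illustrative
    include_metadata_keys=True,
    sql_where_clause="EquipmentClass = 'Meter'",  # field/value are illustrative
)

# Query ADX directly; the table name is illustrative and depends on your portfolio.
adx_df = sw.integration.get_data(
    query_text='Timeseries | take 100',
    api_inputs=api_inputs,
    query_language='kql',
)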

Methods for Updating and/or Creating Assets:

def upsert_sites(df: pandas.DataFrame, api_inputs: ApiInputs, tag_columns: list = None, save_additional_columns_as_slices: bool = False): """Upsert site(s). The `df` input must contain the following columns: - InstallationName - InstallationCode - Address - Suburb - State - StateName - Country - FloorAreaM2 - ZipPostCode The following additional columns are optional: - Latitude - Longitude - Timezone - InstallationId - The UUID of the existing site within the Switch Automation Platform. Parameters ---------- df: pandas.DataFrame : The dataframe containing the sites to be created/updated in the Switch platform. All required columns must be present with no null values. api_inputs : ApiInputs Object returned by initialize() function. tag_columns : list, default=[] The columns containing site-level tags. The column header will be the tag group name. (Default value = True) save_additional_columns_as_slices : bool, default = False Whether any additional columns should be saved as slices. (Default value = False) Returns ------- tuple[str, pandas.DataFrame] (response, response_data_frame) - Returns the response status and the dataframe containing the parsed response text. """ def upsert_device_sensors(df: pandas.DataFrame, api_inputs: ApiInputs, tag_columns: list = None, metadata_columns: list = None, save_additional_columns_as_slices: bool = False): """Upsert device(s) and sensor(s) Required fields are: - InstallationCode - DeviceCode - DeviceName - SensorName - SensorTemplate - SensorUnitOfMeasure - EquipmentClass - EquipmentLabel Parameters ---------- df: pandas.DataFrame The asset register created by the driver including the minimum required set of columns. api_inputs : ApiInputs Object returned by initialize() function. tag_columns : list, default = None Columns of dataframe that contain tags (Default value = None). metadata_columns : list, default = None Column(s) of dataframe that contain device-level metadata (Default value = None). save_additional_columns_as_slices : bool, default = False Whether additional columns should be saved as slices (Default value = False). Returns ------- tuple[list, pandas.DataFrame] (response_status_list, upsert_response_df) - Returns the list of response statuses and the dataframe containing the parsed response text. """ def upsert_tags(api_inputs: ApiInputs, df: pandas.DataFrame, tag_level: TAG_LEVEL): """ Upsert tags to Site/Device/Sensors as specified by the tag_level argument. Required fields are: - Identifier - Additional columns as TagGroups / Tags Parameters ---------- api_inputs : ApiInputs Object returned by initialize() function. df : DataFrame List of Devices along with corresponding TagsJson to upsert tag_level : TAG_LEVEL Level of tagging applied to the list of Identifier input. If tag_level is Site, Identifier should be InstallationIds. If tag_level is Device, Identifier should be DeviceIds. If tag_level is Sensor, Identifier should be ObjectPropertyIds. Returns ------- List of affected records """
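A minimal sketch of the upsert methods above, again assuming the sw.integration module path. All values are illustrative; in particular, SensorTemplate, SensorUnitOfMeasure, and EquipmentClass must match values configured in your portfolio.

import pandas as pd
import switch_api as sw

api_inputs = sw.initialize(api_project_id='7037e961-bad1-4ebb-8590-9d6cd3ea4ad4')

# Upsert a site using the required columns listed above (values are illustrative).
sites_df = pd.DataFrame([{
    'InstallationName': 'Example Tower', 'InstallationCode': 'EX-TWR',
    'Address': '1 Example St', 'Suburb': 'Exampleville', 'State': 'NSW',
    'StateName': 'New South Wales', 'Country': 'Australia',
    'FloorAreaM2': 12000, 'ZipPostCode': '2000',
}])
response, site_response_df = sw.integration.upsert_sites(df=sites_df, api_inputs=api_inputs)

# Upsert a device/sensor against that site using the required columns listed above.
sensors_df = pd.DataFrame([{
    'InstallationCode': 'EX-TWR', 'DeviceCode': 'AHU-01', 'DeviceName': 'AHU 01',
    'SensorName': 'Zone Temp', 'SensorTemplate': 'Temperature',
    'SensorUnitOfMeasure': 'DegreesCelsius', 'EquipmentClass': 'AHU',
    'EquipmentLabel': 'AHU 01',
}])
statuses, sensor_response_df = sw.integration.upsert_device_sensors(df=sensors_df, api_inputs=api_inputs)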

Methods for Ingesting Data into Pre-defined Data Tables Natively Supported by the Switch Automation platform:

def upsert_timeseries(df: pandas.DataFrame, api_inputs: ApiInputs, is_local_time: bool = True, save_additional_columns_as_slices: bool = False, data_feed_file_status_id: uuid.UUID = None, is_specific_timezone: Union[bool, str] = False): """Upserts timeseries readings to EventHub for processing into Timeseries table. The following columns are required to be present in the data_frame: - InstallationId - ObjectPropertyId - Timestamp - Value Parameters ---------- df: pandas.DataFrame Dataframe containing the data to be appended to timeseries. api_inputs: ApiInputs Object returned by initialize() function. is_local_time : bool, default = True Whether the datetime values are in local time or UTC. If, False and is_specific_timezone is False, then UTC (Default value = True). Should be set to False when 'is_specific_timezone' has value. save_additional_columns_as_slices : bool, default = False (Default value = False) data_feed_file_status_id : uuid.UUID, default = None Enables developer to identify upserted rows using during development. This data is posted to the DataFeedFileStatusId in the Timeseries_Ds table. Once deployed, the DataFeedFileStatusId field will contain a unique Guid which will assist in tracking upload results and logging. is_specific_timezone : Union[False, str] Accepts a timezone name as the specific timezone used by the source data. Defaults to False. Cannot have value if 'is_local_time' is set to True. Retrieve list of timezones using 'sw.integration.get_timezones()' Returns ------- tuple[str, pandas.DataFrame] (response_status, response_df) - Returns the response status and the dataframe containing the parsed response text. """ def upsert_workorders(df: pandas.DataFrame, api_inputs: ApiInputs, save_additional_columns_as_slices: bool = False): """Upsert data to the Workorder table. The following columns are required to be present in the df: - ``WorkOrderId``: unique identifier for the work order instance - ``InstallationId``: the InstallationId (guid) used to uniquely identify a given site within the Switch platform - ``WorkOrderSiteIdentifier``: the work order provider's raw/native site identifier field - ``Status``: the status mapped to the Switch standard values defined by literal: `WORK_ORDER_STATUS` - ``RawStatus``: the work order provider's raw/native status - ``Priority``: the priority mapped to the Switch standard values defined by literal: `WORK_ORDER_PRIORITY` - ``RawPriority``: the work order provider's raw/native priority - ``WorkOrderCategory``: the category mapped to the Switch standard values defined by literal: `WORK_ORDER_CATEGORY` - ``RawWorkOrderCategory``: the work order provider's raw/native category - ``Type`` - work order type (as defined by provider) - e.g. HVAC - Too Hot, etc. - ``Description``: description of the work order. 
- ``CreatedDate``: the date the work order was created (Submitted status) - ``LastModifiedDate``: datetime the workorder was last modified - ``WorkStartedDate``: datetime work started on the work order (In Progress status) - ``WorkCompletedDate``: datetime work was completed for the work order (Resolved status) - ``ClosedDate``: datetime the workorder was closed (Closed status) The following columns are optional: - ``SubType``: the sub-type of the work order - ``Vendor``: the name of the vendor - ``VendorId``: the vendor id - ``EquipmentClass``: the Switch defined Equipment Class mapped from the work order provider's definition - ``RawEquipmentClass``: the work order provider's raw/native equipment class - ``EquipmentLabel``: the EquipmentLabel as defined within the Switch platform - ``RawEquipmentId``: the work order provider's raw/native equipment identifier/label - ``TenantId``: the tenant id - ``TenantName``: the name of the tenant - ``NotToExceedCost``: the cost not to be exceeded for the given work order - ``TotalCost``: total cost of the work order - ``BillableCost``: the billable portion of the work order cost - ``NonBillableCost``: the non-billable portion of the work order cost. - ``Location``: the Location as defined within the Switch platform - ``RawLocation``: the work order provider's raw/native location definition - ``ScheduledStartDate``: datetime work was scheduled to start on the given work order - ``ScheduledCompletionDate``" datetime work was scheduled to be completed for the given work order Parameters ---------- df: pandas.DataFrame Dataframe containing the work order data to be upserted. api_inputs: ApiInputs Object returned by initialize() function. save_additional_columns_as_slices : bool, default = False (Default value = False) Returns ------- response_status, response_df """
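A sketch of upserting timeseries readings with the required columns listed above. The identifiers below are placeholders (in practice they would come from get_device_sensors()), and the timezone name is an example to be validated against sw.integration.get_timezones().

import pandas as pd
import switch_api as sw

api_inputs = sw.initialize(api_project_id='7037e961-bad1-4ebb-8590-9d6cd3ea4ad4')

# Readings keyed by the sensor's ObjectPropertyId and the site's InstallationId
# (identifiers below are placeholders - retrieve real ones via get_device_sensors()).
readings_df = pd.DataFrame({
    'InstallationId': ['82794e05-1f7b-4a3a-a6d8-e206444c1c1e'] * 2,
    'ObjectPropertyId': ['00000000-0000-0000-0000-000000000000'] * 2,
    'Timestamp': pd.to_datetime(['2023-01-01 00:00', '2023-01-01 00:15']),
    'Value': [21.5, 21.7],
})

# Timestamps here are treated as a specific source timezone rather than site-local time,
# so is_local_time is set to False per the parameter notes above.
response, response_df = sw.integration.upsert_timeseries(
    df=readings_df,
    api_inputs=api_inputs,
    is_local_time=False,
    is_specific_timezone='Australia/Sydney',  # list valid names via sw.integration.get_timezones()
)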

Pipeline Module

Module defining the Task types. A deployed integration or analytics process must be one of these Task types, and all Python code to be executed must be contained within one of the pre-defined methods of these Tasks. External users of the Switch API should feel empowered to develop IntegrationTasks and AnalyticsTasks, but the other Task types have a more complicated interaction with the Switch Automation platform and should be developed in coordination with Switch Automation’s Software Development and Data Science teams.

class Task(ABC): """An Abstract Base Class called Task. Attributes ---------- id : uuid.UUID Unique identifier of the task. This is an abstract property that needs to be overwritten when sub-classing. A new unique identifier can be created using uuid.uuid4() description : str Brief description of the task mapping_entities : List[MAPPING_ENTITIES] The type of entities being mapped. An example is: ``['Installations', 'Devices', 'Sensors']`` author : str The author of the task. version : str The version of the task. """ @property @abstractmethod def id(self) -> uuid.UUID: """Unique identifier of the task. Create a new unique identifier using uuid.uuid4() """ pass @property @abstractmethod def description(self) -> str: """Brief description of the task""" pass @property @abstractmethod def mapping_entities(self) -> List[MAPPING_ENTITIES]: """The type of entities being mapped.""" pass @property @abstractmethod def author(self) -> str: """"The author of the task.""" pass @property @abstractmethod def version(self) -> str: """The version of the task""" pass

IntegrationTask

Base class used to create integrations between the Switch Automation Platform and other platforms, low-level services, or hardware.

Examples include:

  • Pulling readings or other types of data from REST APIs
  • Protocol Translators which ingest data sent to the platform via email, FTP, or direct upload within the platform.
class IntegrationTask(Task): """Integration Task This class is used to create integrations that post data to the Switch Automation Platform. Only one of the following methods should be implemented per class, based on the type of integration required: - process_file() - process_stream() - process() """ @abstractmethod def process_file(self, api_inputs: ApiInputs, file_path_to_process: str): """Method to be implemented if a file will be processed by the Integration Task. The method should contain all code used to cleanse, reformat & post the data contained in the file. Parameters ---------- api_inputs : ApiInputs object returned by call to initialize() file_path_to_process : str the file path """ pass @abstractmethod def process_stream(self, api_inputs: ApiInputs, stream_to_process): """Method to be implemented if data received via stream The method should contain all code used to cleanse, reformat & post the data received via the stream. Parameters ---------- api_inputs: ApiInputs The object returned by call to initialize() stream_to_process The details of the stream to be processed. """ pass @abstractmethod def process(self, api_inputs: ApiInputs, integration_settings: dict): """Method to be implemented if data The method should contain all code used to cleanse, reformat & post the data pulled via the integration. Parameters ---------- api_inputs: ApiInputs object returned by call to initialize() integration_settings : dict Any settings required to be passed to the integration to run. For example, username & password, api key, auth token, etc. """ pass
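Below is a sketch of a minimal IntegrationTask subclass, assuming the Task classes are exposed under sw.pipeline (consistent with the sw.pipeline.logger reference later in this section). The external API URL, the api_key setting, and the response shape are hypothetical; only process() carries logic here, with the unused abstract methods stubbed out.

import uuid
from typing import List

import pandas as pd
import requests
import switch_api as sw


class ExampleApiIntegration(sw.pipeline.IntegrationTask):
    """Sketch of an IntegrationTask that pulls readings from a hypothetical REST API."""

    @property
    def id(self) -> uuid.UUID:
        # Generate once with uuid.uuid4() and then hard-code it for the life of the task.
        return uuid.UUID('00000000-0000-0000-0000-000000000001')

    @property
    def description(self) -> str:
        return 'Example integration pulling readings from a 3rd party REST API.'

    @property
    def mapping_entities(self) -> List[str]:
        return ['Installations', 'Devices', 'Sensors']

    @property
    def author(self) -> str:
        return 'Example Author'

    @property
    def version(self) -> str:
        return '0.1.0'

    def process(self, api_inputs, integration_settings: dict):
        # Pull data from the external API (URL and settings keys are illustrative).
        resp = requests.get(
            'https://api.example.com/readings',
            headers={'Authorization': f"Bearer {integration_settings['api_key']}"},
        )
        readings_df = pd.DataFrame(resp.json())
        # ... cleanse/reformat, join to assets retrieved via sw.integration.get_device_sensors(),
        # then post with sw.integration.upsert_timeseries(df=readings_df, api_inputs=api_inputs).

    def process_file(self, api_inputs, file_path_to_process: str):
        pass  # Not used by this integration.

    def process_stream(self, api_inputs, stream_to_process):
        pass  # Not used by this integration.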

AnalyticsTask

Base class used to create specific analytics functionality which may leverage existing data from the platform. Each task may add value to, or supplement, this data and write it back.

Examples include:

  • Anomaly Detection
  • Leaky Pipes
  • Peer Tracking

class AnalyticsTask(Task):
    @property
    @abstractmethod
    def analytics_settings_definition(self) -> List[AnalyticsSettings]:
        """Define the start() method's analytics_settings dictionary requirements & defaults.

        The definition of the dictionary keys, display labels in Task Insights UI, default value & allowed values
        for the start() method's ``analytics_settings`` input parameter.

        property_name - the analytics_settings dictionary key
        display_label - the display label for the given property_name in Task Insights UI
        editor - the editor shown in Task Insights UI
        default_value - default value for this property_name (if applicable)
        allowed_values - the set of allowed values (if applicable) for the given property_name.
            If editor=text_box, this should be None.
        """
        pass

    def check_analytics_settings_valid(self, analytics_settings: dict):
        # required_analytics_settings_keys = set(['task_id'])
        required_analytics_settings_keys = set()
        for setting in self.analytics_settings_definition:
            required_analytics_settings_keys.add(setting.property_name)
        if not required_analytics_settings_keys.issubset(set(analytics_settings.keys())):
            logger.error(f'The analytics_settings passed to the task do not contain the required keys: '
                         f'{required_analytics_settings_keys} ')
            return False
        else:
            return True

    @abstractmethod
    def start(self, api_inputs: ApiInputs, analytics_settings: dict):
        """Start.

        The method should contain all code used by the task.

        Notes
        -----
        The method should first check the analytics_settings passed to the task are valid. Pseudo code below:

        >>> if self.check_analytics_settings_valid(analytics_settings=analytics_settings) == True:
        >>>     # Your actual task code here - i.e. proceed with the task if valid analytics_settings passed.
        >>> else:
        >>>     sw.pipeline.logger.error('Invalid analytics_settings passed to driver. ')

        Parameters
        ----------
        api_inputs : ApiInputs
            the object returned by call to initialize()
        analytics_settings : dict
            any setting required by the AnalyticsTask
        """
        pass
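A sketch of an AnalyticsTask subclass, under the same assumption that the Task classes are exposed as sw.pipeline. The analytics settings definition is left empty for brevity; a real task would return AnalyticsSettings entries describing the fields listed above.

import uuid
from typing import List

import switch_api as sw


class ExampleAnalyticsTask(sw.pipeline.AnalyticsTask):
    """Sketch of an AnalyticsTask; the settings definition is kept empty for brevity."""

    @property
    def id(self) -> uuid.UUID:
        return uuid.UUID('00000000-0000-0000-0000-000000000002')

    @property
    def description(self) -> str:
        return 'Example analytics task.'

    @property
    def mapping_entities(self) -> List[str]:
        return ['Installations', 'Devices', 'Sensors']

    @property
    def author(self) -> str:
        return 'Example Author'

    @property
    def version(self) -> str:
        return '0.1.0'

    @property
    def analytics_settings_definition(self):
        # Populate with AnalyticsSettings entries (property_name, display_label, editor,
        # default_value, allowed_values) as described above; left empty here.
        return []

    def start(self, api_inputs, analytics_settings: dict):
        if self.check_analytics_settings_valid(analytics_settings=analytics_settings):
            sensors_df = sw.integration.get_device_sensors(api_inputs=api_inputs, top_count=100)
            # ... analysis over sensors_df, then write results back via the integration module.
        else:
            sw.pipeline.logger.error('Invalid analytics_settings passed to driver.')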

DiscoverableIntegrationTask

Base class used to create integrations between the Switch Automation Platform and 3rd party APIs. Similar to the IntegrationTask, but includes a secondary method `run_discovery()` which triggers discovery of available points on the 3rd party API and upserts these records to the Switch Platform backend so that they are available in the Build / Discovery & Selection UI. These Tasks require coordination with Switch Automation’s Software Development team to ensure that the discovery functionality is supported by the Switch Platform backend.

Examples include:

  • Pulling readings or other types of data from REST APIs
class DiscoverableIntegrationTask(Task): """Discoverable Integration Task This class is used to create integrations that post data to the Switch Automation Platform from 3rd party APIs that have discovery functionality. The `process()` method should contain the code required to post data for the integration. The `run_discovery()` method upserts records into the Integrations table so that end users can configure the points and import as devices/sensors within the Build - Discovery and Selection UI in the Switch Automation Platform. Additional properties are required to be created to support both the discovery functionality & the subsequent device/sensor creation from the discovery records. """ @property @abstractmethod def integration_device_type_definition(self) -> IntegrationDeviceDefinition: """The IntegrationDeviceDefinition used to create the DriverDevices records required for the Integration DeviceType to be available to drag & drop in the Build - Integration Schematic UI. Contains the properties that define the minimum set of required fields to be passed to the integration_settings dictionaries for the `process()` and `run_discovery()` methods""" pass def check_integration_settings_valid(self, integration_settings: dict): required_integration_keys = set() if self.integration_device_type_definition.expose_address == True: required_integration_keys.add(self.integration_device_type_definition.address_label) for setting in self.integration_device_type_definition.config_properties: if setting.required_for_task == True: required_integration_keys.add(setting.property_name) if not required_integration_keys.issubset(set(integration_settings.keys())): logger.error(f'The integration_settings passed to the task do not contain the required keys: ' f'{required_integration_keys} ') return False else: return True @abstractmethod def run_discovery(self, api_inputs: ApiInputs, integration_settings: dict, integration_input: DiscoveryIntegrationInput): """Method to implement discovery of available points from 3rd party API. The method should contain all code used to retrieve available points, reformat & post information to populate the Build - Discovery & Selection UI in the platform and allows users to configure discovered points prior to import. Parameters ---------- api_inputs: ApiInputs object returned by call to initialize() integration_settings : dict Any settings required to be passed to the integration to run. For example, username & password, api key, auth token, etc. integration_input : DiscoveryIntegrationInput The information required to be sent to the container when the `run_discovery` method is triggered by a user from the UI. This information is the ApiProjectID, InstallationID, NetworkDeviceID and IntegrationDeviceID. """ pass @abstractmethod def process(self, api_inputs: ApiInputs, integration_settings: dict): """Method to be implemented if data The method should contain all code used to cleanse, reformat & post the data pulled via the integration. Parameters ---------- api_inputs: ApiInputs object returned by call to initialize() integration_settings : dict Any settings required to be passed to the integration to run. For example, username & password, api key, auth token, etc. """ pass

QueueTask

Base class used to create data pipelines that are fed via a queue.

class QueueTask(Task): """Queue Task This class is used to create integrations that post data to the Switch Automation Platform using a Queue as the data source. Only one of the following methods should be implemented per class, based on the type of integration required: - process_queue() """ @property @abstractmethod def queue_name(self) -> str: """The name of the queue to receive Data .. Name will actually be constructed as {ApiProjectId}_{queue_name} """ pass @property def queue_type(self) -> str: """Type of the queue to receive data from""" return 'DataIngestion' @property @abstractmethod def maximum_message_count_per_call(self) -> int: """ The maximum amount of messages which should be passed to the process_queue at any one time set to zero to consume all """ pass @abstractmethod def start(self, api_inputs: ApiInputs): """Method to be implemented if a file will be processed by the QueueTask Task. This will run once at the start of the processing and should contain """ pass @abstractmethod def process_queue(self, api_inputs: ApiInputs, messages: List): """Method to be implemented if a file will be processed by the QueueTask Task. The method should contain all code used to consume the messages Parameter _________ api_inputs : ApiInputs object returned by call to initialize() messages:List) list of serialized json strings which have been consumed from the queue """ pass
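A sketch of a QueueTask subclass, under the same sw.pipeline assumption. The queue name, the message format (one serialized JSON reading per message), and the mapping step are illustrative.

import json
import uuid
from typing import List

import pandas as pd
import switch_api as sw


class ExampleQueueTask(sw.pipeline.QueueTask):
    """Sketch of a QueueTask that consumes JSON messages from its data-ingestion queue."""

    @property
    def id(self) -> uuid.UUID:
        return uuid.UUID('00000000-0000-0000-0000-000000000003')

    @property
    def description(self) -> str:
        return 'Example queue-fed ingestion task.'

    @property
    def mapping_entities(self) -> List[str]:
        return ['Installations', 'Devices', 'Sensors']

    @property
    def author(self) -> str:
        return 'Example Author'

    @property
    def version(self) -> str:
        return '0.1.0'

    @property
    def queue_name(self) -> str:
        return 'example_readings'  # deployed queue name becomes {ApiProjectId}_example_readings

    @property
    def maximum_message_count_per_call(self) -> int:
        return 500  # set to 0 to consume all available messages per call

    def start(self, api_inputs):
        # One-off setup at the start of processing, e.g. caching the asset register.
        self.sensors_df = sw.integration.get_device_sensors(api_inputs=api_inputs)

    def process_queue(self, api_inputs, messages: List):
        # Each message is assumed to be a serialized JSON reading.
        readings_df = pd.DataFrame([json.loads(m) for m in messages])
        # ... map readings to InstallationId/ObjectPropertyId using self.sensors_df, then
        # ingest via sw.integration.upsert_timeseries(df=readings_df, api_inputs=api_inputs).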

EventWorkOrderTask

Base class used to create work orders in 3rd party systems via tasks that are created in the Events UI of the Switch Automation Platform.

class EventWorkOrderTask(Task): """Event Work Order Task This class is used to create work orders in 3rd party systems via tasks that are created in the Events UI of the Switch Automation Platform. """ @property @abstractmethod def work_order_fields_definition(self) -> List[EventWorkOrderFieldDefinition]: """Define the fields available in Events UI when creating a work order in 3rd Party System. The definition of the dictionary keys, display labels in Events UI, default value & allowed values for the generate_work_order() method's ``work_order_input`` parameter. property_name - the ``work_order_input`` dictionary key display_label - the display label for the given property_name in Events UI Work Order creation screen editor - the editor shown in Events UI Work Order creation screen default_value - default value for this property_name (if applicable) allowed_values - the set of allowed values (if applicable) for the given property_name. If editor=text_box, this should be None. """ pass @property @abstractmethod def integration_settings_definition(self) -> List[IntegrationSettings]: """Define the generate_work_order() method's integration_settings dictionary requirements & defaults. The definition of the dictionary keys, display labels in Task Insights UI, default value & allowed values for the generate_work_order() method's ``integration_settings`` input parameter. property_name - the ``integration_settings`` dictionary key display_label - the display label for the given property_name in Task Insights UI editor - the editor shown in Task Insights UI default_value - default value for this property_name (if applicable) allowed_values - the set of allowed values (if applicable) for the given property_name. If editor=text_box, this should be None. """ pass def check_work_order_input_valid(self, work_order_input: dict): required_work_order_input_keys = set(['EventTaskId', 'Description', 'IntegrationId', 'DueDate', 'EventLink', 'EventSummary', 'InstallationId']) for setting in self.work_order_fields_definition: required_work_order_input_keys.add(setting.property_name) if not required_work_order_input_keys.issubset(set(work_order_input.keys())): logger.error(f'The work_order_input passed to the task do not contain the required keys: ' f'{required_work_order_input_keys} ') return False else: return True def check_integration_settings_valid(self, integration_settings: dict): required_integration_keys = set() for setting in self.integration_settings_definition: required_integration_keys.add(setting.property_name) if not required_integration_keys.issubset(set(integration_settings.keys())): logger.error(f'The integration_settings passed to the task do not contain the required keys: ' f'{required_integration_keys} ') return False else: return True @abstractmethod def generate_work_order(self, api_inputs: ApiInputs, integration_settings: dict, work_order_input: dict): """Generate work order in 3rd party system via Events UI Method to generate work order in 3rd party system based on a work order task created in Events UI in the Switch Automation platform. Notes ----- In addition to the defined `work_order_fields_definition` fields, the `work_order_input` dictionary passed to this method will contain the following keys: - `EventTaskId` - unique identifier (uuid.UUID) for the given record in the Switch Automation Platform - `Description` - free text field describing the work order to be generated. 
- `IntegrationId` - if linked to an existing work order in the 3rd party API, this will contain that system's identifier for the workorder. If generating a net new workorder, this field will be null (None). - `DueDate` - The due date for the work order as set in the Switch Automation Platform. - `EventLink` - The URL link to the given Event in the Switch Automation Platform UI. - `EventSummary` - The Summary text associated with the given Event in the Switch Automation Platform UI. - `InstallationId` - The unique identifier of the site in the Switch Automation Platform that the work order is associated with. Parameters ---------- api_inputs: ApiInputs object returned by call to initialize() integration_settings : dict Any settings required to be passed to the integration to run. For example, username & password, api key, auth token, etc. work_order_input : dict The work order defined by the task created in the Events UI of the Switch Automation Platform. To be sent to 3rd party system for creation. """ pass

Automation Module

A module containing methods used to register, deploy, cancel, and test Tasks. Includes helper functions for retrieving details of existing Tasks and their deployments, including process history and logs, on the Switch Automation Platform.

Before deploying a Task, you must first register it with the Switch Automation platform. The registration process stores the Task object and its code for later deployment using whichever deployment method is desired. A sketch of this register-then-deploy flow is shown below.
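The following sketch assumes the Automation Module functions are exposed as sw.automation (mirroring the other module paths above; adjust to match your installed version). The task class, expected_delivery value, and settings keys are illustrative.

import uuid

import switch_api as sw

api_inputs = sw.initialize(api_project_id='7037e961-bad1-4ebb-8590-9d6cd3ea4ad4')

# Instantiate a Task defined as per the Pipeline Module section (class name is illustrative).
task = ExampleApiIntegration()

# 1. Register the Task so that its code is stored by the platform.
register_df = sw.automation.register_task(api_inputs=api_inputs, task=task)

# 2. Deploy the registered Task on a 15-minute timer. Generate a data_feed_id once and
#    reuse it to update the same deployment later.
deploy_df = sw.automation.deploy_on_timer(
    task=task,
    api_inputs=api_inputs,
    data_feed_id=uuid.uuid4(),
    expected_delivery='15Min',                # EXPECTED_DELIVERY literal; value is illustrative
    cron_schedule='*/15 * * * *',             # every 15 minutes
    settings={'api_key': 'example-api-key'},  # settings made available to the task when it runs
)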

Register Tasks, Deploy Tasks, and cancel Task Deployments:

def register_task(api_inputs: ApiInputs, task: Union[Task, IntegrationTask, DiscoverableIntegrationTask, QueueTask, AnalyticsTask, LogicModuleTask, EventWorkOrderTask]): """Register the task. Registers the task that was defined. Parameters ---------- api_inputs : ApiInputs Object returned by initialize() function. task : Union[Task, IntegrationTask, DiscoverableIntegrationTask, QueueTask, AnalyticsTask, LogicModuleTask, EventWorkOrderTask] An instance of the custom class created from the Abstract Base Class `Task` or its abstract sub-classes: `IntegrationTask`,`DiscoverableIntegrationTask`, `AnalyticsTask`, `QueueTask`, `EventWorkOrderTask`, or `LogicModuleTask`. Returns ------- pandas.DataFrame """ def deploy_as_email_data_feed(task: Union[Task, IntegrationTask, DiscoverableIntegrationTask, QueueTask, AnalyticsTask, LogicModuleTask, EventWorkOrderTask], api_inputs: ApiInputs, data_feed_id: uuid.UUID, expected_delivery: EXPECTED_DELIVERY, email_subject_regex: str, email_address_domain: str, queue_name: QUEUE_NAME = 'task', data_feed_name: str = None): """Deploy task as an email data feed. Deploys the created `task` as an email data feed. This allows the driver to ingest data sent via email. The data must be sent to data@switchautomation.com to be processed. If it is sent to another email address, the task will not be run. Parameters ---------- task : Union[Task, IntegrationTask, DiscoverableIntegrationTask, QueueTask, AnalyticsTask, LogicModuleTask, EventWorkOrderTask] The custom driver class created from the Abstract Base Class `Task` api_inputs : ApiInputs Object returned by initialize() function. data_feed_id : uuid.UUID The DataFeedId to update if existing, else will create a new record with the given value. expected_delivery : EXPECTED_DELIVERY The expected delivery frequency. email_subject_regex : str Regex expression used to parse the email subject line to determine which driver the received file will be processed by. email_address_domain : str The email domain, without the @ symbol, of the sender. For example, if the email address that will send file(s) for this data feed to the Switch Automation Platform is sender@test.com, the string that should be passed to this parameter is "test.com". queue_name : QUEUE_NAME, Optional The name of queue (Default value = 'task'). data_feed_name : str, Optional The name of the data feed (to be displayed in Task Insights UI). If not provided, the API will automatically default to using the task.name property of the `task` passed to function. Returns ------- df : pandas.DataFrame Dataframe containing the details of the deployed email data feed. """ def deploy_as_ftp_data_feed(task: Union[Task, IntegrationTask, DiscoverableIntegrationTask, QueueTask, AnalyticsTask, LogicModuleTask, EventWorkOrderTask], api_inputs: ApiInputs, data_feed_id: uuid.UUID, expected_delivery: EXPECTED_DELIVERY, ftp_user_name: str, ftp_password: str, queue_name: QUEUE_NAME = 'task', data_feed_name: str = None): """Deploy the custom driver as an FTP data feed Deploys the custom driver to receive data via an FTP data feed. Sets the `ftp_user_name` & `ftp_password` and the `expected_delivery` of the file. Parameters ---------- task : Task The custom driver class created from the Abstract Base Class 'Task' api_inputs : ApiInputs Object returned by the initialize() function. data_feed_id : uuid.UUID The DataFeedId to update if existing, else will create a new record with the given value. expected_delivery : EXPECTED_DELIVERY The expected delivery frequency of the data. 
ftp_user_name : str The user_name to be used by the ftp service to authenticate delivery of the data feed. ftp_password : str The password to be used by the ftp service for the given `ftp_user_name` to authenticate delivery of the data feed. queue_name : QUEUE_NAME, default = 'task' The queue name (Default value = 'task'). data_feed_name : str, Optional The name of the data feed (to be displayed in Task Insights UI). If not provided, will default to the task name. Returns ------- df : pandas.DataFrame Dataframe containing the details of the deployed ftp data feed. """ def deploy_as_upload_data_feed(task: Union[Task, IntegrationTask, DiscoverableIntegrationTask, QueueTask, AnalyticsTask, LogicModuleTask, EventWorkOrderTask], api_inputs: ApiInputs, data_feed_id: uuid.UUID, expected_delivery: EXPECTED_DELIVERY, queue_name: QUEUE_NAME = 'task', data_feed_name: str = None): """Deploy the custom driver as a REST API end point Datafeed. To upload a file to the deployed data feed, use the UploadUrl from the response dataframe (with request type POST) with the following two headers: - 'Ocp-Apim-Subscription-Key' - set to the value of ``api_inputs.subscription_key`` - 'Authorization' - set to the value 'Bearer ``api_inputs.bearer_token``' For example, to upload a file using the ``requests`` package: >>> import requests >>> url = df.loc[0,'UploadUrl'] >>> payload={} >>> file_path = 'C:/xxyyzz.txt' >>> files={'file': open(file_path, 'rb')} >>> headers = {'Ocp-Apim-Subscription-Key': api_inputs.subscription_key, 'Authorization': f'Bearer {api_inputs.bearer_token}'} >>> response = requests.request("POST", url, headers=headers, data=payload, files=files) >>> print(response.text) Parameters ---------- task : Task The custom driver class created from the Abstract Base Class 'Task' api_inputs : ApiInputs Object returned by the initialize() function. data_feed_id : uuid.UUID The DataFeedId to update if existing, else will create a new record with the given value. expected_delivery : EXPECTED_DELIVERY The expected delivery frequency of the data. queue_name : QUEUE_NAME, optional The queue name (Default value = 'task'). data_feed_name : str, Optional The name of the data feed (to be displayed in Task Insights UI). If not provided, will default to the task name. Returns ------- df : pandas.DataFrame Dataframe containing the details of the deployed https endpoint data feed. """ def deploy_on_timer(task: Union[Task, IntegrationTask, DiscoverableIntegrationTask, QueueTask, AnalyticsTask, LogicModuleTask, EventWorkOrderTask], api_inputs: ApiInputs, data_feed_id: uuid.UUID, expected_delivery: EXPECTED_DELIVERY, cron_schedule: str, queue_name: QUEUE_NAME = "task", settings: dict = None, schedule_timezone: SCHEDULE_TIMEZONE = 'Local', timezone_offset_minutes: int = None, data_feed_name: str = None): """Deploy driver to run on timer.

Parameters ---------- task : Task The custom driver class created from the Abstract Base Class `Task`. api_inputs : ApiInputs Object returned by initialize.initialize() function data_feed_id : uuid.UUID The DataFeedId to update if existing, else will create a new record with the given value. expected_delivery : EXPECTED_DELIVERY The expected delivery frequency. cron_schedule : str The CRONOS cron object containing the required schedule for the driver to be run. For details on the required format, see: https://crontab.cronhub.io/ queue_name : QUEUE_NAME, optional The queue name (Default value = 'task'). settings : dict, Optional List of settings used to deploy the driver. For example, may contain the user_name and password required to authenticate calls to a third-party API (Default value = None). schedule_timezone : SCHEDULE_TIMEZONE, optional Whether the ``cron_schedule`` should be applied based on Local or Utc timezone. If set to `Local`, this is taken as the timezone of the western-most site in the given portfolio (Default value = 'Local'). timezone_offset_minutes: int, Optional Timezone offset in minutes (from UTC) to be used when applying the ``cron_schedule`` (Default value = None). data_feed_name : str, Optional The name of the data feed (to be displayed in Task Insights UI). If not provided, will default to the task name. Returns ------- pandas.Dataframe A dataframe containing the details of the deployed data feed. """ def cancel_deployed_data_feed(api_inputs: ApiInputs, data_feed_id: uuid.UUID, deployment_type: List[DEPLOY_TYPE]): """Cancel deployment for a given `data_feed_id` and `deployment_type` Parameters ---------- api_inputs : ApiInputs Object returned by initialize.initialize() function data_feed_id: uuid.UUID Datafeed Id to cancel deployment deployment_type: List[DEPLOY_TYPE] Returns ------- str A string containing the response text. """ Retrieve Details on Registered and Deployed Tasks and the Results of Executed Task Instances: def list_tasks(api_inputs: ApiInputs, search_name_pattern: str = '*'): """Get a list of the registered tasks. Parameters ---------- api_inputs : ApiInputs Object returned by initialize() function. search_name_pattern : str, optional A pattern that should be used as a filter when retrieving the list of deployed drivers (Default value = '*'). Returns ------- pandas.DataFrame Dataframe containing the registered tasks. """ def list_deployments(api_inputs: ApiInputs, search_name_pattern='*'): """Retrieve list of deployed drivers. Parameters ---------- api_inputs : ApiInputs Object returned by initialize.initialize() function search_name_pattern : str A pattern that should be used as a filter when retrieving the list of deployed drivers (Default value = '*'). Returns ------- df : pandas.DataFrame Dataframe containing the drivers deployed for the given ApiProjectID that match the `search_name_pattern`. """ def list_data_feed_history(api_inputs: ApiInputs, data_feed_id: uuid.UUID, top_count: int = 10): """Retrieve data feed history Retrieves the `top_count` records for the given `data_feed_id`. Parameters ---------- api_inputs : ApiInputs Object returned by initialize.initialize() function data_feed_id : uuid.UUID The unique identifier for the data feed that history should be retrieved for. top_count : int, default = 10 The top record count to be retrieved. (Default value = 10). Returns ------- df : pandas.DataFrame Dataframe containing the `top_count` history records for the given `data_feed_id`. 
""" def data_feed_history_process_output(api_inputs: ApiInputs, data_feed_id: uuid.UUID = None, data_feed_file_status_id: uuid.UUID = None, row_number: int = None): """Retrieve data feed history process output Retrieves the `top_count` records for the given `data_feed_id`.

Deployment Environment

When deploying a Task within Task Insights, the following Python environment will be used to execute the Task script. If you need to utilize a Python Package that is not listed below, please contact Switch Automation to see if it can be installed on the containers running the Task scripts.

Python Version:

Python 3.9.5

Python Packages:

  • azure-common 1.1.28
  • azure-core 1.23.1
  • azure-servicebus 7.6.0
  • azure-storage-blob 12.11.0
  • msal 1.17.0
  • msrest 0.6.21
  • numpy 1.22.3
  • pandas 1.4.2
  • pandas-stubs 1.2.0.38
  • pandera 0.7.1
  • paramiko 2.10.3
  • pyodbc 4.0.32
  • pysftp 0.2.9
  • requests 2.27.1
  • requests-oauthlib 1.3.1
  • openpyxl 3.0.9
  • tabulate 0.8.10
  • xmltodict 0.12.0
