Workflow Execution API

Execute your workflows via the API from your own system, then download the combined results.


List

GET https://stevesie.com/cloud/api/v1/workflows

Get

GET https://stevesie.com/cloud/api/v1/workflows/WORKFLOW_ID

Destroy

DELETE https://stevesie.com/cloud/api/v1/workflows/WORKFLOW_ID
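
The walkthrough below doesn't include a destroy example, so here is a minimal sketch using the same Token header and requests library as the other examples (the delete_workflow helper name is our own):

```python
import requests

API_TOKEN = 'XXX'  # your API token
URL_BASE = 'https://stevesie.com/cloud/api/v1'

def delete_workflow(workflow_id):
    # Issue the DELETE request against the workflow endpoint,
    # authenticating with the same Token header the other
    # examples in this document use.
    url = '{}/workflows/{}'.format(URL_BASE, workflow_id)
    return requests.delete(url, headers={'Token': API_TOKEN})
```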

Execute

POST https://stevesie.com/cloud/api/v1/workflows/WORKFLOW_ID/executions
Request Body
  • proxy
    • type: Must be "shared", "dedicated", or "custom"
    • location: Proxy location, e.g. "nyc"
    • proxy_id: ID of your dedicated proxy (for dedicated proxies)
    • url: For custom proxies only
  • execution_parameters (Optional): JSON object of parameters to inject. These will override any collection mappings and also be injected into filters.
  • maximum_requests_count (Optional): Global maximum number of requests to make
  • output_aggregation_format (Optional): "csv" or "json" (default "csv")
  • do_proxy_hot_swap (Optional): Boolean (default false)

Examples

{
  "proxy": {
    "type": "shared",
    "location": "nyc"
  },
  "execution_parameters": {
    "min_timestamp": 1568505600,
    "max_timestamp": 1568592000
  },
  "do_proxy_hot_swap": false
}

Executions

GET https://stevesie.com/cloud/api/v1/workflows/WORKFLOW_ID/executions
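
As a sketch, you can list a workflow's past executions with the same pattern as the other GET examples below (the list_executions helper name is our own, and we assume the response follows the same {'objects': [...]} shape as the other list endpoints):

```python
import requests

API_TOKEN = 'XXX'  # your API token
URL_BASE = 'https://stevesie.com/cloud/api/v1'

def list_executions(workflow_id):
    # GET every execution recorded for this workflow.
    return requests.get(
        '{}/workflows/{}/executions'.format(URL_BASE, workflow_id),
        headers={'Token': API_TOKEN},
    ).json()
```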

Execution Status

GET https://stevesie.com/cloud/api/v1/workflows/executions/EXECUTION_ID

Example Code

We'll use Python to show how to run a workflow execution and manage the collection items that feed it.

Get your workflows:


import requests

API_TOKEN = 'XXX'
URL_BASE = 'https://stevesie.com/cloud/api/v1'

auth_headers = {
  'Token': API_TOKEN,
}

workflows = requests.get(
  '{}/workflows'.format(URL_BASE),
  headers=auth_headers,
).json()

print(workflows)

You'll get a response looking like this:


{'objects': [{'id': 'WORKFLOW_ID', 'name': 'My Awesome Workflow'}]}

Get workflow details:


WORKFLOW_ID = 'WORKFLOW_ID'

workflow = requests.get(
  '{}/workflows/{}'.format(URL_BASE, WORKFLOW_ID),
  headers=auth_headers,
).json()

print(workflow)

You'll get a response looking like this:


{'object': {'id': 'WORKFLOW_ID', 'name': 'My Awesome Workflow', 'mappings': [{'id': 'MAPPING_ID', 'target_variable_name': 'user_id', 'source_collection_id': 'COLLECTION_ID', 'source_collection_json_key': None}]}}

We'll see that this workflow has an input collection mapping for the user_id target variable. We can manage the source collection via API as well:

Manage input collections:

Let's see what's inside our collection first:


COLLECTION_ID = 'COLLECTION_ID'

collection = requests.get(
  '{}/collections/{}'.format(URL_BASE, COLLECTION_ID),
  headers=auth_headers,
).json()

print(collection)

You'll get a response looking like this:


{'object': {'id': 'COLLECTION_ID', 'name': 'User IDs', 'count': 4, 'items': ['43823188', '50834309', '54546072', '192946169']}}

Let's say we want to remove users 43823188 and 50834309 from our list:


r = requests.post(
  '{}/collections/{}'.format(URL_BASE, COLLECTION_ID),
  json={
    'method': 'remove',
    'items': ['43823188', '50834309']
  },
  headers=auth_headers,
).json()

print(r)

You'll see a response confirming that the number of objects has decreased by 2. But what if we change our mind and want to add these back to the collection?

Just change the method to "add" and you can add them back:


r = requests.post(
  '{}/collections/{}'.format(URL_BASE, COLLECTION_ID),
  json={
    'method': 'add',
    'items': ['43823188', '50834309']
  },
  headers=auth_headers,
).json()

print(r)

Trigger an execution:

Once we're happy with our input collections, we can trigger an execution and provide additional parameters.


from datetime import datetime

MIN_DATE = '2019-01-01'
MAX_DATE = '2019-02-01'

min_date_timestamp = int(datetime.strptime(MIN_DATE, '%Y-%m-%d').timestamp())
max_date_timestamp = int(datetime.strptime(MAX_DATE, '%Y-%m-%d').timestamp())

execution = requests.post(
  '{}/workflows/{}/executions'.format(URL_BASE, WORKFLOW_ID),
  json={
    'proxy': {
      'type': 'dedicated',
      'location': 'nyc',
    },
    'execution_parameters': {
      'min_timestamp': min_date_timestamp,
      'max_timestamp': max_date_timestamp,
    },
    'do_proxy_hot_swap': False,
  },
  headers=auth_headers,
).json()

print(execution)

You'll get a response looking like this, with the execution ID:


{'object': {'id': 'EXECUTION_ID'}}

Check execution status


EXECUTION_ID = 'EXECUTION_ID'

status = requests.get(
  '{}/workflows/executions/{}'.format(URL_BASE, EXECUTION_ID),
  headers=auth_headers,
).json()

print(status)

You'll get a response looking like this, and can see the execution is still in progress because completed_at is None:


{'object': {'id': 'EXECUTION_ID', 'workflow_id': 'WORKFLOW_ID', 'execution_parameters': '{"min_timestamp": 1546318800, "max_timestamp": 1548997200}', 'created_at': '2019-09-20T14:52:00.400Z', 'completed_at': None, 'results': []}}
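
Since completed_at stays None while the execution is running, you can poll this endpoint until it is set. A minimal sketch (the wait_for_execution helper name and the 30-second interval are our own choices):

```python
import time
import requests

API_TOKEN = 'XXX'  # your API token
URL_BASE = 'https://stevesie.com/cloud/api/v1'
auth_headers = {'Token': API_TOKEN}

def wait_for_execution(execution_id, poll_seconds=30):
    # Poll the status endpoint until completed_at is populated,
    # then return the full execution object.
    while True:
        status = requests.get(
            '{}/workflows/executions/{}'.format(URL_BASE, execution_id),
            headers=auth_headers,
        ).json()['object']
        if status['completed_at'] is not None:
            return status
        time.sleep(poll_seconds)
```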

Download results

Once your execution is complete, the response will include entries under results, each with a URL you can download from:


{'object': {'id': 'EXECUTION_ID', 'workflow_id': 'WORKFLOW_ID', 'execution_parameters': '{"min_timestamp": 1546318800, "max_timestamp": 1548997200}', 'created_at': '2019-09-20T14:52:00.400Z', 'completed_at': '2019-09-20T15:02:07.291Z', 'results': [{'id': 'RESULT_ID', 'url': 'https://DOWNLOAD_URL.csv'}]}}
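
To fetch the files themselves, you can loop over results and save each URL's contents. A sketch (the download_results helper and the per-result filenames are our own; the .csv extension assumes the default output_aggregation_format):

```python
import requests

def download_results(execution):
    # 'execution' is the 'object' dict from the status response;
    # each result carries a downloadable URL. Returns the paths written.
    paths = []
    for result in execution['results']:
        r = requests.get(result['url'])
        r.raise_for_status()
        path = '{}.csv'.format(result['id'])
        with open(path, 'wb') as f:
            f.write(r.content)
        paths.append(path)
    return paths
```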