flowthings.io Getting Started


Core objects


Projects are made up of three powerful building blocks. These building blocks enable a variety of architectures that support processing and pushing real time data in many industry verticals.


Drops

A Drop represents a single entity of data within flowthings.io that contains a heterogeneous map of data elements. It is an atomic piece of information that can exist in one or more Flows. It could be a sensor reading, an inventory update, or a location update. Any complex assortment of related data elements should be able to map to a Drop.

Flows

If a Drop is the core data object in the service, a Flow is the core stream object. Flows are real-time streams of Drops. A Flow's capacity ranges from zero (no data is persisted; Drops are only processed and pushed) to a maximum of 100,000 Drops, and a Flow can be queried using Flow Filter Language. Flows can be interconnected via Tracks so that any Drop added to one Flow can be "tracked" to one or more other Flows in real time. While being tracked, a Drop can be processed in real time using JavaScript augmented with additional functionality provided by Flow.js.

Flows can also specify filters. These filters can be used to only allow Drops containing certain values to be written to the Flow, be they written from Tracks or via the REST API.

New Drops added to Flows can be consumed via REST queries, Webhooks, and WebSockets.

Flows are organized in a fashion similar to a filesystem, such that all the Flows created by a user named "bob" will have a path of /bob/<flow_name>. Flow paths can have an arbitrary depth, and can be permissioned based on path matches. Read more about sharing realtime data at the Share Object Overview.

Tracks

A Track defines a realtime directional connection between a source Flow and one or more destination Flows. A Track's purpose is to monitor, filter, and/or transform the Drops from its source Flow to its destination Flow(s). When a Drop enters a Flow, all Tracks that denote that Flow as a source are executed.

A Track can specify simple yet powerful filters in order to accept or reject Drops from the source Flow based on specified criteria. When a new Drop is written to the source, a filter written in Flow Filter Language will test the Drop before doing any processing. If the Drop passes the filter, then an optional JavaScript transformation function is executed with the incoming Drop as the first parameter. If the function returns a valid set of Drop elements for one or more destinations, the Drop will be written to the destination Flow(s).

A Track can have a transformation function executed in a JavaScript environment that is augmented by much of the Flow REST API, exposed in the form of the Flow.js utility library. Simple transformation functions take the input Drop as a JSON object and return the transformed Drop.
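
As a rough illustration of a simple transformation function, the sketch below takes a Drop and returns it with one extra element. It assumes the Drop arrives as a plain object with an elems map, as in the tutorial's Drop examples; the function name and the celsius and fahrenheit fields are hypothetical, and no Flow.js helpers are used.

```javascript
// Hypothetical transformation function: adds a Fahrenheit value to a Drop.
// Assumes the Drop is a plain object with an "elems" map; the field names
// "celsius" and "fahrenheit" are made up for this illustration.
function toFahrenheit(input) {
  const celsius = input.elems.celsius;
  input.elems.fahrenheit = celsius * 9 / 5 + 32;
  return input; // the returned object is the transformed Drop
}

// Local check, run outside the service.
const sample = { path: "/bob/thermostat", elems: { celsius: 20 } };
const result = toFahrenheit(sample);
console.log(result.elems.fahrenheit); // 68
```

Running a function like this locally on a sample object is a cheap way to check its logic before attaching it to a Track.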

Using Flow.js, you can also pull in Drops from other Flows to assist in the transform process and decision making.

Connecting to flowthings.io


Both of the above reference implementations illustrate flowthings.io’s ability to rapidly develop solutions that turn raw data into actionable information. Key to those solutions is the ability to gather and distribute information precisely and efficiently to the people, apps, and services that need it. flowthings.io currently provides highly efficient access to realtime information via the following methods:

REST API for low bandwidth / low power clients and rapid loading of historic data

  • Allows one request to query many Flows, while only delivering the needed data members
  • Ideal for low power and low bandwidth hardware devices as well as mobile users who need periodic and resource efficient access
  • Can also rapidly reduce initial load time for rich BI dashboards or flowthings.io-powered plugins to existing apps

Websockets API

  • Real time push to network connected devices
  • Powers real time visualizations
  • Powers real time alerting mechanisms
  • Best for constantly connected devices and applications

Webhooks API for pushing data to other REST services

  • Allows data to be pushed to any REST-accessible system
    • Public/Private cloud systems
    • Proprietary systems
    • Devices
  • Supports custom header authentication and HTTPS
  • Implemented in the visual track editor or Flow.js

MQTT Publish to Flows

  • Enables connecting flowthings.io to existing MQTT brokers
  • Current MQTT solutions will be empowered with new CEP and Intelligent Aggregation functionalities

flowthings.io API Importer

  • Allows developers to quickly and easily connect to any REST-accessible API and pull that data into the flowthings.io environment
  • API Importer tasks are written in JavaScript and can leverage full Flow.js functionality
  • Developers already familiar with processing data in flowthings.io will have almost no ramp-up time

Authentication and Security


Each account on flowthings.io already has a Master Token. This Token can be used to access any object that the account is permitted to see.

Requests to flowthings should contain either the Master Token of the account making the requests, or a regular token which has been created by an account.

A Token is a means of granting limited access to your Flows to the outside world. For example, you may wish to grant a unique Token to each of your devices.

Tokens are designed to be sandboxed for security purposes:

  • A Token can be restricted to particular Flow paths. E.g. /<account_id>/homesecurity/thermostat
  • Tokens can read any objects from the paths specified, but may only write Drops
  • Tokens may be revoked at any time
  • Tokens may have a duration, after which they expire and cannot be used

As many Tokens can be generated as needed, each with its own unique Token String.

To use a Token, supply the Token String in the same way as you would supply the Master Token: within the X-Auth-Token header. The API will automatically sandbox any requests made with this Token.

Here is an example Token that can be used to read and write data to the /alice/homesecurity/thermostat Flow, and write but not read data to the /alice/homesecurity/reports Flow.

{
    "id": "k548b20f2d4c63c0634ad3cc8",
    "tokenString": "SSOjDZ4VMHS2JcwT1sIpE8x91QfG",
    "paths": {
        "/alice/homesecurity/thermostat": {
            "dropRead": true,
            "dropWrite": true
        },
        "/alice/homesecurity/reports": {
            "dropRead": false,
            "dropWrite": true
        }
    }
}
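
To make the per-path permissions concrete, here is a minimal local model of the Token above. It is only a sketch for reasoning about which operations a Token would allow, not the service's implementation: the isAllowed helper is hypothetical, and it assumes exact path matching, which may differ from the rules the API actually applies.

```javascript
// Local model of the Token document shown above (not the service's implementation).
const token = {
  paths: {
    "/alice/homesecurity/thermostat": { dropRead: true, dropWrite: true },
    "/alice/homesecurity/reports": { dropRead: false, dropWrite: true }
  }
};

// Returns true only if the token lists the exact path and grants the permission.
// Assumption: exact path matching; the service's real matching rules may differ.
function isAllowed(token, path, permission) {
  const entry = token.paths[path];
  return Boolean(entry && entry[permission] === true);
}

console.log(isAllowed(token, "/alice/homesecurity/thermostat", "dropRead")); // true
console.log(isAllowed(token, "/alice/homesecurity/reports", "dropRead"));    // false
console.log(isAllowed(token, "/alice/other", "dropWrite"));                  // false
```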

flowthings.io “hello realtime processing” with curl


In this tutorial, we will show how easy it is to create a simple set of tracks and flows that do some realtime data filtering using the REST API. The following diagram outlines what we wish to achieve:

We will refer to the following object types in this tutorial:

  • Drop - A single piece of data
  • Flow - An endpoint for creating and receiving Drops
  • Track - A connection between two Flows, with optional filtering and processing

The important components in this example are:

  • A Flow named Messages, which will receive new messages.
  • A Flow named Important, which will receive messages that we deem of high importance
  • A Flow named Public, which will receive all messages marked for public consumption, but with sensitive data removed
  • Two Tracks, which connect these Flows

Authentication


To create our application we must send the Master Token in each request header. This is given to you when you log in to the developer site at dev.flowthings.io.


Creating Flows


Let's begin by creating our Messages Flow:

curl https://api.flowthings.io/v0.1/alice/flow \
    -H "X-Auth-Token: IlmK7uLSzQvXruJbyGMCfrcU05p52NXp" \
    -H "Content-Type: application/json" \
    -d '{
          "path":"/alice/messages"
        }'

>> {
      "head": {
        "status":201,
        "ok":true,
        "messages":["Your request has been processed successfully. A new resource has been created."],
        "errors":[]
      },
      "body": {
        "id":"f540e0f7230043db101bb8946",
        "creationDate":1410205150197,
        "path":"/alice/messages"
      }
    }

The server should respond that the request was processed successfully (see above).

Note: In the response, we are given the Flow's "ID". We will need this later when reading Drops from the Flow. In this case, we are given f540e0f7230043db101bb8946.
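
For readers scripting these calls rather than typing curl commands, the following sketch builds the same create-Flow request and pulls the ID out of a parsed response. The URL, headers, and head/body envelope are taken from the example above; the helper names createFlowRequest and flowIdFrom and the placeholder token are hypothetical, and nothing is sent over the network.

```javascript
// Base URL and header names are taken from the curl examples in this tutorial.
const API_BASE = "https://api.flowthings.io/v0.1";

// Builds the URL and fetch-style options for creating a Flow (nothing is sent here).
function createFlowRequest(account, masterToken, flowPath) {
  return {
    url: `${API_BASE}/${account}/flow`,
    options: {
      method: "POST", // curl -d sends a POST by default
      headers: {
        "X-Auth-Token": masterToken,
        "Content-Type": "application/json"
      },
      body: JSON.stringify({ path: flowPath })
    }
  };
}

// Extracts the Flow ID from a parsed response, assuming the head/body envelope shown above.
function flowIdFrom(response) {
  if (!response.head || !response.head.ok) {
    const errors = response.head ? response.head.errors : [];
    throw new Error("Request failed: " + JSON.stringify(errors));
  }
  return response.body.id;
}

const request = createFlowRequest("alice", "YOUR_MASTER_TOKEN", "/alice/messages");
console.log(request.url); // https://api.flowthings.io/v0.1/alice/flow

// To send it (Node 18+ or a browser):
//   fetch(request.url, request.options).then((r) => r.json()).then(flowIdFrom)
```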

We also need to create two Flows which will be our destinations:

curl https://api.flowthings.io/v0.1/alice/flow \
    -H "X-Auth-Token: IlmK7uLSzQvXruJbyGMCfrcU05p52NXp" \
    -H "Content-Type: application/json" \
    -d '{"path":"/alice/important"}'

curl https://api.flowthings.io/v0.1/alice/flow \
    -H "X-Auth-Token: IlmK7uLSzQvXruJbyGMCfrcU05p52NXp" \
    -H "Content-Type: application/json" \
    -d '{
          "path":"/alice/public"
        }'

Once you've taken note of the resulting Flow IDs, we can move on to creating the Tracks.

Creating Tracks


We need to link the source and destination Flows using Tracks. A simple Track may forward all Drops from one Flow to another. A more complex Track can filter and process the data along the way.

Our first Track is the simplest. We want to capture all messages from the Messages Flow that are of high importance:

curl https://api.flowthings.io/v0.1/alice/track \
    -H "X-Auth-Token: IlmK7uLSzQvXruJbyGMCfrcU05p52NXp" \
    -H "Content-Type: application/json" \
    -d '{
               "source":"/alice/messages",
               "destination":"/alice/important",
               "filter":"elems.priority == \"URGENT\""
          }'

The source and destination attributes are the source and destination Flows respectively. The filter property allows us to filter Drops arriving from the Messages Flow. There is a rich and powerful Filter Language available for us to use.

Note: We have used the prefix elems within our filter query. elems is the namespace for any user-defined data within a Drop. This will become clearer later when we define Drops.

Our second Track connects the Messages Flow to the Public Flow, and includes custom JavaScript to scrub the data before public consumption. The Flow.js documentation gives more information on the advanced capabilities of these Tracks, but for this tutorial we will keep things simple and just remove the author's information:

curl https://api.flowthings.io/v0.1/alice/track \
    -H "X-Auth-Token: IlmK7uLSzQvXruJbyGMCfrcU05p52NXp" \
    -H "Content-Type: application/json" \
    -d '{
          "source":"/alice/messages",
          "destination":"/alice/public",
          "filter":"elems.tags CONTAINS [\"PUBLIC\"]",
          "js":"function f(input) { delete input.elems[\"author\"]; return input; }"
        }'

Creating Drops


Our very basic infrastructure is complete. We now need to post some data and see how it is routed between the various Flows. A single data unit is a Drop. Let's post some Drops to our source Flow:

curl https://api.flowthings.io/v0.1/alice/drop/ \
    -H "X-Auth-Token: IlmK7uLSzQvXruJbyGMCfrcU05p52NXp"  \
    -H "Content-Type: application/json" \
    -d '{
                "path" : "/alice/messages",
                "elems" : {
                    "title" : "Important Order",
                    "author" : {
                        "name" : "John Smith",
                        "telephone" : "555-0199"
                    },
                    "priority" : "URGENT",
                    "tags" : ["ORDERS"]
                }
          }'

We expect the preceding Drop to be routed to the Important Flow.

curl https://api.flowthings.io/v0.1/alice/drop/ \
    -H "X-Auth-Token: IlmK7uLSzQvXruJbyGMCfrcU05p52NXp"  \
    -H "Content-Type: application/json" \
    -d '{
            "path" : "/alice/messages",
            "elems" : {
                "title" : "Newsletter",
                "author" : {
                    "name" : "Abigale Jones",
                    "telephone" : "555-0168"
                },
                "priority" : "NORMAL",
                "tags" : ["MARKETING", "PUBLIC"]
            }
        }'

The preceding Drop will be routed to the Public Flow.
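
The routing of these two Drops can be checked with a small local simulation. This is only a sketch: the filters are re-expressed as JavaScript predicates rather than Flow Filter Language, the route helper is hypothetical, and the telephone fields are omitted for brevity. Nothing here calls the service.

```javascript
// Local stand-ins for the two Tracks; the real filters are written in Flow Filter Language.
const tracks = [
  {
    destination: "/alice/important",
    filter: (drop) => drop.elems.priority === "URGENT"
  },
  {
    destination: "/alice/public",
    filter: (drop) => Array.isArray(drop.elems.tags) && drop.elems.tags.includes("PUBLIC"),
    transform: (drop) => {
      delete drop.elems.author; // scrub the author before public consumption
      return drop;
    }
  }
];

// Returns the list of { destination, drop } writes a Drop would produce.
function route(drop) {
  const writes = [];
  for (const track of tracks) {
    if (!track.filter(drop)) continue;
    // Copy first so one Track's transform cannot affect another Track's input.
    const copy = JSON.parse(JSON.stringify(drop));
    const output = track.transform ? track.transform(copy) : copy;
    writes.push({ destination: track.destination, drop: output });
  }
  return writes;
}

const order = {
  path: "/alice/messages",
  elems: { title: "Important Order", author: { name: "John Smith" }, priority: "URGENT", tags: ["ORDERS"] }
};
const newsletter = {
  path: "/alice/messages",
  elems: { title: "Newsletter", author: { name: "Abigale Jones" }, priority: "NORMAL", tags: ["MARKETING", "PUBLIC"] }
};

console.log(route(order).map((w) => w.destination));      // [ '/alice/important' ]
console.log(route(newsletter).map((w) => w.destination)); // [ '/alice/public' ]
console.log("author" in route(newsletter)[0].drop.elems); // false
```

A Drop that is both URGENT and tagged PUBLIC would match both predicates in this model and be written to both destinations.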

Reading Drops


A typical realtime system would retrieve Drops from each of the destination Flows using our Websockets API, allowing instant delivery of Drops as they are processed. However, for the purposes of this tutorial we will illustrate how to retrieve Drops using the REST API, via the "Drop Find" endpoint.

We can retrieve a list of Drops from the Important Flow using the following request. Replace the ID in the URL with the ID returned when you created the Important Flow:

curl https://api.flowthings.io/v0.1/alice/drop/f540e108b30043db101bb894a \
    -G -H "X-Auth-Token: IlmK7uLSzQvXruJbyGMCfrcU05p52NXp"