
Writing partitioned custom datasets

November 18, 2015

Partitioning is one of the most powerful features of DSS.

Partitioning refers to the splitting of a dataset along meaningful dimensions. Each partition contains a subset of the dataset. With partitioning, you can process different portions of a dataset independently and have some incrementality for operations based on a dataset.

Before reading this article, we recommend that:

  • You have a good understanding of the two partitioning models of DSS
  • You have a good understanding of how to write custom datasets for DSS. Reading our Howto is recommended.

The custom partitioning models

In a regular DSS dataset, partitioning is decided by the user: the user selects which columns, or what hierarchy of files, defines the partitioning, and the associated partitioning dimensions.

In a custom dataset, it sometimes makes sense to have the code of the dataset itself define the partitioning.

  • In a custom dataset that reads from a "generic" data source, like a NoSQL database which DSS does not natively handle, the user would define the partitioning.

  • In a custom dataset that reads from a specific source, like an API serving stock quotes, it is more natural that the code defines a "static" partitioning, which the user cannot edit. For example, in our case, the connector could define a single-dimension hourly partitioning, and retrieve one record per partition and per quote.

In both cases, the code of the dataset is aware of which partition it must return, and can act accordingly:

  • In the case of the NoSQL database, by adding filters to the query
  • In the case of the API, by sending the proper request for the given hour.

Listable versus unlistable

For code-defined partitioned custom datasets, there is an additional split: sometimes the dataset is able to list its own partitions, and sometimes it is not. For example, in our API case, it might not make sense to "list" partitions, since there is no real notion of "when did the API start". Instead, we'll want to let the user request partitions as needed.

As the dataset creator, you must indicate whether you can list partitions or not. If you can list them, you must provide a method on your dataset to actually perform the listing.

If you can't list partitions:

  • Either the user can provide a static list of all the partitions that they know to be valid.
  • Or any API call that would require listing partitions will fail, and the user will have to provide explicit partitions.

An "unlistable code-defined" partitioned dataset

We are going to take the example of the Hipchat connector that is available in the dataiku-contrib repository.

Hipchat is a chat-like communication tool for companies. People discuss in rooms. Hipchat makes the whole history of conversations in a room available through its API. Let's see how we build a connector that will retrieve this history in a dataset.

We obviously want some incrementality on this connector, so we're going to partition it by day. The Hipchat API allows us to specify a starting and ending date on each request, so we'll send the dates of the partition we're working on each time.

As in the stock quote example, it does not make sense to list partitions on this connector: there is no API for that in Hipchat, and we would rather have the user ask for partitions explicitly. Our connector will therefore be a CODE_DEFINED_UNLISTABLE one.

To start, create a new custom dataset, as seen in the basic Howto.

Defining the connector and partitioning

Then edit the connector.json file.

First of all, we need a few parameters: the API endpoint, the name of the room, and of course the API key.

    "params": [
        {
            "name": "api_endpoint",
            "label" : "Hipchat API endpoint",
            "type": "STRING",
            "description":"Usually https://COMPANY.hipchat.com"
        },
        {
            "name": "room_name",
            "label" :"Room name or id",
            "type": "STRING"
        },
        {
            "name": "auth_token",
            "label" :"Auth. tokens",
            "type": "STRING",
            "description": "See Hipchat documentation"
        }
    ]

Then, since we are a CODE_DEFINED dataset, the code needs to define the partitioning: a single time-based dimension, with a day-level granularity.

In the JSON:

    "partitioningMode" : "CODE_DEFINED_UNLISTABLE"

In the connector class:

    def get_partitioning(self):
        return {
            "dimensions": [
                {
                    "name" : "day",
                    "type" : "time",
                    "params" : {
                        "period" : "DAY"
                    }

                }
            ]
        }

Note that the partitioning itself is defined in the Python code, not in the JSON. Even though the partitioning scheme can't be edited by the user, it's still the code that decides it, after receiving the parameters. So for example, we could have a parameter that instructs the code to switch to hourly partitioning, in case we have an extremely busy chat room.
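
As a minimal sketch of that idea, the class below picks the period from a boolean parameter. The parameter name hourly_partitioning and the stand-in class are assumptions made for illustration, and the "HOUR" period value is assumed to mirror "DAY"; a real connector would extend the DSS connector base class and read self.config in the same way.

```python
class HipchatConnectorSketch(object):
    """Stand-in for the real connector class (hypothetical, for illustration)."""

    def __init__(self, config):
        # config holds the values of the params declared in connector.json
        self.config = config

    def get_partitioning(self):
        # "hourly_partitioning" is a hypothetical parameter, not one
        # declared by the real Hipchat connector
        if self.config.get("hourly_partitioning", False):
            name, period = "hour", "HOUR"
        else:
            name, period = "day", "DAY"
        return {
            "dimensions": [
                {"name": name, "type": "time", "params": {"period": period}}
            ]
        }
```

With such a switch, the format of the partition_id received by generate_rows changes with the period, so the code that parses it would have to handle both cases.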

Generating the content

Now, let's move on to the "real meat", the generate_rows method. If you look at its signature, it receives a partition_id parameter. Each time the generate_rows method is invoked, this partition_id tells our code which partition it should generate the data for.

Here, for a day-level partitioning, the partition_id will be in the form yyyy-MM-dd. Let's use regular Python APIs to transform that to a date, and generate the begin and end timestamps to pass to the Hipchat API:

import dateutil.parser
import datetime

...

    def generate_rows(self, dataset_schema=None, dataset_partitioning=None,
                            partition_id=None, records_limit = -1):

        beg_date_str = "%sT00:00:00Z" % partition_id
        beg_date = dateutil.parser.parse(beg_date_str)
        end_date = beg_date + datetime.timedelta(days=1)
        end_date_str = end_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        # Both beg_date_str and end_date_str are ISO8601-formatted timestamps
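
To try this computation outside of DSS, here is a standalone variant. It is only a sketch: the helper name day_bounds is ours, and it uses datetime.strptime from the standard library instead of dateutil so that it runs without extra dependencies.

```python
import datetime


def day_bounds(partition_id):
    """Return (begin, end) ISO 8601 timestamps for a yyyy-MM-dd partition id."""
    beg_date = datetime.datetime.strptime(partition_id, "%Y-%m-%d")
    # The end bound is the start of the following day
    end_date = beg_date + datetime.timedelta(days=1)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return beg_date.strftime(fmt), end_date.strftime(fmt)


print(day_bounds("2015-11-18"))
# → ('2015-11-18T00:00:00Z', '2015-11-19T00:00:00Z')
```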

Basically, we're done for the partitioning part! The rest of the code handles the two remaining complexities:

  • Handling the pagination of the API (we have to make multiple calls to the Hipchat API and to follow the "next" links given in the answers)
  • Transforming the Hipchat API response into a proper row, which involves properly naming the columns and transforming the Python dicts to JSON strings
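
These two steps could look roughly like the sketch below. It is not the code of the real connector: the function names are ours, the HTTP call is abstracted behind a fetch_page callable, and the response layout (an "items" list plus a "links" dict with an optional "next" URL) is an assumption to check against the Hipchat documentation.

```python
import json


def iter_history(fetch_page, first_url):
    """Yield every message, following the "next" link of each response.

    fetch_page is any callable that takes a URL and returns the decoded
    JSON response as a dict, for example a thin wrapper around an HTTP client.
    """
    url = first_url
    while url is not None:
        page = fetch_page(url)
        for item in page.get("items", []):
            yield item
        # Assumed layout: {"items": [...], "links": {"next": "..."}}
        url = page.get("links", {}).get("next")


def to_row(message):
    """Flatten one message: nested dicts and lists become JSON strings."""
    row = {}
    for key, value in message.items():
        if isinstance(value, (dict, list)):
            row[key] = json.dumps(value, sort_keys=True)
        else:
            row[key] = value
    return row


# Two fake pages stand in for the API, keyed by their "URL"
pages = {
    "page1": {"items": [{"id": "1", "from": {"name": "alice"}}],
              "links": {"next": "page2"}},
    "page2": {"items": [{"id": "2", "from": {"name": "bob"}}],
              "links": {}},
}
rows = [to_row(m) for m in iter_history(pages.get, "page1")]
```

Passing the fetch function as an argument keeps the pagination logic testable without network access; in generate_rows, each row would simply be yielded instead of being collected in a list.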

A "code-defined" dataset with partitions listing

For an example of a dataset where the partitioning is defined by the code but which is able to list its partitions, see the USPTO Patents connector.

A "user-defined" partitioned dataset with partitions listing

We are now going to create a connector where the user defines the partitioning scheme and the connector obeys it. This is fairly close to how regular datasets like the SQL or File datasets work in DSS.

We are going to take the example of the Algolia Search connector that is available in the dataiku-contrib repository.

Algolia is an extremely fast SaaS search engine. Through its API, you can define your documents to be indexed and search them. It's designed mainly to be used for end-user search like "Instant-Search" on your website.

By default, the Algolia connector just maps the whole content of an index (or a query) to a dataset, both for reading and for writing. We are going to add column-based partitioning to this: the user can define dimensions, which are mapped to the values of columns (also known as fields in the index).

We want the connector to be able to list its own partitions. Therefore, our connector will be a USER_DEFINED_LISTABLE one.

Defining the connector

To start, create a new custom dataset, as seen in the basic Howto.

Then edit the connector.json file.

First of all, we need a few parameters: the API keys and the index.

    "partitioningMode" : "USER_DEFINED_LISTABLE",
    "params": [
        {
            "name" : "applicationId",
            "label" : "Application ID",
            "type" : "STRING",
            "mandatory" : true
        },
        {
            "name" : "apiKey",
            "label" : "API Key",
            "type" : "STRING",
            "mandatory" : true
        },

        {
            "name" : "index",
            "label" : "Index name",
            "type" : "STRING",
            "mandatory" : true
        }
    ]

In the connector class, this time, we don't define the partitioning. We only need to obey it in the generate_rows and get_writer methods.

Generating the content

For partitioned reads, we are going to use a facetFilter in Algolia parlance, i.e. we are going to refine our search to only take into account the partition specified by the user.

We receive two things in the generate_rows method: the partitioning scheme and the partition id. For more information about partition ids, see the reference guide.

Here is the important part, for read (simplified compared to the real connector to focus on the important parts):

        # Get a handle on the Algolia index
        index = self._get_index()

        search_settings = {}

        if dataset_partitioning is not None:
            facetFilters = []
            idx = 0

            # Split the partition identifiers by dimension
            id_chunks = partition_id.split("|")

            for dim in dataset_partitioning["dimensions"]:

                # For each dimension, define a facet filter in the form
                # dimension_name:value
                facetFilters.append(dim["name"] + ":" + id_chunks[idx])
                idx += 1

            search_settings["facetFilters"] = ",".join(facetFilters)

        print("Final settings : %s" % search_settings)

        res = index.search("*", search_settings)

        for hit in res["hits"]:
            yield hit

We have defined a combination of facet filters (separated by ',' as indicated by Algolia documentation) that exactly refines on the specified partitions.
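
The filter-building step can be isolated in a small function, which makes it easy to check on its own. This is a sketch: the name build_facet_filters is ours, and the check on the number of chunks is an addition that the simplified connector code above does not perform.

```python
def build_facet_filters(dataset_partitioning, partition_id):
    """Build the comma-separated facet filters for one partition."""
    dim_names = [dim["name"] for dim in dataset_partitioning["dimensions"]]
    # The partition id holds one value per dimension, separated by "|"
    id_chunks = partition_id.split("|")
    if len(id_chunks) != len(dim_names):
        raise ValueError("Partition id %r does not match dimensions %r"
                         % (partition_id, dim_names))
    return ",".join("%s:%s" % (name, value)
                    for name, value in zip(dim_names, id_chunks))


partitioning = {"dimensions": [{"name": "category"}, {"name": "author"}]}
print(build_facet_filters(partitioning, "books|alice"))
# → category:books,author:alice
```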

Two notes:

  • Implicitly, in that connector, we have used the dimension names as columns that must exist in the index. You could have a partitioning strategy that does not rely on this. For example, you could have a time-based strategy.

  • Also implicitly, we have taken the partition dimension values as exact values in the index. We could, for example, have applied some normalization first.

Listing partitions

Since we are a USER_DEFINED_LISTABLE connector, we must provide a way to list the partitions. Let's do it. Once more, we'll use the ability of the Algolia index to provide facets (i.e. value counts for each value of a field).

We are going to ask for a facet on each dimension of the partitioning, and that will give us the existing partitions:

    def list_partitions(self, dataset_partitioning):
        assert dataset_partitioning is not None

        # Ask Algolia to facet on the name of each dimension
        facets = [dim["name"] for dim in dataset_partitioning["dimensions"]]
        search_settings = {}
        search_settings["facets"] = facets

        # Perform the faceted search
        index = self._get_index()
        res = index.search("*", search_settings)

        # Gather the various values of each dimension in a list of list
        vals =[]
        for dim in dataset_partitioning["dimensions"]:
            facet = res["facets"][dim["name"]]
            vals.append(facet.keys())

        # Make the cartesian products of the lists of lists.
        # For example, if we had [ [cat1, cat2], [author1, author2] ]
        # in vals, we'll end up in ret with:
        # ["cat1|author1", "cat1|author2", "cat2|author1", "cat2|author2"]

        ret = []
        import itertools
        for element in itertools.product(*vals):
            ret.append("|".join(element))
        print(ret)
        return ret

Note: we cheat a bit in the case of multiple dimensions because we make the full cartesian products, whereas some values might not exist. To have the exact thing, we would need to ask the index whether each combination of each value of each dimension actually exists, but that would be very expensive.
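
The difference between the two listings can be seen on a toy example. In the sketch below, a plain list of dicts stands in for the index and both function names are ours; against a real index, the exact variant would require the additional queries mentioned above.

```python
import itertools


def listed_partitions(records, dim_names):
    """Mimic the faceted listing: product of the distinct values per dimension."""
    vals = [sorted(set(rec[name] for rec in records)) for name in dim_names]
    return ["|".join(combo) for combo in itertools.product(*vals)]


def existing_partitions(records, dim_names):
    """Exact listing: only the combinations found in at least one record."""
    return sorted(set("|".join(rec[name] for name in dim_names)
                      for rec in records))


records = [
    {"category": "cat1", "author": "author1"},
    {"category": "cat2", "author": "author2"},
]
dims = ["category", "author"]

print(listed_partitions(records, dims))
# → ['cat1|author1', 'cat1|author2', 'cat2|author1', 'cat2|author2']
print(existing_partitions(records, dims))
# → ['cat1|author1', 'cat2|author2']
```

Here the cartesian product reports four partitions while only two actually contain data; reading one of the two extra partitions would simply return no rows.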

Writing data

Finally, we decided to make our connector writable. Writing in a partitioned dataset implies the following:

  • Clearing the existing data for the partition
  • Inserting the new data

    # As for generate_rows, we receive here the partitioning and the partition
    # id to write
    def get_writer(self, dataset_schema=None, dataset_partitioning=None,
                         partition_id=None):
        return AlgoliaSearchConnectorWriter(self.config, self._get_index(),
                    dataset_schema, dataset_partitioning, partition_id)

class AlgoliaSearchConnectorWriter(CustomDatasetWriter):
    def __init__(self, config, index, dataset_schema, dataset_partitioning, partition_id):
        CustomDatasetWriter.__init__(self)
        self.config = config
        self.index = index
        self.dataset_schema = dataset_schema
        self.dataset_partitioning = dataset_partitioning
        self.partition_id = partition_id

        if self.dataset_partitioning is not None:
            # If we are partitioned, clear only the objects of this partition
            # with a query (omitted in this simplified version)
            pass
        else:
            # Clear all
            self.index.clear_index()

    def write_row(self, row):
        obj = {}

        # We generate the "regular" row with the content of the row
        for (col, val) in zip(self.dataset_schema["columns"], row):
            obj[col["name"]] = val

        # We add or overwrite partitioning columns with the actual
        # values of the dimensions
        if self.dataset_partitioning is not None:
            id_chunks = self.partition_id.split("|")
            idx = 0

            for dim in self.dataset_partitioning["dimensions"]:
                obj[dim["name"]] = id_chunks[idx]
                idx += 1

        logging.info("Final obj: %s" % obj)

        # Send to Algolia
        self.index.save_object(obj)
        self.buffer = []
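
The "clear, then insert" semantics of a partitioned write can be illustrated without Algolia. The sketch below is an in-memory stand-in, not the connector's code: the store is a plain list of dicts and both function names are hypothetical.

```python
def partition_values(dataset_partitioning, partition_id):
    """Map each dimension name to its value in the partition id."""
    names = [dim["name"] for dim in dataset_partitioning["dimensions"]]
    return dict(zip(names, partition_id.split("|")))


def overwrite_partition(store, dataset_partitioning, partition_id, rows):
    """Return a new store where the given partition holds exactly rows."""
    values = partition_values(dataset_partitioning, partition_id)
    # Step 1: clear the existing data of this partition only, i.e. keep
    # the objects that differ on at least one dimension
    kept = [obj for obj in store
            if any(obj.get(name) != value for name, value in values.items())]
    # Step 2: insert the new rows, forcing the partitioning columns
    for row in rows:
        obj = dict(row)
        obj.update(values)
        kept.append(obj)
    return kept


store = [{"category": "a", "title": "old"}, {"category": "b", "title": "keep"}]
partitioning = {"dimensions": [{"name": "category"}]}
store = overwrite_partition(store, partitioning, "a", [{"title": "new"}])
```

After the call, the object of partition "b" is untouched, while partition "a" contains only the new row.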