howto

Reading or writing a dataset with custom Python code

August 20, 2016

When you use a Python recipe to transform a dataset in DSS, you generally use DSS's Python API, both to read and to write the dataset.

This DSS API provides an easy way to read or write datasets, regardless of their size or data store. This way, you don't need to install specific packages for interacting with each data store, nor to learn specific APIs.

There are some cases, however, where the DSS API does not provide enough flexibility and you want to use the specific API or package for your datastore.

Some use cases could include:

  • You want to read data which is stored in a MongoDB collection with a specific filter, which is not represented in the filter for the input dataset.
  • You want to "upsert" data in the output dataset (ie, insert, update or remove records based on a primary key)
  • ...

The usage of the DSS API is by no means mandatory: you can read data and write data however you want. If you don't call the get_dataframe or iter_tuples methods, DSS will not read any data nor load anything in memory from the datastore.

Similarly, you don't have to use the write_dataframe or get_writer API to write data in the output. Even if you use a writer that DSS does not know about (for example, the pymongo package for MongoDB), the recipe will work properly, and DSS will know that the dataset has been changed.

Accessing info about datasets

You generally want to avoid hardcoding connection information, table names, ... in your recipe code. DSS can give you some connection / location information about the datasets that you are trying to read or write.

For all datasets, you can use dataset.get_location_info() method. It returns a structure containing an info dict. The keys in the info dict depend on the specific kind of dataset. Print the dict to see more (NB: you can do that in a Jupyter notebook). Here are a few examples:

# myfs is a Filesystem dataset
dataset = dataiku.Dataset("myfs")
locinfo = dataset.get_location_info()
print locinfo["info"]

{
  "path" : "/data/input/myfs"
}

# sql is a PostgreSQL dataset
dataset = dataiku.Dataset("sql")
locinfo = dataset.get_location_info()
print locinfo["info"]

{
  "databaseType" : "PostgreSQL",
  "schema" : "public",
  "table" : "mytablename"
}

In addition, for "Filesystem-like" datasets (Filesystem, HDFS, S3, ...), you can use the get_files_info() method to get details about all files in a dataset (or partition)

dataset = dataiku.Dataset("non_partitioned_fs")
fi = dataset.get_files_info()

for filepath in fi["globalPaths"]:
  # Returns a path relative to the root path of the dataset.
  # The root path of the dataset is returned by get_location_info
  print filepath["path"]
  # Size in bytes of that file
  print filepath["size"]
dataset = dataiku.Dataset("partitioned_fs")
fi = dataset.get_files_info()

for (partition_id, partition_filepaths) in fi["pathsByPartition"].items():
  print partition_id

  for filepath in partition_filepaths:
    # Returns a path relative to the root path of the dataset.
    # The root path of the dataset is returned by get_location_info
    print filepath["path"]
    # Size in bytes of that file
    print filepath["size"]

Partitioned datasets

If your recipe deals with partitioned datasets, in input or output, you need to be careful about reading and/or writing the correct data.

Reading

When reading using a custom reader, you need to make sure that you only read data from the correct input partitions (as defined by the partition dependencies).

On an input dataset, the list of partitions to read is available using: dataset.get_read_partitions(). This list is available as a Python list of strings, representing partition identifiers. It is up to you to convert this to a proper filtering query for your input data

Writing

When writing using a custom writer, you need to make sure that you only insert data into the output partition of the current execution of the recipe.

On an output dataset, the partition to write to is available using: dataset.get_write_partition(). It is a string representing the partition identifier. It is up to you to convert this to proper commands for writing into the appropriate partition.