Tf Pipeline: Part 1: Training pipeline

05 Jan 2019

Foreword

This is going to be Part 1 of a two-part series. In this first part I show you how to convert your dataset into a tfRecord and how to read it back. In Part 2, I show you how to actually use it and set the stage for another mini-series on production TensorFlow. Links to Part 2 and the corresponding code are at the bottom.

We’ll be expanding on the TensorFlow high-level API video series because I think it’s a good starting point - the dataset is non-trivial and we can reuse some of their boilerplate. Additionally, it seems like many people were asking for code or were dissatisfied with the videos.

You can find the notebook here.

Motivation:

1) Get used to tf datasets

2) Sketch out a production pipeline (that we’ll expand upon in the next mini-series)

Tf datasets tutorial

As mentioned in my first blog post, I’m a fan of work that feels “motivated”, so to that end let’s assume we’ve been given the following problem:

You’re an ML engineer at a startup that uses Framework X. Framework X was great for getting the company to where it is, but you’re having trouble scaling up and trouble customizing the pipeline further. Furthermore, it seems that customers want to host some of the code on their end, something your pipeline was never designed for.

You, being the go-getter you are, tell them that you’ll build the pipeline. The only issue is that you’ve never used (or even heard of) tf.data.Dataset, which is supposed to be amazingly useful. Hopefully this post serves as a starting point for your endeavor.

The Plan

1) Overview: a rough outline of how I’ve sketched this tutorial

2) Notes: high-level discussion

3) Code discussion: link

Brief Aside: threading vs multiprocessing vs separate service

You want an evolving model, not something “static” in time. Below I discuss three different approaches I’ve seen and have implemented.

threading: TensorFlow works well with threading by default. By using the coordinator (tf.train.Coordinator()), you can halt training/inference easily. This method gives you the freshest model, but you run the risk of slowing down your model training by quite a bit if you have streaming data.
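
A minimal sketch of that threading pattern (the assign_add counter below is just a stand-in for a real training step; none of this is taken verbatim from the notebook):

    import threading
    import tensorflow as tf

    # Toy "training" op: incrementing a counter stands in for a real train step
    step = tf.Variable(0, name='global_step')
    train_op = tf.assign_add(step, 1)

    coord = tf.train.Coordinator()

    def train_loop(sess):
        while not coord.should_stop():
            sess.run(train_op)  # keep training until someone asks us to stop

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        t = threading.Thread(target=train_loop, args=(sess,))
        t.start()
        # ... the main thread could serve predictions from the same graph here ...
        coord.request_stop()    # ask the training thread to halt
        coord.join([t])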

separate service: you can have a separate script that checks a folder (the one your training job saves to) for updates, and every time there is an update you load the updated model. This gives you a relatively fresh model while avoiding the MANY headaches of the multiprocessing method. Having said that, this way you’ll always be polling for updates - it would be better if you could “send” a signal to the inference model to let it know that a “fresher” set of weights is available.
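
A rough sketch of such a polling service, assuming the inference graph and its variables have already been built and that the trainer saves checkpoints into checkpoint_dir (both names are hypothetical):

    import time
    import tensorflow as tf

    checkpoint_dir = 'saved_models/'   # hypothetical folder the training job writes to
    # ... build the inference graph (variables, placeholders, predictions) here ...

    saver = tf.train.Saver()
    last_seen = None
    with tf.Session() as sess:
        while True:
            ckpt = tf.train.latest_checkpoint(checkpoint_dir)
            if ckpt is not None and ckpt != last_seen:
                saver.restore(sess, ckpt)   # pick up the fresher weights
                last_seen = ckpt
            # ... answer inference requests with the current weights ...
            time.sleep(30)                  # the "always checking for updates" cost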

multiprocessing: TensorFlow plus multiprocessing is a complex thing - since multiprocessing uses fork() by default, trying to spawn a new process within a session will cause your code to hang. The workaround is to force multiprocessing to use spawn(). There are also other issues, but PM me if you’d like to talk more about it ;) Note that this method lets you train in one process and infer in another, so you never “clash”. You might even be able to use a multiprocessing Event to avoid the constant checking for updates I mentioned above.

At the end of the day, the choice is yours. If you’re looking for a high-level interface, I highly recommend tf.estimator because it handles a lot of the “cruft” while letting you focus on your model.

Overview:

There will be two separate service pipelines:

1) inference pipeline

2) training pipeline (itself consisting of a producer and a provider model)

Notes: Training Pipeline

Producer:

You have 2 options:

1) Generate the tfRecord

2) Skip the tfRecord

Provider:

1) loads all data from storage and provides it to the model being trained

2) all preprocessing happens here and is stored as part of the computation graph. Thus, when we get new training or inference data it will be preprocessed in the same way (you’ll see what I mean in Part 2, where we visualize the graph)

Inference Pipeline

When new data arrives, we do 2 things in separate threads:

1) prediction on the new bit of data

2) storage of the data

Tf Dataset API Introduction

1) tf.Example

2) tf.data.Dataset

3) tf.data.TFRecordDataset

    # Experimental writer - from a tf.Dataset
    writer = tf.data.experimental.TFRecordWriter(filename)
    writer.write(serialized_features_dataset)

-or-

    # Raw data
    with tf.python_io.TFRecordWriter(filename) as writer:
        for i in range(n_observations):
            example = serialize_example(feature0[i], feature1[i], feature2[i], feature3[i])
            writer.write(example)
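
For context, serialize_example above is assumed to be a small helper that wraps raw values into tf.train.Feature protos and returns a serialized tf.train.Example (and serialized_features_dataset in the first snippet would be a tf.data.Dataset of such serialized strings). A sketch, with the feature names and types purely illustrative:

    import tensorflow as tf

    def _int64_feature(value):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

    def _float_feature(value):
        return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

    def serialize_example(feature0, feature1, feature2, feature3):
        # Map each raw value to a named tf.train.Feature, then serialize the Example
        feature = {
            'feature0': _int64_feature(feature0),
            'feature1': _int64_feature(feature1),
            'feature2': _float_feature(feature2),
            'feature3': _float_feature(feature3),
        }
        example = tf.train.Example(features=tf.train.Features(feature=feature))
        return example.SerializeToString()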

Code Discussion

Link to code

FeatureProto Class

Designing our own feature constructor gives us flexibility later on if we want to add more features to the model, or if we want to modify existing ones. Modularizing it as an abstract container allows for:

i) better abstraction

ii) less code repetition

iii) easier testing

Almost all operations on the features use code tied to this prototype:

1) generating the features

2) generating the feature-columns (more on this later)

3) storing the named-features

If you’re familiar with OOP, I’d say the biggest takeaway is

    proto = namedtuple('prototype', ['name', 'dtype', 'shape'])

which lets you describe features. This, combined with an ordered container, lets you split a single example in a very abstract fashion.
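
To make that concrete, here is a toy illustration (the prototypes and shapes below are made up, loosely modelled on the covtype columns, and split_row is not a method from the notebook):

    import numpy as np
    import tensorflow as tf
    from collections import namedtuple

    proto = namedtuple('prototype', ['name', 'dtype', 'shape'])

    # Toy prototypes: one scalar feature and two one-hot blocks
    features = [
        proto('Elevation', tf.float32, 1),
        proto('Wilderness_Area', tf.int32, 4),
        proto('Soil_Type', tf.int32, 40),
    ]

    def split_row(row, features):
        named, idx = {}, 0
        for p in features:
            named[p.name] = row[idx] if p.shape == 1 else row[idx:idx + p.shape]
            idx += p.shape
        return named

    named = split_row(np.arange(45), features)
    # named['Elevation'] -> 0, named['Wilderness_Area'] -> [1 2 3 4], named['Soil_Type'] -> [5 ... 44]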

Producer Discussion

Looking at the code, one may wonder why we’re even bothering with these steps.

    # Round to the previous hour
    time = str(datetime.datetime.now().replace(microsecond=0, second=0, minute=0)).replace(' ', '_')
    for record_type in [('train', train_ind), ('test', test_ind)]:
        filename = 'processed_data/tf_record_covtype_{}_{}'.format(
            record_type[0],
            time
        )
        with tf.python_io.TFRecordWriter(filename) as writer:
            for i in tqdm.tqdm_notebook(record_type[1]):
                datum = loaded[i, :]
                feature = feature_proto.dataset_creation(datum)
                example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
                writer.write(example_proto.SerializeToString())

This looks like a lot of unnecessary work; we haven’t really done anything to the data apart from constructing some mappings. If pressed, I would say that the biggest advantages are the following:

1) as the TensorFlow ecosystem grows, you will gain any speed benefits that they implement. By baking tfRecords into your pipeline, if you were to scale up to GCP with TPUs you won’t be bottlenecked by I/O

2) By bundling our tensors into a dictionary, we have gained better introspection. For example, we now know that indices 14:54 hold the soil_type feature tensor; we don’t need to open up the documentation again

Provider Discussion

TfRecordDataset

This is by far the more interesting part of the tutorial. What we need to do here is ingest the data, parse it into a form that TensorFlow plays nicely with, and then (in the follow-up) show how to use it with tf.feature_column.input_layer(), which becomes the input layer of your model.

Notice that we use the dataset_parsing method from our FeatureProto class, which defines how to parse our tfRecord data. It serves as a blueprint for how to break the data down into FixedLenFeatures. The parsed result is then passed through our nested unpack function, which does any preprocessing we want, e.g. one-hot encoding, type casting, and so forth.

Calling dataset_config then gives you an iterator that you can use to iterate over the entire dataset.
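
As a rough sketch of what that looks like end to end (the names feature_proto, dataset_parsing and tfrecord_filenames mirror the notebook, but the exact bodies there may differ):

    import tensorflow as tf

    def dataset_config(filenames, batch_size=32):
        # Read serialized Examples, parse/preprocess them with the FeatureProto
        # blueprint, then shuffle, batch and hand back a one-shot iterator
        dataset = tf.data.TFRecordDataset(filenames)
        dataset = dataset.map(feature_proto.dataset_parsing)
        dataset = dataset.shuffle(10000).batch(batch_size).repeat()
        return dataset.make_one_shot_iterator()

    iterator = dataset_config(tfrecord_filenames)
    features, labels = iterator.get_next()  # dict of named tensors, label tensor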

I mentioned earlier that your dataset will probably increase in size - this is mostly because tfRecord only works with a few formats: bytes, int64, and float. So, if your dataset is originally stored as uint8 it will grow quite a bit once those values are widened to int64.

CSV

We use tf.data.TextLineDataset instead of tf.contrib.data.CsvDataset because contrib will be deprecated. It seems like there is interest in reading the underlying data without the contrib module, so that’s what I’ll work with.

In the tutorial we use

    dataset = tf.data.TextLineDataset(csv_filenames)

but given TensorFlow’s long history, depending on what you google (“tensorflow data csv”) you might find old documentation or tutorials. The approach is surprisingly similar to the TfRecordDataset method, whereby we first create a dataset and then map a parser onto it.

Because we did not create a named interface (as we did when we created the tfRecord), the parser is longer but not much more complicated (once you know what you’re doing). All of this can be found in the FeatureProto class, specifically in unpack_csv:

    vals = tf.string_split([example_proto], delimiter=',').values
    vals_cvt = tf.string_to_number(vals, out_type=tf.int32)
    parsed_features = {}
    idx = 0
    for prototype in self.features:
        datum = vals_cvt[idx] if prototype.shape == 1 else vals_cvt[idx:idx + prototype.shape]
        parsed_features[prototype.name] = datum
        idx += prototype.shape

    # Separate the label, then convert the parsed values into the tensor types expected downstream
    labels = parsed_features.pop('Cover_Type')
    parsed_features['Soil_Type'] = tf.convert_to_tensor(parsed_features['Soil_Type'])
    parsed_features['Wilderness_Area'] = tf.cast(tf.argmax(parsed_features['Wilderness_Area'], axis=0), dtype=tf.float32)
    labels = tf.cast(labels, dtype=tf.int32)

    return parsed_features, labels

First, we need to split the string and obtain the underlying values (when parsed they come back as strings, so we convert them to tf.int32 values). Then, as in the tutorial, we “zip” the values up to create a named interface, and the remaining code is similar to unpack.
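
Wiring that parser into the pipeline then looks roughly like this (skipping a header row is an assumption about the CSV layout; unpack_csv is the FeatureProto method shown above):

    import tensorflow as tf

    dataset = tf.data.TextLineDataset(csv_filenames)
    dataset = dataset.skip(1)                         # drop the header row, if there is one
    dataset = dataset.map(feature_proto.unpack_csv)   # one text line -> (features dict, label)
    dataset = dataset.shuffle(10000).batch(32)
    features, labels = dataset.make_one_shot_iterator().get_next()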

Numpy

Note that although this is easier initially (we don’t construct the tfRecord), it later becomes a hassle to load large datasets. Given that you’re working at startup X and expect to scale (wink wink), you’ll likely want to avoid this.

Also, although the FeatureProto should support numpy arrays with some minor tweaking, I haven’t actually made any attempt to convert the .data format.
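
For completeness, the numpy route would look something like the sketch below (the file name and column layout are assumptions; from_tensor_slices keeps everything in memory, which is exactly why it stops scaling):

    import numpy as np
    import tensorflow as tf

    data = np.load('covtype.npy')                    # hypothetical pre-converted array
    features_np = data[:, :-1].astype(np.float32)    # assumed layout: label in the last column
    labels_np = data[:, -1].astype(np.int32)

    dataset = tf.data.Dataset.from_tensor_slices((features_np, labels_np))
    dataset = dataset.shuffle(10000).batch(32)
    features, labels = dataset.make_one_shot_iterator().get_next()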

Done!

Your boss seems happy with the progress so far! Now, how do you actually go about using these models? tf.layers.Dense doesn’t seem to accept dictionaries… uh oh?

With that, we’re done with saving and loading tfRecords! The next step is figuring out how to use them! The aforementioned TensorFlow videos leave a lot to be desired - they use things that are slated for deprecation (tf.contrib) and features not present in the stable codebase yet (tf.keras.layers.DenseFeatures). Womp womp. More to come in the next part!

Next Steps:

1) Try this with your own dataset! Converting the TensorFlow code from the videos (and even the tutorials) to work in this case was a very interesting challenge that I recommend everyone try

2) Look at the follow-up tutorial and corresponding code, where I show you how to USE the loaded dataset with a tf.estimator and with a low-level implementation
