Custom Workflow Orchestration at VISO Trust

The VISO Trust next generation third party risk platform lets your security team effortlessly access risk intelligence for any number of third parties. A key feature of the platform is Document Intelligence: a collection of expert-auditor-curated AI and automated data processing services that remove the burden of analyzing questionnaires and reading documents from our customers.

Here on the Machine Learning and Data team, our charter is to further expand the capabilities of the Document Intelligence feature set, increase its accuracy, and deliver its results faster.

Why we needed workflow orchestration

Many individual tasks are executed to produce what Document Intelligence provides, including but not limited to:

  • Security Control Language Detections
  • Audit Framework Control ID Detections
  • Named Entity Extraction (organizations, dates and more)
  • Decryption of encrypted PDFs
  • Translation of foreign language PDFs
  • Document Classification
  • Document Section Detection

Until our workflow orchestration implementation, the features listed above (and more) were all represented in code inside a single function. Over time, this function became unwieldy and difficult to read, with snippets of ceremony, control flow, logging, function calls and more sprinkled throughout. Moreover, this is one of the most important areas of our app, one where new features will be implemented regularly, so the need to clean this code up and make it easier to reason about became clear. Furthermore, execution inside this function occurred sequentially even though some of its function calls could occur in parallel. While parallel execution isn't required in the app's current state, we knew that features on the near-term roadmap would necessitate it. That left us with two requirements:

  • task execution that is easier to reason about and
  • the ability to execute in parallel

we knew we needed to either adopt an existing workflow orchestration tool or build our own. We began with some rough analysis of what was going on in our main automation function: we formalized each 'step' into a concept called a Task and theorized about which Tasks could execute in parallel. At the time of the analysis, we had 11 Tasks, each of which required certain inputs and produced certain outputs; based on these inputs and outputs, we determined that a number could run in parallel. With this context, we reviewed two of the major open source Python toolkits for workflow orchestration:

  • Luigi
  • Apache Airflow

Both of these toolkits are designed for managing workflows that have tens, hundreds or even thousands of tasks and can take days or weeks to finish. They have complex schedulers, user interfaces, failure modes, options for a variety of input and output modes and more. Our pipeline will reach this level of complexity someday, but with an 11-Task pipeline, we decided that these toolkits added too much complexity for our use case. We resolved to build a custom workflow orchestration toolkit guided by the deep knowledge embedded in these more advanced tools.

Our custom workflow orchestration

The first goal was to generalize all of the steps in our automation service into the concept of a Task. A few examples of a Task would be:

  • detecting a document’s language,
  • translating a foreign language document,
  • processing OCR results into raw text,
  • detecting keywords inside text,
  • running machine learning inference on text.

Just reading this list gives one a feel for how each Task depends on a previous Task's output to run. Being explicit about dependencies is core to workflow orchestration, so the first step in our Task concept was defining what inputs a given Task requires and what outputs it will produce. To demonstrate Tasks, we will develop a fake example Task called DocClassifyInference, the goal of which is to run ML inference to classify a given document. Imagine that our model uses both images of the raw PDF file and the text inside it to make predictions. Our Task, then, will require the decrypted PDF and the paginated text of the PDF in order to execute. Further, when it's complete, it will write a file to S3 containing its results. Thus, the start of our example Task might look like:

class DocClassifyInference(S3Task):

    input_keys = [
        pipeline.Pipeline.DECRYPTED_PDF_KEY,
        pipeline.Pipeline.PAGINATED_TEXT_KEY,
    ]
    output_keys = [pipeline.Pipeline.DOC_CLASSIFY_INFERENCE_KEY]
...

DocClassifyInference subclasses S3Task, an abstract class that enforces defining a method for writing to S3. S3Task is itself a subclass of the Task class, which enforces that subclasses define input keys, output keys and an execute method.
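Neither abstract class is shown here in full; a minimal sketch of their shape using Python's abc module (the enforcement details are assumptions):

import abc


class Task(abc.ABC):

    # concrete subclasses are expected to declare these
    input_keys: list
    output_keys: list

    @abc.abstractmethod
    def execute(self, pipeline):
        raise NotImplementedError


class S3Task(Task):

    @abc.abstractmethod
    def _write_to_s3(self, pipeline, filename, data):
        raise NotImplementedError

The keys are enforced in a Pipeline class: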

DECRYPTED_PDF_KEY = "decrypted_pdf"
PAGINATED_TEXT_KEY = "paginated_text"
DOC_CLASSIFY_INFERENCE_KEY = "doc_classify_inference"


class Pipeline:

    def __init__(self):
        self.data = {}

    @property
    def decrypted_pdf(self):
        return self.data[DECRYPTED_PDF_KEY]

    @decrypted_pdf.setter
    def decrypted_pdf(self, val):
        self.data[DECRYPTED_PDF_KEY] = val
...

This Pipeline becomes the object that manages state as our Tasks execute. In our case we were not approaching memory limits, so we decided to keep much of the Task state in-memory, though this could easily be changed to always write to and read from storage. As a state manager, the Pipeline can also capture, prior to executing any Tasks, setup work that downstream Tasks may require.

Continuing on with DocClassifyInference: as a subclass of the abstract class Task, DocClassifyInference must implement def execute as well (enforced by Task). This method takes a Pipeline and returns a Pipeline. In essence, it receives the state manager, modifies the state and returns it so the next Task can operate on it. In our example, execute will extract the decrypted PDF and paginated text so they can be used as inputs to an ML model performing document classification. Let's look at the entire stubbed-out DocClassifyInference:

class DocClassifyInference(S3Task):

    input_keys = [
        pipeline.Pipeline.DECRYPTED_PDF_KEY,
        pipeline.Pipeline.PAGINATED_TEXT_KEY,
    ]
    output_keys = [pipeline.Pipeline.DOC_CLASSIFY_INFERENCE_KEY]

    def __init__(self):
        super().__init__()

    def execute(self, pipeline: Pipeline) -> Pipeline:
        decrypted_pdf = pipeline.decrypted_pdf
        paginated_text = pipeline.paginated_text

        # our logic for performing inference goes here
        # assume it outputs doc_classify_inference

        pipeline.doc_classify_inference = doc_classify_inference
        self._write_to_s3(pipeline, "doc_classify_inference.json", doc_classify_inference)
        return pipeline

    def _write_to_s3(self, pipeline, filename, data):
        ...  # implements writing to S3

It’s easy to see how DocClassifyInference gets the Pipeline state, extracts what it needs, operates on that data, sets what it has declared it’s going to set and returns the Pipeline. This allows for an API like this:

pipeline = Pipeline(...)
pipeline = DecryptPDF().execute(pipeline)
pipeline = PaginatedText().execute(pipeline)
pipeline = DocClassifyInference().execute(pipeline)

This is of course much cleaner than what we had previously. It also lends itself to writing simple, understandable unit tests per Task, and it adheres more closely to functional programming principles. This solves our first goal of making the code cleaner and easier to reason about.
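For example, a unit test for our hypothetical Task might look like this (the fixture values are invented for illustration):

def test_doc_classify_inference_sets_its_output_key():
    pipeline = Pipeline()
    pipeline.decrypted_pdf = b"%PDF-1.7 fake bytes"
    pipeline.paginated_text = ["page one text", "page two text"]

    pipeline = DocClassifyInference().execute(pipeline)

    assert pipeline.doc_classify_inference is not None

What about our second goal, parallel processing?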

Parallel Processing

Similar to Luigi and Apache Airflow, the goal of our workflow orchestration is to generate a topologically sorted Directed Acyclic Graph of Tasks. In short, having each Task explicitly define its required inputs and intended outputs allows the Tasks to be sorted for optimal execution. We no longer need to write the Tasks down in sequential order like the API described above; rather, we can pass a Task Planner a list of Tasks and it can decide how to optimally execute them. What we want, then, is a Task Planner that takes a list of Tasks, sorts them topologically and returns a list of stages, where each stage is itself a list of Tasks. Let's take a look at what this might look like using some of our examples from above:

TASK_CLASSES = [
    DecryptPDF(),
    PaginatedText(),
    DocClassifyInference(),
    KeywordDetection(),
    CreateCSVOutput()
]

Here I have retained our examples while adding two new Tasks: KeywordDetection and CreateCSVOutput. You can think of these as matching keywords in the paginated text, and combining the results of DocClassifyInference and KeywordDetection into a formatted CSV output. When the Task Planner receives this list, we want it to topologically sort the tasks and output a data structure that looks like this:

expected_task_plan = [
    [DecryptPDF()],
    [PaginatedText()],
    [DocClassifyInference(), KeywordDetection()],
    [CreateCSVOutput()],
]

In the above list, each member is a 'stage' of execution. Each stage has one-to-many Tasks; in the case of one, execution occurs sequentially, and in the case of many, execution occurs in parallel. In English, the expected_task_plan can be described like so:

  • DecryptPDF depends on nothing and creates a consumable PDF,
  • PaginatedText depends on a consumable PDF and creates a list of strings
    – DocClassifyInference depends on both and classifies the document
    – KeywordDetection depends on paginated text and produces matches
  • CreateCSVOutput depends on doc classification and keyword detection and produces a formatted CSV of their outputs.

An example of the function that creates the expected_task_plan above might look like:

def get_task_plan():
    tasks = get_task_classes()
    enforce_unique_outputs(tasks)
    sorter = add_nodes_to_sorter(tasks)
    sorted_tasks = get_sorted_tasks(sorter)
    return sorted_tasks

This function gets the list of Tasks, ensures that no two Task outputs have identical keys, adds the nodes to a sorter by interrogating each Task's input_keys and output_keys, and sorts them topologically. In our case the sorter comes from the standard library's graphlib.TopologicalSorter. Getting into everything these functions do would take us too far afield, but a rough sketch of the graph-related helpers follows before we move on to executing a task plan.
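A minimal sketch of the two helpers, assuming Tasks are hashable and that input keys with no producing Task (e.g. the initial message) are simply skipped:

import graphlib


def add_nodes_to_sorter(tasks):
    # map each output key to the Task that produces it
    producers = {key: task for task in tasks for key in task.output_keys}
    sorter = graphlib.TopologicalSorter()
    for task in tasks:
        dependencies = [producers[key] for key in task.input_keys if key in producers]
        sorter.add(task, *dependencies)
    return sorter


def get_sorted_tasks(sorter):
    # group Tasks into stages; every Task in a stage can run in parallel
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = list(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages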

With the expected_task_plan shown above, an execute_task_plan() function is straightforward:

def execute_task_plan(task_plan, pipeline):
    for tasks in task_plan:
        tasks_length = len(tasks)
        if tasks_length == 1:
            # Run Task single threaded
            pipeline = tasks[0].run(pipeline)
        else:
            # Run tasks in parallel
            task_threads = []
            for task in tasks:
                task_thread = TaskThread(target=task.run, args=(pipeline,))
                task_threads.append(task_thread)
                task_thread.start()

            for thread in task_threads:
                # .join() re-raises any exception captured in the child thread
                thread.join()

    return pipeline

Here we iterate over the task list, choosing between sequential and parallel execution. In the latter case, we use Python's threading.Thread class to create a thread per Task, along with the idiomatic methods for starting and joining threads. Wait, then what is TaskThread?

In our case, we wanted to ensure that an exception in a child thread will always be raised to the calling thread so the calling thread can exit immediately. So we extended the threading.Thread class with our own class called TaskThread. Overriding threading.Thread’s .run() method is fairly common (so common that it’s suggested in run()’s comments); we overrode run() to set an instance variable carrying an exception’s content and then we check that variable at .join() time.

import threading


class TaskThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exc = None

    def run(self):
        self.exc = None
        try:
            # super().run() executes the thread's target
            super().run()
        except BaseException as e:
            self.exc = e

    def join(self, timeout=None):
        super().join(timeout)
        if self.exc:
            raise self.exc

The calling thread can now try/except at .join() time.

Conclusion

With these structures in place, the file containing the automation service's primary functions was reduced from ~500 lines to ~90. Now, when we create our thread pool to consume SQS messages, we get the Task plan like so: task_plan = get_task_plan(). We then pass the task_plan into each thread. Once execution reaches the main function for performing document intelligence, what was previously a large section of difficult-to-read code becomes:

pipeline = Pipeline(notification_message, s3_client, page_hooks, config)
pipeline = execute_task_plan(task_plan, pipeline)

The introduction of parallel processing of these Tasks consistently shaved time off of performing document intelligence (an average of about a minute). The real benefit of this change, however, will come in the future as we add more and more Tasks to the pipeline that can be processed in parallel.

While we’ve reduced the time-to-audit significantly from the former state-of-the-art, we are definitely not done. Features like the above will enable us to continue reducing this time while maintaining consistent processing times. We hope this blog helps you in your workflow orchestration research.

This is the third post in a three-part series on creating a reusable ML pipeline that is initiated with a single config file and five user-defined functions. The pipeline is fine-tuning-based for the purposes of classification, runs on distributed GPUs on AWS Sagemaker and uses Huggingface Transformers, Accelerate, Datasets & Evaluate, PyTorch, wandb and more.

This post covers the training and testing (inference) steps. These are the core steps in an ML pipeline, where a model is hyperparameter tuned and the test set is used to measure performance. If you have landed on this post first, check out the first post in the series detailing the pipeline setup and the second post detailing the data steps.

Training and Tuning

The reason I have combined Training and Tuning into one section is that Tuning is just a set of training jobs in which performance is incrementally improved by varying hyperparameters. As such, under the covers, the two types of jobs call the same code. As we have done previously, let's first take a look at perform_training() and perform_tuning() to see how the code interacts with Sagemaker.

Zooming into perform_training(), we encounter the first bit of backend code that handles a use case we have not yet discussed: comparing two models. If you recall from part one, one of the motivations for creating this pipeline was to rapidly test multiple Document Understanding models and compare their performance. As such, the pipeline is built to handle, in a single experiment, multiple models being passed in the settings.ini file the experimenter defines. In fact, the MODEL_NAMES parameter from this file can accept one or many model names, the latter implying that the experimenter wants to run a comparison job. A comparison job has no impact on Data Reconciliation or Data Preparation; we want these steps to behave identically to a single-model job, since the idea is that n models get trained and tested on the exact same snapshot of training data. With that preamble, perform_training() looks like this:

 
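The original code isn't reproduced here, but a representative sketch of its shape looks like this (the config fields and the get_estimator_kwargs() helper signature are assumptions):

from sagemaker.estimator import Estimator


def perform_training(config):
    # config.model_names holds one name, or several for a comparison job
    for model_name in config.model_names:
        estimator = Estimator(**get_estimator_kwargs(config, model_name))
        # kicks off a Sagemaker Training job on the encoded training data
        estimator.fit(config.training_data_uri)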

The loop here is iterating over either a list with n model names or a list with a single model name. For each model name, an Estimator() is constructed and .fit() is called which kicks off a training job on Sagemaker. get_estimator_kwargs() will look familiar to anyone who has trained on Sagemaker already:

 
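A sketch of that helper (the field names on config are assumptions; debugger_hook_config and environment are real Estimator parameters):

def get_estimator_kwargs(config, model_name):
    kwargs = {
        "image_uri": config.docker_image_path,  # the shared ECR training image
        "role": config.sagemaker_role_arn,
        "instance_count": config.instance_count,
        "instance_type": config.instance_type,
        "output_path": config.output_s3_uri,
        "hyperparameters": config.hyperparameters[model_name],
        "debugger_hook_config": False,  # turn off Sagemaker Debugger (see below)
    }
    if config.use_distributed:
        # extra parameters inspired by the sagemaker-sdk's
        # _distribution_configuration function
        kwargs["environment"] = {"USE_SMDEBUG": "0"}
    return kwargs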

Settings are extracted from the config we discussed in the first post in the series, the most important of which is config.docker_image_path. As a refresher, this is the ECR URL of the training image the experimenter created during setup; it is shared between Sagemaker Processor, Training and Tuning jobs and contains all needed dependencies. Next, perform_training checks a boolean from the settings.ini file, USE_DISTRIBUTED, which defines whether or not the experimenter expects distributed GPU training to occur. If so, it sets some extra Estimator parameters, largely inspired by the _distribution_configuration function from the sagemaker-sdk.

I will digress for a moment here to talk about one such parameter, namely, an environment variable called USE_SMDEBUG. SMDEBUG refers to a debugging tool called Sagemaker Debugger. For reasons I cannot explain, and which have not been answered by AWSlabs, this tool is on by default, and distributed training would not work for some models, producing mysterious exception traces. It only became obvious to me when carefully examining the traces and seeing that it was some code in smdebug that was ultimately throwing. There are a variety of ways to turn off smdebug, for instance passing 'debugger_hook_config': False as done above, or environment={'USE_SMDEBUG': 0}. However, these methods only work on Training jobs. Again, for reasons I cannot explain, the only way to turn off SMDEBUG on Tuning jobs is to set the env var inside the docker container being used: ENV USE_SMDEBUG="0"; the other methods somehow never make it to a Tuning job's constituent Training jobs. An unfortunate side effect is that this makes it difficult for an experimenter to configure the environment variable. At any rate, hopefully AWSlabs fixes this, or at least makes smdebug exceptions more user-friendly.

The call to .fit() makes the actual call to the AWS API. The config.training_data_uri parameter specifies the S3 URI of the encoded training data from the Data Preparation step; the training instance downloads this data to local disk before executing, where it can be easily accessed by multiple GPU processes. How does the job know what code to execute? That is specified in the base docker container, which is extended by the experimenter:

 
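The relevant lines in the base Dockerfile follow the sagemaker-training convention; a plausible snippet (the exact paths are assumptions):

COPY train.py /opt/ml/code/train.py
ENV SAGEMAKER_SUBMIT_DIRECTORY /opt/ml/code
ENV SAGEMAKER_PROGRAM train.py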

These environment variables are used by the sagemaker-training library to kick off the training script. At this point we would dive into train.py, but since it is also used by a Tuning job, let's first look at how we kick off a Tuning job. The beginning of a Tuning job is nearly identical to a Training job: the same get_estimator_kwargs() settings are used to construct an Estimator.

 

But now, instead of calling .fit(), we need to set up a few more parameters that a Tuning job requires: a set of constant hyperparameters and a set of tunable hyperparameters. Here is an example of what an experimenter might write in the settings.ini file to represent this:

 
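Something like the following, where only the -> and , syntax is from the post and the section and key names are invented:

[CONSTANT_HYPERPARAMETERS]
num_epochs = 4
weight_decay = 0.01

[TUNABLE_HYPERPARAMETERS]
learning_rate = 1e-5->1e-4
train_batch_size = 8,16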

Here the constants will not change between tuning jobs, but the tunable parameters will start with guesses, and those guesses will get better as jobs complete. The -> and , are syntax I've chosen; in this context, -> stands for an interval while , stands for categorical options. Having seen this, the next piece of the Tuning job setup should make sense:

 
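Whatever the parsing details, the result maps each tunable name onto the sagemaker-sdk's parameter classes; for the fragment above, it would be equivalent to:

from sagemaker.tuner import CategoricalParameter, ContinuousParameter

tunable_hyperparameters = {
    # "->" becomes a continuous interval
    "learning_rate": ContinuousParameter(1e-5, 1e-4),
    # "," becomes categorical options
    "train_batch_size": CategoricalParameter([8, 16]),
}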

Now we have our dict of tunable parameters we can pass to the HyperparameterTuner object:

 
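A sketch of the construction (config fields assumed, as before; the parameters themselves are real HyperparameterTuner arguments):

from sagemaker.tuner import HyperparameterTuner

tuner = HyperparameterTuner(
    estimator=estimator,
    objective_metric_name=config.objective_metric_name,
    hyperparameter_ranges=tunable_hyperparameters,
    metric_definitions=config.metric_definitions,
    max_jobs=config.max_jobs,
    max_parallel_jobs=config.max_parallel_jobs,
)
tuner.fit(config.training_data_uri)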

This should look somewhat familiar after what we just did for Training, with a few extra parameters. The HyperparameterTuner object takes the constructed Estimator() object, which will be re-used for each constituent Training job, along with the tunable hyperparameters we just discussed. A Tuning job needs to measure a metric in order to decide whether one set of hyperparameters is better than another; objective_metric_name is the name of that metric. This value is also used in the metric_definitions parameter, which explicitly defines how the HyperparameterTuner job extracts the objective metric value from the logs for comparison. To make this more concrete, this is how these values might be defined in an example settings.ini file:

 
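Hypothetically (the metric name and regex are invented), those settings might read:

OBJECTIVE_METRIC_NAME = eval_f1
METRIC_DEFINITIONS = [{"Name": "eval_f1", "Regex": "eval_f1=([0-9\\.]+)"}]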

Finally, the max_jobs parameter defines how many total Training jobs will constitute the Tuning job and max_parallel_jobs defines how many can run in parallel at a given time. Like the Estimator in the Training job, we call fit() to actually kick off the Tuning job and pass it the training_data_uri like we did previously. With this in place, we can now look at train.py and see what executes when a Training or Tuning job is executed.

The goal of train.py is to fine-tune a loaded model using a set of distributed GPUs, compute a number of metrics, determine which is the best model, extract that model's state_dict, convert that model to torchscript, and save these files along with a number of graphs to S3. Huggingface's Accelerate, Evaluate and Transformers libraries are all used to greatly simplify this process. Before continuing, I have to give a brief shoutout to the Accelerate devs, who were extremely responsive while I was building this pipeline.

Note that in a distributed setting, every GPU process is going to execute this same train.py file. While much of the coordination can be passed off to Accelerate, it is helpful to keep this in mind while working inside it. Diving a level deeper, train.py is going to:

  • Read hyperparameters and determine if the running job is a tuning job, training job or comparison job
  • Determine if gradient accumulation will be utilized
  • Construct the `Accelerator()` object which handles distribution
  • Initialize wandb trackers
  • Load split training data and create `DataLoader()`s for training and validation
  • Set up an optimizer with learning rate scheduling
  • Execute a training and validation loop, computing metrics and storing metric histories and determining what the best model was
  • Plot curves for metrics
  • Extract the curves, statistics and best model from the loops
  • Write all of this data to S3

We start by reading the passed hyperparameters and setting a few values that can be used throughout the training process:

 
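sagemaker-training hands hyperparameters to the script as command line arguments, so a sketch of this step might look like this (argument names other than _tuning_objective_metric are assumptions):

import argparse


def parse_args():
    parser = argparse.ArgumentParser()
    # present only when Sagemaker runs the script inside a Tuning job
    parser.add_argument("--_tuning_objective_metric", type=str, default=None)
    parser.add_argument("--run_num", type=str, required=True)
    parser.add_argument("--job_type_str", type=str, default="training")
    args, _ = parser.parse_known_args()
    return args


args = parse_args()
is_tuning_job = args._tuning_objective_metric is not None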

_tuning_objective_metric is a hyperparameter set by Sagemaker that allows us to easily differentiate between Training and Tuning jobs. As we've mentioned before, run_num is an important setting that allows us to organize our results and version our models in production so they easily connect back to training runs. Finally, job_type_str allows us to further organize our runs as training, tuning and comparison jobs.

Next we determine if gradient accumulation is needed. Briefly, gradient accumulation allows us to set batch sizes that are larger than what the GPUs we’re running on can store in memory:

 
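A sketch of the decision, with the hyperparameter names assumed:

import torch

requested_batch_size = int(args.train_batch_size)
max_batch_size_per_device = int(args.max_batch_size_per_device)
num_devices = max(torch.cuda.device_count(), 1)

# accumulate gradients whenever the requested effective batch size
# exceeds what the available GPUs can hold at once
gradient_accumulation_steps = max(
    1, requested_batch_size // (max_batch_size_per_device * num_devices)
)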

Control now moves to setting up the Accelerator() object which is the tool for managing distributed processing:

 
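A sketch using Accelerate's wandb integration (the project name is assumed):

from accelerate import Accelerator

accelerator = Accelerator(
    gradient_accumulation_steps=gradient_accumulation_steps,
    log_with="wandb",
)
# called on every process; Accelerate logs from the main process only
accelerator.init_trackers(project_name="document-intelligence", config=vars(args))
accelerator.print(f"starting run {args.run_num}")  # printed once, not once per GPU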

Here we encounter a core concept in Accelerate, is_main_process. This boolean provides a simple way to execute code on exactly one of the distributed processes. This is helpful when we want to run code as if we were on a single process, for instance to store a history of metrics as the training loop executes. We use this boolean to set up wandb so we can easily log metrics. Additionally, accelerator.print() is shorthand for if accelerator.is_main_process: print(...); it ensures a statement is printed only once.

Recall that we passed config.training_data_uri to the .fit() call for both Training and Tuning jobs. This downloads all of the training data to the Sagemaker instance's local disk, so we can use Datasets' load_from_disk() function to load it. Note that in the following code, SAGEMAKER_LOCAL_TRAINING_DIR is just the path to the directory the data is downloaded to.

 
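A sketch of the loading step (the batch size, split names and id2label helper are assumptions):

import evaluate
from datasets import load_from_disk
from torch.utils.data import DataLoader

dataset = load_from_disk(SAGEMAKER_LOCAL_TRAINING_DIR)
dataset.set_format("torch")
# the id2label mapping sits alongside the data (loading helper assumed)
id2label = load_id2label(SAGEMAKER_LOCAL_TRAINING_DIR)

f1_metric = evaluate.load("f1")
acc_metric = evaluate.load("accuracy")

train_dataloader = DataLoader(dataset["train"], batch_size=batch_size, shuffle=True)
eval_dataloader = DataLoader(dataset["validation"], batch_size=batch_size)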

Each process loads the dataset, the id2label file and the metrics, and creates its dataloaders. Note the use of Huggingface's evaluate library to load metrics; these can be used in tandem with Accelerate to make metric tracking simple during distributed training. We will see shortly how Accelerator provides one simple function to handle distributed training.

 
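A sketch of that block (the optimizer and scheduler choices are assumptions):

from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup

model = load_model()  # user-defined function from the pipeline setup
optimizer = AdamW(model.parameters(), lr=learning_rate)
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=len(train_dataloader) * num_epochs,
)

# every process must reach this point before anything is wrapped
accelerator.wait_for_everyone()
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
    model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
)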

In this code block, we first call the user-defined function load_model to receive the loaded model defined however the experimenter would like. Thus far, this function has typically looked like a call to a Transformers from_pretrained() function, though this is not enforced.

A common learning rate optimizer is created and used to create a learning rate scheduler. Finally, we encounter another core concept in Accelerator, namely, wait_for_everyone(). This function guarantees that all processes have made it to this point before proceeding to the next line of code. It must be called before the prepare() function which prepares all of the values we’ve created thus far for training (in our case, distributed training). wait_for_everyone() is used regularly in Accelerator code; for example, it is nice to have when ensuring that all GPUs have completed the training loop. After the prepare() step, the code enters a function to perform the training and validation loop. Next, we will look at how Accelerator works inside that loop.

 
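A sketch of the scaffolding; the train() and validate() helpers are sketched below:

best_f1 = 0.0
best_model = None
if accelerator.is_main_process:
    # a single copy of the metric histories, used later for plotting
    history = {"train_loss": [], "val_accuracy": [], "val_f1": []}

for epoch in range(num_epochs):
    model.train()
    train_loss = train(model, train_dataloader, optimizer, lr_scheduler, accelerator)
    model.eval()
    f1, accuracy = validate(model, eval_dataloader, accelerator, f1_metric, acc_metric)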

At the start of the loop, we initialize a number of values to track throughout training. Here we use is_main_process again to create a single version of metric histories which we will use to plot graphs. In this example, we are only tracking training loss, validation accuracy and f1, but any number of metrics could be tracked here. Next, we enter the loop, set the model in train() mode and enter the train() function:

 
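A sketch of train(), with the comparison-job handling described below (model_name would be None for single-model jobs):

def train(model, dataloader, optimizer, lr_scheduler, accelerator, model_name=None):
    total_loss = 0.0
    for batch in dataloader:
        if model_name is not None:
            # comparison job: keep only this model's inputs,
            # e.g. longformer_input_ids -> input_ids
            batch = get_model_specific_batch(batch, model_name)
        with accelerator.accumulate(model):
            outputs = model(**batch)
            loss = outputs.loss
            total_loss += loss.detach().float()
            accelerator.backward(loss)  # instead of loss.backward()
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()
    return total_loss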

As execution enters the batch loop, it first needs to check whether we're running a comparison job. If so, it needs to extract the appropriate parameters for the current model's forward() function. Recall that for comparison jobs, the Data Preparation step combined all inputs in the same pyarrow format, prepended with the model_name (e.g. longformer_input_ids); get_model_specific_batch() just returns those parameters of the batch that match the current model_name.

Next, we encounter with accelerator.accumulate(model), a context manager recently added to Accelerate that manages gradient accumulation. This simple wrapper reduces gradient accumulation to a single line. Underneath that manager, back propagation should look familiar to readers who have written ML code before; the one big difference is calling accelerator.backward(loss) instead of loss.backward().

Upon completing the training pass, execution sets the model in .eval() mode and moves into the validation loop:

 
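A sketch of the validation loop:

import torch


def validate(model, dataloader, accelerator, f1_metric, acc_metric):
    for batch in dataloader:
        with torch.no_grad():
            outputs = model(**batch)
        predictions = outputs.logits.argmax(dim=-1)
        # gather predictions and labels from every process before scoring
        predictions, references = accelerator.gather_for_metrics(
            (predictions, batch["labels"])
        )
        f1_metric.add_batch(predictions=predictions, references=references)
        acc_metric.add_batch(predictions=predictions, references=references)
    f1 = f1_metric.compute(average="weighted")["f1"]
    accuracy = acc_metric.compute()["accuracy"]
    return f1, accuracy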

Here we encounter another key accelerate function, gather_for_metrics(). This recently added function makes it much easier to gather predictions in a distributed setting so they can be used to calculate metrics. We pass the returned values to the f1_metric and acc_metric objects we created earlier using the Evaluate library. The validation loop then computes the scores and returns them.

After each pass through training and validation, we update the tracking values we initialized at the beginning:

 
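A sketch (this code sits inside the epoch loop):

if accelerator.is_main_process:
    history["train_loss"].append(float(train_loss))
    history["val_accuracy"].append(accuracy)
    history["val_f1"].append(f1)

# .log sends the values to the wandb tracker initialized earlier
accelerator.log({"train_loss": train_loss, "val_accuracy": accuracy, "val_f1": f1})

if f1 > best_f1:
    best_f1 = f1
    best_model = model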

Since the main process holds the references to our history-tracking data structures, we use is_main_process to append the new values. accelerator.log links up with the init_trackers call we made earlier: .log sends these values to the tracker initialized there; in our case, wandb will create graphs out of them. Finally, we use the F1 score to determine the best model over time.

After the training and validation loop is done, we execute:

 
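A sketch, with the plotting and statistics helpers left hypothetical:

accelerator.wait_for_everyone()
# strip the distributed wrappers (e.g. DistributedDataParallel) off the model
best_model = accelerator.unwrap_model(best_model)
if accelerator.is_main_process:
    curves = plot_metric_curves(history)      # hypothetical plotting helper
    statistics = compute_statistics(history)  # hypothetical stats helper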

We start by ensuring that all processes have completed the training/validation loop and then call unwrap_model to extract the model from its distributed containers. Since the main process contains our metric histories, we use it to plot curves for each metric and calculate model statistics; we then return out the best model, curves and statistics.

Now that the training/validation loops are complete and we’ve determined a best model, we need to convert that best model to torchscript and save all the returned files to S3.

 
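A sketch:

accelerator.end_training()  # required when trackers such as wandb are in use
if accelerator.is_main_process:
    accelerator.save(best_model.state_dict(), "model_state_dict.pt")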

Here we call end_training, since we are using wandb, and use is_main_process, since we no longer need distribution. accelerator.save() is the correct way to save the model to disk, but we also need to convert it to torchscript to mirror production as closely as possible. Briefly, Torchscript is a way of converting a Python-based model into a serializable, production-friendly format that need not have a Python dependency. As such, when testing inference on an unseen test set, it is best to test the model that would actually be in production. One way to convert a model is to call torch.jit.trace, passing it the model and a sample instance, which is how we've implemented the conversion:

 
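A sketch of the conversion, following the steps described below:

import torch

model = best_model.cpu().eval()
sample = dataset["train"][0]
input_keys = ordered_input_keys()  # user-defined function from the pipeline setup
if config.is_comparison:
    sample = get_model_specific_batch(sample, model_name)
    input_keys = input_keys[model_name]

# forward() expects a batch dimension, so give every tensor a batch size of 1
example_inputs = tuple(sample[key].unsqueeze(0) for key in input_keys)

traced_model = torch.jit.trace(model, example_inputs, strict=False, check_trace=False)
torch.jit.save(traced_model, "model.pt")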

First, we take the best model and put it in CPU and evaluation mode. We then grab a sample instance out of the training data. Next, we encounter another user-defined function ordered_input_keys(). If you recall, this function returns the parameter names for a model’s forward() function in the correct order. It probably didn’t make sense earlier why this function was needed, but now it should: the example_inputs parameter of torch.jit.trace takes a tuple of input values which must match the exact parameter ordering of the forward() function.

Now, if we’re running a comparison job, then ordered_input_keys() is going to return a dictionary of OrderedDict’s with keys based on each model’s name. Thus, we test for this scenario and use the same get_model_specific_batch() function we used during training to extract a sample instance for the current model being converted.

Next, we iterate over the ordered input keys and call .unsqueeze(0) on each parameter of the sample instance. The reason is that the forward() function expects a batch size as the first dimension of the input data; .unsqueeze(0) adds a dimension of 1 onto the tensors representing each parameter's data.

Now we are ready to run the trace, passing the model, the example inputs and setting two parameters to False. The strict parameter controls whether or not you want the tracer to record mutable containers; by turning this off, you can allow, for example, your outputs = model(**batch) to remain a dict instead of a tuple, though you must be sure that the mutable containers used in your model aren't actually mutated. check_trace verifies that the same inputs run through the traced code produce the same outputs; in our case, leaving it True produced odd errors, likely because of some internal non-deterministic operations, so we set it to False. Again, the ultimate test of the model's performance is the inference step, which we will discuss next.

Finally, we save the traced model to local disk so it can be uploaded to S3. The final step of train.py is to upload all of these generated files to S3. In the case of a tuning job, we only retain the generated files from the run with the best objective metric score.

 

And with that, we have completed discussing the training/tuning step of the ML Pipeline. Next, we will look at the inference step where we load the torchscript model, perform inference on the unseen test set and collect statistics.

Inference

In the Training/Tuning step, we converted our best model to torchscript, which means it can easily run in a CPU or multi-CPU environment. This enables us to hijack a Sagemaker Processor instance to perform our inference job. As in the previous sections, we will first look at how an inference job is initiated. Because we can use a Processor instance, it is identical to our Data Preparation step except that it points at our /test/ data and our inference.py file.

 
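A sketch of the Processor setup (config fields assumed, as before):

from sagemaker.processing import ProcessingInput, ScriptProcessor

processor = ScriptProcessor(
    image_uri=config.docker_image_path,
    command=["python3"],
    role=config.sagemaker_role_arn,
    instance_count=1,
    instance_type=config.processing_instance_type,
)
processor.run(
    code="inference.py",
    inputs=[
        ProcessingInput(
            source=config.input_source_dir,  # the /test/ data rather than /train/
            destination="/opt/ml/processing/test",
        )
    ],
)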

Refer to the Data Preparation section of the second post to learn more about Processor/ScriptProcessor jobs. Note the differences of input_source_dir pointing at /test/ and `code` pointing at inference.py. Since these are so similar, we will move on to looking at the inference.py file.

We’ve discussed repeatedly the importance of run_num and how it is used to help identify the current experiment not only while training, but also the current model in production (so a production model can be linked to a training experiment). The inference.py will use the experiment parent directory to find the test data and the run_num to find the correct trained model.

inference.py starts by downloading the id2label file so we can translate between model predictions and human-readable predictions:

 
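A sketch (the bucket and key layout are assumptions):

import json

s3_client.download_file(config.bucket, f"{config.experiment_dir}/id2label.json", "id2label.json")
with open("id2label.json") as f:
    id2label = {int(k): v for k, v in json.load(f).items()}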

Recall from previous sections that the ML pipeline is capable of running comparison jobs (n models trained and tested on the same dataset). Inference is the step where comparison really shines, allowing you to compare performance on identical data. In the next code block, we will load n models to prepare for inference. Recall that if a single model was trained, it is passed as a list with a single value:

 
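A sketch of that loop:

models, model_stats, metrics = {}, {}, {}
for model_name in config.model_names:
    models[model_name] = load_torchscript_model(model_name, config)
    model_stats[model_name] = init_model_stats(id2label)
    metrics[model_name] = init_metrics()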

This loop iterates the model names, downloads/loads the torchscript converted model and initializes statistics tracking for each. Let’s take a look at each inner function:

 
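A sketch, with the S3 path layout assumed:

import torch


def load_torchscript_model(model_name, config):
    # layout assumed: <experiment_dir>/<run_num>/<model_name>/model.pt
    key = f"{config.experiment_dir}/{config.run_num}/{model_name}/model.pt"
    local_path = f"{model_name}.pt"
    config.s3_client.download_file(config.bucket, key, local_path)
    model = torch.jit.load(local_path)
    model.eval()
    return model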

This function constructs the S3 path where the .pt file lives and downloads it. It then calls torch.jit.load and sets the model to eval mode, ready for inference. init_model_stats initializes the values we will track per model, for each label, giving us raw counts we can use to build statistics:

 
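A sketch, with the tracked fields assumed:

def init_model_stats(id2label):
    # raw per-label counts from which accuracy and F1 are derived later
    return {
        label: {"true_positives": 0, "false_positives": 0,
                "false_negatives": 0, "support": 0}
        for label in id2label.values()
    }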

And init_metrics() simply loads the metrics we used earlier in the training step:

 
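A sketch:

import evaluate


def init_metrics():
    return {"f1": evaluate.load("f1"), "accuracy": evaluate.load("accuracy")}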

Next, we get the test data from the Data Preparation step:

 
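A sketch, using the same load_from_disk() function as in training:

from datasets import load_from_disk

# the Processor job downloaded the encoded test split to this local path
test_dataset = load_from_disk("/opt/ml/processing/test")
test_dataset.set_format("torch")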

With the models and data loaded, we are now ready to run inference:

 
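A sketch of the main loop (the label column name is assumed):

for instance in test_dataset:
    label = int(instance.pop("label"))  # the ground truth
    for model_name in config.model_names:
        batch = instance
        if config.is_comparison:
            batch = get_model_specific_batch(batch, model_name)
        # add a batch dimension of 1, as in convert_to_torchscript
        inputs = {k: v.unsqueeze(0) for k, v in batch.items()}
        outputs = models[model_name](**inputs)
        prediction = int(outputs["logits"].argmax(-1))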

The inference code uses config.is_comparison repeatedly to execute code specific to comparison jobs. It starts by initializing statistics specific to comparisons, which we will skip for now. Next, it enters the main loop, which iterates through each instance of unseen test data. The ground truth label is extracted, and execution enters the inner loop over the model names (in the case of one model, this is just a list with a single entry). config.is_comparison is checked in order to extract the data specific to the current model, using the same function used in Training (get_model_specific_batch). The instance is then prepared for the forward() function using the same technique we used in convert_to_torchscript: each value gets .unsqueeze(0) called on it in order to add a batch size of 1 as the first dimension of the tensor.

We then grab the currently loaded model and pass the instance to it. We extract the most confident prediction from the returned logits by calling argmax(-1). Now let’s look at the remainder of the loop (note this begins inside the inner loop):

 
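A sketch (still inside the inner loop over model names):

metrics[model_name]["f1"].add_batch(predictions=[prediction], references=[label])
metrics[model_name]["accuracy"].add_batch(predictions=[prediction], references=[label])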

We take the prediction produced by the model and pass it and the ground truth to our accuracy and f1 metrics. We then increment the counters we initialized at the beginning:

 
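A sketch, using the counter fields assumed in init_model_stats:

label_name = id2label[label]
model_stats[model_name][label_name]["support"] += 1
if prediction == label:
    model_stats[model_name][label_name]["true_positives"] += 1
else:
    model_stats[model_name][label_name]["false_negatives"] += 1
    model_stats[model_name][id2label[prediction]]["false_positives"] += 1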

If inference.py is running a comparison job, we then add counts to the structure we initialized earlier; we will skip over these calls and jump to `process_statistics` which occurs after the inference code has finished looping:

 
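A sketch of its shape, with the per-label score math and the uploader left hypothetical:

def process_statistics(model_stats, metrics, config):
    results = {}
    for model_name, per_label_counts in model_stats.items():
        per_label = {
            label: compute_label_scores(counts)  # hypothetical F1/accuracy math
            for label, counts in per_label_counts.items()
        }
        # sort the per-label results by F1 score, descending
        per_label = dict(
            sorted(per_label.items(), key=lambda kv: kv[1]["f1"], reverse=True)
        )
        results[model_name] = {
            "per_label": per_label,
            "overall_f1": metrics[model_name]["f1"].compute(average="weighted")["f1"],
            "overall_accuracy": metrics[model_name]["accuracy"].compute()["accuracy"],
        }
    # upload under the experiment parent dir and run_num (helper assumed)
    upload_results_to_s3(results, config)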

This function looks intimidating, but all it is doing is calculating the F1 score and Accuracy per label, sorting the results by F1 score descending, calculating the overall F1 and Accuracy and uploading the results to S3 under the correct parent dir and run_num.

If you’ve followed the ML Pipeline blogs up to this point, it is prescient to revisit the folder structure that is built on S3 while the entire pipeline executes that we laid out in the first blog: 

 

This folder structure recurs for every machine learning experiment, containing everything one would need to quickly understand the experiment or reproduce it and link an experiment to what is in production.

Prima facie, it seems like a simple part of the overall pipeline, but I believe it is one of the most important: it imbues each experiment with desirable properties like navigability, readability, reproducibility, versioning and more.

If you’ve been following these blogs up to this point then you’ve been on quite a journey. I hope they provide some guidance in setting up your own ML Pipeline. As we continue to modify ours we will post on blog-worthy topics so stay tuned. If you can check out the first two posts in the series here: Part One: Setup, Part Two: Data Steps.