By Michael Conroy, Senior Software Engineer

*Views, thoughts, and opinions expressed in this post belong solely to the author, and not necessarily to SemanticBits.

Introduction

Assume for a minute that you have been given the task of calculating customer satisfaction and that the data to be processed is inherently complex. Additionally, the business requirements include calculations that change over time and per focus group.

With complex data sets and business requirements such as these, where do you start? What is the plan of attack? We faced a similar problem here at SemanticBits. Our solution was to utilize the Pipeline-Worker Architecture.

The Pipeline-Worker Architecture requires one to break down a processing system into its atomic parts, much like a mathematician would do to solve an equation.

Description

Context

For example, let’s assume the following:

Solve (x+1)+(y+2)+(z+3), where x=3, y=2, and z=1

Naturally, we would solve each of the terms individually by variable substitution. First, we would tackle the (x+1) term; second, the (y+2) term; and third, the (z+3) term. Let’s call the calculation applied to each term an operation.
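
Substituting the given values, each operation evaluates to 4, and the terms sum to 12:

(3+1) + (2+2) + (1+3) = 4 + 4 + 4 = 12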

In the same way, the atomic parts of the processing system are broken down into individual operations. These operations — calculations, transformations, metadata additions, etc. — are contained within workers. Therefore, a worker performs a specific, intended operation. The multiple steps of operations occurring together — the collection of workers performing operations — is the pipeline.

Looking back at the mathematical equation, the individual terms are added together. This final step or operation is referred to as the compiler, which is the final worker in the pipeline. The compiler builds output data from the results of the preceding workers, which the pipeline returns.
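
As a minimal sketch (the worker names here are hypothetical), a pipeline is simply its workers applied in order, with the compiler last:

// Conceptually: each worker performs one operation, and the
// compiler assembles the final output from their results.
const pipeline = (input) => compiler(workerTwo(workerOne(input)));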

Lastly, we can think of the input JSON as the where clause of the equation: the supplied values must be processed to arrive at a resulting value within a data structure, in this case, some integer.

Theory

It is also useful to consider the Pipeline-Worker Architecture as an opinionated implementation of the Strategy Pattern. The Strategy Pattern is a behavioral design pattern that lets the program select the appropriate algorithm at runtime.

The workers are highly selective about the data they act upon, which allows each worker’s action to be a single, atomic transformation or calculation. A pipeline is therefore an opinionated implementation of the Strategy Pattern: the pipeline is responsible for producing a resulting dataset and data structure that matches the overall goal of a collection of mutually related workers, and each worker is an algorithm selected to run according to whether or not it receives its expected data as input from the pipeline workflow.

Also consider Functional Programming with respect to the Pipeline-Worker Architecture. Each worker should not have side effects or mutate the state of the overall program. This is why the additional metadata elements expected by the compiler are added alongside the original data rather than altering the existing values. Because the workers do not mutate state for the program as a whole, a pipeline of workers should always produce the same output when called with a specific input.
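
As a minimal sketch of that idea (the field names here are hypothetical), a worker can return a new object that carries the added metadata rather than mutating the object it receives:

// Hypothetical worker: copies the input and adds a metadata field,
// leaving the original input object untouched.
const addComputedTerm = (input) => ({
  ...input,
  term_values: { ...input.term_values, computed_term: input.value + 1 }
});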

Things get interesting, too, when implementing this architecture in the full Functional Programming paradigm. For example, pulling in monads, such as the Task and State monads, may potentially ease development in pipeline architectures.

Example

Requirements

For our detailed example, we will assume the business requirements specify that customer satisfaction (variable S) is calculated per focus group and year:

  1. For 2017 and 2018, the model for calculating customer satisfaction for the Adult Focus Group is:

S = (x+1)+(y+2)+(z+3)

  2. For 2017 for the Children Focus Group, the model is:

S = (x*x+1)+(y/2+2)+(z+3)

  3. For 2018 for the Children Focus Group, the model is:

S = (x*x+1)+(2y+2)+(z+13)

  4. Data comes in with the form:
{
  'title': 'Focus Group Data',
  'focusGroups': [
    {
      'title': 'Focus Group: Adults',
      'type': 'adult',
      'yearData': [
        {
          'year': 2017,
          'dataValues': {
            'x': 3.14159,
            'y': 2.71828,
            'z': 6.67408
          }
        },
        {
          'year': 2018,
          'dataValues': {
            'x': 3.14159,
            'y': 2.71828,
            'z': 6.67408
          }
        }
      ]
    },
    {
      'title': 'Focus Group: Children',
      'type': 'child',
      'yearData': [
        {
          'year': 2017,
          'dataValues': {
            'x': 1,
            'y': 1,
            'z': 2
          }
        },
        {
          'year': 2018,
          'dataValues': {
            'x': 3,
            'y': 5,
            'z': 8
          }
        }
      ]
    }
  ]
}

Implementation

An example implementation of the architecture applied to solving our equation in Node could take the form of:

// Main Export File

const adultPipeline = require('./focus-group-one');
const childPipeline = require('./focus-group-two');

module.exports = (customerSatisfactionData) => {
  const adultData = adultPipeline(customerSatisfactionData);
  const childData = childPipeline(customerSatisfactionData);

  return {
    'adult': adultData,
    'child': childData
  };
};

Here we have two pipelines, one for the Adult Focus Group and one for the Child Focus Group.

The first pipeline for the Adult Focus Group looks like:

// Adult Focus Group
// Customer Satisfaction Pipeline

const addTermX = require('./add-term-x');
const addTermY = require('./add-term-y');
const addTermZ = require('./add-term-z');
const compiler = require('./compiler');

const adultProcessingPipeline = (customerSatisfactionData) => {
  const result = [ [customerSatisfactionData] ]
    .map((args) => addTermX(...args))
    .map((args) => addTermY(...args))
    .map((args) => addTermZ(...args));

  return compiler(...result[0]);
};

module.exports = adultProcessingPipeline;
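
As a sketch of one of the workers this pipeline imports, the Adult Focus Group’s addTermX worker might look like the following (the x_term field name and the term_values metadata object it writes to follow the convention explained in the Term Processing Workers section below):

// Adult Focus Group
// Add Term X Worker
// Equation: x+1

const addTermX = (customerSatisfactionData) => {

  customerSatisfactionData.focusGroups
    .filter((fg) => fg.type === 'adult')
    .forEach((fg) => {
      // The adult model is the same for 2017 and 2018, so no year filter is needed
      fg.yearData.forEach((yd) => {
        // Add metadata field 'x_term' with calculated value
        yd.term_values = yd.term_values || {};
        yd.term_values.x_term = yd.dataValues.x + 1;
      });
    });

  return [ customerSatisfactionData ];
};

module.exports = addTermX;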

The second pipeline, for the Child Focus Group, looks like:

// Child Focus Group
// Customer Satisfaction Calculation Pipeline

const addTermX2017 = require('./year-2017/add-term-x');
const addTermY2017 = require('./year-2017/add-term-y');
const addTermZ2017 = require('./year-2017/add-term-z');
const addTermX2018 = require('./year-2018/add-term-x');
const addTermY2018 = require('./year-2018/add-term-y');
const addTermZ2018 = require('./year-2018/add-term-z');
const compiler = require('./compiler');

const childProcessingPipeline = (customerSatisfactionData) => {
  const result = [ [customerSatisfactionData] ]
    .map((args) => addTermX2017(...args))
    .map((args) => addTermY2017(...args))
    .map((args) => addTermZ2017(...args))
    .map((args) => addTermX2018(...args))
    .map((args) => addTermY2018(...args))
    .map((args) => addTermZ2018(...args));

  return compiler(...result[0]);
};

module.exports = childProcessingPipeline;

There are a number of things to point out here, so let’s dive right in!

Term Processing Workers

The workers that process the different terms are specific to the year being processed. Let’s take a look at what the addTermY worker looks like for the Child Focus Group for the year 2017:

// Child Focus Group
// Add Term Y Worker
// Equation: y/2+2

const addTermY = (customerSatisfactionData) => {

  customerSatisfactionData.focusGroups
    .filter((fg) => fg.type === 'child')
    .forEach((fg) => {
      fg.yearData
        .filter((yd) => yd.year === 2017)
        .forEach((yd2017) => {
          // Add metadata field 'y_term' with calculated value
          yd2017.term_values = yd2017.term_values || {};
          yd2017.term_values.y_term = (yd2017.dataValues.y / 2) + 2;
        });
    });

  return [ customerSatisfactionData ];
};

module.exports = addTermY;

Each worker shuffles its computed term value into the $.focusGroups[*].yearData[*].term_values metadata object for the compiler.
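
For illustration, after the three child 2017 workers have run against the sample input above, that year’s entry carries its computed terms (the x and z field names here are assumed to follow the same y_term naming convention):

{
  'year': 2017,
  'dataValues': { 'x': 1, 'y': 1, 'z': 2 },
  'term_values': {
    'x_term': 2,    // x*x+1
    'y_term': 2.5,  // y/2+2
    'z_term': 5     // z+3
  }
}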

Compiler

Finally, the compiler takes all of the metadata and forms the final values, return object, and object structure for the pipeline:

// Child Focus Group
// Compiler Worker

const _ = require('lodash');

const compiler = (result) => {
  const solution = result.focusGroups
    .map((fg) => {
      fg.yearData
        .forEach((yd) => {
          // Get the calculated term values into an array
          const termValues = Object.values(yd.term_values);

          // Add term values together
          const sum = _.sum(termValues);

          // Add to focus group metadata, keyed by year
          fg[yd.year] = {
            'sum': sum,
            'term_values': yd.term_values
          };
        });

      // Compile data
      const mappedFocusGroup = {
        'title': fg.title,
        'type': fg.type,
        '2017': {
          's_value': fg['2017'].sum,
          'term_values': fg['2017'].term_values
        },
        '2018': {
          's_value': fg['2018'].sum,
          'term_values': fg['2018'].term_values
        }
      };

      return mappedFocusGroup;
    });

  return [ solution ];
};

module.exports = compiler;

The resulting data structure will look like (with mock values):

[
  {
    'title': 'Focus Group: Children',
    'type': 'child',
    '2017': {
      's_value': 1,
      'term_values': {
        'x': 1,
        'y': 2,
        'z': 3
      }
    },
    '2018': {
      's_value': 5,
      'term_values': {
        'x': 8,
        'y': 13,
        'z': 21
      }
    }
  },
  {
    'title': 'Focus Group: Adults',
    'type': 'adult',
    '2017': {
      's_value': 1,
      'term_values': {
        'x': 1,
        'y': 2,
        'z': 3
      }
    },
    '2018': {
      's_value': 1,
      'term_values': {
        'x': 1,
        'y': 2,
        'z': 3
      }
    }
  }
]

Packaging

A project structure could be organized like so:

/src
  /equation-pipeline
    /workers
      addTermX.js
      addTermY.js
      addTermZ.js
      compiler.js
    equation-pipeline.js
  /additional-pipeline
    /workers
      //... workers located here...
    additional-pipeline.js
index.js
package.json

Multiple pipelines may also be implemented to process multiple sets of resulting data from a single dataset.

The overall intent is a Node package that processes data using the Pipeline-Worker Architecture and that may be imported as a dependency into an application that requires data processing. The package may then be used like so:

const dataProcessingPipeline = require('data-processing-pipeline');

const processedData = dataProcessingPipeline(dataToProcess);

The index.js could even export the pipeline as an async function, or wrap it in a promise, if desired.
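
For example, one possible sketch of an async index.js, with the require path following the project structure above:

// index.js
// A sketch of exporting the pipeline as an async function
const equationPipeline = require('./src/equation-pipeline/equation-pipeline');

module.exports = async (dataToProcess) => equationPipeline(dataToProcess);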

Development and Testing

It is now much easier to assign data transformations to multiple developers, because there are no monolithic functions. Since the transformations are atomic and encapsulated, subject matter experts (e.g., a business analyst or developer) may focus on one individual worker at a time instead of dealing with interrelated, convoluted code.

By adhering to more functional programming paradigms, it is now easier to write functions without side effects. Functions (workers) now have a much more limited scope in terms of input and will always return the same result for that input. Coupled with the lack of monolithic functions, unit tests are now more concise and easier to write. Edge cases, mutated state, and other classes of bugs may be mitigated altogether.
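
As a minimal sketch of such a unit test for the child addTermY worker above (using Node’s built-in assert module; the fixture values are hypothetical):

const assert = require('assert');
const addTermY = require('./year-2017/add-term-y');

// Minimal fixture: one child focus group with a single 2017 entry
const input = {
  'focusGroups': [
    {
      'type': 'child',
      'yearData': [
        { 'year': 2017, 'dataValues': { 'x': 1, 'y': 4, 'z': 2 } }
      ]
    }
  ]
};

const [ result ] = addTermY(input);

// y/2 + 2 with y = 4 should yield 4
assert.strictEqual(result.focusGroups[0].yearData[0].term_values.y_term, 4);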

Another advantage in terms of testing is that unit tests actually cover individual units of code, while integration tests are allowed to test the integration of workers over a pipeline via black-box testing.

Changes are now more identifiable and reportable, which makes bugs less likely to appear. If bugs do appear, they are much easier to address, since specific changes occur only in specific workers; only the logic that requires changes will change, without side effects in other areas of the codebase.

Final Thoughts

By breaking down a large processing problem into its atomic parts, assigning those parts to workers, and bringing those workers together into a pipeline, we can start to see the advantage of the Pipeline-Worker Architecture. Deeply nested data and complex data transformations may be processed in an intuitive and easy-to-understand way.

For these reasons, and as the examples above show, the Pipeline-Worker Architecture is an additional, powerful tool in your data processing toolkit.