Celery service

Language: Python 2.7

Celery is a distributed task queue. The Willow backend uses it to perform requested data operations, most of them asynchronously.

Dependencies

  • redis, as message broker and result backend
  • requests, for notifying the Flask application of completed operations through HTTP POST

Tasks

Celery tasks must be defined as Python functions using the special Celery task decorator. In Willow, Celery tasks are always invoked by the Flask application in response to HTTP and WebSocket requests made by connected clients.

Because the Celery service and the Flask application run as different processes, an interprocess communication (IPC) mechanism is needed to alert the Flask application whenever a Celery task completes. Since Flask inherently supports HTTP requests, we decided to report the results of completed Celery operations over HTTP, using an internal POST /celeryTaskCompleted endpoint.

Task Function Structure

A single Celery task function is defined for each HTTP and WebSocket request. Most task functions will have the following structure:

import traceback
import requests

@celery.task()
def request(sessionID, requestID, ...):
        # Celery operation results are always JSON dictionaries
        result = {'success': False, 'requestID': requestID, 'sessionID': sessionID, 'operation': "standardize"}

        # Load data frame from HDF file store using helper function
        df = loadDataFrameFromCache(sessionID)

        try:
                # call appropriate function from dcs library
                result['success'] = True
        except Exception as e:
                # include error information in result
                result['error'] = str(e)
                result['errorDescription'] = traceback.format_exc()

        try:
                # POST result to Flask application using requests library
                requests.post("http://localhost:5000/celeryTaskCompleted/", json=result, timeout=0.001)
        except requests.exceptions.RequestException:
                # errors, including the timeout caused by the very short timeout value, are ignored
                pass
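The result-dictionary part of this structure can be tried in isolation, without Celery, Flask or the HDF store. The following is a minimal sketch, not Willow's actual code: run_operation and failing_operation are hypothetical names introduced for illustration, and the operation is passed in as a plain callable standing in for a call into the dcs library.

```python
import traceback


def run_operation(session_id, request_id, operation_name, operation):
    """Run `operation` and describe the outcome as a JSON-serializable dict.

    Hypothetical helper: it mirrors the result dictionary of the task
    structure, but is not part of flaskApp.tasks.
    """
    result = {
        'success': False,
        'requestID': request_id,
        'sessionID': session_id,
        'operation': operation_name,
    }
    try:
        operation()
        result['success'] = True
    except Exception as e:
        # Include error information in the result instead of raising
        result['error'] = str(e)
        result['errorDescription'] = traceback.format_exc()
    return result


def failing_operation():
    # Stand-in for a dcs call that rejects its arguments
    raise ValueError("column index out of range")


ok = run_operation("session-1", "request-1", "standardize", lambda: None)
failed = run_operation("session-1", "request-2", "standardize", failing_operation)
```

Because failures are captured in the dictionary rather than raised, a caller can always forward the same kind of object to the Flask application, and the client can match it to its request through requestID.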

List of Tasks

flaskApp.tasks.DFtoJSON(sessionID)

Task invoked synchronously by GET /downloadJSON request in Flask application

Uses pandas.DataFrame.to_json().

Returns: JSON text
Return type: str

flaskApp.tasks.DataFrameToCSV(sessionID)

Task invoked synchronously by GET /downloadCSV request in Flask application

Uses pandas.DataFrame.to_csv().

Returns: CSV text
Return type: str

flaskApp.tasks.analyze(sessionID, requestID, column)

Task invoked asynchronously by ‘analyze’ WebSocket request in Flask application

Uses dcs.analyze.analysisForColumn().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.changeColumnDataType(sessionID, requestID, column, newDataType, dateFormat=None)

Task invoked asynchronously by ‘changeColumnDataType’ WebSocket request in Flask application

Uses dcs.load.changeColumnDataType().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.combineColumns(sessionID, requestID, columnsToCombine, seperator, newName, insertIndex)

Task invoked asynchronously by ‘combineColumns’ WebSocket request in Flask application

Uses dcs.clean.combineColumns().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.data(request)

Task invoked asynchronously by ‘data’ WebSocket request in Flask application

Uses dcs.load.dataFrameToJSON(), dcs.load.rowsWithInvalidValuesInColumns(), dcs.load.outliersTrimmedMeanSd(), dcs.load.duplicateRowsInColumns() and dcs.view.filterWithSearchQuery().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.deleteColumns(sessionID, requestID, columnIndices)

Task invoked asynchronously by ‘deleteColumns’ WebSocket request in Flask application

Uses dcs.load.removeColumns().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.deleteRows(sessionID, requestID, rowIndices)

Task invoked asynchronously by ‘deleteRows’ WebSocket request in Flask application

Uses dcs.load.removeRows().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.deleteRowsWithNA(sessionID, requestID, columnIndex)

Task invoked asynchronously by ‘deleteRowsWithNA’ WebSocket request in Flask application

Uses dcs.clean.deleteRowsWithNA().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.discretize(sessionID, requestID, columnIndex, cutMode, numberOfBins)

Task invoked asynchronously by ‘discretize’ WebSocket request in Flask application

Uses dcs.clean.discretize().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.emptyStringToNan(sessionID, requestID, columnIndex)

Task invoked asynchronously by ‘emptyStringToNan’ WebSocket request in Flask application

Uses dcs.load.emptyStringToNan().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.executeCommand(sessionID, requestID, command)

Task invoked asynchronously by ‘executeCommand’ WebSocket request in Flask application

Uses dcs.clean.executeCommand().

Danger

Using this function carries direct risk, because it can execute any arbitrary command.

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.fillDown(sessionID, requestID, columnFrom, columnTo, method)

Task invoked asynchronously by ‘fillDown’ WebSocket request in Flask application

Uses dcs.clean.fillDown().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.fillWithAverage(sessionID, requestID, columnIndex, metric)

Task invoked asynchronously by ‘fillWithAverage’ WebSocket request in Flask application

Uses dcs.clean.fillWithAverage().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.fillWithCustomValue(sessionID, requestID, columnIndex, newValue)

Task invoked asynchronously by ‘fillWithCustomValue’ WebSocket request in Flask application

Uses dcs.clean.fillWithCustomValue().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.findReplace(sessionID, requestID, columnIndex, toReplace, replaceWith, matchRegex)

Task invoked asynchronously by ‘findReplace’ WebSocket request in Flask application

Uses dcs.clean.findReplace().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.generateDummies(sessionID, requestID, columnIndex, inplace)

Task invoked asynchronously by ‘generateDummies’ WebSocket request in Flask application

Uses dcs.clean.generateDummies().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.insertDuplicateColumn(sessionID, requestID, columnIndex)

Task invoked asynchronously by ‘insertDuplicateColumn’ WebSocket request in Flask application

Uses dcs.clean.insertDuplicateColumn().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.interpolate(sessionID, requestID, columnIndex, method, order)

Task invoked asynchronously by ‘interpolate’ WebSocket request in Flask application

Uses dcs.clean.fillByInterpolation().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.loadDataFrameFromCache(sessionID, key='original')

Supporting function that loads a dataset from the HDF file store.

The HDF file format supports storing multiple datasets with different labels in the same file. The HDF file associated with a Willow sessionID always stores the current version of the dataset, under the ‘original’ label.

Uses pandas.read_hdf().

Parameters:
  • sessionID (str) – Willow sessionID
  • key (str, optional) – retrieve a dataset under a different label in the HDF file
Returns: pandas.DataFrame on success, None on failure
Return type: pandas.DataFrame

flaskApp.tasks.metadata(request)

Task invoked asynchronously by ‘metadata’ WebSocket request in Flask application

Uses dcs.load.dataFrameToJSON(), dcs.load.rowsWithInvalidValuesInColumns(), dcs.load.outliersTrimmedMeanSd(), dcs.load.duplicateRowsInColumns() and dcs.view.filterWithSearchQuery().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.newCellValue(sessionID, requestID, columnIndex, rowIndex, newValue)

Task invoked asynchronously by ‘newCellValue’ WebSocket request in Flask application

Uses dcs.load.newCellValue().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.normalize(sessionID, requestID, columnIndex, rangeFrom, rangeTo)

Task invoked asynchronously by ‘normalize’ WebSocket request in Flask application

Uses dcs.clean.normalize().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.renameColumn(sessionID, requestID, column, newName)

Task invoked asynchronously by ‘renameColumn’ WebSocket request in Flask application

Uses dcs.load.renameColumn().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.saveToCache(df, sessionID)

Supporting function that saves a pandas.DataFrame object to the HDF file store.

This function must be called after every Celery operation that modifies the dataset, as the Willow backend depends on the invariant that the HDF file corresponding to a Willow sessionID always holds the latest version of the dataset.

Uses pandas.DataFrame.to_hdf().

Returns: True on success, False on failure
Return type: bool

flaskApp.tasks.splitColumn(sessionID, requestID, columnIndex, delimiter, regex)

Task invoked asynchronously by ‘splitColumn’ WebSocket request in Flask application

Uses dcs.clean.splitColumn().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.undo(sessionID, requestID)

Task invoked asynchronously by ‘undo’ WebSocket request in Flask application

Uses loadDataFrameFromCache().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.

flaskApp.tasks.undoAvailable(sessionID)

Supporting function that detects whether an undo operation is available.

The HDF file format supports storing multiple datasets with different labels in the same file. An undo operation reverts the dataset to the version stored under the ‘undo’ label in the same HDF file.

This function checks whether the ‘undo’ label is present in the HDF file for the specified sessionID.

Parameters: sessionID (str) – Willow sessionID
Returns: bool

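The relationship between the ‘original’ and ‘undo’ labels can be illustrated with an in-memory stand-in for a session's HDF file. This is only a sketch under stated assumptions: LabelledStore is a hypothetical class, a plain dict replaces the HDF file, and the points at which the ‘undo’ label is written and removed are guesses rather than documented Willow behaviour.

```python
class LabelledStore(object):
    """In-memory stand-in for one session's HDF file (hypothetical)."""

    def __init__(self, dataset):
        # The current version of the dataset lives under 'original'
        self.datasets = {'original': dataset}

    def save(self, dataset):
        # Assumption: the previous current version becomes the undo snapshot
        self.datasets['undo'] = self.datasets['original']
        self.datasets['original'] = dataset

    def undo_available(self):
        # Undo is possible only when the 'undo' label is present
        return 'undo' in self.datasets

    def undo(self):
        if not self.undo_available():
            return False
        # Assumption: reverting consumes the snapshot (single-level undo)
        self.datasets['original'] = self.datasets.pop('undo')
        return True


store = LabelledStore([1, 2, 3])
before_edit = store.undo_available()   # no 'undo' label yet
store.save([1, 2])                     # an edit replaces the current version
after_edit = store.undo_available()    # the 'undo' label now exists
store.undo()                           # revert to the snapshot
```

In the real service the same presence check would be made against the labels of the session's HDF file rather than a dict.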
flaskApp.tasks.uniquefyDataFrameColumnNames(df)

Supporting function that ensures that all column names in a pandas.DataFrame object are unique.

The HDF fixed file format used by Willow does not support duplicate column names, so this function checks that every column name is unique. If a duplicate column name is found, the column is renamed by appending a unique integer to its name,

e.g. (..., Date, Date, ...) becomes (..., Date_1, Date, ...)
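The renaming rule can be sketched on a plain list of column names. This is one possible reading of the example above, not the actual implementation: uniquify_column_names is a hypothetical function, and the choices to keep the last occurrence unchanged and to start suffixes at 1 are assumptions inferred from that single example.

```python
def uniquify_column_names(names):
    """Return a copy of `names` in which every entry is unique.

    Hypothetical sketch: earlier duplicates receive a numeric suffix,
    the last occurrence keeps its original name.
    """
    taken = set(names)      # names that new suffixed names must not collide with
    result = list(names)
    seen = set()
    # Walk right to left so the last occurrence keeps its original name
    for i in range(len(result) - 1, -1, -1):
        name = result[i]
        if name not in seen:
            seen.add(name)
            continue
        # Find the smallest suffix that does not clash with any existing name
        suffix = 1
        candidate = "%s_%d" % (name, suffix)
        while candidate in taken:
            suffix += 1
            candidate = "%s_%d" % (name, suffix)
        result[i] = candidate
        taken.add(candidate)
    return result


renamed = uniquify_column_names(["ID", "Date", "Date", "Value"])
no_clash = uniquify_column_names(["Date", "Date", "Date_1"])
```

With a pandas.DataFrame, the returned list could be assigned to df.columns before the frame is written to the HDF store.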

flaskApp.tasks.userUploadedCSVToDataFrame(uploadID, initialSkip, sampleSize, seed, headerIncluded)

Task invoked synchronously by POST /upload request in Flask application

Calls dcs.load.CSVtoDataFrame() and flaskApp.tasks.saveToCache()

Returns: new Willow sessionID
Return type: str

flaskApp.tasks.userUploadedJSONToDataFrame(uploadID, initialSkip, sampleSize, seed)

Task invoked synchronously by POST /upload request in Flask application

Calls dcs.load.JSONtoDataFrame() and flaskApp.tasks.saveToCache()

Returns: new Willow sessionID
Return type: str

flaskApp.tasks.userUploadedXLSXToDataFrame(uploadID, initialSkip, sampleSize, seed, headerIncluded)

Task invoked synchronously by POST /upload request in Flask application

Calls dcs.load.XLSXtoDataFrame() and flaskApp.tasks.saveToCache()

Returns: new Willow sessionID
Return type: str

flaskApp.tasks.visualize(request)

Task invoked asynchronously by ‘visualize’ WebSocket request in Flask application

Uses dcs.view.histogram(), dcs.view.scatter(), dcs.view.line(), dcs.view.date() and dcs.view.frequency().

POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.