Celery service
Language: Python 2.7
Celery is a distributed task queue, and is used in the Willow backend to perform all requested data operations asynchronously.
Dependencies
- redis as a message broker and result backend
- requests for notifying the Flask app of completed operations through POST
Tasks
Celery tasks must be defined as Python functions using the special Celery task decorator. In Willow, Celery tasks are always invoked by the Flask application in response to HTTP and WebSocket requests made by connected clients.
Because the Celery service and Flask application run as separate processes, an interprocess communication (IPC) mechanism is needed to alert the Flask application whenever a Celery task completes. Since Flask inherently supports HTTP requests, we decided to report the results of completed Celery operations over HTTP, using an internal POST /celeryTaskCompleted endpoint.
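The notification round trip can be sketched with the standard library alone. This is a minimal sketch written for Python 3, not Willow's code: http.server stands in for the Flask application and http.client for the requests library. Only the /celeryTaskCompleted path and the result keys come from the text above; the handler class, the notify helper and the sample values are illustrative assumptions.

```python
import json
import threading
from http.client import HTTPConnection
from http.server import BaseHTTPRequestHandler, HTTPServer

# Results delivered to the stand-in "web application" process.
received = []


class CompletionHandler(BaseHTTPRequestHandler):
    """Stand-in for the Flask endpoint (assumption, not Willow's code)."""

    def do_POST(self):
        if self.path.rstrip("/") != "/celeryTaskCompleted":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length).decode("utf-8"))
        received.append(payload)
        self.send_response(200)
        self.end_headers()

    def log_message(self, fmt, *args):
        pass  # keep the example quiet


def notify(port, result):
    """Stand-in for the worker side: POST a result dictionary as JSON."""
    body = json.dumps(result).encode("utf-8")
    conn = HTTPConnection("127.0.0.1", port, timeout=5)
    try:
        conn.request("POST", "/celeryTaskCompleted/", body=body,
                     headers={"Content-Type": "application/json"})
        return conn.getresponse().status
    finally:
        conn.close()


# Port 0 asks the operating system for any free port.
server = HTTPServer(("127.0.0.1", 0), CompletionHandler)
listener = threading.Thread(target=server.serve_forever)
listener.daemon = True
listener.start()

status = notify(server.server_address[1],
                {"success": True, "sessionID": "abc",
                 "requestID": "r1", "operation": "analyze"})

server.shutdown()
server.server_close()
```

The worker never holds a reference to the web process. It only needs a URL, which is what makes plain HTTP a convenient IPC channel between two otherwise independent services.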
Task Function Structure
A single Celery task function is defined for each HTTP and WebSocket
request. Most task functions will have the following structure:
    import traceback
    import requests

    @celery.task()
    def request(sessionID, requestID, ...):
        # Celery operation results are always JSON dictionaries
        result = {'success': False, 'requestID': requestID, 'sessionID': sessionID, 'operation': "standardize"}
        # Load data frame from HDF file store using helper function
        df = loadDataFrameFromCache(sessionID)
        try:
            # call appropriate function from dcs library
            result['success'] = True
        except Exception as e:
            # include error information in result
            result['error'] = str(e)
            result['errorDescription'] = traceback.format_exc()
        try:
            # POST result to Flask application using requests library
            requests.post("http://localhost:5000/celeryTaskCompleted/", json=result, timeout=0.001)
        except Exception:
            # errors, including timeouts, are ignored: the task does not wait for Flask's reply
            pass
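The result-building half of this structure can be tried without a broker or a running Flask application. The following is a minimal sketch, assuming a hypothetical helper named build_result that is not part of flaskApp.tasks. It wraps an arbitrary callable and returns the same kind of dictionary that a task would POST.

```python
import traceback


def build_result(sessionID, requestID, operation, func, *args):
    """Hypothetical helper: run func(*args) and describe the outcome."""
    result = {'success': False, 'requestID': requestID,
              'sessionID': sessionID, 'operation': operation}
    try:
        func(*args)
        result['success'] = True
    except Exception as e:
        # Same error keys as in the task structure above
        result['error'] = str(e)
        result['errorDescription'] = traceback.format_exc()
    return result


# One operation that succeeds and one that raises
ok = build_result('s1', 'r1', 'normalize', lambda: None)
failed = build_result('s1', 'r2', 'normalize', lambda: 1 / 0)
```

Because the exception is caught and serialized, a failing operation still produces a well-formed result, so the client that issued requestID always receives an answer.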
List of Tasks
- flaskApp.tasks.DFtoJSON(sessionID)
  Task invoked synchronously by GET /downloadJSON request in Flask application.
  Uses pandas.DataFrame.to_json().
  Returns: JSON text
  Return type: str
- flaskApp.tasks.DataFrameToCSV(sessionID)
  Task invoked synchronously by GET /downloadCSV request in Flask application.
  Uses pandas.DataFrame.to_csv().
  Returns: CSV text
  Return type: str
- flaskApp.tasks.analyze(sessionID, requestID, column)
  Task invoked asynchronously by ‘analyze’ WebSocket request in Flask application.
  Uses dcs.analyze.analysisForColumn().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.changeColumnDataType(sessionID, requestID, column, newDataType, dateFormat=None)
  Task invoked asynchronously by ‘changeColumnDataType’ WebSocket request in Flask application.
  Uses dcs.load.changeColumnDataType().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.combineColumns(sessionID, requestID, columnsToCombine, seperator, newName, insertIndex)
  Task invoked asynchronously by ‘combineColumns’ WebSocket request in Flask application.
  Uses dcs.clean.combineColumns().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.data(request)
  Task invoked asynchronously by ‘data’ WebSocket request in Flask application.
  Uses dcs.load.dataFrameToJSON(), dcs.load.rowsWithInvalidValuesInColumns(), dcs.load.outliersTrimmedMeanSd(), dcs.load.duplicateRowsInColumns() and dcs.view.filterWithSearchQuery().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.deleteColumns(sessionID, requestID, columnIndices)
  Task invoked asynchronously by ‘deleteColumns’ WebSocket request in Flask application.
  Uses dcs.load.removeColumns().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.deleteRows(sessionID, requestID, rowIndices)
  Task invoked asynchronously by ‘deleteRows’ WebSocket request in Flask application.
  Uses dcs.load.removeRows().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.deleteRowsWithNA(sessionID, requestID, columnIndex)
  Task invoked asynchronously by ‘deleteRowsWithNA’ WebSocket request in Flask application.
  Uses dcs.clean.deleteRowsWithNA().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.discretize(sessionID, requestID, columnIndex, cutMode, numberOfBins)
  Task invoked asynchronously by ‘discretize’ WebSocket request in Flask application.
  Uses dcs.clean.discretize().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.emptyStringToNan(sessionID, requestID, columnIndex)
  Task invoked asynchronously by ‘emptyStringToNan’ WebSocket request in Flask application.
  Uses dcs.load.emptyStringToNan().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.executeCommand(sessionID, requestID, command)
  Task invoked asynchronously by ‘executeCommand’ WebSocket request in Flask application.
  Uses dcs.clean.executeCommand().
  Danger: Using this function carries direct risk, as any arbitrary command can be executed.
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.fillDown(sessionID, requestID, columnFrom, columnTo, method)
  Task invoked asynchronously by ‘fillDown’ WebSocket request in Flask application.
  Uses dcs.clean.fillDown().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.fillWithAverage(sessionID, requestID, columnIndex, metric)
  Task invoked asynchronously by ‘fillWithAverage’ WebSocket request in Flask application.
  Uses dcs.clean.fillWithAverage().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.fillWithCustomValue(sessionID, requestID, columnIndex, newValue)
  Task invoked asynchronously by ‘fillWithCustomValue’ WebSocket request in Flask application.
  Uses dcs.clean.fillWithCustomValue().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.findReplace(sessionID, requestID, columnIndex, toReplace, replaceWith, matchRegex)
  Task invoked asynchronously by ‘findReplace’ WebSocket request in Flask application.
  Uses dcs.clean.findReplace().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.generateDummies(sessionID, requestID, columnIndex, inplace)
  Task invoked asynchronously by ‘generateDummies’ WebSocket request in Flask application.
  Uses dcs.clean.generateDummies().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.insertDuplicateColumn(sessionID, requestID, columnIndex)
  Task invoked asynchronously by ‘insertDuplicateColumn’ WebSocket request in Flask application.
  Uses dcs.clean.insertDuplicateColumn().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.interpolate(sessionID, requestID, columnIndex, method, order)
  Task invoked asynchronously by ‘interpolate’ WebSocket request in Flask application.
  Uses dcs.clean.fillByInterpolation().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.loadDataFrameFromCache(sessionID, key='original')
  Supporting function that loads a dataset from the HDF file store.
  The HDF file format supports storing multiple datasets with different labels in the same file. The HDF file associated with a Willow sessionID always stores the current version of the dataset, under the ‘original’ label.
  Uses pandas.read_hdf().
  Parameters:
  - sessionID (str) – Willow sessionID
  - key (str, optional) – retrieve a dataset under a different label in the HDF file
  Returns: pandas.DataFrame on success, None on failure
  Return type: pandas.DataFrame
- flaskApp.tasks.metadata(request)
  Task invoked asynchronously by ‘metadata’ WebSocket request in Flask application.
  Uses dcs.load.dataFrameToJSON(), dcs.load.rowsWithInvalidValuesInColumns(), dcs.load.outliersTrimmedMeanSd(), dcs.load.duplicateRowsInColumns() and dcs.view.filterWithSearchQuery().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.newCellValue(sessionID, requestID, columnIndex, rowIndex, newValue)
  Task invoked asynchronously by ‘newCellValue’ WebSocket request in Flask application.
  Uses dcs.load.newCellValue().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.normalize(sessionID, requestID, columnIndex, rangeFrom, rangeTo)
  Task invoked asynchronously by ‘normalize’ WebSocket request in Flask application.
  Uses dcs.clean.normalize().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.renameColumn(sessionID, requestID, column, newName)
  Task invoked asynchronously by ‘renameColumn’ WebSocket request in Flask application.
  Uses dcs.load.renameColumn().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.saveToCache(df, sessionID)
  Supporting function that saves a pandas.DataFrame object to the HDF file store.
  This function must be called after every Celery operation that modifies the dataset, as the Willow backend depends on the invariant that the HDF file corresponding to a Willow sessionID always holds the latest version of the dataset.
  Uses pandas.DataFrame.to_hdf().
  Returns: True on success, False on failure
  Return type: bool
- flaskApp.tasks.splitColumn(sessionID, requestID, columnIndex, delimiter, regex)
  Task invoked asynchronously by ‘splitColumn’ WebSocket request in Flask application.
  Uses dcs.clean.splitColumn().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.undo(sessionID, requestID)
  Task invoked asynchronously by ‘undo’ WebSocket request in Flask application.
  Uses loadDataFrameFromCache().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
- flaskApp.tasks.undoAvailable(sessionID)
  Supporting function that detects whether an undo operation is available.
  The HDF file format supports storing multiple datasets with different labels in the same file. Undo operations revert the dataset to what is stored under the ‘undo’ label in the same HDF file.
  This function checks whether the ‘undo’ label is present in the HDF file for the specified sessionID.
  Parameters: sessionID (str) – Willow sessionID
  Returns: bool
- flaskApp.tasks.uniquefyDataFrameColumnNames(df)
  Supporting function that ensures that all column names in a pandas.DataFrame object are unique.
  The HDF fixed file format used by Willow does not support duplicate column names, so this function checks that every column name is unique. If a duplicate column name is found, the column is renamed by appending a unique integer to its name,
  e.g. (..., Date, Date, ...) becomes (..., Date_1, Date, ...)
- flaskApp.tasks.userUploadedCSVToDataFrame(uploadID, initialSkip, sampleSize, seed, headerIncluded)
  Task invoked synchronously by POST /upload request in Flask application.
  Calls dcs.load.CSVtoDataFrame() and flaskApp.tasks.saveToCache().
  Returns: new Willow sessionID
  Return type: str
- flaskApp.tasks.userUploadedJSONToDataFrame(uploadID, initialSkip, sampleSize, seed)
  Task invoked synchronously by POST /upload request in Flask application.
  Calls dcs.load.JSONtoDataFrame() and flaskApp.tasks.saveToCache().
  Returns: new Willow sessionID
  Return type: str
- flaskApp.tasks.userUploadedXLSXToDataFrame(uploadID, initialSkip, sampleSize, seed, headerIncluded)
  Task invoked synchronously by POST /upload request in Flask application.
  Calls dcs.load.XLSXtoDataFrame() and flaskApp.tasks.saveToCache().
  Returns: new Willow sessionID
  Return type: str
- flaskApp.tasks.visualize(request)
  Task invoked asynchronously by ‘visualize’ WebSocket request in Flask application.
  Uses dcs.view.histogram(), dcs.view.scatter(), dcs.view.line(), dcs.view.date() and dcs.view.frequency().
  POSTs result dictionary in JSON format to /celeryTaskCompleted endpoint in Flask application.
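The renaming rule described for uniquefyDataFrameColumnNames can be illustrated on a plain list of names. This is a minimal sketch, not the function's actual source: it reproduces the documented example, in which the earlier duplicate receives the suffix and the last occurrence keeps its name. How three or more duplicates are numbered, and how a clash with an existing name such as Date_1 is avoided, are assumptions. The function name uniquify_column_names is hypothetical.

```python
from collections import Counter


def uniquify_column_names(names):
    """Return a copy of names in which every entry is unique."""
    remaining = Counter(names)  # occurrences not yet visited
    used = set(names)           # names that must not be reused
    suffix = Counter()          # last integer tried per base name
    unique = []
    for name in names:
        remaining[name] -= 1
        if remaining[name] == 0:
            # Last (or only) occurrence keeps its original name
            unique.append(name)
            continue
        # The same name appears again later: rename this occurrence
        while True:
            suffix[name] += 1
            candidate = "%s_%d" % (name, suffix[name])
            if candidate not in used:
                break
        used.add(candidate)
        unique.append(candidate)
    return unique


renamed = uniquify_column_names(["ID", "Date", "Date", "Value"])
# renamed == ["ID", "Date_1", "Date", "Value"]
```

With a DataFrame, the same helper could be applied as df.columns = uniquify_column_names(list(df.columns)) before the frame is written to the HDF file store.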