Celery Worker: How to Set an Asynchronous Task Using Redis and Flask



Have you ever encountered applications or tasks that take a long time to run or to produce a result? Your application's workflow can be slowed by operations like sending emails or uploading data. Such tasks ought to be carried out separately from other jobs.


While these activities are processed in the background, your application can move on to other duties. Once a task finishes and its result is ready, the result can be delivered to the user.


So the question is: how can we give the user a fast, responsive experience while a heavy task runs?


This is where asynchronous programming comes in: Celery does the work in the background, in parallel with the rest of the application. Before delving deeply into the subject, let's examine some fundamentals.


What is an asynchronous task?


Simply put, an asynchronous task is a function that runs outside your application's main flow of execution. When called, it does not block the regular workflow of your application.


With asynchronous execution, you can move on to a new task before the previous one is finished. Asynchronous programming lets you handle several requests simultaneously, getting more done in less time.


What is a task queue?


A task queue is a mechanism for allocating tasks that must be finished in the background without interfering with the request and response cycle of an application.


Task queues make it simpler to offload work that would otherwise slow down the application while it is running. While visitors continue to interact with the website and engage in other activities, the queue handles the intensive work in the background.


This keeps the user's experience consistent and responsive, regardless of the workload.


What are Celery, Redis and Flask?

Celery - Celery is a task queue that lets us define jobs to be executed asynchronously, either concurrently or at a predetermined future time. Its key advantage is the ability to delegate time-consuming jobs to separate worker processes.


Redis  - Redis is a distributed, in-memory key-value database, cache, and message broker that uses in-memory data structures and offers optional durability.


Flask - Flask is a web framework that offers libraries for building simple web apps in Python. It was created by Armin Ronacher, who led Pocoo, an international group of Python enthusiasts. It is built on the Werkzeug WSGI toolkit and the Jinja2 template engine. Flask is regarded as a micro framework.

Okay, let's look at how to use a Celery worker with the Redis message broker and a Flask web server to execute asynchronous tasks and provide a good user experience in your application.

Why do we use Celery for asynchronous tasks?

  • Celery is a good choice for background jobs for several reasons. First, it scales well: more workers can be added as needed to handle a rise in demand or traffic. It also has clear documentation and a vibrant user community, and it remains actively developed and supported.

  • Another benefit is that Celery is simple to integrate into a variety of web frameworks, and most of them include libraries to make this process easier.

  • Additionally, it offers the ability to use webhooks to communicate with other web apps when no library is available to facilitate communication.

  • Celery also supports many message brokers, giving us flexibility.

Features of Celery

Celery is a handy framework that reduces production load by deferring work, since it handles both asynchronous and scheduled jobs. Following are some important features of Celery.

Open-Source Library - Python Celery is free and open-source software, which makes it attractive for businesses and developers to adopt at no cost.

Straight-Forward Installation  - It is an easy-to-install, lightweight library. Using "pip install -U Celery" in the terminal, we can install it.

Scheduling - Using the datetime module and celery beat, we can define the precise time at which a task should start. Tasks can also be started at regular intervals with celery beat; for events that repeat on a short interval, we can define a periodic task.
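As an illustration, a minimal celery beat schedule might look like the following sketch. The task names (tasks.send_report, tasks.cleanup) are hypothetical placeholders for functions decorated with @celery.task; the dictionary would be assigned to celery.conf.beat_schedule.

```python
from datetime import timedelta

# A sketch of a celery beat schedule; the dotted task paths are
# hypothetical placeholders for registered Celery tasks.
beat_schedule = {
    'send-report-every-hour': {
        'task': 'tasks.send_report',     # dotted path to the task
        'schedule': timedelta(hours=1),  # run once an hour
    },
    'cleanup-every-30-seconds': {
        'task': 'tasks.cleanup',
        'schedule': 30.0,                # a plain number means seconds
    },
}

print(sorted(beat_schedule))
```

With a schedule like this in place, running `celery beat` alongside the workers would enqueue these tasks at the given intervals.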

Broker Support - Celery supports a variety of message brokers, including the well-known Redis, RabbitMQ, and Amazon SQS, though some of them (notably SQS) lack certain functionality, such as monitoring and remote control.

Integration with Web Frameworks - Celery integrates with several Python web frameworks, including Flask, Pyramid, Pylons, Django, Tornado, and Tryton.

Fast - Celery can process millions of tasks in a minute.

Workflows - It uses a set of powerful primitives known as the "canvas" to compose both simple and complex workflows.

How does Celery work?

In the conventional HTTP request-response cycle, the server receives a request from the client and returns a response. This works well for small activities, but it becomes slow when we try to handle larger tasks inside the cycle. As a result, we need a mechanism that keeps the application responsive.

Let's examine how Celery functions.

Celery communicates via messages; typically, a broker acts as a middleman between clients and workers. Internally, Celery follows a producer-consumer pattern with three major ingredients.

Producers - Producers are the 'web nodes' that handle web requests. While the application processes a request, tasks assigned to Celery are pushed into the task queue.

Consumers - Consumers are the 'worker nodes' that watch the head of the queue, pick up tasks, and execute them. Workers can also enqueue new tasks themselves, which lets them act as producers too.

Queue - In essence, this is the message broker that connects producers and consumers, serving as the messaging layer between Celery's workers and the web application. Celery has extensive support for RabbitMQ and Redis, and more limited support for brokers such as Zookeeper and Amazon SQS.

Components of Celery

As noted above, Celery uses a producer-consumer approach. Accordingly, a Celery setup contains four components.

  1. Celery client - It serves as the producer, adding tasks to the queue of background tasks.

  2. Celery Worker - The consumer, which retrieves jobs from the queue and handles them. Celery allows several workers to run simultaneously.

  3. Message broker - It serves as a messenger between the client and the worker via a message queue. RabbitMQ and Redis are the most widely used brokers.

  4. Result Backend - When every Celery worker has completed processing their background task, they use the result backend to store the status and outcomes of the jobs. It also contains information about any errors that may have occurred while processing. A few of the well-liked tools for storing such results are Redis, AMQP, and SQLAlchemy.

To put it all together: the Celery client is the producer that adds a new task to the queue through the message broker. Celery workers then consume fresh jobs from the queue via the broker, process them, and save the results in the result backend.

Starting with the Celery worker using Redis and Flask

Installing Flask, Celery, and Redis

Create a fresh project in a fresh virtual environment using Python 3 and the most recent version of pip first:

$ python3 -m venv venv

$ . venv/bin/activate

$ pip install --upgrade pip

and set up Flask, Celery, and Redis: (The versions we're using for this tutorial are included in the next command.)

$ pip install celery==4.4.1 Flask==1.1.1 redis==3.4.1

Running Redis locally

Use the following commands to run the Redis server locally on your computer, assuming Linux or macOS:

$ wget http://download.redis.io/redis-stable.tar.gz

$ tar xvzf redis-stable.tar.gz

$ rm redis-stable.tar.gz

$ cd redis-stable

$ make

Once compilation is complete, several binaries are available in the src directory inside redis-stable/, including redis-server (the Redis server you will start) and redis-cli (the command-line client you may need to talk to Redis). Rather than always changing into the src directory, you can install the binaries globally (so the server can be started from any location on your computer) with the following command:

$ make install

You may now find the redis-server binary in your /usr/local/bin directory. The following command can be used to run the server:

$ redis-server

The server is now listening on port 6379, the default Redis port. Keep it running in a separate terminal. We will need two additional terminal tabs later, to launch the web server and the Celery worker. But first, let's connect Flask with Celery and get our web server ready.

Creating a Flask server

It's simple to set up a Flask server. Go to the folder where you wish to create your server and make a new Python file; in our example, name it celeryapp.py.

And include the following basic code in your Python script:

from flask import Flask

app = Flask(__name__)

@app.route("/")
def home():
    return "Hello, World!"

if __name__ == "__main__":
    app.run(debug=True)

Let's check the functionality of our server right now.

Run the commands listed below in your terminal to launch our server:

python celeryapp.py

If you followed along exactly, the result should be straightforward: visiting http://127.0.0.1:5000/ in a browser shows the "Hello, World!" message.

Integrating Celery with Flask

Now that we have built our server, we need to link Celery with our Flask application. To do this, simply modify your celeryapp.py file to match the format below.

#imports
from flask import Flask
from celery import Celery

#creates a Flask object
app = Flask(__name__)

#configure the Redis server
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

#creates a Celery object
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

Celery is set up by creating an object of type Celery, passing the application name and the message broker URL, which is stored under the CELERY_BROKER_URL key in app.config. If you use a broker other than Redis, or the broker lives on a different machine, you will need to adjust the URL accordingly.

It's usually advisable to propagate the rest of the Flask configuration to Celery with celery.conf.update(). The CELERY_RESULT_BACKEND setting is optional; it is only required if you want Celery to store task status and results.

The code that runs in the background is just a regular function with the celery.task decorator. The decorator registers the function as a Celery task, so it can be sent to a worker and executed in the background. As an illustration:

@celery.task
def async_function(arg1, arg2):
    # ...do the time-consuming work here...
    result = arg1 + arg2  # placeholder work
    return result

To run the task in the background, we do not call the function directly; instead we call its delay() method, which places the task on the queue for a worker to pick up. For example, add the following to your celeryapp.py file.

 `async_function.delay(10, 30)`

Sending an Asynchronous Email Using Celery

Let's examine a practical application of Celery: sending an email from our Flask application. First we must build the email form that will let users send emails. The building block of the HTML form is provided here.

 <html>

      <head>

        <title>Flask and Celery</title>

      </head>

      <body>

        <h2>Sending Asynchronous Email</h2>

        {% for message in get_flashed_messages() %}

        <p style="color: red;">{{ message }}</p>

        {% endfor %}

        <form method="POST">

          <p>Send email to: <input type="text" name="email" value="{{ email }}"></p>

          <input type="submit" name="submit" value="Send">

        </form>

      </body>

    </html>

This is simple HTML with the addition of Flask's flashed messages. It should be all you need here.

Note: Save the code above as index.html inside your templates/ folder, so Flask can find it.

We'll use the Flask-Mail extension to send the emails.

Flask-Mail needs to be configured, which includes providing details about the email server it will use to send emails.

To set up your email sender, add the following code to your celeryapp.py application:

import os
from flask_mail import Mail, Message

#Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = 'flask@example.com'

mail = Mail(app)

Be sure to store your email address and password in environment variables for this to work; keeping credentials out of the code is both convenient and more secure.
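For example, the credentials could be exported in the shell that will run the Flask app before launching it (the values below are placeholders, not real credentials):

```shell
# Placeholder credentials; substitute your own before running the app
export MAIL_USERNAME="you@example.com"
export MAIL_PASSWORD="your-app-password"
```

If you use Gmail as the SMTP server, an app-specific password is typically required rather than your account password.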

Since there is just one route in this app, the index route handles everything. Add the following code to your celeryapp.py file.

#extend the Flask import at the top of the file to:
#from flask import Flask, request, session, flash, redirect, url_for, render_template
#session and flash also require a secret key to be set on the app, e.g.:
#app.secret_key = 'change-me'

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html', email=session.get('email', ''))

    email = request.form['email']
    session['email'] = email

    # the content to send
    email_msg = {
        'subject': 'Testing Celery with Flask',
        'to': email,
        'body': 'Testing background task with Celery'
    }

    if request.form['submit'] == 'Send':
        # hands the email content to the background function
        send_email.delay(email_msg)
        flash('Sending email to {0}'.format(email))
    else:
        flash('No Email sent')

    return redirect(url_for('index'))

The code above demonstrates a function that takes the data from our HTML form and saves it in a session for later use.

When the form is submitted, the function checks which button was pressed and, for 'Send', hands the message off to the send_email task. The email_msg dictionary contains the subject, recipient address, and message body.

A flash message is shown to users when the email is submitted so they can see what is happening in the background.

Note: To keep track of the user's input after a page reload, we saved the user's value in the text field in the session.

The last component of this program is the asynchronous task itself, which does the work when a user submits the email form.

@celery.task
def send_email(email_msg):
    #async function to send an email with Flask-Mail
    msg_sub = Message(email_msg['subject'],
                      sender=app.config['MAIL_DEFAULT_SENDER'],
                      recipients=[email_msg['to']])
    msg_sub.body = email_msg['body']
    with app.app_context():
        mail.send(msg_sub)

As mentioned earlier, this task is decorated with celery.task so that it runs in the background.

The function builds a Message object from the email data dictionary and sends it with Flask-Mail. Because the task runs outside a request, an application context must be pushed before calling the send() method.
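Before testing, remember that the tasks are executed by a separate worker process, which must be started alongside the web server. A sketch of the commands, assuming the file is named celeryapp.py and the Celery instance is the module-level celery object created earlier (Redis must already be running):

```shell
# terminal 1: the Celery worker (celeryapp is the module, celery the app object)
$ celery -A celeryapp.celery worker --loglevel=info

# terminal 2: the Flask development server
$ python celeryapp.py
```

With both running, submitting the form hands the message to the worker, and the email is sent in the background while the browser gets its response immediately.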

Wrapping Up

In the era of the fast internet, customers desire the page to load instantly and the result to appear within seconds. The heavy task may take several seconds or even a minute to complete, but the little task can be completed in a single second or even in microseconds.

Our programme frequently performs multiple operations concurrently, which slows down the system's natural flow. Sending emails to a hundred or thousand individuals could be a time-consuming operation.

Users now anticipate faster page loading due to improvements in global bandwidth and latency. Developers should ensure a good user experience even for time-consuming tasks like the one above. How can this superpower be attained? Celery is the miracle worker!

Celery is a distributed message passing-based asynchronous task queue that distributes workload among threads or processors. A client, a broker, and multiple workers make up a celery system.

Celery workers are used to shift data-intensive tasks to the background, improving the efficiency of applications. A Celery setup can process millions of tasks per minute and is highly available.

Celery takes more steps than merely assigning a job to a background thread, but the advantages in terms of flexibility and scalability are difficult to overlook.

Celery makes sending scheduled tasks much simpler than any alternative method. Imagine you wish to carry out a task every day. In this scenario, Celery can be utilised to run programmes in the background without any necessary human triggers.

Although Celery is typically used for long-running tasks, it can also be used to call external APIs; the data is served to the user as soon as your Celery process receives it from the API.

This article described why and how to use Celery to launch an asynchronous job that runs in the background until it completes.

The power of Celery is astounding! We can now add bigger jobs to our Flask applications without worrying about keeping our users waiting for them to finish.























