Introduction

The library allows to create scheduled tasks via Redis for Rust.

How does it work?

Taskline revolves around the concept of a task. A task is a unit of work that is requested by a producer to be completed by a consumer / worker.

A producer can schedule a task to be completed at a specific time in the future. A consumer can then fetch the task and complete it.

There are backends for consumers and producers, which must impliment the DequeuBackend and EnqueuBackend traits. Right now, there is only one backend, which is Redis.

When should I use Taskline?

Taskline is a good fit for applications that need to schedule work to be done in the future. For example, Taskline is a good fit for:

  • Scheduling emails to be sent in the future
  • Scheduling a notification to be sent to a user in the future

Quick Start

Taskline is provided as the a library on crates.io. To get started, add taskline as a dependency to your project.

cargo add taskline

Example

The library provides an asynchronous code for interacting with Redis. You can use tokio or async-std as the runtime.

First of all, you need to create a RedisBackend instance. You can do this by using the RedisBackend::new method or RedisBackendConfig struct.

After that, you need to create a consumer and a producer. These are simple structs for more comfortable interaction with the library. You can create them using the Consumer::new and Producer::new methods.

You can look at the example below.

extern crate redis;
use tokio::time::{sleep, Duration};

use taskline::prelude::*;

#[tokio::main]
async fn main() {
    let queue = RedisBackendConfig {
        queue_key: "taskline",
        read_batch_size: 10,
        autodelete: true,
    }
    .with_client(redis::Client::open("redis://127.0.0.1/").unwrap());

    if !queue.is_redis_version_ok().await.unwrap() {
        return;
    }

    queue
        .write(&"Hello!".to_string(), &(now() + 1000.))
        .await
        .unwrap();

    loop {
        let tasks = queue.read(&now()).await;
        match tasks {
            Ok(tasks) => {
                if tasks.is_empty() {
                    sleep(Duration::from_millis(100)).await;
                    continue;
                }
                for task in tasks {
                    tokio::task::spawn(async move {
                        println!("Consumed '{}'", task);
                    });
                }
            }
            Err(e) => {
                sleep(Duration::from_millis(1000)).await;
                println!("Error: {:?}", e);
                continue;
            }
        }
    }
}

More examples can be found here.

Run the example

git clone git@github.com:daxartio/taskline.git
# git clone https://github.com/daxartio/taskline.git
cd taskline
docker-compose up -d
cargo run --example redis

User Guide

That you need to know.

Formats of tasks

A format of a task for sending and receiving via Redis

Actually, Taskline uses a format of a backend. You can use any format which you want.

There are two formats of a task for sending and receiving via Redis which are implemented in the library:

  • JSON
  • String

Example

extern crate redis;
use serde::{Deserialize, Serialize};

use taskline::prelude::*;

#[derive(Deserialize, Serialize, Debug, Clone)]
struct Data {
    id: u64,
    name: String,
}

#[tokio::main]
async fn main() {
    let queue_key = String::from("taskline");
    let backend = JsonRedisBackend::<Data>::new(RedisBackend::new(
        redis::Client::open("redis://127.0.0.1/").unwrap(),
        queue_key,
        10,
        true,
    ));
    let producer = Producer::new(backend.clone());
    let consumer = Consumer::new(backend.clone());

    if !backend.is_redis_version_ok().await.unwrap() {
        return;
    }

    producer
        .schedule(
            &Data {
                id: 1,
                name: "Task".to_string(),
            },
            &(now() + 1000.),
        )
        .await
        .unwrap();

    poll_tasks(100, consumer, |tasks| async {
        for task in tasks.unwrap() {
            tokio::task::spawn(async move {
                println!("Consumed {:?}", task.unwrap());
            });
        }
        true
    })
    .await;
}

Autodelete

Deleting from a storage after handling

If you want to delete a task from storage after handling, you can use RedisJsonBackend or RedisBackend with autodelete=false parameter. It's safe to use it only with one consumer. If you have more than one consumer, you can use distributed lock by redis. It's also named as redlock. See Distributed Locks with Redis.

Don't forget to delete a task explicitly from storage after handling. See Committer::commit.

It's experimental implementation. In the future, it will be implemented more comfortable way.

Recommendation

I recommend to use autodelete=True, if it fits to you. This way is simple to understanding and it do not require extra configurations. But you need to know that your tasks will not be handling again if your application has an error.

Example

extern crate redis;
use serde::{Deserialize, Serialize};

use taskline::prelude::*;

#[derive(Deserialize, Serialize, Debug, Clone)]
struct Data {
    id: u64,
    name: String,
}

#[tokio::main]
async fn main() {
    let backend = JsonRedisBackend::<Data>::new(RedisBackend::new(
        redis::Client::open("redis://127.0.0.1/").unwrap(),
        String::from("taskline"),
        10,
        false,
    ));
    let producer = Producer::new(backend.clone());
    let consumer = Consumer::new(backend.clone());
    let committer = Committer::new(backend.clone());

    producer
        .schedule(
            &Data {
                id: 1,
                name: "Task".to_string(),
            },
            &(now() + 1000.),
        )
        .await
        .unwrap();

    poll_tasks(100, consumer, |tasks| async {
        for task in tasks.unwrap() {
            let task = task.unwrap();
            println!("Consumed {:?}", task);
            committer.commit(&task).await.unwrap();
        }
        true
    })
    .await;
}

Contribution guidelines

First off, thank you for considering contributing to taskline.

If your contribution is not straightforward, please first discuss the change you wish to make by creating a new issue before making the change.

Reporting issues

Before reporting an issue on the issue tracker, please check that it has not already been reported by searching for some related keywords.

Pull requests

Try to do one pull request per change.

Commit Message Format

This project adheres to Conventional Commits. A specification for adding human and machine readable meaning to commit messages.

Commit Message Header

<type>(<scope>): <short summary>
  │       │             │
  │       │             └─⫸ Summary in present tense. Not capitalized. No period at the end.
  │       │
  │       └─⫸ Commit Scope
  │
  └─⫸ Commit Type: feat|fix|build|ci|docs|perf|refactor|test|chore

Type

featFeaturesA new feature
fixBug FixesA bug fix
docsDocumentationDocumentation only changes
styleStylesChanges that do not affect the meaning of the code (white-space, formatting, missing semi-colons, etc)
refactorCode RefactoringA code change that neither fixes a bug nor adds a feature
perfPerformance ImprovementsA code change that improves performance
testTestsAdding missing tests or correcting existing tests
buildBuildsChanges that affect the build system or external dependencies (example scopes: main, serde)
ciContinuous IntegrationsChanges to our CI configuration files and scripts (example scopes: Github Actions)
choreChoresOther changes that don't modify src or test files
revertRevertsReverts a previous commit

Developing

Set up

This is no different than other Rust projects.

git clone https://github.com/daxartio/taskline
cd taskline
cargo test

Useful Commands

  • Run Clippy:

    cargo clippy --all-targets --all-features --workspace
    
  • Run all tests:

    cargo test --all-features --workspace
    
  • Check to see if there are code formatting issues

    cargo fmt --all -- --check
    
  • Format the code in the project

    cargo fmt --all
    

Changelog

All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.

0.9.2 (2024-04-29)

Refactor

  • add with_capacity

0.9.1 (2023-11-04)

Fix

  • fix interval between tasks

0.9.0 (2023-09-22)

Feat

  • add new functions - now, poll_tasks

Fix

  • return a value for poll_tasks

0.8.1 (2023-09-21)

Fix

  • make the MemoryBackend type as a generic type

0.8.0 (2023-09-20)

Feat

  • add the memory backend

0.7.0 (2023-08-19)

Feat

  • use & for args

Refactor

  • don't use async trait methods

0.6.0 (2023-07-21)

Feat

  • add is_redis_version_ok method

0.5.1 (2023-07-14)

Refactor

  • add committer instead of the delete method

0.5.0 (2023-07-12)

Feat

  • add autodelete arg

0.4.2 (2023-07-03)

Fix

  • update docs

0.4.1 (2023-06-25)

Fix

  • docs: update code's documentation

0.4.0 (2023-06-25)

Feat

  • add JsonRedisBackend

0.3.0 (2023-06-18)

Feat

  • return error

0.2.0 (2023-06-15)

Feat

  • simpify api

0.1.1 (2023-06-13)

Fix

  • allow to write of queue_key as String or &str

0.1.0 (2023-06-12)

Feat

  • add generics
  • add loop instead of tasks
  • use async await
  • add scheduled tasks
  • init

Fix

  • add delay between requests

Code of Conduct

This project adheres to the Rust Code of Conduct, which can be found here.

Additional Resources

Alternatives