Introduction
Taskline is a Rust library for creating scheduled tasks backed by Redis.
How does it work?
Taskline revolves around the concept of a task. A task is a unit of work that is requested by a producer to be completed by a consumer / worker.
A producer can schedule a task to be completed at a specific time in the future. A consumer can then fetch the task and complete it.
Consumers and producers work through backends, which must implement the DequeuBackend and EnqueuBackend traits. Right now, there is only one backend, which is Redis.
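The producer/consumer split described above can be sketched without Redis. The following is a minimal in-memory model, not Taskline's implementation: the trait names SketchEnqueue and SketchDequeue, their method signatures, and the InMemoryBackend type are all hypothetical and exist only for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical producer-side trait (NOT the real `EnqueuBackend` signature).
pub trait SketchEnqueue<T> {
    fn enqueue(&mut self, task: T, run_at_ms: u64);
}

/// Hypothetical consumer-side trait (NOT the real `DequeuBackend` signature).
pub trait SketchDequeue<T> {
    /// Removes and returns every task whose scheduled time is <= `now_ms`.
    fn dequeue_due(&mut self, now_ms: u64) -> Vec<T>;
}

/// In-memory stand-in for a backend: tasks are grouped by their due time.
pub struct InMemoryBackend<T> {
    tasks: BTreeMap<u64, Vec<T>>,
}

impl<T> InMemoryBackend<T> {
    pub fn new() -> Self {
        Self { tasks: BTreeMap::new() }
    }
}

impl<T> SketchEnqueue<T> for InMemoryBackend<T> {
    fn enqueue(&mut self, task: T, run_at_ms: u64) {
        self.tasks.entry(run_at_ms).or_default().push(task);
    }
}

impl<T> SketchDequeue<T> for InMemoryBackend<T> {
    fn dequeue_due(&mut self, now_ms: u64) -> Vec<T> {
        // Everything scheduled strictly after `now_ms` stays in the queue.
        let not_due = match now_ms.checked_add(1) {
            Some(next) => self.tasks.split_off(&next),
            None => BTreeMap::new(),
        };
        let due = std::mem::replace(&mut self.tasks, not_due);
        // BTreeMap iterates in key order, so earlier tasks come first.
        due.into_values().flatten().collect()
    }
}
```

A producer would call enqueue with a future timestamp, and a consumer would call dequeue_due periodically with the current time. Keeping the two roles behind separate traits is what lets a different storage engine be swapped in.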
When should I use Taskline?
Taskline is a good fit for applications that need to schedule work to be done in the future. For example:
- Scheduling emails to be sent in the future
- Scheduling a notification to be sent to a user in the future
Quick Start
Taskline is provided as a library on crates.io. To get started, add taskline as a dependency to your project.
cargo add taskline
Example
The library provides an asynchronous API for interacting with Redis. You can use tokio or async-std as the runtime.
First, create a RedisBackend instance. You can do this with the RedisBackend::new method or the RedisBackendConfig struct.
After that, create a consumer and a producer. These are thin wrapper structs that make the library more convenient to use. You can create them with the Consumer::new and Producer::new methods.
You can look at the example below.
extern crate redis;
use tokio::time::{sleep, Duration};
use taskline::prelude::*;

#[tokio::main]
async fn main() {
    let queue = RedisBackendConfig {
        queue_key: "taskline",
        read_batch_size: 10,
        autodelete: true,
    }
    .with_client(redis::Client::open("redis://127.0.0.1/").unwrap());

    if !queue.is_redis_version_ok().await.unwrap() {
        return;
    }

    queue
        .write(&"Hello!".to_string(), &(now() + 1000.))
        .await
        .unwrap();

    loop {
        let tasks = queue.read(&now()).await;
        match tasks {
            Ok(tasks) => {
                if tasks.is_empty() {
                    sleep(Duration::from_millis(100)).await;
                    continue;
                }
                for task in tasks {
                    tokio::task::spawn(async move {
                        println!("Consumed '{}'", task);
                    });
                }
            }
            Err(e) => {
                sleep(Duration::from_millis(1000)).await;
                println!("Error: {:?}", e);
                continue;
            }
        }
    }
}
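The example schedules a task at now() + 1000. and later reads everything due at now(). As a rough illustration of that arithmetic, here is a standard-library-only sketch. It assumes the timestamps are f64 milliseconds since the Unix epoch, which the example suggests but does not state; now_ms, run_at, and is_due are hypothetical helpers, not Taskline functions.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical clock helper: milliseconds since the Unix epoch as `f64`.
/// Assumption: this mirrors the unit used by the timestamps in the example.
pub fn now_ms() -> f64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_secs_f64()
        * 1000.0
}

/// Computes the timestamp at which a task should run, given a delay.
pub fn run_at(now_ms: f64, delay: Duration) -> f64 {
    now_ms + delay.as_secs_f64() * 1000.0
}

/// A task is due once its scheduled time is not after the current time.
pub fn is_due(run_at_ms: f64, now_ms: f64) -> bool {
    run_at_ms <= now_ms
}
```

Under that assumption, run_at(now_ms(), Duration::from_secs(1)) corresponds to the + 1000. offset used in the example.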
More examples can be found here.
Run the example
git clone git@github.com:daxartio/taskline.git
# git clone https://github.com/daxartio/taskline.git
cd taskline
docker-compose up -d
cargo run --example redis
User Guide
What you need to know.
Formats of tasks
The format of a task sent and received via Redis
Taskline itself does not prescribe a task format; the format is determined by the backend, so you can use any format you want.
The library implements two task formats for sending and receiving via Redis:
- JSON
- String
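To illustrate what "any format" can mean in practice, the sketch below separates the payload encoding from the task type. Everything here is hypothetical: the TaskCodec trait, PlainString, and the toy PipeDelimited format are not part of Taskline, and a real structured format would more likely be JSON via serde.

```rust
/// Hypothetical codec trait; not part of Taskline's API.
pub trait TaskCodec<T> {
    fn encode(&self, task: &T) -> String;
    fn decode(&self, raw: &str) -> Result<T, String>;
}

/// String format: the payload is stored exactly as given.
pub struct PlainString;

impl TaskCodec<String> for PlainString {
    fn encode(&self, task: &String) -> String {
        task.clone()
    }

    fn decode(&self, raw: &str) -> Result<String, String> {
        Ok(raw.to_owned())
    }
}

#[derive(Debug, PartialEq, Clone)]
pub struct Data {
    pub id: u64,
    pub name: String,
}

/// Toy structured format `<id>|<name>`, standing in for JSON in this sketch.
pub struct PipeDelimited;

impl TaskCodec<Data> for PipeDelimited {
    fn encode(&self, task: &Data) -> String {
        format!("{}|{}", task.id, task.name)
    }

    fn decode(&self, raw: &str) -> Result<Data, String> {
        // Split on the first '|' only, so the name may itself contain '|'.
        let (id, name) = raw
            .split_once('|')
            .ok_or_else(|| format!("missing '|' in {raw:?}"))?;
        let id = id
            .parse::<u64>()
            .map_err(|e| format!("bad id {id:?}: {e}"))?;
        Ok(Data { id, name: name.to_owned() })
    }
}
```

Decoding returns a Result because a payload read back from storage may not match the expected format, which is also why the JSON example unwraps each task individually.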
Example
extern crate redis;
use serde::{Deserialize, Serialize};
use taskline::prelude::*;

#[derive(Deserialize, Serialize, Debug, Clone)]
struct Data {
    id: u64,
    name: String,
}

#[tokio::main]
async fn main() {
    let queue_key = String::from("taskline");
    let backend = JsonRedisBackend::<Data>::new(RedisBackend::new(
        redis::Client::open("redis://127.0.0.1/").unwrap(),
        queue_key,
        10,
        true,
    ));
    let producer = Producer::new(backend.clone());
    let consumer = Consumer::new(backend.clone());

    if !backend.is_redis_version_ok().await.unwrap() {
        return;
    }

    producer
        .schedule(
            &Data {
                id: 1,
                name: "Task".to_string(),
            },
            &(now() + 1000.),
        )
        .await
        .unwrap();

    poll_tasks(100, consumer, |tasks| async {
        for task in tasks.unwrap() {
            tokio::task::spawn(async move {
                println!("Consumed {:?}", task.unwrap());
            });
        }
        true
    })
    .await;
}
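The general shape of a polling helper like the one used above can be sketched synchronously with only the standard library. This is not how poll_tasks is implemented: poll_sync is a hypothetical function, and the sketch assumes that the boolean returned by the callback means "keep polling", which the example implies but does not document.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical, synchronous stand-in for a polling helper.
/// Assumption: `handle` returns `true` to keep polling, `false` to stop.
/// Returns the number of polling rounds that were executed.
pub fn poll_sync<T, E>(
    interval: Duration,
    mut fetch: impl FnMut() -> Result<Vec<T>, E>,
    mut handle: impl FnMut(Result<Vec<T>, E>) -> bool,
) -> usize {
    let mut rounds = 0;
    loop {
        rounds += 1;
        // The callback receives the raw result so it can decide how to
        // treat backend errors (retry, log, or stop).
        if !handle(fetch()) {
            return rounds;
        }
        sleep(interval);
    }
}
```

Passing the whole Result to the callback, rather than only the successful batch, keeps error handling in the caller's hands; the example does the same when it calls tasks.unwrap() inside the closure.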
Autodelete
Deleting from a storage after handling
If you want to delete a task from storage only after handling it, you can use JsonRedisBackend or RedisBackend with the autodelete=false parameter. This is safe only with a single consumer, because a task stays in storage until it is committed and several consumers could otherwise read the same task. If you have more than one consumer, you can use a distributed lock built on Redis, also known as Redlock. See Distributed Locks with Redis.
Don't forget to delete a task explicitly from storage after handling it. See Committer::commit.
This implementation is experimental. A more convenient approach is planned for the future.
Recommendation
I recommend using autodelete=true if it fits your use case. This approach is simple to understand and requires no extra configuration.
Be aware, however, that a task will not be handled again if your application fails while processing it.
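The trade-off between the two modes can be made concrete with a small in-memory model. This is only a sketch of the behaviour described above, not the Redis backend: ModelQueue and its methods are hypothetical, and payloads double as task identifiers for simplicity.

```rust
/// In-memory model of a scheduled queue; `autodelete` mirrors the flag
/// described above. Hypothetical type, not part of Taskline.
pub struct ModelQueue {
    autodelete: bool,
    tasks: Vec<(u64, String)>, // (due time in ms, payload)
}

impl ModelQueue {
    pub fn new(autodelete: bool) -> Self {
        Self { autodelete, tasks: Vec::new() }
    }

    pub fn schedule(&mut self, payload: &str, run_at_ms: u64) {
        self.tasks.push((run_at_ms, payload.to_owned()));
    }

    /// Returns the payloads that are due at `now_ms`.
    /// With `autodelete`, they are removed from storage at read time.
    pub fn read(&mut self, now_ms: u64) -> Vec<String> {
        let due: Vec<String> = self
            .tasks
            .iter()
            .filter(|(at, _)| *at <= now_ms)
            .map(|(_, payload)| payload.clone())
            .collect();
        if self.autodelete {
            self.tasks.retain(|(at, _)| *at > now_ms);
        }
        due
    }

    /// Explicit acknowledgement; only needed when `autodelete` is false.
    pub fn commit(&mut self, payload: &str) {
        self.tasks.retain(|(_, p)| p.as_str() != payload);
    }

    /// Number of tasks still held in storage.
    pub fn pending(&self) -> usize {
        self.tasks.len()
    }
}
```

With autodelete enabled, a task is gone as soon as it has been read, so a crash in the handler loses it. With autodelete disabled, every read returns the task again until commit is called, which gives a retry after a failure but also means two consumers reading concurrently would both receive it.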
Example
extern crate redis;
use serde::{Deserialize, Serialize};
use taskline::prelude::*;

#[derive(Deserialize, Serialize, Debug, Clone)]
struct Data {
    id: u64,
    name: String,
}

#[tokio::main]
async fn main() {
    let backend = JsonRedisBackend::<Data>::new(RedisBackend::new(
        redis::Client::open("redis://127.0.0.1/").unwrap(),
        String::from("taskline"),
        10,
        false,
    ));
    let producer = Producer::new(backend.clone());
    let consumer = Consumer::new(backend.clone());
    let committer = Committer::new(backend.clone());

    producer
        .schedule(
            &Data {
                id: 1,
                name: "Task".to_string(),
            },
            &(now() + 1000.),
        )
        .await
        .unwrap();

    poll_tasks(100, consumer, |tasks| async {
        for task in tasks.unwrap() {
            let task = task.unwrap();
            println!("Consumed {:?}", task);
            committer.commit(&task).await.unwrap();
        }
        true
    })
    .await;
}
Contribution guidelines
First off, thank you for considering contributing to taskline.
If your contribution is not straightforward, please first discuss the change you wish to make by creating a new issue before making the change.
Reporting issues
Before reporting an issue on the issue tracker, please check that it has not already been reported by searching for some related keywords.
Pull requests
Try to do one pull request per change.
Commit Message Format
This project adheres to Conventional Commits, a specification for adding human- and machine-readable meaning to commit messages.
Commit Message Header
<type>(<scope>): <short summary>
│ │ │
│ │ └─⫸ Summary in present tense. Not capitalized. No period at the end.
│ │
│ └─⫸ Commit Scope
│
└─⫸ Commit Type: feat|fix|build|ci|docs|perf|refactor|test|chore
Type
Type | Title | Description |
---|---|---|
feat | Features | A new feature |
fix | Bug Fixes | A bug fix |
docs | Documentation | Documentation only changes |
style | Styles | Changes that do not affect the meaning of the code (white-space, formatting, missing semi-colons, etc) |
refactor | Code Refactoring | A code change that neither fixes a bug nor adds a feature |
perf | Performance Improvements | A code change that improves performance |
test | Tests | Adding missing tests or correcting existing tests |
build | Builds | Changes that affect the build system or external dependencies (example scopes: main, serde) |
ci | Continuous Integrations | Changes to our CI configuration files and scripts (example scopes: GitHub Actions) |
chore | Chores | Other changes that don't modify src or test files |
revert | Reverts | Reverts a previous commit |
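The header rules above are mechanical enough to check in code. The following sketch is a hypothetical validator, not a tool used by this project. It accepts the types listed in the table, treats the scope as optional (as the Conventional Commits specification does), and enforces the "not capitalized, no period at the end" rules for the summary.

```rust
/// Commit types taken from the table above.
const TYPES: [&str; 11] = [
    "feat", "fix", "docs", "style", "refactor", "perf", "test", "build", "ci", "chore", "revert",
];

/// Parsed commit header. Hypothetical type used only in this sketch.
#[derive(Debug, PartialEq)]
pub struct Header<'a> {
    pub kind: &'a str,
    pub scope: Option<&'a str>,
    pub summary: &'a str,
}

/// Parses `<type>(<scope>): <short summary>`; the scope part is optional.
pub fn parse_header(line: &str) -> Result<Header<'_>, String> {
    let (prefix, summary) = line
        .split_once(": ")
        .ok_or("expected '<type>(<scope>): <short summary>'")?;

    let (kind, scope) = match prefix.split_once('(') {
        Some((kind, rest)) => {
            let scope = rest
                .strip_suffix(')')
                .ok_or("scope must be closed with ')'")?;
            if scope.is_empty() {
                return Err("scope must not be empty".into());
            }
            (kind, Some(scope))
        }
        None => (prefix, None),
    };

    if !TYPES.contains(&kind) {
        return Err(format!("unknown commit type '{kind}'"));
    }
    if summary.is_empty() {
        return Err("summary must not be empty".into());
    }
    if summary.starts_with(char::is_uppercase) {
        return Err("summary must not be capitalized".into());
    }
    if summary.ends_with('.') {
        return Err("summary must not end with a period".into());
    }
    Ok(Header { kind, scope, summary })
}
```

For instance, parse_header("fix(redis): return a value for poll_tasks") yields the type fix with the scope redis, while "fix: Add x" is rejected because the summary is capitalized.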
Developing
Set up
This is no different than other Rust projects.
git clone https://github.com/daxartio/taskline
cd taskline
cargo test
Useful Commands
- Run Clippy:
  cargo clippy --all-targets --all-features --workspace
- Run all tests:
  cargo test --all-features --workspace
- Check to see if there are code formatting issues:
  cargo fmt --all -- --check
- Format the code in the project:
  cargo fmt --all
Changelog
All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.
0.9.5 (2025-01-22)
0.9.4 (2024-10-23)
0.9.3 (2024-07-11)
0.9.2 (2024-04-29)
Refactor
- add with_capacity
0.9.1 (2023-11-04)
Fix
- fix interval between tasks
0.9.0 (2023-09-22)
Feat
- add new functions - now, poll_tasks
Fix
- return a value for poll_tasks
0.8.1 (2023-09-21)
Fix
- make the MemoryBackend type as a generic type
0.8.0 (2023-09-20)
Feat
- add the memory backend
0.7.0 (2023-08-19)
Feat
- use & for args
Refactor
- don't use async trait methods
0.6.0 (2023-07-21)
Feat
- add is_redis_version_ok method
0.5.1 (2023-07-14)
Refactor
- add committer instead of the delete method
0.5.0 (2023-07-12)
Feat
- add autodelete arg
0.4.2 (2023-07-03)
Fix
- update docs
0.4.1 (2023-06-25)
Fix
- docs: update code's documentation
0.4.0 (2023-06-25)
Feat
- add JsonRedisBackend
0.3.0 (2023-06-18)
Feat
- return error
0.2.0 (2023-06-15)
Feat
- simplify api
0.1.1 (2023-06-13)
Fix
- allow to write of queue_key as String or &str
0.1.0 (2023-06-12)
Feat
- add generics
- add loop instead of tasks
- use async await
- add scheduled tasks
- init
Fix
- add delay between requests
Code of Conduct
This project adheres to the Rust Code of Conduct, which can be found here.