Learn Rust by Developing a Crypto Publisher “Apache Kafka” Application | by Sung Kim | Mar, 2022

Picture from Unsplash

I all the time discover that when studying a programming language, it’s useful to simply begin creating an utility from scratch, one step at a time. This lesson builds upon the earlier lesson Learn Rust by Developing Application: Crypto Publisher CLI to publish/produce crypto costs (like Bitcoin value in USD or BTC-USD) to Apache Kafka.

Earlier than diving into an article, let’s get some housekeeping out of method first by offering a hyperlink to a GitHub repository the place all supply codes can be found => https://github.com/sungkim11/crypto-publisher.

Additionally, I wish to showcase what utility you may be creating utilizing Rust as proven under.

Apache Kafka

You need to have Apache Kafka put in in your machine or have entry to Apache Kafka service. I’ve used APACHE KAFKA QUICKSTART to put in Apache Kafka on Home windows 10/11 WSL (Home windows Subsystem for Linux).

Rust Crates

Earlier than we begin creating the applying, we might want to add seven Rust crates:

  • Serde: It’s a framework for serializing and deserializing Rust knowledge buildings effectively and generically. Their crates.io URL is https://crates.io/crates/serde. Serde is used to map JSON knowledge into Rust knowledge buildings or Struct. It’s used to Deserialize or convert JSON to struct and Serialize or convert struct to JSON.
  • Reqwest: It’s HTTP Consumer for Rust. Their crates.io URL is https://crates.io/crates/reqwest. Reqwest is used to make HTTPS requests to Coinbase REST API.
  • Tokio: It’s a platform for writing asynchronous functions. Their crates.io URL is https://crates.io/crates/tokio. Tokio is required to develop an asynchronous utility.
  • Clap: It’s a command-line argument parser for Rust. Their crates.io URL is https://crates.io/crates/clap. Clap is required to develop a command-line interface utility.
  • (NEW) rdkafka: A totally asynchronous, futures-enabled Apache Kafka consumer library for Rust based mostly on librdkafka. Their crates.io URL is https://crates.io/crates/rdkafka.
  • (NEW) log: A Rust library offering a light-weight logging facade. Their crates.io URL is https://crates.io/crates/log. Log is used to supply informational messages.
  • (NEW) serde_json: Serde is a framework for serializing and deserializing Rust knowledge buildings effectively and generically. Their crates.io URL is https://crates.io/crates/serde_json. Serde_json is used to transform struct again to JSON.

Lastly, Cargo.toml is modified to incorporate seven crates as follows:

[package]
identify = "rust-struct"
model = "0.1.0"
version = "2021"
# See extra keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html[dependencies]
# CLI
serde = model = "1.0.136", options = ["derive"]
reqwest = model = "0.11", options = ["json"]
tokio = model = "1.17.0", options = ["full"]
clap = model = "3.1.2", options = ["derive"]
# Apach Kafka
rdkafka = model = "0.25", options = ["cmake-build"]
log = "0.4.14"
serde_json = "1.0"

On this article, we are going to use develop a easy utility then construct upon an utility so as to add extra functionalities in subsequent classes:

  1. Produce or publish a crypto value message to Apache Kafka Service
  2. Produce or publish a crypto value message in JSON format to Apache Kafka Service

1: Produce or publish a crypto value message to Apache Kafka Service

We are going to create a brand new async perform to publish or produce messages to the Apache Kafka service.

Publish (Message) — First, we configured settings for Kafka producer consumer the place I’ve outlined the minimal variety of settings as proven under:

  • dealer
  • message.timeout
  • safety (no safety)
let producer: &FutureProducer = &ClientConfig::new()
.set("bootstrap.servers", dealer)
.set("message.timeout.ms", "5000")
.set("safety.protocol", "plaintext")
.create()
.anticipate("Didn't create producer");

The entire record of configuration properties is accessible right here => https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

Second, we outlined each key and payload (e.g., message) as proven under:

let payload = format!("message ", pub_message);
let key = format!("key ", rely);

Lastly, we ship the message to Apache Kafka service as proven under:

let standing = producer.ship(
FutureRecord::to(matter)
.payload(&payload)
.key(&key)
.headers(OwnedHeaders::new().add(
&format!("header_key_", rely),
&format!("header_value_", rely)
)),
Period::from_secs(0)
).await;

The entire code for async fn publish() is supplied under:

Subsequent, we modify the clap config to incorporate:

  • dealer, which defines the place Apache Kafka providers are working.
  • matter, which defines the class the place information are saved and printed.

Lastly, we modify the pub fn crypto_publisher() to name the brand new perform:

publish(dealer, matter, &price_message, rely);

The entire code, named crypto_publisher_6.rs is accessible on GitHub repository (https://github.com/sungkim11/crypto-publisher).

2: Produce or publish a crypto value message in JSON format to Apache Kafka Service

Within the earlier lesson, we despatched a message as a one-line of a string with crypto value as proven under:

2022-03-28T02:22:11Z: BTC-USD SPOT Value: 46811.21 | BUY Value: 47048.86 | SELL Value: 46575.70 | Value Unfold: 473.16016

On this lesson, we are going to convert the message to JSON format utilizing serde’s serialize characteristic.

First, we create a struct to carry the message in JSON format as follows:

#[derive(Serialize, Debug)]
pub struct CryptoPriceData
pub knowledge: CryptoPrice
#[derive(Serialize, Debug)]
pub struct CryptoPrice
pub quote_time: String,
pub forex: String,
pub fee: String,
pub spot_price: String,
pub buy_price: String,
pub sell_price: String,
pub spread_price: String,

Then we populate the struct with crypto value knowledge as proven under:

let price_struct = CryptoPriceData 
knowledge: CryptoPrice
quote_time: quote_time.unwrap().to_string(),
forex: forex.to_string(),
fee: charges.to_string(),
spot_price: spot_price.unwrap().to_string(),
buy_price: buy_price.unwrap().to_string(),
sell_price: sell_price.unwrap().to_string(),
spread_price: spread_price.to_string(),

;

Lastly, we convert the struct to JSON then name async fn publish() as proven under:

let price_json = serde_json::to_string(&price_struct).unwrap();
publish(dealer, matter, &price_json, rely);

The entire code, named crypto_publisher_7.rs is accessible on the GitHub repository (https://github.com/sungkim11/crypto-publisher).

Within the subsequent lesson, we are going to create a Crypto Subscriber to devour the message and retailer the message in Apache Parquet format. Usually, we use Apache Spark to course of these streams of information (i.e., messages) from Apache Kafka, however I need to exhibit you may also use Rust to do an analogous activity. I do suggest utilizing Apache Spark, although.

I hope this text was useful! Thanks for studying.

More Posts