Mail sending optimization - implemented mail queue. Some minor optimizations.

main
Josef Rokos 1 year ago
parent 0a87a6d7cb
commit 7ca3e0a258

2
Cargo.lock generated

@ -2963,7 +2963,7 @@ dependencies = [
[[package]] [[package]]
name = "rezervator" name = "rezervator"
version = "1.0.0" version = "1.1.0"
dependencies = [ dependencies = [
"actix-files", "actix-files",
"actix-multipart", "actix-multipart",

@ -1,6 +1,6 @@
[package] [package]
name = "rezervator" name = "rezervator"
version = "1.0.0" version = "1.1.0"
edition = "2021" edition = "2021"
[lib] [lib]

@ -10,15 +10,18 @@ cfg_if! { if #[cfg(feature = "ssr")] {
use crate::backend::get_pool; use crate::backend::get_pool;
use crate::error::AppError; use crate::error::AppError;
use log::info; use log::info;
use log::error;
use crate::config::Mailing; use crate::config::Mailing;
use crate::config::MailTransport; use crate::config::MailTransport;
use crate::backend::data::ResSumWithItems; use crate::backend::data::ResSumWithItems;
use lettre::message::Message as LettreMessage; use lettre::message::Message as LettreMessage;
use lettre::{AsyncSmtpTransport, AsyncFileTransport, AsyncTransport, Tokio1Executor}; use lettre::{FileTransport, SmtpTransport, Transport};
use lettre::transport::smtp::client::{Tls, TlsParameters}; use lettre::transport::smtp::client::{Tls, TlsParameters};
use lettre::transport::smtp::authentication::Credentials; use lettre::transport::smtp::authentication::Credentials;
use lettre::transport::smtp::extension::ClientId; use lettre::transport::smtp::extension::ClientId;
use std::ops::Add; use std::ops::Add;
use std::sync::mpsc::Sender;
use std::sync::mpsc;
pub async fn message_for_type(msg_type: &MessageType, pool: &PgPool) -> Result<Message, Error> { pub async fn message_for_type(msg_type: &MessageType, pool: &PgPool) -> Result<Message, Error> {
Ok(query_as::<_, Message>("SELECT * FROM message WHERE msg_type = $1") Ok(query_as::<_, Message>("SELECT * FROM message WHERE msg_type = $1")
@ -100,13 +103,26 @@ cfg_if! { if #[cfg(feature = "ssr")] {
} }
} }
impl Mailing { #[derive(Clone)]
pub async fn send_mail(&self, msg: MailMessage) -> Result<(), AppError> { enum MailerType {
match self.transport() { Smtp(SmtpTransport),
File(String)
}
#[derive(Clone)]
pub struct Mailer {
transport: MailerType,
from: String,
sender: Option<Sender<MailMessage>>
}
impl Mailer {
pub fn new(config: &Mailing) -> Self {
match config.transport() {
MailTransport::Smtp => { MailTransport::Smtp => {
let tls = if let Some(t) = self.accept_all_certs() { let tls = if let Some(t) = config.accept_all_certs() {
if t { if t {
let tls = TlsParameters::builder(self.server().clone().unwrap_or_default()) let tls = TlsParameters::builder(config.server().clone().unwrap_or_default())
.dangerous_accept_invalid_certs(true) .dangerous_accept_invalid_certs(true)
.dangerous_accept_invalid_hostnames(true); .dangerous_accept_invalid_hostnames(true);
Some(tls.build().expect("Cannot build TLS params")) Some(tls.build().expect("Cannot build TLS params"))
@ -116,8 +132,8 @@ cfg_if! { if #[cfg(feature = "ssr")] {
} else { } else {
None None
}; };
let transport = if self.tls().unwrap_or(false) { let transport = if config.tls().unwrap_or(false) {
let transport = AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&self.server().clone().unwrap_or_default()) let transport = SmtpTransport::starttls_relay(&config.server().clone().unwrap_or_default())
.expect("Cannot create SMTP mail transport"); .expect("Cannot create SMTP mail transport");
if let Some(t) = tls { if let Some(t) = tls {
transport.tls(Tls::Required(t)) transport.tls(Tls::Required(t))
@ -125,7 +141,7 @@ cfg_if! { if #[cfg(feature = "ssr")] {
transport transport
} }
} else { } else {
let transport = AsyncSmtpTransport::<Tokio1Executor>::relay(&self.server().clone().unwrap_or_default()) let transport = SmtpTransport::relay(&config.server().clone().unwrap_or_default())
.expect("Cannot create SMTP mail transport"); .expect("Cannot create SMTP mail transport");
if let Some(t) = tls { if let Some(t) = tls {
transport.tls(Tls::Wrapper(t)) transport.tls(Tls::Wrapper(t))
@ -133,30 +149,85 @@ cfg_if! { if #[cfg(feature = "ssr")] {
transport transport
} }
}; };
let transport = if let Some(p) = self.port() { let transport = if let Some(p) = config.port() {
transport.port(p) transport.port(p)
} else { } else {
transport transport
}; };
let transport = if let Some(hello) = self.hello_name() { let transport = if let Some(hello) = config.hello_name() {
transport.hello_name(ClientId::Domain(hello.to_string())) transport.hello_name(ClientId::Domain(hello.to_string()))
} else { } else {
transport transport
}; };
if self.user().is_some() && self.password().is_some() { if config.user().is_some() && config.password().is_some() {
let cred = Credentials::new(self.user().clone().unwrap(), self.password().clone().unwrap()); let cred = Credentials::new(config.user().clone().unwrap(), config.password().clone().unwrap());
transport.credentials(cred).build().send(msg.build_mail(self.from().to_string())?).await?; Self {
transport: MailerType::Smtp(transport.credentials(cred).build()),
from: config.from().to_string(),
sender: None
}
} else { } else {
transport.build().send(msg.build_mail(self.from().to_string())?).await?; Self {
transport: MailerType::Smtp(transport.build()),
from: config.from().to_string(),
sender: None
}
} }
} }
MailTransport::File => { MailTransport::File => {
AsyncFileTransport::<Tokio1Executor>::new(self.path().clone().unwrap_or_default()) Self {
.send(msg.build_mail(self.from().to_string())?).await?; transport: MailerType::File(config.path().clone().unwrap_or("".to_string())),
from: config.from().to_string(),
sender: None
}
} }
} }
}
fn send(&self, msg: &MailMessage) -> Result<(), AppError> {
let to_send = msg.build_mail(self.from.clone())?;
match &self.transport {
MailerType::Smtp(s) => {s.send(&to_send)?;},
MailerType::File(s) => {FileTransport::new(s).send(&to_send)?;}
}
Ok(()) Ok(())
} }
pub fn start_sender(&mut self) {
let (tx, rx) = mpsc::channel::<MailMessage>();
self.sender = Some(tx);
let mailer = self.clone();
std::thread::spawn(move || {
loop {
let msg = rx.recv();
if let Err(e) = msg {
error!("Mailer error: {}", e.to_string());
break;
}
let msg = msg.unwrap();
if let Err(e) = mailer.send(&msg) {
error!("Mail send error: {}", e);
} else {
info!("Mail message for: {} has been successfully sent", msg.to);
}
}
});
info!("Mail sender started");
}
pub fn send_mail(&self, msg: MailMessage) -> Result<(), AppError> {
if let Err(e) = self.sender.as_ref().expect("Sender not started").send(msg) {
error!("Mail queue error: {}", e);
Err(AppError::MailSendError(e.to_string()))
} else {
info!("Message queued for send");
Ok(())
}
}
} }
}} }}

@ -55,16 +55,16 @@ cfg_if!{
use actix_web::web::Data; use actix_web::web::Data;
use leptos_actix::extract; use leptos_actix::extract;
use leptos::ServerFnError; use leptos::ServerFnError;
use crate::config::Mailing; use crate::backend::mail::Mailer;
#[derive(Clone)] #[derive(Clone)]
pub struct AppData { pub struct AppData {
db_pool: PgPool, db_pool: PgPool,
mailer: Mailing mailer: Mailer
} }
impl AppData { impl AppData {
pub fn new(db_pool: PgPool, mailer: Mailing) -> Self { pub fn new(db_pool: PgPool, mailer: Mailer) -> Self {
Self { Self {
db_pool, db_pool,
mailer mailer
@ -75,7 +75,7 @@ cfg_if!{
&self.db_pool &self.db_pool
} }
pub fn mailer(&self) -> &Mailing { pub fn mailer(&self) -> &Mailer {
&self.mailer &self.mailer
} }
} }
@ -85,7 +85,7 @@ cfg_if!{
Ok(data.db_pool().clone()) Ok(data.db_pool().clone())
} }
pub async fn get_mailing() -> Result<Mailing, ServerFnError> { pub async fn get_mailing() -> Result<Mailer, ServerFnError> {
let data = extract::<Data<AppData>>().await?; let data = extract::<Data<AppData>>().await?;
Ok(data.mailer().clone()) Ok(data.mailer().clone())
} }

@ -7,7 +7,7 @@ use crate::components::data_form::ForValidation;
cfg_if! { if #[cfg(feature = "ssr")] { cfg_if! { if #[cfg(feature = "ssr")] {
use crate::backend::get_pool; use crate::backend::get_pool;
pub async fn get_props(filter: Option<String>) -> Result<Vec<ResProperty>, ServerFnError> { pub async fn get_props(filter: Option<&'static str>) -> Result<Vec<ResProperty>, ServerFnError> {
let pool = get_pool().await?; let pool = get_pool().await?;
let props = if let Some(f) = filter { let props = if let Some(f) = filter {
sqlx::query_as::<_, ResProperty>(&format!("SELECT * FROM property WHERE {} ORDER BY id", f)).fetch_all(&pool).await? sqlx::query_as::<_, ResProperty>(&format!("SELECT * FROM property WHERE {} ORDER BY id", f)).fetch_all(&pool).await?
@ -38,7 +38,7 @@ pub async fn get_properties() -> Result<ApiResponse<Vec<ResProperty>>, ServerFnE
#[server] #[server]
pub async fn get_active_properties() -> Result<ApiResponse<Vec<ResProperty>>, ServerFnError> { pub async fn get_active_properties() -> Result<ApiResponse<Vec<ResProperty>>, ServerFnError> {
let props = get_props(Some("active = true".to_string())).await?; let props = get_props(Some("active = true")).await?;
Ok(ApiResponse::Data(props)) Ok(ApiResponse::Data(props))
} }

@ -167,7 +167,7 @@ cfg_if! { if #[cfg(feature = "ssr")] {
let msg = get_message(MessageType::NewReservation).await?; let msg = get_message(MessageType::NewReservation).await?;
for m in emails_for_notify().await? { for m in emails_for_notify().await? {
mailing.send_mail(MailMessage::new(admin_mail.clone(), m, msg.clone(), reservation)).await?; mailing.send_mail(MailMessage::new(admin_mail.clone(), m, msg.clone(), reservation))?;
} }
Ok(()) Ok(())
@ -183,7 +183,7 @@ cfg_if! { if #[cfg(feature = "ssr")] {
return Err(AppError::MailSendError("No admin mail".to_string())) return Err(AppError::MailSendError("No admin mail".to_string()))
} }
mailing.send_mail(MailMessage::new(admin_mail.clone().unwrap(), reservation.customer.email.clone(), msg, &reservation)).await?; mailing.send_mail(MailMessage::new(admin_mail.clone().unwrap(), reservation.customer.email.clone(), msg, &reservation))?;
notify_new_all(admin_mail.unwrap(), &reservation).await notify_new_all(admin_mail.unwrap(), &reservation).await
} }
@ -196,7 +196,7 @@ cfg_if! { if #[cfg(feature = "ssr")] {
return Err(AppError::MailSendError("No admin mail".to_string())) return Err(AppError::MailSendError("No admin mail".to_string()))
} }
mailing.send_mail(MailMessage::new(admin_mail.unwrap(), reservation.customer.email.clone(), msg, &reservation)).await mailing.send_mail(MailMessage::new(admin_mail.unwrap(), reservation.customer.email.clone(), msg, &reservation))
} }
async fn notify_approve(uuid: Uuid) -> Result<(), AppError> { async fn notify_approve(uuid: Uuid) -> Result<(), AppError> {
@ -238,7 +238,7 @@ pub async fn get_public_form_data(day: NaiveDate) -> Result<ApiResponse<Vec<Publ
use chrono::Datelike; use chrono::Datelike;
let hours = hours_for_day(day.weekday()).await?; let hours = hours_for_day(day.weekday()).await?;
let props = get_props(Some("active = true".to_string())).await?; let props = get_props(Some("active = true")).await?;
let reservations = reservations_for_day(&day).await?; let reservations = reservations_for_day(&day).await?;
info!("Loading public form data"); info!("Loading public form data");

@ -3,7 +3,7 @@ use leptos_captcha::spow::pow::Pow;
use log::error; use log::error;
use rezervator::backend::appearance::check_appearance; use rezervator::backend::appearance::check_appearance;
use rezervator::backend::company::check_company; use rezervator::backend::company::check_company;
use rezervator::backend::mail::check_messages; use rezervator::backend::mail::{check_messages, Mailer};
use rezervator::backend::user::create_admin; use rezervator::backend::user::create_admin;
#[cfg(feature = "ssr")] #[cfg(feature = "ssr")]
@ -58,7 +58,8 @@ async fn main() -> std::io::Result<()> {
.connect(&srv_conf.database().con_string()).await.unwrap(); .connect(&srv_conf.database().con_string()).await.unwrap();
migrate!().run(&pool).await.expect("could not run SQLx migrations"); migrate!().run(&pool).await.expect("could not run SQLx migrations");
let mailing = srv_conf.mailing().clone(); let mut mailer = Mailer::new(srv_conf.mailing());
mailer.start_sender();
if let Err(e) = create_admin(&pool).await { if let Err(e) = create_admin(&pool).await {
error!("Error while checking admin user: {:?}", e); error!("Error while checking admin user: {:?}", e);
@ -78,7 +79,7 @@ async fn main() -> std::io::Result<()> {
let site_root = &leptos_options.site_root; let site_root = &leptos_options.site_root;
App::new() App::new()
.app_data(Data::new(AppData::new(pool.clone(), mailing.clone()))) .app_data(Data::new(AppData::new(pool.clone(), mailer.clone())))
.wrap(Logger::default()) .wrap(Logger::default())
.wrap(Authentication) .wrap(Authentication)
.wrap(SessionMiddleware::new( .wrap(SessionMiddleware::new(

@ -26,7 +26,7 @@ impl Validator {
} }
} }
pub fn check(&self, entity: &(impl Validate + ?Sized), ev: &web_sys::Event) { pub fn check(&self, entity: &dyn Validate, ev: &web_sys::Event) {
if let Err(val_err) = entity.validate() { if let Err(val_err) = entity.validate() {
ev.prevent_default(); ev.prevent_default();
//self.set_message.update(|m| *m = Some(val_err.to_string().clone())); //self.set_message.update(|m| *m = Some(val_err.to_string().clone()));

Loading…
Cancel
Save