From de915ff4886d21bc983fb3f89455e662f44a8956 Mon Sep 17 00:00:00 2001 From: gaoqiangz Date: Fri, 2 Dec 2022 21:04:51 +0800 Subject: [PATCH 1/5] Add support for customized future executor --- bb8/src/api.rs | 52 ++++++++++++++++++++++++++++++++++++-------- bb8/src/inner.rs | 34 ++++++++++++++--------------- bb8/src/internals.rs | 14 ++++++------ bb8/src/lib.rs | 2 +- 4 files changed, 68 insertions(+), 34 deletions(-) diff --git a/bb8/src/api.rs b/bb8/src/api.rs index 794d58d..4237f59 100644 --- a/bb8/src/api.rs +++ b/bb8/src/api.rs @@ -1,15 +1,21 @@ +use crate::inner::PoolInner; +use crate::internals::Conn; +pub use crate::internals::State; +use async_trait::async_trait; use std::borrow::Cow; use std::error; use std::fmt; +use std::future::Future; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; +use std::pin::Pin; use std::time::Duration; -use async_trait::async_trait; - -use crate::inner::PoolInner; -use crate::internals::Conn; -pub use crate::internals::State; +/// An executor of futures. +pub trait Executor: Sync + Send + 'static { + /// Place the future into the executor to be run. + fn execute(&self, fut: Pin + Send>>); +} /// A generic connection pool. pub struct Pool @@ -253,7 +259,7 @@ impl Builder { self } - fn build_inner(self, manager: M) -> Pool { + fn build_inner(self, manager: M, executor: E) -> Pool { if let Some(min_idle) = self.min_idle { assert!( self.max_size >= min_idle, @@ -262,7 +268,7 @@ impl Builder { } Pool { - inner: PoolInner::new(self, manager), + inner: PoolInner::new(self, manager, executor), } } @@ -271,7 +277,19 @@ impl Builder { /// The `Pool` will not be returned until it has established its configured /// minimum number of connections, or it times out. pub async fn build(self, manager: M) -> Result, M::Error> { - let pool = self.build_inner(manager); + self.build_with_executor(manager, TokioExecutor).await + } + + /// Consumes the builder with the specified executor, returning a new, initialized `Pool`. + /// + /// The `Pool` will not be returned until it has established its configured + /// minimum number of connections, or it times out. + pub async fn build_with_executor( + self, + manager: M, + executor: E, + ) -> Result, M::Error> { + let pool = self.build_inner(manager, executor); pool.inner.start_connections().await.map(|()| pool) } @@ -280,12 +298,28 @@ impl Builder { /// Unlike `build`, this does not wait for any connections to be established /// before returning. pub fn build_unchecked(self, manager: M) -> Pool { - let p = self.build_inner(manager); + self.build_unchecked_with_executor(manager, TokioExecutor) + } + + /// Consumes the builder with the specified executor, returning a new, initialized `Pool`. + /// + /// Unlike `build`, this does not wait for any connections to be established + /// before returning. + pub fn build_unchecked_with_executor(self, manager: M, executor: E) -> Pool { + let p = self.build_inner(manager, executor); p.inner.spawn_start_connections(); p } } +struct TokioExecutor; + +impl Executor for TokioExecutor { + fn execute(&self, fut: Pin + Send>>) { + tokio::spawn(fut); + } +} + /// A trait which provides connection-specific functionality. #[async_trait] pub trait ManageConnection: Sized + Send + Sync + 'static { diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 71f6ab8..0243805 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -1,18 +1,15 @@ +use crate::api::{Builder, Executor, ManageConnection, PooledConnection, RunError}; +use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, State}; +use futures_channel::oneshot; +use futures_util::stream::{FuturesUnordered, StreamExt}; +use futures_util::TryFutureExt; use std::cmp::{max, min}; use std::fmt; use std::future::Future; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; - -use futures_channel::oneshot; -use futures_util::stream::{FuturesUnordered, StreamExt}; -use futures_util::TryFutureExt; -use tokio::spawn; use tokio::time::{interval_at, sleep, timeout, Interval}; -use crate::api::{Builder, ManageConnection, PooledConnection, RunError}; -use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, State}; - pub(crate) struct PoolInner where M: ManageConnection + Send, @@ -24,15 +21,15 @@ impl PoolInner where M: ManageConnection + Send, { - pub(crate) fn new(builder: Builder, manager: M) -> Self { - let inner = Arc::new(SharedPool::new(builder, manager)); + pub(crate) fn new(builder: Builder, manager: M, executor: E) -> Self { + let inner = Arc::new(SharedPool::new(builder, manager, executor)); if inner.statics.max_lifetime.is_some() || inner.statics.idle_timeout.is_some() { let s = Arc::downgrade(&inner); if let Some(shared) = s.upgrade() { let start = Instant::now() + shared.statics.reaper_rate; let interval = interval_at(start.into(), shared.statics.reaper_rate); - schedule_reaping(interval, s); + schedule_reaping(&inner.executor, interval, s); } } @@ -59,7 +56,7 @@ where } let this = self.clone(); - spawn(async move { + self.inner.executor.execute(Box::pin(async move { let mut stream = this.replenish_idle_connections(approvals); while let Some(result) = stream.next().await { match result { @@ -67,7 +64,7 @@ where Err(e) => this.inner.statics.error_sink.sink(e), } } - }); + })); } fn replenish_idle_connections( @@ -254,11 +251,14 @@ where } } -fn schedule_reaping(mut interval: Interval, weak_shared: Weak>) -where +fn schedule_reaping( + executor: &Box, + mut interval: Interval, + weak_shared: Weak>, +) where M: ManageConnection, { - spawn(async move { + executor.execute(Box::pin(async move { loop { let _ = interval.tick().await; if let Some(inner) = weak_shared.upgrade() { @@ -267,5 +267,5 @@ where break; } } - }); + })); } diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index b9a63db..26f33fa 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -1,12 +1,10 @@ -use std::cmp::min; -use std::sync::Arc; -use std::time::Instant; - +use crate::api::{Builder, Executor, ManageConnection}; use futures_channel::oneshot; use parking_lot::Mutex; - -use crate::api::{Builder, ManageConnection}; +use std::cmp::min; use std::collections::VecDeque; +use std::sync::Arc; +use std::time::Instant; /// The guts of a `Pool`. #[allow(missing_debug_implementations)] @@ -15,6 +13,7 @@ where M: ManageConnection + Send, { pub(crate) statics: Builder, + pub(crate) executor: Box, pub(crate) manager: M, pub(crate) internals: Mutex>, } @@ -23,11 +22,12 @@ impl SharedPool where M: ManageConnection + Send, { - pub(crate) fn new(statics: Builder, manager: M) -> Self { + pub(crate) fn new(statics: Builder, manager: M, executor: E) -> Self { Self { statics, manager, internals: Mutex::new(PoolInternals::default()), + executor: Box::new(executor), } } } diff --git a/bb8/src/lib.rs b/bb8/src/lib.rs index b85f910..37526f9 100644 --- a/bb8/src/lib.rs +++ b/bb8/src/lib.rs @@ -35,7 +35,7 @@ mod api; pub use api::{ - Builder, CustomizeConnection, ErrorSink, ManageConnection, NopErrorSink, Pool, + Builder, CustomizeConnection, ErrorSink, Executor, ManageConnection, NopErrorSink, Pool, PooledConnection, RunError, State, }; From f892a7a8f909051766d3b4de6e6f5cd63a44c93f Mon Sep 17 00:00:00 2001 From: gaoqiangz Date: Sun, 4 Dec 2022 16:34:00 +0800 Subject: [PATCH 2/5] fix lint --- bb8/src/inner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 0243805..822caf7 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -29,7 +29,7 @@ where if let Some(shared) = s.upgrade() { let start = Instant::now() + shared.statics.reaper_rate; let interval = interval_at(start.into(), shared.statics.reaper_rate); - schedule_reaping(&inner.executor, interval, s); + schedule_reaping(inner.executor.as_ref(), interval, s); } } @@ -252,7 +252,7 @@ where } fn schedule_reaping( - executor: &Box, + executor: &dyn Executor, mut interval: Interval, weak_shared: Weak>, ) where From efef42610d1a8a4d546513a621cb5af98214e7bf Mon Sep 17 00:00:00 2001 From: gaoqiangz Date: Wed, 7 Dec 2022 17:54:56 +0800 Subject: [PATCH 3/5] fix PR --- bb8/src/api.rs | 36 +++++++++++++++++++----------------- bb8/src/inner.rs | 12 +++++++----- bb8/src/internals.rs | 8 +++++--- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/bb8/src/api.rs b/bb8/src/api.rs index 4237f59..624ca12 100644 --- a/bb8/src/api.rs +++ b/bb8/src/api.rs @@ -1,7 +1,3 @@ -use crate::inner::PoolInner; -use crate::internals::Conn; -pub use crate::internals::State; -use async_trait::async_trait; use std::borrow::Cow; use std::error; use std::fmt; @@ -11,11 +7,11 @@ use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::time::Duration; -/// An executor of futures. -pub trait Executor: Sync + Send + 'static { - /// Place the future into the executor to be run. - fn execute(&self, fut: Pin + Send>>); -} +use async_trait::async_trait; + +use crate::inner::PoolInner; +use crate::internals::Conn; +pub use crate::internals::State; /// A generic connection pool. pub struct Pool @@ -312,14 +308,6 @@ impl Builder { } } -struct TokioExecutor; - -impl Executor for TokioExecutor { - fn execute(&self, fut: Pin + Send>>) { - tokio::spawn(fut); - } -} - /// A trait which provides connection-specific functionality. #[async_trait] pub trait ManageConnection: Sized + Send + Sync + 'static { @@ -490,3 +478,17 @@ impl ErrorSink for NopErrorSink { Box::new(*self) } } + +/// An executor of futures. +pub trait Executor: Sync + Send + 'static { + /// Place the future into the executor to be run. + fn execute(&self, fut: Pin + Send>>); +} + +struct TokioExecutor; + +impl Executor for TokioExecutor { + fn execute(&self, fut: Pin + Send>>) { + tokio::spawn(fut); + } +} diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 822caf7..0597ba1 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -1,8 +1,3 @@ -use crate::api::{Builder, Executor, ManageConnection, PooledConnection, RunError}; -use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, State}; -use futures_channel::oneshot; -use futures_util::stream::{FuturesUnordered, StreamExt}; -use futures_util::TryFutureExt; use std::cmp::{max, min}; use std::fmt; use std::future::Future; @@ -10,6 +5,13 @@ use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use tokio::time::{interval_at, sleep, timeout, Interval}; +use futures_channel::oneshot; +use futures_util::stream::{FuturesUnordered, StreamExt}; +use futures_util::TryFutureExt; + +use crate::api::{Builder, Executor, ManageConnection, PooledConnection, RunError}; +use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, State}; + pub(crate) struct PoolInner where M: ManageConnection + Send, diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index 26f33fa..48c02c1 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -1,11 +1,13 @@ -use crate::api::{Builder, Executor, ManageConnection}; -use futures_channel::oneshot; -use parking_lot::Mutex; use std::cmp::min; use std::collections::VecDeque; use std::sync::Arc; use std::time::Instant; +use futures_channel::oneshot; +use parking_lot::Mutex; + +use crate::api::{Builder, Executor, ManageConnection}; + /// The guts of a `Pool`. #[allow(missing_debug_implementations)] pub(crate) struct SharedPool From a6094e2e2e1f9cdcd81bbe3c8469743ea09b79d9 Mon Sep 17 00:00:00 2001 From: gaoqiangz Date: Wed, 7 Dec 2022 18:01:35 +0800 Subject: [PATCH 4/5] fix PR --- bb8/src/internals.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index 48c02c1..0d2529f 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -15,7 +15,7 @@ where M: ManageConnection + Send, { pub(crate) statics: Builder, - pub(crate) executor: Box, + pub(crate) executor: Box, pub(crate) manager: M, pub(crate) internals: Mutex>, } From e1dac2fcab50d3c3dc9ef53f05d83eca2a847d15 Mon Sep 17 00:00:00 2001 From: gaoqiangz Date: Wed, 31 May 2023 11:33:06 +0800 Subject: [PATCH 5/5] update --- bb8/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bb8/Cargo.toml b/bb8/Cargo.toml index fb22788..be18b2c 100644 --- a/bb8/Cargo.toml +++ b/bb8/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "bb8" -version = "0.8.0" -description = "Full-featured async (tokio-based) connection pool (like r2d2)" +version = "0.8.0-rc.1" +description = "[FORK] Full-featured async (tokio-based) connection pool (like r2d2)" license = "MIT" -repository = "https://github.com/djc/bb8" +repository = "https://github.com/gaoqiangz/bb8" edition = "2021" workspace = ".." readme = "../README.md"