-
-
Notifications
You must be signed in to change notification settings - Fork 118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for customized future executor #144
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,10 @@ | ||
use std::borrow::Cow; | ||
use std::error; | ||
use std::fmt; | ||
use std::future::Future; | ||
use std::marker::PhantomData; | ||
use std::ops::{Deref, DerefMut}; | ||
use std::pin::Pin; | ||
use std::time::Duration; | ||
|
||
use async_trait::async_trait; | ||
|
@@ -253,7 +255,7 @@ impl<M: ManageConnection> Builder<M> { | |
self | ||
} | ||
|
||
fn build_inner(self, manager: M) -> Pool<M> { | ||
fn build_inner<E: Executor>(self, manager: M, executor: E) -> Pool<M> { | ||
if let Some(min_idle) = self.min_idle { | ||
assert!( | ||
self.max_size >= min_idle, | ||
|
@@ -262,7 +264,7 @@ impl<M: ManageConnection> Builder<M> { | |
} | ||
|
||
Pool { | ||
inner: PoolInner::new(self, manager), | ||
inner: PoolInner::new(self, manager, executor), | ||
} | ||
} | ||
|
||
|
@@ -271,7 +273,19 @@ impl<M: ManageConnection> Builder<M> { | |
/// The `Pool` will not be returned until it has established its configured | ||
/// minimum number of connections, or it times out. | ||
pub async fn build(self, manager: M) -> Result<Pool<M>, M::Error> { | ||
let pool = self.build_inner(manager); | ||
self.build_with_executor(manager, TokioExecutor).await | ||
} | ||
|
||
/// Consumes the builder with the specified executor, returning a new, initialized `Pool`. | ||
/// | ||
/// The `Pool` will not be returned until it has established its configured | ||
/// minimum number of connections, or it times out. | ||
pub async fn build_with_executor<E: Executor>( | ||
self, | ||
manager: M, | ||
executor: E, | ||
) -> Result<Pool<M>, M::Error> { | ||
let pool = self.build_inner(manager, executor); | ||
pool.inner.start_connections().await.map(|()| pool) | ||
} | ||
|
||
|
@@ -280,7 +294,15 @@ impl<M: ManageConnection> Builder<M> { | |
/// Unlike `build`, this does not wait for any connections to be established | ||
/// before returning. | ||
pub fn build_unchecked(self, manager: M) -> Pool<M> { | ||
let p = self.build_inner(manager); | ||
self.build_unchecked_with_executor(manager, TokioExecutor) | ||
} | ||
|
||
/// Consumes the builder with the specified executor, returning a new, initialized `Pool`. | ||
/// | ||
/// Unlike `build`, this does not wait for any connections to be established | ||
/// before returning. | ||
pub fn build_unchecked_with_executor<E: Executor>(self, manager: M, executor: E) -> Pool<M> { | ||
let p = self.build_inner(manager, executor); | ||
p.inner.spawn_start_connections(); | ||
p | ||
} | ||
|
@@ -456,3 +478,17 @@ impl<E> ErrorSink<E> for NopErrorSink { | |
Box::new(*self) | ||
} | ||
} | ||
|
||
/// An executor of futures. | ||
pub trait Executor: Sync + Send + 'static { | ||
/// Place the future into the executor to be run. | ||
fn execute(&self, fut: Pin<Box<dyn Future<Output = ()> + Send>>); | ||
} | ||
|
||
struct TokioExecutor; | ||
|
||
impl Executor for TokioExecutor { | ||
fn execute(&self, fut: Pin<Box<dyn Future<Output = ()> + Send>>) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would prefer to call this |
||
tokio::spawn(fut); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,12 @@ | ||
use std::cmp::min; | ||
gaoqiangz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
use std::collections::VecDeque; | ||
use std::sync::Arc; | ||
use std::time::Instant; | ||
|
||
use futures_channel::oneshot; | ||
use parking_lot::Mutex; | ||
|
||
use crate::api::{Builder, ManageConnection}; | ||
use std::collections::VecDeque; | ||
use crate::api::{Builder, Executor, ManageConnection}; | ||
|
||
/// The guts of a `Pool`. | ||
#[allow(missing_debug_implementations)] | ||
|
@@ -15,6 +15,7 @@ where | |
M: ManageConnection + Send, | ||
{ | ||
pub(crate) statics: Builder<M>, | ||
pub(crate) executor: Box<dyn Executor>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this need a trait object? It would be nice if we didn't have to |
||
pub(crate) manager: M, | ||
pub(crate) internals: Mutex<PoolInternals<M>>, | ||
} | ||
|
@@ -23,11 +24,12 @@ impl<M> SharedPool<M> | |
where | ||
M: ManageConnection + Send, | ||
{ | ||
pub(crate) fn new(statics: Builder<M>, manager: M) -> Self { | ||
pub(crate) fn new<E: Executor>(statics: Builder<M>, manager: M, executor: E) -> Self { | ||
Self { | ||
statics, | ||
manager, | ||
internals: Mutex::new(PoolInternals::default()), | ||
executor: Box::new(executor), | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I won't be able to merge this PR with this stuff in it, please revert these changes.