Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pool telemetry #248

Merged
merged 15 commits into from
Jan 7, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: dynamic supervisor restart bug
oliveigah committed Nov 27, 2023
commit b9f8f73e0c139dd7a74b1f644da554cdcd2807d0
6 changes: 3 additions & 3 deletions lib/finch.ex
Original file line number Diff line number Diff line change
@@ -592,9 +592,9 @@ defmodule Finch do
See the `Finch.HTTP1.PoolMetrics` and `Finch.HTTP2.PoolMetrics` for more details.
`{:error, :not_found}` may return on 2 scenarios. There is no pool on the
given finch instance regarding the given url or the pool is configured with
`start_pool_metrics?` option false (the default).
`{:error, :not_found}` may return on 2 scenarios:
- There is no pool registered for the given pair finch instance and url
- The pool is configured with `start_pool_metrics?` option false (default)
## Example
41 changes: 19 additions & 22 deletions lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
@@ -30,20 +30,13 @@ defmodule Finch.HTTP1.Pool do
{shp, registry_name, pool_size, conn_opts, pool_max_idle_time, start_pool_metrics?,
pool_idx}
) do
{:ok, metric_ref} =
if start_pool_metrics?,
do: PoolMetrics.init(pool_idx, pool_size),
else: {:ok, nil}

{:ok, pid} =
NimblePool.start_link(
worker: {__MODULE__, {registry_name, shp, pool_idx, metric_ref, conn_opts}},
pool_size: pool_size,
lazy: true,
worker_idle_timeout: pool_idle_timeout(pool_max_idle_time)
)

{:ok, pid, metric_ref}
NimblePool.start_link(
worker:
{__MODULE__, {registry_name, shp, pool_idx, pool_size, start_pool_metrics?, conn_opts}},
pool_size: pool_size,
lazy: true,
worker_idle_timeout: pool_idle_timeout(pool_max_idle_time)
)
end

@impl Finch.Pool
@@ -146,22 +139,26 @@ defmodule Finch.HTTP1.Pool do

@impl Finch.Pool
def get_pool_status(finch_name, shp) do
case Finch.PoolManager.get_metrics_refs(finch_name, shp) do
case Finch.PoolManager.get_pool_count(finch_name, shp) do
nil ->
{:error, :not_found}

refs ->
resp =
refs
|> Enum.map(&PoolMetrics.get_pool_status/1)
|> Enum.map(fn {:ok, metrics} -> metrics end)
count ->
result = Enum.map(1..count, &PoolMetrics.get_pool_status(finch_name, shp, &1))

{:ok, resp}
if Enum.all?(result, &match?({:ok, _}, &1)),
do: {:ok, Enum.map(result, &elem(&1, 1))},
else: {:error, :not_found}
end
end

@impl NimblePool
def init_pool({registry, shp, pool_idx, metric_ref, opts}) do
def init_pool({registry, shp, pool_idx, pool_size, start_pool_metrics?, opts}) do
{:ok, metric_ref} =
if start_pool_metrics?,
do: PoolMetrics.init(registry, shp, pool_idx, pool_size),
else: {:ok, nil}

# Register our pool with our module name as the key. This allows the caller
# to determine the correct pool module to use to make the request
{:ok, _} = Registry.register(registry, shp, __MODULE__)
12 changes: 11 additions & 1 deletion lib/finch/http1/pool_metrics.ex
Original file line number Diff line number Diff line change
@@ -34,10 +34,12 @@ defmodule Finch.HTTP1.PoolMetrics do
in_use_connections: 3
]

def init(pool_idx, pool_size) do
def init(registry, shp, pool_idx, pool_size) do
ref = :atomics.new(length(@atomic_idx), [])
:atomics.add(ref, @atomic_idx[:pool_idx], pool_idx)
:atomics.add(ref, @atomic_idx[:pool_size], pool_size)

:persistent_term.put({__MODULE__, registry, shp, pool_idx}, ref)
{:ok, ref}
end

@@ -49,6 +51,12 @@ defmodule Finch.HTTP1.PoolMetrics do
end)
end

def get_pool_status(name, shp, pool_idx) do
{__MODULE__, name, shp, pool_idx}
|> :persistent_term.get(nil)
|> get_pool_status()
end

def get_pool_status(ref) when is_reference(ref) do
%{
pool_idx: pool_idx,
@@ -68,4 +76,6 @@ defmodule Finch.HTTP1.PoolMetrics do

{:ok, result}
end

def get_pool_status(nil), do: {:error, :not_found}
end
31 changes: 13 additions & 18 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
@@ -77,17 +77,16 @@ defmodule Finch.HTTP2.Pool do

@impl Finch.Pool
def get_pool_status(finch_name, shp) do
case Finch.PoolManager.get_metrics_refs(finch_name, shp) do
case Finch.PoolManager.get_pool_count(finch_name, shp) do
nil ->
{:error, :not_found}

refs ->
resp =
refs
|> Enum.map(&PoolMetrics.get_pool_status/1)
|> Enum.map(fn {:ok, metrics} -> metrics end)
count ->
result = Enum.map(1..count, &PoolMetrics.get_pool_status(finch_name, shp, &1))

{:ok, resp}
if Enum.all?(result, &match?({:ok, _}, &1)),
do: {:ok, Enum.map(result, &elem(&1, 1))},
else: {:error, :not_found}
end
end

@@ -190,21 +189,17 @@ defmodule Finch.HTTP2.Pool do
end
end

def start_link({shp, finch_name, pool_config, start_pool_metrics?, pool_idx}) do
def start_link({_shp, _finch_name, _pool_config, _start_pool_metrics?, _pool_idx} = opts) do
:gen_statem.start_link(__MODULE__, opts, [])
end

@impl true
def init({{scheme, host, port} = shp, registry, pool_opts, start_pool_metrics?, pool_idx}) do
{:ok, metrics_ref} =
if start_pool_metrics?,
do: PoolMetrics.init(pool_idx),
do: PoolMetrics.init(registry, shp, pool_idx),
else: {:ok, nil}

new_opts = {shp, finch_name, pool_config, metrics_ref, pool_idx}

{:ok, pid} = :gen_statem.start_link(__MODULE__, new_opts, [])

{:ok, pid, metrics_ref}
end

@impl true
def init({{scheme, host, port} = shp, registry, pool_opts, metrics_ref, pool_idx}) do
{:ok, _} = Registry.register(registry, shp, __MODULE__)

data = %{
11 changes: 10 additions & 1 deletion lib/finch/http2/pool_metrics.ex
Original file line number Diff line number Diff line change
@@ -25,10 +25,11 @@ defmodule Finch.HTTP2.PoolMetrics do
in_flight_requests: 2
]

def init(pool_idx) do
def init(finch_name, shp, pool_idx) do
ref = :atomics.new(length(@atomic_idx), [])
:atomics.put(ref, @atomic_idx[:pool_idx], pool_idx)

:persistent_term.put({__MODULE__, finch_name, shp, pool_idx}, ref)
{:ok, ref}
end

@@ -40,6 +41,12 @@ defmodule Finch.HTTP2.PoolMetrics do
end)
end

def get_pool_status(name, shp, pool_idx) do
{__MODULE__, name, shp, pool_idx}
|> :persistent_term.get(nil)
|> get_pool_status()
end

def get_pool_status(ref) when is_reference(ref) do
%{
pool_idx: pool_idx,
@@ -56,4 +63,6 @@ defmodule Finch.HTTP2.PoolMetrics do

{:ok, result}
end

def get_pool_status(nil), do: {:error, :not_found}
end
30 changes: 13 additions & 17 deletions lib/finch/pool_manager.ex
Original file line number Diff line number Diff line change
@@ -75,30 +75,26 @@ defmodule Finch.PoolManager do
defp do_start_pools(shp, config) do
pool_config = pool_config(config, shp)

init_results =
Enum.map(1..pool_config.count, fn pool_idx ->
pool_args = pool_args(shp, config, pool_config, pool_idx)
# Choose pool type here...
{:ok, pid, metric_ref} =
DynamicSupervisor.start_child(config.supervisor_name, {pool_config.mod, pool_args})

{pid, pool_config.mod, metric_ref}
end)

if pool_config.start_pool_metrics? do
put_metrics_refs(config, shp, Enum.map(init_results, &elem(&1, 2)))
put_pool_count(config, shp, pool_config.count)
end

{pid, pool_mod, _} = List.first(init_results)
Enum.map(1..pool_config.count, fn pool_idx ->
pool_args = pool_args(shp, config, pool_config, pool_idx)
# Choose pool type here...
{:ok, pid} =
DynamicSupervisor.start_child(config.supervisor_name, {pool_config.mod, pool_args})

{pid, pool_mod}
{pid, pool_config.mod}
end)
|> hd()
end

defp put_metrics_refs(%{registry_name: name}, shp, refs),
do: :persistent_term.put({__MODULE__, :metrics_refs, name, shp}, refs)
defp put_pool_count(%{registry_name: name}, shp, val),
do: :persistent_term.put({__MODULE__, :pool_count, name, shp}, val)

def get_metrics_refs(finch_name, shp),
do: :persistent_term.get({__MODULE__, :metrics_refs, finch_name, shp}, nil)
def get_pool_count(finch_name, shp),
do: :persistent_term.get({__MODULE__, :pool_count, finch_name, shp}, nil)

defp pool_config(%{pools: config, default_pool_config: default}, shp) do
config
2 changes: 1 addition & 1 deletion test/finch/http1/pool_metrics_test.exs
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ defmodule Finch.HTTP1.PoolMetricsTest do
end)

wait_connection_checkin()
assert nil == PoolManager.get_metrics_refs(finch_name, shp)
assert nil == PoolManager.get_pool_count(finch_name, shp)
assert {:error, :not_found} = Finch.get_pool_status(finch_name, shp)
end

6 changes: 3 additions & 3 deletions test/finch/http2/pool_metrics_test.exs
Original file line number Diff line number Diff line change
@@ -67,7 +67,7 @@ defmodule Finch.HTTP2.PoolMetricsTest do
ref
end)

Process.sleep(400)
Process.sleep(50)

assert {:ok,
[
@@ -115,7 +115,7 @@ defmodule Finch.HTTP2.PoolMetricsTest do
end)
end)

Process.sleep(400)
Process.sleep(50)

assert {:ok,
[
@@ -169,7 +169,7 @@ defmodule Finch.HTTP2.PoolMetricsTest do
ref
end)

Process.sleep(400)
Process.sleep(50)

assert {:ok,
[
17 changes: 7 additions & 10 deletions test/finch/http2/pool_test.exs
Original file line number Diff line number Diff line change
@@ -31,16 +31,13 @@ defmodule Finch.HTTP2.PoolTest do
end

def start_pool(port) do
{:ok, pid, _info} =
Pool.start_link({
{:https, "localhost", port},
:test,
[conn_opts: [transport_opts: [verify: :verify_none]]],
false,
1
})

{:ok, pid}
Pool.start_link({
{:https, "localhost", port},
:test,
[conn_opts: [transport_opts: [verify: :verify_none]]],
false,
1
})
end

describe "requests" do