Skip to content

Commit 8e6bed9

Browse files
committed
fix(metrics): wait_time for pool calculation
1 parent 7c23661 commit 8e6bed9

File tree

5 files changed

+31
-23
lines changed

5 files changed

+31
-23
lines changed

pgdog/src/backend/pool/inner.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ impl Inner {
252252
/// Place connection back into the pool
253253
/// or give it to a waiting client.
254254
#[inline]
255-
pub(super) fn put(&mut self, mut conn: Box<Server>, now: Instant) -> Result<(), Error> {
255+
pub(super) fn put(&mut self, mut conn: Box<Server>) -> Result<(), Error> {
256256
// Try to give it to a client that's been waiting, if any.
257257
let id = *conn.id();
258258
while let Some(waiter) = self.waiting.pop_front() {
@@ -263,8 +263,8 @@ impl Inner {
263263
server: id,
264264
client: waiter.request.id,
265265
})?;
266-
self.stats.counts.server_assignment_count += 1;
267-
self.stats.counts.wait_time += now.duration_since(waiter.request.created_at);
266+
self.stats
267+
.record_checkout(waiter.request.created_at, waiter.request.read);
268268
return Ok(());
269269
}
270270
}
@@ -380,7 +380,7 @@ impl Inner {
380380
// Finally, if the server is ok,
381381
// place the connection back into the idle list.
382382
if server.can_check_in() {
383-
self.put(server, now)?;
383+
self.put(server)?;
384384
result.replenish = false;
385385
} else {
386386
self.out_of_sync += 1;
@@ -854,7 +854,7 @@ mod test {
854854
});
855855

856856
let server = Box::new(Server::default());
857-
inner.put(server, Instant::now()).unwrap();
857+
inner.put(server).unwrap();
858858

859859
assert_eq!(inner.idle(), 0); // Connection given to waiter, not idle
860860
assert_eq!(inner.checked_out(), 1); // Connection now checked out to waiter
@@ -869,7 +869,7 @@ mod test {
869869
let mut inner = Inner::default();
870870
let server = Box::new(Server::default());
871871

872-
inner.put(server, Instant::now()).unwrap();
872+
inner.put(server).unwrap();
873873

874874
assert_eq!(inner.idle(), 1); // Connection added to idle pool
875875
assert_eq!(inner.checked_out(), 0);
@@ -1046,7 +1046,7 @@ mod test {
10461046
assert_eq!(inner.waiting.len(), 3);
10471047

10481048
let server = Box::new(Server::default());
1049-
inner.put(server, Instant::now()).unwrap();
1049+
inner.put(server).unwrap();
10501050

10511051
// All waiters should be removed from queue since we tried each one
10521052
assert_eq!(inner.waiting.len(), 0);
@@ -1083,7 +1083,7 @@ mod test {
10831083
assert_eq!(inner.waiting.len(), 2);
10841084

10851085
let server = Box::new(Server::default());
1086-
inner.put(server, Instant::now()).unwrap();
1086+
inner.put(server).unwrap();
10871087

10881088
// All waiters should be removed since they were all dropped
10891089
assert_eq!(inner.waiting.len(), 0);

pgdog/src/backend/pool/monitor.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,10 @@ impl Monitor {
226226
/// Replenish pool with one new connection.
227227
async fn replenish(&self, reason: ConnectReason) -> Result<bool, Error> {
228228
if let Ok(conn) = Self::create_connection(&self.pool, reason).await {
229-
let now = Instant::now();
230229
let server = Box::new(conn);
231230
let mut guard = self.pool.lock();
232231
if guard.online {
233-
guard.put(server, now)?;
232+
guard.put(server)?
234233
}
235234
Ok(true)
236235
} else {

pgdog/src/backend/pool/pool_impl.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -123,26 +123,21 @@ impl Pool {
123123

124124
// Fast path, idle connection probably available.
125125
let (server, granted_at, paused) = {
126-
// Ask for time before we acquire the lock
127-
// and only if we actually waited for a connection.
128-
let granted_at = request.created_at;
129-
let elapsed = granted_at.saturating_duration_since(request.created_at);
130126
let mut guard = self.lock();
131127

132128
if !guard.online {
133129
return Err(Error::Offline);
134130
}
135131

136132
let conn = guard.take(request)?;
133+
// Capture the grant time after the lock and after take() so that
134+
// lock contention and any in-lock work are included in wait_time.
135+
let granted_at = Instant::now();
137136

138137
if conn.is_some() {
139-
guard.stats.counts.wait_time += elapsed;
140-
guard.stats.counts.server_assignment_count += 1;
141-
if request.read {
142-
guard.stats.counts.reads += 1;
143-
} else {
144-
guard.stats.counts.writes += 1;
145-
}
138+
guard
139+
.stats
140+
.record_checkout(request.created_at, request.read);
146141
}
147142

148143
(conn, granted_at, guard.paused)
@@ -296,7 +291,6 @@ impl Pool {
296291
pub(crate) fn move_conns_to(&self, destination: &Pool) -> Result<(), Error> {
297292
// Ensure no deadlock.
298293
assert!(self.inner.id != destination.id());
299-
let now = Instant::now();
300294

301295
{
302296
let mut from_guard = self.lock();
@@ -305,7 +299,7 @@ impl Pool {
305299
from_guard.online = false;
306300
let (idle, taken) = from_guard.move_conns_to(destination);
307301
for server in idle {
308-
to_guard.put(server, now)?;
302+
to_guard.put(server)?;
309303
}
310304
to_guard.set_taken(taken);
311305
}

pgdog/src/backend/pool/state.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ impl State {
5050
maxwait: guard
5151
.waiting
5252
.iter()
53+
// The first waiter is the oldest, so its elapsed wait is the maximum wait time in the queue.
5354
.next()
5455
.map(|req| now.duration_since(req.request.created_at))
5556
.unwrap_or(Duration::ZERO),

pgdog/src/backend/pool/stats.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use pgdog_stats::memory::MemoryStats as StatsMemoryStats;
1717
use pgdog_stats::pool::Counts as StatsCounts;
1818
use pgdog_stats::pool::Stats as StatsStats;
1919
use pgdog_stats::MessageBufferStats;
20+
use tokio::time::Instant;
2021

2122
/// Pool statistics.
2223
///
@@ -106,6 +107,19 @@ impl Stats {
106107
pub fn calc_averages(&mut self, time: Duration) {
107108
self.inner.calc_averages(time);
108109
}
110+
111+
/// Record a successful connection checkout.
112+
/// Centralises the counters that are always updated together on checkout:
113+
/// wait time, assignment counter, and the read/write routing counter.
114+
pub fn record_checkout(&mut self, started_at: Instant, read: bool) {
115+
self.counts.wait_time += started_at.elapsed();
116+
self.counts.server_assignment_count += 1;
117+
if read {
118+
self.counts.reads += 1;
119+
} else {
120+
self.counts.writes += 1;
121+
}
122+
}
109123
}
110124

111125
/// Statistics calculated for the network buffer used

0 commit comments

Comments
 (0)