Skip to content

Commit 91d1b6f

Browse files
committed
save
1 parent 11f07f0 commit 91d1b6f

File tree

14 files changed

+258
-29
lines changed

14 files changed

+258
-29
lines changed

integration/pgdog.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ tls_certificate = "integration/tls/cert.pem"
2222
tls_private_key = "integration/tls/key.pem"
2323
query_parser_engine = "pg_query_raw"
2424
system_catalogs = "omnisharded_sticky"
25-
reload_schema_on_ddl = false
25+
reload_schema_on_ddl = true
26+
cross_shard_backend = "fdw"
2627

2728
[memory]
2829
net_buffer = 8096

integration/postgres_fdw/pgdog.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
[fdw]
2-
enabled = true
1+
2+
[general]
3+
cross_shard_backend = "fdw"
34

45
[[databases]]
56
name = "pgdog"

pgdog/src/admin/set.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ impl Command for Set {
160160
config.config.general.client_idle_in_transaction_timeout = self.value.parse()?;
161161
}
162162

163+
"cross_shard_backend" => {
164+
config.config.general.cross_shard_backend = Self::from_json(&self.value)?;
165+
}
166+
163167
_ => return Err(Error::Syntax),
164168
}
165169

pgdog/src/backend/databases.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ pub fn reload_from_existing() -> Result<(), Error> {
8383
let databases = from_config(&config);
8484

8585
replace_databases(databases, true)?;
86+
87+
// Reconfigure FDW with new schema.
88+
if config.config.general.cross_shard_backend.need_fdw() {
89+
PostgresLauncher::get().reconfigure();
90+
}
91+
8692
Ok(())
8793
}
8894

@@ -122,7 +128,24 @@ pub fn reload() -> Result<(), Error> {
122128
tls::reload()?;
123129

124130
// Reconfigure FDW with new schema.
125-
PostgresLauncher::get().reconfigure();
131+
match (
132+
old_config.config.general.cross_shard_backend.need_fdw(),
133+
new_config.config.general.cross_shard_backend.need_fdw(),
134+
) {
135+
(true, true) => {
136+
PostgresLauncher::get().reconfigure();
137+
}
138+
139+
(false, true) => {
140+
PostgresLauncher::get().launch();
141+
}
142+
143+
(true, false) => {
144+
PostgresLauncher::get().shutdown();
145+
}
146+
147+
(false, false) => {}
148+
}
126149

127150
// Remove any unused prepared statements.
128151
PreparedStatements::global()

pgdog/src/backend/fdw/launcher.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ impl PostgresLauncher {
8282
let _ = self.events.send(LauncherEvent::Start);
8383
}
8484

85+
pub(crate) fn shutdown(&self) {
86+
let _ = self.events.send(LauncherEvent::Shutdown);
87+
}
88+
8589
/// Request reconfiguration.
8690
pub(crate) fn reconfigure(&self) {
8791
let _ = self.events.send(LauncherEvent::Reconfigure);

pgdog/src/backend/fdw/postgres.rs

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,41 @@ impl PostgresProcess {
122122
})
123123
}
124124

125+
/// Kill any existing process listening on the given port.
126+
/// This handles orphaned postgres processes from previous crashes.
127+
#[cfg(unix)]
128+
async fn kill_existing_on_port(port: u16) {
129+
// Use fuser to find and kill any process on the port
130+
let result = Command::new("fuser")
131+
.arg("-k")
132+
.arg(format!("{}/tcp", port))
133+
.stdout(Stdio::null())
134+
.stderr(Stdio::null())
135+
.status()
136+
.await;
137+
138+
if let Ok(status) = result {
139+
if status.success() {
140+
warn!(
141+
"[fdw] killed orphaned process on port {} from previous run",
142+
port
143+
);
144+
// Give it a moment to fully terminate
145+
sleep(Duration::from_millis(100)).await;
146+
}
147+
}
148+
}
149+
150+
#[cfg(not(unix))]
151+
async fn kill_existing_on_port(_port: u16) {
152+
// Not implemented for non-unix platforms
153+
}
154+
125155
/// Setup and launch Postgres process.
126156
pub(crate) async fn launch(&mut self) -> Result<(), Error> {
157+
// Clean up any orphaned postgres from previous crashes
158+
Self::kill_existing_on_port(self.port).await;
159+
127160
info!(
128161
"[fdw] initializing \"{}\" (PostgreSQL {})",
129162
self.initdb_dir.display(),
@@ -168,6 +201,22 @@ impl PostgresProcess {
168201
#[cfg(unix)]
169202
cmd.process_group(0); // Prevent sigint from terminal.
170203

204+
// SAFETY: prctl(PR_SET_PDEATHSIG) is async-signal-safe and doesn't
205+
// access any shared state. It tells the kernel to send SIGKILL to
206+
// this process when its parent dies, preventing orphaned processes.
207+
#[cfg(target_os = "linux")]
208+
{
209+
#[allow(unused_imports)]
210+
use std::os::unix::process::CommandExt;
211+
unsafe {
212+
cmd.pre_exec(|| {
213+
const PR_SET_PDEATHSIG: nix::libc::c_int = 1;
214+
nix::libc::prctl(PR_SET_PDEATHSIG, nix::libc::SIGKILL);
215+
Ok(())
216+
});
217+
}
218+
}
219+
171220
let child = cmd.spawn()?;
172221

173222
self.pid = child.id().map(|pid| pid as i32);
@@ -212,8 +261,22 @@ impl PostgresProcess {
212261
}
213262
}
214263

215-
_ = process.child.wait() => {
216-
error!("[fdw] postgres shut down unexpectedly");
264+
exit_status = process.child.wait() => {
265+
// Drain remaining stderr before reporting shutdown
266+
loop {
267+
let mut remaining = String::new();
268+
match reader.read_line(&mut remaining).await {
269+
Ok(0) => break, // EOF
270+
Ok(_) => {
271+
if !remaining.is_empty() {
272+
let remaining = LOG_PREFIX.replace(&remaining, "");
273+
info!("[fdw::subprocess] {}", remaining.trim());
274+
}
275+
}
276+
Err(_) => break,
277+
}
278+
}
279+
error!("[fdw] postgres shut down unexpectedly, exit status: {:?}", exit_status);
217280
break;
218281
}
219282

pgdog/src/backend/pool/cluster.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ impl Cluster {
291291
cross_shard_backend,
292292
};
293293

294-
if cross_shard_backend.need_fdw() {
294+
if cross_shard_backend.need_fdw() && cluster.shards().len() > 1 {
295295
cluster.fdw_lb = FdwLoadBalancer::new(&cluster).ok();
296296
}
297297

pgdog/src/backend/schema/postgres_fdw/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ pub use custom_types::{CustomType, CustomTypeKind, CustomTypes, CUSTOM_TYPES_QUE
1010
pub use error::Error;
1111
pub use extensions::{Extension, Extensions, EXTENSIONS_QUERY};
1212
pub use schema::{FdwServerDef, ForeignTableColumn, ForeignTableSchema, FOREIGN_TABLE_SCHEMA};
13-
pub use statement::{create_foreign_table, ForeignTableBuilder, PartitionStrategy};
13+
pub use statement::{
14+
create_foreign_table, CreateForeignTableResult, ForeignTableBuilder, PartitionStrategy,
15+
TypeMismatch,
16+
};
1417

1518
pub(crate) use statement::quote_identifier;

pgdog/src/backend/schema/postgres_fdw/postgres_fdw.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ WHERE
2727
AND n.nspname <> 'pg_catalog'
2828
AND n.nspname !~ '^pg_toast'
2929
AND n.nspname <> 'information_schema'
30+
AND NOT (n.nspname = 'pgdog' AND c.relname IN ('validator_bigint', 'validator_uuid', 'config'))
3031
AND NOT c.relispartition
3132
ORDER BY
3233
n.nspname,

pgdog/src/backend/schema/postgres_fdw/schema.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Foreign table schema query and data structures.
22
33
use std::collections::{HashMap, HashSet};
4-
use tracing::debug;
4+
use tracing::{debug, warn};
55

66
use crate::{
77
backend::{schema::postgres_fdw::create_foreign_table, Server, ShardingSchema},
@@ -11,6 +11,7 @@ use crate::{
1111
use super::custom_types::CustomTypes;
1212
use super::extensions::Extensions;
1313
use super::quote_identifier;
14+
use super::TypeMismatch;
1415

1516
/// Server definition for FDW setup.
1617
#[derive(Debug, Clone)]
@@ -134,18 +135,37 @@ impl ForeignTableSchema {
134135
self.custom_types.setup(server).await?;
135136

136137
let mut tables = HashSet::new();
138+
let mut all_type_mismatches: Vec<TypeMismatch> = Vec::new();
139+
137140
for ((schema, table), columns) in &self.tables {
141+
// Skip internal PgDog tables
142+
if Self::is_internal_table(schema, table) {
143+
continue;
144+
}
145+
138146
let dedup = (schema.clone(), table.clone());
139147
if !tables.contains(&dedup) {
140-
let statements = create_foreign_table(columns, sharding_schema)?;
141-
for sql in statements {
148+
let result = create_foreign_table(columns, sharding_schema)?;
149+
for sql in &result.statements {
142150
debug!("[fdw::setup] {} [{}]", sql, server.addr());
143-
server.execute(&sql).await?;
151+
server.execute(sql).await?;
144152
}
153+
all_type_mismatches.extend(result.type_mismatches);
145154
tables.insert(dedup);
146155
}
147156
}
148157

158+
// Log summary of type mismatches if any were found
159+
if !all_type_mismatches.is_empty() {
160+
warn!(
161+
"[fdw] {} table(s) skipped due to sharding config type mismatches:",
162+
all_type_mismatches.len()
163+
);
164+
for mismatch in &all_type_mismatches {
165+
warn!("[fdw] - {}", mismatch);
166+
}
167+
}
168+
149169
server.execute("COMMIT").await?;
150170
Ok(())
151171
}
@@ -182,6 +202,11 @@ impl ForeignTableSchema {
182202
&self.custom_types
183203
}
184204

205+
/// Check if a table is an internal PgDog table that shouldn't be exposed via FDW.
206+
fn is_internal_table(schema: &str, table: &str) -> bool {
207+
schema == "pgdog" && matches!(table, "validator_bigint" | "validator_uuid" | "config")
208+
}
209+
185210
/// Collect unique schemas from tables and custom types.
186211
fn schemas(&self) -> HashSet<String> {
187212
self.tables

0 commit comments

Comments
 (0)