@@ -26,6 +26,16 @@ use tracing::{debug, warn};
2626
2727use crate :: { kv:: PortFlags , main_service:: Proxy } ;
2828
29+ /// Outcome of a single fetch attempt, distinguishing what we can usefully retry.
30+ enum FetchError {
31+ /// Transient: connection failed, RPC timed out, agent returned 5xx, etc.
32+ /// The CVM might just be warming up — retry with backoff.
33+ Transient ( anyhow:: Error ) ,
34+ /// Permanent: instance is gone, or the CVM responded with data we can't
35+ /// parse (malformed tcb_info, schema mismatch). Retrying won't help.
36+ Permanent ( anyhow:: Error ) ,
37+ }
38+
2939/// Decide whether the gateway should send a PROXY protocol header on the
3040/// outbound connection to (`instance_id`, `port`).
3141///
@@ -76,26 +86,31 @@ async fn fetch_with_retry(state: &Proxy, instance_id: &str) {
7686 let mut attempt = 0u32 ;
7787 let mut backoff = cfg. backoff_initial ;
7888 loop {
79- match tokio:: time:: timeout ( cfg. timeout , fetch_and_store ( state, instance_id) ) . await {
80- Ok ( Ok ( ( ) ) ) => {
89+ let result =
90+ match tokio:: time:: timeout ( cfg. timeout , fetch_and_store ( state, instance_id) ) . await {
91+ Ok ( r) => r,
92+ // The Info() RPC took too long. Treat as transient — the CVM
93+ // may just be slow to come up.
94+ Err ( _) => Err ( FetchError :: Transient ( anyhow:: anyhow!(
95+ "Info() rpc timed out after {:?}" ,
96+ cfg. timeout
97+ ) ) ) ,
98+ } ;
99+ match result {
100+ Ok ( ( ) ) => {
81101 debug ! ( "port_attrs cached for instance {instance_id} (attempt {attempt})" ) ;
82102 return ;
83103 }
84- Ok ( Err ( err) ) if is_unknown_instance ( & err) => {
85- // Instance was recycled while the fetch was queued. No
86- // point retrying — the instance is gone.
87- debug ! ( "port_attrs fetch for {instance_id}: instance no longer exists, giving up" ) ;
104+ Err ( FetchError :: Permanent ( err) ) => {
105+ // Either the instance was recycled while queued, or the
106+ // agent responded with data we can't parse. Retrying won't
107+ // change either; bail.
108+ debug ! ( "port_attrs fetch for {instance_id}: permanent failure: {err:#}" ) ;
88109 return ;
89110 }
90- Ok ( Err ( err) ) => {
111+ Err ( FetchError :: Transient ( err) ) => {
91112 warn ! ( "port_attrs fetch for {instance_id} failed (attempt {attempt}): {err:#}" ) ;
92113 }
93- Err ( _) => {
94- warn ! (
95- "port_attrs fetch for {instance_id} timed out after {:?} (attempt {attempt})" ,
96- cfg. timeout
97- ) ;
98- }
99114 }
100115 if attempt >= cfg. max_retries {
101116 warn ! (
@@ -110,39 +125,50 @@ async fn fetch_with_retry(state: &Proxy, instance_id: &str) {
110125 }
111126}
112127
113- fn is_unknown_instance ( err : & anyhow:: Error ) -> bool {
114- err. chain ( )
115- . any ( |e| e. to_string ( ) . contains ( "unknown instance" ) )
116- }
117-
118- async fn fetch_and_store ( state : & Proxy , instance_id : & str ) -> Result < ( ) > {
128+ async fn fetch_and_store ( state : & Proxy , instance_id : & str ) -> Result < ( ) , FetchError > {
119129 let ( ip, agent_port) = {
120130 let guard = state. lock ( ) ;
121- let ip = guard. instance_ip ( instance_id) . context ( "unknown instance" ) ?;
131+ let ip = guard
132+ . instance_ip ( instance_id)
133+ // Instance was recycled — never coming back under this id.
134+ . ok_or_else ( || FetchError :: Permanent ( anyhow:: anyhow!( "unknown instance" ) ) ) ?;
122135 ( ip, guard. config . proxy . agent_port )
123136 } ;
124137 let attrs = fetch_port_attrs ( ip, agent_port) . await ?;
125138 state. lock ( ) . update_instance_port_attrs ( instance_id, attrs) ;
126139 Ok ( ( ) )
127140}
128141
129- async fn fetch_port_attrs ( ip : Ipv4Addr , agent_port : u16 ) -> Result < BTreeMap < u16 , PortFlags > > {
142+ async fn fetch_port_attrs (
143+ ip : Ipv4Addr ,
144+ agent_port : u16 ,
145+ ) -> Result < BTreeMap < u16 , PortFlags > , FetchError > {
130146 let url = format ! ( "http://{ip}:{agent_port}/prpc" ) ;
131147 let client = DstackGuestClient :: new ( PrpcClient :: new ( url) ) ;
132- let info = client. info ( ) . await . context ( "agent Info() rpc failed" ) ?;
148+ // Network/RPC errors here are transient: agent might still be coming up.
149+ let info = client
150+ . info ( )
151+ . await
152+ . context ( "agent Info() rpc failed" )
153+ . map_err ( FetchError :: Transient ) ?;
154+
155+ // Anything below this point is the agent telling us something we can't
156+ // turn into port_attrs — treat as permanent.
133157 if info. tcb_info . is_empty ( ) {
134158 // Legacy CVM with public_tcbinfo=false; we cannot inspect app-compose
135159 // remotely. Cache an empty map so we don't keep retrying.
136160 return Ok ( BTreeMap :: new ( ) ) ;
137161 }
138- let tcb: serde_json:: Value =
139- serde_json:: from_str ( & info. tcb_info ) . context ( "invalid tcb_info json" ) ?;
162+ let tcb: serde_json:: Value = serde_json:: from_str ( & info. tcb_info )
163+ . context ( "invalid tcb_info json" )
164+ . map_err ( FetchError :: Permanent ) ?;
140165 let raw = tcb
141166 . get ( "app_compose" )
142167 . and_then ( |v| v. as_str ( ) )
143- . context ( "tcb_info missing app_compose" ) ?;
144- let app_compose: AppCompose =
145- serde_json:: from_str ( raw) . context ( "failed to parse app_compose from tcb_info" ) ?;
168+ . ok_or_else ( || FetchError :: Permanent ( anyhow:: anyhow!( "tcb_info missing app_compose" ) ) ) ?;
169+ let app_compose: AppCompose = serde_json:: from_str ( raw)
170+ . context ( "failed to parse app_compose from tcb_info" )
171+ . map_err ( FetchError :: Permanent ) ?;
146172 Ok ( app_compose
147173 . ports
148174 . into_iter ( )
0 commit comments