Skip to content

Commit 5a7807c

Browse files
author
80347547
committed
fix(p2p): address PR CurvineIO#671 review findings
1 parent 2aa35c8 commit 5a7807c

9 files changed

Lines changed: 125 additions & 29 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

curvine-client/src/block/block_reader.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ use orpc::runtime::{RpcRuntime, Runtime};
2828
use orpc::sys::DataSlice;
2929
use orpc::{err_box, CommonResult};
3030
use std::sync::Arc;
31+
use std::time::Duration;
3132
use tokio::sync::{Mutex as AsyncMutex, OwnedMutexGuard};
33+
use tokio::time::timeout;
3234

3335
enum ReaderAdapter {
3436
Local(BlockReaderLocal),
@@ -303,7 +305,10 @@ impl BlockReader {
303305
return None;
304306
}
305307
let lock = self.fs_context.read_chunk_flight_lock(read_key.clone());
306-
let guard = lock.clone().lock_owned().await;
308+
let timeout_ms = self.fs_context.conf.client.p2p.transfer_timeout_ms.max(1);
309+
let guard = timeout(Duration::from_millis(timeout_ms), lock.clone().lock_owned())
310+
.await
311+
.ok()?;
307312
Some((lock, guard))
308313
}
309314

curvine-client/src/file/curvine_filesystem.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -351,12 +351,9 @@ impl CurvineFileSystem {
351351

352352
let (version, peer_whitelist, tenant_whitelist, signature) =
353353
self.fs_client.get_p2p_runtime_policy().await?;
354-
if !p2p_service
354+
let _ = p2p_service
355355
.sync_runtime_policy_from_master(version, peer_whitelist, tenant_whitelist, signature)
356-
.await
357-
{
358-
return err_box!("failed to apply p2p runtime policy from master");
359-
}
356+
.await;
360357
Ok(())
361358
}
362359

curvine-client/src/file/fs_context.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,7 @@ impl FsContext {
121121
.eviction_policy(EvictionPolicy::lru())
122122
.build_with_hasher(BuildHasherDefault::<FxHasher>::default());
123123

124-
let read_chunk_flights =
125-
FastDashMap::with_capacity(conf.client.read_chunk_cache_capacity.min(65536) as usize);
124+
let read_chunk_flights = FastDashMap::default();
126125

127126
let p2p_service = if conf.client.p2p.enable {
128127
let service = Arc::new(P2pService::new_with_runtime(

curvine-client/src/p2p/cache_manager.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,10 @@ impl CacheManager {
186186
let (path, stored_len, mtime, entry_checksum) = match self.entries.get_mut(&chunk_id) {
187187
Some(mut entry) => {
188188
if entry.expire_at_ms <= now_ms {
189-
self.expired_chunks.fetch_add(1, Ordering::Relaxed);
190189
drop(entry);
191-
self.invalidate(chunk_id);
190+
if self.invalidate(chunk_id) {
191+
self.expired_chunks.fetch_add(1, Ordering::Relaxed);
192+
}
192193
return CacheGetResult {
193194
tag: CacheGetResultTag::Miss,
194195
chunk: None,
@@ -280,10 +281,15 @@ impl CacheManager {
280281
.filter_map(|v| (v.value().expire_at_ms <= now_ms).then_some(*v.key()))
281282
.collect();
282283
if !expired.is_empty() {
283-
self.expired_chunks
284-
.fetch_add(expired.len() as u64, Ordering::Relaxed);
284+
let mut newly_expired = 0u64;
285285
for chunk_id in expired {
286-
self.invalidate(chunk_id);
286+
if self.invalidate(chunk_id) {
287+
newly_expired += 1;
288+
}
289+
}
290+
if newly_expired > 0 {
291+
self.expired_chunks
292+
.fetch_add(newly_expired, Ordering::Relaxed);
287293
}
288294
}
289295
}
@@ -310,12 +316,14 @@ impl CacheManager {
310316
}
311317
}
312318

313-
fn invalidate(&self, chunk_id: ChunkId) {
319+
fn invalidate(&self, chunk_id: ChunkId) -> bool {
314320
if let Some((_, entry)) = self.entries.remove(&chunk_id) {
315321
self.usage_bytes.fetch_sub(entry.len, Ordering::Relaxed);
316322
self.invalidations.fetch_add(1, Ordering::Relaxed);
317323
let _ = fs::remove_file(entry.path);
324+
return true;
318325
}
326+
false
319327
}
320328

321329
fn evict_until_fit(&self) {

curvine-common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ slog-stdlog = { workspace = true }
4141
chrono = { workspace = true }
4242
trait-variant = { workspace = true }
4343
sha2 = "0.10"
44+
hmac = "0.12"
4445

4546
anyhow = { workspace = true }
4647
tracing = { workspace = true }

curvine-common/src/conf/client_conf.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,13 @@ impl ClientP2pConf {
113113
)
114114
.into());
115115
}
116+
if self.provider_ttl.is_zero() {
117+
return Err(Error::new(
118+
ErrorKind::InvalidInput,
119+
"client.p2p.provider_ttl must be greater than 0",
120+
)
121+
.into());
122+
}
116123
if self.max_inflight_per_peer > 0 && self.min_inflight_per_peer > self.max_inflight_per_peer
117124
{
118125
return Err(Error::new(
@@ -653,6 +660,16 @@ mod tests {
653660
assert!(conf.init().is_err());
654661
}
655662

663+
#[test]
664+
fn client_p2p_conf_rejects_zero_provider_ttl() {
665+
let mut conf = ClientP2pConf {
666+
provider_ttl_str: "0s".to_string(),
667+
provider_publish_interval_str: "1s".to_string(),
668+
..ClientP2pConf::default()
669+
};
670+
assert!(conf.init().is_err());
671+
}
672+
656673
#[test]
657674
fn client_p2p_conf_rejects_zero_layered_timeout() {
658675
let mut conf = ClientP2pConf {

curvine-common/src/utils/common_utils.rs

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use hmac::{Hmac, Mac};
1516
use log::info;
1617
use orpc::common::Utils;
1718
use orpc::{err_msg, CommonResult};
18-
use sha2::{Digest, Sha256};
19+
use sha2::Sha256;
1920
use std::collections::HashMap;
2021
use std::process::{Command, Stdio};
2122

23+
type PolicyHmac = Hmac<Sha256>;
24+
2225
pub struct CommonUtils;
2326

2427
impl CommonUtils {
@@ -67,18 +70,19 @@ impl CommonUtils {
6770
if secret.trim().is_empty() {
6871
return String::new();
6972
}
70-
let mut hasher = Sha256::new();
71-
Self::update_signing_segment(&mut hasher, secret);
72-
Self::update_signing_segment(&mut hasher, &version.to_string());
73-
Self::update_signing_segment(&mut hasher, &peer_whitelist.len().to_string());
73+
let Ok(mut mac) = PolicyHmac::new_from_slice(secret.as_bytes()) else {
74+
return String::new();
75+
};
76+
Self::update_signing_segment(&mut mac, &version.to_string());
77+
Self::update_signing_segment(&mut mac, &peer_whitelist.len().to_string());
7478
for peer in peer_whitelist {
75-
Self::update_signing_segment(&mut hasher, peer);
79+
Self::update_signing_segment(&mut mac, peer);
7680
}
77-
Self::update_signing_segment(&mut hasher, &tenant_whitelist.len().to_string());
81+
Self::update_signing_segment(&mut mac, &tenant_whitelist.len().to_string());
7882
for tenant in tenant_whitelist {
79-
Self::update_signing_segment(&mut hasher, tenant);
83+
Self::update_signing_segment(&mut mac, tenant);
8084
}
81-
let digest = hasher.finalize();
85+
let digest = mac.finalize().into_bytes();
8286
digest.iter().map(|v| format!("{:02x}", v)).collect()
8387
}
8488

@@ -121,17 +125,48 @@ impl CommonUtils {
121125
})
122126
}
123127

124-
fn update_signing_segment(hasher: &mut Sha256, value: &str) {
125-
hasher.update(value.len().to_string().as_bytes());
126-
hasher.update(b":");
127-
hasher.update(value.as_bytes());
128-
hasher.update(b"\n");
128+
fn update_signing_segment(mac: &mut PolicyHmac, value: &str) {
129+
mac.update(value.len().to_string().as_bytes());
130+
mac.update(b":");
131+
mac.update(value.as_bytes());
132+
mac.update(b"\n");
129133
}
130134
}
131135

132136
#[cfg(test)]
133137
mod tests {
134138
use super::CommonUtils;
139+
use sha2::{Digest, Sha256};
140+
141+
fn legacy_prefix_signature(
142+
secret: &str,
143+
version: u64,
144+
peer_whitelist: &[String],
145+
tenant_whitelist: &[String],
146+
) -> String {
147+
fn update_segment(hasher: &mut Sha256, value: &str) {
148+
hasher.update(value.len().to_string().as_bytes());
149+
hasher.update(b":");
150+
hasher.update(value.as_bytes());
151+
hasher.update(b"\n");
152+
}
153+
let mut hasher = Sha256::new();
154+
update_segment(&mut hasher, secret);
155+
update_segment(&mut hasher, &version.to_string());
156+
update_segment(&mut hasher, &peer_whitelist.len().to_string());
157+
for peer in peer_whitelist {
158+
update_segment(&mut hasher, peer);
159+
}
160+
update_segment(&mut hasher, &tenant_whitelist.len().to_string());
161+
for tenant in tenant_whitelist {
162+
update_segment(&mut hasher, tenant);
163+
}
164+
hasher
165+
.finalize()
166+
.iter()
167+
.map(|v| format!("{:02x}", v))
168+
.collect()
169+
}
135170

136171
#[test]
137172
fn p2p_policy_signature_roundtrip() {
@@ -169,4 +204,13 @@ mod tests {
169204
&signatures
170205
));
171206
}
207+
208+
#[test]
209+
fn p2p_policy_signature_should_not_use_legacy_prefix_hash() {
210+
let peers = vec!["peer-a".to_string()];
211+
let tenants = vec!["tenant-a".to_string(), "tenant-b".to_string()];
212+
let hmac_signature = CommonUtils::sign_p2p_policy("secret", 7, &peers, &tenants);
213+
let legacy_signature = legacy_prefix_signature("secret", 7, &peers, &tenants);
214+
assert_ne!(hmac_signature, legacy_signature);
215+
}
172216
}

curvine-server/src/master/meta/inode/inode_file.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,11 @@ impl InodeFile {
390390
}
391391

392392
fn bump_version_epoch(&mut self) {
393-
self.version_epoch = self.version_epoch.max(1).saturating_add(1);
393+
self.version_epoch = if self.version_epoch <= 0 {
394+
1
395+
} else {
396+
self.version_epoch.saturating_add(1)
397+
};
394398
}
395399

396400
/// Extend the file to the specified length by allocating new blocks or extending existing ones.
@@ -515,3 +519,23 @@ impl PartialEq for InodeFile {
515519
self.id == other.id
516520
}
517521
}
522+
523+
#[cfg(test)]
524+
mod tests {
525+
use super::InodeFile;
526+
527+
#[test]
528+
fn bump_version_epoch_should_increment_normal_epoch() {
529+
let mut inode = InodeFile::new(1, 1);
530+
inode.bump_version_epoch();
531+
assert_eq!(inode.version_epoch, 2);
532+
}
533+
534+
#[test]
535+
fn bump_version_epoch_should_recover_legacy_zero_epoch() {
536+
let mut inode = InodeFile::new(1, 1);
537+
inode.version_epoch = 0;
538+
inode.bump_version_epoch();
539+
assert_eq!(inode.version_epoch, 1);
540+
}
541+
}

0 commit comments

Comments
 (0)