Skip to content

Rewrite agfs with rust and only keep simple local mode#1209

Closed
MaojiaSheng wants to merge 0 commit intovolcengine:mainfrom
MaojiaSheng:main
Closed

Rewrite agfs with rust and only keep simple local mode#1209
MaojiaSheng wants to merge 0 commit intovolcengine:mainfrom
MaojiaSheng:main

Conversation

@MaojiaSheng
Copy link
Copy Markdown
Collaborator

No description provided.

@CLAassistant
Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

✅ MaojiaSheng
❌ openviking


openviking seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 3, 2026

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🏅 Score: 70
🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Add Rust binding loader and build step

Relevant files:

  • openviking/pyagfs/init.py
  • openviking/utils/agfs_utils.py
  • setup.py

Sub-PR theme: Add Rust RAGFS implementation and plugins

Relevant files:

  • crates/ragfs/**/*
  • crates/ragfs-python/**/*

⚡ Recommended focus areas for review

Test Assertion Mismatch

QueueFS tests expect raw message data from dequeue/peek but receive JSON-serialized {"id": "...", "data": "..."}. This will cause test failures.

    assert_eq!(result1, data1);

    let result2 = fs.read("/test/dequeue", 0, 0).await.unwrap();
    assert_eq!(result2, data2);

    // Queue should be empty
    let result = fs.read("/test/dequeue", 0, 0).await;
    assert!(result.is_err());
}

#[tokio::test]
async fn test_queuefs_peek() {
    let fs = QueueFileSystem::new();

    // Create a queue first
    fs.mkdir("/test", 0o755).await.unwrap();

    // Enqueue a message
    let data = b"test message";
    fs.write("/test/enqueue", data, 0, WriteFlag::None)
        .await
        .unwrap();

    // Peek should return the message without removing it
    let result1 = fs.read("/test/peek", 0, 0).await.unwrap();
    assert_eq!(result1, data);

    let result2 = fs.read("/test/peek", 0, 0).await.unwrap();
    assert_eq!(result2, data);

    // Dequeue should still work
    let result3 = fs.read("/test/dequeue", 0, 0).await.unwrap();
    assert_eq!(result3, data);
Redundant Code

Computed env_impl and effective_impl are not used; get_binding_client already handles env var override.

config_impl = getattr(agfs_config, "impl", "auto")
env_impl = os.environ.get("RAGFS_IMPL", "").lower() or None
effective_impl = env_impl or config_impl or "auto"
BindingFileHandle None for Rust

When using Rust binding, BindingFileHandle is set to None, which may break code relying on this import.

AGFSBindingClient, BindingFileHandle = _resolve_binding(_RAGFS_IMPL_ENV or "auto")

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 3, 2026

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix unsafe unwrap in path existence check

The path_exists method uses unwrap_or(0) which can panic on database errors. Replace
it with proper error handling matching the pattern used in is_directory to safely
return false only when no rows are found, and propagate other errors.

crates/ragfs/src/plugins/sqlfs/backend.rs [160-162]

-let count: i64 = stmt
-    .query_row(params![path], |row| row.get(0))
-    .unwrap_or(0);
+let count: i64 = match stmt.query_row(params![path], |row| row.get(0)) {
+    Ok(count) => count,
+    Err(rusqlite::Error::QueryReturnedNoRows) => 0,
+    Err(e) => return Err(Error::internal(format!("query error: {}", e))),
+};
Suggestion importance[1-10]: 9

__

Why: The original code used unwrap_or(0) which could panic on database errors. The improved code properly handles QueryReturnedNoRows and propagates other errors, preventing potential panics and improving error handling.

High
Fix QueueFS dequeue test to expect JSON

The dequeue/peek operations return JSON objects (per QueueMessage struct), but the
test expects raw bytes. Update the test to parse the JSON response and verify the
"data" field.

crates/ragfs/src/plugins/queuefs/mod.rs [504-532]

 #[tokio::test]
 async fn test_queuefs_enqueue_dequeue() {
     let fs = QueueFileSystem::new();
 
     // Create a queue first
     fs.mkdir("/test", 0o755).await.unwrap();
 
     // Enqueue messages
     let data1 = b"message 1";
     let data2 = b"message 2";
 
     fs.write("/test/enqueue", data1, 0, WriteFlag::None)
         .await
         .unwrap();
     fs.write("/test/enqueue", data2, 0, WriteFlag::None)
         .await
         .unwrap();
 
     // Dequeue messages
     let result1 = fs.read("/test/dequeue", 0, 0).await.unwrap();
-    assert_eq!(result1, data1);
+    let msg1: QueueMessage = serde_json::from_slice(&result1).unwrap();
+    assert_eq!(msg1.data, "message 1");
 
     let result2 = fs.read("/test/dequeue", 0, 0).await.unwrap();
-    assert_eq!(result2, data2);
+    let msg2: QueueMessage = serde_json::from_slice(&result2).unwrap();
+    assert_eq!(msg2.data, "message 2");
 
     // Queue should be empty
     let result = fs.read("/test/dequeue", 0, 0).await;
     assert!(result.is_err());
 }
Suggestion importance[1-10]: 8

__

Why: The test was incorrectly expected raw bytes from dequeue, but the implementation returns JSON objects. This fix ensures the test validates the correct behavior.

Medium
Fix QueueFS peek test to expect JSON

The peek operation returns JSON, not raw bytes. Update the test to parse the JSON
response and validate the data field.

crates/ragfs/src/plugins/queuefs/mod.rs [534-557]

 #[tokio::test]
 async fn test_queuefs_peek() {
     let fs = QueueFileSystem::new();
 
     // Create a queue first
     fs.mkdir("/test", 0o755).await.unwrap();
 
     // Enqueue a message
     let data = b"test message";
     fs.write("/test/enqueue", data, 0, WriteFlag::None)
         .await
         .unwrap();
 
     // Peek should return the message without removing it
     let result1 = fs.read("/test/peek", 0, 0).await.unwrap();
-    assert_eq!(result1, data);
+    let msg1: QueueMessage = serde_json::from_slice(&result1).unwrap();
+    assert_eq!(msg1.data, "test message");
 
     let result2 = fs.read("/test/peek", 0, 0).await.unwrap();
-    assert_eq!(result2, data);
+    let msg2: QueueMessage = serde_json::from_slice(&result2).unwrap();
+    assert_eq!(msg2.data, "test message");
 
     // Dequeue should still work
     let result3 = fs.read("/test/dequeue", 0, 0).await.unwrap();
-    assert_eq!(result3, data);
+    let msg3: QueueMessage = serde_json::from_slice(&result3).unwrap();
+    assert_eq!(msg3.data, "test message");
 }
Suggestion importance[1-10]: 8

__

Why: The test was incorrectly expecting raw bytes from peek, but the implementation returns JSON objects. This fix ensures the test validates the correct behavior.

Medium
Fix directory rename to include children

The rename method doesn't handle directory children, leaving them with stale paths.
After renaming the directory entry, collect all child entries, update their paths,
and reinsert them into the map.

crates/ragfs/src/plugins/memfs/mod.rs [404-407]

 // Move entry
 entries.remove(&old_normalized);
-entries.insert(new_normalized, entry);
+entries.insert(new_normalized.clone(), entry.clone());
+
+// Rename children if entry is a directory
+if entry.is_dir {
+    let old_prefix = format!("{}/", old_normalized);
+    let new_prefix = format!("{}/", new_normalized);
+    let children: Vec<(String, FileEntry)> = entries
+        .iter()
+        .filter(|(k, _)| k.starts_with(&old_prefix))
+        .map(|(k, v)| (k.clone(), v.clone()))
+        .collect();
+
+    for (old_child_path, child_entry) in children {
+        entries.remove(&old_child_path);
+        let new_child_path = old_child_path.replacen(&old_prefix, &new_prefix, 1);
+        entries.insert(new_child_path, child_entry);
+    }
+}
 
 Ok(())
Suggestion importance[1-10]: 8

__

Why: The original rename method did not handle directory children, leaving them with stale paths. The improved code correctly renames all child entries when a directory is renamed, ensuring consistent filesystem state.

Medium
Fix QueueFS read_dir test entry count

The QueueFS directory listing includes 6 control files (including "ack"), but the
test expects 5. Update the assertion and check for the "ack" control file.

crates/ragfs/src/plugins/queuefs/mod.rs [616-639]

 #[tokio::test]
 async fn test_queuefs_read_dir() {
     let fs = QueueFileSystem::new();
 
     // Create a queue
     fs.mkdir("/test", 0o755).await.unwrap();
 
     // Root should list the queue
     let entries = fs.read_dir("/").await.unwrap();
     assert_eq!(entries.len(), 1);
     assert_eq!(entries[0].name, "test");
     assert!(entries[0].is_dir);
 
     // Queue directory should list control files
     let entries = fs.read_dir("/test").await.unwrap();
-    assert_eq!(entries.len(), 5);
+    assert_eq!(entries.len(), 6);
 
     let names: Vec<String> = entries.iter().map(|e| e.name.clone()).collect();
     assert!(names.contains(&"enqueue".to_string()));
     assert!(names.contains(&"dequeue".to_string()));
     assert!(names.contains(&"peek".to_string()));
     assert!(names.contains(&"size".to_string()));
     assert!(names.contains(&"clear".to_string()));
+    assert!(names.contains(&"ack".to_string()));
 }
Suggestion importance[1-10]: 7

__

Why: The test expected 5 control files but the implementation provides 6 (including "ack"). This fix updates the assertion and checks for all control files.

Medium
General
Add S3 client timeout and retry logic

Add timeout and retry configuration to the S3 client to improve API resilience. Set
reasonable connect/read timeouts and enable standard retry behavior for transient
errors.

crates/ragfs/src/plugins/s3fs/client.rs [129-132]

 // Build S3 config
 let mut s3_config_builder = aws_sdk_s3::Config::builder()
     .behavior_version(BehaviorVersion::latest())
     .region(Region::new(region))
-    .force_path_style(use_path_style);
+    .force_path_style(use_path_style)
+    .timeout_config(
+        aws_sdk_s3::config::TimeoutConfig::builder()
+            .connect_timeout(std::time::Duration::from_secs(30))
+            .read_timeout(std::time::Duration::from_secs(60))
+            .build()
+    )
+    .retry_config(
+        aws_sdk_s3::config::RetryConfig::standard()
+            .with_max_attempts(3)
+    );
Suggestion importance[1-10]: 5

__

Why: Adding timeout and retry configuration improves the S3 client's resilience to transient network errors and slow responses, making the filesystem more robust.

Low

@MaojiaSheng MaojiaSheng changed the title [WIP] rewrite agfs with rust and only keep simple local mode Rewrite agfs with rust and only keep simple local mode Apr 3, 2026
@MaojiaSheng MaojiaSheng closed this Apr 4, 2026
@github-project-automation github-project-automation bot moved this from Backlog to Done in OpenViking project Apr 4, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

2 participants