Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions crates/ark/src/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ pub struct Console {
tasks_interrupt_rx: Receiver<RTask>,
tasks_idle_rx: Receiver<RTask>,
tasks_idle_any_rx: Receiver<RTask>,
pending_futures: HashMap<Uuid, (BoxFuture<'static, ()>, RTaskStartInfo)>,
pending_futures: HashMap<Uuid, (BoxFuture<'static, ()>, RTaskStartInfo, Option<String>)>,

/// Channel to communicate requests and events to the frontend
/// by forwarding them through the UI comm. Optional, and really Positron specific.
Expand Down Expand Up @@ -2056,7 +2056,18 @@ impl Console {

let (mut fut, mut start_info) = match fut {
Some(fut) => (fut, waker.start_info.clone()),
None => self.pending_futures.remove(&waker.id).unwrap(),
None => {
let (fut, start_info, suspended_capture) =
self.pending_futures.remove(&waker.id).unwrap();
// Restore `captured_output` that was suspended when this
// future last yielded, so the future's `ConsoleOutputCapture`
// can continue accumulating into it.
if suspended_capture.is_some() {
stdext::soft_assert!(self.captured_output.is_none());
self.captured_output = suspended_capture;
}
(fut, start_info)
},
};

let awaker = waker.clone().into();
Expand All @@ -2073,7 +2084,12 @@ impl Console {
},
Poll::Pending => {
start_info.bump_elapsed(tick.elapsed());
self.pending_futures.insert(waker.id, (fut, start_info));
// Suspend `captured_output` so that console output during
// evaluation (e.g. debug messages) goes to IOPub instead of
// being swallowed by a pending task's capture buffer.
let suspended_capture = self.captured_output.take();
self.pending_futures
.insert(waker.id, (fut, start_info, suspended_capture));
None
},
}
Expand Down
72 changes: 70 additions & 2 deletions crates/ark/src/r_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crossbeam::channel::bounded;
use crossbeam::channel::unbounded;
use crossbeam::channel::Receiver;
use crossbeam::channel::Sender;
use libr::SEXP;
use uuid::Uuid;

use crate::console::Console;
Expand Down Expand Up @@ -372,5 +373,72 @@ where
tasks_tx.send(task).unwrap();
}

// Tests are tricky because `harp::fixtures::r_test_init()` is very bare bones and
// doesn't have an `CONSOLE` or `CONSOLE_TASKS_TX`.
// Test-only R-callable functions for spawning a pending idle task.
//
// This allows integration tests to exercise the `captured_output`
// save/restore mechanism in `poll_task`. The flow is:
//
// 1. Test calls `.Call("ps_test_spawn_pending_task")` from R.
// This spawns an async idle task that creates a `ConsoleOutputCapture`
// and then awaits a oneshot channel (staying Pending).
//
// 2. On the next event-loop iteration the task is polled, `captured_output`
// is set, and the future yields. `poll_task` should save the capture
// into `pending_futures` and clear `captured_output`.
//
// 3. The test busy-loops with `getOption("ark.test.task_polled")` until it
// sees `TRUE`, confirming the task has been polled.
//
// 4. The test sends another execute request (e.g. `cat("hello\n")`).
// Because `captured_output` has been cleared, the output reaches IOPub.
//
// 5. Test calls `.Call("ps_test_complete_pending_task")` to unblock the
// oneshot, letting the idle task finish and drop its capture cleanly.
Comment thread
lionel- marked this conversation as resolved.

static PENDING_TASK_TX: Mutex<Option<futures::channel::oneshot::Sender<()>>> = Mutex::new(None);

#[harp::register]
unsafe extern "C-unwind" fn ps_test_spawn_pending_task() -> anyhow::Result<SEXP> {
if !stdext::IS_TESTING {
return Err(anyhow::anyhow!(
"ps_test_spawn_pending_task is only available in tests"
));
}

// Reset the flag before spawning
harp::parse_eval_base("options(ark.test.task_polled = FALSE)")?;

let (tx, rx) = futures::channel::oneshot::channel::<()>();
*PENDING_TASK_TX.lock().unwrap() = Some(tx);

spawn_idle(async move |_capture| {
// Signal that we've been polled (capture is now active)
harp::parse_eval_base("options(ark.test.task_polled = TRUE)").ok();

// Stay pending until the test signals completion
let _ = rx.await;

// Clean up
harp::parse_eval_base("options(ark.test.task_polled = NULL)").ok();
});

Ok(libr::R_NilValue)
}

/// Signal the pending idle task to complete. The oneshot sender is
/// consumed, the task's future resolves, and its `ConsoleOutputCapture`
/// is dropped (restoring the previous capture state).
#[harp::register]
unsafe extern "C-unwind" fn ps_test_complete_pending_task() -> anyhow::Result<SEXP> {
if !stdext::IS_TESTING {
return Err(anyhow::anyhow!(
"ps_test_complete_pending_task is only available in tests"
));
}

if let Some(tx) = PENDING_TASK_TX.lock().unwrap().take() {
let _ = tx.send(());
}

Ok(libr::R_NilValue)
}
136 changes: 136 additions & 0 deletions crates/ark/tests/kernel-captured-output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
//
// kernel-captured-output.rs
//
// Copyright (C) 2026 Posit Software, PBC. All rights reserved.
//
// Integration tests verifying that pending async idle tasks don't swallow
// console output. See the `captured_output` save/restore mechanism in
// `poll_task` and the test helpers in `r_task.rs` for the full flow.

use std::time::Duration;
use std::time::Instant;

use amalthea::fixtures::dummy_frontend::ExecuteRequestOptions;
use ark_test::comm::RECV_TIMEOUT;
use ark_test::DummyArkFrontend;

/// Poll until the kernel confirms the pending idle task has been polled
/// by checking the `ark.test.task_polled` R option.
///
/// We sleep between iterations so the kernel's event loop gets actual
/// idle time. Without the sleep, execute requests win the `try_recv`
/// priority check every time and completely starve the idle-task
/// channel. The kernel never reaches `select` where it could pick up
/// the idle task.
fn wait_until_task_polled(frontend: &DummyArkFrontend) {
let deadline = Instant::now() + RECV_TIMEOUT;
let polled = std::cell::Cell::new(false);
loop {
std::thread::sleep(Duration::from_millis(50));
frontend.execute_request("getOption('ark.test.task_polled', FALSE)", |result| {
polled.set(result.contains("TRUE"));
});
if polled.get() {
return;
}
if Instant::now() >= deadline {
panic!("Timed out waiting for idle task to be polled");
}
}
}

#[test]
fn test_pending_idle_task_does_not_swallow_stdout() {
let frontend = DummyArkFrontend::lock();

// Spawn an idle task that holds a ConsoleOutputCapture and stays pending.
frontend.execute_request_invisibly(r#"invisible(.Call("ps_test_spawn_pending_task"))"#);

// Wait until the idle task has been polled (capture is active then saved).
wait_until_task_polled(&frontend);

// This output must reach IOPub as a stream message, not be captured.
frontend.send_execute_request(
r#"cat("hello from stdout\n")"#,
ExecuteRequestOptions::default(),
);
frontend.recv_iopub_busy();
frontend.recv_iopub_execute_input();
frontend.assert_stream_stdout_contains("hello from stdout");
frontend.recv_iopub_idle();
frontend.recv_shell_execute_reply();

// Clean up: unblock the pending task so it finishes.
frontend.execute_request_invisibly(r#"invisible(.Call("ps_test_complete_pending_task"))"#);
}

#[test]
fn test_pending_idle_task_does_not_swallow_autoprint() {
let frontend = DummyArkFrontend::lock();

// Spawn the pending idle task.
frontend.execute_request_invisibly(r#"invisible(.Call("ps_test_spawn_pending_task"))"#);

// Wait until the idle task has been polled.
wait_until_task_polled(&frontend);

// Autoprint output must still arrive as execute_result.
frontend.execute_request("42", |result| {
assert_eq!(result, "[1] 42");
});

// Clean up.
frontend.execute_request_invisibly(r#"invisible(.Call("ps_test_complete_pending_task"))"#);
}

#[test]
fn test_pending_idle_task_does_not_swallow_stderr() {
let frontend = DummyArkFrontend::lock();

// Spawn the pending idle task.
frontend.execute_request_invisibly(r#"invisible(.Call("ps_test_spawn_pending_task"))"#);

// Wait until the idle task has been polled.
wait_until_task_polled(&frontend);

// Stderr must still reach IOPub.
frontend.send_execute_request(
r#"cat("hello from stderr\n", file = stderr())"#,
ExecuteRequestOptions::default(),
);
frontend.recv_iopub_busy();
frontend.recv_iopub_execute_input();
frontend.assert_stream_stderr_contains("hello from stderr");
frontend.recv_iopub_idle();
frontend.recv_shell_execute_reply();

// Clean up.
frontend.execute_request_invisibly(r#"invisible(.Call("ps_test_complete_pending_task"))"#);
}

#[test]
fn test_multiple_pending_tasks_do_not_swallow_output() {
let frontend = DummyArkFrontend::lock();

// Spawn two pending idle tasks. The second spawn overwrites the oneshot
// sender so the first task will be orphaned (its oneshot is dropped),
// which is fine - the future resolves with Err(Canceled). The R option
// is reset by each spawn and set by each poll, so we only need to wait
// for the last one.
frontend.execute_request_invisibly(r#"invisible(.Call("ps_test_spawn_pending_task"))"#);
frontend.execute_request_invisibly(r#"invisible(.Call("ps_test_spawn_pending_task"))"#);

// Wait until the idle task has been polled.
wait_until_task_polled(&frontend);

// Output must still reach IOPub.
frontend.send_execute_request(r#"cat("still works\n")"#, ExecuteRequestOptions::default());
frontend.recv_iopub_busy();
frontend.recv_iopub_execute_input();
frontend.assert_stream_stdout_contains("still works");
frontend.recv_iopub_idle();
frontend.recv_shell_execute_reply();

// Clean up.
frontend.execute_request_invisibly(r#"invisible(.Call("ps_test_complete_pending_task"))"#);
}
14 changes: 14 additions & 0 deletions crates/stdext/src/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ macro_rules! debug_panic {
};
}

#[macro_export]
macro_rules! soft_assert {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not debug_assert!, like debug_panic!?

they both have the same implication:

  • panic, but only die during debug
  • assert, but only die during debug

Copy link
Copy Markdown
Contributor Author

@lionel- lionel- Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the name I initially picked, but it conflicts with a macro in the standard library. I thought it's fine to have stdext::debug_assert! as a disambiguator but then ran into complications due to scoping of macro exports at top-level that I don't fully understand.

( $cond:expr $(,)? ) => {
if !$cond {
stdext::debug_panic!("assertion failed: {}", stringify!($cond));
}
};
( $cond:expr, $($fmt_arg:tt)+ ) => {
if !$cond {
stdext::debug_panic!( $($fmt_arg)+ );
}
};
}

// From https://github.com/zed-industries/zed/blob/a910c594/crates/util/src/util.rs#L554
pub trait ResultExt<E> {
type Ok;
Expand Down