Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions apps/server/src/services/agent.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::*;
use crate::services::utf8_stream::Utf8StreamDecoder;

const DEFAULT_PTY_COLS: u16 = 120;
const DEFAULT_PTY_ROWS: u16 = 30;
Expand Down Expand Up @@ -259,11 +260,41 @@ pub(crate) fn agent_start(
std::thread::spawn(move || {
let mut reader = reader;
let mut buf = [0u8; 4096];
let mut decoder = Utf8StreamDecoder::new();
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(0) => {
let text = decoder.finish();
if !text.is_empty() {
if let Ok(mut lifecycle_state) = lifecycle_fallback_state_out.lock() {
if let Some((kind, source_event, data)) =
fallback_agent_lifecycle_from_output(&mut lifecycle_state, &text)
{
emit_agent_lifecycle(
&app_handle,
&workspace_id_out,
&session_out,
kind,
source_event,
&data,
);
}
}
emit_agent(
&app_handle,
&workspace_id_out,
&session_out,
"stdout",
&text,
);
let state: State<AppState> = state_handle.state();
let _ =
append_session_stream(state, &workspace_id_out, session_out_num, &text);
}
break;
}
Ok(n) => {
let text = String::from_utf8_lossy(&buf[..n]).to_string();
let text = decoder.push(&buf[..n]);
if text.is_empty() {
continue;
}
Expand Down
1 change: 1 addition & 0 deletions apps/server/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) mod filesystem;
pub(crate) mod git;
pub(crate) mod system;
pub(crate) mod terminal;
pub(crate) mod utf8_stream;
pub(crate) mod workspace;
pub(crate) mod workspace_runtime;
pub(crate) mod workspace_watch;
19 changes: 17 additions & 2 deletions apps/server/src/services/terminal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::*;
use crate::services::utf8_stream::Utf8StreamDecoder;

const DEFAULT_PTY_COLS: u16 = 120;
const DEFAULT_PTY_ROWS: u16 = 30;
Expand Down Expand Up @@ -83,11 +84,25 @@ pub(crate) fn terminal_create(
std::thread::spawn(move || {
let mut reader = reader;
let mut buf = [0u8; 4096];
let mut decoder = Utf8StreamDecoder::new();
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(0) => {
let text = decoder.finish();
if !text.is_empty() {
emit_terminal(&app_handle, &workspace_id_out, terminal_id, &text);
let state: State<AppState> = state_handle.state();
let _ = append_workspace_terminal_output(
state,
&workspace_id_out,
terminal_id,
&text,
);
}
break;
}
Ok(n) => {
let text = String::from_utf8_lossy(&buf[..n]).to_string();
let text = decoder.push(&buf[..n]);
if text.is_empty() {
continue;
}
Expand Down
85 changes: 85 additions & 0 deletions apps/server/src/services/utf8_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
pub(crate) struct Utf8StreamDecoder {
pending: Vec<u8>,
}

impl Utf8StreamDecoder {
pub(crate) fn new() -> Self {
Self {
pending: Vec::new(),
}
}

pub(crate) fn push(&mut self, chunk: &[u8]) -> String {
if chunk.is_empty() {
return String::new();
}

self.pending.extend_from_slice(chunk);
let mut output = String::new();

loop {
match std::str::from_utf8(&self.pending) {
Ok(valid) => {
output.push_str(valid);
self.pending.clear();
break;
}
Err(error) => {
let valid_up_to = error.valid_up_to();
if valid_up_to > 0 {
let valid = std::str::from_utf8(&self.pending[..valid_up_to])
.unwrap_or_default();
output.push_str(valid);
self.pending.drain(..valid_up_to);
}

match error.error_len() {
Some(len) => {
output.push('\u{FFFD}');
self.pending.drain(..len);
}
None => break,
}
}
}
}

output
}

pub(crate) fn finish(&mut self) -> String {
if self.pending.is_empty() {
return String::new();
}
let flushed = String::from_utf8_lossy(&self.pending).to_string();
self.pending.clear();
flushed
}
}

#[cfg(test)]
mod tests {
use super::Utf8StreamDecoder;

#[test]
fn decodes_multibyte_characters_split_across_chunks() {
let mut decoder = Utf8StreamDecoder::new();

let first = decoder.push(&[0xE4, 0xBD]);
let second = decoder.push(&[0xA0, 0xE5, 0xA5, 0xBD]);
let flushed = decoder.finish();

assert_eq!(first, "");
assert_eq!(second, "你好");
assert_eq!(flushed, "");
}

#[test]
fn preserves_invalid_bytes_with_replacement_and_continues() {
let mut decoder = Utf8StreamDecoder::new();

let output = decoder.push(&[0x66, 0x80, 0x6F, 0x6F]);

assert_eq!(output, "f\u{FFFD}oo");
}
}
36 changes: 36 additions & 0 deletions apps/server/src/ws/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,40 @@ pub(crate) enum WsEnvelope {
pub(crate) enum WsClientEnvelope {
Ping { ts: i64 },
Pong { ts: i64 },
AgentSend {
workspace_id: String,
session_id: String,
input: String,
append_newline: Option<bool>,
fencing_token: i64,
},
TerminalWrite {
workspace_id: String,
terminal_id: u64,
input: String,
fencing_token: i64,
},
TerminalResize {
workspace_id: String,
terminal_id: u64,
cols: u16,
rows: u16,
fencing_token: i64,
},
AgentResize {
workspace_id: String,
session_id: String,
cols: u16,
rows: u16,
fencing_token: i64,
},
SessionUpdate {
workspace_id: String,
session_id: u64,
patch: SessionPatch,
fencing_token: i64,
},
WorkspaceControllerHeartbeat {
workspace_id: String,
},
}
Loading
Loading