Concurrent use of Context in stream causes a freeze #89

Description

@FeJe

What went wrong

When the Restate context is used by futures that run concurrently inside a stream, the stream never completes.

  • With buffered(1): Everything works correctly.
  • With buffered(3): The first 3 items are processed, and then the stream freezes indefinitely.
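
For readers less familiar with `buffered(n)`, here is a rough, std-only model of what it asks of the futures it drives: up to `n` of them are polled in an interleaved fashion, and results come out in input order. This is only a sketch and not a reproduction of the freeze. `Step`, `NoopWake`, and `drive_buffered` are hypothetical names invented for illustration, and no Restate API is involved. With well-behaved futures a window of 3 completes normally, which suggests the problem is specific to polling several Context operations at once.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the driver below simply re-polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for one unit of work: it reports `Pending`
/// for `polls_left` polls and then yields `value`.
struct Step {
    value: i32,
    polls_left: u32,
}

impl Future for Step {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
        if self.polls_left == 0 {
            Poll::Ready(self.value)
        } else {
            self.polls_left -= 1;
            Poll::Pending
        }
    }
}

/// Simplified model of `buffered(limit)`: keep at most `limit` futures in
/// flight, poll each unfinished one per round, and emit results in input
/// order. Returns the results and the largest number of futures that were
/// in flight at the same time.
fn drive_buffered(items: Vec<Step>, limit: usize) -> (Vec<i32>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut pending: VecDeque<Step> = items.into();
    // Each slot holds a future plus its result once it has finished.
    let mut in_flight: VecDeque<(Step, Option<i32>)> = VecDeque::new();
    let mut out = Vec::new();
    let mut max_in_flight = 0;

    loop {
        // Refill the window up to the limit.
        while in_flight.len() < limit {
            match pending.pop_front() {
                Some(step) => in_flight.push_back((step, None)),
                None => break,
            }
        }
        if in_flight.is_empty() {
            break;
        }
        max_in_flight = max_in_flight.max(in_flight.len());

        // Poll every unfinished future in the window.
        for (step, done) in in_flight.iter_mut() {
            if done.is_none() {
                if let Poll::Ready(v) = Pin::new(step).poll(&mut cx) {
                    *done = Some(v);
                }
            }
        }

        // Emit finished results from the front only, preserving order.
        while let Some((_, Some(v))) = in_flight.front() {
            let v = *v;
            in_flight.pop_front();
            out.push(v);
        }
    }
    (out, max_in_flight)
}
```

Calling `drive_buffered` with nine `Step` values (1 through 9) and a limit of 3 returns the values in their original order with at most three in flight; a limit of 1 gives the same results with one in flight, mirroring the two cases above.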

Minimal example

Tested with:

  • restate-sdk 0.7 and 0.8
  • restatedev/restate:1.6.1
  • on Windows

Cargo.toml

[package]
name = "minimal_example"
version = "0.1.0"
edition = "2024"

[dependencies]
futures = "0.3"
restate-sdk = "0.7"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.3"

src/main.rs

use futures::{StreamExt, stream};
use restate_sdk::prelude::*;
use std::{convert::Infallible, time::Duration};
use tokio::time::sleep;
use tracing::info;

#[restate_sdk::service]
trait MinimalExample {
    async fn process_working() -> Result<(), Infallible>;
    async fn process_not_working() -> Result<(), Infallible>;
}

struct MinimalExampleImpl;

impl MinimalExample for MinimalExampleImpl {
    // Completes: buffered(1) keeps at most one ctx.run future in flight.
    async fn process_working(&self, ctx: Context<'_>) -> Result<(), Infallible> {
        let _res = stream::iter(1..10)
            .map(|i| context_run(&ctx, i))
            .buffered(1)
            .collect::<Vec<_>>()
            .await;
        Ok(())
    }

    // Freezes after the first 3 items: buffered(3) polls up to three ctx.run futures concurrently.
    async fn process_not_working(&self, ctx: Context<'_>) -> Result<(), Infallible> {
        let _res = stream::iter(1..10)
            .map(|i| context_run(&ctx, i))
            .buffered(3)
            .collect::<Vec<_>>()
            .await;
        Ok(())
    }
}

// Runs one named ctx.run block that logs the item, sleeps 700 ms, and returns it.
async fn context_run(ctx: &Context<'_>, param: i32) -> Result<i32, TerminalError> {
    ctx.run(|| async move {
        info!("{param}");
        sleep(Duration::from_millis(700)).await;
        Ok(param)
    })
    .name(format!("context_operation-{param}"))
    .await
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    HttpServer::new(Endpoint::builder().bind(MinimalExampleImpl.serve()).build())
        .listen_and_serve("0.0.0.0:9080".parse().unwrap())
        .await;
}
