-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsqlite_query_stream.rs
More file actions
98 lines (81 loc) · 2.7 KB
/
Copy pathsqlite_query_stream.rs
File metadata and controls
98 lines (81 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use std::sync::Arc;
use async_sqlite::Client;
use rust_extensions::StrOrString;
use crate::{
sql::{SelectBuilder, SqlValues},
sql_select::SelectEntity,
sql_where::SqlWhereModel,
DbRow, SqlLiteError,
};
pub struct SqliteQueryStream<TEntity: SelectEntity + Send + Sync + 'static> {
rx: tokio::sync::mpsc::Receiver<Result<TEntity, async_sqlite::rusqlite::Error>>,
}
impl<TEntity: SelectEntity + Send + Sync + 'static> SqliteQueryStream<TEntity> {
pub fn new<TWhereModel: SqlWhereModel + Send + Sync + 'static>(
client: Arc<Client>,
table_name: StrOrString<'static>,
select_builder: SelectBuilder,
where_model: Option<TWhereModel>,
) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(2048);
tokio::spawn(select_builder_stream::<TEntity, TWhereModel>(
client,
table_name,
select_builder,
where_model,
tx,
));
Self { rx }
}
pub async fn get_next(&mut self) -> Option<Result<TEntity, SqlLiteError>> {
let next_one = self.rx.recv().await?;
match next_one {
Ok(item) => Some(Ok(item)),
Err(err) => Some(Err(err.into())),
}
}
}
async fn select_builder_stream<
TEntity: SelectEntity + Send + Sync + 'static,
TWhereModel: SqlWhereModel,
>(
client: Arc<Client>,
table_name: StrOrString<'static>,
select_builder: SelectBuilder,
where_model: Option<TWhereModel>,
tx: tokio::sync::mpsc::Sender<Result<TEntity, async_sqlite::rusqlite::Error>>,
) {
let mut sql = String::new();
let mut sql_values = SqlValues::new();
select_builder.build_select_sql(
&mut sql,
&mut sql_values,
table_name.as_str(),
where_model.as_ref(),
);
let sql = Arc::new(sql);
let sql_spawned = sql.clone();
let result = client
.conn(move |conn| {
let mut stmt = conn.prepare(&sql_spawned)?;
let response = stmt.query_map(sql_values.get_params_to_invoke().as_slice(), |row| {
let db_row = DbRow::new(row, TEntity::SELECT_FIELDS);
TEntity::from(&db_row);
Ok(TEntity::from(&db_row))
})?;
for itm in response {
let send_result = tx.blocking_send(itm);
if let Err(err) = send_result {
println!(
"Sending DbRow to string ended with Error. Err:{:?}. Sql:{}",
err, sql_spawned
);
}
}
Ok(())
})
.await;
if let Err(err) = result {
println!("Reading stream ended with error. Err:{:?} Sql:{}", err, sql);
}
}