Skip to content

Commit a420e5a

Browse files
authored
Merge pull request #263 from ties/feature/mrt_header_parsing
Use zerocopy for mrt header parsing
2 parents 9725402 + 96a948f commit a420e5a

3 files changed

Lines changed: 108 additions & 31 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ serde = { version = "1.0", features = ["derive"], optional = true }
3838
# Parser dependencies #
3939
#######################
4040
bytes = { version = "1.7", optional = true }
41+
zerocopy = { version = "0.8", features = ["derive"], optional = true }
4142
hex = { version = "0.4.3", optional = true } # bmp/openbmp parsing
4243
oneio = { version = "0.20.0", default-features = false, features = ["http", "gz", "bz"], optional = true }
4344
regex = { version = "1", optional = true } # used in parser filter
@@ -60,6 +61,7 @@ parser = [
6061
"bytes",
6162
"chrono",
6263
"regex",
64+
"zerocopy",
6365
]
6466
cli = [
6567
"clap",

benches/internals.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,19 @@ pub fn criterion_benchmark(c: &mut Criterion) {
9191
})
9292
});
9393

94+
c.bench_function("updates into_raw_record_iter", |b| {
95+
b.iter(|| {
96+
let mut reader = black_box(&updates[..]);
97+
98+
BgpkitParser::from_reader(&mut reader)
99+
.into_raw_record_iter()
100+
.take(RECORD_LIMIT)
101+
.for_each(|x| {
102+
black_box(x);
103+
});
104+
})
105+
});
106+
94107
c.bench_function("rib into_record_iter", |b| {
95108
b.iter(|| {
96109
let mut reader = black_box(&rib_dump[..]);
@@ -129,6 +142,19 @@ pub fn criterion_benchmark(c: &mut Criterion) {
129142
});
130143
})
131144
});
145+
146+
c.bench_function("rib into_raw_record_iter", |b| {
147+
b.iter(|| {
148+
let mut reader = black_box(&rib_dump[..]);
149+
150+
BgpkitParser::from_reader(&mut reader)
151+
.into_raw_record_iter()
152+
.take(RECORD_LIMIT)
153+
.for_each(|x| {
154+
black_box(x);
155+
});
156+
})
157+
});
132158
}
133159

134160
criterion_group! {

src/parser/mrt/mrt_header.rs

Lines changed: 80 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,71 @@
11
use crate::models::{CommonHeader, EntryType};
22
use crate::ParserError;
3-
use bytes::{Buf, BufMut, Bytes, BytesMut};
3+
use bytes::Bytes;
44
use std::io::Read;
5+
use zerocopy::big_endian::{U16, U32};
6+
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
7+
8+
/// On-wire MRT common header layout (12 bytes, network byte order).
9+
#[derive(IntoBytes, FromBytes, KnownLayout, Immutable)]
10+
#[repr(C)]
11+
struct RawMrtCommonHeader {
12+
timestamp: U32,
13+
entry_type: U16,
14+
entry_subtype: U16,
15+
length: U32,
16+
}
17+
18+
const _: () = assert!(size_of::<RawMrtCommonHeader>() == 12);
19+
20+
/// On-wire MRT header with the microsecond timestamp field included (16 bytes, network byte order).
21+
#[derive(IntoBytes, FromBytes, KnownLayout, Immutable)]
22+
#[repr(C)]
23+
struct RawMrtEtCommonHeader {
24+
timestamp: U32,
25+
entry_type: U16,
26+
entry_subtype: U16,
27+
length: U32,
28+
microseconds: U32,
29+
}
30+
31+
const _: () = assert!(size_of::<RawMrtEtCommonHeader>() == 16);
32+
33+
enum RawMrtHeader {
34+
Standard(RawMrtCommonHeader),
35+
Et(RawMrtEtCommonHeader),
36+
}
37+
38+
impl From<&CommonHeader> for RawMrtHeader {
39+
fn from(header: &CommonHeader) -> Self {
40+
match header.microsecond_timestamp {
41+
None => RawMrtHeader::Standard(RawMrtCommonHeader {
42+
timestamp: U32::new(header.timestamp),
43+
entry_type: U16::new(header.entry_type as u16),
44+
entry_subtype: U16::new(header.entry_subtype),
45+
length: U32::new(header.length),
46+
}),
47+
Some(microseconds) => RawMrtHeader::Et(RawMrtEtCommonHeader {
48+
timestamp: U32::new(header.timestamp),
49+
entry_type: U16::new(header.entry_type as u16),
50+
entry_subtype: U16::new(header.entry_subtype),
51+
// Internally, we use the length of the MRT payload.
52+
// However, in the header, the length includes the space used by the extra timestamp
53+
// data.
54+
length: U32::new(header.length + 4),
55+
microseconds: U32::new(microseconds),
56+
}),
57+
}
58+
}
59+
}
60+
61+
impl RawMrtHeader {
62+
fn as_bytes(&self) -> &[u8] {
63+
match self {
64+
RawMrtHeader::Standard(raw) => raw.as_bytes(),
65+
RawMrtHeader::Et(raw) => raw.as_bytes(),
66+
}
67+
}
68+
}
569

670
/// Result of parsing a common header, including the raw bytes.
771
pub struct ParsedHeader {
@@ -56,14 +120,16 @@ pub fn parse_common_header<T: Read>(input: &mut T) -> Result<CommonHeader, Parse
56120
pub fn parse_common_header_with_bytes<T: Read>(input: &mut T) -> Result<ParsedHeader, ParserError> {
57121
let mut base_bytes = [0u8; 12];
58122
input.read_exact(&mut base_bytes)?;
59-
let mut data = &base_bytes[..];
60123

61-
let timestamp = data.get_u32();
62-
let entry_type_raw = data.get_u16();
63-
let entry_type = EntryType::try_from(entry_type_raw)?;
64-
let entry_subtype = data.get_u16();
124+
// Single bounds check via zerocopy instead of four sequential cursor reads.
125+
let raw = RawMrtCommonHeader::ref_from_bytes(&base_bytes)
126+
.expect("base_bytes is exactly 12 bytes with no alignment requirement");
127+
128+
let timestamp = raw.timestamp.get();
129+
let entry_type = EntryType::try_from(raw.entry_type.get())?;
130+
let entry_subtype = raw.entry_subtype.get();
65131
// the length field does not include the length of the common header
66-
let mut length = data.get_u32();
132+
let mut length = raw.length.get();
67133

68134
let (microsecond_timestamp, raw_bytes) = match &entry_type {
69135
EntryType::BGP4MP_ET => {
@@ -76,15 +142,11 @@ pub fn parse_common_header_with_bytes<T: Read>(input: &mut T) -> Result<ParsedHe
76142
));
77143
}
78144
length -= 4;
79-
let mut et_bytes = [0u8; 4];
80-
input.read_exact(&mut et_bytes)?;
81-
let microseconds = (&et_bytes[..]).get_u32();
82-
83-
// Combine base header bytes + ET bytes
84-
let mut combined = BytesMut::with_capacity(16);
85-
combined.put_slice(&base_bytes);
86-
combined.put_slice(&et_bytes);
87-
(Some(microseconds), combined.freeze())
145+
let mut combined = [0u8; 16];
146+
combined[..12].copy_from_slice(&base_bytes);
147+
input.read_exact(&mut combined[12..])?;
148+
let microseconds = u32::from_be_bytes(combined[12..16].try_into().unwrap());
149+
(Some(microseconds), Bytes::copy_from_slice(&combined))
88150
}
89151
_ => (None, Bytes::copy_from_slice(&base_bytes)),
90152
};
@@ -103,21 +165,8 @@ pub fn parse_common_header_with_bytes<T: Read>(input: &mut T) -> Result<ParsedHe
103165

104166
impl CommonHeader {
105167
pub fn encode(&self) -> Bytes {
106-
let mut bytes = BytesMut::new();
107-
bytes.put_slice(&self.timestamp.to_be_bytes());
108-
bytes.put_u16(self.entry_type as u16);
109-
bytes.put_u16(self.entry_subtype);
110-
111-
match self.microsecond_timestamp {
112-
None => bytes.put_u32(self.length),
113-
Some(microseconds) => {
114-
// When the microsecond timestamp is present, the length must be adjusted to account
115-
// for the space used by the extra timestamp data.
116-
bytes.put_u32(self.length + 4);
117-
bytes.put_u32(microseconds);
118-
}
119-
};
120-
bytes.freeze()
168+
let raw = RawMrtHeader::from(self);
169+
Bytes::copy_from_slice(raw.as_bytes())
121170
}
122171
}
123172

0 commit comments

Comments
 (0)