1
Fork 0

Serialize incr comp structures to file via fixed-size buffer

Reduce a large memory spike that happens during serialization by writing
the incr comp structures to file by way of a fixed-size buffer, rather
than an unbounded vector.

Effort was made to keep the instruction count close to that of the
previous implementation. However, buffered writing to a file inherently
has more overhead than writing to a vector, because each write may
result in a handleable error. To reduce this overhead, arrangements are
made so that each LEB128-encoded integer can be written to the buffer
with only one capacity and error check. Higher-level optimizations in
which entire composite structures can be written with one capacity and
error check are possible, but would require much more work.

The performance is mostly on par with the previous implementation, with
small to moderate instruction count regressions. The memory reduction is
significant, however, so it seems like a worth-while trade-off.
This commit is contained in:
Tyson Nottingham 2020-12-06 17:30:55 -08:00
parent 6526e5c772
commit 52f21791fb
11 changed files with 611 additions and 176 deletions

View file

@ -1,7 +1,10 @@
use crate::leb128::{self, read_signed_leb128, write_signed_leb128};
use crate::leb128::{self, max_leb128_len, read_signed_leb128};
use crate::serialize;
use std::borrow::Cow;
use std::fs::File;
use std::io::{self, Write};
use std::mem::MaybeUninit;
use std::path::Path;
use std::ptr;
// -----------------------------------------------------------------------------
@ -23,22 +26,35 @@ impl Encoder {
self.data
}
#[inline]
pub fn position(&self) -> usize {
self.data.len()
}
#[inline]
pub fn emit_raw_bytes(&mut self, s: &[u8]) {
self.data.extend_from_slice(s);
}
}
macro_rules! write_uleb128 {
($enc:expr, $value:expr, $fun:ident) => {{
leb128::$fun(&mut $enc.data, $value);
Ok(())
}};
}
macro_rules! write_leb128 {
($enc:expr, $value:expr, $int_ty:ty, $fun:ident) => {{
const MAX_ENCODED_LEN: usize = max_leb128_len!($int_ty);
let old_len = $enc.data.len();
if MAX_ENCODED_LEN > $enc.data.capacity() - old_len {
$enc.data.reserve(MAX_ENCODED_LEN);
}
// SAFETY: The above check and `reserve` ensures that there is enough
// room to write the encoded value to the vector's internal buffer.
unsafe {
let buf = &mut *($enc.data.as_mut_ptr().add(old_len)
as *mut [MaybeUninit<u8>; MAX_ENCODED_LEN]);
let encoded = leb128::$fun(buf, $value);
$enc.data.set_len(old_len + encoded.len());
}
macro_rules! write_sleb128 {
($enc:expr, $value:expr) => {{
write_signed_leb128(&mut $enc.data, $value as i128);
Ok(())
}};
}
@ -53,27 +69,27 @@ impl serialize::Encoder for Encoder {
#[inline]
fn emit_usize(&mut self, v: usize) -> EncodeResult {
write_uleb128!(self, v, write_usize_leb128)
write_leb128!(self, v, usize, write_usize_leb128)
}
#[inline]
fn emit_u128(&mut self, v: u128) -> EncodeResult {
write_uleb128!(self, v, write_u128_leb128)
write_leb128!(self, v, u128, write_u128_leb128)
}
#[inline]
fn emit_u64(&mut self, v: u64) -> EncodeResult {
write_uleb128!(self, v, write_u64_leb128)
write_leb128!(self, v, u64, write_u64_leb128)
}
#[inline]
fn emit_u32(&mut self, v: u32) -> EncodeResult {
write_uleb128!(self, v, write_u32_leb128)
write_leb128!(self, v, u32, write_u32_leb128)
}
#[inline]
fn emit_u16(&mut self, v: u16) -> EncodeResult {
write_uleb128!(self, v, write_u16_leb128)
write_leb128!(self, v, u16, write_u16_leb128)
}
#[inline]
@ -84,27 +100,27 @@ impl serialize::Encoder for Encoder {
#[inline]
fn emit_isize(&mut self, v: isize) -> EncodeResult {
write_sleb128!(self, v)
write_leb128!(self, v, isize, write_isize_leb128)
}
#[inline]
fn emit_i128(&mut self, v: i128) -> EncodeResult {
write_sleb128!(self, v)
write_leb128!(self, v, i128, write_i128_leb128)
}
#[inline]
fn emit_i64(&mut self, v: i64) -> EncodeResult {
write_sleb128!(self, v)
write_leb128!(self, v, i64, write_i64_leb128)
}
#[inline]
fn emit_i32(&mut self, v: i32) -> EncodeResult {
write_sleb128!(self, v)
write_leb128!(self, v, i32, write_i32_leb128)
}
#[inline]
fn emit_i16(&mut self, v: i16) -> EncodeResult {
write_sleb128!(self, v)
write_leb128!(self, v, i16, write_i16_leb128)
}
#[inline]
@ -143,10 +159,354 @@ impl serialize::Encoder for Encoder {
}
}
impl Encoder {
pub type FileEncodeResult = Result<(), io::Error>;
// `FileEncoder` encodes data to file via fixed-size buffer.
//
// When encoding large amounts of data to a file, using `FileEncoder` may be
// preferred over using `Encoder` to encode to a `Vec`, and then writing the
// `Vec` to file, as the latter uses as much memory as there is encoded data,
// while the former uses the fixed amount of memory allocated to the buffer.
// `FileEncoder` also has the advantage of not needing to reallocate as data
// is appended to it, but the disadvantage of requiring more error handling,
// which has some runtime overhead.
pub struct FileEncoder {
// The input buffer. For adequate performance, we need more control over
// buffering than `BufWriter` offers. If `BufWriter` ever offers a raw
// buffer access API, we can use it, and remove `buf` and `buffered`.
buf: Box<[MaybeUninit<u8>]>,
buffered: usize,
flushed: usize,
file: File,
}
impl FileEncoder {
pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> {
const DEFAULT_BUF_SIZE: usize = 8192;
FileEncoder::with_capacity(path, DEFAULT_BUF_SIZE)
}
pub fn with_capacity<P: AsRef<Path>>(path: P, capacity: usize) -> io::Result<Self> {
// Require capacity at least as large as the largest LEB128 encoding
// here, so that we don't have to check or handle this on every write.
assert!(capacity >= max_leb128_len());
// Require capacity small enough such that some capacity checks can be
// done using guaranteed non-overflowing add rather than sub, which
// shaves an instruction off those code paths (on x86 at least).
assert!(capacity <= usize::MAX - max_leb128_len());
let file = File::create(path)?;
Ok(FileEncoder { buf: Box::new_uninit_slice(capacity), buffered: 0, flushed: 0, file })
}
#[inline]
pub fn position(&self) -> usize {
self.data.len()
// Tracking position this way instead of having a `self.position` field
// means that we don't have to update the position on every write call.
self.flushed + self.buffered
}
#[inline]
pub fn emit_raw_bytes(&mut self, s: &[u8]) -> FileEncodeResult {
self.write_all(s)
}
pub fn flush(&mut self) -> FileEncodeResult {
// This is basically a copy of `BufWriter::flush`. If `BufWriter` ever
// offers a raw buffer access API, we can use it, and remove this.
/// Helper struct to ensure the buffer is updated after all the writes
/// are complete. It tracks the number of written bytes and drains them
/// all from the front of the buffer when dropped.
struct BufGuard<'a> {
buffer: &'a mut [u8],
encoder_buffered: &'a mut usize,
encoder_flushed: &'a mut usize,
flushed: usize,
}
impl<'a> BufGuard<'a> {
fn new(
buffer: &'a mut [u8],
encoder_buffered: &'a mut usize,
encoder_flushed: &'a mut usize,
) -> Self {
assert_eq!(buffer.len(), *encoder_buffered);
Self { buffer, encoder_buffered, encoder_flushed, flushed: 0 }
}
/// The unwritten part of the buffer
fn remaining(&self) -> &[u8] {
&self.buffer[self.flushed..]
}
/// Flag some bytes as removed from the front of the buffer
fn consume(&mut self, amt: usize) {
self.flushed += amt;
}
/// true if all of the bytes have been written
fn done(&self) -> bool {
self.flushed >= *self.encoder_buffered
}
}
impl Drop for BufGuard<'_> {
fn drop(&mut self) {
if self.flushed > 0 {
if self.done() {
*self.encoder_flushed += *self.encoder_buffered;
*self.encoder_buffered = 0;
} else {
self.buffer.copy_within(self.flushed.., 0);
*self.encoder_flushed += self.flushed;
*self.encoder_buffered -= self.flushed;
}
}
}
}
let mut guard = BufGuard::new(
unsafe { MaybeUninit::slice_assume_init_mut(&mut self.buf[..self.buffered]) },
&mut self.buffered,
&mut self.flushed,
);
while !guard.done() {
match self.file.write(guard.remaining()) {
Ok(0) => {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write the buffered data",
));
}
Ok(n) => guard.consume(n),
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
#[inline]
fn capacity(&self) -> usize {
self.buf.len()
}
#[inline]
fn write_one(&mut self, value: u8) -> FileEncodeResult {
// We ensure this during `FileEncoder` construction.
debug_assert!(self.capacity() >= 1);
let mut buffered = self.buffered;
if std::intrinsics::unlikely(buffered >= self.capacity()) {
self.flush()?;
buffered = 0;
}
// SAFETY: The above check and `flush` ensures that there is enough
// room to write the input to the buffer.
unsafe {
*MaybeUninit::slice_as_mut_ptr(&mut self.buf).add(buffered) = value;
}
self.buffered = buffered + 1;
Ok(())
}
#[inline]
fn write_all(&mut self, buf: &[u8]) -> FileEncodeResult {
let capacity = self.capacity();
let buf_len = buf.len();
if std::intrinsics::likely(buf_len <= capacity) {
let mut buffered = self.buffered;
if std::intrinsics::unlikely(buf_len > capacity - buffered) {
self.flush()?;
buffered = 0;
}
// SAFETY: The above check and `flush` ensures that there is enough
// room to write the input to the buffer.
unsafe {
let src = buf.as_ptr();
let dst = MaybeUninit::slice_as_mut_ptr(&mut self.buf).add(buffered);
ptr::copy_nonoverlapping(src, dst, buf_len);
}
self.buffered = buffered + buf_len;
Ok(())
} else {
self.write_all_unbuffered(buf)
}
}
fn write_all_unbuffered(&mut self, mut buf: &[u8]) -> FileEncodeResult {
if self.buffered > 0 {
self.flush()?;
}
// This is basically a copy of `Write::write_all` but also updates our
// `self.flushed`. It's necessary because `Write::write_all` does not
// return the number of bytes written when an error is encountered, and
// without that, we cannot accurately update `self.flushed` on error.
while !buf.is_empty() {
match self.file.write(buf) {
Ok(0) => {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
}
Ok(n) => {
buf = &buf[n..];
self.flushed += n;
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
}
impl Drop for FileEncoder {
fn drop(&mut self) {
let _result = self.flush();
}
}
macro_rules! file_encoder_write_leb128 {
($enc:expr, $value:expr, $int_ty:ty, $fun:ident) => {{
const MAX_ENCODED_LEN: usize = max_leb128_len!($int_ty);
// We ensure this during `FileEncoder` construction.
debug_assert!($enc.capacity() >= MAX_ENCODED_LEN);
let mut buffered = $enc.buffered;
// This can't overflow. See assertion in `FileEncoder::with_capacity`.
if std::intrinsics::unlikely(buffered + MAX_ENCODED_LEN > $enc.capacity()) {
$enc.flush()?;
buffered = 0;
}
// SAFETY: The above check and flush ensures that there is enough
// room to write the encoded value to the buffer.
let buf = unsafe {
&mut *($enc.buf.as_mut_ptr().add(buffered) as *mut [MaybeUninit<u8>; MAX_ENCODED_LEN])
};
let encoded = leb128::$fun(buf, $value);
$enc.buffered = buffered + encoded.len();
Ok(())
}};
}
impl serialize::Encoder for FileEncoder {
type Error = io::Error;
#[inline]
fn emit_unit(&mut self) -> FileEncodeResult {
Ok(())
}
#[inline]
fn emit_usize(&mut self, v: usize) -> FileEncodeResult {
file_encoder_write_leb128!(self, v, usize, write_usize_leb128)
}
#[inline]
fn emit_u128(&mut self, v: u128) -> FileEncodeResult {
file_encoder_write_leb128!(self, v, u128, write_u128_leb128)
}
#[inline]
fn emit_u64(&mut self, v: u64) -> FileEncodeResult {
file_encoder_write_leb128!(self, v, u64, write_u64_leb128)
}
#[inline]
fn emit_u32(&mut self, v: u32) -> FileEncodeResult {
file_encoder_write_leb128!(self, v, u32, write_u32_leb128)
}
#[inline]
fn emit_u16(&mut self, v: u16) -> FileEncodeResult {
file_encoder_write_leb128!(self, v, u16, write_u16_leb128)
}
#[inline]
fn emit_u8(&mut self, v: u8) -> FileEncodeResult {
self.write_one(v)
}
#[inline]
fn emit_isize(&mut self, v: isize) -> FileEncodeResult {
file_encoder_write_leb128!(self, v, isize, write_isize_leb128)
}
#[inline]
fn emit_i128(&mut self, v: i128) -> FileEncodeResult {
file_encoder_write_leb128!(self, v, i128, write_i128_leb128)
}
#[inline]
fn emit_i64(&mut self, v: i64) -> FileEncodeResult {
file_encoder_write_leb128!(self, v, i64, write_i64_leb128)
}
#[inline]
fn emit_i32(&mut self, v: i32) -> FileEncodeResult {
file_encoder_write_leb128!(self, v, i32, write_i32_leb128)
}
#[inline]
fn emit_i16(&mut self, v: i16) -> FileEncodeResult {
file_encoder_write_leb128!(self, v, i16, write_i16_leb128)
}
#[inline]
fn emit_i8(&mut self, v: i8) -> FileEncodeResult {
let as_u8: u8 = unsafe { std::mem::transmute(v) };
self.emit_u8(as_u8)
}
#[inline]
fn emit_bool(&mut self, v: bool) -> FileEncodeResult {
self.emit_u8(if v { 1 } else { 0 })
}
#[inline]
fn emit_f64(&mut self, v: f64) -> FileEncodeResult {
let as_u64: u64 = v.to_bits();
self.emit_u64(as_u64)
}
#[inline]
fn emit_f32(&mut self, v: f32) -> FileEncodeResult {
let as_u32: u32 = v.to_bits();
self.emit_u32(as_u32)
}
#[inline]
fn emit_char(&mut self, v: char) -> FileEncodeResult {
self.emit_u32(v as u32)
}
#[inline]
fn emit_str(&mut self, v: &str) -> FileEncodeResult {
self.emit_usize(v.len())?;
self.emit_raw_bytes(v.as_bytes())
}
}
@ -342,6 +702,13 @@ impl serialize::Encodable<Encoder> for [u8] {
}
}
impl serialize::Encodable<FileEncoder> for [u8] {
fn encode(&self, e: &mut FileEncoder) -> FileEncodeResult {
serialize::Encoder::emit_usize(e, self.len())?;
e.emit_raw_bytes(self)
}
}
// Specialize decoding `Vec<u8>`. This specialization also applies to decoding `Box<[u8]>`s, etc.,
// since the default implementations call `decode` to produce a `Vec<u8>` internally.
impl<'a> serialize::Decodable<Decoder<'a>> for Vec<u8> {