#![allow(clippy::inline_always)]
use std::{
path::Path,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
};
use arc_swap::ArcSwap;
use bincode::Options;
use event_listener::{Event, EventListener};
use tokio::{select, sync::Notify};
use crate::{
consts::{SmallBytes, INDEX_SIZE, MIN_LOG_SIZE},
error::Result,
formats::{Header, Log, UuidIndex},
raw::RawMap,
util::{bincode_option, BincodeOptions},
ErrorType, TopicBuilder,
};
#[derive(Debug)]
pub struct Shared {
pub conf: TopicBuilder,
pub event: Event,
pub stop: Notify,
map: ArcSwap<SharedMap>,
}
impl Shared {
pub fn new(conf: TopicBuilder, map: Arc<SharedMap>) -> Self {
Self {
conf,
event: Event::new(),
stop: Notify::new(),
map: ArcSwap::from(map),
}
}
pub fn swap_map(&self, map: Arc<SharedMap>) -> Arc<SharedMap> {
self.map.swap(map)
}
pub fn map(&self) -> Arc<SharedMap> {
self.map.load_full()
}
pub fn offset(&self) -> usize {
self.map.load().offset()
}
pub fn subscribe(&self) -> EventListener {
self.event.listen()
}
}
#[derive(Debug)]
pub struct SharedMap {
map: RawMap,
offset: AtomicUsize,
finished: AtomicBool,
}
impl SharedMap {
pub fn new(dir: &Path, name: &str, size: u64) -> Result<Self> {
let map = RawMap::new(&dir.join(name).with_extension("limlog"), size, Header::LOG)?;
let offset = AtomicUsize::new(0);
let finished = AtomicBool::new(false);
Ok(Self {
map,
offset,
finished,
})
}
#[inline(always)]
pub fn offset(&self) -> usize {
self.offset.load(Ordering::Acquire)
}
#[inline(always)]
pub fn offset_relaxed(&self) -> usize {
self.offset.load(Ordering::Relaxed)
}
#[inline]
#[allow(clippy::mut_from_ref)]
pub unsafe fn mut_slice(&self) -> &mut [u8] {
let at = self.offset();
debug_assert!(at <= self.map.len());
let len = self.map.len() - at;
std::slice::from_raw_parts_mut(self.map.as_mut_ptr().add(at), len)
}
#[inline]
pub fn slice(&self, from: usize) -> &[u8] {
let at = self.offset();
let from = from.min(at);
unsafe { self.map.range(from, at - from) }
}
pub fn commit(&self, len: usize) -> Result<()> {
self.map.flush_range(self.offset(), len)?;
self.offset.fetch_add(len, Ordering::AcqRel);
Ok(())
}
#[inline]
pub fn remaining(&self) -> usize {
self.map.len() - self.offset_relaxed()
}
#[inline]
pub fn finish(&self) -> Result<()> {
self.finished.store(true, Ordering::Release);
self.map.flush_sync()?;
Ok(())
}
#[inline]
pub fn is_finished(&self) -> bool {
self.finished.load(Ordering::Acquire)
}
}
impl Drop for SharedMap {
fn drop(&mut self) {
unsafe { self.map.close(self.offset() as _) }.unwrap();
}
}
#[derive(Debug)]
pub struct UniqueMap {
map: RawMap,
pos: usize,
}
impl UniqueMap {
pub fn new(dir: &Path, name: &str, size: u64) -> Result<Self> {
let map = RawMap::new(&dir.join(name).with_extension("idx"), size, Header::INDEX)?;
Ok(Self { map, pos: 0 })
}
pub fn is_full(&self) -> bool {
self.pos + INDEX_SIZE > self.map.len()
}
#[allow(clippy::missing_panics_doc)]
pub fn push(&mut self, index: UuidIndex) -> Result<()> {
debug_assert!(!self.is_full());
let slice = unsafe { self.map.range_mut(self.pos, INDEX_SIZE) };
index.write_to(slice.try_into().unwrap());
self.map.flush_range(self.pos, INDEX_SIZE)?;
self.pos += INDEX_SIZE;
Ok(())
}
}
impl Drop for UniqueMap {
fn drop(&mut self) {
unsafe { self.map.close(self.pos as _) }.unwrap();
}
}
#[derive(Debug)]
pub struct Appender {
pub log: Arc<SharedMap>,
pub idx: UniqueMap,
pub recv: kanal::AsyncReceiver<Log>,
}
impl Appender {
pub async fn run(&mut self, mut rem: Option<Log>, shared: &Shared) -> Result<Option<Log>> {
let opt: BincodeOptions = bincode_option();
if let Some(log) = rem.take() {
if let Some(rem) = self.write_one(opt, log, &shared.event)? {
return Ok(Some(rem));
}
}
loop {
let log = select!(
received = self.recv.recv() => received?,
_ = shared.stop.notified() => return Err(ErrorType::Shutdown)
);
if let Some(rem) = self.write_one(opt, log, &shared.event)? {
return Ok(Some(rem));
}
if self.log.remaining() < MIN_LOG_SIZE || self.idx.is_full() {
return Ok(None);
}
}
}
fn write_one(&mut self, opt: BincodeOptions, log: Log, event: &Event) -> Result<Option<Log>> {
let len = log.byte_len();
if self.log.remaining() < len || self.idx.is_full() {
return Ok(Some(log));
}
let offset = self.log.offset() as _;
{
let buf = unsafe { self.log.mut_slice() };
opt.serialize_into(&mut buf[..len], &log)?;
}
self.log.commit(len)?;
self.idx.push(UuidIndex {
uuid: log.uuid,
offset,
})?;
event.notify_additional(usize::MAX);
Ok(None)
}
}
#[test]
fn test_map() {
use bincode::Options;
use uuid7::Uuid;
use crate::Log;
let dir = tempfile::tempdir().unwrap();
let map = SharedMap::new(dir.path(), "123", 100).unwrap();
let (r, w) = unsafe { (map.slice(10), map.mut_slice()) };
assert_eq!(r.len(), 0);
assert_eq!(&[0; 100], &w[..100]);
let l = Log {
uuid: Uuid::MAX,
body: SmallBytes::from_iter([114u8, 191]),
};
bincode_option().serialize_into(&mut w[..], &l).unwrap();
let counter = [
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 2, 0, 0, 0,
0, 0, 0, 0, 114, 191,
];
assert_eq!(&counter[..], &w[..counter.len()]);
}