media-converter: Add media-converter gstreamer plugin

This commit is contained in:
Andrew Eikum 2020-07-28 08:20:33 -05:00
parent 2605bdf477
commit f21922d970
19 changed files with 3395 additions and 6 deletions

View file

@ -0,0 +1,882 @@
/*
* Copyright (c) 2020 Valve Corporation
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation and/or
* other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
use glib;
use glib::subclass;
use glib::subclass::prelude::*;
use crate::format_hash;
use crate::HASH_SEED;
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::EventView;
use gst::QueryView;
use gst_audio;
use std::sync::Mutex;
use std::io;
use std::io::Read;
use std::fs;
use std::fs::OpenOptions;
#[cfg(target_arch = "x86")]
use crate::murmur3_x86_128::murmur3_x86_128_full as murmur3_128_full;
#[cfg(target_arch = "x86")]
use crate::murmur3_x86_128::murmur3_x86_128_state as murmur3_128_state;
#[cfg(target_arch = "x86_64")]
use crate::murmur3_x64_128::murmur3_x64_128_full as murmur3_128_full;
#[cfg(target_arch = "x86_64")]
use crate::murmur3_x64_128::murmur3_x64_128_state as murmur3_128_state;
use crate::fossilize;
use crate::copy_into_array;
use crate::BufferedReader;
use once_cell::sync::Lazy;
/* Algorithm
* ---------
*
* The application feeds encoded audio into XAudio2 in chunks. Since we don't have access to all
* chunks in a stream on initialization (as we do with the video converter), we continuously hash
* the stream as it is sent to us. Each "chunk" is identified as the hash of the entire stream up
* to that chunk.
*
* Since chunks are small (~2 kilobytes), this leads to a significant possibility of two different
* streams having identical intro chunks (imagine two streams that start with several seconds of
* silence). This means we need a tree of chunks. Suppose two incoming streams with chunks that
* hash as shown (i.e. identical intro chunks that diverge later):
*
* Stream 1: [AA BB CC DD]
*
* Stream 2: [AA BB YY ZZ]
*
* We record a tree and the transcoder will walk it depth-first in order to reconstruct each unique
* stream:
*
* AA => aa.ptna
* AA+BB => bb.ptna
* AA+BB+CC => cc.ptna
* AA+BB+CC+DD => dd.ptna
* AA+BB+YY => yy.ptna
* AA+BB+YY+ZZ => zz.ptna
*
* Upon playback, we chain each transcoded stream chunk together as the packets come in:
*
* AA -> start stream with aa.ptna
* BB -> play bb.ptna
* CC -> play cc.ptna
* DD -> play dd.ptna
*
* or:
*
* AA -> start stream with aa.ptna
* BB -> play bb.ptna
* YY -> play yy.ptna
* ZZ -> play zz.ptna
*
* or:
*
* AA -> start stream with aa.ptna
* NN -> not recognized, instead play blank.ptna and mark this stream as needs-transcoding
* OO -> play blank.ptna
* PP -> play blank.ptna
* When the Stream is destroyed, we'll record AA+NN+OO+PP into the needs-transcode database
* for the transcoder to convert later.
*
*
* Physical Format
* ---------------
*
* All stored values are little-endian.
*
* Transcoded audio is stored in the "transcoded" Fossilize database under the
* AUDIOCONV_FOZ_TAG_PTNADATA tag. Each chunk is stored in one entry with as many of the following
* "Proton Audio" (ptna) packets as are required to store the entire transcoded chunk:
*
* uint32_t packet_header: Information about the upcoming packet, see bitmask:
* MSB [FFFF PPPP PPPP PPPP PPPP LLLL LLLL LLLL] LSB
* L: Number of _bytes_ in this packet following this header.
* P: Number of _samples_ at the end of this packet which are padding and should be skipped.
* F: Flag bits:
* 0x1: This packet is an Opus header
* 0x2, 0x4, 0x8: Reserved for future use.
*
* If the Opus header flag is set:
* Following packet is an Opus identification header, as defined in RFC 7845 "Ogg
* Encapsulation for the Opus Audio Codec" Section 5.1.
* <https://tools.ietf.org/html/rfc7845#section-5.1>
*
* If the header flag is not set:
* Following packet is raw Opus data to be sent to an Opus decoder.
*
*
* If we encounter a stream which needs transcoding, we record the buffers and metadata in
* a Fossilize database. The database has three tag types:
*
* AUDIOCONV_FOZ_TAG_STREAM: This identifies each unique stream of buffers. For example:
* [hash(AA+BB+CC+DD)] -> [AA, BB, CC, DD]
* [hash(AA+BB+XX+YY)] -> [AA, BB, XX, YY]
*
* AUDIOCONV_FOZ_TAG_AUDIODATA: This contains the actual encoded audio data. For example:
* [AA] -> [AA's buffer data]
* [BB] -> [BB's buffer data]
*
* AUDIOCONV_FOZ_TAG_CODECINFO: This contains the codec data required to decode the buffer. Only
* the "head" of each stream is recorded. For example:
* [AA] -> [
* uint32_t wmaversion (from WAVEFORMATEX.wFormatTag)
* uint32_t bitrate (from WAVEFORMATEX.nAvgBytesPerSec)
* uint32_t channels (WAVEFORMATEX.nChannels)
* uint32_t rate (WAVEFORMATEX.nSamplesPerSec)
* uint32_t block_align (WAVEFORMATEX.nBlockAlign)
* uint32_t depth (WAVEFORMATEX.wBitsPerSample)
* char[remainder of entry] codec_data (codec data which follows WAVEFORMATEX)
* ]
*
*/
/* Bit layout of the 32-bit little-endian ptna packet header (see "Physical Format" above):
 * MSB [FFFF PPPP PPPP PPPP PPPP LLLL LLLL LLLL] LSB */
/* L: number of payload bytes following the header */
const AUDIOCONV_ENCODED_LENGTH_MASK: u32 = 0x00000fff; /* 12 bits: payload lengths up to 4095 bytes fit in here */
/* P: number of trailing padding samples to be clipped from the decoded packet */
const AUDIOCONV_PADDING_LENGTH_MASK: u32 = 0x0ffff000; /* 120ms of samples at 48kHz fits in here */
/* right-shift that brings the masked padding field down to bit 0 */
const AUDIOCONV_PADDING_LENGTH_SHIFT: u32 = 12;
/* F: the four flag bits */
const AUDIOCONV_FLAG_MASK: u32 = 0xf0000000;
const AUDIOCONV_FLAG_HEADER: u32 = 0x10000000; /* this chunk is the Opus header */
const _AUDIOCONV_FLAG_RESERVED1: u32 = 0x20000000; /* not yet used */
const _AUDIOCONV_FLAG_RESERVED2: u32 = 0x40000000; /* not yet used */
const _AUDIOCONV_FLAG_V2: u32 = 0x80000000; /* indicates a "version 2" header, process somehow differently (TBD) */
/* properties of the "blank" audio file; used to work out how often the blank file must be
 * repeated to cover a buffer for which no transcoded data exists */
const BLANK_AUDIO_FILE_LENGTH_MS: f32 = 10.0;
const BLANK_AUDIO_FILE_RATE: f32 = 48000.0;
/* Debug category under which the audio converter element logs; created on first use. */
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    let name = "protonaudioconverter";
    let description = Some("Proton audio converter");
    gst::DebugCategory::new(name, gst::DebugColorFlags::empty(), description)
});
/* Process-wide "needs transcoding" dump database, opened lazily on first access.
 *
 * The path comes from the MEDIACONV_AUDIO_DUMP_FILE environment variable. The value is None
 * when the variable is unset, when the containing directory can't be created, or when the
 * archive can't be opened/created. */
static DUMP_FOZDB: Lazy<Mutex<Option<fossilize::StreamArchive>>> = Lazy::new(|| {
    let open_db = || -> Option<fossilize::StreamArchive> {
        let dump_file_path = std::env::var("MEDIACONV_AUDIO_DUMP_FILE").ok()?;
        let dump_file_path = std::path::Path::new(&dump_file_path);

        /* parent() is None for an empty or root path. Don't panic inside the lazy initializer
         * in that case; just skip directory creation and let opening the archive decide. */
        if let Some(dir) = dump_file_path.parent() {
            fs::create_dir_all(dir).ok()?;
        }

        fossilize::StreamArchive::new(
            &dump_file_path,
            OpenOptions::new().write(true).read(true).create(true),
            AUDIOCONV_FOZ_NUM_TAGS,
        )
        .ok()
    };

    Mutex::new(open_db())
});
#[derive(Clone)]
struct NeedTranscodeHead {
wmaversion: i32,
bitrate: i32,
channels: i32,
rate: i32,
block_align: i32,
depth: i32,
codec_data: Vec<u8>
}
impl NeedTranscodeHead {
    /* Extracts the codec parameters from the first structure of `caps`.
     *
     * Returns a loggable error if the caps have no structure, if any of the integer fields
     * (wmaversion, bitrate, channels, rate, block_align, depth) is missing or not an i32, or if
     * the codec_data buffer is missing, NULL or can't be mapped for reading. */
    fn new_from_caps(caps: &gst::CapsRef) -> Result<Self, gst::LoggableError> {
        let s = caps.get_structure(0).ok_or_else(|| gst_loggable_error!(CAT, "Caps have no WMA data!"))?;

        let wmaversion = s.get_some::<i32>("wmaversion").map_err(|_| gst_loggable_error!(CAT, "Caps have no wmaversion field"))?;
        let bitrate = s.get_some::<i32>("bitrate").map_err(|_| gst_loggable_error!(CAT, "Caps have no bitrate field"))?;
        let channels = s.get_some::<i32>("channels").map_err(|_| gst_loggable_error!(CAT, "Caps have no channels field"))?;
        let rate = s.get_some::<i32>("rate").map_err(|_| gst_loggable_error!(CAT, "Caps have no rate field"))?;
        let block_align = s.get_some::<i32>("block_align").map_err(|_| gst_loggable_error!(CAT, "Caps have no block_align field"))?;
        let depth = s.get_some::<i32>("depth").map_err(|_| gst_loggable_error!(CAT, "Caps have no depth field"))?;

        let codec_data_buf = s.get::<gst::Buffer>("codec_data")
            .map_err(|_| gst_loggable_error!(CAT, "Caps have no codec_data field"))?
            .ok_or_else(|| gst_loggable_error!(CAT, "Caps have NULL codec_data field"))?;

        /* mapping can fail; report that like any other bad caps instead of panicking */
        let mapped = codec_data_buf.into_mapped_buffer_readable()
            .map_err(|_| gst_loggable_error!(CAT, "Unable to map codec_data buffer"))?;
        let codec_data = mapped.as_slice().to_vec();

        Ok(NeedTranscodeHead {
            wmaversion,
            bitrate,
            channels,
            rate,
            block_align,
            depth,
            codec_data,
        })
    }

    /* Serializes the head into the CODECINFO entry layout: the six integer fields as
     * little-endian 32-bit values, in declaration order, followed by the raw codec data. */
    fn serialize(&self) -> Vec<u8> {
        let fields = [
            self.wmaversion,
            self.bitrate,
            self.channels,
            self.rate,
            self.block_align,
            self.depth,
        ];

        let mut ret = Vec::with_capacity(
            fields.len() * std::mem::size_of::<i32>() + self.codec_data.len());
        for field in fields.iter() {
            ret.extend_from_slice(&field.to_le_bytes());
        }
        ret.extend_from_slice(&self.codec_data);
        ret
    }
}
const AUDIOCONV_FOZ_TAG_STREAM: u32 = 0;
const AUDIOCONV_FOZ_TAG_CODECINFO: u32 = 1;
const AUDIOCONV_FOZ_TAG_AUDIODATA: u32 = 2;
const AUDIOCONV_FOZ_TAG_PTNADATA: u32 = 3;
const AUDIOCONV_FOZ_NUM_TAGS: usize = 4;
/* represents a Stream, a sequence of buffers */
struct StreamState {
/* running murmur3 state; the hash of every recorded buffer is folded into it */
hash_state: murmur3_128_state,
/* result of the latest fold; used as the stream's id in the dump database */
cur_hash: u128,
/* (buffer hash, mapped buffer data) for every buffer recorded so far, in order */
buffers: Vec<(u128, gst::MappedBuffer<gst::buffer::Readable>)>,
/* buffers which so far look like a replay of `buffers` from its start (a loop); they are
 * dropped if the loop completes and appended to `buffers` if it diverges */
loop_buffers: Vec<(u128, gst::MappedBuffer<gst::buffer::Readable>)>,
/* codec info of the stream, taken from the first record_buffer() call that supplies one */
codec_info: Option<NeedTranscodeHead>,
/* set when the stream should be written to the dump database by write_to_foz() */
needs_dump: bool,
}
/* Outcome of StreamState::record_buffer() */
enum LoopState {
/* the buffer did not match the recorded stream at the loop position; it was appended to the
 * stream */
NoLoop,
/* the buffer matched the recorded buffer at the current loop position, but the loop does not
 * yet cover all recorded buffers */
Looping,
/* the buffer completed a loop covering all recorded buffers; the loop buffers were dropped */
LoopEnded,
}
impl StreamState {
    /* Creates an empty stream: fresh hash state, no buffers, no codec info, nothing to dump. */
    fn new() -> Self {
        Self {
            hash_state: murmur3_128_state::new(HASH_SEED),
            cur_hash: 0,
            buffers: Vec::new(),
            loop_buffers: Vec::new(),
            codec_info: None,
            needs_dump: false,
        }
    }

    /* Records one incoming buffer.
     *
     * buf_hash: hash identifying the buffer within the whole stream
     * loop_hash: hash of the buffer as computed by the caller's loop hash state
     * buffer: the mapped buffer data, kept so it can be dumped later
     * codec_info: codec parameters; only the first one seen is stored
     *
     * If `loop_hash` equals the hash recorded at the current loop position, the buffer is
     * treated as part of a replay of the stream (Looping / LoopEnded). Otherwise any pending
     * loop buffers and this buffer are appended to the stream and the stream hash is updated
     * (NoLoop). Fails only if updating the stream hash fails. */
    fn record_buffer(&mut self, buf_hash: u128, loop_hash: u128, buffer: gst::MappedBuffer<gst::buffer::Readable>, codec_info: Option<&NeedTranscodeHead>) -> io::Result<LoopState> {
        if self.codec_info.is_none() {
            self.codec_info = codec_info.cloned();
        }

        let continues_loop = self.buffers
            .get(self.loop_buffers.len())
            .map_or(false, |recorded| recorded.0 == loop_hash);

        if continues_loop {
            self.loop_buffers.push((buf_hash /* not loop_hash! */, buffer));

            if self.loop_buffers.len() == self.buffers.len() {
                /* full loop, just drop them */
                self.loop_buffers.clear();
                return Ok(LoopState::LoopEnded);
            }

            return Ok(LoopState::Looping);
        }

        if !self.loop_buffers.is_empty() {
            /* partial loop, track them and then continue */
            self.buffers.append(&mut self.loop_buffers);
        }

        self.buffers.push((buf_hash, buffer));

        self.cur_hash = murmur3_128_full(&mut (&buf_hash.to_le_bytes() as &[u8]), &mut self.hash_state)?;

        Ok(LoopState::NoLoop)
    }

    /* Writes the stream to the dump database, if it was marked as needing a dump and has at
     * least one buffer.
     *
     * Nothing is written when a stream with the same id already exists, or when the buffer
     * hashes of this stream are a prefix of an already recorded stream. Returns an error if
     * the dump database is not open, if the stream has no codec info, or if a write fails. */
    fn write_to_foz(&self) -> Result<(), gst::LoggableError> {
        if !self.needs_dump || self.buffers.is_empty() {
            return Ok(());
        }

        let mut db = (*DUMP_FOZDB).lock().unwrap();
        let db = match &mut *db {
            Some(d) => d,
            None => { return Err(gst_loggable_error!(CAT, "Failed to open fossilize db!")) },
        };

        let mut found = db.has_entry(AUDIOCONV_FOZ_TAG_STREAM, self.cur_hash);

        if !found {
            /* are there any recorded streams of which this stream is a subset? */
            let stream_ids = db.iter_tag(AUDIOCONV_FOZ_TAG_STREAM).cloned().collect::<Vec<u128>>();

            found = stream_ids.iter().any(|stream_id| {
                /* walk the recorded stream's list of 16-byte buffer ids in lockstep with ours */
                let mut offs = 0;

                for cur_buf_id in self.buffers.iter() {
                    let mut buf = [0u8; 16];

                    let res = db.read_entry(AUDIOCONV_FOZ_TAG_STREAM, *stream_id, offs, &mut buf, fossilize::CRCCheck::WithCRC);

                    let buffer_id = match res {
                        Ok(s) if s == std::mem::size_of::<u128>() => u128::from_le_bytes(buf),
                        /* read error, or the recorded stream is shorter than ours */
                        _ => { return false; },
                    };

                    if buffer_id != cur_buf_id.0 {
                        return false;
                    }

                    offs += 16;
                }

                gst_trace!(CAT, "stream id {} is a subset of {}, so not recording stream", self.cur_hash, *stream_id);

                true
            });
        }

        if !found {
            /* don't panic while holding the database lock if nobody ever gave us codec info */
            let codec_info = self.codec_info.as_ref()
                .ok_or_else(|| gst_loggable_error!(CAT, "Stream has no codec info, unable to record it"))?;

            gst_trace!(CAT, "recording stream id {}", self.cur_hash);

            db.write_entry(AUDIOCONV_FOZ_TAG_CODECINFO,
                self.buffers[0].0,
                &mut codec_info.serialize().as_slice(),
                fossilize::CRCCheck::WithCRC)
                .map_err(|e| gst_loggable_error!(CAT, "Unable to write stream header: {:?}", e))?;

            db.write_entry(AUDIOCONV_FOZ_TAG_STREAM,
                self.cur_hash,
                &mut StreamStateSerializer::new(self),
                fossilize::CRCCheck::WithCRC)
                .map_err(|e| gst_loggable_error!(CAT, "Unable to write stream: {:?}", e))?;

            for buffer in self.buffers.iter() {
                db.write_entry(AUDIOCONV_FOZ_TAG_AUDIODATA,
                    buffer.0,
                    &mut buffer.1.as_slice(),
                    fossilize::CRCCheck::WithCRC)
                    .map_err(|e| gst_loggable_error!(CAT, "Unable to write audio data: {:?}", e))?;
            }
        }

        Ok(())
    }

    /* Returns the stream to its freshly created state, dropping all recorded buffers. */
    fn reset(&mut self) {
        self.hash_state.reset();
        self.buffers.clear();
        self.loop_buffers.clear();
        self.cur_hash = 0;
        self.codec_info = None;
        self.needs_dump = false;
    }
}
/* Read adapter which yields the contents of an AUDIOCONV_FOZ_TAG_STREAM entry for a stream: the
 * hash of every recorded buffer, in order, each as 16 little-endian bytes. */
struct StreamStateSerializer<'a> {
    stream_state: &'a StreamState,
    /* index of the buffer whose hash is currently being emitted */
    cur_idx: usize,
    /* how many bytes of that hash have already been handed out */
    cur_offs: usize,
}

impl<'a> StreamStateSerializer<'a> {
    /* Creates a serializer positioned at the first buffer of `stream_state`. */
    fn new(stream_state: &'a StreamState) -> Self {
        StreamStateSerializer {
            stream_state,
            cur_idx: 0,
            cur_offs: 0,
        }
    }
}

impl<'a> Read for StreamStateSerializer<'a> {
    /* Copies out (the rest of) the current buffer hash, at most one hash per call.
     *
     * Returns Ok(0) once every hash has been emitted. When `out` is too small for a whole
     * hash, only as many bytes as fit are copied and the next call continues where this one
     * stopped, instead of panicking on the slice bounds. */
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        let entry = match self.stream_state.buffers.get(self.cur_idx) {
            Some(e) => e,
            None => { return Ok(0); },
        };

        let hash_bytes = entry.0.to_le_bytes();
        let pending = &hash_bytes[self.cur_offs..];
        let count = pending.len().min(out.len());

        out[..count].copy_from_slice(&pending[..count]);

        self.cur_offs += count;
        if self.cur_offs == hash_bytes.len() {
            /* finished this hash, move on to the next buffer */
            self.cur_idx += 1;
            self.cur_offs = 0;
        }

        Ok(count)
    }
}
/* Feeds `dat`/`len`, wrapped in a BufferedReader, through murmur3_128_full, updating
 * `hash_state`. Returns whatever murmur3_128_full returns for the updated state. */
fn hash_data(dat: &[u8], len: usize, hash_state: &mut murmur3_128_state) -> io::Result<u128> {
    let mut reader = BufferedReader::new(dat, len);
    murmur3_128_full(&mut reader, hash_state)
}
/* Runtime state of the converter element; exists while the element is out of the Null state */
struct AudioConvState {
/* whether an Opus header packet was already pushed downstream; further headers are skipped */
sent_header: bool,
/* codec parameters from the most recent caps event */
codec_data: Option<NeedTranscodeHead>,
/* hash state over the data of all buffers received since the last reset */
hash_state: murmur3_128_state,
/* second hash state over the same data, which is additionally reset whenever
 * record_buffer() reports NoLoop or LoopEnded */
loop_hash_state: murmur3_128_state,
/* record of the buffers seen so far, for dumping streams which need transcoding */
stream_state: StreamState,
/* database of already transcoded audio, opened read-only; None if it could not be opened */
read_fozdb: Option<fossilize::StreamArchive>,
}
impl AudioConvState {
    /* Creates the runtime state and opens the transcoded database read-only.
     *
     * Fails if MEDIACONV_AUDIO_TRANSCODED_FILE is not set. A database which can't be opened is
     * not an error; in that case no transcoded data will be found for any buffer. */
    fn new() -> Result<AudioConvState, gst::LoggableError> {
        let read_fozdb_path = std::env::var("MEDIACONV_AUDIO_TRANSCODED_FILE").map_err(|_| {
            gst_loggable_error!(CAT, "MEDIACONV_AUDIO_TRANSCODED_FILE is not set!")
        })?;

        let read_fozdb = fossilize::StreamArchive::new(&read_fozdb_path, OpenOptions::new().read(true), AUDIOCONV_FOZ_NUM_TAGS).ok();

        Ok(AudioConvState {
            sent_header: false,
            codec_data: None,
            hash_state: murmur3_128_state::new(HASH_SEED),
            loop_hash_state: murmur3_128_state::new(HASH_SEED),
            stream_state: StreamState::new(),
            read_fozdb,
        })
    }

    /* Ends the current stream: dumps it if needed (logging any error) and starts over with
     * fresh hash states. The codec data and the sent-header flag are left untouched. */
    fn reset(&mut self) {
        if let Err(e) = self.stream_state.write_to_foz() {
            e.log();
        }

        self.stream_state.reset();
        self.hash_state.reset();
        self.loop_hash_state.reset();
    }

    /* Reads the whole transcoded entry stored under `hash`, or None if there is no such entry
     * or reading it fails. */
    fn read_transcoded(db: &mut fossilize::StreamArchive, hash: u128) -> Option<Box<[u8]>> {
        let transcoded_size = db.entry_size(AUDIOCONV_FOZ_TAG_PTNADATA, hash).ok()?;
        let mut buf = vec![0u8; transcoded_size].into_boxed_slice();
        db.read_entry(AUDIOCONV_FOZ_TAG_PTNADATA, hash, 0, &mut buf, fossilize::CRCCheck::WithoutCRC).ok()?;
        Some(buf)
    }

    /* Hashes and records `buffer`, then looks up the transcoded audio for it.
     *
     * Returns the ptna data to play and how many additional times it should be repeated. For
     * transcoded data the repeat count is 0. If nothing is found, the stream is marked for
     * dumping and the built-in blank file is returned with a repeat count estimated from the
     * buffer size, the bitrate and the sample rate.
     *
     * Fails if no caps (codec data) were received yet, if the buffer can't be mapped, or if
     * hashing or recording the buffer fails. */
    fn open_transcode_file(&mut self, buffer: gst::Buffer) -> io::Result<(Box<[u8]>, f32)> {
        /* a buffer before any caps event is an upstream error, not a reason to panic */
        let codec_data = match self.codec_data.as_ref() {
            Some(c) => c,
            None => {
                gst_warning!(CAT, "Got a buffer before any caps, no codec data available!");
                return Err(io::ErrorKind::InvalidInput.into());
            },
        };

        let mapped = buffer.into_mapped_buffer_readable()
            .map_err(|_| { gst_warning!(CAT, "Unable to map buffer for reading!"); io::ErrorKind::Other })?;
        let buf_len = mapped.get_size();

        let hash = hash_data(mapped.as_slice(), buf_len, &mut self.hash_state)
            .map_err(|e|{ gst_warning!(CAT, "Hashing buffer failed! {}", e); io::ErrorKind::Other })?;
        let loop_hash = hash_data(mapped.as_slice(), buf_len, &mut self.loop_hash_state)
            .map_err(|e|{ gst_warning!(CAT, "Hashing buffer failed! {}", e); io::ErrorKind::Other })?;

        let try_loop = match self.stream_state.record_buffer(hash, loop_hash, mapped, Some(codec_data))? {
            LoopState::NoLoop => { self.loop_hash_state.reset(); false },
            LoopState::Looping => { true },
            LoopState::LoopEnded => { self.loop_hash_state.reset(); true },
        };

        if try_loop {
            gst_log!(CAT, "Buffer hash: {} (loop: {})", format_hash(hash), format_hash(loop_hash));
        }else{
            gst_log!(CAT, "Buffer hash: {}", format_hash(hash));
        }

        /* try to read transcoded data */
        if let Some(read_fozdb) = &mut self.read_fozdb {
            if let Some(buf) = Self::read_transcoded(read_fozdb, hash) {
                return Ok((buf, 0.0));
            }

            if try_loop {
                if let Some(buf) = Self::read_transcoded(read_fozdb, loop_hash) {
                    return Ok((buf, 0.0));
                }
            }
        }

        /* if we can't, return the blank file */
        self.stream_state.needs_dump = true;

        let buf: Box<[u8]> = Box::new(*include_bytes!("../blank.ptna"));

        /* calculate average expected length of this buffer */
        let mut repeat_count = if codec_data.bitrate > 0 {
            /* computed in floating point so large buffers can't overflow usize on 32-bit */
            let buffer_time_ms = (buf_len as f32) * 8.0 /* to bits */ * 1000.0 /* to ms */
                / (codec_data.bitrate as f32);

            /* repeat the known-length blank file enough times to fill the expected length */
            buffer_time_ms / BLANK_AUDIO_FILE_LENGTH_MS
        } else {
            /* invalid bitrate, so just play the file once */
            0.0
        };

        /* scale to output rate */
        repeat_count *= codec_data.rate as f32 / BLANK_AUDIO_FILE_RATE;

        Ok((buf, repeat_count))
    }
}
/* The "protonaudioconverter" element: takes audio/x-wma buffers on its sink pad and pushes
 * audio/x-opus packets on its src pad */
struct AudioConv {
/* runtime state; set on the NullToReady transition and taken again on ReadyToNull */
state: Mutex<Option<AudioConvState>>,
/* pad receiving the encoded buffers and caps/flush events */
sinkpad: gst::Pad,
/* pad on which the Opus packets are pushed */
srcpad: gst::Pad,
}
impl ObjectSubclass for AudioConv {
    const NAME: &'static str = "ProtonAudioConverter";
    type ParentType = gst::Element;
    type Instance = gst::subclass::ElementInstanceStruct<Self>;
    type Class = subclass::simple::ClassStruct<Self>;

    glib_object_subclass!();

    /* Builds the instance data: both pads are created from the class' pad templates and wired
     * to the handler methods of AudioConv. Every handler runs through
     * catch_panic_pad_function, which is given a fallback return value. */
    fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
        let sink_templ = klass.get_pad_template("sink").unwrap();
        let sinkpad = gst::Pad::builder_with_template(&sink_templ, Some("sink"))
            .chain_function(|pad, parent, buffer| {
                AudioConv::catch_panic_pad_function(
                    parent,
                    || Err(gst::FlowError::Error),
                    |this, element| this.chain(pad, element, buffer)
                )
            })
            .event_function(|pad, parent, event| {
                AudioConv::catch_panic_pad_function(
                    parent,
                    || false,
                    |this, element| this.sink_event(pad, element, event)
                )
            })
            .build();

        let src_templ = klass.get_pad_template("src").unwrap();
        let srcpad = gst::Pad::builder_with_template(&src_templ, Some("src"))
            .query_function(|pad, parent, query| {
                AudioConv::catch_panic_pad_function(
                    parent,
                    || false,
                    |this, element| this.src_query(pad, element, query)
                )
            })
            .activatemode_function(|pad, parent, mode, active| {
                AudioConv::catch_panic_pad_function(
                    parent,
                    || Err(gst_loggable_error!(CAT, "Panic activating srcpad with mode")),
                    |this, element| this.src_activatemode(pad, element, mode, active)
                )
            })
            .build();

        AudioConv {
            state: Mutex::new(None),
            sinkpad,
            srcpad,
        }
    }

    /* Registers the element metadata and the two always-present pad templates:
     * "sink" accepting audio/x-wma and "src" producing audio/x-opus. */
    fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
        klass.set_metadata(
            "Proton audio converter",
            "Codec/Parser",
            "Converts audio for Proton",
            "Andrew Eikum <aeikum@codeweavers.com>");

        let sink_caps = gst::Caps::builder("audio/x-wma").build();
        let sink_pad_template = gst::PadTemplate::new(
            "sink",
            gst::PadDirection::Sink,
            gst::PadPresence::Always,
            &sink_caps).unwrap();
        klass.add_pad_template(sink_pad_template);

        let src_caps = gst::Caps::builder("audio/x-opus").build();
        let src_pad_template = gst::PadTemplate::new(
            "src",
            gst::PadDirection::Src,
            gst::PadPresence::Always,
            &src_caps).unwrap();
        klass.add_pad_template(src_pad_template);
    }
}
impl ObjectImpl for AudioConv {
    glib_object_impl!();

    /* After the parent's construction, adds the sink pad and then the src pad to the element. */
    fn constructed(&self, obj: &glib::Object) {
        self.parent_constructed(obj);

        let element = obj.downcast_ref::<gst::Element>().unwrap();
        for pad in [&self.sinkpad, &self.srcpad].iter() {
            element.add_pad(*pad).unwrap();
        }
    }
}
impl ElementImpl for AudioConv {
    /* Sets up the runtime state on NullToReady and tears it down on ReadyToNull, then chains
     * up to the parent class for every transition.
     *
     * NullToReady fails if the dump database is not available or if the runtime state can't be
     * created. On ReadyToNull the current stream is written to the dump database, if needed. */
    fn change_state(
        &self,
        element: &gst::Element,
        transition: gst::StateChange
    ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
        gst_log!(CAT, obj: element, "State transition: {:?}", transition);

        if let gst::StateChange::NullToReady = transition {
            /* do runtime setup */

            /* open fozdb here; this is the right place to fail and opening may be
             * expensive */
            let dump_db_missing = (*DUMP_FOZDB).lock().unwrap().is_none();
            if dump_db_missing {
                gst_error!(CAT, "Failed to open fossilize db!");
                return Err(gst::StateChangeError);
            }

            let new_state = match AudioConvState::new() {
                Ok(created) => created,
                Err(err) => {
                    err.log();
                    return Err(gst::StateChangeError);
                },
            };

            let mut state = self.state.lock().unwrap();
            assert!((*state).is_none());
            *state = Some(new_state);
        } else if let gst::StateChange::ReadyToNull = transition {
            /* do runtime teardown */

            let old_state = self.state.lock().unwrap().take(); // dispose of state

            if let Some(old_state) = old_state {
                if old_state.stream_state.write_to_foz().is_err() {
                    gst_warning!(CAT, "Error writing out stream data!");
                }
            }
        }

        self.parent_change_state(element, transition)

        /* XXX on ReadyToNull, sodium drops state _again_ here... why? */
    }
}
impl AudioConv {
fn chain(
&self,
_pad: &gst::Pad,
_element: &gst::Element,
buffer: gst::Buffer
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(CAT, "Handling buffer {:?}", buffer);
let mut state = self.state.lock().unwrap();
let mut state = match &mut *state {
Some(s) => s,
None => { return Err(gst::FlowError::Error); },
};
let (ptnadata, mut repeat_count) = state.open_transcode_file(buffer).map_err(|_| {
gst_error!(CAT, "ERROR! Failed to read transcoded audio! Things will go badly..."); gst::FlowError::Error
})?;
let mut offs: usize = 0;
loop {
if offs >= ptnadata.len() {
if repeat_count > 0.0 {
/* TODO: we're sending the whole partial packet. we could set padding instead */
repeat_count -= 1.0;
offs = 0;
continue;
} else {
break;
}
}
if offs + 4 >= ptnadata.len() {
gst_warning!(CAT, "Short read on ptna header?");
break;
}
let packet_hdr = u32::from_le_bytes(copy_into_array(&ptnadata[offs..offs + 4]));
offs += 4;
let (flags, padding_len, encoded_len) =
((packet_hdr & AUDIOCONV_FLAG_MASK),
(packet_hdr & AUDIOCONV_PADDING_LENGTH_MASK) >> AUDIOCONV_PADDING_LENGTH_SHIFT,
(packet_hdr & AUDIOCONV_ENCODED_LENGTH_MASK) as usize);
if offs + encoded_len > ptnadata.len() {
gst_warning!(CAT, "Short read on ptna data?");
break;
}
let pkt_is_header = (flags & AUDIOCONV_FLAG_HEADER) != 0;
if pkt_is_header && state.sent_header {
/* only send one header */
offs += encoded_len;
continue;
}
/* TODO: can we use a gstbuffer cache here? */
let mut buffer = gst::Buffer::with_size(encoded_len as usize).unwrap();
if !pkt_is_header && padding_len > 0 {
gst_audio::AudioClippingMeta::add(buffer.get_mut().unwrap(), gst::format::Default(Some(0)), gst::format::Default(Some(padding_len as u64)));
}
let mut writable = buffer.into_mapped_buffer_writable().unwrap();
writable.as_mut_slice().copy_from_slice(&ptnadata[offs..offs + encoded_len]);
gst_log!(CAT, "pushing one packet of len {}", encoded_len);
self.srcpad.push(writable.into_buffer())?;
if pkt_is_header {
state.sent_header = true;
}
offs += encoded_len;
}
Ok(gst::FlowSuccess::Ok)
}
fn sink_event(
&self,
pad: &gst::Pad,
element: &gst::Element,
event: gst::Event
) -> bool {
gst_log!(CAT, obj:pad, "Got an event {:?}", event);
match event.view() {
EventView::Caps(event_caps) => {
let mut state = self.state.lock().unwrap();
if let Some(state) = &mut *state {
let head = match NeedTranscodeHead::new_from_caps(&event_caps.get_caps()){
Ok(h) => h,
Err(e) => {
gst_error!(CAT, "Invalid WMA caps!");
e.log();
return false;
},
};
state.codec_data = Some(head);
};
drop(state);
let mut caps = gst::Caps::new_empty();
{
let caps = caps.get_mut().unwrap();
let s = gst::Structure::builder("audio/x-opus")
.field("channel-mapping-family", &0i32)
.build();
caps.append_structure(s);
}
self.srcpad.push_event(gst::event::Caps::new(&caps))
},
EventView::FlushStop(_) => {
let mut state = self.state.lock().unwrap();
if let Some(state) = &mut *state {
state.reset();
};
drop(state);
pad.event_default(Some(element), event)
},
_ => pad.event_default(Some(element), event)
}
}
fn src_query(
&self,
pad: &gst::Pad,
element: &gst::Element,
query: &mut gst::QueryRef) -> bool
{
gst_log!(CAT, obj: pad, "got query: {:?}", query);
match query.view_mut() {
QueryView::Scheduling(mut q) => {
let mut peer_query = gst::query::Scheduling::new();
let res = self.sinkpad.peer_query(&mut peer_query);
if ! res {
return res;
}
let (flags, min, max, align) = peer_query.get_result();
q.set(flags, min, max, align);
true
},
_ => pad.query_default(Some(element), query)
}
}
fn src_activatemode(
&self,
_pad: &gst::Pad,
_element: &gst::Element,
mode: gst::PadMode,
active: bool
) -> Result<(), gst::LoggableError> {
self.sinkpad
.activate_mode(mode, active)?;
Ok(())
}
}
/* Registers the audio converter element with `plugin` under the factory name
 * "protonaudioconverter" at marginal rank. */
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
    let element_name = "protonaudioconverter";
    let rank = gst::Rank::Marginal;

    gst::Element::register(Some(plugin), element_name, rank, AudioConv::get_type())
}

View file

@ -0,0 +1,218 @@
/*
* Copyright (c) 2020 Valve Corporation
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation and/or
* other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
use glib;
use glib::subclass;
use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::EventView;
use once_cell::sync::Lazy;
/* Opus is a great fit for our usecase except for one problem: it only supports a few samplerates.
* Notably it doesn't support 44100 Hz, which is a very frequently used samplerate. This bin
* provides a capssetter element which will override the rate we get from Opus with the rate the
* application requested. Similarly, on the transcoder side, we just encode the audio as if it were
* at 48 kHz, even if it is actually at 44.1 kHz.
*
* The downside to this is a small decrease in audio quality. If Opus is most responsive between 20
* Hz and 20 kHz, then when 44.1 audio is converted to 48, we'll gain noise between 18-20 Hz
* (although WMA probably already filtered that out) and start to lose audio above 18,375 Hz. This
* is at the very edge of human hearing, so we're unlikely to lose any noticeable quality.
*
* Resampling is an option, but has some problems. It's significantly more complicated, and more
* CPU-intensive. Also, XAudio2 buffers can be started and ended at arbitrary points, so if we
* start moving audio data from one buffer into another due to resampling, it may result in audible
* artifacts. I think just encoding at the wrong rate is the best compromise. If the application
* actually cared about audio quality, they probably would not have used WMA in the first place.
*/
/* Debug category under which the audio converter bin logs; created on first use. */
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    let name = "protonaudioconverterbin";
    let description = Some("Proton audio converter bin");
    gst::DebugCategory::new(name, gst::DebugColorFlags::empty(), description)
});
/* Bin wrapping the chain protonaudioconverter -> opusdec -> capssetter behind ghost pads */
struct AudioConvBin {
/* the "protonaudioconverter" element; target of the sink ghost pad */
audioconv: gst::Element,
/* the "opusdec" element, linked between audioconv and capssetter */
opusdec: gst::Element,
/* the "capssetter" element; its "caps" property receives the rate from the sink caps, and its
 * src pad is the target of the src ghost pad */
capssetter: gst::Element,
/* ghost pad exposing the capssetter's src pad */
srcpad: gst::GhostPad,
/* ghost pad exposing the converter's sink pad; its events go through sink_event() */
sinkpad: gst::GhostPad,
}
impl ObjectSubclass for AudioConvBin {
    const NAME: &'static str = "ProtonAudioConverterBin";
    type ParentType = gst::Bin;
    type Instance = gst::subclass::ElementInstanceStruct<Self>;
    type Class = subclass::simple::ClassStruct<Self>;

    glib_object_subclass!();

    /* Builds the instance data: the two ghost pads (the sink one with an event function that
     * forwards to sink_event()) and the three child elements. Panics if a pad template or one
     * of the element factories is missing. */
    fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
        let src_templ = klass.get_pad_template("src").unwrap();
        let srcpad = gst::GhostPad::builder_with_template(&src_templ, Some("src")).build();

        let sink_templ = klass.get_pad_template("sink").unwrap();
        let sinkpad = gst::GhostPad::builder_with_template(&sink_templ, Some("sink"))
            .event_function(|pad, parent, event| {
                AudioConvBin::catch_panic_pad_function(
                    parent,
                    || false,
                    |this, element| this.sink_event(pad, element, event)
                )
            })
            .build();

        AudioConvBin {
            audioconv: gst::ElementFactory::make("protonaudioconverter", None).unwrap(),
            opusdec: gst::ElementFactory::make("opusdec", None).unwrap(),
            capssetter: gst::ElementFactory::make("capssetter", None).unwrap(),
            srcpad,
            sinkpad,
        }
    }

    /* Registers the element metadata and the two always-present pad templates:
     * "sink" accepting audio/x-wma and "src" producing audio/x-raw. */
    fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
        klass.set_metadata("Proton audio converter with rate fixup",
            "Codec/Parser",
            "Converts audio for Proton, fixing up samplerates",
            "Andrew Eikum <aeikum@codeweavers.com>");

        let sink_caps = gst::Caps::builder("audio/x-wma").build();
        let sink_pad_template = gst::PadTemplate::new(
            "sink",
            gst::PadDirection::Sink,
            gst::PadPresence::Always,
            &sink_caps).unwrap();
        klass.add_pad_template(sink_pad_template);

        let src_caps = gst::Caps::builder("audio/x-raw").build();
        let src_pad_template = gst::PadTemplate::new(
            "src",
            gst::PadDirection::Src,
            gst::PadPresence::Always,
            &src_caps).unwrap();
        klass.add_pad_template(src_pad_template);
    }
}
impl ObjectImpl for AudioConvBin {
    glib_object_impl!();

    /* Assembles the bin after the parent's construction: adds the three children, links
     * audioconv -> opusdec -> capssetter, points the ghost pads at the outer pads of that
     * chain and finally adds the ghost pads to the bin. */
    fn constructed(&self, obj: &glib::Object) {
        self.parent_constructed(obj);

        let bin = obj.downcast_ref::<gst::Bin>().unwrap();

        for child in [&self.audioconv, &self.opusdec, &self.capssetter].iter() {
            bin.add(*child).unwrap();
        }

        self.audioconv.link(&self.opusdec).unwrap();
        self.opusdec.link(&self.capssetter).unwrap();

        let inner_sink = self.audioconv.get_static_pad("sink").unwrap();
        self.sinkpad.set_target(Some(&inner_sink)).unwrap();

        let inner_src = self.capssetter.get_static_pad("src").unwrap();
        self.srcpad.set_target(Some(&inner_src)).unwrap();

        bin.add_pad(&self.sinkpad).unwrap();
        bin.add_pad(&self.srcpad).unwrap();
    }
}
/* no bin or element virtual methods are overridden; the trait defaults are used */
impl BinImpl for AudioConvBin { }
impl ElementImpl for AudioConvBin { }
impl AudioConvBin {
    /* Event function of the sink ghost pad.
     *
     * For caps events, the "rate" field of the first caps structure is set as audio/x-raw
     * caps on the capssetter (a warning is logged if the structure or the field is missing),
     * and the event is then sent to the converter's sink pad. All other events use the
     * default handler. */
    fn sink_event(
        &self,
        pad: &gst::GhostPad,
        element: &gst::Element,
        event: gst::Event
    ) -> bool {
        match event.view() {
            EventView::Caps(event_caps) => {
                /* set up capssetter with this rate */
                let requested_rate = event_caps.get_caps()
                    .get_structure(0)
                    .map(|s| s.get_some::<i32>("rate"));

                match requested_rate {
                    Some(Ok(override_rate)) => {
                        let rate_caps = gst::Caps::builder("audio/x-raw")
                            .field("rate", &override_rate)
                            .build();

                        self.capssetter.set_property("caps",
                            &rate_caps).unwrap();
                    },
                    Some(Err(_)) => {
                        gst_warning!(CAT, "event has no rate");
                    },
                    None => {
                        gst_warning!(CAT, "event has no structure");
                    },
                }

                /* forward on to the real pad */
                let real_pad = self.audioconv.get_static_pad("sink").unwrap();
                real_pad.send_event(event)
            },
            _ => pad.event_default(Some(element), event)
        }
    }
}
/* Registers the converter bin with `plugin` under the factory name "protonaudioconverterbin",
 * ranked one above marginal. */
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
    let element_name = "protonaudioconverterbin";
    let rank = gst::Rank::Marginal + 1;

    gst::Element::register(Some(plugin), element_name, rank, AudioConvBin::get_type())
}

View file

@ -0,0 +1,427 @@
/*
* Copyright (c) 2020 Valve Corporation
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation and/or
* other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Based on "Fossilize," which is
* Copyright (c) 2018-2019 Hans-Kristian Arntzen
* https://github.com/ValveSoftware/Fossilize/
*/
/* This is a read/write implementation of the Fossilize database format.
*
* https://github.com/ValveSoftware/Fossilize/
*
* That C++ implementation is specific to Vulkan, while this one tries to be generic to store any
* type of data.
*
* FIXME: It should probably live in that repo or in a separate project.
*/
use std::fs;
use std::io;
use std::io::Read;
use std::io::Write;
use std::io::Seek;
use std::fs::OpenOptions;
use std::convert::From;
use std::collections::HashMap;
use crc32fast;
use crate::*;
/* Fossilize StreamArchive database format version 6:
*
* The file consists of a header, followed by an unlimited series of "entries".
*
* All multi-byte entities are little-endian.
*
* The file header is as follows:
*
* Field Type Description
* ----- ---- -----------
* magic_number uint8_t[12] Constant value: "\x81""FOSSILIZEDB"
* version uint32_t StreamArchive version: 6
*
*
* Each entry follows this format:
*
* Field Type Description
* ----- ---- -----------
* name unsigned char[40] Application-defined entry identifier, stored in hexadecimal big-endian
* ASCII. Usually N-char tag followed by (40 - N)-char hash.
* stored_size uint32_t Size of the payload as stored in this file.
* flags uint32_t Flags for this entry (e.g. compression). See below.
* crc32 uint32_t CRC32 of the payload as stored in this file.
* payload_size uint32_t Size of this payload after decompression.
* payload uint8_t[stored_size] Entry data.
*
* The flags field may contain:
* 0x1: No compression.
* 0x2: Deflate compression.
*/
/// Magic number that opens every database file: the byte 0x81 followed by ASCII "FOSSILIZEDB".
const FOSSILIZE_MAGIC: [u8; 12] = [0x81, 0x46, 0x4f, 0x53, 0x53, 0x49, 0x4c, 0x49, 0x5a, 0x45, 0x44, 0x42];
/// Oldest format version that `StreamArchive::prepare` accepts.
const FOSSILIZE_MIN_COMPAT_VERSION: u8 = 5;
/// Newest format version accepted, and the version written into newly created files.
///
/// NOTE: only the last header byte (index 15) is read and written as the version; the three
/// bytes before it are written as zero.
const FOSSILIZE_VERSION: u8 = 6;
/// Size of the file header: 12 magic bytes plus 4 version bytes.
const MAGIC_LEN_BYTES: usize = 12 + 4;
/// `flags` value for an uncompressed payload; the only value this implementation reads or writes.
const FOSSILIZE_COMPRESSION_NONE: u32 = 1;
/// `flags` value for a deflate-compressed payload; currently unused (hence the underscore).
const _FOSSILIZE_COMPRESSION_DEFLATE: u32 = 2;
/// Failures reported by the Fossilize database code.
#[derive(Debug)]
pub enum Error {
    /// The entry uses a feature this implementation lacks (e.g. a compressed payload).
    NotImplemented,
    /// An underlying file operation failed.
    IOError(io::Error),
    /// The file header or an entry name could not be parsed.
    CorruptDatabase,
    /// The payload exceeds what a 32-bit size field can describe.
    DataTooLarge,
    /// The tag is outside the range the archive was opened with.
    InvalidTag,
    /// No entry with the requested tag and hash is known.
    EntryNotFound,
    /// The payload's CRC32 does not match the stored checksum.
    FailedChecksum,
}

/// Lets `?` turn an `io::Error` into [`Error::IOError`].
impl From<io::Error> for Error {
    fn from(source: io::Error) -> Self {
        Self::IOError(source)
    }
}
/// Hash portion of an entry name, kept as a 128-bit integer.
type FossilizeHash = u128;
/// Number of hex digits needed for a 128-bit hash (32); currently unused (hence the underscore).
const _FOSSILIZEHASH_ASCII_LEN: usize = (128 / 8) * 2;
/// Conversion between an integer and the hexadecimal ASCII form used in entry names.
trait ToAscii {
/// Returns the value rendered as hexadecimal ASCII bytes.
fn to_ascii_bytes(&self) -> Vec<u8>;
/// Parses hexadecimal ASCII bytes back into a value, or returns an `Error` when `b`
/// cannot be parsed.
fn from_ascii_bytes(b: &[u8]) -> Result<Self, Error>
where Self: std::marker::Sized;
}
impl ToAscii for FossilizeHash {
    /// Formats the hash with the crate-wide `format_hash` helper and returns the ASCII bytes.
    fn to_ascii_bytes(&self) -> Vec<u8> {
        format_hash(*self).into_bytes()
    }

    /// Parses a hexadecimal ASCII hash.
    ///
    /// Returns `Error::CorruptDatabase` when `b` is not valid UTF-8 or is not a hexadecimal
    /// number that fits into 128 bits.
    fn from_ascii_bytes(b: &[u8]) -> Result<Self, Error> {
        /* validate in place; copying the bytes into a String first is not needed */
        let s = std::str::from_utf8(b)
            .map_err(|_| Error::CorruptDatabase)?;
        Self::from_str_radix(s, 16)
            .map_err(|_| Error::CorruptDatabase)
    }
}
/// Tag portion of an entry name; also used as the index into `StreamArchive::seen_blobs`.
type FossilizeTag = u32;
/// Number of hex digits used to store a tag (8).
const FOSSILIZETAG_ASCII_LEN: usize = (32 / 8) * 2;

impl ToAscii for FossilizeTag {
    /// Renders the tag as eight lowercase, zero-padded hexadecimal digits.
    fn to_ascii_bytes(&self) -> Vec<u8> {
        format!("{:08x}", *self).into_bytes()
    }

    /// Parses a hexadecimal ASCII tag.
    ///
    /// Returns `Error::CorruptDatabase` when `b` is not valid UTF-8 or is not a hexadecimal
    /// number that fits into 32 bits.
    fn from_ascii_bytes(b: &[u8]) -> Result<Self, Error> {
        /* validate in place; copying the bytes into a String first is not needed */
        let s = std::str::from_utf8(b)
            .map_err(|_| Error::CorruptDatabase)?;
        Self::from_str_radix(s, 16)
            .map_err(|_| Error::CorruptDatabase)
    }
}
/// Length of an entry name: 8 hex digits of tag followed by 32 hex digits of hash.
const PAYLOAD_NAME_LEN_BYTES: usize = 40;
/// The four 32-bit fields that follow an entry's name in the file.
struct PayloadInfo {
/// Size of the payload as stored in the file.
size: u32,
/// Compression flag (see the `FOSSILIZE_COMPRESSION_*` constants).
compression: u32,
/// CRC32 of the stored payload; `read_entry` treats 0 as "no checksum recorded".
crc: u32,
/// Size of the payload after decompression.
full_size: u32,
}
/// Serialized size of a `PayloadInfo`: four little-endian `u32` fields.
const PAYLOAD_HEADER_LEN_BYTES: usize = 4 * 4;

impl PayloadInfo {
    /// Decodes the four little-endian `u32` fields from the first 16 bytes of `dat`.
    ///
    /// Panics when `dat` holds fewer than 16 bytes.
    fn new_from_slice(dat: &[u8]) -> Self {
        /* idx selects the idx-th 32-bit word of the header */
        let word = |idx: usize| -> u32 {
            u32::from_le_bytes(copy_into_array(&dat[4 * idx..4 * idx + 4]))
        };

        Self {
            size: word(0),
            compression: word(1),
            crc: word(2),
            full_size: word(3),
        }
    }

    /// Encodes the fields in file order as 16 little-endian bytes.
    fn to_slice(&self) -> [u8; PAYLOAD_HEADER_LEN_BYTES] {
        let words = [self.size, self.compression, self.crc, self.full_size];

        let mut encoded = [0u8; PAYLOAD_HEADER_LEN_BYTES];
        for (dst, word) in encoded.chunks_exact_mut(4).zip(words.iter()) {
            dst.copy_from_slice(&word.to_le_bytes());
        }
        encoded
    }
}
/// Index record for one stored payload.
pub struct PayloadEntry {
    /// Absolute file offset of the first payload byte.
    offset: u64,
    /// Header fields stored in front of the payload.
    payload_info: PayloadInfo,
}

impl PayloadEntry {
    /// Builds an entry for a payload that starts at `offset`, decoding its header from `dat`.
    fn new_from_slice(offset: u64, dat: &[u8]) -> Self {
        let payload_info = PayloadInfo::new_from_slice(dat);
        Self { offset, payload_info }
    }
}
/// An open Fossilize database file together with an in-memory index of its entries.
pub struct StreamArchive {
/// Backing database file.
file: fs::File,
/// One hash -> entry map per tag; the tag value is the index into the vector.
seen_blobs: Vec<HashMap<FossilizeHash, PayloadEntry>>,
/// File offset at which the next entry will be written.
write_pos: u64,
}
/// Selects whether `read_entry` verifies, and `write_entry` computes, the payload CRC32.
pub enum CRCCheck {
/// Skip checksum handling.
WithoutCRC,
/// Verify the checksum on read; compute and store it on write.
WithCRC,
}
impl StreamArchive {
    /// Opens `filename` with `fileopts` and indexes its contents.
    ///
    /// `num_tags` is the number of distinct tags the caller uses; valid tags are
    /// `0..num_tags`. An empty file is initialized with a fresh header, which requires
    /// `fileopts` to permit writing.
    ///
    /// # Errors
    ///
    /// Any I/O failure, or `Error::CorruptDatabase` if an existing file has a bad header or
    /// an unparsable entry name.
    pub fn new<P: AsRef<std::path::Path>>(filename: P, fileopts: &OpenOptions, num_tags: usize) -> Result<Self, Error> {
        let file = fileopts.open(filename)?;

        let seen_blobs = (0..num_tags).map(|_| HashMap::new()).collect();

        let mut ret = Self {
            file,
            seen_blobs,
            write_pos: 0,
        };

        ret.prepare()?;

        Ok(ret)
    }

    /// Reads the header and entry table of the file, or writes a header into an empty file.
    ///
    /// Entries are indexed by tag and hash without reading their payloads. A truncated
    /// trailing entry (cut-off header or payload) is ignored and will be overwritten by the
    /// next `write_entry`. Entries whose tag is not below the `num_tags` given to `new` are
    /// skipped.
    ///
    /// # Errors
    ///
    /// Any I/O failure, or `Error::CorruptDatabase` for a bad magic number, an unsupported
    /// version, or an entry name that is not hexadecimal ASCII.
    pub fn prepare(&mut self) -> Result<(), Error> {
        self.write_pos = self.file.seek(io::SeekFrom::Start(0))?;

        let file_len = self.file.metadata()?.len();

        if file_len == 0 {
            /* new file, write foz header */
            self.file.write_all(&FOSSILIZE_MAGIC)?;
            self.file.write_all(&[0u8, 0u8, 0u8, FOSSILIZE_VERSION])?;
            self.write_pos = MAGIC_LEN_BYTES as u64;
            return Ok(());
        }

        let mut magic_and_version = [0u8; MAGIC_LEN_BYTES];
        self.file.read_exact(&mut magic_and_version)?;

        let version = magic_and_version[15];

        if magic_and_version[0..12] != FOSSILIZE_MAGIC ||
                version < FOSSILIZE_MIN_COMPAT_VERSION ||
                version > FOSSILIZE_VERSION {
            return Err(Error::CorruptDatabase);
        }

        /* a database holding only the header must append after it, not over it */
        self.write_pos = MAGIC_LEN_BYTES as u64;

        loop {
            let mut name_and_header = [0u8; PAYLOAD_NAME_LEN_BYTES + PAYLOAD_HEADER_LEN_BYTES];
            if let Err(fail) = self.file.read_exact(&mut name_and_header) {
                if fail.kind() == io::ErrorKind::UnexpectedEof {
                    break;
                }
                return Err(Error::IOError(fail));
            }

            let (name, header) = name_and_header.split_at(PAYLOAD_NAME_LEN_BYTES);

            let tag = FossilizeTag::from_ascii_bytes(&name[..FOSSILIZETAG_ASCII_LEN])? as usize;
            let hash = FossilizeHash::from_ascii_bytes(&name[FOSSILIZETAG_ASCII_LEN..])?;

            let payload_offset = self.file.seek(io::SeekFrom::Current(0))?;
            let payload_entry = PayloadEntry::new_from_slice(payload_offset, header);
            let payload_end = payload_offset + u64::from(payload_entry.payload_info.size);

            /* seeking beyond the end of a file succeeds, so truncation has to be detected
             * by comparing against the file length. truncated chunk is not fatal */
            if payload_end > file_len {
                break;
            }

            self.write_pos = self.file.seek(io::SeekFrom::Start(payload_end))?;

            /* tags outside the caller's range cannot be looked up; skip them */
            if let Some(blobs) = self.seen_blobs.get_mut(tag) {
                blobs.insert(hash, payload_entry);
            }
        }

        Ok(())
    }

    /// Returns whether an entry with this tag and hash is indexed. An out-of-range tag
    /// yields `false`.
    pub fn has_entry(&self, tag: FossilizeTag, hash: FossilizeHash) -> bool {
        self.seen_blobs
            .get(tag as usize)
            .map_or(false, |blobs| blobs.contains_key(&hash))
    }

    /// Iterates over the hashes of all entries stored under `tag`.
    ///
    /// # Panics
    ///
    /// Panics if `tag` is not below the `num_tags` given to `new`.
    pub fn iter_tag(&self, tag: FossilizeTag) -> std::collections::hash_map::Keys<FossilizeHash, PayloadEntry> {
        self.seen_blobs[tag as usize].keys()
    }

    /// Returns the decompressed size in bytes of the entry.
    ///
    /// # Errors
    ///
    /// `Error::InvalidTag` for an out-of-range tag, `Error::EntryNotFound` if no such entry
    /// is indexed.
    pub fn entry_size(&self, tag: FossilizeTag, hash: FossilizeHash) -> Result<usize, Error> {
        let blobs = self.seen_blobs.get(tag as usize).ok_or(Error::InvalidTag)?;

        blobs.get(&hash)
            .map(|e| e.payload_info.full_size as usize)
            .ok_or(Error::EntryNotFound)
    }

    /// Copies payload bytes of an entry, starting `offset` bytes into the payload, into `buf`.
    ///
    /// Returns the number of bytes copied, which is 0 once `offset` reaches the end of the
    /// payload.
    ///
    /// With `CRCCheck::WithCRC` the stored checksum is verified when one was recorded
    /// (non-zero) and this call reads the entire payload (`offset == 0` and `buf` is large
    /// enough); a checksum over a partial read could never match and is therefore skipped.
    ///
    /// # Errors
    ///
    /// `Error::InvalidTag`, `Error::EntryNotFound`, `Error::NotImplemented` for compressed
    /// entries, `Error::FailedChecksum`, or any I/O failure.
    pub fn read_entry(&mut self, tag: FossilizeTag, hash: FossilizeHash, offset: u64, buf: &mut [u8], crc_opt: CRCCheck) -> Result<usize, Error> {
        let entry = self.seen_blobs
            .get(tag as usize)
            .ok_or(Error::InvalidTag)?
            .get(&hash)
            .ok_or(Error::EntryNotFound)?;

        if entry.payload_info.compression != FOSSILIZE_COMPRESSION_NONE {
            return Err(Error::NotImplemented);
        }

        let full_size = u64::from(entry.payload_info.full_size);
        let stored_crc = entry.payload_info.crc;
        let payload_start = entry.offset;

        if offset >= full_size {
            return Ok(0);
        }

        self.file.seek(io::SeekFrom::Start(payload_start + offset))?;

        /* bounded by buf.len(), so the conversion back to usize cannot truncate */
        let to_copy = std::cmp::min(full_size - offset, buf.len() as u64) as usize;

        self.file.read_exact(&mut buf[..to_copy])?;

        let read_whole_payload = offset == 0 && to_copy as u64 == full_size;

        if stored_crc != 0 && read_whole_payload {
            if let CRCCheck::WithCRC = crc_opt {
                let mut crc_hasher = crc32fast::Hasher::new();
                crc_hasher.update(&buf[..to_copy]);
                if crc_hasher.finalize() != stored_crc {
                    return Err(Error::FailedChecksum);
                }
            }
        }

        Ok(to_copy)
    }

    /// Appends a new entry whose payload is read from `data` until it reports end of input.
    ///
    /// Does nothing if an entry with the same tag and hash is already indexed. With
    /// `CRCCheck::WithCRC` the CRC32 of the payload is stored; otherwise 0 is stored.
    ///
    /// # Errors
    ///
    /// `Error::InvalidTag` for an out-of-range tag, `Error::DataTooLarge` if the payload does
    /// not fit a 32-bit size, or any I/O failure. After a failure the write position is
    /// unchanged, so the partial entry is overwritten by the next write.
    pub fn write_entry(&mut self, tag: FossilizeTag, hash: FossilizeHash, data: &mut dyn Read, crc_opt: CRCCheck) -> Result<(), Error> {
        /* reject bad tags up front instead of panicking after the data has been written */
        if tag as usize >= self.seen_blobs.len() {
            return Err(Error::InvalidTag);
        }

        if self.has_entry(tag, hash) {
            return Ok(());
        }

        let with_crc = match crc_opt {
            CRCCheck::WithCRC => true,
            CRCCheck::WithoutCRC => false,
        };

        self.file.seek(io::SeekFrom::Start(self.write_pos))?;

        /* write entry name */
        let mut name = [0u8; PAYLOAD_NAME_LEN_BYTES];
        name[..FOSSILIZETAG_ASCII_LEN].copy_from_slice(&tag.to_ascii_bytes());
        name[FOSSILIZETAG_ASCII_LEN..].copy_from_slice(&hash.to_ascii_bytes());
        self.file.write_all(&name)?;

        /* allocate payload info space */
        let payload_info_pos = self.file.seek(io::SeekFrom::Current(0))?;

        let payload_info = PayloadInfo {
            size: u32::max_value(), /* will be filled later */
            compression: FOSSILIZE_COMPRESSION_NONE,
            crc: 0, /* will be filled later */
            full_size: u32::max_value(), /* will be filled later */
        };

        self.file.write_all(&payload_info.to_slice())?;

        /* write data */
        let mut payload_entry = PayloadEntry {
            offset: self.file.seek(io::SeekFrom::Current(0))?,
            payload_info,
        };

        const BUFFER_COPY_BYTES: usize = 8 * 1024 * 1024; /* tuneable */
        let mut buf = box_array![0u8; BUFFER_COPY_BYTES];
        /* counted in 64 bits so the limit check cannot overflow on 32-bit targets */
        let mut size: u64 = 0;
        let mut crc_hasher = crc32fast::Hasher::new();
        loop {
            let readed = data.read(&mut *buf)?;
            if readed == 0 {
                break;
            }

            size += readed as u64;
            if size > u64::from(u32::max_value()) {
                /* Fossilize format only supports 4 GiB entries */
                return Err(Error::DataTooLarge);
            }

            self.file.write_all(&buf[..readed])?;

            if with_crc {
                crc_hasher.update(&buf[..readed]);
            }
        }

        self.write_pos = self.file.seek(io::SeekFrom::Current(0))?;

        /* seek back and fill in the size */
        self.file.seek(io::SeekFrom::Start(payload_info_pos))?;

        payload_entry.payload_info.size = size as u32;
        payload_entry.payload_info.full_size = size as u32;
        if with_crc {
            payload_entry.payload_info.crc = crc_hasher.finalize();
        }

        self.file.write_all(&payload_entry.payload_info.to_slice())?;

        /* success. record entry and exit */
        self.seen_blobs[tag as usize].insert(hash, payload_entry);

        Ok(())
    }
}

161
media-converter/src/lib.rs Normal file
View file

@ -0,0 +1,161 @@
/*
* Copyright (c) 2020 Valve Corporation
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation and/or
* other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#[macro_use]
extern crate glib;
#[macro_use]
extern crate gstreamer as gst;
extern crate gstreamer_base as gst_base;
extern crate gstreamer_video as gst_video;
extern crate gstreamer_audio as gst_audio;
extern crate once_cell;
use std::io;
use std::io::Read;
#[cfg(target_arch = "x86")]
mod murmur3_x86_128;
#[cfg(target_arch = "x86_64")]
mod murmur3_x64_128;
mod videoconv;
mod audioconv;
mod audioconvbin;
mod fossilize;
// copy_into_array:
//
// Copyright (c) 2020 Stu Small
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software
// and associated documentation files (the "Software"), to deal in the Software without
// restriction, including without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies or
// substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
// BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
/// Copies `slice` into a default-constructed array-like value of type `A`.
///
/// Panics if the length of `slice` differs from the length of `A`.
fn copy_into_array<A, T>(slice: &[T]) -> A
where
    A: Default + AsMut<[T]>,
    T: Copy,
{
    let mut array = A::default();
    AsMut::<[T]>::as_mut(&mut array).copy_from_slice(slice);
    array
}
/* rust has a hard time with large heap allocations. below macro works around that.
*
* by @simias from https://github.com/rust-lang/rust/issues/53827 */
#[macro_export]
macro_rules! box_array {
    ($val:expr ; $len:expr) => {{
        // A generic helper keeps the element type of the pointer cast tied to the Vec's
        fn into_boxed_array<T>(items: Vec<T>) -> Box<[T; $len]> {
            let raw_slice = ::std::boxed::Box::into_raw(items.into_boxed_slice());
            // SAFETY: the only caller passes `vec![$val; $len]`, so the boxed slice holds
            // exactly `$len` elements and can be reinterpreted as a `[T; $len]`.
            unsafe { Box::from_raw(raw_slice as *mut [T; $len]) }
        }

        into_boxed_array(vec![$val; $len])
    }};
}
/* you MUST use this to consistently format the hash bytes into a string */
/// Renders `hash` as exactly 32 lowercase, zero-padded hexadecimal digits.
fn format_hash(hash: u128) -> String {
    format!("{:032x}", hash)
}
/* changing this will invalidate the cache. you MUST clear it. */
/* Shared seed imported by the converter modules; presumably the murmur3 seed -- confirm at the call sites. */
const HASH_SEED: u32 = 0x4AA61F63;
/// A `Read` adapter over the first `len` bytes of a borrowed byte slice.
pub struct BufferedReader<'a> {
    /// Backing bytes.
    dat: &'a [u8],
    /// Number of bytes of `dat` that may be read.
    len: usize,
    /// Number of bytes already handed out.
    ofs: usize,
}

impl<'a> BufferedReader<'a> {
    /// Creates a reader that yields at most `len` bytes from the start of `dat`.
    ///
    /// A `len` larger than `dat.len()` is treated as `dat.len()`.
    fn new(dat: &'a [u8], len: usize) -> Self {
        BufferedReader {
            dat,
            len,
            ofs: 0,
        }
    }
}

impl<'a> Read for BufferedReader<'a> {
    /// Copies as many of the remaining bytes as fit into `out`.
    ///
    /// Returns the number of bytes copied; 0 signals that the reader is exhausted (or that
    /// `out` is empty). Never fails.
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        /* clamp to the backing slice so an oversized `len` cannot index out of bounds */
        let end = self.len.min(self.dat.len());
        let start = self.ofs.min(end);
        let to_copy = (end - start).min(out.len());

        out[..to_copy].copy_from_slice(&self.dat[start..start + to_copy]);

        self.ofs = start + to_copy;

        Ok(to_copy)
    }
}
/// Plugin entry point: registers the elements of the `videoconv`, `audioconvbin` and
/// `audioconv` modules, in that order, stopping at the first failure.
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
    videoconv::register(plugin)
        .and_then(|_| audioconvbin::register(plugin))
        .and_then(|_| audioconv::register(plugin))
}
/* Declares the GStreamer plugin `protonmediaconverter` with `plugin_init` as its init function.
 * The remaining metadata is taken at compile time from Cargo's package variables plus the
 * COMMIT_ID and BUILD_REL_DATE environment variables, which the build must provide.
 *
 * NOTE(review): the license string below is "MIT/X11" while the file headers carry a
 * three-clause redistribution notice -- confirm this is intended. */
gst_plugin_define!(
protonmediaconverter,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
"MIT/X11",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);

View file

@ -0,0 +1,232 @@
// Copyright (c) 2020 Stu Small
//
// Modified to return its internal state for continuous hashing:
// Copyright (c) 2020 Andrew Eikum <aeikum@codeweavers.com> for CodeWeavers
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
use std::io::{Read, Result};
use std::ops::Shl;
use crate::copy_into_array;
/// Resumable state of the x64 128-bit murmur3 hash.
#[allow(non_camel_case_types)]
pub struct murmur3_x64_128_state {
    /// Seed the state was created with; `reset` restores it.
    seed: u32,
    /// First 64-bit accumulator.
    h1: u64,
    /// Second 64-bit accumulator.
    h2: u64,
    /// Total number of bytes hashed so far.
    processed: usize,
}

impl murmur3_x64_128_state {
    /// Creates a state with both accumulators set to `seed` and no bytes processed.
    pub fn new(seed: u32) -> Self {
        let initial = u64::from(seed);
        Self {
            seed,
            h1: initial,
            h2: initial,
            processed: 0,
        }
    }

    /// Returns the state to what `new` produced for the original seed.
    #[allow(dead_code)]
    pub fn reset(&mut self) {
        *self = Self::new(self.seed);
    }
}
/// Hashes everything `source` yields with the x64 variant of 128-bit murmur3.
///
/// This is a one-shot convenience wrapper: it starts from a fresh state seeded with `seed`
/// and discards that state afterwards.
///
/// # Example
/// ```ignore
/// use std::io::Cursor;
/// let hash_result = murmur3_x64_128(&mut Cursor::new("hello world"), 0);
/// ```
pub fn murmur3_x64_128<T: Read>(source: &mut T, seed: u32) -> Result<u128> {
    murmur3_x64_128_full(source, &mut murmur3_x64_128_state::new(seed))
}
/// Continues an x64 128-bit murmur3 hash from `state` with all bytes `source` yields.
///
/// The source is read 16 bytes at a time until it reports end of input. The accumulators
/// and byte count reached at that point are stored back into `state`, so a later call can
/// continue the same hash; the returned value is the finalized hash of everything
/// processed so far.
///
/// Any read that returns fewer than 16 bytes is mixed in as a zero-padded tail block, so
/// the result only matches a one-pass hash when every read except the last fills the
/// 16-byte buffer.
///
/// # Errors
///
/// Propagates the first error returned by `source`.
pub fn murmur3_x64_128_full<T: Read>(source: &mut T, state: &mut murmur3_x64_128_state) -> Result<u128> {
    const C1: u64 = 0x87c3_7b91_1142_53d5;
    const C2: u64 = 0x4cf5_ad43_2745_937f;
    const C3: u64 = 0x52dc_e729;
    const C4: u64 = 0x3849_5ab5;
    const R1: u32 = 27;
    const R2: u32 = 31;
    const R3: u32 = 33;
    const M: u64 = 5;

    /* little-endian value of up to eight bytes; missing high bytes count as zero */
    fn partial_le(bytes: &[u8]) -> u64 {
        bytes.iter().rev().fold(0u64, |acc, &byte| acc.shl(8) | u64::from(byte))
    }

    let (mut h1, mut h2) = (state.h1, state.h2);
    let mut processed: usize = state.processed;
    let mut buf = [0u8; 16];

    loop {
        let read = source.read(&mut buf[..])?;
        processed += read;

        match read {
            0 => {
                /* end of input: remember where we stopped, then finalize a copy */
                state.h1 = h1;
                state.h2 = h2;
                state.processed = processed;

                let len = processed as u64;
                let (a, b) = (h1 ^ len, h2 ^ len);
                let a = a.wrapping_add(b);
                let b = b.wrapping_add(a);
                let (a, b) = (fmix64(a), fmix64(b));
                let a = a.wrapping_add(b);
                let b = b.wrapping_add(a);

                return Ok((u128::from(b) << 64) | u128::from(a));
            }
            16 => {
                /* full block */
                let k1 = u64::from_le_bytes(copy_into_array(&buf[..8]));
                let k2 = u64::from_le_bytes(copy_into_array(&buf[8..]));

                h1 ^= k1.wrapping_mul(C1).rotate_left(R2).wrapping_mul(C2);
                h1 = h1
                    .rotate_left(R1)
                    .wrapping_add(h2)
                    .wrapping_mul(M)
                    .wrapping_add(C3);

                h2 ^= k2.wrapping_mul(C2).rotate_left(R3).wrapping_mul(C1);
                h2 = h2
                    .rotate_left(R2)
                    .wrapping_add(h1)
                    .wrapping_mul(M)
                    .wrapping_add(C4);
            }
            _ => {
                /* short block: bytes 8.. feed the second lane, bytes ..8 the first */
                if read > 8 {
                    let k2 = partial_le(&buf[8..read]);
                    h2 ^= k2.wrapping_mul(C2).rotate_left(R3).wrapping_mul(C1);
                }

                let k1 = partial_le(&buf[..read.min(8)]);
                h1 ^= k1.wrapping_mul(C1).rotate_left(R2).wrapping_mul(C2);
            }
        }
    }
}
/// 64-bit murmur3 finalization mix: three xor-shift steps interleaved with two
/// wrapping multiplications.
fn fmix64(k: u64) -> u64 {
    const C1: u64 = 0xff51_afd7_ed55_8ccd;
    const C2: u64 = 0xc4ce_b9fe_1a85_ec53;
    const R: u32 = 33;

    let first = (k ^ (k >> R)).wrapping_mul(C1);
    let second = (first ^ (first >> R)).wrapping_mul(C2);
    second ^ (second >> R)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::cmp::min;
    use std::io;
    use std::io::Read;

    const TEST_SEED: u32 = 0x00000000;

    const CONST_DATA: [u8; 64] = [
         0,  1,  2,  3,  4,  5,  6,  7,
        10, 11, 12, 13, 14, 15, 16, 17,
        20, 21, 22, 23, 24, 25, 26, 27,
        30, 31, 32, 33, 34, 35, 36, 37,
        40, 41, 42, 43, 44, 45, 46, 47,
        50, 51, 52, 53, 54, 55, 56, 57,
        60, 61, 62, 63, 64, 65, 66, 67,
        70, 71, 72, 73, 74, 75, 76, 77,
    ];

    /// Minimal in-memory reader that hands out as many bytes as the caller asks for.
    struct TestReader<'a> {
        data: &'a [u8],
        ofs: usize,
    }

    impl<'a> TestReader<'a> {
        fn new(data: &'a [u8]) -> Self {
            Self { data, ofs: 0 }
        }
    }

    impl<'a> Read for TestReader<'a> {
        fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
            let remaining = &self.data[self.ofs..];
            let to_copy = min(out.len(), remaining.len());
            out[..to_copy].copy_from_slice(&remaining[..to_copy]);
            self.ofs += to_copy;
            Ok(to_copy)
        }
    }

    #[test]
    fn test_full_hash() {
        /* test with the full buffer */
        let full_hash = murmur3_x64_128(&mut TestReader::new(&CONST_DATA), TEST_SEED).unwrap();
        assert_eq!(full_hash, 0xeb91a9599de8337d969b1e101c4ee3bc);

        /* accumulate hash across 16-byte chunks (short reads change hash due to 0-padding) */
        let mut hash_state = murmur3_x64_128_state::new(TEST_SEED);
        let mut hash = 0;
        for chunk in CONST_DATA.chunks(16) {
            hash = murmur3_x64_128_full(&mut TestReader::new(chunk), &mut hash_state).unwrap();
        }
        assert_eq!(hash, full_hash);
    }
}

View file

@ -0,0 +1,216 @@
// Copyright (c) 2020 Stu Small
//
// Modified to return its internal state for continuous hashing:
// Copyright (c) 2020 Andrew Eikum <aeikum@codeweavers.com> for CodeWeavers
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
use std::io::{Read, Result};
use std::ops::Shl;
use crate::copy_into_array;
/// Resumable state of the x86 128-bit murmur3 hash.
#[allow(non_camel_case_types)]
pub struct murmur3_x86_128_state {
    /// Seed the state was created with; `reset` restores it.
    seed: u32,
    /// The four 32-bit accumulators.
    h1: u32,
    h2: u32,
    h3: u32,
    h4: u32,
    /// Total number of bytes hashed so far.
    processed: usize,
}

impl murmur3_x86_128_state {
    /// Creates a state with all four accumulators set to `seed` and no bytes processed.
    pub fn new(seed: u32) -> Self {
        Self {
            seed,
            h1: seed,
            h2: seed,
            h3: seed,
            h4: seed,
            processed: 0,
        }
    }

    /// Returns the state to what `new` produced for the original seed.
    #[allow(dead_code)]
    pub fn reset(&mut self) {
        *self = Self::new(self.seed);
    }
}
/// Hashes everything `source` yields with the x86 variant of 128-bit murmur3.
///
/// This is a one-shot convenience wrapper: it starts from a fresh state seeded with `seed`
/// and discards that state afterwards.
///
/// # Example
/// ```ignore
/// use std::io::Cursor;
/// let hash_result = murmur3_x86_128(&mut Cursor::new("hello world"), 0);
/// ```
pub fn murmur3_x86_128<T: Read>(source: &mut T, seed: u32) -> Result<u128> {
    murmur3_x86_128_full(source, &mut murmur3_x86_128_state::new(seed))
}
/// Continues an x86 128-bit murmur3 hash from `state` with all bytes `source` yields.
///
/// The source is read 16 bytes at a time until it reports end of input. The accumulators
/// and byte count reached at that point are stored back into `state`, so a later call can
/// continue the same hash; the returned value is the finalized hash of everything
/// processed so far.
///
/// Any read that returns fewer than 16 bytes is mixed in as a zero-padded tail block, so
/// the result only matches a one-pass hash when every read except the last fills the
/// 16-byte buffer.
///
/// # Errors
///
/// Propagates the first error returned by `source`.
pub fn murmur3_x86_128_full<T: Read>(source: &mut T, state: &mut murmur3_x86_128_state) -> Result<u128> {
    const C1: u32 = 0x239b_961b;
    const C2: u32 = 0xab0e_9789;
    const C3: u32 = 0x38b3_4ae5;
    const C4: u32 = 0xa1e3_8b93;
    const C5: u32 = 0x561c_cd1b;
    const C6: u32 = 0x0bca_a747;
    const C7: u32 = 0x96cd_1c35;
    const C8: u32 = 0x32ac_3b17;
    const M: u32 = 5;

    /* little-endian value of up to four bytes; missing high bytes count as zero */
    fn partial_le(bytes: &[u8]) -> u32 {
        bytes.iter().rev().fold(0u32, |acc, &byte| acc.shl(8) | u32::from(byte))
    }

    let (mut h1, mut h2, mut h3, mut h4) = (state.h1, state.h2, state.h3, state.h4);
    let mut processed: usize = state.processed;
    let mut buf = [0u8; 16];

    loop {
        let read = source.read(&mut buf[..])?;
        processed += read;

        match read {
            0 => {
                /* end of input: remember where we stopped, then finalize a copy */
                state.h1 = h1;
                state.h2 = h2;
                state.h3 = h3;
                state.h4 = h4;
                state.processed = processed;

                let len = processed as u32;
                let (a, b, c, d) = (h1 ^ len, h2 ^ len, h3 ^ len, h4 ^ len);

                let a = a.wrapping_add(b).wrapping_add(c).wrapping_add(d);
                let (b, c, d) = (b.wrapping_add(a), c.wrapping_add(a), d.wrapping_add(a));

                let (a, b, c, d) = (fmix32(a), fmix32(b), fmix32(c), fmix32(d));

                let a = a.wrapping_add(b).wrapping_add(c).wrapping_add(d);
                let (b, c, d) = (b.wrapping_add(a), c.wrapping_add(a), d.wrapping_add(a));

                return Ok((u128::from(d) << 96)
                    | (u128::from(c) << 64)
                    | (u128::from(b) << 32)
                    | u128::from(a));
            }
            16 => {
                /* full block */
                let k1 = u32::from_le_bytes(copy_into_array(&buf[0..4]));
                let k2 = u32::from_le_bytes(copy_into_array(&buf[4..8]));
                let k3 = u32::from_le_bytes(copy_into_array(&buf[8..12]));
                let k4 = u32::from_le_bytes(copy_into_array(&buf[12..16]));

                h1 ^= k1.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
                h1 = h1
                    .rotate_left(19)
                    .wrapping_add(h2)
                    .wrapping_mul(M)
                    .wrapping_add(C5);

                h2 ^= k2.wrapping_mul(C2).rotate_left(16).wrapping_mul(C3);
                h2 = h2
                    .rotate_left(17)
                    .wrapping_add(h3)
                    .wrapping_mul(M)
                    .wrapping_add(C6);

                h3 ^= k3.wrapping_mul(C3).rotate_left(17).wrapping_mul(C4);
                h3 = h3
                    .rotate_left(15)
                    .wrapping_add(h4)
                    .wrapping_mul(M)
                    .wrapping_add(C7);

                h4 ^= k4.wrapping_mul(C4).rotate_left(18).wrapping_mul(C1);
                h4 = h4
                    .rotate_left(13)
                    .wrapping_add(h1)
                    .wrapping_mul(M)
                    .wrapping_add(C8);
            }
            _ => {
                /* short block: each started group of four bytes feeds its own lane */
                if read > 12 {
                    let k4 = partial_le(&buf[12..read]);
                    h4 ^= k4.wrapping_mul(C4).rotate_left(18).wrapping_mul(C1);
                }
                if read > 8 {
                    let k3 = partial_le(&buf[8..read.min(12)]);
                    h3 ^= k3.wrapping_mul(C3).rotate_left(17).wrapping_mul(C4);
                }
                if read > 4 {
                    let k2 = partial_le(&buf[4..read.min(8)]);
                    h2 ^= k2.wrapping_mul(C2).rotate_left(16).wrapping_mul(C3);
                }

                let k1 = partial_le(&buf[..read.min(4)]);
                h1 ^= k1.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
            }
        }
    }
}
/// 32-bit murmur3 finalization mix: three xor-shift steps interleaved with two
/// wrapping multiplications.
fn fmix32(k: u32) -> u32 {
    const C1: u32 = 0x85eb_ca6b;
    const C2: u32 = 0xc2b2_ae35;
    const R1: u32 = 16;
    const R2: u32 = 13;

    let first = (k ^ (k >> R1)).wrapping_mul(C1);
    let second = (first ^ (first >> R2)).wrapping_mul(C2);
    second ^ (second >> R1)
}

View file

@ -0,0 +1,709 @@
/*
* Copyright (c) 2020 Valve Corporation
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation and/or
* other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
use glib;
use glib::subclass;
use glib::subclass::prelude::*;
use crate::format_hash;
use crate::HASH_SEED;
use crate::box_array;
use crate::BufferedReader;
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::EventView;
use gst::QueryView;
use std::sync::Mutex;
use std::fs;
use std::io;
use std::io::Read;
use std::fs::OpenOptions;
#[cfg(target_arch = "x86")]
use crate::murmur3_x86_128::murmur3_x86_128 as murmur3_128;
#[cfg(target_arch = "x86_64")]
use crate::murmur3_x64_128::murmur3_x64_128 as murmur3_128;
use crate::fossilize;
use once_cell::sync::Lazy;
/* Algorithm
* ---------
*
* Nicely, both Quartz and Media Foundation allow us random access to the entire data stream. So we
* can easily hash the entire incoming stream and substitute it with our Ogg Theora video. If there
* is a cache miss, then we dump the entire incoming stream. In case of a cache hit, we dump
* nothing.
*
* Incoming video data is stored in the video.foz Fossilize database.
*
* Transcoded video data is stored in the transcoded_video.foz Fossilize database.
*
*
* Hashing algorithm
* -----------------
*
* We use murmur3 hash with the seed given below. We use the x86 variant for 32-bit programs, and
* the x64 variant for 64-bit programs.
*
* For speed when hashing, we specify a stride which will skip over chunks of the input. However,
* we will always hash the first "stride" number of bytes, to try to avoid collisions on smaller
* files with size between chunk and stride.
*
* For example, the 'H's below are hashed, the 'x's are skipped:
*
* let chunk = 4;
* let stride = chunk * 3;
* H = hashed, x = skipped
* [HHHH HHHH HHHH HHHH xxxx xxxx HHHH xxxx xxxx HHHH xxxx] < data stream
* ^^^^ ^^^^ ^^^^ stride prefix, hashed
* ^^^^ chunk
* ^^^^ ^^^^ ^^^^ stride
* ^^^^ chunk
* ^^^^ ^^^^ ^^^^ stride
* ^^^^ chunk
* ^^^^ ^^^^ stride
*/
/* changing any of these will invalidate the cache. you MUST clear it. */
/* Size in bytes (8 MiB) of each pull_range request and of PadReader's chunk buffer. */
const HASH_CHUNK_SIZE: usize = 8 * 1024 /* to kiB */ * 1024 /* to MiB */;
/* Six chunks; presumably the stride handed to PadReader::new_with_stride when hashing --
 * confirm at the call site. */
const HASH_STRIDE: usize = 6 * HASH_CHUNK_SIZE;
/* Debug category used for this module's log output; created on first use. */
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"protonvideoconverter",
gst::DebugColorFlags::empty(),
Some("Proton video converter"))
});
/* Fossilize tags used by the video converter. NOTE(review): going by the names these are
 * the original video data, the transcoded Ogg video data and per-stream records --
 * confirm against the code that reads and writes them. */
const VIDEOCONV_FOZ_TAG_VIDEODATA: u32 = 0;
const VIDEOCONV_FOZ_TAG_OGVDATA: u32 = 1;
const VIDEOCONV_FOZ_TAG_STREAM: u32 = 2;
/* Number of tags above; passed to StreamArchive::new as its tag count. */
const VIDEOCONV_FOZ_NUM_TAGS: usize = 3;
/* Database that incoming video data is dumped into, opened on first use.
 *
 * The path comes from the MEDIACONV_VIDEO_DUMP_FILE environment variable. The value is
 * None when the variable is unset, when the parent directory cannot be created, or when
 * the database cannot be opened. */
static DUMP_FOZDB: Lazy<Mutex<Option<fossilize::StreamArchive>>> = Lazy::new(|| {
    let open_dump_db = || -> Option<fossilize::StreamArchive> {
        let dump_file_path = std::env::var("MEDIACONV_VIDEO_DUMP_FILE").ok()?;
        let dump_file_path = std::path::Path::new(&dump_file_path);

        /* an empty or root path has no parent; let the open below report the failure
         * instead of panicking inside the initializer */
        if let Some(parent) = dump_file_path.parent() {
            fs::create_dir_all(parent).ok()?;
        }

        let opened = fossilize::StreamArchive::new(
            dump_file_path,
            OpenOptions::new().write(true).read(true).create(true),
            VIDEOCONV_FOZ_NUM_TAGS);

        opened.ok()
    };

    Mutex::new(open_dump_db())
});
/// `Read` adapter that pulls data from a pad with `pull_range`, optionally skipping ahead
/// by a stride between chunks.
struct PadReader<'a> {
    /// Pad the data is pulled from.
    pad: &'a gst::Pad,
    /// Stream offset of the next pull request.
    offs: usize,
    /// Holds pulled bytes that did not fit into the caller's buffer.
    chunk: Box<[u8; HASH_CHUNK_SIZE]>,
    /// Start of the unread bytes in `chunk`.
    chunk_offs: usize,
    /// End of the valid bytes in `chunk`.
    chunk_end: usize,
    stride: usize, /* set to usize::max_value() to skip no bytes */
}

impl<'a> PadReader<'a> {
    /// Creates a reader positioned at offset 0 that uses the given `stride`.
    fn new_with_stride(pad: &'a gst::Pad, stride: usize) -> Self {
        Self {
            pad,
            stride,
            offs: 0,
            chunk: box_array![0u8; HASH_CHUNK_SIZE],
            chunk_offs: 0,
            chunk_end: 0,
        }
    }

    /// Creates a reader that skips no bytes.
    fn new(pad: &'a gst::Pad) -> Self {
        Self::new_with_stride(pad, usize::max_value())
    }
}
impl<'a> Read for PadReader<'a> {
    /// Fills `out` with the next bytes of the stream.
    ///
    /// When the internal chunk is exhausted a new chunk is pulled from the pad at the
    /// current offset. While the offset is below `stride` the offset advances by the bytes
    /// taken (the first pull that would cross `stride` is cut off at it); afterwards every
    /// pull advances the offset by `stride`. Pulled bytes go straight into `out` when they
    /// fit, otherwise they are parked in the chunk buffer and handed out piecewise.
    ///
    /// Returns 0 at end of stream and an `io::Error` for any pull failure other than Eos.
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        if self.chunk_offs >= self.chunk_end {
            self.chunk_offs = 0;
            self.chunk_end = 0;

            match self.pad.pull_range(self.offs as u64, HASH_CHUNK_SIZE as u32) {
                /* on Eos, keep going; we'll return later */
                Err(gst::FlowError::Eos) => {},
                Err(_) => {
                    return Err(io::Error::new(io::ErrorKind::Other, "upstream pull_range failed" /* TODO can we print our gst err here? */));
                },
                Ok(buf) => {
                    let pulled = buf.get_size();

                    let to_copy = if self.offs + pulled < self.stride {
                        self.offs += pulled;
                        pulled
                    } else if self.offs < self.stride {
                        let until_stride = self.stride - self.offs;
                        self.offs = self.stride;
                        until_stride
                    } else {
                        self.offs += self.stride;
                        pulled
                    };

                    if out.len() >= to_copy {
                        /* copy directly into out buffer and return */
                        let copied = buf.copy_to_slice(0, &mut out[..to_copy])
                            .map(|_| to_copy)
                            .unwrap_or_else(|partial| partial);
                        return Ok(copied);
                    }

                    self.chunk_end = buf.copy_to_slice(0, &mut (*self.chunk)[..to_copy])
                        .map(|_| to_copy)
                        .unwrap_or_else(|partial| partial);
                },
            }
        }

        if self.chunk_offs >= self.chunk_end {
            return Ok(0);
        }

        /* hand out as much of the parked chunk as the caller can take */
        let to_copy = std::cmp::min(self.chunk_end - self.chunk_offs, out.len());
        let parked = &self.chunk[self.chunk_offs..(self.chunk_offs + to_copy)];
        out[..to_copy].copy_from_slice(parked);

        self.chunk_offs += to_copy;

        Ok(to_copy)
    }
}
/* Runtime state of the converter; it exists only between NullToReady and ReadyToNull. */
struct VideoConvState {
    /* hash of the upstream data when a transcoded entry exists for it, None otherwise */
    transcode_hash: Option<u128>,
    /* archive of transcoded videos; None if it could not be opened */
    read_fozdb: Option<fossilize::StreamArchive>,
    /* size in bytes reported by upstream, once it has been queried */
    upstream_duration: Option<u64>,
    /* size in bytes of the data we serve (transcoded entry or blank video) */
    our_duration: Option<u64>,
}

impl VideoConvState {
    /* video served when no transcoded entry exists for the upstream data */
    const BLANK_VIDEO: &'static [u8] = include_bytes!("../blank.ogv");

    /*
     * Creates the state and tries to open the archive named by the
     * MEDIACONV_VIDEO_TRANSCODED_FILE environment variable.
     *
     * Fails if the variable is not set. Failure to open the archive is not an error;
     * the state then simply has no archive.
     */
    fn new() -> Result<VideoConvState, gst::LoggableError> {
        let read_fozdb_path = std::env::var("MEDIACONV_VIDEO_TRANSCODED_FILE").map_err(|_| {
            gst_loggable_error!(CAT, "MEDIACONV_VIDEO_TRANSCODED_FILE is not set!")
        })?;

        let read_fozdb = fossilize::StreamArchive::new(&read_fozdb_path, OpenOptions::new().read(true), VIDEOCONV_FOZ_NUM_TAGS).ok();

        Ok(VideoConvState {
            transcode_hash: None,
            read_fozdb,
            upstream_duration: None,
            our_duration: None,
        })
    }

    /* true if the file is transcoded; false if not */
    fn begin_transcode(&mut self, hash: u128) -> bool {
        if let Some(read_fozdb) = &mut self.read_fozdb {
            if let Ok(transcoded_size) = read_fozdb.entry_size(VIDEOCONV_FOZ_TAG_OGVDATA, hash) {
                self.transcode_hash = Some(hash);
                self.our_duration = Some(transcoded_size as u64);
                return true;
            }
        }

        gst_log!(CAT, "No transcoded video for {}. Substituting a blank video.", format_hash(hash));

        self.transcode_hash = None;
        self.our_duration = Some(Self::BLANK_VIDEO.len() as u64);

        false
    }

    /*
     * Copies the data we serve, starting at byte `offs`, into `out`.
     *
     * Returns the number of bytes written. For the blank video this is 0 when `offs` is
     * at or past its end.
     */
    fn fill_buffer(&mut self, offs: usize, out: &mut [u8]) -> Result<usize, gst::LoggableError> {
        match self.transcode_hash {
            Some(hash) => {
                /* begin_transcode only sets the hash after finding the entry in the archive */
                let read_fozdb = self.read_fozdb.as_mut().ok_or_else(|| {
                    gst_loggable_error!(CAT, "Transcode hash is set but no transcoded archive is open")
                })?;
                read_fozdb.read_entry(VIDEOCONV_FOZ_TAG_OGVDATA, hash, offs as u64, out, fossilize::CRCCheck::WithoutCRC)
                    .map_err(|e| gst_loggable_error!(CAT, "Error reading ogvdata: {:?}", e))
            },

            None => {
                let blank = Self::BLANK_VIDEO;

                /* a start offset past the end leaves nothing to copy instead of underflowing */
                let remaining: &[u8] = if offs < blank.len() { &blank[offs..] } else { &[] };

                let to_copy = std::cmp::min(out.len(), remaining.len());
                out[..to_copy].copy_from_slice(&remaining[..to_copy]);

                Ok(to_copy)
            },
        }
    }
}
/* The converter element: it serves stored Ogg data (or a blank video) on its src pad
 * in place of the media that arrives on its sink pad. */
struct VideoConv {
/* runtime state; set on NullToReady and taken away again on ReadyToNull */
state: Mutex<Option<VideoConvState>>,
/* pad the original media is pulled from */
sinkpad: gst::Pad,
/* pad the substituted data is served on */
srcpad: gst::Pad,
}
impl ObjectSubclass for VideoConv {
    const NAME: &'static str = "ProtonVideoConverter";
    type ParentType = gst::Element;
    type Instance = gst::subclass::ElementInstanceStruct<Self>;
    type Class = subclass::simple::ClassStruct<Self>;

    glib_object_subclass!();

    /* Builds both pads from the class templates and hooks up their callbacks. Every
     * callback runs through catch_panic_pad_function with a failure value as fallback. */
    fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
        let sink_templ = klass.get_pad_template("sink").unwrap();
        let sinkpad = gst::Pad::builder_with_template(&sink_templ, Some("sink"))
            .event_function(|pad, parent, event| {
                VideoConv::catch_panic_pad_function(
                    parent,
                    || false,
                    |videoconv, element| videoconv.sink_event(pad, element, event)
                )
            })
            .build();

        let src_templ = klass.get_pad_template("src").unwrap();
        let srcpad = gst::Pad::builder_with_template(&src_templ, Some("src"))
            .getrange_function(|pad, parent, offset, in_buf, size| {
                VideoConv::catch_panic_pad_function(
                    parent,
                    || Err(gst::FlowError::Error),
                    |videoconv, element| videoconv.get_range(pad, element, offset, in_buf, size)
                )
            })
            .query_function(|pad, parent, query| {
                VideoConv::catch_panic_pad_function(
                    parent,
                    || false,
                    |videoconv, element| videoconv.src_query(pad, element, query)
                )
            })
            .activatemode_function(|pad, parent, mode, active| {
                VideoConv::catch_panic_pad_function(
                    parent,
                    || Err(gst_loggable_error!(CAT, "Panic activating srcpad with mode")),
                    |videoconv, element| videoconv.src_activatemode(pad, element, mode, active)
                )
            })
            .build();

        VideoConv {
            state: Mutex::new(None),
            sinkpad,
            srcpad,
        }
    }

    /* Sets the element metadata and registers the sink and src pad templates. */
    fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
        klass.set_metadata(
            "Proton video converter",
            "Codec/Parser",
            "Converts video for Proton",
            "Andrew Eikum <aeikum@codeweavers.com>");

        /* media types accepted on the sink pad */
        let mut sink_caps = gst::Caps::new_empty();
        {
            let sink_caps_ref = sink_caps.get_mut().unwrap();
            for media_type in ["video/x-ms-asf", "video/x-msvideo", "video/mpeg"].iter() {
                sink_caps_ref.append(gst::Caps::builder(*media_type).build());
            }
        }
        let sink_pad_template = gst::PadTemplate::new(
            "sink",
            gst::PadDirection::Sink,
            gst::PadPresence::Always,
            &sink_caps).unwrap();
        klass.add_pad_template(sink_pad_template);

        /* the src pad only ever offers Ogg */
        let src_caps = gst::Caps::builder("application/ogg").build();
        let src_pad_template = gst::PadTemplate::new(
            "src",
            gst::PadDirection::Src,
            gst::PadPresence::Always,
            &src_caps).unwrap();
        klass.add_pad_template(src_pad_template);
    }
}
impl ObjectImpl for VideoConv {
    glib_object_impl!();

    /* After the parent's construction step, attaches the sink pad and then the src pad
     * to the element. */
    fn constructed(&self, obj: &glib::Object) {
        self.parent_constructed(obj);

        let element = obj.downcast_ref::<gst::Element>().unwrap();
        for pad in [&self.sinkpad, &self.srcpad].iter() {
            element.add_pad(*pad).unwrap();
        }
    }
}
impl ElementImpl for VideoConv {
    /*
     * Creates the runtime state on NullToReady and discards it on ReadyToNull, then
     * hands the transition on to the parent class.
     *
     * The transition fails if the runtime state cannot be created.
     */
    fn change_state(
        &self,
        element: &gst::Element,
        transition: gst::StateChange
    ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
        gst_log!(CAT, obj: element, "State transition: {:?}", transition);

        match transition {
            gst::StateChange::NullToReady => {
                /* do runtime setup */
                let fresh_state = match VideoConvState::new() {
                    Ok(created) => created,
                    Err(err) => {
                        err.log();
                        return Err(gst::StateChangeError);
                    },
                };

                let mut slot = self.state.lock().unwrap();
                assert!(slot.is_none());
                *slot = Some(fresh_state);
            },

            gst::StateChange::ReadyToNull => {
                /* do runtime teardown */
                *self.state.lock().unwrap() = None; // dispose of state
            },

            _ => (),
        }

        self.parent_change_state(element, transition)

        /* XXX on ReadyToNull, sodium drops state _again_ here... why? */
    }
}
/* Presents a list of u128 hashes as one byte stream: every hash in order, each as
 * 16 little-endian bytes. */
struct StreamSerializer<'a> {
    stream: &'a Vec<u128>,
    /* position of the next unread byte within the serialized stream */
    cur_idx: usize,
}

impl<'a> StreamSerializer<'a> {
    /* Starts serializing `stream` from its first hash. */
    fn new(stream: &'a Vec<u128>) -> Self {
        StreamSerializer {
            stream,
            cur_idx: 0,
        }
    }
}

impl <'a> Read for StreamSerializer<'a> {
    /*
     * Fills `out` with as many of the remaining serialized bytes as fit and returns how
     * many were written; 0 means the stream is exhausted or `out` is empty.
     *
     * `out` may have any length, including lengths below the 16 bytes of one hash; a
     * hash that does not fit completely is continued by the next call.
     */
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        const HASH_BYTES: usize = std::mem::size_of::<u128>();

        let mut written = 0;
        while written < out.len() {
            let hash_bytes = match self.stream.get(self.cur_idx / HASH_BYTES) {
                Some(hash) => hash.to_le_bytes(),
                None => break,
            };

            /* skip the part of this hash an earlier call already handed out */
            let pending = &hash_bytes[self.cur_idx % HASH_BYTES..];
            let to_copy = std::cmp::min(pending.len(), out.len() - written);

            out[written..written + to_copy].copy_from_slice(&pending[..to_copy]);

            written += to_copy;
            self.cur_idx += to_copy;
        }

        Ok(written)
    }
}
impl VideoConv {
fn get_range(
&self,
_pad: &gst::Pad,
_element: &gst::Element,
offset: u64,
in_buf: Option<&mut gst::BufferRef>,
requested_size: u32,
) -> Result<gst::PadGetRangeSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let state = match &mut *state {
Some(s) => s,
None => { return Err(gst::FlowError::Error); }
};
let ups_offset = self.duration_ours_to_upstream(&state, offset).unwrap();
let ups_requested_size = self.duration_ours_to_upstream(&state, requested_size as u64).unwrap() as u32;
/* read and ignore upstream bytes */
self.sinkpad.pull_range(ups_offset, ups_requested_size)?;
match in_buf {
Some(buf) => {
let readed;
{
let mut map = buf.map_writable().unwrap();
readed = state.fill_buffer(offset as usize, map.as_mut())
.map_err(|e| { e.log(); gst::FlowError::Error })?;
}
if readed > 0 || buf.get_size() == 0 {
buf.set_size(readed);
return Ok(gst::PadGetRangeSuccess::FilledBuffer);
}
Err(gst::FlowError::Eos)
},
None => {
/* XXX: can we use a buffer cache here? */
let b = gst::Buffer::with_size(requested_size as usize)
.map_err(|_| gst::FlowError::Error)?;
let mut map = b.into_mapped_buffer_writable().unwrap();
let readed = state.fill_buffer(offset as usize, map.as_mut())
.map_err(|e| { e.log(); gst::FlowError::Error })?;
let mut b = map.into_buffer();
if readed > 0 || b.get_size() == 0 {
b.get_mut().unwrap().set_size(readed);
return Ok(gst::PadGetRangeSuccess::NewBuffer(b));
}
Err(gst::FlowError::Eos)
}
}
}
/*
 * Event handler of the sink pad.
 *
 * A caps event is answered by pushing an "application/ogg" caps event out of the src
 * pad. Every other event gets the pad's default handling.
 */
fn sink_event(
    &self,
    pad: &gst::Pad,
    element: &gst::Element,
    event: gst::Event
) -> bool {
    gst_log!(CAT, obj:pad, "Got an event {:?}", event);

    let is_caps_event = match event.view() {
        EventView::Caps(_) => true,
        _ => false,
    };

    if !is_caps_event {
        return pad.event_default(Some(element), event);
    }

    let ogg_caps = gst::Caps::builder("application/ogg").build();
    self.srcpad.push_event(gst::event::Caps::new(&ogg_caps))
}
/*
 * Asks the sink pad's peer for its duration in bytes and stores the answer in
 * `state.upstream_duration`.
 *
 * If the query fails, a warning is logged and the stored value is left untouched. If
 * the answer is not in bytes, the stored value becomes None.
 */
fn query_upstream_duration(&self, state: &mut VideoConvState) {
    let mut query = gst::query::Duration::new(gst::Format::Bytes);

    if !self.sinkpad.peer_query(&mut query) {
        gst_warning!(CAT, "upstream duration query failure");
        return;
    }

    state.upstream_duration = match query.get_result() {
        gst::format::GenericFormattedValue::Bytes(b) => *b,
        _ => None,
    };
}
/*
 * Scales a byte position in our data to the proportional position in the upstream
 * data: pos * upstream_size / our_size.
 *
 * Returns None if either size is unknown or if our size is zero (nothing to scale
 * by). The product is computed in 128 bits so it cannot overflow; a result that does
 * not fit into u64 is clamped to u64::max_value().
 */
fn duration_ours_to_upstream(&self, state: &VideoConvState, pos: u64) -> Option<u64> {
    let our = state.our_duration?;
    let upstream = state.upstream_duration?;

    if our == 0 {
        return None;
    }

    let scaled = (pos as u128) * (upstream as u128) / (our as u128);

    Some(std::cmp::min(scaled, u64::max_value() as u128) as u64)
}
/*
 * Query handler of the src pad.
 *
 * Scheduling: answers with the upstream peer's scheduling result and adds pull mode;
 * fails if the peer query fails.
 * Duration: answers byte-format queries with the size of our data, refreshing the
 * cached upstream duration first if it is missing; fails otherwise.
 * Everything else gets the pad's default handling.
 */
fn src_query(
    &self,
    pad: &gst::Pad,
    element: &gst::Element,
    query: &mut gst::QueryRef) -> bool
{
    gst_log!(CAT, obj: pad, "got query: {:?}", query);

    match query.view_mut() {
        QueryView::Scheduling(mut q) => {
            let mut peer_query = gst::query::Scheduling::new();
            if !self.sinkpad.peer_query(&mut peer_query) {
                return false;
            }

            let (flags, min, max, align) = peer_query.get_result();
            q.set(flags, min, max, align);
            q.add_scheduling_modes(&[gst::PadMode::Pull]);

            true
        },

        QueryView::Duration(ref mut q) => {
            let mut guard = self.state.lock().unwrap();
            let state = match &mut *guard {
                Some(s) => s,
                None => { return false; }
            };

            if state.upstream_duration.is_none() {
                self.query_upstream_duration(state);
            }

            match state.our_duration {
                Some(sz) if q.get_format() == gst::Format::Bytes => {
                    q.set(gst::format::Bytes::from(sz));
                    true
                },
                _ => false,
            }
        }

        _ => pad.query_default(Some(element), query)
    }
}
/* Computes the 128-bit murmur3 hash, seeded with HASH_SEED, over the upstream data as
 * read with a stride of HASH_STRIDE. */
fn hash_upstream_data(&self) -> io::Result<u128> {
    let mut strided_reader = PadReader::new_with_stride(&self.sinkpad, HASH_STRIDE);
    murmur3_128(&mut strided_reader, HASH_SEED)
}
fn dump_upstream_data(&self, hash: u128) -> io::Result<()> {
let mut db = (*DUMP_FOZDB).lock().unwrap();
let db = match &mut *db {
Some(d) => d,
None => { gst_error!(CAT, "Unable to open fozdb!"); return Err(io::Error::new(io::ErrorKind::Other, "unable to open fozdb")); },
};
let mut chunks = Vec::<u128>::new();
let mut reader = PadReader::new(&self.sinkpad);
let mut buf = box_array![0u8; HASH_CHUNK_SIZE];
loop {
let readed = reader.read(&mut *buf)?;
if readed == 0 {
break;
}
let chunk_hash = murmur3_128(&mut BufferedReader::new(&*buf, readed), HASH_SEED)?;
chunks.push(chunk_hash);
db.write_entry(VIDEOCONV_FOZ_TAG_VIDEODATA, chunk_hash, &mut BufferedReader::new(&*buf, readed), fossilize::CRCCheck::WithCRC)
.map_err(|e| { gst_warning!(CAT, "Error writing video data to fozdb: {:?}", e); io::Error::new(io::ErrorKind::Other, "error writing video data to fozdb") } )?;
}
db.write_entry(VIDEOCONV_FOZ_TAG_STREAM, hash, &mut StreamSerializer::new(&chunks), fossilize::CRCCheck::WithCRC)
.map_err(|e| { gst_warning!(CAT, "Error writing stream data to fozdb: {:?}", e); io::Error::new(io::ErrorKind::Other, "error writing stream data to fozdb") } )?;
Ok(())
}
fn init_transcode(
&self,
state: &mut VideoConvState
) -> Result<(), gst::LoggableError> {
let hash = self.hash_upstream_data();
if let Ok(hash) = hash {
if !state.begin_transcode(hash) {
self.dump_upstream_data(hash).map_err(|_| gst_loggable_error!(CAT, "Dumping file to disk failed"))?;
}
}
Ok(())
}
/*
 * activatemode handler of the src pad: applies the same mode change to the sink pad
 * and, when pull mode is being activated, starts the transcode lookup.
 *
 * Fails if the sink pad cannot change mode, if the element has no runtime state yet,
 * or if the transcode lookup fails.
 */
fn src_activatemode(
    &self,
    _pad: &gst::Pad,
    _element: &gst::Element,
    mode: gst::PadMode,
    active: bool
) -> Result<(), gst::LoggableError> {
    self.sinkpad
        .activate_mode(mode, active)?;

    /* this handler also runs on deactivation, when there is nothing left to pull */
    if active && mode == gst::PadMode::Pull {
        let mut state = self.state.lock().unwrap();
        let state = match &mut *state {
            Some(s) => s,
            None => { return Err(gst_loggable_error!(CAT, "VideoConv not yet in READY state?")); }
        };

        /* once we're initted in pull mode, we can start transcoding */
        self.init_transcode(state)?;
    }

    Ok(())
}
}
/* Registers the converter with `plugin` as element "protonvideoconverter" at
 * marginal rank. */
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
    let element_type = VideoConv::get_type();

    gst::Element::register(Some(plugin), "protonvideoconverter", gst::Rank::Marginal, element_type)
}