// Copyright (C) 2022 Then Try This // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. #include #include #include #include "block_stream.h" using namespace spiralcore; using namespace std; #define BUFFER_SIZE 4096*10 #define MAX_BLOCKS 200 #define NUM_WORKERS 4 block_stream::block_stream() : m_ready(false), m_block_index(0), m_block_position(0), m_buffer_position(0), m_buffer(BUFFER_SIZE), m_block_index_offset(0), m_sent_block_index(0) { } block_stream::~block_stream() {} void block_stream::start() { cerr<<"block stream starting up"<m_thread,NULL); } usleep(500); for (u32 i=0; i=m_block_size) m_overlap=0; m_window.init(m_block_size); m_window.set_current_type(t); m_blocks.clear(); m_blocks.reserve(MAX_BLOCKS); sample dummy(block_size); for (u32 i=0; im_buffer.get_length()) { m_buffer_position=0; } // time to make a new block if (m_block_position>m_block_size-m_overlap) { sample region; // m_buffer_pos-m_block_size can be negative to deal // with the buffer wrapping... //cerr<<(s32)(m_buffer_position-m_block_size)<<" to "<MAX_BLOCKS) { m_blocks.erase(m_blocks.begin()); m_block_index_offset++; } } m_block_position++; } } const block &block_stream::get_block(u32 index) const { //cerr<<"returning "<<(index-m_block_index_offset)%m_blocks.size()<<" ("<run(); return NULL; } block_stream::worker::worker(u32 id, window *w) : m_id(id), m_status(READY), m_window(w) { m_worker_mutex = new pthread_mutex_t; pthread_mutex_init(m_worker_mutex,NULL); m_thread = new pthread_t; pthread_create(m_thread,NULL,(void*(*)(void*))_run_worker,this); } block_stream::worker::~worker() { delete m_worker_mutex; delete m_thread; } void block_stream::worker::run() { while (true) { pthread_mutex_lock(m_worker_mutex); if (m_status==ACTIVATE) { cerr<<"starting "<m_worker_mutex)==0) { if (w->m_status == worker::FINISHED) { //cerr<<"adding finished block "<m_block_index<m_block_index]=*w->m_output; w->m_status = worker::READY; } if (w->m_status == worker::READY) { w->m_region = region; w->m_status = worker::ACTIVATE; w->m_block_index = block_index; return; } pthread_mutex_unlock(w->m_worker_mutex); } } } }