1
0
mirror of https://github.com/wolfpld/tracy.git synced 2025-03-20 07:40:02 +08:00

Only one outgoing server connection is supported.

This commit is contained in:
Bartosz Taudul 2018-09-09 17:47:20 +02:00
parent 2ae2399f31
commit 806c8de463

View File

@ -1273,141 +1273,141 @@ void Worker::Exec()
return m_shutdown.load( std::memory_order_relaxed ); return m_shutdown.load( std::memory_order_relaxed );
}; };
auto lz4buf = std::make_unique<char[]>( LZ4Size );
for(;;) for(;;)
{ {
if( m_shutdown.load( std::memory_order_relaxed ) ) return; if( m_shutdown.load( std::memory_order_relaxed ) ) return;
if( !m_sock.Connect( m_addr.c_str(), "8086" ) ) continue; if( m_sock.Connect( m_addr.c_str(), "8086" ) ) break;
}
std::chrono::time_point<std::chrono::high_resolution_clock> t0; auto lz4buf = std::make_unique<char[]>( LZ4Size );
std::chrono::time_point<std::chrono::high_resolution_clock> t0;
uint64_t bytes = 0; uint64_t bytes = 0;
uint64_t decBytes = 0; uint64_t decBytes = 0;
m_data.framesBase = m_data.frames.Retrieve( 0, [this] ( uint64_t name ) { m_data.framesBase = m_data.frames.Retrieve( 0, [this] ( uint64_t name ) {
auto fd = m_slab.AllocInit<FrameData>(); auto fd = m_slab.AllocInit<FrameData>();
fd->name = name; fd->name = name;
fd->continuous = 1; fd->continuous = 1;
return fd; return fd;
}, [this] ( uint64_t name ) { }, [this] ( uint64_t name ) {
assert( name == 0 ); assert( name == 0 );
char tmp[6] = "Frame"; char tmp[6] = "Frame";
HandleFrameName( name, tmp, 5 ); HandleFrameName( name, tmp, 5 );
} ); } );
{
WelcomeMessage welcome;
if( !m_sock.Read( &welcome, sizeof( welcome ), &tv, ShouldExit ) ) goto close;
m_timerMul = welcome.timerMul;
const auto initEnd = TscTime( welcome.initEnd );
m_data.framesBase->frames.push_back( FrameEvent{ TscTime( welcome.initBegin ), -1 } );
m_data.framesBase->frames.push_back( FrameEvent{ initEnd, -1 } );
m_data.lastTime = initEnd;
m_delay = TscTime( welcome.delay );
m_resolution = TscTime( welcome.resolution );
m_onDemand = welcome.onDemand;
m_captureProgram = welcome.programName;
m_captureTime = welcome.epoch;
char dtmp[64];
time_t date = welcome.epoch;
auto lt = localtime( &date );
strftime( dtmp, 64, "%F %T", lt );
char tmp[1024];
sprintf( tmp, "%s @ %s", welcome.programName, dtmp );
m_captureName = tmp;
m_hostInfo = welcome.hostInfo;
if( welcome.onDemand != 0 )
{
OnDemandPayloadMessage onDemand;
if( !m_sock.Read( &onDemand, sizeof( onDemand ), &tv, ShouldExit ) ) goto close;
m_data.frameOffset = onDemand.frames;
}
}
m_hasData.store( true, std::memory_order_release );
LZ4_setStreamDecode( m_stream, nullptr, 0 );
m_connected.store( true, std::memory_order_relaxed );
t0 = std::chrono::high_resolution_clock::now();
for(;;)
{
if( m_shutdown.load( std::memory_order_relaxed ) ) return;
auto buf = m_buffer + m_bufferOffset;
lz4sz_t lz4sz;
if( !m_sock.Read( &lz4sz, sizeof( lz4sz ), &tv, ShouldExit ) ) goto close;
if( !m_sock.Read( lz4buf.get(), lz4sz, &tv, ShouldExit ) ) goto close;
bytes += sizeof( lz4sz ) + lz4sz;
auto sz = LZ4_decompress_safe_continue( m_stream, lz4buf.get(), buf, lz4sz, TargetFrameSize );
assert( sz >= 0 );
decBytes += sz;
char* ptr = buf;
const char* end = buf + sz;
{ {
WelcomeMessage welcome; std::lock_guard<TracyMutex> lock( m_data.lock );
if( !m_sock.Read( &welcome, sizeof( welcome ), &tv, ShouldExit ) ) goto close; while( ptr < end )
m_timerMul = welcome.timerMul;
const auto initEnd = TscTime( welcome.initEnd );
m_data.framesBase->frames.push_back( FrameEvent{ TscTime( welcome.initBegin ), -1 } );
m_data.framesBase->frames.push_back( FrameEvent{ initEnd, -1 } );
m_data.lastTime = initEnd;
m_delay = TscTime( welcome.delay );
m_resolution = TscTime( welcome.resolution );
m_onDemand = welcome.onDemand;
m_captureProgram = welcome.programName;
m_captureTime = welcome.epoch;
char dtmp[64];
time_t date = welcome.epoch;
auto lt = localtime( &date );
strftime( dtmp, 64, "%F %T", lt );
char tmp[1024];
sprintf( tmp, "%s @ %s", welcome.programName, dtmp );
m_captureName = tmp;
m_hostInfo = welcome.hostInfo;
if( welcome.onDemand != 0 )
{ {
OnDemandPayloadMessage onDemand; auto ev = (const QueueItem*)ptr;
if( !m_sock.Read( &onDemand, sizeof( onDemand ), &tv, ShouldExit ) ) goto close; DispatchProcess( *ev, ptr );
m_data.frameOffset = onDemand.frames;
} }
m_bufferOffset += sz;
if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0;
HandlePostponedPlots();
} }
m_hasData.store( true, std::memory_order_release ); auto t1 = std::chrono::high_resolution_clock::now();
auto td = std::chrono::duration_cast<std::chrono::milliseconds>( t1 - t0 ).count();
LZ4_setStreamDecode( m_stream, nullptr, 0 ); enum { MbpsUpdateTime = 200 };
m_connected.store( true, std::memory_order_relaxed ); if( td > MbpsUpdateTime )
t0 = std::chrono::high_resolution_clock::now();
for(;;)
{ {
if( m_shutdown.load( std::memory_order_relaxed ) ) return; std::lock_guard<TracyMutex> lock( m_mbpsData.lock );
m_mbpsData.mbps.erase( m_mbpsData.mbps.begin() );
auto buf = m_buffer + m_bufferOffset; m_mbpsData.mbps.emplace_back( bytes / ( td * 125.f ) );
lz4sz_t lz4sz; m_mbpsData.compRatio = float( bytes ) / decBytes;
if( !m_sock.Read( &lz4sz, sizeof( lz4sz ), &tv, ShouldExit ) ) goto close; t0 = t1;
if( !m_sock.Read( lz4buf.get(), lz4sz, &tv, ShouldExit ) ) goto close; bytes = 0;
bytes += sizeof( lz4sz ) + lz4sz; decBytes = 0;
}
auto sz = LZ4_decompress_safe_continue( m_stream, lz4buf.get(), buf, lz4sz, TargetFrameSize );
assert( sz >= 0 );
decBytes += sz;
char* ptr = buf;
const char* end = buf + sz;
if( m_terminate )
{
if( m_pendingStrings != 0 || m_pendingThreads != 0 || m_pendingSourceLocation != 0 || m_pendingCallstackFrames != 0 ||
!m_pendingCustomStrings.empty() || m_data.plots.IsPending() || !m_pendingCallstacks.empty() )
{ {
std::lock_guard<TracyMutex> lock( m_data.lock ); continue;
while( ptr < end )
{
auto ev = (const QueueItem*)ptr;
DispatchProcess( *ev, ptr );
}
m_bufferOffset += sz;
if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0;
HandlePostponedPlots();
} }
if( !m_crashed )
auto t1 = std::chrono::high_resolution_clock::now();
auto td = std::chrono::duration_cast<std::chrono::milliseconds>( t1 - t0 ).count();
enum { MbpsUpdateTime = 200 };
if( td > MbpsUpdateTime )
{ {
std::lock_guard<TracyMutex> lock( m_mbpsData.lock ); bool done = true;
m_mbpsData.mbps.erase( m_mbpsData.mbps.begin() ); for( auto& v : m_data.threads )
m_mbpsData.mbps.emplace_back( bytes / ( td * 125.f ) );
m_mbpsData.compRatio = float( bytes ) / decBytes;
t0 = t1;
bytes = 0;
decBytes = 0;
}
if( m_terminate )
{
if( m_pendingStrings != 0 || m_pendingThreads != 0 || m_pendingSourceLocation != 0 || m_pendingCallstackFrames != 0 ||
!m_pendingCustomStrings.empty() || m_data.plots.IsPending() || !m_pendingCallstacks.empty() )
{ {
continue; if( !v->stack.empty() )
}
if( !m_crashed )
{
bool done = true;
for( auto& v : m_data.threads )
{ {
if( !v->stack.empty() ) done = false;
{ break;
done = false;
break;
}
} }
if( !done ) continue;
} }
ServerQuery( ServerQueryTerminate, 0 ); if( !done ) continue;
break;
} }
ServerQuery( ServerQueryTerminate, 0 );
break;
} }
}
close: close:
m_sock.Close(); m_sock.Close();
m_connected.store( false, std::memory_order_relaxed ); m_connected.store( false, std::memory_order_relaxed );
}
} }
void Worker::ServerQuery( uint8_t type, uint64_t data ) void Worker::ServerQuery( uint8_t type, uint64_t data )