Reference Documentation
This section contains technical specifications, API references, and detailed documentation for the TCP Dashboard platform.
📋 Contents
Technical Specifications
- Project Specification - Technical specifications and requirements
  - System requirements and constraints
  - Database schema specifications
  - API endpoint definitions
  - Data format specifications
  - Integration requirements
- Aggregation Strategy - Comprehensive data aggregation documentation
  - Right-aligned timestamp strategy (industry standard)
  - Future leakage prevention safeguards
  - Real-time vs historical processing
  - Database storage patterns
  - Testing methodology and examples
API References
Data Collection APIs
# BaseDataCollector API
class BaseDataCollector:
    async def start() -> bool
    async def stop(force: bool = False) -> None
    async def restart() -> bool
    def get_status() -> Dict[str, Any]
    def get_health_status() -> Dict[str, Any]
    def add_data_callback(data_type: DataType, callback: Callable) -> None

# CollectorManager API
class CollectorManager:
    def add_collector(collector: BaseDataCollector) -> None
    async def start() -> bool
    async def stop() -> None
    def get_status() -> Dict[str, Any]
    def list_collectors() -> List[str]
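The listings above are signature-only. A minimal usage sketch (illustrative, not taken from the project; the callback signature and the pre-built collector instance are assumptions):
import asyncio

def on_trade(point: MarketDataPoint) -> None:
    # Assumed callback shape: one MarketDataPoint per invocation
    print(point.exchange, point.symbol, point.data)

async def run(collector: BaseDataCollector, manager: CollectorManager) -> None:
    collector.add_data_callback(DataType.TRADE, on_trade)
    manager.add_collector(collector)

    if not await manager.start():          # starts every registered collector
        raise RuntimeError(manager.get_status())

    await asyncio.sleep(60)                # collect for a while
    await manager.stop()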
Exchange Factory APIs
# Factory Pattern API
class ExchangeFactory:
    @staticmethod
    def create_collector(config: ExchangeCollectorConfig) -> BaseDataCollector

    @staticmethod
    def create_multiple_collectors(configs: List[ExchangeCollectorConfig]) -> List[BaseDataCollector]

    @staticmethod
    def get_supported_exchanges() -> List[str]

    @staticmethod
    def validate_config(config: ExchangeCollectorConfig) -> bool

# Configuration API
@dataclass
class ExchangeCollectorConfig:
    exchange: str
    symbol: str
    data_types: List[DataType]
    auto_restart: bool = True
    health_check_interval: float = 30.0
    store_raw_data: bool = True
    custom_params: Optional[Dict[str, Any]] = None
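A typical factory flow, shown with placeholder values (the exchange, symbol, and data types below are examples, not a statement of current support):
config = ExchangeCollectorConfig(
    exchange="okx",
    symbol="BTC-USDT",
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
)

if not ExchangeFactory.validate_config(config):
    raise ValueError(f"Unsupported config; supported exchanges: "
                     f"{ExchangeFactory.get_supported_exchanges()}")

collector = ExchangeFactory.create_collector(config)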
📊 Data Schemas
Market Data Point
The standardized data structure for all market data:
@dataclass
class MarketDataPoint:
    exchange: str          # Exchange name (e.g., 'okx', 'binance')
    symbol: str            # Trading symbol (e.g., 'BTC-USDT')
    timestamp: datetime    # Data timestamp (UTC)
    data_type: DataType    # Type of data (TRADE, ORDERBOOK, etc.)
    data: Dict[str, Any]   # Raw data payload
Data Types
class DataType(Enum):
    TICKER = "ticker"        # Price and volume updates
    TRADE = "trade"          # Individual trade executions
    ORDERBOOK = "orderbook"  # Order book snapshots
    CANDLE = "candle"        # OHLCV candle data
    BALANCE = "balance"      # Account balance updates
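Constructing a data point directly (illustrative; the payload keys below are placeholders, in practice data carries the raw exchange message shown under Protocol Specifications):
from datetime import datetime, timezone

point = MarketDataPoint(
    exchange="okx",
    symbol="BTC-USDT",
    timestamp=datetime.now(timezone.utc),   # always UTC
    data_type=DataType.TRADE,
    data={"px": "50000.5", "sz": "0.001", "side": "buy"},
)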
Status Schemas
Collector Status
{
    'exchange': str,                  # Exchange name
    'status': str,                    # Current status (running, stopped, error)
    'should_be_running': bool,        # Desired state
    'symbols': List[str],             # Configured symbols
    'data_types': List[str],          # Data types being collected
    'auto_restart': bool,             # Auto-restart enabled
    'health': {
        'time_since_heartbeat': float,   # Seconds since last heartbeat
        'time_since_data': float,        # Seconds since last data
        'max_silence_duration': float    # Max allowed silence
    },
    'statistics': {
        'messages_received': int,     # Total messages received
        'messages_processed': int,    # Successfully processed
        'errors': int,                # Error count
        'restarts': int,              # Restart count
        'uptime_seconds': float,      # Current uptime
        'reconnect_attempts': int,    # Current reconnect attempts
        'last_message_time': str,     # ISO timestamp
        'connection_uptime': str,     # Connection start time
        'last_error': str,            # Last error message
        'last_restart_time': str      # Last restart time
    }
}
Health Status
{
    'is_healthy': bool,           # Overall health status
    'issues': List[str],          # List of current issues
    'status': str,                # Current collector status
    'last_heartbeat': str,        # Last heartbeat timestamp
    'last_data_received': str,    # Last data timestamp
    'should_be_running': bool,    # Expected state
    'is_running': bool            # Actual running state
}
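A minimal watchdog sketch built on this schema (the polling interval and restart policy are assumptions, not documented behaviour):
import asyncio

async def watchdog(collector: BaseDataCollector, interval: float = 30.0) -> None:
    while True:
        health = collector.get_health_status()
        if health['should_be_running'] and not health['is_healthy']:
            # 'issues' is a list of human-readable strings per the schema above
            print("Collector unhealthy:", health['issues'])
            await collector.restart()
        await asyncio.sleep(interval)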
🔧 Configuration Schemas
Database Configuration
{
    "database": {
        "url": "postgresql://user:pass@host:port/db",
        "pool_size": 10,
        "max_overflow": 20,
        "pool_timeout": 30,
        "pool_recycle": 3600
    },
    "tables": {
        "market_data": "market_data",
        "raw_trades": "raw_trades",
        "collector_status": "collector_status"
    }
}
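The pooling keys mirror SQLAlchemy engine options; the sketch below assumes the config is consumed that way (an assumption, and the file path is hypothetical):
import json
from sqlalchemy import create_engine

with open("config/database.json") as f:      # hypothetical location
    db_cfg = json.load(f)["database"]

engine = create_engine(
    db_cfg["url"],
    pool_size=db_cfg["pool_size"],
    max_overflow=db_cfg["max_overflow"],
    pool_timeout=db_cfg["pool_timeout"],
    pool_recycle=db_cfg["pool_recycle"],
)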
Exchange Configuration
{
    "exchange": "okx",
    "connection": {
        "public_ws_url": "wss://ws.okx.com:8443/ws/v5/public",
        "ping_interval": 25.0,
        "pong_timeout": 10.0,
        "max_reconnect_attempts": 5,
        "reconnect_delay": 5.0
    },
    "data_collection": {
        "store_raw_data": true,
        "health_check_interval": 30.0,
        "auto_restart": true,
        "buffer_size": 1000
    },
    "trading_pairs": [
        {
            "symbol": "BTC-USDT",
            "enabled": true,
            "data_types": ["trade", "orderbook"],
            "channels": {
                "trades": "trades",
                "orderbook": "books5",
                "ticker": "tickers"
            }
        }
    ]
}
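One way to translate this file into ExchangeCollectorConfig instances (the field mapping is inferred from the two schemas above, not taken from the project code):
import json

def configs_from_file(path: str) -> List[ExchangeCollectorConfig]:
    with open(path) as f:
        raw = json.load(f)
    dc = raw["data_collection"]
    return [
        ExchangeCollectorConfig(
            exchange=raw["exchange"],
            symbol=pair["symbol"],
            data_types=[DataType(t) for t in pair["data_types"]],
            auto_restart=dc["auto_restart"],
            health_check_interval=dc["health_check_interval"],
            store_raw_data=dc["store_raw_data"],
            custom_params={"channels": pair["channels"]},
        )
        for pair in raw["trading_pairs"]
        if pair["enabled"]
    ]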
Logging Configuration
{
    "logging": {
        "level": "INFO",
        "format": "detailed",
        "console_output": true,
        "file_output": true,
        "cleanup": true,
        "max_files": 30,
        "log_directory": "./logs"
    },
    "components": {
        "data_collectors": {
            "level": "INFO",
            "verbose": false
        },
        "websocket_clients": {
            "level": "DEBUG",
            "verbose": true
        }
    }
}
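An illustrative way to apply the per-component levels with the standard logging module (the component-name-to-logger mapping is an assumption):
import logging

def apply_log_levels(cfg: dict) -> None:
    logging.getLogger().setLevel(cfg["logging"]["level"])
    for component, opts in cfg["components"].items():
        logging.getLogger(component).setLevel(opts["level"])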
🌐 Protocol Specifications
WebSocket Message Formats
OKX Message Format
{
    "arg": {
        "channel": "trades",
        "instId": "BTC-USDT"
    },
    "data": [
        {
            "instId": "BTC-USDT",
            "tradeId": "12345678",
            "px": "50000.5",
            "sz": "0.001",
            "side": "buy",
            "ts": "1697123456789"
        }
    ]
}
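A parsing sketch (not the project's actual handler) that converts one trade message of this shape into MarketDataPoint instances:
from datetime import datetime, timezone

def parse_okx_trades(message: dict) -> List[MarketDataPoint]:
    symbol = message["arg"]["instId"]
    points = []
    for trade in message.get("data", []):
        # OKX timestamps are milliseconds since the epoch
        ts = datetime.fromtimestamp(int(trade["ts"]) / 1000, tz=timezone.utc)
        points.append(MarketDataPoint(
            exchange="okx",
            symbol=symbol,
            timestamp=ts,
            data_type=DataType.TRADE,
            data=trade,                    # keep the raw payload
        ))
    return points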
Subscription Message Format
{
    "op": "subscribe",
    "args": [
        {
            "channel": "trades",
            "instId": "BTC-USDT"
        },
        {
            "channel": "books5",
            "instId": "BTC-USDT"
        }
    ]
}
Database Schemas
Market Data Table
CREATE TABLE market_data (
    id SERIAL PRIMARY KEY,
    exchange VARCHAR(50) NOT NULL,
    symbol VARCHAR(50) NOT NULL,
    data_type VARCHAR(20) NOT NULL,
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
    data JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_market_data_exchange_symbol_ts ON market_data (exchange, symbol, timestamp);
CREATE INDEX idx_market_data_type_ts ON market_data (data_type, timestamp);
Raw Trades Table
CREATE TABLE raw_trades (
    id SERIAL PRIMARY KEY,
    exchange VARCHAR(50) NOT NULL,
    symbol VARCHAR(50) NOT NULL,
    trade_id VARCHAR(100),
    price DECIMAL(20, 8) NOT NULL,
    size DECIMAL(20, 8) NOT NULL,
    side VARCHAR(10) NOT NULL,
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
    raw_data JSONB,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    UNIQUE(exchange, symbol, trade_id)
);

CREATE INDEX idx_raw_trades_exchange_symbol_ts ON raw_trades (exchange, symbol, timestamp);
CREATE INDEX idx_raw_trades_ts ON raw_trades (timestamp);
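An illustrative write path using asyncpg (an assumption; the project may use a different driver or ORM). JSONB parameters are passed as serialized strings:
import json
import asyncpg

async def insert_point(conn: asyncpg.Connection, point: MarketDataPoint) -> None:
    await conn.execute(
        """
        INSERT INTO market_data (exchange, symbol, data_type, timestamp, data)
        VALUES ($1, $2, $3, $4, $5)
        """,
        point.exchange,
        point.symbol,
        point.data_type.value,
        point.timestamp,
        json.dumps(point.data),
    )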
📈 Performance Specifications
System Requirements
Minimum Requirements
- CPU: 2 cores, 2.0 GHz
- Memory: 4 GB RAM
- Storage: 20 GB available space
- Network: Stable internet connection (100 Mbps+)
Recommended Requirements
- CPU: 4+ cores, 3.0+ GHz
- Memory: 8+ GB RAM
- Storage: 100+ GB SSD
- Network: High-speed internet (1 Gbps+)
Performance Targets
Data Collection
- Latency: < 100ms from exchange to processing
- Throughput: 1000+ messages/second per collector
- Uptime: 99.9% availability
- Memory Usage: < 50 MB per collector
Database Operations
- Insert Rate: 10,000+ inserts/second
- Query Response: < 100ms for typical queries
- Storage Growth: ~1 GB/month per active trading pair
- Retention: 2+ years of historical data
🔒 Security Specifications
Authentication & Authorization
- API Keys: Secure storage in environment variables
- Database Access: Connection pooling with authentication
- WebSocket Connections: TLS encryption for all connections
- Logging: No sensitive data in logs
Data Protection
- Encryption: TLS 1.3 for all external communications
- Data Validation: Comprehensive input validation
- Error Handling: Secure error messages without data leakage
- Backup: Regular automated backups with encryption
🔗 Related Documentation
- Modules Documentation (../modules/) - Implementation details
- Architecture Overview (../architecture.md) - System design
- Exchange Documentation (../modules/exchanges/) - Exchange integrations
- Setup Guide (../guides/) - Configuration and deployment
📞 Support
API Support
For API-related questions:
- Check Examples: Review code examples in each API section
- Test Endpoints: Use provided test scripts
- Validate Schemas: Ensure data matches specified formats
- Review Logs: Check detailed logs for API interactions
Schema Validation
For data schema issues:
# Validate data point structure
def validate_market_data_point(data_point):
    required_fields = ['exchange', 'symbol', 'timestamp', 'data_type', 'data']
    for field in required_fields:
        if not hasattr(data_point, field):
            raise ValueError(f"Missing required field: {field}")
    if not isinstance(data_point.data_type, DataType):
        raise ValueError("Invalid data_type")
For the complete documentation index, see the main documentation README (../README.md).