MT5Service - Streaming Methods (Mid-Level API)¶
⚠️ Important: Read This First¶
MT5Service is an architectural layer between high-level (Sugar) and low-level (Account) APIs.
Understanding the 5 Streaming Methods:¶
| Method | Direct Use Value | Architectural Role |
|---|---|---|
| stream_ticks() | ✅ HIGH - Converts protobuf Timestamp → datetime + creates SymbolTick dataclass (8 fields) | ✅ Used by Sugar |
| stream_trade_updates() | ⚪ NONE - Direct pass-through (yield data) without conversion | ✅ Used by Sugar |
| stream_position_profits() | ⚪ NONE - Direct pass-through (yield data) without conversion | ✅ Used by Sugar |
| stream_opened_tickets() | ⚪ NONE - Direct pass-through (yield data) without conversion | ✅ Used by Sugar |
| stream_transactions() | ⚪ NONE - Direct pass-through (yield data) without conversion | ✅ Used by Sugar |
What This Means:¶
Architecture (3 layers):
MT5Sugar (HIGH) → service.stream_ticks() ← Sugar uses Service methods
MT5Service (MID) → account.on_symbol_tick() ← Service converts datetime + creates dataclass
MT5Account (LOW) → gRPC stream → protobuf objects
Value breakdown:
- ✅ HIGH (1 method): stream_ticks() - protobuf Timestamp → datetime + SymbolTick dataclass conversion
- ⚪ NONE (4 methods): Direct pass-through - just yield data from Account layer
For direct MT5Service usage:
- ✅ Use stream_ticks() - saves you from manual Timestamp conversion
- ⚪ Optional: Other 4 methods - no conversion, could call Account directly
For MT5Sugar users:
- ✅ All methods work perfectly - the layer serves its architectural purpose
API Layer: MID-LEVEL - wrappers over MT5Account with automatic datetime conversion
Implementation:
These methods are implemented in src/pymt5/mt5_service.py, which wraps the low-level API in package/MetaRpcMT5/helpers/mt5_account.py and adds dataclass conversion and automatic reconnection.
Example files:
- examples/2_service/05_service_streaming.py - comprehensive demo of all MT5Service streaming methods
Why These Methods Exist¶
Real Value: Mixed Processing¶
Only 1 method adds value (datetime conversion + dataclass creation); the other 4 are direct pass-throughs:
Problem with MT5Account (Low-level):
# stream_ticks() - MT5Account returns protobuf with Timestamp
async for data in account.on_symbol_tick(["EURUSD"], None):
    # data.symbol_tick.time is protobuf Timestamp - need manual conversion!
    tick_time = data.symbol_tick.time.ToDatetime()  # ← Manual datetime conversion
    bid = data.symbol_tick.bid
    ask = data.symbol_tick.ask
    # ... manual field extraction from protobuf

# Other streams - already clean protobuf objects:
async for data in account.on_trade(None):
    # data is OnTradeData - already has all fields accessible
    if data.HasField("position_info"):
        print(f"Position: {data.position_info.ticket}")
Solution with MT5Service (Mid-level):
# stream_ticks() - automatic datetime conversion + dataclass:
async for tick in service.stream_ticks(["EURUSD"]):
    # tick is SymbolTick dataclass with time already datetime!
    print(f"{tick.time}: Bid={tick.bid}, Ask={tick.ask}")
    # ✅ VALUE: No manual ToDatetime() needed, clean dataclass

# Other streams - direct pass-through (NO conversion):
async for trade in service.stream_trade_updates():
    # ⚪ NO VALUE: Same OnTradeData protobuf as Account layer
    if trade.HasField("position_info"):
        print(f"Position: {trade.position_info.ticket}")
What MT5Service provides:
- Datetime conversion + dataclass (1 method with HIGH value):
    - stream_ticks(): Converts protobuf Timestamp → Python datetime + creates SymbolTick dataclass (8 fields)
    - Implementation: tick_time = data.symbol_tick.time.ToDatetime() + yield SymbolTick(...)
- Direct pass-through (4 methods with NO value):
    - stream_trade_updates(): async for data in account.on_trade(): yield data
    - stream_position_profits(): async for data in account.on_position_profit(): yield data
    - stream_opened_tickets(): async for data in account.on_positions_and_pending_orders_tickets(): yield data
    - stream_transactions(): async for data in account.on_trade_transaction(): yield data
    - These literally just forward protobuf objects without any processing
- Auto-reconnection (ALL methods):
    - All streaming methods benefit from execute_stream_with_reconnect() infrastructure
    - But this is also available on Account layer
Architectural purpose:
- ✅ MT5Sugar uses these methods to maintain layered architecture (Sugar → Service → Account)
- ✅ stream_ticks() adds real value through datetime conversion
- ⚪ Other 4 methods - architectural consistency only, no processing
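The difference between the two kinds of wrapper can be sketched in a few lines. This is a minimal illustration, not the library's code: FakeTimestamp, FakeAccount, SketchService, and collect are hypothetical stand-ins, and the SymbolTick here carries only three of the eight fields.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from types import SimpleNamespace


class FakeTimestamp:
    """Hypothetical stand-in for a protobuf Timestamp (epoch seconds only)."""

    def __init__(self, seconds: int) -> None:
        self.seconds = seconds

    def ToDatetime(self) -> datetime:
        # Return a naive UTC datetime for the stored epoch seconds.
        aware = datetime.fromtimestamp(self.seconds, tz=timezone.utc)
        return aware.replace(tzinfo=None)


@dataclass
class SymbolTick:
    """Trimmed stand-in; the real dataclass is described as having 8 fields."""
    time: datetime
    bid: float
    ask: float


class FakeAccount:
    """Hypothetical low-level layer that yields protobuf-like objects."""

    async def on_symbol_tick(self, symbols, cancellation_event=None):
        for i in range(3):
            raw = SimpleNamespace(
                time=FakeTimestamp(1_700_000_000 + i), bid=1.1000, ask=1.1002
            )
            yield SimpleNamespace(symbol_tick=raw)

    async def on_trade(self, cancellation_event=None):
        yield SimpleNamespace(kind="trade-event")


class SketchService:
    """Hypothetical mid-level layer showing both wrapper styles."""

    def __init__(self, account) -> None:
        self._account = account

    async def stream_ticks(self, symbols, cancellation_event=None):
        # Converting wrapper: Timestamp -> datetime, protobuf -> dataclass.
        async for data in self._account.on_symbol_tick(symbols, cancellation_event):
            raw = data.symbol_tick
            yield SymbolTick(time=raw.time.ToDatetime(), bid=raw.bid, ask=raw.ask)

    async def stream_trade_updates(self, cancellation_event=None):
        # Pass-through wrapper: forwards each object unchanged.
        async for data in self._account.on_trade(cancellation_event):
            yield data


async def collect(stream):
    """Drain an async generator into a list (helper for the demo)."""
    return [item async for item in stream]


if __name__ == "__main__":
    service = SketchService(FakeAccount())
    print(asyncio.run(collect(service.stream_ticks(["EURUSD"]))))
    print(asyncio.run(collect(service.stream_trade_updates())))
```

The only structural difference between the two methods is the loop body: one builds a new object per item, the other yields the item it received.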
All 5 Streaming Methods¶
| Method | Yields | Description |
|---|---|---|
| stream_ticks() | SymbolTick | Real-time tick data stream (time auto-converted) |
| stream_trade_updates() | OnTradeData | Trade events (new/closed positions) |
| stream_position_profits() | OnPositionProfitData | Real-time position P&L updates |
| stream_opened_tickets() | OnPositionsAndPendingOrdersTicketsData | Position/order ticket updates (lightweight) |
| stream_transactions() | OnTradeTransactionData | Detailed trade transaction events |
Key Concepts¶
Async Generators¶
All streaming methods are async generators, so they must be consumed with async for inside a coroutine.
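As a minimal illustration of the iteration pattern, the sketch below consumes a stand-in async generator. Here fake_stream and consume are hypothetical names; fake_stream only mimics the shape of a streaming method and does not talk to a terminal.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_stream(count: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for a streaming method: yields items over time."""
    for i in range(count):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield i


async def consume(count: int) -> List[int]:
    received = []
    # `async for` awaits each item; a plain `for` loop would raise TypeError.
    async for item in fake_stream(count):
        received.append(item)
    return received


if __name__ == "__main__":
    print(asyncio.run(consume(3)))
```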
Cancellation¶
Use cancellation_event (asyncio.Event) to stop streaming:
import asyncio

cancel_event = asyncio.Event()

async for tick in service.stream_ticks(["EURUSD"], cancel_event):
    print(tick)
    if some_condition:
        cancel_event.set()  # Stop streaming
Automatic Reconnection¶
All streaming methods use execute_stream_with_reconnect() internally - automatic reconnection on connection loss!
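The internals of execute_stream_with_reconnect() are not shown in this document, so the following is only a generic sketch of the reconnect-on-failure idea, not the library's implementation. The names stream_with_reconnect and make_flaky_source, the retry parameters, and the choice of ConnectionError as the failure signal are all assumptions made for illustration.

```python
import asyncio
from typing import AsyncIterator, Callable, List


async def stream_with_reconnect(
    open_stream: Callable[[], AsyncIterator[int]],
    max_retries: int = 3,
    retry_delay: float = 0.0,
) -> AsyncIterator[int]:
    """Re-open the stream after a ConnectionError, up to max_retries times."""
    retries = 0
    while True:
        try:
            async for item in open_stream():
                yield item
            return  # the stream ended normally
        except ConnectionError:
            retries += 1
            if retries > max_retries:
                raise  # give up and surface the error to the caller
            await asyncio.sleep(retry_delay)


def make_flaky_source(fail_times: int) -> Callable[[], AsyncIterator[int]]:
    """Build a stream factory whose first `fail_times` connections drop."""
    state = {"attempts": 0}

    async def source() -> AsyncIterator[int]:
        state["attempts"] += 1
        yield state["attempts"] * 10
        if state["attempts"] <= fail_times:
            raise ConnectionError("simulated connection loss")

    return source


async def collect(stream: AsyncIterator[int]) -> List[int]:
    return [item async for item in stream]


if __name__ == "__main__":
    print(asyncio.run(collect(stream_with_reconnect(make_flaky_source(2)))))
```

From the consumer's side the stream looks continuous: the loop keeps receiving items while the wrapper re-opens the underlying connection. A production version would typically also honor a cancellation event and use a growing delay between attempts.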
Method Signatures¶
1) stream_ticks¶
async def stream_ticks(
    self,
    symbols: List[str],
    cancellation_event: Optional[Any] = None,
) -> AsyncIterator[SymbolTick]
Real-time tick data stream.
Args:
- symbols: List of symbol names to stream (e.g., ["EURUSD", "GBPUSD"])
- cancellation_event: Optional asyncio.Event for stopping stream

Yields: SymbolTick dataclass with:
- time: datetime (already converted from protobuf Timestamp!)
- bid, ask, last: float prices
- volume, time_ms, flags, volume_real: tick details
Technical: The low-level layer streams OnSymbolTickData with symbol_tick.time as a protobuf Timestamp. This wrapper converts each Timestamp to a Python datetime via ToDatetime() for every tick.
Auto-reconnection: The stream runs until cancellation_event.set() is called. On connection loss it reconnects automatically via execute_stream_with_reconnect.
Usage:
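A minimal usage sketch follows. To keep it runnable without a terminal connection it uses DemoService, a hypothetical stand-in that yields a SymbolTick-shaped dataclass with the eight fields listed above (the field types are assumptions). With a real MT5Service instance, only format_tick() and the async for loop would be needed.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import AsyncIterator, List, Optional


@dataclass
class SymbolTick:
    """Stand-in with the eight documented fields (types are assumptions)."""
    time: datetime
    bid: float
    ask: float
    last: float
    volume: int
    time_ms: int
    flags: int
    volume_real: float


class DemoService:
    """Hypothetical stand-in for MT5Service; yields two fixed ticks."""

    async def stream_ticks(
        self,
        symbols: List[str],
        cancellation_event: Optional[asyncio.Event] = None,
    ) -> AsyncIterator[SymbolTick]:
        for i in range(2):
            if cancellation_event is not None and cancellation_event.is_set():
                return
            yield SymbolTick(
                datetime(2024, 1, 2, 9, 30, i), 1.1000, 1.1002, 0.0, 0, 0, 0, 0.0
            )


def format_tick(tick: SymbolTick) -> str:
    # tick.time is already a datetime, so date formatting works directly.
    return f"{tick.time:%H:%M:%S} bid={tick.bid:.5f} ask={tick.ask:.5f}"


async def main(service) -> List[str]:
    lines = []
    async for tick in service.stream_ticks(["EURUSD"]):
        lines.append(format_tick(tick))
    return lines


if __name__ == "__main__":
    for line in asyncio.run(main(DemoService())):
        print(line)
```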
2) stream_trade_updates¶
async def stream_trade_updates(
    self,
    cancellation_event: Optional[Any] = None,
) -> AsyncIterator[Any]
Real-time trade events stream (new/closed positions).
Args:
- cancellation_event: Optional asyncio.Event for stopping stream

Yields: OnTradeData protobuf events with:
- position_info: Position details when position opens/closes
- order_info: Order details when pending order placed/deleted
Technical: Server pushes OnTradeData when:
- Position opens or closes
- Pending order placed or deleted
Each event contains either position_info or order_info with full details (ticket, symbol, volume, type, etc.).
Thin wrapper: Passes through protobuf OnTradeData without conversion (minimal overhead).
Usage:
async for trade in service.stream_trade_updates():
    if trade.HasField("position_info"):
        print(f"Position: {trade.position_info.ticket}")
    if trade.HasField("order_info"):
        print(f"Order: {trade.order_info.ticket}")
3) stream_position_profits¶
async def stream_position_profits(
    self,
    interval_ms: int = 1000,
    ignore_empty: bool = True,
    cancellation_event: Optional[Any] = None,
) -> AsyncIterator[Any]
Real-time position profit updates stream.
Args:
- interval_ms: Polling interval in milliseconds (default: 1000ms = 1 second)
- ignore_empty: Skip frames with no changes (default: True)
- cancellation_event: Optional asyncio.Event for stopping stream

Yields: OnPositionProfitData protobuf with:
- position_profits: Repeated field with ticket → profit mapping

Technical: Server polls positions every interval_ms and pushes updates when profit changes.
ignore_empty=True filters out frames where no position P&L changed, reducing bandwidth.
Each OnPositionProfitData contains a position_profits repeated field with the ticket → profit mapping.
Usage:
async for profit_data in service.stream_position_profits(interval_ms=500):
    for pos_profit in profit_data.position_profits:
        print(f"Ticket {pos_profit.ticket}: ${pos_profit.profit:.2f}")
4) stream_opened_tickets¶
async def stream_opened_tickets(
    self,
    interval_ms: int = 1000,
    cancellation_event: Optional[Any] = None,
) -> AsyncIterator[Any]
Real-time position/order ticket updates stream (lightweight).
Args:
- interval_ms: Polling interval in milliseconds (default: 1000ms = 1 second)
- cancellation_event: Optional asyncio.Event for stopping stream

Yields: OnPositionsAndPendingOrdersTicketsData protobuf with:
- opened_position_tickets: List of position ticket numbers (repeated int64)
- opened_orders_tickets: List of pending order ticket numbers (repeated int64)
Technical: Server polls every interval_ms and pushes OnPositionsAndPendingOrdersTicketsData.
Advantage: 10-20x less bandwidth than stream_trade_updates() - use when you only need to track ticket changes.
Usage:
async for tickets in service.stream_opened_tickets(interval_ms=2000):
    print(f"Positions: {list(tickets.opened_position_tickets)}")
    print(f"Orders: {list(tickets.opened_orders_tickets)}")
5) stream_transactions¶
async def stream_transactions(
    self,
    cancellation_event: Optional[Any] = None,
) -> AsyncIterator[Any]
Real-time trade transaction stream (detailed).
Args:
- cancellation_event: Optional asyncio.Event for stopping stream

Yields: OnTradeTransactionData protobuf events with:
- transaction_type: Transaction type (DEAL_ADD, ORDER_DELETE, etc.)
- Request details and results

Technical: Server pushes OnTradeTransactionData for every trade operation step (request → broker → result).
More detailed than stream_trade_updates(): Includes transaction_type (DEAL_ADD, ORDER_DELETE, etc.) and request details.
Thin wrapper: Passes through protobuf OnTradeTransactionData without conversion.
Usage:
async for transaction in service.stream_transactions():
    print(f"Transaction type: {transaction.transaction_type}")
🔗 Usage Examples¶
Example 1: Streaming Ticks with Auto-Converted Datetime¶
import asyncio
from pymt5 import MT5Service

async def stream_eurusd_ticks(service: MT5Service, duration_seconds: int = 60):
    """Stream EURUSD ticks for specified duration."""
    cancel_event = asyncio.Event()

    # Auto-stop after duration
    async def auto_stop():
        await asyncio.sleep(duration_seconds)
        cancel_event.set()

    # Keep a reference so the timer task is not garbage-collected mid-flight
    stop_task = asyncio.create_task(auto_stop())
    print(f"Streaming EURUSD ticks for {duration_seconds} seconds...")
    try:
        async for tick in service.stream_ticks(["EURUSD"], cancel_event):
            # tick.time is already datetime - no conversion needed!
            timestamp = tick.time.strftime('%H:%M:%S.%f')[:-3]
            spread = tick.ask - tick.bid
            print(f"[{timestamp}] Bid: {tick.bid:.5f} | "
                  f"Ask: {tick.ask:.5f} | "
                  f"Spread: {spread:.5f}")
    except asyncio.CancelledError:
        print("Stream cancelled")
    finally:
        stop_task.cancel()  # No-op if the timer has already fired
    print("Streaming stopped")
Example 2: Monitoring Multiple Symbols¶
async def monitor_multiple_symbols(
    service: MT5Service,
    symbols: list,
    duration_seconds: int = 30
):
    """Stream ticks for multiple symbols simultaneously."""
    cancel_event = asyncio.Event()
    # Auto-stop after duration (keep a reference to the timer task)
    stop_task = asyncio.create_task(auto_cancel(cancel_event, duration_seconds))
    print(f"Monitoring {len(symbols)} symbols: {', '.join(symbols)}")
    tick_counts = {symbol: 0 for symbol in symbols}
    async for tick in service.stream_ticks(symbols, cancel_event):
        # Note: tick doesn't have a symbol field, so ticks cannot be attributed
        # per symbol here. Simplified: every tick is counted under symbols[0].
        tick_counts[symbols[0]] += 1
        if tick_counts[symbols[0]] % 10 == 0:
            print(f"Received {tick_counts[symbols[0]]} ticks (counted under {symbols[0]})")
    stop_task.cancel()  # No-op if the timer has already fired
    print(f"\nTotal ticks received: {sum(tick_counts.values())}")

async def auto_cancel(event, seconds):
    await asyncio.sleep(seconds)
    event.set()
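Because the example above cannot tell which symbol a tick belongs to, one possible workaround is to open one stream per symbol and count inside each task. The sketch below assumes that running several stream_ticks() subscriptions concurrently is acceptable for your connection, which this document does not confirm. DemoService and count_ticks_per_symbol are hypothetical names used only for illustration.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Optional


class DemoService:
    """Hypothetical stand-in: emits a fixed number of ticks per symbol."""

    def __init__(self, ticks_per_symbol: Dict[str, int]) -> None:
        self._ticks = ticks_per_symbol

    async def stream_ticks(
        self,
        symbols: List[str],
        cancellation_event: Optional[asyncio.Event] = None,
    ) -> AsyncIterator[float]:
        for _ in range(self._ticks[symbols[0]]):
            await asyncio.sleep(0)  # let the other streams interleave
            yield 1.0  # placeholder for a tick object


async def count_ticks_per_symbol(service, symbols: List[str]) -> Dict[str, int]:
    counts = {symbol: 0 for symbol in symbols}

    async def count_one(symbol: str) -> None:
        # Each task subscribes to a single symbol, so attribution is unambiguous.
        async for _tick in service.stream_ticks([symbol]):
            counts[symbol] += 1

    await asyncio.gather(*(count_one(symbol) for symbol in symbols))
    return counts


if __name__ == "__main__":
    demo = DemoService({"EURUSD": 3, "GBPUSD": 1})
    print(asyncio.run(count_ticks_per_symbol(demo, ["EURUSD", "GBPUSD"])))
```

The trade-off is one subscription per symbol instead of one shared subscription, which may matter when many symbols are monitored.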
Example 3: Detecting New Positions with stream_trade_updates¶
async def monitor_trades(service: MT5Service):
    """Monitor for new positions and closed positions."""
    print("Monitoring trade events (Ctrl+C to stop)...")
    try:
        async for trade in service.stream_trade_updates():
            # Check if this is a position update
            if trade.HasField("position_info"):
                pos = trade.position_info
                print(f"\nPOSITION UPDATE:")
                print(f" Ticket: {pos.ticket}")
                print(f" Symbol: {pos.symbol}")
                print(f" Type: {'BUY' if pos.type == 0 else 'SELL'}")
                print(f" Volume: {pos.volume}")
                print(f" Profit: ${pos.profit:.2f}")
            # Check if this is an order update
            if trade.HasField("order_info"):
                order = trade.order_info
                print(f"\nORDER UPDATE:")
                print(f" Ticket: {order.ticket}")
                print(f" Symbol: {order.symbol}")
                print(f" Type: {order.type}")
                print(f" Volume: {order.volume_current}")
    except KeyboardInterrupt:
        print("\nMonitoring stopped")
Example 4: Real-time P&L Monitoring¶
async def monitor_position_profits(
    service: MT5Service,
    update_interval_ms: int = 500
):
    """Monitor position profits in real-time."""
    print(f"Monitoring position profits (update every {update_interval_ms}ms)...")
    print("Press Ctrl+C to stop\n")
    total_profit = 0.0
    try:
        async for profit_data in service.stream_position_profits(
            interval_ms=update_interval_ms,
            ignore_empty=True  # Only show when profit changes
        ):
            # Calculate total profit across all positions
            current_total = 0.0
            print("\n" + "=" * 60)
            print("POSITION PROFITS:")
            print("-" * 60)
            for pos_profit in profit_data.position_profits:
                current_total += pos_profit.profit
                print(f"Ticket #{pos_profit.ticket}: ${pos_profit.profit:+.2f}")
            print("-" * 60)
            print(f"TOTAL PROFIT: ${current_total:+.2f}")
            # Alert on significant changes
            if abs(current_total - total_profit) > 10.0:
                print("ALERT: Significant P&L change!")
            total_profit = current_total
    except KeyboardInterrupt:
        print("\n\nMonitoring stopped")
        print(f"Final total profit: ${total_profit:+.2f}")
Example 5: Lightweight Ticket Monitoring¶
async def monitor_opened_tickets(service: MT5Service):
    """Monitor for new/closed positions using lightweight ticket stream."""
    known_position_tickets = set()
    known_order_tickets = set()
    print("Monitoring opened tickets (lightweight)...")
    try:
        async for tickets in service.stream_opened_tickets(interval_ms=1000):
            # Convert to sets for easy comparison
            current_positions = set(tickets.opened_position_tickets)
            current_orders = set(tickets.opened_orders_tickets)
            # Detect new positions
            new_positions = current_positions - known_position_tickets
            if new_positions:
                for ticket in new_positions:
                    print(f"NEW POSITION: #{ticket}")
            # Detect closed positions
            closed_positions = known_position_tickets - current_positions
            if closed_positions:
                for ticket in closed_positions:
                    print(f"CLOSED POSITION: #{ticket}")
            # Detect new pending orders
            new_orders = current_orders - known_order_tickets
            if new_orders:
                for ticket in new_orders:
                    print(f"NEW ORDER: #{ticket}")
            # Detect deleted orders
            deleted_orders = known_order_tickets - current_orders
            if deleted_orders:
                for ticket in deleted_orders:
                    print(f"DELETED ORDER: #{ticket}")
            # Update known tickets
            known_position_tickets = current_positions
            known_order_tickets = current_orders
            # Show current status
            print(f"[Status] Positions: {len(current_positions)}, "
                  f"Orders: {len(current_orders)}")
    except KeyboardInterrupt:
        print("\nMonitoring stopped")
Example 6: Detailed Transaction Monitoring¶
async def monitor_transactions(service: MT5Service):
    """Monitor detailed trade transactions."""
    print("Monitoring trade transactions (detailed)...")
    try:
        async for transaction in service.stream_transactions():
            print(f"\nTRANSACTION:")
            print(f" Type: {transaction.transaction_type}")
            if transaction.HasField("trade"):
                trade = transaction.trade
                print(f" Action: {trade.action}")
                print(f" Order: {trade.order}")
                print(f" Symbol: {trade.symbol}")
                print(f" Volume: {trade.volume}")
                print(f" Price: {trade.price}")
            if transaction.HasField("request"):
                req = transaction.request
                print(f" Request: {req}")
    except KeyboardInterrupt:
        print("\nMonitoring stopped")
Example 7: Combined Monitoring Dashboard¶
import asyncio

async def trading_dashboard(service: MT5Service):
    """Comprehensive trading dashboard with multiple streams."""
    cancel_event = asyncio.Event()

    # Task 1: Monitor ticks
    async def tick_monitor():
        async for tick in service.stream_ticks(["EURUSD"], cancel_event):
            spread = tick.ask - tick.bid
            print(f"[TICK] {tick.time.strftime('%H:%M:%S')}: "
                  f"Bid={tick.bid:.5f}, Ask={tick.ask:.5f}, Spread={spread:.5f}")

    # Task 2: Monitor trades
    async def trade_monitor():
        async for trade in service.stream_trade_updates(cancel_event):
            if trade.HasField("position_info"):
                pos = trade.position_info
                print(f"[TRADE] Position {pos.ticket}: {pos.symbol} "
                      f"{'BUY' if pos.type == 0 else 'SELL'} {pos.volume} lots")

    # Task 3: Monitor profits
    async def profit_monitor():
        async for profit_data in service.stream_position_profits(
            interval_ms=2000,
            ignore_empty=True,
            cancellation_event=cancel_event
        ):
            total = sum(p.profit for p in profit_data.position_profits)
            print(f"[PROFIT] Total P&L: ${total:+.2f}")

    # Run all monitors concurrently
    print("Starting trading dashboard...")
    print("Press Ctrl+C to stop\n")
    try:
        await asyncio.gather(
            tick_monitor(),
            trade_monitor(),
            profit_monitor()
        )
    except KeyboardInterrupt:
        print("\n\nShutting down dashboard...")
        cancel_event.set()
        await asyncio.sleep(1)  # Give streams time to close
        print("Dashboard stopped")
When to Use Each Method¶
Use stream_ticks()¶
Use when:
- Need real-time price updates
- Building price charts or indicators
- Monitoring market movements
- Time precision is important
Advantage: Time already converted to datetime!
Example:
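For instance, a rolling average of the mid price can be maintained directly inside the loop. This is a sketch under assumptions: rolling_mid_average is a hypothetical helper, and demo_ticks stands in for service.stream_ticks([...]), yielding objects that expose only bid and ask.

```python
import asyncio
from collections import deque
from types import SimpleNamespace
from typing import AsyncIterator, List


async def demo_ticks() -> AsyncIterator[SimpleNamespace]:
    """Hypothetical stand-in for service.stream_ticks(["EURUSD"])."""
    for bid, ask in [(1.0, 1.2), (1.2, 1.4), (1.4, 1.6), (1.6, 1.8)]:
        yield SimpleNamespace(bid=bid, ask=ask)


async def rolling_mid_average(ticks: AsyncIterator, window: int) -> List[float]:
    """Return the moving average of (bid + ask) / 2 after each tick."""
    recent = deque(maxlen=window)  # keeps only the last `window` mid prices
    averages = []
    async for tick in ticks:
        recent.append((tick.bid + tick.ask) / 2)
        averages.append(sum(recent) / len(recent))
    return averages


if __name__ == "__main__":
    print(asyncio.run(rolling_mid_average(demo_ticks(), window=2)))
```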
Use stream_trade_updates()¶
Use when:
- Monitoring position opens/closes
- Tracking new orders
- Building trade notifications
- Need full position/order details
Example:
async for trade in service.stream_trade_updates():
    if trade.HasField("position_info"):
        print(f"New position: {trade.position_info.ticket}")
Use stream_position_profits()¶
Use when:
- Real-time P&L monitoring
- Automatic stop-loss triggers
- Profit alerts
- Building P&L dashboards
Example:
async for profit_data in service.stream_position_profits():
    total = sum(p.profit for p in profit_data.position_profits)
    if total < -100:
        print("ALERT: Large loss!")
Use stream_opened_tickets()¶
Use when:
- Lightweight position monitoring
- Only need ticket numbers (not full details)
- Bandwidth is limited
- High-frequency monitoring
Advantage: 10-20x less bandwidth than stream_trade_updates()
Example:
async for tickets in service.stream_opened_tickets():
    print(f"Open positions: {len(tickets.opened_position_tickets)}")
Use stream_transactions()¶
Use when:
- Need detailed transaction history
- Debugging trade execution
- Logging all trading activity
- Building audit trail
More detailed than stream_trade_updates()
Example:
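As one possible shape for an audit trail, the sketch below tallies events by transaction_type. Here demo_transactions is a hypothetical stand-in for service.stream_transactions(), and the string type names are placeholders for illustration (the real field is a protobuf value).

```python
import asyncio
from collections import Counter
from types import SimpleNamespace
from typing import AsyncIterator


async def demo_transactions() -> AsyncIterator[SimpleNamespace]:
    """Hypothetical stand-in for service.stream_transactions()."""
    for name in ["DEAL_ADD", "ORDER_DELETE", "DEAL_ADD"]:
        yield SimpleNamespace(transaction_type=name)


async def tally_transactions(transactions: AsyncIterator) -> Counter:
    """Count how many events of each transaction type were received."""
    counts: Counter = Counter()
    async for transaction in transactions:
        counts[transaction.transaction_type] += 1
    return counts


if __name__ == "__main__":
    print(asyncio.run(tally_transactions(demo_transactions())))
```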
Recommendations¶
- Use cancellation_event - Always provide a way to stop streams gracefully
- Handle KeyboardInterrupt - Use try/except for clean shutdown
- Don't block the loop - Keep the work inside the async for loop short, since slow processing delays the next item
- Use stream_opened_tickets() for monitoring - 10-20x less bandwidth than stream_trade_updates()
- Use stream_ticks() for prices - Time is already a datetime!
- Set appropriate intervals - Balance between latency and performance
- Use ignore_empty=True - Reduce bandwidth for profit streaming
Cancellation pattern:
cancel_event = asyncio.Event()
try:
    async for item in service.stream_ticks(["EURUSD"], cancel_event):
        if should_stop:
            cancel_event.set()
except KeyboardInterrupt:
    cancel_event.set()
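The "don't block the loop" recommendation can be sketched with an asyncio.Queue that decouples receiving from processing. This is one possible arrangement, not something the library prescribes: demo_stream, pump, worker, and run_pipeline are hypothetical names, and the sentinel-based shutdown is a design choice of this sketch.

```python
import asyncio
from typing import AsyncIterator, List

_DONE = object()  # sentinel marking the end of the stream


async def demo_stream() -> AsyncIterator[int]:
    """Hypothetical stand-in for any of the streaming methods."""
    for i in range(5):
        yield i


async def pump(stream: AsyncIterator[int], queue: asyncio.Queue) -> None:
    # The receiving loop only enqueues, so it returns to the stream quickly.
    async for item in stream:
        await queue.put(item)
    await queue.put(_DONE)


async def worker(queue: asyncio.Queue, results: List[int]) -> None:
    while True:
        item = await queue.get()
        if item is _DONE:
            return
        await asyncio.sleep(0)  # placeholder for slower processing
        results.append(item * 2)


async def run_pipeline() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    results: List[int] = []
    await asyncio.gather(pump(demo_stream(), queue), worker(queue, results))
    return results


if __name__ == "__main__":
    print(asyncio.run(run_pipeline()))
```

The bounded queue applies backpressure: if the worker falls behind by more than maxsize items, the receiving loop waits instead of growing memory without limit. For CPU-heavy processing, asyncio.to_thread (Python 3.9+) or a process pool may be a better fit than a coroutine worker.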
Performance Comparison¶
| Method | Bandwidth | Update Frequency | Use Case |
|---|---|---|---|
| stream_ticks() | High | Every tick | Price monitoring |
| stream_trade_updates() | Medium | On event | Position changes |
| stream_position_profits() | Low | Configurable | P&L monitoring |
| stream_opened_tickets() | Very Low | Configurable | Lightweight monitoring |
| stream_transactions() | High | Every transaction | Detailed logging |
Best for monitoring: stream_opened_tickets() (very low bandwidth, configurable interval)
Best for prices: stream_ticks() (real-time, datetime auto-converted)
Best for P&L: stream_position_profits() with ignore_empty=True
📚 Related Sections¶
- MT5Service Overview - mid-level API overview
- Symbol Methods (Mid-Level) - getting tick snapshots
- Positions & Orders (Mid-Level) - getting position snapshots
- Trading Methods (Mid-Level) - placing orders
- Streaming Methods (Low-Level) - low-level streaming API
- MT5Service API Reference - complete mid-level API reference
Summary¶
Real Value Assessment¶
5 streaming methods with minimal processing - only 1 adds value, 4 are direct pass-through:
| Method | Value Level | What It Does |
|---|---|---|
| stream_ticks() | ✅ HIGH | Converts protobuf Timestamp → datetime + creates SymbolTick dataclass (8 fields) |
| stream_trade_updates() | ⚪ NONE | Direct pass-through: async for data in account.on_trade(): yield data |
| stream_position_profits() | ⚪ NONE | Direct pass-through: async for data in account.on_position_profit(): yield data |
| stream_opened_tickets() | ⚪ NONE | Direct pass-through: async for data in account.on_positions_and_pending_orders_tickets(): yield data |
| stream_transactions() | ⚪ NONE | Direct pass-through: async for data in account.on_trade_transaction(): yield data |
Why these methods have value (or don't):
MT5Account streams protobuf objects:
# Low-level streams (arguments elided):
async for data in account.on_symbol_tick(...):  # OnSymbolTickData with protobuf Timestamp
    ...
async for data in account.on_trade(...):  # OnTradeData protobuf (already accessible)
    ...
async for data in account.on_position_profit(...):  # OnPositionProfitData protobuf (already accessible)
    ...
MT5Service processing:
# Only stream_ticks() has processing:
async for tick in service.stream_ticks(...):
    # ✅ Converts: data.symbol_tick.time.ToDatetime() + SymbolTick dataclass
    ...

# Other 4 methods - no processing:
async for trade in service.stream_trade_updates(...):
    # ⚪ Just yields: Same OnTradeData protobuf from Account
    ...
Key insights:
- Only 1 method adds value (stream_ticks):
    - Automatic datetime conversion saves calling .ToDatetime() manually
    - SymbolTick dataclass with 8 fields is cleaner than protobuf access
    - Real functional improvement
- 4 methods add NO value (direct pass-through):
    - Literally just async for data in account.method(): yield data
    - No conversion, no processing, no unpacking
    - Same protobuf objects as Account layer
    - Exist only for architectural consistency (MT5Sugar needs them)
- All methods have auto-reconnection:
    - But this is available on Account layer too via execute_stream_with_reconnect()
Performance tips:
- Use stream_opened_tickets() for frequent monitoring (very efficient - low bandwidth)
- Use ignore_empty=True for profit streaming (reduce bandwidth)
- Set appropriate interval_ms (balance latency vs performance)
- Always provide cancellation_event for graceful shutdown
Cancellation pattern:
cancel_event = asyncio.Event()
async for item in service.stream_ticks(symbols, cancel_event):
    if should_stop:
        cancel_event.set()
Bottom line:
- ✅ For direct users: Only stream_ticks() adds value - other 4 methods could call Account directly
- ⚪ Other 4 methods: No processing - consider calling Account layer directly for these
- ✅ For Sugar users: All methods serve architectural purpose perfectly
- ✅ vs MT5Account: Significant improvement only for stream_ticks() through datetime conversion