Skip to content

MT5Service - Streaming Methods (Mid-Level API)

⚠️ Important: Read This First

MT5Service is an architectural layer between high-level (Sugar) and low-level (Account) APIs.

Understanding the 5 Streaming Methods:

Method Direct Use Value Architectural Role
stream_ticks() HIGH - Converts protobuf Timestamp → datetime + creates SymbolTick dataclass (8 fields) ✅ Used by Sugar
stream_trade_updates() NONE - Direct pass-through (yield data) without conversion ✅ Used by Sugar
stream_position_profits() NONE - Direct pass-through (yield data) without conversion ✅ Used by Sugar
stream_opened_tickets() NONE - Direct pass-through (yield data) without conversion ✅ Used by Sugar
stream_transactions() NONE - Direct pass-through (yield data) without conversion ✅ Used by Sugar

What This Means:

Architecture (3 layers):

MT5Sugar (HIGH)     →  service.stream_ticks()          ← Sugar uses Service methods
MT5Service (MID)    →  account.on_symbol_tick()        ← Service converts datetime + creates dataclass
MT5Account (LOW)    →  gRPC stream → protobuf objects

Value breakdown:

  • HIGH (1 method): stream_ticks() - protobuf Timestamp → datetime + SymbolTick dataclass conversion
  • NONE (4 methods): Direct pass-through - just yield data from Account layer

For direct MT5Service usage:

  • Use stream_ticks() - saves you from manual Timestamp conversion
  • Optional: Other 4 methods - no conversion, could call Account directly

For MT5Sugar users:

  • ✅ All methods work perfectly - the layer serves its architectural purpose

API Layer: MID-LEVEL - wrappers over MT5Account with automatic datetime conversion

Implementation:

These methods are implemented in src/pymt5/mt5_service.py, which wraps package/MetaRpcMT5/helpers/mt5_account.py low-level API with dataclass conversion and automatic reconnection.

Example files:

  • examples/2_service/05_service_streaming.py - comprehensive demo of all MT5Service streaming methods

Why These Methods Exist

Real Value: Mixed Processing

Only 1 method adds value through datetime conversion + dataclass creation, 4 methods are direct pass-through:

Problem with MT5Account (Low-level):

# stream_ticks() - MT5Account returns protobuf with Timestamp
async for data in account.on_symbol_tick(["EURUSD"], None):
    # data.symbol_tick.time is protobuf Timestamp - need manual conversion!
    tick_time = data.symbol_tick.time.ToDatetime()  # ← Manual datetime conversion
    bid = data.symbol_tick.bid
    ask = data.symbol_tick.ask
    # ... manual field extraction from protobuf

# Other streams - already clean protobuf objects:
async for data in account.on_trade(None):
    # data is OnTradeData - already has all fields accessible
    if data.HasField("position_info"):
        print(f"Position: {data.position_info.ticket}")

Solution with MT5Service (Mid-level):

# stream_ticks() - automatic datetime conversion + dataclass:
async for tick in service.stream_ticks(["EURUSD"]):
    # tick is SymbolTick dataclass with time already datetime!
    print(f"{tick.time}: Bid={tick.bid}, Ask={tick.ask}")
    # ✅ VALUE: No manual ToDatetime() needed, clean dataclass

# Other streams - direct pass-through (NO conversion):
async for trade in service.stream_trade_updates():
    # ⚪ NO VALUE: Same OnTradeData protobuf as Account layer
    if trade.HasField("position_info"):
        print(f"Position: {trade.position_info.ticket}")

What MT5Service provides:

  1. Datetime conversion + dataclass (1 method with HIGH value):

  2. stream_ticks(): Converts protobuf Timestamp → Python datetime + creates SymbolTick dataclass (8 fields)

  3. Implementation: tick_time = data.symbol_tick.time.ToDatetime() + yield SymbolTick(...)

  4. Direct pass-through (4 methods with NO value):

  5. stream_trade_updates(): async for data in account.on_trade(): yield data

  6. stream_position_profits(): async for data in account.on_position_profit(): yield data
  7. stream_opened_tickets(): async for data in account.on_positions_and_pending_orders_tickets(): yield data
  8. stream_transactions(): async for data in account.on_trade_transaction(): yield data
  9. These literally just forward protobuf objects without any processing

  10. Auto-reconnection (ALL methods):

  11. All streaming methods benefit from execute_stream_with_reconnect() infrastructure

  12. But this is also available on Account layer

Architectural purpose:

  • MT5Sugar uses these methods to maintain layered architecture (Sugar → Service → Account)
  • stream_ticks() adds real value through datetime conversion
  • Other 4 methods - architectural consistency only, no processing

All 5 Streaming Methods

Method Yields Description
stream_ticks() SymbolTick Real-time tick data stream (time auto-converted)
stream_trade_updates() OnTradeData Trade events (new/closed positions)
stream_position_profits() OnPositionProfitData Real-time position P&L updates
stream_opened_tickets() OnPositionsAndPendingOrdersTicketsData Position/order ticket updates (lightweight)
stream_transactions() OnTradeTransactionData Detailed trade transaction events

Key Concepts

Async Generators

All streaming methods are async generators - use async for to iterate:

async for item in service.stream_ticks(["EURUSD"]):
    print(item)  # Process each tick as it arrives

Cancellation

Use cancellation_event (asyncio.Event) to stop streaming:

import asyncio

cancel_event = asyncio.Event()

async for tick in service.stream_ticks(["EURUSD"], cancel_event):
    print(tick)
    if some_condition:
        cancel_event.set()  # Stop streaming

Automatic Reconnection

All streaming methods use execute_stream_with_reconnect() internally - automatic reconnection on connection loss!


Method Signatures

1) stream_ticks

async def stream_ticks(
    self,
    symbols: List[str],
    cancellation_event: Optional[Any] = None,
) -> AsyncIterator[SymbolTick]

Real-time tick data stream.

Args:

  • symbols: List of symbol names to stream (e.g., ["EURUSD", "GBPUSD"])
  • cancellation_event: Optional asyncio.Event for stopping stream

Yields: SymbolTick dataclass with:

  • time: datetime (already converted from protobuf Timestamp!)
  • bid, ask, last: float prices
  • volume, time_ms, flags, volume_real: tick details

Technical: Low-level streams OnSymbolTickData with symbol_tick.time as protobuf Timestamp. This wrapper converts each Timestamp to Python datetime via ToDatetime() for every tick.

Auto-reconnection: Stream continues until cancellation_event.set() or connection loss (auto-reconnects via execute_stream_with_reconnect).

Usage:

async for tick in service.stream_ticks(["EURUSD"]):
    print(f"{tick.time}: {tick.bid}/{tick.ask}")

2) stream_trade_updates

async def stream_trade_updates(
    self,
    cancellation_event: Optional[Any] = None,
) -> AsyncIterator[Any]

Real-time trade events stream (new/closed positions).

Args:

  • cancellation_event: Optional asyncio.Event for stopping stream

Yields: OnTradeData protobuf events with:

  • position_info: Position details when position opens/closes
  • order_info: Order details when pending order placed/deleted

Technical: Server pushes OnTradeData when:

  • Position opens or closes
  • Pending order placed or deleted

Each event contains either position_info or order_info with full details (ticket, symbol, volume, type, etc.).

Thin wrapper: Passes through protobuf OnTradeData without conversion (minimal overhead).

Usage:

async for trade in service.stream_trade_updates():
    if trade.HasField("position_info"):
        print(f"Position: {trade.position_info.ticket}")
    if trade.HasField("order_info"):
        print(f"Order: {trade.order_info.ticket}")

3) stream_position_profits

async def stream_position_profits(
    self,
    interval_ms: int = 1000,
    ignore_empty: bool = True,
    cancellation_event: Optional[Any] = None,
) -> AsyncIterator[Any]

Real-time position profit updates stream.

Args:

  • interval_ms: Polling interval in milliseconds (default: 1000ms = 1 second)
  • ignore_empty: Skip frames with no changes (default: True)
  • cancellation_event: Optional asyncio.Event for stopping stream

Yields: OnPositionProfitData protobuf with:

  • position_profits: Repeated field with ticket €™ profit mapping

Technical: Server polls positions every interval_ms and pushes updates when profit changes.

ignore_empty=True filters out frames where no position P&L changed, reducing bandwidth.

Each OnPositionProfitData contains position_profits repeated field with ticket€™profit mapping.

Usage:

async for profit_data in service.stream_position_profits(interval_ms=500):
    for pos_profit in profit_data.position_profits:
        print(f"Ticket {pos_profit.ticket}: ${pos_profit.profit:.2f}")

4) stream_opened_tickets

async def stream_opened_tickets(
    self,
    interval_ms: int = 1000,
    cancellation_event: Optional[Any] = None,
) -> AsyncIterator[Any]

Real-time position/order ticket updates stream (lightweight).

Args:

  • interval_ms: Polling interval in milliseconds (default: 1000ms = 1 second)
  • cancellation_event: Optional asyncio.Event for stopping stream

Yields: OnPositionsAndPendingOrdersTicketsData protobuf with:

  • opened_position_tickets: List of position ticket numbers (repeated int64)
  • opened_orders_tickets: List of pending order ticket numbers (repeated int64)

Technical: Server polls every interval_ms and pushes OnPositionsAndPendingOrdersTicketsData.

Advantage: 10-20x less bandwidth than stream_trade_updates() - use when you only need to track ticket changes.

Usage:

async for tickets in service.stream_opened_tickets(interval_ms=2000):
    print(f"Positions: {list(tickets.opened_position_tickets)}")
    print(f"Orders: {list(tickets.opened_orders_tickets)}")

5) stream_transactions

async def stream_transactions(
    self,
    cancellation_event: Optional[Any] = None,
) -> AsyncIterator[Any]

Real-time trade transaction stream (detailed).

Args:

  • cancellation_event: Optional asyncio.Event for stopping stream

Yields: OnTradeTransactionData protobuf events with:

  • transaction_type: Transaction type (DEAL_ADD, ORDER_DELETE, etc.)
  • Request details and results

Technical: Server pushes OnTradeTransactionData for every trade operation step (request€™broker€™result).

More detailed than stream_trade_updates(): Includes transaction_type (DEAL_ADD, ORDER_DELETE, etc.) and request details.

Thin wrapper: Passes through protobuf OnTradeTransactionData without conversion.

Usage:

async for transaction in service.stream_transactions():
    print(f"Transaction type: {transaction.transaction_type}")

🔗 Usage Examples

Example 1: Streaming Ticks with Auto-Converted Datetime

import asyncio
from pymt5 import MT5Service

async def stream_eurusd_ticks(service: MT5Service, duration_seconds: int = 60):
    """Stream EURUSD ticks for specified duration."""

    cancel_event = asyncio.Event()

    # Auto-stop after duration
    async def auto_stop():
        await asyncio.sleep(duration_seconds)
        cancel_event.set()

    asyncio.create_task(auto_stop())

    print(f"Streaming EURUSD ticks for {duration_seconds} seconds...")

    try:
        async for tick in service.stream_ticks(["EURUSD"], cancel_event):
            # tick.time is already datetime - no conversion needed!
            timestamp = tick.time.strftime('%H:%M:%S.%f')[:-3]
            spread = tick.ask - tick.bid

            print(f"[{timestamp}] Bid: {tick.bid:.5f} | "
                  f"Ask: {tick.ask:.5f} | "
                  f"Spread: {spread:.5f}")

    except asyncio.CancelledError:
        print("Stream cancelled")

    print("Streaming stopped")

Example 2: Monitoring Multiple Symbols

async def monitor_multiple_symbols(
    service: MT5Service,
    symbols: list,
    duration_seconds: int = 30
):
    """Stream ticks for multiple symbols simultaneously."""

    cancel_event = asyncio.Event()

    # Auto-stop after duration
    asyncio.create_task(auto_cancel(cancel_event, duration_seconds))

    print(f"Monitoring {len(symbols)} symbols: {', '.join(symbols)}")

    tick_counts = {symbol: 0 for symbol in symbols}

    async for tick in service.stream_ticks(symbols, cancel_event):
        # Track which symbol this tick is for
        # Note: tick doesn't have symbol field, need to track via subscription
        tick_counts[symbols[0]] += 1  # Simplified

        if tick_counts[symbols[0]] % 10 == 0:
            print(f"Received {tick_counts[symbols[0]]} ticks for {symbols[0]}")

    print(f"\nTotal ticks received: {sum(tick_counts.values())}")


async def auto_cancel(event, seconds):
    await asyncio.sleep(seconds)
    event.set()

Example 3: Detecting New Positions with stream_trade_updates

async def monitor_trades(service: MT5Service):
    """Monitor for new positions and closed positions."""

    print("Monitoring trade events (Ctrl+C to stop)...")

    try:
        async for trade in service.stream_trade_updates():
            # Check if this is a position update
            if trade.HasField("position_info"):
                pos = trade.position_info
                print(f"\nPOSITION UPDATE:")
                print(f"  Ticket: {pos.ticket}")
                print(f"  Symbol: {pos.symbol}")
                print(f"  Type: {'BUY' if pos.type == 0 else 'SELL'}")
                print(f"  Volume: {pos.volume}")
                print(f"  Profit: ${pos.profit:.2f}")

            # Check if this is an order update
            if trade.HasField("order_info"):
                order = trade.order_info
                print(f"\nORDER UPDATE:")
                print(f"  Ticket: {order.ticket}")
                print(f"  Symbol: {order.symbol}")
                print(f"  Type: {order.type}")
                print(f"  Volume: {order.volume_current}")

    except KeyboardInterrupt:
        print("\nMonitoring stopped")

Example 4: Real-time P&L Monitoring

async def monitor_position_profits(
    service: MT5Service,
    update_interval_ms: int = 500
):
    """Monitor position profits in real-time."""

    print(f"Monitoring position profits (update every {update_interval_ms}ms)...")
    print("Press Ctrl+C to stop\n")

    total_profit = 0.0

    try:
        async for profit_data in service.stream_position_profits(
            interval_ms=update_interval_ms,
            ignore_empty=True  # Only show when profit changes
        ):
            # Calculate total profit across all positions
            current_total = 0.0

            print("\n" + "=" * 60)
            print("POSITION PROFITS:")
            print("-" * 60)

            for pos_profit in profit_data.position_profits:
                current_total += pos_profit.profit
                print(f"Ticket #{pos_profit.ticket}: ${pos_profit.profit:+.2f}")

            print("-" * 60)
            print(f"TOTAL PROFIT: ${current_total:+.2f}")

            # Alert on significant changes
            if abs(current_total - total_profit) > 10.0:
                print("ALERT: Significant P&L change!")

            total_profit = current_total

    except KeyboardInterrupt:
        print("\n\nMonitoring stopped")
        print(f"Final total profit: ${total_profit:+.2f}")

Example 5: Lightweight Ticket Monitoring

async def monitor_opened_tickets(service: MT5Service):
    """Monitor for new/closed positions using lightweight ticket stream."""

    known_position_tickets = set()
    known_order_tickets = set()

    print("Monitoring opened tickets (lightweight)...")

    try:
        async for tickets in service.stream_opened_tickets(interval_ms=1000):
            # Convert to sets for easy comparison
            current_positions = set(tickets.opened_position_tickets)
            current_orders = set(tickets.opened_orders_tickets)

            # Detect new positions
            new_positions = current_positions - known_position_tickets
            if new_positions:
                for ticket in new_positions:
                    print(f"NEW POSITION: #{ticket}")

            # Detect closed positions
            closed_positions = known_position_tickets - current_positions
            if closed_positions:
                for ticket in closed_positions:
                    print(f"CLOSED POSITION: #{ticket}")

            # Detect new pending orders
            new_orders = current_orders - known_order_tickets
            if new_orders:
                for ticket in new_orders:
                    print(f"NEW ORDER: #{ticket}")

            # Detect deleted orders
            deleted_orders = known_order_tickets - current_orders
            if deleted_orders:
                for ticket in deleted_orders:
                    print(f"DELETED ORDER: #{ticket}")

            # Update known tickets
            known_position_tickets = current_positions
            known_order_tickets = current_orders

            # Show current status
            print(f"[Status] Positions: {len(current_positions)}, "
                  f"Orders: {len(current_orders)}")

    except KeyboardInterrupt:
        print("\nMonitoring stopped")

Example 6: Detailed Transaction Monitoring

async def monitor_transactions(service: MT5Service):
    """Monitor detailed trade transactions."""

    print("Monitoring trade transactions (detailed)...")

    try:
        async for transaction in service.stream_transactions():
            print(f"\nTRANSACTION:")
            print(f"  Type: {transaction.transaction_type}")

            if transaction.HasField("trade"):
                trade = transaction.trade
                print(f"  Action: {trade.action}")
                print(f"  Order: {trade.order}")
                print(f"  Symbol: {trade.symbol}")
                print(f"  Volume: {trade.volume}")
                print(f"  Price: {trade.price}")

            if transaction.HasField("request"):
                req = transaction.request
                print(f"  Request: {req}")

    except KeyboardInterrupt:
        print("\nMonitoring stopped")

Example 7: Combined Monitoring Dashboard

import asyncio
from datetime import datetime

async def trading_dashboard(service: MT5Service):
    """Comprehensive trading dashboard with multiple streams."""

    cancel_event = asyncio.Event()

    # Task 1: Monitor ticks
    async def tick_monitor():
        async for tick in service.stream_ticks(["EURUSD"], cancel_event):
            spread = tick.ask - tick.bid
            print(f"[TICK] {tick.time.strftime('%H:%M:%S')}: "
                  f"Bid={tick.bid:.5f}, Ask={tick.ask:.5f}, Spread={spread:.5f}")

    # Task 2: Monitor trades
    async def trade_monitor():
        async for trade in service.stream_trade_updates(cancel_event):
            if trade.HasField("position_info"):
                pos = trade.position_info
                print(f"[TRADE] Position {pos.ticket}: {pos.symbol} "
                      f"{'BUY' if pos.type == 0 else 'SELL'} {pos.volume} lots")

    # Task 3: Monitor profits
    async def profit_monitor():
        async for profit_data in service.stream_position_profits(
            interval_ms=2000,
            ignore_empty=True,
            cancellation_event=cancel_event
        ):
            total = sum(p.profit for p in profit_data.position_profits)
            print(f"[PROFIT] Total P&L: ${total:+.2f}")

    # Run all monitors concurrently
    print("Starting trading dashboard...")
    print("Press Ctrl+C to stop\n")

    try:
        await asyncio.gather(
            tick_monitor(),
            trade_monitor(),
            profit_monitor()
        )
    except KeyboardInterrupt:
        print("\n\nShutting down dashboard...")
        cancel_event.set()
        await asyncio.sleep(1)  # Give streams time to close

    print("Dashboard stopped")

When to Use Each Method

Use stream_ticks()

Use when:

  • Need real-time price updates
  • Building price charts or indicators
  • Monitoring market movements
  • Time precision is important

Advantage: Time already converted to datetime!

Example:

async for tick in service.stream_ticks(["EURUSD"]):
    print(f"{tick.time}: {tick.bid}")

Use stream_trade_updates()

Use when:

  • Monitoring position opens/closes
  • Tracking new orders
  • Building trade notifications
  • Need full position/order details

Example:

async for trade in service.stream_trade_updates():
    if trade.HasField("position_info"):
        print(f"New position: {trade.position_info.ticket}")

Use stream_position_profits()

Use when:

  • Real-time P&L monitoring
  • Automatic stop-loss triggers
  • Profit alerts
  • Building P&L dashboards

Example:

async for profit_data in service.stream_position_profits():
    total = sum(p.profit for p in profit_data.position_profits)
    if total < -100:
        print("ALERT: Large loss!")

Use stream_opened_tickets()

Use when:

  • Lightweight position monitoring
  • Only need ticket numbers (not full details)
  • Bandwidth is limited
  • High-frequency monitoring

Advantage: 10-20x less bandwidth than stream_trade_updates()

Example:

async for tickets in service.stream_opened_tickets():
    print(f"Open positions: {len(tickets.opened_position_tickets)}")

Use stream_transactions()

Use when:

  • Need detailed transaction history
  • Debugging trade execution
  • Logging all trading activity
  • Building audit trail

More detailed than stream_trade_updates()

Example:

async for tx in service.stream_transactions():
    print(f"Transaction: {tx.transaction_type}")

Recommendations

  1. Use cancellation_event - Always provide way to stop streams gracefully
  2. Handle KeyboardInterrupt - Use try/except for clean shutdown
  3. Don't block the loop - Process data quickly in async for loop
  4. Use stream_opened_tickets() for monitoring - 10-20x more efficient
  5. Use stream_ticks() for prices - Time already datetime!
  6. Set appropriate intervals - Balance between latency and performance
  7. Use ignore_empty=True - Reduce bandwidth for profit streaming

Cancellation pattern:

cancel_event = asyncio.Event()

try:
    async for item in service.stream_ticks(["EURUSD"], cancel_event):
        if should_stop:
            cancel_event.set()
except KeyboardInterrupt:
    cancel_event.set()

Performance Comparison

Method Bandwidth Update Frequency Use Case
stream_ticks() High Every tick Price monitoring
stream_trade_updates() Medium On event Position changes
stream_position_profits() Low Configurable P&L monitoring
stream_opened_tickets() Very Low Configurable Lightweight monitoring
stream_transactions() High Every transaction Detailed logging

Best for monitoring: stream_opened_tickets() (very low bandwidth, configurable interval)

Best for prices: stream_ticks() (real-time, datetime auto-converted)

Best for P&L: stream_position_profits() with ignore_empty=True



Summary

Real Value Assessment

5 streaming methods with minimal processing - only 1 adds value, 4 are direct pass-through:

Method Value Level What It Does
stream_ticks() HIGH Converts protobuf Timestamp → datetime + creates SymbolTick dataclass (8 fields)
stream_trade_updates() NONE Direct pass-through: async for data in account.on_trade(): yield data
stream_position_profits() NONE Direct pass-through: async for data in account.on_position_profit(): yield data
stream_opened_tickets() NONE Direct pass-through: async for data in account.on_positions_and_pending_orders_tickets(): yield data
stream_transactions() NONE Direct pass-through: async for data in account.on_trade_transaction(): yield data

Why these methods have value (or don't):

MT5Account streams protobuf objects:

# Low-level streams:
async for data in account.on_symbol_tick(...):  # OnSymbolTickData with protobuf Timestamp
async for data in account.on_trade(...):  # OnTradeData protobuf (already accessible)
async for data in account.on_position_profit(...):  # OnPositionProfitData protobuf (already accessible)

MT5Service processing:

# Only stream_ticks() has processing:
async for tick in service.stream_ticks(...):
    # ✅ Converts: data.symbol_tick.time.ToDatetime() + SymbolTick dataclass

# Other 4 methods - no processing:
async for trade in service.stream_trade_updates(...):
    # ⚪ Just yields: Same OnTradeData protobuf from Account

Key insights:

  1. Only 1 method adds value (stream_ticks):

  2. Automatic datetime conversion saves calling .ToDatetime() manually

  3. SymbolTick dataclass with 8 fields is cleaner than protobuf access
  4. Real functional improvement

  5. 4 methods add NO value (direct pass-through):

  6. Literally just async for data in account.method(): yield data

  7. No conversion, no processing, no unpacking
  8. Same protobuf objects as Account layer
  9. Exist only for architectural consistency (MT5Sugar needs them)

  10. All methods have auto-reconnection:

  11. But this is available on Account layer too via execute_stream_with_reconnect()

Performance tips:

  • Use stream_opened_tickets() for frequent monitoring (very efficient - low bandwidth)
  • Use ignore_empty=True for profit streaming (reduce bandwidth)
  • Set appropriate interval_ms (balance latency vs performance)
  • Always provide cancellation_event for graceful shutdown

Cancellation pattern:

cancel_event = asyncio.Event()
async for item in service.stream_ticks(symbols, cancel_event):
    if should_stop:
        cancel_event.set()

Bottom line:

  • For direct users: Only stream_ticks() adds value - other 4 methods could call Account directly
  • Other 4 methods: No processing - consider calling Account layer directly for these
  • For Sugar users: All methods serve architectural purpose perfectly
  • vs MT5Account: Significant improvement only for stream_ticks() through datetime conversion