MT5Service - Streaming Methods (Mid-Level API)

5 methods for real-time streams: ticks, trade events, position profits, ticket changes, transactions

🧩 API Layer: MID-LEVEL - wrappers over MT5Account with clean Go types


⚠️ Important Note About Streaming Methods

Key insight:

Among the 5 streaming methods, only StreamTicks() provides significant abstraction over the low-level API by converting protobuf timestamps to time.Time.

The other 4 methods (StreamTradeUpdates, StreamPositionProfits, StreamOpenedTickets, StreamTransactions) are thin wrappers that directly pass through channels from MT5Account.

What this means:

  • StreamTicks: Automatically converts Timestamp → time.Time (real convenience)

  • Other 4 methods: Essentially identical to low-level, just wrapped for consistency

  • Choice is yours: Using mid-level vs low-level for streaming is largely a matter of preference and API consistency in your codebase

💡 If you prefer working directly with MT5Account for streaming (except ticks), that's perfectly valid!


🎯 Why These Methods Exist

Problem: Low-Level Complexity

In MT5Account, streaming returns protobuf channels with nested structures:

// Low-level (MT5Account); error channel ignored for brevity
dataCh, _ := account.OnSymbolTick(ctx, &pb.OnSymbolTickRequest{...})
for data := range dataCh {
    tick := data.SymbolTick  // ← nested structure
    t := tick.Time.AsTime()  // ← manual time conversion
    fmt.Printf("Tick at %s: %.5f\n", t.Format("15:04:05"), tick.Bid)
}

Solution: Clean Go Types

For ticks, MT5Service converts the data automatically and returns clean Go types:

// Mid-level (MT5Service); error channel ignored for brevity
tickCh, _ := service.StreamTicks(ctx, []string{"EURUSD"})
for tick := range tickCh {
    // tick.Time is already time.Time!
    fmt.Printf("Tick at %s: %.5f\n", tick.Time.Format("15:04:05"), tick.Bid)
}

Advantages:

  • ✅ Automatic Timestamp → time.Time conversion (StreamTicks only)
  • ✅ Clean *SymbolTick DTO instead of nested protobuf structures
  • ✅ Convenient channel types (<-chan *SymbolTick instead of <-chan *pb.OnSymbolTickData)
  • ✅ Clearer method names (Stream* instead of On*)
  • ✅ Automatic unpacking of nested structures
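To make the conversion concrete, here is a minimal sketch of how such a wrapper could be built. It is not the library's implementation: rawTimestamp, rawTick, rawTickData and convertTicks are hypothetical stand-ins that only imitate the nested shape of the low-level message so the example runs without any dependencies.

```go
package main

import (
	"fmt"
	"time"
)

// rawTimestamp, rawTick and rawTickData are hypothetical stand-ins for the
// protobuf Timestamp and the nested tick message; they are NOT library types.
type rawTimestamp struct {
	Seconds int64
	Nanos   int32
}

type rawTick struct {
	Time     *rawTimestamp
	Bid, Ask float64
}

type rawTickData struct {
	SymbolTick *rawTick // nested payload, as in the low-level example
}

// SymbolTick is a reduced version of the flat DTO: plain fields, time.Time.
type SymbolTick struct {
	Time     time.Time
	Bid, Ask float64
}

// convertTicks unpacks each nested message and forwards a flat SymbolTick.
// The output channel is closed when the input channel is closed.
func convertTicks(in <-chan *rawTickData) <-chan *SymbolTick {
	out := make(chan *SymbolTick)
	go func() {
		defer close(out)
		for data := range in {
			if data == nil || data.SymbolTick == nil {
				continue // skip empty messages
			}
			t := data.SymbolTick
			var ts time.Time
			if t.Time != nil {
				ts = time.Unix(t.Time.Seconds, int64(t.Time.Nanos)).UTC()
			}
			out <- &SymbolTick{Time: ts, Bid: t.Bid, Ask: t.Ask}
		}
	}()
	return out
}

func main() {
	in := make(chan *rawTickData, 1)
	in <- &rawTickData{SymbolTick: &rawTick{
		Time: &rawTimestamp{Seconds: 1700000000},
		Bid:  1.08500,
		Ask:  1.08512,
	}}
	close(in)

	for tick := range convertTicks(in) {
		fmt.Printf("[%s] Bid=%.5f Ask=%.5f\n",
			tick.Time.Format(time.RFC3339), tick.Bid, tick.Ask)
	}
}
```

A pass-through wrapper would simply return the input channel unchanged, which is why the other four methods add little beyond naming consistency.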

📋 All 5 Methods

| Method | Returns | Low-Level Equivalent |
|---|---|---|
| StreamTicks(ctx, symbols) | (<-chan *SymbolTick, <-chan error) | OnSymbolTick(ctx, req) + time.Time conversion |
| StreamTradeUpdates(ctx) | (<-chan *pb.OnTradeData, <-chan error) | OnTrade(ctx, req) (pass-through) |
| StreamPositionProfits(ctx) | (<-chan *pb.OnPositionProfitData, <-chan error) | OnPositionProfit(ctx, req) (pass-through) |
| StreamOpenedTickets(ctx) | (<-chan *pb.OnPositionsAndPendingOrdersTicketsData, <-chan error) | OnPositionsAndPendingOrdersTickets(ctx, req) (pass-through) |
| StreamTransactions(ctx) | (<-chan *pb.OnTradeTransactionData, <-chan error) | OnTradeTransaction(ctx, req) (pass-through) |

📦 DTO Structures

SymbolTick (for StreamTicks)

type SymbolTick struct {
    Time       time.Time // Tick time (converted from the protobuf Timestamp)
    Bid        float64   // Current Bid price
    Ask        float64   // Current Ask price
    Last       float64   // Last deal price
    Volume     uint64    // Tick volume
    TimeMS     int64     // Tick time in milliseconds
    Flags      uint32    // Tick flags
    VolumeReal float64   // Tick volume with decimal precision
}

Advantage: Time is already time.Time, no manual conversion needed.


📖 Method Signatures

1) StreamTicks

func (s *MT5Service) StreamTicks(
    ctx context.Context,
    symbols []string,
) (<-chan *SymbolTick, <-chan error)

Real-time tick stream for specified symbols.

Advantages over MT5Account.OnSymbolTick:

  • Returns channel of *SymbolTick structs (clean Go types)
  • Automatically converts protobuf OnSymbolTickData → SymbolTick
  • Time is already time.Time (no need for .AsTime())
  • Cleaner API with separate tick and error channels

Parameters:

  • symbols - list of symbols to stream (e.g., []string{"EURUSD", "GBPUSD"})

Returns:

  • Read-only channel of *SymbolTick structs
  • Read-only channel of errors

Note: Channels close when streaming stops.

Example:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tickCh, errCh := service.StreamTicks(ctx, []string{"EURUSD", "GBPUSD"})

for {
    select {
    case tick, ok := <-tickCh:
        if !ok {
            return  // channel closed
        }
        fmt.Printf("[%s] Bid=%.5f, Ask=%.5f\n",
            tick.Time.Format("15:04:05"),
            tick.Bid, tick.Ask)

    case err, ok := <-errCh:
        if !ok {
            return  // channel closed
        }
        fmt.Printf("❌ Error: %v\n", err)
        return
    }
}

⚠️ IMPORTANT: Always read from both channels in a select statement.


2) StreamTradeUpdates

func (s *MT5Service) StreamTradeUpdates(
    ctx context.Context,
) (<-chan *pb.OnTradeData, <-chan error)

Real-time stream of trade events (new/disappeared orders and positions, history updates).

Events include:

  • New orders and positions opened
  • Orders and positions closed/cancelled
  • Historical orders and deals

Returns:

  • Read-only channel of *pb.OnTradeData (protobuf)
  • Read-only channel of errors

Example:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tradeCh, errCh := service.StreamTradeUpdates(ctx)

for {
    select {
    case data, ok := <-tradeCh:
        if !ok {
            return
        }

        // New positions
        for _, pos := range data.NewOpenedPositions {
            fmt.Printf("✅ New position #%d: %s, Volume: %.2f\n",
                pos.Ticket, pos.Symbol, pos.Volume)
        }

        // Closed positions
        for _, ticket := range data.DisappearedOpenedPositionTickets {
            fmt.Printf("❌ Position closed: #%d\n", ticket)
        }

        // New deals
        for _, deal := range data.NewHistoryDeals {
            fmt.Printf("💰 New deal #%d: %s, Profit: %.2f\n",
                deal.Ticket, deal.Symbol, deal.Profit)
        }

    case err, ok := <-errCh:
        if !ok {
            return
        }
        fmt.Printf("❌ Error: %v\n", err)
        return
    }
}

3) StreamPositionProfits

func (s *MT5Service) StreamPositionProfits(
    ctx context.Context,
) (<-chan *pb.OnPositionProfitData, <-chan error)

Real-time stream of profit/loss updates for open positions.

Events include:

  • Position profit updates as prices change
  • New positions opened
  • Positions modified (SL/TP changes)
  • Positions closed

Returns:

  • Read-only channel of *pb.OnPositionProfitData (protobuf)
  • Read-only channel of errors

Example:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

profitCh, errCh := service.StreamPositionProfits(ctx)

for {
    select {
    case data, ok := <-profitCh:
        if !ok {
            return
        }

        totalProfit := 0.0
        for _, pos := range data.OpenedPositions {
            fmt.Printf("Position #%d: %s, Profit: %.2f\n",
                pos.Ticket, pos.Symbol, pos.Profit)
            totalProfit += pos.Profit
        }

        fmt.Printf("📊 Total P&L: %.2f\n", totalProfit)

    case err, ok := <-errCh:
        if !ok {
            return
        }
        fmt.Printf("❌ Error: %v\n", err)
        return
    }
}

4) StreamOpenedTickets

func (s *MT5Service) StreamOpenedTickets(
    ctx context.Context,
) (<-chan *pb.OnPositionsAndPendingOrdersTicketsData, <-chan error)

Real-time stream of updates to open position and pending order ticket numbers.

Events include:

  • List of current position tickets
  • List of current pending order tickets
  • Updates when tickets are added/removed

💡 Lightweight alternative to StreamTradeUpdates when you only need ticket IDs.

Returns:

  • Read-only channel of *pb.OnPositionsAndPendingOrdersTicketsData (protobuf)
  • Read-only channel of errors

Example:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ticketCh, errCh := service.StreamOpenedTickets(ctx)

for {
    select {
    case data, ok := <-ticketCh:
        if !ok {
            return
        }

        fmt.Printf("Open positions: %d tickets\n",
            len(data.OpenedPositionTickets))
        fmt.Printf("  Tickets: %v\n", data.OpenedPositionTickets)

        fmt.Printf("Pending orders: %d tickets\n",
            len(data.OpenedOrdersTickets))
        fmt.Printf("  Tickets: %v\n", data.OpenedOrdersTickets)

    case err, ok := <-errCh:
        if !ok {
            return
        }
        fmt.Printf("❌ Error: %v\n", err)
        return
    }
}
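Since each update carries the current ticket lists, detecting which tickets were added or removed means comparing consecutive snapshots. The sketch below shows one way to do that. diffTickets is a hypothetical helper, and the int64 ticket type is an assumption; adjust it to the actual element type of OpenedPositionTickets.

```go
package main

import "fmt"

// diffTickets compares two ticket snapshots and reports which tickets
// appeared and which disappeared. Result order follows the input slices.
// Assumption: tickets are int64 values.
func diffTickets(prev, cur []int64) (added, removed []int64) {
	prevSet := make(map[int64]struct{}, len(prev))
	for _, t := range prev {
		prevSet[t] = struct{}{}
	}
	curSet := make(map[int64]struct{}, len(cur))
	for _, t := range cur {
		curSet[t] = struct{}{}
		if _, ok := prevSet[t]; !ok {
			added = append(added, t) // present now, absent before
		}
	}
	for _, t := range prev {
		if _, ok := curSet[t]; !ok {
			removed = append(removed, t) // present before, absent now
		}
	}
	return added, removed
}

func main() {
	prev := []int64{101, 102, 103}
	cur := []int64{102, 103, 104}
	added, removed := diffTickets(prev, cur)
	fmt.Println("added:", added, "removed:", removed)
	// Prints: added: [104] removed: [101]
}
```

In a stream loop, the previous snapshot would be kept in a variable and replaced after each comparison.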

5) StreamTransactions

func (s *MT5Service) StreamTransactions(
    ctx context.Context,
) (<-chan *pb.OnTradeTransactionData, <-chan error)

Real-time stream of all trade transactions (most detailed streaming method).

Events include:

  • Order placement, modification, deletion
  • Deal execution
  • Position opening, modification, closing
  • All intermediate states and changes

🔥 Most powerful streaming method with detailed transaction information.

When to use:

  • Use StreamTradeUpdates for simple trade monitoring
  • Use StreamPositionProfits for P&L tracking
  • Use StreamTransactions for detailed analysis of all events

Returns:

  • Read-only channel of *pb.OnTradeTransactionData (protobuf)
  • Read-only channel of errors

Example:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

txCh, errCh := service.StreamTransactions(ctx)

for {
    select {
    case data, ok := <-txCh:
        if !ok {
            return
        }

        fmt.Printf("Transaction type: %v\n", data.Type)
        if data.Deal != 0 {
            fmt.Printf("  Deal #%d executed\n", data.Deal)
        }
        if data.Order != 0 {
            fmt.Printf("  Order #%d: %v\n", data.Order, data.OrderType)
        }
        if data.Position != 0 {
            fmt.Printf("  Position #%d affected\n", data.Position)
        }

    case err, ok := <-errCh:
        if !ok {
            return
        }
        fmt.Printf("❌ Error: %v\n", err)
        return
    }
}

💡 Usage Examples

Example 1: Real-time Price Monitoring

// ✅ MT5Service - clean types and auto-conversion
// MonitorPrice is an illustrative wrapper so the snippet can return an error.
func MonitorPrice(service *mt5.MT5Service) error {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    tickCh, errCh := service.StreamTicks(ctx, []string{"EURUSD"})

    for {
        select {
        case tick, ok := <-tickCh:
            if !ok {
                return nil  // channel closed
            }
            // tick.Time is already time.Time!
            fmt.Printf("[%s] Bid: %.5f\n",
                tick.Time.Format("15:04:05"), tick.Bid)

        case err, ok := <-errCh:
            if !ok {
                return nil
            }
            return err
        }
    }
}

Advantage: time.Time conversion is automatic, no need for .AsTime() calls.


Example 2: Monitor Multiple Symbols

// ✅ MT5Service - monitor 3 symbols simultaneously
func MonitorMultipleSymbols(service *mt5.MT5Service) {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
    defer cancel()

    symbols := []string{"EURUSD", "GBPUSD", "USDJPY"}
    tickCh, errCh := service.StreamTicks(ctx, symbols)

    for {
        select {
        case tick, ok := <-tickCh:
            if !ok {
                fmt.Println("Stream closed")
                return
            }

            // Note: SymbolTick doesn't contain the symbol name, so ticks
            // from different symbols can't be told apart here. Use
            // data.SymbolName from the low-level OnSymbolTickData if needed.
            fmt.Printf("[%s] Tick: Bid=%.5f, Ask=%.5f, Spread=%.5f\n",
                tick.Time.Format("15:04:05"),
                tick.Bid, tick.Ask, tick.Ask-tick.Bid)

        case err, ok := <-errCh:
            if !ok {
                return
            }
            fmt.Printf("❌ Error: %v\n", err)
            return

        case <-ctx.Done():
            fmt.Println("Timeout reached")
            return
        }
    }
}
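Because SymbolTick carries no symbol name, one possible workaround is to open a separate stream per symbol and tag each tick with the symbol of the stream it came from. The sketch below shows only the tagging and fan-in part with stand-in types. Tick, TaggedTick and mergeTagged are hypothetical, and whether several concurrent StreamTicks calls are supported is an assumption that should be verified against the library.

```go
package main

import (
	"fmt"
	"sync"
)

// Tick is a hypothetical stand-in for *SymbolTick.
type Tick struct{ Bid, Ask float64 }

// TaggedTick pairs a tick with the symbol of the stream it came from.
type TaggedTick struct {
	Symbol string
	Tick   Tick
}

// mergeTagged fans several per-symbol channels into one channel and closes
// the output once every input channel has been closed.
func mergeTagged(streams map[string]<-chan Tick) <-chan TaggedTick {
	out := make(chan TaggedTick)
	var wg sync.WaitGroup
	for symbol, ch := range streams {
		wg.Add(1)
		go func(symbol string, ch <-chan Tick) {
			defer wg.Done()
			for t := range ch {
				out <- TaggedTick{Symbol: symbol, Tick: t}
			}
		}(symbol, ch)
	}
	go func() {
		wg.Wait() // all inputs drained
		close(out)
	}()
	return out
}

func main() {
	eur := make(chan Tick, 1)
	gbp := make(chan Tick, 1)
	eur <- Tick{Bid: 1.0850, Ask: 1.0852}
	gbp <- Tick{Bid: 1.2700, Ask: 1.2703}
	close(eur)
	close(gbp)

	merged := mergeTagged(map[string]<-chan Tick{"EURUSD": eur, "GBPUSD": gbp})
	for t := range merged {
		// Arrival order across symbols is not deterministic.
		fmt.Printf("%s Bid=%.5f Spread=%.5f\n",
			t.Symbol, t.Tick.Bid, t.Tick.Ask-t.Tick.Bid)
	}
}
```

If per-symbol streams are not an option, the low-level OnSymbolTick channel remains the way to obtain the symbol name with each tick.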

Example 3: Real-time Total Profit Calculation

// ✅ MT5Service - automatic P&L recalculation
func MonitorTotalProfit(service *mt5.MT5Service) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    profitCh, errCh := service.StreamPositionProfits(ctx)

    fmt.Println("📊 Real-time P&L Monitor")
    fmt.Println("Press Ctrl+C to stop...")

    for {
        select {
        case data, ok := <-profitCh:
            if !ok {
                return
            }

            totalProfit := 0.0
            posCount := len(data.OpenedPositions)

            for _, pos := range data.OpenedPositions {
                totalProfit += pos.Profit
            }

            // Colored output
            profitColor := "🟢"
            if totalProfit < 0 {
                profitColor = "🔴"
            }

            fmt.Printf("\r%s Positions: %d | Total P&L: %.2f   ",
                profitColor, posCount, totalProfit)

        case err, ok := <-errCh:
            if !ok {
                return
            }
            fmt.Printf("\n❌ Error: %v\n", err)
            return
        }
    }
}

Example 4: Trade Event Notifications

// ✅ MT5Service - trade event notifications
func NotifyTradeEvents(service *mt5.MT5Service) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    tradeCh, errCh := service.StreamTradeUpdates(ctx)

    fmt.Println("🔔 Trade Event Monitor")

    for {
        select {
        case data, ok := <-tradeCh:
            if !ok {
                return
            }

            // New positions
            for _, pos := range data.NewOpenedPositions {
                typeStr := "BUY"
                if pos.Type == pb.ENUM_POSITION_TYPE_POSITION_TYPE_SELL {
                    typeStr = "SELL"
                }
                fmt.Printf("\n🆕 Position opened:\n")
                fmt.Printf("   #%d: %s %s %.2f lots @ %.5f\n",
                    pos.Ticket, typeStr, pos.Symbol, pos.Volume, pos.PriceOpen)
            }

            // Closed positions
            for _, ticket := range data.DisappearedOpenedPositionTickets {
                fmt.Printf("\n🔒 Position closed: #%d\n", ticket)
            }

            // New deals
            for _, deal := range data.NewHistoryDeals {
                profitStr := "Profit"
                if deal.Profit < 0 {
                    profitStr = "Loss"
                }
                fmt.Printf("\n💰 Deal executed:\n")
                fmt.Printf("   #%d: %s, %s: %.2f\n",
                    deal.Ticket, deal.Symbol, profitStr, deal.Profit)
            }

        case err, ok := <-errCh:
            if !ok {
                return
            }
            fmt.Printf("\n❌ Error: %v\n", err)
            return
        }
    }
}

Example 5: Log All Transactions

// ✅ MT5Service - detailed transaction logging
func LogAllTransactions(service *mt5.MT5Service, logFile string) error {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    f, err := os.Create(logFile)
    if err != nil {
        return err
    }
    defer f.Close()

    txCh, errCh := service.StreamTransactions(ctx)

    fmt.Println("📝 Transaction Logger started")
    fmt.Printf("Logging to: %s\n", logFile)

    for {
        select {
        case data, ok := <-txCh:
            if !ok {
                return nil
            }

            timestamp := time.Now().Format("2006-01-02 15:04:05")
            logLine := fmt.Sprintf("[%s] Type: %v, Order: %d, Deal: %d, Position: %d\n",
                timestamp, data.Type, data.Order, data.Deal, data.Position)

            // Write to file and flush to disk; stop on I/O errors
            if _, err := f.WriteString(logLine); err != nil {
                return err
            }
            if err := f.Sync(); err != nil {
                return err
            }

            // Console output
            fmt.Print(logLine)

        case err, ok := <-errCh:
            if !ok {
                return nil
            }
            return err
        }
    }
}

🔧 When to Use Each Method

✅ StreamTicks

Use when:

  • Need real-time price monitoring
  • Building charts or indicators
  • Tracking spread in real-time
  • Implementing algorithmic trading

Example:

tickCh, _ := service.StreamTicks(ctx, []string{"EURUSD"}) // error channel ignored for brevity
for tick := range tickCh {
    if tick.Ask-tick.Bid > maxSpread {
        // Wide spread, don't trade
    }
}

✅ StreamTradeUpdates

Use when:

  • Monitoring new/closed positions
  • Tracking trading activity
  • Need notifications about deals
  • Building trade journal

Example:

tradeCh, _ := service.StreamTradeUpdates(ctx) // error channel ignored for brevity
for data := range tradeCh {
    for _, pos := range data.NewOpenedPositions {
        // Notification about new position
    }
}

✅ StreamPositionProfits

Use when:

  • Tracking P&L in real-time
  • Building profit dashboard
  • Implementing automatic closing on profit target
  • Monitoring risk

Example:

profitCh, _ := service.StreamPositionProfits(ctx) // error channel ignored for brevity
for data := range profitCh {
    totalProfit := 0.0
    for _, pos := range data.OpenedPositions {
        totalProfit += pos.Profit
    }
    // Check profit/loss limits
}
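The "check profit/loss limits" step can be sketched as a small pure function that is easy to test apart from the stream. Action, Hold, TakeProfit, StopOut and evaluate are hypothetical names introduced for illustration; the slice of profits stands in for the Profit values read from data.OpenedPositions.

```go
package main

import "fmt"

// Action is what the caller should do after a P&L update.
type Action int

const (
	Hold       Action = iota // stay in the market
	TakeProfit               // total profit reached the target
	StopOut                  // total loss reached the limit
)

// evaluate sums per-position profits and compares the total with a profit
// target and a loss limit (both given as positive amounts).
func evaluate(profits []float64, target, maxLoss float64) (float64, Action) {
	total := 0.0
	for _, p := range profits {
		total += p
	}
	switch {
	case total >= target:
		return total, TakeProfit
	case total <= -maxLoss:
		return total, StopOut
	default:
		return total, Hold
	}
}

func main() {
	total, action := evaluate([]float64{12.5, -3.0, 20.5}, 25, 50)
	fmt.Println(total, action == TakeProfit)
	// Prints: 30 true
}
```

Keeping the decision separate from the channel loop means the closing logic itself (not shown here) only runs when the returned action changes.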

✅ StreamOpenedTickets

Use when:

  • Only need ticket numbers (lightweight)
  • Monitoring number of positions
  • Checking existence of specific ticket

Example:

ticketCh, _ := service.StreamOpenedTickets(ctx) // error channel ignored for brevity
for data := range ticketCh {
    if len(data.OpenedPositionTickets) > maxPositions {
        // Too many positions, don't open new ones
    }
}

✅ StreamTransactions

Use when:

  • Need detailed information about all events
  • Building audit log of trading
  • Tracking intermediate states
  • Analyzing trade flow

Example:

txCh, _ := service.StreamTransactions(ctx) // error channel ignored for brevity
for data := range txCh {
    // Log all events
    log.Printf("Transaction: %v", data)
}

📊 Method Comparison

| Method | Detail Level | Load | Usage |
|---|---|---|---|
| StreamTicks | High (every tick) | High | Algo trading, charts |
| StreamTradeUpdates | Medium (trade events) | Medium | Position monitoring |
| StreamPositionProfits | Medium (P&L updates) | Medium | Profit dashboard |
| StreamOpenedTickets | Low (tickets only) | Low | Lightweight monitoring |
| StreamTransactions | Maximum (all events) | High | Full audit |
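For the high-load streams, a consumer that is slower than the stream may prefer to skip stale values and act only on the newest one. The sketch below shows one such conflation step. latest is a hypothetical helper, and dropping intermediate values is only appropriate when just the most recent price matters (for example a display), not for audit logs or tick-accurate indicators. It requires Go 1.18+ for generics.

```go
package main

import "fmt"

// latest returns the newest value that is already waiting in ch, starting
// from first. It never blocks; open is false if ch was found closed.
func latest[T any](first T, ch <-chan T) (newest T, open bool) {
	newest = first
	for {
		select {
		case v, ok := <-ch:
			if !ok {
				return newest, false
			}
			newest = v // a fresher value was queued; drop the older one
		default:
			return newest, true // nothing else is waiting right now
		}
	}
}

func main() {
	// A buffered channel stands in for a stream that got ahead of us.
	prices := make(chan float64, 5)
	for _, p := range []float64{1.0850, 1.0851, 1.0853, 1.0852, 1.0854} {
		prices <- p
	}

	first := <-prices // blocking read, as in a normal select loop
	newest, open := latest(first, prices)
	fmt.Println(newest, open)
}
```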

💡 Best Practices & Recommendations

1. Always use context to manage stream lifetime

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

2. Read from both channels in select statement

select {
case data := <-dataCh:
    // processing
case err := <-errCh:
    // error handling
}

3. Check channel closure

case data, ok := <-dataCh:
    if !ok {
        return  // channel closed
    }

4. Use timeout for long-running streams

ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()  // release the timer even if the stream ends early

5. Choose lightweight methods when possible

For lightweight monitoring, use StreamOpenedTickets instead of StreamTradeUpdates

6. Handle errors properly

Streams can be interrupted - always handle error channels


🛡️ "Graceful Shutdown" Pattern

func StreamWithShutdown(service *mt5.MT5Service) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle Ctrl+C
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt)
    defer signal.Stop(sigCh)
    go func() {
        select {
        case <-sigCh:
            fmt.Println("\nShutting down...")
            cancel()
        case <-ctx.Done():
            // stream ended on its own; let this goroutine exit
        }
    }()

    tickCh, errCh := service.StreamTicks(ctx, []string{"EURUSD"})

    for {
        select {
        case tick, ok := <-tickCh:
            if !ok {
                return
            }
            _ = tick // Process tick here

        case err, ok := <-errCh:
            if !ok {
                return
            }
            fmt.Printf("Error: %v\n", err)
            return

        case <-ctx.Done():
            fmt.Println("Context cancelled")
            return
        }
    }
}


🎯 Summary

MT5Service streaming methods address one main task: simplifying real-time monitoring.

What You Avoid:

  • ❌ No need to unpack data.SymbolTick and nested structures
  • ❌ No need to call Timestamp.AsTime() manually (StreamTicks only)
  • ❌ No need to work with <-chan *pb.OnSymbolTickData

What You Get:

  • ✅ Clean Go types (*SymbolTick with time.Time)
  • ✅ Convenient channels (<-chan *SymbolTick, <-chan error)
  • ✅ Automatic time conversion (StreamTicks)
  • ✅ Code reads like standard Go

Real-time monitoring in 3 lines:

tickCh, _ := service.StreamTicks(ctx, []string{"EURUSD"}) // error channel ignored for brevity
for tick := range tickCh {
    fmt.Printf("Price: %.5f at %s\n", tick.Bid, tick.Time)
}

You now have all the tools for real-time market monitoring!

Whether you choose mid-level or low-level streaming APIs, both will serve you well. The choice is yours based on your coding style and project requirements.

Happy trading and may your streams be fast and your profits be plentiful! 📈