MT5Service - Streaming Methods (Mid-Level API)¶
5 methods for real-time streams: ticks, trade events, position profits, ticket changes
š§© API Layer: MID-LEVEL - wrappers over MT5Account with clean Go types
ā ļø Important Note About Streaming Methods¶
Key insight:
Among the 5 streaming methods, only StreamTicks() provides significant abstraction over the low-level API by converting protobuf timestamps to time.Time.
The other 4 methods (StreamTradeUpdates, StreamPositionProfits, StreamOpenedTickets, StreamTransactions) are thin wrappers that directly pass through channels from MT5Account.
What this means:¶
-
StreamTicks: Automatically converts
Timestampātime.Time(real convenience) -
Other 4 methods: Essentially identical to low-level, just wrapped for consistency
-
Choice is yours: Using mid-level vs low-level for streaming is largely a matter of preference and API consistency in your codebase
š” If you prefer working directly with MT5Account for streaming (except ticks), that's perfectly valid!
šÆ Why These Methods Exist¶
Problem: Low-Level Complexity¶
In MT5Account, streaming returns protobuf channels with nested structures:
// Low-level (MT5Account)
dataCh, errCh := account.OnSymbolTick(ctx, &pb.OnSymbolTickRequest{...})
for data := range dataCh {
tick := data.SymbolTick // ā nested structure
t := tick.Time.AsTime() // ā manual time conversion
fmt.Printf("Tick: %.5f\n", tick.Bid)
}
Solution: Clean Go Types¶
MT5Service automatically converts data and returns clean Go types:
// Mid-level (MT5Service)
tickCh, errCh := service.StreamTicks(ctx, []string{"EURUSD"})
for tick := range tickCh {
// tick.Time is already time.Time!
fmt.Printf("Tick at %s: %.5f\n", tick.Time.Format("15:04:05"), tick.Bid)
}
Advantages:¶
- ā
Automatic
Timestampātime.Timeconversion (StreamTicks only) - ā
Clean
*SymbolTickDTO instead of nested protobuf structures - ā
Convenient channel types (
<-chan *SymbolTickinstead of<-chan *pb.OnSymbolTickData) - ā
Clearer method names (
Stream*instead ofOn*) - ā Automatic unpacking of nested structures
š All 5 Methods¶
| Method | Returns | Low-Level Equivalent |
|---|---|---|
StreamTicks(ctx, symbols) |
(<-chan *SymbolTick, <-chan error) |
OnSymbolTick(ctx, req) + time.Time conversion |
StreamTradeUpdates(ctx) |
(<-chan *OnTradeData, <-chan error) |
OnTrade(ctx, req) (pass-through) |
StreamPositionProfits(ctx) |
(<-chan *OnPositionProfitData, <-chan error) |
OnPositionProfit(ctx, req) (pass-through) |
StreamOpenedTickets(ctx) |
(<-chan *OnPositionsAndPendingOrdersTicketsData, <-chan error) |
OnPositionsAndPendingOrdersTickets(ctx, req) (pass-through) |
StreamTransactions(ctx) |
(<-chan *OnTradeTransactionData, <-chan error) |
OnTradeTransaction(ctx, req) (pass-through) |
š¦ DTO Structures¶
SymbolTick (for StreamTicks)¶
type SymbolTick struct {
Time time.Time // Tick time (converted from Unix timestamp)
Bid float64 // Current Bid price
Ask float64 // Current Ask price
Last float64 // Last deal price
Volume uint64 // Tick volume
TimeMS int64 // Tick time in milliseconds
Flags uint32 // Tick flags
VolumeReal float64 // Tick volume with decimal precision
}
Advantage:
Timeis alreadytime.Time, no manual conversion needed.
š Method Signatures¶
1) StreamTicks¶
func (s *MT5Service) StreamTicks(
ctx context.Context,
symbols []string,
) (<-chan *SymbolTick, <-chan error)
Real-time tick stream for specified symbols.
ADVANTAGE over MT5Account.OnSymbolTick:¶
- Returns channel of
*SymbolTickstructs (clean Go types) - Automatically converts protobuf
OnSymbolTickDataāSymbolTick Timeis alreadytime.Time(no need for.AsTime())- Cleaner API with separate tick and error channels
Parameters:¶
symbols- list of symbols to stream (e.g.,[]string{"EURUSD", "GBPUSD"})
Returns:¶
- Read-only channel of
*SymbolTickstructs - Read-only channel of errors
Note: Channels close when streaming stops.
Example:¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tickCh, errCh := service.StreamTicks(ctx, []string{"EURUSD", "GBPUSD"})
for {
select {
case tick, ok := <-tickCh:
if !ok {
return // channel closed
}
fmt.Printf("[%s] Bid=%.5f, Ask=%.5f\n",
tick.Time.Format("15:04:05"),
tick.Bid, tick.Ask)
case err, ok := <-errCh:
if !ok {
return // channel closed
}
fmt.Printf("ā Error: %v\n", err)
return
}
}
ā ļø IMPORTANT: Always read from both channels in a
selectstatement.
2) StreamTradeUpdates¶
func (s *MT5Service) StreamTradeUpdates(
ctx context.Context,
) (<-chan *pb.OnTradeData, <-chan error)
Real-time stream of trade events (new/disappeared orders and positions, history updates).
Events include:¶
- New orders and positions opened
- Orders and positions closed/cancelled
- Historical orders and deals
Returns:¶
- Read-only channel of
*pb.OnTradeData(protobuf) - Read-only channel of errors
Example:¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tradeCh, errCh := service.StreamTradeUpdates(ctx)
for {
select {
case data, ok := <-tradeCh:
if !ok {
return
}
// New positions
for _, pos := range data.NewOpenedPositions {
fmt.Printf("ā
New position #%d: %s, Volume: %.2f\n",
pos.Ticket, pos.Symbol, pos.Volume)
}
// Closed positions
for _, ticket := range data.DisappearedOpenedPositionTickets {
fmt.Printf("ā Position closed: #%d\n", ticket)
}
// New deals
for _, deal := range data.NewHistoryDeals {
fmt.Printf("š° New deal #%d: %s, Profit: %.2f\n",
deal.Ticket, deal.Symbol, deal.Profit)
}
case err, ok := <-errCh:
if !ok {
return
}
fmt.Printf("ā Error: %v\n", err)
return
}
}
3) StreamPositionProfits¶
func (s *MT5Service) StreamPositionProfits(
ctx context.Context,
) (<-chan *pb.OnPositionProfitData, <-chan error)
Real-time stream of profit/loss updates for open positions.
Events include:¶
- Position profit updates as prices change
- New positions opened
- Positions modified (SL/TP changes)
- Positions closed
Returns:¶
- Read-only channel of
*pb.OnPositionProfitData(protobuf) - Read-only channel of errors
Example:¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
profitCh, errCh := service.StreamPositionProfits(ctx)
for {
select {
case data, ok := <-profitCh:
if !ok {
return
}
totalProfit := 0.0
for _, pos := range data.OpenedPositions {
fmt.Printf("Position #%d: %s, Profit: %.2f\n",
pos.Ticket, pos.Symbol, pos.Profit)
totalProfit += pos.Profit
}
fmt.Printf("š Total P&L: %.2f\n", totalProfit)
case err, ok := <-errCh:
if !ok {
return
}
fmt.Printf("ā Error: %v\n", err)
return
}
}
4) StreamOpenedTickets¶
func (s *MT5Service) StreamOpenedTickets(
ctx context.Context,
) (<-chan *pb.OnPositionsAndPendingOrdersTicketsData, <-chan error)
Real-time stream of updates to open position and pending order ticket numbers.
Events include:¶
- List of current position tickets
- List of current pending order tickets
- Updates when tickets are added/removed
š” Lightweight alternative to StreamTradeUpdates when you only need ticket IDs.
Returns:¶
- Read-only channel of
*pb.OnPositionsAndPendingOrdersTicketsData(protobuf) - Read-only channel of errors
Example:¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ticketCh, errCh := service.StreamOpenedTickets(ctx)
for {
select {
case data, ok := <-ticketCh:
if !ok {
return
}
fmt.Printf("Open positions: %d tickets\n",
len(data.OpenedPositionTickets))
fmt.Printf(" Tickets: %v\n", data.OpenedPositionTickets)
fmt.Printf("Pending orders: %d tickets\n",
len(data.OpenedOrdersTickets))
fmt.Printf(" Tickets: %v\n", data.OpenedOrdersTickets)
case err, ok := <-errCh:
if !ok {
return
}
fmt.Printf("ā Error: %v\n", err)
return
}
}
5) StreamTransactions¶
func (s *MT5Service) StreamTransactions(
ctx context.Context,
) (<-chan *pb.OnTradeTransactionData, <-chan error)
Real-time stream of all trade transactions (most detailed streaming method).
Events include:¶
- Order placement, modification, deletion
- Deal execution
- Position opening, modification, closing
- All intermediate states and changes
š„ Most powerful streaming method with detailed transaction information.
When to use:¶
- Use StreamTradeUpdates for simple trade monitoring
- Use StreamPositionProfits for P&L tracking
- Use StreamTransactions for detailed analysis of all events
Returns:¶
- Read-only channel of
*pb.OnTradeTransactionData(protobuf) - Read-only channel of errors
Example:¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txCh, errCh := service.StreamTransactions(ctx)
for {
select {
case data, ok := <-txCh:
if !ok {
return
}
fmt.Printf("Transaction type: %v\n", data.Type)
if data.Deal != 0 {
fmt.Printf(" Deal #%d executed\n", data.Deal)
}
if data.Order != 0 {
fmt.Printf(" Order #%d: %v\n", data.Order, data.OrderType)
}
if data.Position != 0 {
fmt.Printf(" Position #%d affected\n", data.Position)
}
case err, ok := <-errCh:
if !ok {
return
}
fmt.Printf("ā Error: %v\n", err)
return
}
}
š” Usage Examples¶
Example 1: Real-time Price Monitoring¶
// ā
MT5Service - clean types and auto-conversion
ctx := context.Background()
tickCh, errCh := service.StreamTicks(ctx, []string{"EURUSD"})
for {
select {
case tick, ok := <-tickCh:
if !ok {
return // channel closed
}
// tick.Time is already time.Time!
fmt.Printf("[%s] Bid: %.5f\n",
tick.Time.Format("15:04:05"), tick.Bid)
case err, ok := <-errCh:
if !ok {
return
}
return err
}
}
Advantage:
time.Timeconversion is automatic, no need for.AsTime()calls.
Example 2: Monitor Multiple Symbols¶
// ā
MT5Service - monitor 3 symbols simultaneously
func MonitorMultipleSymbols(service *mt5.MT5Service) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
symbols := []string{"EURUSD", "GBPUSD", "USDJPY"}
tickCh, errCh := service.StreamTicks(ctx, symbols)
for {
select {
case tick, ok := <-tickCh:
if !ok {
fmt.Println("Stream closed")
return
}
// Note: SymbolTick doesn't contain symbol name
// Use data.SymbolName from OnSymbolTickData if needed
fmt.Printf("[%s] Tick: Bid=%.5f, Ask=%.5f, Spread=%.5f\n",
tick.Time.Format("15:04:05"),
tick.Bid, tick.Ask, tick.Ask-tick.Bid)
case err, ok := <-errCh:
if !ok {
return
}
fmt.Printf("ā Error: %v\n", err)
return
case <-ctx.Done():
fmt.Println("Timeout reached")
return
}
}
}
Example 3: Real-time Total Profit Calculation¶
// ā
MT5Service - automatic P&L recalculation
func MonitorTotalProfit(service *mt5.MT5Service) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
profitCh, errCh := service.StreamPositionProfits(ctx)
fmt.Println("š Real-time P&L Monitor")
fmt.Println("Press Ctrl+C to stop...")
for {
select {
case data, ok := <-profitCh:
if !ok {
return
}
totalProfit := 0.0
posCount := len(data.OpenedPositions)
for _, pos := range data.OpenedPositions {
totalProfit += pos.Profit
}
// Colored output
profitColor := "š¢"
if totalProfit < 0 {
profitColor = "š“"
}
fmt.Printf("\r%s Positions: %d | Total P&L: %.2f ",
profitColor, posCount, totalProfit)
case err, ok := <-errCh:
if !ok {
return
}
fmt.Printf("\nā Error: %v\n", err)
return
}
}
}
Example 4: Trade Event Notifications¶
// ā
MT5Service - trade event notifications
func NotifyTradeEvents(service *mt5.MT5Service) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tradeCh, errCh := service.StreamTradeUpdates(ctx)
fmt.Println("š Trade Event Monitor")
for {
select {
case data, ok := <-tradeCh:
if !ok {
return
}
// New positions
for _, pos := range data.NewOpenedPositions {
typeStr := "BUY"
if pos.Type == pb.ENUM_POSITION_TYPE_POSITION_TYPE_SELL {
typeStr = "SELL"
}
fmt.Printf("\nš Position opened:\n")
fmt.Printf(" #%d: %s %s %.2f lots @ %.5f\n",
pos.Ticket, typeStr, pos.Symbol, pos.Volume, pos.PriceOpen)
}
// Closed positions
for _, ticket := range data.DisappearedOpenedPositionTickets {
fmt.Printf("\nš Position closed: #%d\n", ticket)
}
// New deals
for _, deal := range data.NewHistoryDeals {
profitStr := "Profit"
if deal.Profit < 0 {
profitStr = "Loss"
}
fmt.Printf("\n�� Deal executed:\n")
fmt.Printf(" #%d: %s, %s: %.2f\n",
deal.Ticket, deal.Symbol, profitStr, deal.Profit)
}
case err, ok := <-errCh:
if !ok {
return
}
fmt.Printf("\nā Error: %v\n", err)
return
}
}
}
Example 5: Log All Transactions¶
// ā
MT5Service - detailed transaction logging
func LogAllTransactions(service *mt5.MT5Service, logFile string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := os.Create(logFile)
if err != nil {
return err
}
defer f.Close()
txCh, errCh := service.StreamTransactions(ctx)
fmt.Println("š Transaction Logger started")
fmt.Printf("Logging to: %s\n", logFile)
for {
select {
case data, ok := <-txCh:
if !ok {
return nil
}
timestamp := time.Now().Format("2006-01-02 15:04:05")
logLine := fmt.Sprintf("[%s] Type: %v, Order: %d, Deal: %d, Position: %d\n",
timestamp, data.Type, data.Order, data.Deal, data.Position)
// Write to file
f.WriteString(logLine)
f.Sync() // Flush buffer
// Console output
fmt.Print(logLine)
case err, ok := <-errCh:
if !ok {
return nil
}
return err
}
}
}
š§ When to Use Each Method¶
ā StreamTicks¶
Use when:¶
- Need real-time price monitoring
- Building charts or indicators
- Tracking spread in real-time
- Implementing algorithmic trading
Example:¶
tickCh, errCh := service.StreamTicks(ctx, []string{"EURUSD"})
for tick := range tickCh {
if tick.Ask - tick.Bid > maxSpread {
// Wide spread, don't trade
}
}
ā StreamTradeUpdates¶
Use when:¶
- Monitoring new/closed positions
- Tracking trading activity
- Need notifications about deals
- Building trade journal
Example:¶
tradeCh, errCh := service.StreamTradeUpdates(ctx)
for data := range tradeCh {
for _, pos := range data.NewOpenedPositions {
// Notification about new position
}
}
ā StreamPositionProfits¶
Use when:¶
- Tracking P&L in real-time
- Building profit dashboard
- Implementing automatic closing on profit target
- Monitoring risk
Example:¶
profitCh, errCh := service.StreamPositionProfits(ctx)
for data := range profitCh {
totalProfit := 0.0
for _, pos := range data.OpenedPositions {
totalProfit += pos.Profit
}
// Check profit/loss limits
}
ā StreamOpenedTickets¶
Use when:¶
- Only need ticket numbers (lightweight)
- Monitoring number of positions
- Checking existence of specific ticket
Example:¶
ticketCh, errCh := service.StreamOpenedTickets(ctx)
for data := range ticketCh {
if len(data.OpenedPositionTickets) > maxPositions {
// Too many positions, don't open new ones
}
}
ā StreamTransactions¶
Use when:¶
- Need detailed information about all events
- Building audit log of trading
- Tracking intermediate states
- Analyzing trade flow
Example:¶
txCh, errCh := service.StreamTransactions(ctx)
for data := range txCh {
// Log all events
log.Printf("Transaction: %v", data)
}
š Method Comparison¶
| Method | Detail Level | Load | Usage |
|---|---|---|---|
| StreamTicks | High (every tick) | High | Algo trading, charts |
| StreamTradeUpdates | Medium (trade events) | Medium | Position monitoring |
| StreamPositionProfits | Medium (P&L updates) | Medium | Profit dashboard |
| StreamOpenedTickets | Low (tickets only) | Low | Lightweight monitoring |
| StreamTransactions | Maximum (all events) | High | Full audit |
š” Best Practices & Recommendations¶
1. Always use context to manage stream lifetime¶
2. Read from both channels in select statement¶
3. Check channel closure¶
4. Use timeout for long-running streams¶
5. Choose lightweight methods when possible¶
For lightweight monitoring, use StreamOpenedTickets instead of StreamTradeUpdates
6. Handle errors properly¶
Streams can be interrupted - always handle error channels
š”ļø "Graceful Shutdown" Pattern¶
func StreamWithShutdown(service *mt5.MT5Service) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle Ctrl+C
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
go func() {
<-sigCh
fmt.Println("\nShutting down...")
cancel()
}()
tickCh, errCh := service.StreamTicks(ctx, []string{"EURUSD"})
for {
select {
case tick, ok := <-tickCh:
if !ok {
return
}
// Process tick
case err, ok := <-errCh:
if !ok {
return
}
fmt.Printf("Error: %v\n", err)
return
case <-ctx.Done():
fmt.Println("Context cancelled")
return
}
}
}
š Related Sections¶
MT5Service (Mid-Level):¶
- MT5Service Overview - general mid-level API overview
- Account Methods - account information retrieval
- Symbol Methods - symbol parameters and quotes
- Position & Orders Methods - query positions and orders
- Trading Methods - place and manage orders
- Market Depth Methods - order book operations
MT5Account (Low-Level):¶
- Streaming Methods - low-level streaming operations
- MT5Account Master Overview - complete low-level API reference
šÆ Summary¶
MT5Service Streaming methods solve the main task - simplifying real-time monitoring:
What You Avoid:¶
- ā No need to unpack
data.SymbolTickand nested structures - ā No need to manually convert
Timestamp.AsTime()(StreamTicks only) - ā No need to work with
<-chan *pb.OnSymbolTickData
What You Get:¶
- ā
Clean Go types (
*SymbolTickwithtime.Time) - ā
Convenient channels (
<-chan *SymbolTick,<-chan error) - ā Automatic time conversion (StreamTicks)
- ā Code reads like standard Go
Real-time monitoring in 3 lines:¶
tickCh, errCh := service.StreamTicks(ctx, []string{"EURUSD"})
for tick := range tickCh {
fmt.Printf("Price: %.5f at %s\n", tick.Bid, tick.Time)
}
You now have all the tools for real-time market monitoring!
Whether you choose mid-level or low-level streaming APIs, both will serve you well. The choice is yours based on your coding style and project requirements.
Happy trading and may your streams be fast and your profits be plentiful! š