Get Total Number of Open Positions¶
Request: total count of currently open positions on the trading account.
API Information:
- Low-level API: `MT5Account.positions_total(...)` (defined in `package/MetaRpcMT5/helpers/mt5_account.py`)
- gRPC service: `mt5_term_api.TradeFunctions`
- Proto definition: `PositionsTotal` (defined in `mt5-term-api-trade-functions.proto`)
- Enums in this method: 0 enums (simple count only)
RPC¶
- Service: `mt5_term_api.TradeFunctions`
- Method: `PositionsTotal(Empty) -> PositionsTotalReply`
- Low-level client (generated): `TradeFunctionsStub.PositionsTotal(request, metadata, timeout)`
💬 Just the essentials¶
- What it is. Get the total count of open positions without fetching detailed position data.
- Why you need it. Quick check for open positions. Useful for risk management and trading logic.
- When to use. Use this when you only need the count. Use `opened_orders()` for full position details.
🎯 Purpose¶
Use it to get a simple count of open positions:
- Check if there are any open positions
- Monitor position count before placing new trades
- Implement position limits (max concurrent positions)
- Quick status check without loading full position data
- Efficient - minimal data transfer
📚 Tutorial¶
For a detailed line-by-line explanation with examples, see: positions_total - How it works
Method Signature¶
async def positions_total(
    self,
    deadline: Optional[datetime] = None,
    cancellation_event: Optional[asyncio.Event] = None,
) -> trade_functions_pb2.PositionsTotalData
Request message: `Empty` (no fields), matching the `PositionsTotal(Empty)` method listed in the RPC section.
Reply message:
message PositionsTotalReply {
  oneof response {
    PositionsTotalData data = 1;
    Error error = 2;
  }
}

message PositionsTotalData {
  int32 total_positions = 1;
}
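The reply carries either `data` or `error` in a `oneof`, while the Python signature above returns `PositionsTotalData` directly, so the wrapper presumably unwraps the reply before returning. The sketch below illustrates that unwrapping step with plain dataclasses standing in for the generated protobuf classes. `FakeData`, `FakeError`, `FakeReply`, `ApiError`, `unwrap`, and the `message` field are all hypothetical names chosen for illustration, not part of MetaRpcMT5.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeData:
    """Stand-in for PositionsTotalData (hypothetical)."""
    total_positions: int


@dataclass
class FakeError:
    """Stand-in for the proto Error message; the field name is an assumption."""
    message: str


@dataclass
class FakeReply:
    """Stand-in for PositionsTotalReply: at most one of data/error is set."""
    data: Optional[FakeData] = None
    error: Optional[FakeError] = None


class ApiError(Exception):
    """Hypothetical exception raised when the reply carries an error."""


def unwrap(reply: FakeReply) -> FakeData:
    """Return the data branch of the reply, or raise if the error branch is set."""
    if reply.error is not None:
        raise ApiError(reply.error.message)
    if reply.data is None:
        raise ApiError("reply carried neither data nor error")
    return reply.data
```

With this shape, callers only ever see the count or an exception, which matches how the examples on this page read `data.total_positions` without checking an error field.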
🔽 Input¶
| Parameter | Type | Description |
|---|---|---|
| `deadline` | `datetime` (optional) | Deadline for the gRPC call (UTC datetime) |
| `cancellation_event` | `asyncio.Event` (optional) | Event to cancel the operation |
No additional parameters - this method returns the total count only.
Deadline options:
import asyncio
from datetime import datetime, timedelta

# 1. With deadline (recommended)
deadline = datetime.utcnow() + timedelta(seconds=3)
result = await account.positions_total(deadline=deadline)

# 2. With cancellation event
cancel_event = asyncio.Event()
result = await account.positions_total(cancellation_event=cancel_event)

# 3. No deadline (uses default timeout if configured)
result = await account.positions_total()
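`datetime.utcnow()` is deprecated as of Python 3.12. A small helper can build the same kind of value without it. This is a minimal sketch that assumes the library accepts a naive UTC datetime, as the snippets on this page suggest; `deadline_in` is a hypothetical helper name, not part of MetaRpcMT5.

```python
from datetime import datetime, timedelta, timezone


def deadline_in(seconds: float) -> datetime:
    """Return a naive UTC datetime `seconds` from now.

    Equivalent to datetime.utcnow() + timedelta(seconds=seconds),
    but without the call deprecated in Python 3.12.
    """
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return now_utc + timedelta(seconds=seconds)
```

It would be used as `await account.positions_total(deadline=deadline_in(3))`.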
⬆️ Output - PositionsTotalData¶
| Field | Type | Python Type | Description |
|---|---|---|---|
| `total_positions` | `int32` | `int` | Total number of currently open positions |
Return value: The method returns a `PositionsTotalData` object whose `total_positions` field is accessible as an attribute.
Access the count:
data = await account.positions_total()
count = data.total_positions
print(f"Open positions: {count}")
🧩 Notes & Tips¶
- Automatic reconnection: All `MT5Account` methods have built-in protection against transient gRPC errors with automatic reconnection via `execute_with_reconnect`.
- Default timeout: If `deadline` is `None`, the method will wait indefinitely (or until server timeout).
- Lightweight: This method only returns a count - use `opened_orders()` to get full position details.
- Connection required: You must call `connect_by_host_port()` or `connect_by_server_name()` before using this method.
- Real-time count: The count reflects the current state at the time of the call.
- Thread safety: All async methods are safe to call concurrently from multiple asyncio tasks.
- UUID handling: The terminal instance UUID is auto-generated by the server if not provided. For explicit control (e.g., in streaming scenarios), pass `id_=uuid4()` to the constructor.
🔗 Usage Examples¶
1) Get total open positions¶
import asyncio
from datetime import datetime, timedelta
from MetaRpcMT5 import MT5Account

async def main():
    # Create account instance
    account = MT5Account(
        user=12345678,
        password="your_password",
        grpc_server="mt5.mrpc.pro:443"
    )
    # Connect first
    await account.connect_by_server_name(
        server_name="YourBroker-Demo",
        base_chart_symbol="EURUSD"
    )
    try:
        # Set deadline
        deadline = datetime.utcnow() + timedelta(seconds=3)
        # Get total positions
        data = await account.positions_total(deadline=deadline)
        print(f"Total open positions: {data.total_positions}")
    finally:
        await account.channel.close()

asyncio.run(main())
2) Check if any positions are open¶
async def has_open_positions(account: MT5Account) -> bool:
    """
    Check if there are any open positions.

    Returns:
        True if there are open positions
    """
    deadline = datetime.utcnow() + timedelta(seconds=3)
    data = await account.positions_total(deadline=deadline)
    has_positions = data.total_positions > 0
    if has_positions:
        print(f"[OK] {data.total_positions} position(s) open")
    else:
        print("[OK] No open positions")
    return has_positions

# Usage (inside an async function):
if await has_open_positions(account):
    print("Account has active positions")
else:
    print("Account is flat")
3) Position limit checker¶
async def check_position_limit(account: MT5Account, max_positions: int = 5) -> bool:
    """
    Check if current position count is below limit.

    Args:
        account: MT5Account instance
        max_positions: Maximum allowed open positions

    Returns:
        True if can open more positions
    """
    data = await account.positions_total()
    current = data.total_positions
    if current >= max_positions:
        print(f"[WARNING] Position limit reached: {current}/{max_positions}")
        return False
    remaining = max_positions - current
    print(f"[OK] Can open {remaining} more position(s)")
    return True

# Usage (inside an async function):
if await check_position_limit(account, max_positions=5):
    # Place new order
    pass
else:
    print("Cannot open new position - limit reached")
4) Monitor position count¶
async def monitor_positions(account: MT5Account, interval: float = 5.0):
    """
    Monitor position count every N seconds.

    Args:
        account: MT5Account instance
        interval: Update interval in seconds
    """
    previous_count = None
    while True:
        try:
            deadline = datetime.utcnow() + timedelta(seconds=3)
            data = await account.positions_total(deadline=deadline)
            count = data.total_positions
            if previous_count is not None and count != previous_count:
                if count > previous_count:
                    print(f"[+] Position opened: {count} total")
                else:
                    print(f"[-] Position closed: {count} total")
            else:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Positions: {count}")
            previous_count = count
        except Exception as e:
            print(f"[ERROR] {e}")
        await asyncio.sleep(interval)

# Usage:
# await monitor_positions(account, interval=5.0)  # Update every 5 seconds
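The comparison inside the monitoring loop can be pulled out into a pure function, which makes it testable without a terminal connection. The following is a sketch only; `describe_change` is a hypothetical helper name, and its messages differ slightly from the loop above because they report how many positions changed.

```python
from typing import Optional


def describe_change(previous: Optional[int], current: int) -> str:
    """Describe how the position count moved between two polls."""
    if previous is None or current == previous:
        # First poll, or nothing changed since the last one.
        return f"Positions: {current}"
    delta = current - previous
    if delta > 0:
        return f"[+] {delta} position(s) opened: {current} total"
    return f"[-] {-delta} position(s) closed: {current} total"
```

Because the loop only samples the count, it reports the net change between polls: a position that opens and another that closes within the same interval leave the count unchanged and go unnoticed.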
5) Pre-trade validation¶
async def validate_can_trade(account: MT5Account, max_positions: int = 10) -> bool:
    """
    Validate account can accept new positions.

    Args:
        account: MT5Account instance
        max_positions: Maximum concurrent positions allowed

    Returns:
        True if trading is allowed
    """
    # Get position count
    data = await account.positions_total()
    count = data.total_positions
    # Check position limit
    if count >= max_positions:
        print(f"[ERROR] Position limit reached: {count}/{max_positions}")
        return False
    print(f"[OK] Current positions: {count}/{max_positions}")
    return True

# Usage (inside an async function):
if await validate_can_trade(account, max_positions=10):
    # Place order
    pass
else:
    print("Trading blocked - too many positions")
6) Wait for positions to close¶
async def wait_for_all_closed(
    account: MT5Account,
    timeout_seconds: int = 300,
    check_interval: float = 2.0
):
    """
    Wait until all positions are closed.

    Args:
        account: MT5Account instance
        timeout_seconds: Maximum time to wait
        check_interval: How often to check

    Raises:
        TimeoutError: If positions don't close within timeout
    """
    import time
    start_time = time.time()
    print("Waiting for all positions to close...")
    while True:
        # Check timeout
        elapsed = time.time() - start_time
        if elapsed > timeout_seconds:
            data = await account.positions_total()
            raise TimeoutError(
                f"Timeout: {data.total_positions} position(s) still open after {timeout_seconds}s"
            )
        # Get position count
        data = await account.positions_total()
        count = data.total_positions
        if count == 0:
            print("[OK] All positions closed")
            return
        print(f"[INFO] Waiting... {count} position(s) still open")
        await asyncio.sleep(check_interval)

# Usage (inside an async function):
try:
    await wait_for_all_closed(account, timeout_seconds=300)
except TimeoutError as e:
    print(f"[ERROR] {e}")
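The same wait loop can be written against any async "get count" callable and a monotonic clock, so that system clock adjustments do not affect the timeout and the loop can be exercised without a live account. This is a sketch under those assumptions; `wait_until_zero` and `get_count` are hypothetical names.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def wait_until_zero(
    get_count: Callable[[], Awaitable[int]],
    timeout_seconds: float = 300.0,
    check_interval: float = 2.0,
) -> None:
    """Poll get_count() until it returns 0 or the timeout elapses."""
    # time.monotonic() never goes backwards, unlike time.time().
    give_up_at = time.monotonic() + timeout_seconds
    while True:
        count = await get_count()
        if count == 0:
            return
        if time.monotonic() >= give_up_at:
            raise TimeoutError(
                f"{count} position(s) still open after {timeout_seconds}s"
            )
        await asyncio.sleep(check_interval)
```

With a real account, `get_count` could be a small coroutine function that awaits `account.positions_total()` and returns `data.total_positions`. Checking the count before the timeout also avoids the extra request made only to build the error message.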
7) Position count alert system¶
async def position_alert_system(
    account: MT5Account,
    warning_threshold: int = 8,
    critical_threshold: int = 10,
    interval: float = 10.0
):
    """
    Alert when position count reaches thresholds.

    Args:
        account: MT5Account instance
        warning_threshold: Warn at this count
        critical_threshold: Critical at this count
        interval: Check interval in seconds
    """
    while True:
        try:
            data = await account.positions_total()
            count = data.total_positions
            if count >= critical_threshold:
                print(f"[CRITICAL] {count} positions - at limit!")
            elif count >= warning_threshold:
                print(f"[WARNING] {count} positions - approaching limit")
            else:
                print(f"[OK] {count} positions")
        except Exception as e:
            print(f"[ERROR] {e}")
        await asyncio.sleep(interval)

# Usage:
# await position_alert_system(account, warning_threshold=8, critical_threshold=10)
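The threshold logic in the alert loop can be isolated as a pure function, which also gives a natural place to reject misconfigured thresholds. A minimal sketch; `classify_level` is a hypothetical helper name and the validation step is an addition not present in the loop above.

```python
def classify_level(
    count: int,
    warning_threshold: int = 8,
    critical_threshold: int = 10,
) -> str:
    """Map a position count to "OK", "WARNING", or "CRITICAL"."""
    if warning_threshold > critical_threshold:
        # Otherwise the WARNING branch could never be reached.
        raise ValueError("warning_threshold must not exceed critical_threshold")
    if count >= critical_threshold:
        return "CRITICAL"
    if count >= warning_threshold:
        return "WARNING"
    return "OK"
```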
8) Compare position count with target¶
async def compare_with_target(
    account: MT5Account,
    target_positions: int
) -> dict:
    """
    Compare current positions with target.

    Args:
        account: MT5Account instance
        target_positions: Target number of positions

    Returns:
        Dictionary with comparison data
    """
    data = await account.positions_total()
    current = data.total_positions
    difference = current - target_positions
    result = {
        "current": current,
        "target": target_positions,
        "difference": difference,
        "status": "exact" if difference == 0 else "above" if difference > 0 else "below"
    }
    if difference == 0:
        print(f"[OK] Exactly at target: {current} positions")
    elif difference > 0:
        print(f"[INFO] {abs(difference)} above target ({current}/{target_positions})")
    else:
        print(f"[INFO] {abs(difference)} below target ({current}/{target_positions})")
    return result

# Usage (inside an async function):
result = await compare_with_target(account, target_positions=5)
print(f"Status: {result['status']}")
Common Patterns¶
Quick position check¶
async def is_flat(account: MT5Account) -> bool:
    """Check if account has no open positions"""
    data = await account.positions_total()
    return data.total_positions == 0
Position limit guard¶
async def can_open_position(
    account: MT5Account,
    max_concurrent: int = 10
) -> bool:
    """Check if can open another position"""
    data = await account.positions_total()
    return data.total_positions < max_concurrent
Risk check before trading¶
async def pre_trade_risk_check(account: MT5Account) -> bool:
    """Position-count risk check before trading"""
    data = await account.positions_total()
    # No more than 10 concurrent positions
    if data.total_positions >= 10:
        print("[ERROR] Too many open positions")
        return False
    print(f"[OK] Risk check passed ({data.total_positions} positions)")
    return True
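Helpers like these depend only on `positions_total()` returning an object with a `total_positions` attribute, so they can be unit-tested without a terminal or network. The sketch below uses a stand-in class for that purpose; `FakeAccount` is hypothetical and not part of MetaRpcMT5, and the two helpers are repeated here (without the `MT5Account` annotation) only so the block runs on its own.

```python
import asyncio
from types import SimpleNamespace


class FakeAccount:
    """Test double that mimics the positions_total() call shape."""

    def __init__(self, total_positions: int) -> None:
        self._total = total_positions

    async def positions_total(self, deadline=None, cancellation_event=None):
        # Mirrors the real return value only in having a total_positions attribute.
        return SimpleNamespace(total_positions=self._total)


async def is_flat(account) -> bool:
    """Check if account has no open positions."""
    data = await account.positions_total()
    return data.total_positions == 0


async def can_open_position(account, max_concurrent: int = 10) -> bool:
    """Check if another position fits under the limit."""
    data = await account.positions_total()
    return data.total_positions < max_concurrent


if __name__ == "__main__":
    print(asyncio.run(is_flat(FakeAccount(0))))
    print(asyncio.run(can_open_position(FakeAccount(10))))
```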
📚 See also¶
- opened_orders - Get full details of all open orders and positions
- opened_orders_tickets - Get ticket IDs only (lighter than opened_orders)
- positions_history - Get historical closed positions