
How to Monitor Twitter Accounts and Keywords in Real Time via API

Real-time Twitter monitoring - detecting new tweets from specific accounts, tracking keyword mentions as they happen, and feeding live X data into your applications - is the foundation of social listening, breaking news alerts, competitive intelligence, and automated engagement tools. This guide shows you how to build a near-real-time monitoring system using the Sorsa API that detects new tweets within seconds of posting, with complete working examples in Python and JavaScript.

Sorsa API delivers fresh data on every request: if a tweet was posted half a second ago, your next API call will return it. Combined with a rate limit of 20 requests per second and response times under 400ms, you can build monitoring loops that rival dedicated streaming services - with the simplicity of standard REST calls and without managing persistent connections, reconnection logic, or complex authentication flows.

How Polling-Based Monitoring Works

There are two approaches to getting real-time data from a social platform: push-based (streaming/webhooks) and pull-based (polling). The official X API offers a Filtered Stream endpoint, but it requires expensive access tiers ($5,000+/month Pro plan), persistent HTTP connections with reconnection logic, and careful rule management. Sorsa API uses a pull-based approach. The pattern has four steps:
  1. Poll an endpoint at a regular interval (every 1-30 seconds).
  2. Compare results against previously seen tweet IDs to identify new tweets.
  3. Process new tweets - send alerts, store in a database, post to Slack, etc.
  4. Repeat.
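You control the polling interval, so you can balance latency against API usage. There are no persistent connections to manage and no reconnection strategies to implement. If your script crashes, a restart simply resumes polling; to avoid gaps or duplicate alerts across restarts, persist the last seen ID as described under Production Hardening. Deduplication is reliable because X tweet IDs (Snowflake IDs) embed a creation timestamp in their high-order bits, so they increase over time - in practice, a higher ID means a newer tweet. Compare IDs as integers rather than strings, since digit strings of different lengths do not sort correctly as text.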
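The compare step (step 2) can be sketched as a small pure function. This is a minimal sketch rather than part of the Sorsa API: the helper name `split_new_tweets` is hypothetical, and it assumes each tweet is a dict with an `id` field that is either an integer or a digit string, as in the examples in this guide. It takes the maximum ID in the batch instead of assuming the first item is the newest.

```python
def split_new_tweets(tweets, last_seen_id):
    """Return (new_tweets_oldest_first, updated_last_seen_id).

    Hypothetical helper: assumes each tweet is a dict whose "id" is an int
    or a digit string. IDs are compared numerically, never as text.
    """
    if not tweets:
        return [], last_seen_id

    newest_id = max(int(t["id"]) for t in tweets)

    # First poll: record a baseline and report nothing as new.
    if last_seen_id is None:
        return [], newest_id

    threshold = int(last_seen_id)
    new_tweets = [t for t in tweets if int(t["id"]) > threshold]
    new_tweets.sort(key=lambda t: int(t["id"]))  # oldest first
    return new_tweets, max(newest_id, threshold)
```

Each poll then reduces to `new, last_seen_id = split_new_tweets(tweets, last_seen_id)` followed by processing `new` in order. Note that the returned checkpoint is an integer.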

Choosing the Right Endpoint

| What you want to monitor | Endpoint | Method | Why this one |
| --- | --- | --- | --- |
| A single account | /user-tweets | POST | Returns the latest tweets from one user's timeline. |
| A group of accounts (up to 5,000) | /list-tweets | GET | Single request covers all members of an X List. |
| A keyword or hashtag | /search-tweets | POST | Full search operator support, chronological ordering. |
| @mentions of an account | /mentions | POST | Purpose-built for mention tracking with engagement filters. |

Level 1: Monitor a Single Account

The simplest case. You want to know the moment a specific account posts something new - a competitor, a CEO, a regulator, or an influencer.

Python

import requests
import time

API_KEY = "YOUR_API_KEY"
USERNAME = "elonmusk"
POLL_INTERVAL = 5  # seconds

URL = "https://api.sorsa.io/v3/user-tweets"
HEADERS = {"ApiKey": API_KEY, "Content-Type": "application/json"}

last_seen_id = None

print(f"Monitoring @{USERNAME}...")

while True:
    try:
        resp = requests.post(URL, headers=HEADERS, json={"link": f"https://x.com/{USERNAME}"})
        resp.raise_for_status()
        tweets = resp.json().get("tweets", [])

        if tweets:
            if last_seen_id is None:
                last_seen_id = tweets[0]["id"]
                print(f"Baseline set: {last_seen_id}")
            else:
                # Compare IDs as integers; string comparison is lexicographic.
                new_tweets = [t for t in tweets if int(t["id"]) > int(last_seen_id)]
                for tweet in reversed(new_tweets):
                    print(f"[NEW] @{USERNAME}: {tweet['full_text'][:140]}")
                if new_tweets:
                    last_seen_id = new_tweets[0]["id"]

    except Exception as e:
        print(f"Error: {e}")
        time.sleep(POLL_INTERVAL * 2)
        continue

    time.sleep(POLL_INTERVAL)

JavaScript

const API_KEY = "YOUR_API_KEY";
const USERNAME = "elonmusk";
const POLL_INTERVAL = 5000;

let lastSeenId = null;
console.log(`Monitoring @${USERNAME}...`);

while (true) {
  try {
    const resp = await fetch("https://api.sorsa.io/v3/user-tweets", {
      method: "POST",
      headers: { "ApiKey": API_KEY, "Content-Type": "application/json" },
      body: JSON.stringify({ link: `https://x.com/${USERNAME}` }),
    });
    if (!resp.ok) throw new Error(`HTTP ${resp.status}`);

    const tweets = (await resp.json()).tweets || [];

    if (tweets.length > 0) {
      if (lastSeenId === null) {
        lastSeenId = tweets[0].id;
        console.log(`Baseline set: ${lastSeenId}`);
      } else {
        // Compare IDs as BigInt; Snowflake IDs exceed Number.MAX_SAFE_INTEGER.
        const newTweets = tweets.filter((t) => BigInt(t.id) > BigInt(lastSeenId));
        for (const t of [...newTweets].reverse()) {
          console.log(`[NEW] @${USERNAME}: ${t.full_text.slice(0, 140)}`);
        }
        if (newTweets.length) lastSeenId = newTweets[0].id;
      }
    }
  } catch (err) {
    console.error(`Error: ${err.message}`);
    await new Promise((r) => setTimeout(r, POLL_INTERVAL * 2));
    continue;
  }
  await new Promise((r) => setTimeout(r, POLL_INTERVAL));
}
This works, but it scales poorly. Monitoring 50 accounts means 50 separate polling loops and 50x the API requests. That is where X Lists come in.

Level 2: Monitor Many Accounts with a Single Request

X Lists let you group up to 5,000 accounts and fetch their combined activity in one API call via /list-tweets. This is the most efficient approach for multi-account monitoring and the pattern you should default to in production. For more on working with Lists, see Lists & Communities.

Step 1: Create a Public X List

  1. Go to X Lists and create a new list.
  2. Add the accounts you want to monitor.
  3. Make sure the list is Public - private lists cannot be accessed via the API.
  4. Copy the List ID from the URL (e.g., in https://x.com/i/lists/1234567890, the ID is 1234567890).
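If you would rather paste the full List URL into a script than copy the ID by hand, the extraction in step 4 can be sketched as below. The helper name `extract_list_id` is hypothetical, and accepting the older `twitter.com` host alongside `x.com` is an assumption; only the `https://x.com/i/lists/<id>` shape comes from this guide.

```python
import re

# Matches https://x.com/i/lists/<digits>, optionally followed by a path,
# query string, or fragment. The twitter.com host is accepted as an assumption.
_LIST_URL = re.compile(
    r"^https?://(?:www\.)?(?:x|twitter)\.com/i/lists/(\d+)(?:[/?#].*)?$"
)


def extract_list_id(url):
    """Return the numeric List ID (as a string) from an X List URL."""
    match = _LIST_URL.match(url.strip())
    if match is None:
        raise ValueError(f"Not an X List URL: {url!r}")
    return match.group(1)
```

For example, `extract_list_id("https://x.com/i/lists/1234567890")` yields the string `1234567890`, which can be used as `LIST_ID` in the next step.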

Step 2: Poll the List

import requests
import time

API_KEY = "YOUR_API_KEY"
LIST_ID = "YOUR_LIST_ID"
POLL_INTERVAL = 5

URL = f"https://api.sorsa.io/v3/list-tweets?list_id={LIST_ID}"
HEADERS = {"ApiKey": API_KEY, "Accept": "application/json"}


def monitor_list(callback):
    """Poll an X List and call `callback` for each new tweet detected."""
    last_seen_id = None
    print(f"Monitoring List {LIST_ID} (interval: {POLL_INTERVAL}s)")

    while True:
        try:
            resp = requests.get(URL, headers=HEADERS)
            resp.raise_for_status()
            tweets = resp.json().get("tweets", [])

            if not tweets:
                time.sleep(POLL_INTERVAL)
                continue

            if last_seen_id is None:
                last_seen_id = tweets[0]["id"]
                print(f"Baseline set: {last_seen_id}")
            else:
                # Compare IDs as integers; string comparison is lexicographic.
                new_tweets = [t for t in tweets if int(t["id"]) > int(last_seen_id)]
                if new_tweets:
                    for tweet in reversed(new_tweets):
                        callback(tweet)
                    last_seen_id = new_tweets[0]["id"]

        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}. Retrying in {POLL_INTERVAL * 2}s...")
            time.sleep(POLL_INTERVAL * 2)
            continue

        time.sleep(POLL_INTERVAL)


def on_new_tweet(tweet):
    user = tweet["user"]
    print(f"[NEW] @{user['username']}: {tweet['full_text'][:120]}")
    print(f"       Likes: {tweet.get('likes_count', 0)} | "
          f"RTs: {tweet.get('retweet_count', 0)} | "
          f"Views: {tweet.get('view_count', 'N/A')}\n")


if __name__ == "__main__":
    monitor_list(on_new_tweet)
The efficiency gain is massive. Monitoring 50 accounts individually at 10-second intervals costs 50 x 8,640 = 432,000 requests/day. Putting those same 50 accounts into one X List and polling /list-tweets costs 8,640 requests/day - a 50x reduction. For more cost optimization patterns like this, see Optimizing API Usage.

Level 3: Monitor a Keyword or Hashtag

Instead of tracking specific accounts, you can monitor a keyword, hashtag, or complex search query in real time. This uses /search-tweets with order: "latest" for chronological results.
import requests
import time

API_KEY = "YOUR_API_KEY"
QUERY = '"your brand" OR @yourbrand lang:en'
POLL_INTERVAL = 10

URL = "https://api.sorsa.io/v3/search-tweets"
HEADERS = {"ApiKey": API_KEY, "Content-Type": "application/json"}


def monitor_keyword(query, callback, interval=10):
    last_seen_id = None
    print(f"Monitoring: {query} (interval: {interval}s)")

    while True:
        try:
            resp = requests.post(URL, headers=HEADERS, json={"query": query, "order": "latest"})
            resp.raise_for_status()
            tweets = resp.json().get("tweets", [])

            if tweets:
                if last_seen_id is None:
                    last_seen_id = tweets[0]["id"]
                    print(f"Baseline set: {last_seen_id}")
                else:
                    # Compare IDs as integers; string comparison is lexicographic.
                    new_tweets = [t for t in tweets if int(t["id"]) > int(last_seen_id)]
                    for tweet in reversed(new_tweets):
                        callback(tweet)
                    if new_tweets:
                        last_seen_id = new_tweets[0]["id"]

        except Exception as e:
            print(f"Error: {e}")
            time.sleep(interval * 2)
            continue

        time.sleep(interval)


# on_new_tweet is the callback defined in the Level 2 example.
monitor_keyword(QUERY, on_new_tweet, interval=POLL_INTERVAL)
You can use any search operator in the query string. For example, to monitor only high-engagement mentions of your brand in English, excluding retweets:
monitor_keyword('"your brand" min_faves:10 lang:en -filter:retweets', on_new_tweet)
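If you assemble queries from configuration rather than writing them by hand, a small builder keeps quoting and filters consistent. The sketch below is illustrative only: `build_query` and its parameters are hypothetical names, and it emits just the operators already shown in this guide (`min_faves:`, `lang:`, and `-filter:retweets`).

```python
def build_query(phrase, min_faves=None, lang=None, exclude_retweets=False):
    """Compose a search query string from a phrase and optional filters."""
    phrase = phrase.strip()
    if not phrase:
        raise ValueError("phrase must not be empty")

    # Quote multi-word phrases so they are matched as an exact phrase.
    parts = [f'"{phrase}"' if " " in phrase else phrase]

    if min_faves is not None:
        parts.append(f"min_faves:{min_faves}")
    if lang:
        parts.append(f"lang:{lang}")
    if exclude_retweets:
        parts.append("-filter:retweets")
    return " ".join(parts)
```

Calling `build_query("your brand", min_faves=10, lang="en", exclude_retweets=True)` reproduces the query string used in the example above.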

Routing New Tweets to Slack, Discord, or Webhooks

The monitoring loop detects new tweets; the callback decides what to do with them. Here is a callback that forwards each tweet to a Slack channel via an Incoming Webhook:
import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"

def send_to_slack(tweet):
    user = tweet["user"]
    text = (
        f"*New tweet from @{user['username']}*\n"
        f"{tweet['full_text']}\n"
        f"Likes: {tweet.get('likes_count', 0)} | "
        f"RTs: {tweet.get('retweet_count', 0)} | "
        f"Views: {tweet.get('view_count', 'N/A')}\n"
        f"https://x.com/{user['username']}/status/{tweet['id']}"
    )
    # A timeout keeps a slow Slack endpoint from stalling the monitor.
    requests.post(SLACK_WEBHOOK_URL, json={"text": text}, timeout=10)


# Plug it into any monitor:
monitor_list(send_to_slack)
# or: monitor_keyword("bitcoin lang:en min_faves:50", send_to_slack)
The same pattern works for Discord (different JSON format), Telegram (Bot API call), or any HTTP endpoint your system exposes. You are effectively building your own webhook relay - Sorsa provides the data, your script routes it.
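As an illustration of the "different JSON format" for Discord, here is a minimal sketch of a Discord callback. Discord webhooks accept a JSON body with a `content` field limited to 2,000 characters. The function names, the placeholder webhook URL, and the 1,500-character truncation are assumptions made for this example; the tweet fields are the ones used elsewhere in this guide.

```python
DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/YOUR_WEBHOOK_ID/YOUR_WEBHOOK_TOKEN"


def build_discord_payload(tweet):
    """Build the JSON body for a Discord webhook message from a tweet dict."""
    user = tweet["user"]
    # Truncate the tweet text so the whole message stays under Discord's
    # 2,000-character limit and the link at the end is never cut off.
    body = tweet["full_text"][:1500]
    content = (
        f"**New tweet from @{user['username']}**\n"
        f"{body}\n"
        f"https://x.com/{user['username']}/status/{tweet['id']}"
    )
    return {"content": content}


def send_to_discord(tweet):
    """Callback: forward one tweet to the configured Discord webhook."""
    import requests  # third-party; imported here so the formatter has no dependencies

    resp = requests.post(
        DISCORD_WEBHOOK_URL, json=build_discord_payload(tweet), timeout=10
    )
    resp.raise_for_status()
```

It plugs into the monitors the same way as the Slack callback, for example `monitor_list(send_to_discord)`.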

API Usage Calculator

Polling uses one request per cycle. Before choosing your interval, consider the monthly cost:
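| Interval | Req/Hour | Req/Day | Req/Month (30d) |
| --- | --- | --- | --- |
| 1 second | 3,600 | 86,400 | 2,592,000 |
| 5 seconds | 720 | 17,280 | 518,400 |
| 10 seconds | 360 | 8,640 | 259,200 |
| 30 seconds | 120 | 2,880 | 86,400 |
| 1 minute | 60 | 1,440 | 43,200 |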
For most social listening and brand monitoring, polling every 10-30 seconds is sufficient - you catch any tweet within half a minute of posting. Reserve 1-5 second intervals for financial signal detection or breaking news bots. See Pricing to estimate your monthly cost based on these numbers.
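The arithmetic behind these figures is simple enough to script for your own intervals. The sketch below is a hypothetical helper (`polling_requests` is not part of any Sorsa SDK) that assumes one request per poll cycle and steady, uninterrupted polling; the `loops` parameter covers the case of several independent polling loops.

```python
SECONDS_PER_DAY = 86_400


def polling_requests(interval_seconds, loops=1, days=30):
    """Return (per_hour, per_day, per_period) request counts for steady polling.

    Assumes one request per poll cycle; `loops` is the number of
    independent polling loops running at the same interval.
    """
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    per_day = SECONDS_PER_DAY / interval_seconds * loops
    return round(per_day / 24), round(per_day), round(per_day * days)
```

`polling_requests(10)` gives the 10-second row, and `polling_requests(10, loops=50)` gives the 432,000 requests/day figure for monitoring 50 accounts individually.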
For high-volume monitoring needs that exceed standard plan limits, contact the Sorsa team by email or on Discord for custom enterprise quotas and volume discounts.

Production Hardening

The examples above work for development and testing. For production, address these four concerns:

1. Persist last_seen_id Across Restarts

If your script crashes and restarts without remembering its checkpoint, it either reprocesses old tweets (duplicate alerts) or misses the gap entirely. Store the last seen ID in a file, database, or Redis:
import json
import os

STATE_FILE = "monitor_state.json"

def load_state():
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return json.load(f).get("last_seen_id")
    return None

def save_state(last_seen_id):
    with open(STATE_FILE, "w") as f:
        json.dump({"last_seen_id": last_seen_id}, f)
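One caveat with writing the state file in place: if the process dies mid-write, the file can be left empty or truncated. A common remedy, sketched below, is to write to a temporary file in the same directory and swap it into place with `os.replace`. The function names `save_state_atomic` and `load_state_safe` are hypothetical variants of the helpers above, not something the Sorsa API prescribes.

```python
import json
import os
import tempfile


def save_state_atomic(last_seen_id, path="monitor_state.json"):
    """Write the checkpoint to a temp file, then swap it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump({"last_seen_id": last_seen_id}, f)
        # The rename is atomic on POSIX when both paths share a filesystem.
        os.replace(tmp_path, path)
    except BaseException:
        # Remove the temp file if anything failed before the swap.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_state_safe(path="monitor_state.json"):
    """Return the saved checkpoint, or None if it is missing or unreadable."""
    try:
        with open(path) as f:
            return json.load(f).get("last_seen_id")
    except (FileNotFoundError, json.JSONDecodeError):
        return None
```

In the polling loops, initialize `last_seen_id` from `load_state_safe()` instead of `None`, and call `save_state_atomic(last_seen_id)` only after a batch has been processed, so a crash mid-batch leads to a retry rather than a gap.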

2. Implement Exponential Backoff for Errors

Network issues, rate limits (HTTP 429), and temporary API errors will happen. Instead of retrying immediately, back off gradually. For the full error code reference, see Error Codes.
retry_delay = POLL_INTERVAL

while True:
    try:
        resp = requests.get(URL, headers=HEADERS)
        if resp.status_code == 429:
            print("Rate limited. Backing off...")
            retry_delay = min(retry_delay * 2, 60)
            time.sleep(retry_delay)
            continue
        resp.raise_for_status()
        retry_delay = POLL_INTERVAL  # Reset on success
        # ... process tweets ...
    except Exception as e:
        print(f"Error: {e}")
        retry_delay = min(retry_delay * 2, 60)
        time.sleep(retry_delay)
        continue

    time.sleep(POLL_INTERVAL)
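If you run several monitors under one API key, identical backoff delays make them retry in lockstep. Adding random jitter spreads the retries out. The sketch below shows one common variant ("equal jitter"), which keeps at least half of the exponential delay; the function name and default values are assumptions chosen to mirror the 60-second cap used above.

```python
import random


def backoff_delay(attempt, base=5.0, cap=60.0, rng=random):
    """Seconds to wait before retry number `attempt` (0-based).

    The exponential ceiling is min(cap, base * 2**attempt); the result is
    drawn uniformly from the upper half of that range.
    """
    ceiling = min(cap, base * (2 ** attempt))
    return ceiling / 2 + rng.uniform(0, ceiling / 2)
```

In the loop above, keep a counter of consecutive failures, sleep for `backoff_delay(failures)` after each error, and reset the counter to zero on success.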

3. Separate Polling from Processing

Do not run expensive operations (NLP, database writes, external API calls) synchronously inside the polling loop. If a downstream system is slow, your polling loop falls behind schedule. Instead, push new tweets into a queue and process them in a separate worker:
from collections import deque
import threading
import time

tweet_queue = deque()

def polling_loop():
    """Fast loop: poll and enqueue. No heavy processing here."""
    # ... standard polling code ...
    # Instead of calling callback(tweet), do:
    tweet_queue.append(tweet)

def processing_worker():
    """Separate thread/process: dequeue and handle."""
    while True:
        if tweet_queue:
            tweet = tweet_queue.popleft()
            send_to_slack(tweet)      # or any expensive operation
            save_to_database(tweet)
        else:
            time.sleep(0.1)

threading.Thread(target=processing_worker, daemon=True).start()
polling_loop()
For heavier workloads, replace the in-memory deque with Redis, RabbitMQ, or any message broker your stack already uses.
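Before reaching for an external broker, the standard library's `queue.Queue` is a reasonable middle step: `get()` blocks until an item arrives, so the worker does not need a sleep loop, and a bounded queue applies backpressure if processing falls far behind. The sketch below is one possible arrangement, not a prescribed design; `start_worker` and the `None` shutdown sentinel are conventions chosen for this example.

```python
import queue
import threading


def start_worker(handler, tweet_queue):
    """Start a daemon thread that passes each queued tweet to `handler`."""

    def run():
        while True:
            tweet = tweet_queue.get()  # blocks until an item is available
            if tweet is None:  # sentinel value: stop the worker
                tweet_queue.task_done()
                break
            try:
                handler(tweet)
            except Exception as exc:
                # Keep the worker alive if one tweet fails to process.
                print(f"Handler error: {exc}")
            finally:
                tweet_queue.task_done()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread
```

The polling loop then calls `tweet_queue.put(tweet)` where it previously called `callback(tweet)`, with the queue created as, say, `queue.Queue(maxsize=1000)`. Putting `None` on the queue stops the worker cleanly.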

4. Monitor Your Monitor

In production, log each poll cycle (timestamp, new tweet count, errors). Set up an alert if the monitor has not completed a successful poll in the last N minutes - this catches silent failures before they become data gaps. You can check the API’s operational status at any time on the Sorsa Status Page.
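A minimal sketch of that bookkeeping follows. The class name `PollHealth` and its methods are hypothetical; it logs each cycle with the standard `logging` module and exposes an `is_stale` check that a watchdog thread, cron job, or health endpoint could call. The clock is injectable so the logic can be tested without waiting.

```python
import logging
import time

log = logging.getLogger("monitor")


class PollHealth:
    """Track poll outcomes so a watchdog can detect a stalled monitor."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        # Until the first success, measure staleness from start-up time.
        self._last_ok = clock()
        self.consecutive_errors = 0

    def record_success(self, new_count):
        self._last_ok = self._clock()
        self.consecutive_errors = 0
        log.info("poll ok: %d new tweets", new_count)

    def record_error(self, exc):
        self.consecutive_errors += 1
        log.warning("poll failed (%d in a row): %s", self.consecutive_errors, exc)

    def is_stale(self, max_age_seconds):
        """True if no poll has succeeded within the last max_age_seconds."""
        return self._clock() - self._last_ok > max_age_seconds
```

Call `record_success(len(new_tweets))` at the end of each successful cycle and `record_error(e)` in the exception handler, then alert whenever `is_stale(N * 60)` returns true.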

Next Steps

  • Search Operators - use advanced filters to reduce noise in keyword-based monitoring.
  • Track Mentions - dedicated endpoint for tracking @mentions with engagement filters.
  • Rate Limits - handling 429 errors and optimizing request patterns.
  • Pagination - backfill historical data alongside real-time monitoring.
  • API Reference - full specification for /list-tweets, /user-tweets, /search-tweets, and all Sorsa API endpoints.