Skip to main content

Real-Time Monitoring

Detect new tweets from specific accounts, track keyword mentions as they happen, and feed live X data into your application. This guide shows how to build a near-real-time monitoring pipeline on top of the Sorsa API using a pull-based polling pattern. Sorsa returns fresh data on every request. If a tweet was posted half a second ago, the next API call will include it. Combined with a 20 requests-per-second rate limit and response times around 300ms, polling-based loops match or beat most managed streaming services for the use cases below, without persistent connections, reconnect logic, or OAuth.
Note: For a fuller walkthrough with additional architecture patterns and end-to-end examples, see Real-Time Twitter Monitoring with REST API on the blog.

How Polling-Based Monitoring Works

Two ways to get data from a social platform: push-based (streaming, webhooks) or pull-based (polling). The Sorsa API uses polling. The pattern has four steps:
  1. Poll an endpoint at a regular interval (1 to 30 seconds).
  2. Compare results against previously seen tweet IDs to identify what is new.
  3. Process new tweets: send alerts, store, fan out to Slack, Discord, etc.
  4. Repeat.
Tweet IDs (Snowflake) are monotonically increasing, so deduplication is reliable: a higher ID always means a newer tweet. If a script crashes, it resumes from the last checkpoint on the next poll. No persistent connection to manage, no reconnect strategy required.

Choosing the Right Endpoint

What you want to monitorEndpointMethodWhy this one
A single account/user-tweetsPOSTReturns the latest tweets from one user’s timeline
Up to 5,000 accounts at once/list-tweetsGETOne request covers all members of an X List
A keyword or hashtag/search-tweetsPOSTFull search operator support, chronological ordering with order: latest
@mentions of an account/mentionsPOSTPurpose-built for mention tracking with engagement filters

Level 1: Monitor a Single Account

The simplest case. The loop polls /user-tweets and emits tweets newer than the last seen ID.

Python

import requests
import time

API_KEY = "YOUR_API_KEY"
USERNAME = "elonmusk"
POLL_INTERVAL = 5  # seconds

URL = "https://api.sorsa.io/v3/user-tweets"
HEADERS = {"ApiKey": API_KEY, "Content-Type": "application/json"}

last_seen_id = None

print(f"Monitoring @{USERNAME}...")

while True:
    try:
        resp = requests.post(URL, headers=HEADERS, json={"link": f"https://x.com/{USERNAME}"})
        resp.raise_for_status()
        tweets = resp.json().get("tweets", [])

        if tweets:
            # Snowflake IDs arrive as strings. Cast to int for safe comparison.
            top_id = int(tweets[0]["id"])

            if last_seen_id is None:
                last_seen_id = top_id
                print(f"Baseline set: {last_seen_id}")
            else:
                new_tweets = [t for t in tweets if int(t["id"]) > last_seen_id]
                for tweet in reversed(new_tweets):  # oldest first
                    print(f"[NEW] @{USERNAME}: {tweet['full_text'][:140]}")
                if new_tweets:
                    last_seen_id = top_id

    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        time.sleep(POLL_INTERVAL * 2)
        continue

    time.sleep(POLL_INTERVAL)

JavaScript

const API_KEY = "YOUR_API_KEY";
const USERNAME = "elonmusk";
const POLL_INTERVAL = 5000;

let lastSeenId = null;
console.log(`Monitoring @${USERNAME}...`);

while (true) {
  try {
    const resp = await fetch("https://api.sorsa.io/v3/user-tweets", {
      method: "POST",
      headers: { "ApiKey": API_KEY, "Content-Type": "application/json" },
      body: JSON.stringify({ link: `https://x.com/${USERNAME}` }),
    });
    if (!resp.ok) throw new Error(`HTTP ${resp.status}`);

    const tweets = (await resp.json()).tweets || [];

    if (tweets.length > 0) {
      // BigInt avoids precision loss on 64-bit Snowflake IDs.
      const topId = BigInt(tweets[0].id);

      if (lastSeenId === null) {
        lastSeenId = topId;
        console.log(`Baseline set: ${lastSeenId}`);
      } else {
        const newTweets = tweets.filter((t) => BigInt(t.id) > lastSeenId);
        for (const t of [...newTweets].reverse()) {
          console.log(`[NEW] @${USERNAME}: ${t.full_text.slice(0, 140)}`);
        }
        if (newTweets.length) lastSeenId = topId;
      }
    }
  } catch (err) {
    console.error(`Error: ${err.message}`);
    await new Promise((r) => setTimeout(r, POLL_INTERVAL * 2));
    continue;
  }
  await new Promise((r) => setTimeout(r, POLL_INTERVAL));
}
This scales poorly for many accounts. Monitoring 50 accounts means 50 separate loops and 50x the API requests. That is where X Lists come in.

Level 2: Monitor Many Accounts with a Single Request

X Lists group up to 5,000 accounts. /list-tweets returns the merged latest tweets across all members in one API call. This is the default pattern for production multi-account monitoring. See Lists & Communities for more.

Step 1: Create a Public X List

  1. Go to X Lists and create a list.
  2. Add the accounts to monitor (up to 5,000).
  3. Set the list to Public. Private lists are not accessible via the API.
  4. Copy the List ID from the URL. For https://x.com/i/lists/1234567890 the ID is 1234567890.

Step 2: Poll the List

import requests
import time

API_KEY = "YOUR_API_KEY"
LIST_ID = "YOUR_LIST_ID"
POLL_INTERVAL = 5

URL = f"https://api.sorsa.io/v3/list-tweets?list_id={LIST_ID}"
HEADERS = {"ApiKey": API_KEY, "Accept": "application/json"}


def monitor_list(callback, interval=POLL_INTERVAL):
    """Poll an X List and call `callback` for each new tweet detected."""
    last_seen_id = None
    print(f"Monitoring List {LIST_ID} (interval: {interval}s)")

    while True:
        try:
            resp = requests.get(URL, headers=HEADERS, timeout=10)
            resp.raise_for_status()
            tweets = resp.json().get("tweets", [])

            if not tweets:
                time.sleep(interval)
                continue

            top_id = int(tweets[0]["id"])

            if last_seen_id is None:
                last_seen_id = top_id
                print(f"Baseline set: {last_seen_id}")
            else:
                new_tweets = [t for t in tweets if int(t["id"]) > last_seen_id]
                if new_tweets:
                    for tweet in reversed(new_tweets):
                        callback(tweet)
                    last_seen_id = top_id

        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}. Retrying in {interval * 2}s")
            time.sleep(interval * 2)
            continue

        time.sleep(interval)


def on_new_tweet(tweet):
    user = tweet["user"]
    print(f"[NEW] @{user['username']}: {tweet['full_text'][:120]}")
    print(
        f"       Likes: {tweet.get('likes_count', 0)} | "
        f"RTs: {tweet.get('retweet_count', 0)} | "
        f"Views: {tweet.get('view_count', 'N/A')}\n"
    )


if __name__ == "__main__":
    monitor_list(on_new_tweet)
Efficiency gain. Polling 50 accounts individually at a 10-second interval costs 50 * 8,640 = 432,000 requests per day. The same 50 accounts in one List polled at 10 seconds costs 8,640 requests per day. A 50x reduction. For more patterns like this, see Optimizing API Usage.
/list-tweets returns up to 20 tweets per page. If list members tweet faster than that within one poll interval, drop the interval to 2 to 3 seconds, or paginate via next_cursor until reaching a previously seen ID.

Level 3: Monitor a Keyword or Hashtag

Instead of tracking accounts, poll /search-tweets with order: "latest" for chronological results matching a query.
import requests
import time

API_KEY = "YOUR_API_KEY"
QUERY = '"your brand" OR @yourbrand lang:en'
POLL_INTERVAL = 10

URL = "https://api.sorsa.io/v3/search-tweets"
HEADERS = {"ApiKey": API_KEY, "Content-Type": "application/json"}


def monitor_keyword(query, callback, interval=10):
    last_seen_id = None
    print(f"Monitoring: {query} (interval: {interval}s)")

    while True:
        try:
            resp = requests.post(
                URL,
                headers=HEADERS,
                json={"query": query, "order": "latest"},
                timeout=10,
            )
            resp.raise_for_status()
            tweets = resp.json().get("tweets", [])

            if tweets:
                top_id = int(tweets[0]["id"])
                if last_seen_id is None:
                    last_seen_id = top_id
                    print(f"Baseline set: {last_seen_id}")
                else:
                    new_tweets = [t for t in tweets if int(t["id"]) > last_seen_id]
                    for tweet in reversed(new_tweets):
                        callback(tweet)
                    if new_tweets:
                        last_seen_id = top_id

        except requests.exceptions.RequestException as e:
            print(f"Error: {e}")
            time.sleep(interval * 2)
            continue

        time.sleep(interval)


monitor_keyword(QUERY, on_new_tweet, interval=10)
Any search operator works in the query string. To monitor high-engagement English mentions of your brand and exclude retweets:
monitor_keyword('"your brand" min_faves:10 lang:en -filter:retweets', on_new_tweet)

Routing New Tweets to Slack, Discord, or Any HTTP Endpoint

The polling loop is the producer; the callback decides what happens to each new tweet. Because the callback is just a function, the same monitor can route to anything that speaks HTTP.

Slack via Incoming Webhook

import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"


def send_to_slack(tweet):
    user = tweet["user"]
    text = (
        f"*New tweet from @{user['username']}*\n"
        f"{tweet['full_text']}\n"
        f"Likes: {tweet.get('likes_count', 0)} | "
        f"RTs: {tweet.get('retweet_count', 0)} | "
        f"Views: {tweet.get('view_count', 'N/A')}\n"
        f"https://x.com/{user['username']}/status/{tweet['id']}"
    )
    requests.post(SLACK_WEBHOOK_URL, json={"text": text})


# Plug into any monitor:
monitor_list(send_to_slack)
# or: monitor_keyword("bitcoin lang:en min_faves:50", send_to_slack)

Discord

DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/YOUR/WEBHOOK"


def send_to_discord(tweet):
    user = tweet["user"]
    content = (
        f"**@{user['username']}** just tweeted:\n"
        f"{tweet['full_text']}\n"
        f"https://x.com/{user['username']}/status/{tweet['id']}"
    )
    requests.post(DISCORD_WEBHOOK_URL, json={"content": content})

Telegram

TELEGRAM_BOT_TOKEN = "YOUR_BOT_TOKEN"
TELEGRAM_CHAT_ID = "YOUR_CHAT_ID"


def send_to_telegram(tweet):
    user = tweet["user"]
    text = (
        f"New tweet from @{user['username']}\n\n"
        f"{tweet['full_text']}\n\n"
        f"https://x.com/{user['username']}/status/{tweet['id']}"
    )
    requests.post(
        f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage",
        json={"chat_id": TELEGRAM_CHAT_ID, "text": text},
    )

Any Custom HTTP Endpoint

def send_to_internal_api(tweet):
    requests.post(
        "https://internal.example.com/events/twitter",
        json={
            "tweet_id": tweet["id"],
            "username": tweet["user"]["username"],
            "text": tweet["full_text"],
            "metrics": {
                "likes": tweet.get("likes_count", 0),
                "retweets": tweet.get("retweet_count", 0),
                "views": tweet.get("view_count", 0),
            },
            "url": f"https://x.com/{tweet['user']['username']}/status/{tweet['id']}",
        },
        headers={"Authorization": "Bearer YOUR_INTERNAL_TOKEN"},
        timeout=5,
    )

API Usage Calculator

Polling uses one request per cycle. Pick an interval that balances latency against monthly request volume.
IntervalReq/HourReq/DayReq/Month (30d)
1 second3,60086,4002,592,000
5 seconds72017,280518,400
10 seconds3608,640259,200
30 seconds1202,88086,400
1 minute601,44043,200
For most social listening and brand monitoring, 10 to 30 seconds is sufficient: any new tweet is detected within half a minute. Reserve 1 to 5 second intervals for financial signal detection or breaking-news pipelines. See Pricing to estimate monthly cost.
For volumes that exceed standard plan limits, contact [email protected] or Discord for a custom rate-limit quota.

Production Hardening

The examples above work for development. For production, address these five concerns.

1. Persist last_seen_id Across Restarts

If the script crashes and restarts without remembering its checkpoint, it either reprocesses old tweets (duplicate alerts) or silently skips the gap. Store the last seen ID in a file, database, or Redis.
import json
import os

STATE_FILE = "monitor_state.json"


def load_state():
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return json.load(f).get("last_seen_id")
    return None


def save_state(last_seen_id):
    with open(STATE_FILE, "w") as f:
        json.dump({"last_seen_id": last_seen_id}, f)
Load on startup, save after every successful poll that updates the cursor.

2. Exponential Backoff for Errors

Network issues, rate limits (HTTP 429), and transient API errors will happen. Back off gradually with a cap rather than retrying immediately. See Error Codes for the full reference.
retry_delay = POLL_INTERVAL
MAX_DELAY = 60

while True:
    try:
        resp = requests.get(URL, headers=HEADERS, timeout=10)
        if resp.status_code == 429:
            print(f"Rate limited. Backing off {retry_delay}s")
            time.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, MAX_DELAY)
            continue
        resp.raise_for_status()
        retry_delay = POLL_INTERVAL  # reset on success
        # process tweets
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        time.sleep(retry_delay)
        retry_delay = min(retry_delay * 2, MAX_DELAY)
        continue

    time.sleep(POLL_INTERVAL)

3. Separate Polling from Processing

Do not run expensive operations (NLP, database writes, external API calls) synchronously inside the polling loop. If a downstream system slows down, the loop falls behind schedule. Push new tweets into a queue and process them in a separate worker.
from collections import deque
import threading

tweet_queue = deque()


def polling_loop():
    """Fast loop: poll and enqueue. No heavy work here."""
    # Standard polling code, but instead of calling callback(tweet):
    # tweet_queue.append(tweet)
    pass


def processing_worker():
    """Separate thread: dequeue and dispatch."""
    while True:
        if tweet_queue:
            tweet = tweet_queue.popleft()
            send_to_slack(tweet)
            save_to_database(tweet)
        else:
            time.sleep(0.1)


threading.Thread(target=processing_worker, daemon=True).start()
polling_loop()
For heavier workloads, replace the in-memory deque with Redis, RabbitMQ, SQS, or any message broker the stack already uses.

4. Monitor the Monitor

Log each poll cycle: timestamp, new tweet count, response time, errors. Alert if the monitor has not completed a successful poll in the last N minutes; silent failures cause invisible data gaps in alerting pipelines. Operational status of the API is available at the Sorsa Status Page.

5. Handle Edge Cases

  • Deleted tweets: if a tweet is deleted between fetch and callback, the URL will 404. Treat as expected.
  • Protected accounts: if a tracked user goes private, /user-tweets returns an empty list. Log and continue.
  • Pinned tweets: the first tweet in a /user-tweets response is often the pinned one, not the most recent. Sort by created_at for strict chronological order.
  • Retweets: tweet["retweeted_status"] is populated for retweets. Decide whether to include them or filter out.
  • Reply restrictions: is_replies_limited indicates the author restricted replies; useful signal for some monitoring use cases.

Next Steps

  • Search Operators: advanced filters to reduce noise in keyword-based monitoring.
  • Track Mentions: dedicated endpoint for @mentions with engagement filters.
  • Rate Limits: handling 429 errors and request patterns.
  • Pagination: backfill historical data alongside real-time monitoring.
  • API Reference: full specification for /list-tweets, /user-tweets, /search-tweets, and all Sorsa API endpoints.