Trend Intelligence System is a full-stack, distributed engine for measuring and predicting real-time global trends. Built on an event-driven architecture with high-speed caching and background machine learning workflows, it computes the sentiment, velocity, and momentum of topics drawn from three complementary sources: Reddit (social sentiment), NewsAPI (authoritative fresh news), and HackerNews (tech community discourse).
The system combines real-time caching, asynchronous workers, and machine learning algorithms tuned to separate genuine trends from social noise.
- FastAPI / Uvicorn: Serves the core ASGI REST APIs, handling all incoming queries with non-blocking asynchronous requests (`httpx`).
- Nginx API Gateway: Acts as the primary entry point (`:8080`), proxying external traffic to the internal FastAPI node while enforcing connection limits.
- SQLAlchemy ORM: Manages database insertions, including bulk uploads from the data pipelines.
- sentence-transformers (`all-MiniLM-L6-v2`): HuggingFace transformer that encodes sentences into 384-dimensional dense vectors, uncovering similarities beyond exact keyword matches.
- scikit-learn (Agglomerative / KMeans): Groups the vectors into clusters, separating distinct trends from one another.
- scikit-learn (TF-IDF Vectorizer): Labels each cluster of posts with 5 human-readable keywords.
- spaCy (`en_core_web_sm`): Runs Named Entity Recognition (NER) to extract locations/regions, feeding state-level tags into posts for regional UI routing.
- NLTK (VADER): Computes positive, negative, and neutral fractional sentiment scores from uncleaned web chatter.
- PostgreSQL: Structured warehouse containing `reddit_trends` (unified input from Reddit + NewsAPI + HackerNews) and `ml_trend_results` (fully computed topic structures).
- Redis & custom worker: Offloads heavy ML pipeline lookups to background workers (Windows-compatible via `brpop`) while caching instant fallback predictions (TTL: 60s).
- Docker Compose: Boots the gateway, PostgreSQL, and Redis in unison.
- React & Vite: Fast hot-module reloading over a modular component architecture (`TrendCard`, `Graph`, etc.) with micro-animations for a native-app feel.
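To make the topic-labeling stage concrete, here is a minimal, dependency-free sketch of what the TF-IDF step computes. The real system uses scikit-learn's `TfidfVectorizer`; `top_keywords` and the sample corpus below are purely illustrative:

```python
import math
import re
from collections import Counter

def top_keywords(cluster_docs, all_docs, k=5):
    """Rank a cluster's words by TF-IDF: term frequency within the
    cluster, inverse document frequency over the whole corpus."""
    tokenize = lambda text: re.findall(r"[a-z0-9]+", text.lower())
    n_docs = len(all_docs)
    # Document frequency: in how many corpus documents each word appears.
    df = Counter()
    for doc in all_docs:
        df.update(set(tokenize(doc)))
    # Term frequency across all posts assigned to this cluster.
    tf = Counter(w for doc in cluster_docs for w in tokenize(doc))
    scores = {w: count * math.log(n_docs / df[w]) for w, count in tf.items()}
    return [w for w, _ in sorted(scores.items(), key=lambda x: -x[1])[:k]]

docs = ["tesla stock surges", "tesla earnings beat",
        "rain in chennai", "chennai floods today"]
print(top_keywords(docs[:2], docs, k=2))  # 'tesla' ranks highly for cluster 0
```

Words frequent inside a cluster but rare across the corpus win, which is why the top-5 keywords read as a human-friendly topic label.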
```
trend-intelligence-system/
├── backend/              # FastAPI ASGI Backend
│   ├── app/              # Main application logic (routes, services, models)
│   └── worker.py         # Redis queue background daemon
├── data_pipeline/        # Extract, Transform, Load (ETL) logic
│   ├── collectors/       # Data ingestion for Reddit, NewsAPI, and HackerNews
│   ├── loaders/          # PostgreSQL database sync logic
│   ├── processors/       # Regex cleaning and text normalization
│   └── schedulers/       # cron_jobs.py hourly triggers
├── database/             # Structured persistence schemas
│   └── postgres/         # schema.sql database configuration
├── frontend/             # React + Vite UI dashboard
├── ml_engine/            # Core AI engine (PyTorch, scikit-learn, NLTK)
│   ├── pipelines/        # ml_runner.py main execution orchestrator
│   ├── preprocessing/    # Unified text standardization
│   ├── region_detection/ # spaCy-based location tag extraction
│   ├── sentiment/        # NLTK VADER sentiment analytics
│   ├── topic_modeling/   # KMeans, TF-IDF, HuggingFace sentence-transformers
│   └── trend_detection/  # Velocity/acceleration composite scoring
├── scripts/              # Helper CLI utilities like clear_db.py
├── docker-compose.yml    # Multi-container cluster boot configuration
├── RUN_GUIDE.md          # Startup workflow instructions
├── DATABASE.md           # Schema reference and Redis lookup documentation
└── README.md             # System overview and architecture details
```
Follow these exact steps to run the complete environment (Databases, Redis, Nginx, ML queue, API, and Frontend).
- Docker Desktop
- Python 3.10+
- Node.js 18+ & npm
Create a `.env` file at the root of the project:

```
# PostgreSQL DB config
DB_USER=postgres
DB_PASSWORD=your_password
DB_HOST=127.0.0.1
DB_PORT=5433
DB_NAME=reddit_db

# NewsAPI (primary topic discovery source)
# Get a free key at https://newsapi.org/register
NEWS_API_KEY=your_newsapi_key_here
```

No Reddit OAuth credentials are needed. Reddit is used via its public JSON API for sentiment signals only.
```
docker-compose up -d
```

(Verify that the Postgres, Redis, and Nginx containers launch via `docker ps`.)

Terminal 1 (environment setup):

```
python -m venv venv
.\venv\Scripts\activate
pip install -r backend/requirements.txt
pip install -r req-dev.txt
python -m spacy download en_core_web_sm
```

Terminal 2 (cron_jobs.py ETL):

```
.\venv\Scripts\activate
python data_pipeline\schedulers\cron_jobs.py
```

Terminal 3 (FastAPI Server):

```
.\venv\Scripts\activate
uvicorn app.main:app --app-dir backend --reload --host 127.0.0.1 --port 8000
```

Terminal 4 (Windows-Compatible Redis Worker):

```
.\venv\Scripts\activate
python backend/worker.py
```

Terminal 5 (Vite):

```
cd frontend
npm install
npm run dev
```

Visit http://localhost:5173 to use the system.
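Once all five terminals are up, a quick smoke test can be run against the gateway. This is a hedged sketch using only the standard library; it assumes the `/health` and `/search` routes shown in the architecture diagrams and the default `:8080` gateway port:

```python
from urllib.parse import urlencode
from urllib.request import urlopen

GATEWAY = "http://localhost:8080"  # Nginx entry point from docker-compose

def search_url(query: str) -> str:
    # Nginx proxies /search through to FastAPI on :8000.
    return f"{GATEWAY}/search?{urlencode({'q': query})}"

def smoke_test() -> None:
    # Requires the full stack from the steps above to be running locally.
    print(urlopen(f"{GATEWAY}/health").read().decode())
    print(urlopen(search_url("Tesla")).read().decode())
```

Call `smoke_test()` after startup; a JSON response from `/health` confirms the gateway and backend are wired correctly.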
```mermaid
graph TD
%% ── Colour palette ────────────────────────────────────────────────────────
classDef ui fill:#3b82f6,stroke:#1d4ed8,color:#fff;
classDef gateway fill:#1e1e2e,stroke:#444,color:#fff;
classDef backend fill:#10b981,stroke:#065f46,color:#fff;
classDef service fill:#059669,stroke:#064e3b,color:#fff;
classDef etl fill:#f59e0b,stroke:#b45309,color:#000;
classDef ml fill:#8b5cf6,stroke:#4c1d95,color:#fff;
classDef db fill:#ef4444,stroke:#7f1d1d,color:#fff;
classDef cache fill:#f97316,stroke:#c2410c,color:#fff;
classDef external fill:#6b7280,stroke:#374151,color:#fff;
classDef worker fill:#a21caf,stroke:#701a75,color:#fff;
%% ──────────────────────────────────────────────────────────────────────────
%% LAYER 1 – FRONTEND (React + Vite)
%% ──────────────────────────────────────────────────────────────────────────
subgraph Frontend["Frontend (React + Vite : localhost:5173)"]
direction LR
P_Search["Search.jsx\n/search"]:::ui
P_Global["GlobalTrends.jsx\n/global-trends"]:::ui
P_India["IndiaTrends.jsx\n/india-trends"]:::ui
C_SearchBar["SearchBar"]:::ui
C_TrendCard["TrendCard"]:::ui
C_Graph["Graph (Recharts)"]:::ui
C_NewsFeed["NewsFeed"]:::ui
C_Dropdown["StateDropdown"]:::ui
end
%% ──────────────────────────────────────────────────────────────────────────
%% LAYER 2 – API GATEWAY (Nginx)
%% ──────────────────────────────────────────────────────────────────────────
Gateway["Nginx API Gateway\n(Docker : port 8080)\nProxies / → FastAPI :8000"]:::gateway
%% ──────────────────────────────────────────────────────────────────────────
%% LAYER 3 – BACKEND (FastAPI + Uvicorn)
%% ──────────────────────────────────────────────────────────────────────────
subgraph Backend["Backend (FastAPI + Uvicorn : localhost:8000)"]
direction TB
Main["main.py\nApp entry-point & router mount"]:::backend
subgraph Routes["Routes Layer"]
R_Search["GET /search\n(routes/search.py)"]:::backend
R_Trends["GET /trends\n(routes/trends.py)"]:::backend
R_Region["GET /region\n(routes/region.py)"]:::backend
R_News["GET /news\n(routes/news.py)"]:::backend
R_Health["GET /health\n(routes/health.py)"]:::backend
end
subgraph Services["Services Layer"]
S_Search["search_service.py\n① ML score lookup\n② Live VADER fallback\n③ Save Search to DB"]:::service
S_Trend["trend_service.py\n① Query ml_trend_results\n② Rank by composite score"]:::service
S_Region["region_service.py\n① State keyword map\n② Filter ml_trend_results\n by subreddits column"]:::service
S_News["nlp_summarizer.py\n① Fetch NewsAPI articles\n② VADER summary score"]:::service
end
end
%% ──────────────────────────────────────────────────────────────────────────
%% LAYER 4 – BACKGROUND WORKER (Custom Redis Daemon)
%% ──────────────────────────────────────────────────────────────────────────
Worker["worker.py\nBlocking BRPOP on 'search_queue'\nRuns full ML TrendPipeline\nWrites result → PostgreSQL"]:::worker
%% ──────────────────────────────────────────────────────────────────────────
%% LAYER 5 – ETL DATA PIPELINE (Scheduled, hourly)
%% ──────────────────────────────────────────────────────────────────────────
subgraph ETL["Hybrid ETL Pipeline (cron_jobs.py – runs hourly)"]
direction TB
subgraph Ph1["Phase 1 – Reddit Sentiment"]
direction LR
Collector["reddit_collector.py\n5 subreddits, hot+new\nExponential backoff\nDedup by post_id"]:::etl
Cleaner["raw_to_clean.py\nRegex: strip URLs / emojis\nNormalise casing"]:::etl
Loader["db_loader.py\nDataLoader.load_to_postgres()\nUpsert ON CONFLICT post_id"]:::etl
end
subgraph Ph2["Phase 2 – NewsAPI Topics"]
NewsCol["news_collector.py\nHeadlines + topic search\nDirect to DB"]:::etl
end
subgraph Ph3["Phase 3 – HackerNews Tech"]
HNCol["hacker_news_collector.py\nTop + new stories\nDirect to DB"]:::etl
end
end
Collector --> Cleaner --> Loader
%% ──────────────────────────────────────────────────────────────────────────
%% LAYER 6 – ML ENGINE (Triggered after ETL)
%% ──────────────────────────────────────────────────────────────────────────
subgraph MLEngine["ML Engine (TrendPipeline – called by ml_runner.py)"]
direction TB
ML_Runner["ml_runner.py\nOrchestrator – reads reddit_trends\nthen runs full pipeline"]:::ml
Sentiment["sentiment/\nNLTK VADER\nCompound score -1 → +1\npos / neu / neg breakdown"]:::ml
Embed["topic_modeling/\nsentence-transformers\nall-MiniLM-L6-v2\n384-d dense vectors"]:::ml
Cluster["topic_modeling/\nscikit-learn KMeans\nSemantic grouping"]:::ml
Topic["topic_modeling/\nTF-IDF Vectorizer\nTop-5 keywords per cluster"]:::ml
Score["trend_detection/\nComposite score formula\n0.35·Vol + 0.30·Vel\n+ 0.20·Acc + 0.15·Sent"]:::ml
end
ML_Runner --> Sentiment --> Embed --> Cluster --> Topic --> Score
%% ──────────────────────────────────────────────────────────────────────────
%% LAYER 7 – STORAGE
%% ──────────────────────────────────────────────────────────────────────────
subgraph Storage["Storage (Docker-managed)"]
PG_Raw[("PostgreSQL\nreddit_trends\npost_id · title · content\nups · subreddit · created_utc")]:::db
PG_ML[("PostgreSQL\nml_trend_results\ntopic_id · keywords · score\nsentiment · velocity · subreddits")]:::db
PG_Search[("PostgreSQL\nsearches\nquery · trend_score · region")]:::db
Redis_Queue[("Redis :6379\nList: search_queue\n(LPUSH / BRPOP queue)")]:::cache
Redis_Cache[("Redis :6379\nCache: search results\n(GET / SETEX)")]:::cache
end
%% ──────────────────────────────────────────────────────────────────────────
%% EXTERNAL APIs
%% ──────────────────────────────────────────────────────────────────────────
subgraph External["External APIs"]
API_Reddit["Reddit JSON API\n(5 subreddits, sentiment only)\nhot+new, exponential backoff"]:::external
API_News["NewsAPI.org\nPrimary topic discovery\n(free tier, NEWS_API_KEY)"]:::external
API_HN["HackerNews API\n(Firebase + Algolia)\nNo auth, no rate limits"]:::external
end
%% ──────────────────────────────────────────────────────────────────────────
%% CONNECTIONS
%% ──────────────────────────────────────────────────────────────────────────
%% Frontend → Gateway
P_Search -->|"GET /search"| Gateway
P_Global -->|"GET /trends"| Gateway
P_India -->|"GET /region?state=X"| Gateway
P_India -->|"GET /news?state=X"| Gateway
%% Gateway → Backend
Gateway -->|"Proxy → :8000"| Main
Main --> R_Search & R_Trends & R_Region & R_News & R_Health
%% Routes → Services
R_Search -->|"search_logic(query)"| S_Search
R_Trends -->|"get_trends(limit)"| S_Trend
R_Region -->|"get_region_trends(state)"| S_Region
R_News -->|"fetch + summarise"| S_News
%% Search Service dual path
S_Search -->|"① Lookup ML keywords"| PG_ML
S_Search -->|"② Cache miss → LPUSH query"| Redis_Queue
S_Search -->|"② Parallel live fallback\nhttpx → VADER score"| API_Reddit
S_Search -->|"② NewsAPI fallback\n(if Reddit < 5 results)"| API_News
S_Search -->|"③ Save Search record"| PG_Search
S_Search -->|"Cache hit → return"| Redis_Cache
%% Trend & Region Services
S_Trend -->|"MAX(run_at) + ORDER BY score"| PG_ML
S_Region -->|"Filter subreddits ILIKE state"| PG_ML
%% News Service
S_News -->|"Fetch articles"| API_News
%% Redis Queue → Worker
Redis_Queue -->|"BRPOP (blocking pop)"| Worker
Worker -->|"Extensive NLP & Clustering\nWrite MLTrendResult row"| PG_ML
%% ETL Pipeline
Collector -->|"5 subreddits, hot+new"| API_Reddit
Loader -->|"Upsert ON CONFLICT post_id"| PG_Raw
NewsCol -->|"Headlines + topics"| API_News
NewsCol -->|"Direct UPSERT"| PG_Raw
HNCol -->|"Top + new stories"| API_HN
HNCol -->|"Direct UPSERT"| PG_Raw
%% ML Engine trigger
Loader -->|"On ETL success\ntriggers ml_runner.py"| ML_Runner
ML_Runner -->|"Reads all sources combined"| PG_Raw
Score -->|"Writes analysed clusters"| PG_ML
```
```mermaid
flowchart TD
classDef ui fill:#3b82f6,stroke:#1d4ed8,color:#fff;
classDef gateway fill:#1e1e2e,stroke:#555,color:#fff;
classDef backend fill:#10b981,stroke:#065f46,color:#fff;
classDef store fill:#ef4444,stroke:#7f1d1d,color:#fff;
classDef cache fill:#f97316,stroke:#c2410c,color:#fff;
classDef worker fill:#a21caf,stroke:#701a75,color:#fff;
classDef external fill:#6b7280,stroke:#374151,color:#fff;
USER["User types query\nin Search.jsx"]:::ui
USER -->|"GET /search?q=X"| GW["Nginx Gateway :8080"]:::gateway
GW -->|"Proxy → :8000"| API["FastAPI\nroutes/search.py"]:::backend
API --> SS["search_service.py\nsearch_logic(query)"]:::backend
%% Branch A: ML cache hit
SS -->|"① _lookup_ml_score()\nSELECT ml_trend_results\nWHERE keywords ILIKE '%word%'"| PG_ML[("PostgreSQL\nml_trend_results")]:::store
PG_ML -->|"score > 0 → use it"| SS
%% Branch B: ML miss → live VADER
SS -->|"② score == 0\nCache miss → live fallback"| VADER["_live_vader_fallback()\nhttpx async GET Reddit /search.json\nlimit=50, sort=new"]:::backend
VADER -->|"HTTPS request"| RDT["Reddit JSON API"]:::external
RDT -->|"up to 50 posts"| VADER
VADER -->|"< 5 Reddit results?\nNewsAPI fallback"| NEWS_API["NewsAPI.org\npageSize=100, sortBy=publishedAt"]:::external
NEWS_API -->|"articles JSON"| VADER
VADER -->|"NLTK VADER compound score\n→ fast_score formula"| SS
%% Enqueue worker job
SS -->|"③ LPUSH query → search_queue"| REDIS_Q[("Redis\nList: search_queue")]:::cache
%% Save search record
SS -->|"④ INSERT INTO searches\n(query, trend_score, region='Global')"| PG_SEARCH[("PostgreSQL\nsearches\nquery · trend_score · region")]:::store
%% Return to user
SS -->|"⑤ Return JSON\n{query, trend_score}"| API
API --> GW --> USER
%% Worker daemon
REDIS_Q -->|"BRPOP (blocking)\npops query string"| WORKER["worker.py (Custom Daemon)\n① NewsAPI /everything (primary)\n② HackerNews Algolia (fallback)\n③ VADER + MiniLM + KMeans NLP\n④ TF-IDF extracts Topic Labels"]:::worker
WORKER -->|"INSERT MLTrendResult\nrun_at = now() · subreddits=LIVE_SEARCH|source"| PG_ML
```
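The LPUSH/BRPOP handoff above can be sketched as a minimal worker loop. This is an illustrative outline, not the project's actual `worker.py`: `make_result_row` and `run_worker` are hypothetical names, and a redis-py client is assumed to be passed in:

```python
import json

QUEUE = "search_queue"  # list the API LPUSHes queries onto

def make_result_row(query: str, source: str) -> dict:
    # The subreddits column doubles as a "LIVE_SEARCH|<source>" tag,
    # matching the INSERT shown in the diagram above.
    return {"query": query, "subreddits": f"LIVE_SEARCH|{source}"}

def run_worker(r):
    """Blocking daemon loop; `r` is a redis-py client (assumed dependency)."""
    while True:
        # BRPOP blocks until a query arrives; this is what keeps the
        # worker Windows-compatible (no signal-based polling needed).
        _, query = r.brpop(QUEUE)
        row = make_result_row(query, "newsapi")
        # The real worker would run TrendPipeline.run() here, then commit
        # the MLTrendResult row through a SQLAlchemy session.
        print(json.dumps(row))
```

Because BRPOP is atomic, multiple worker processes could safely share the same queue without double-processing a query.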
```mermaid
flowchart TD
classDef process fill:#3b82f6,stroke:#1d4ed8,color:#fff;
classDef model fill:#8b5cf6,stroke:#4c1d95,color:#fff;
classDef data fill:#f59e0b,stroke:#b45309,color:#000;
classDef math fill:#10b981,stroke:#065f46,color:#fff;
RAW["Raw Texts\nTitles & Content"]:::data
META["Metadata\nUpvotes, Subreddits, Dates"]:::data
RAW --> PREPROC["PreprocessingPipeline\nRegex: Remove URLs, Emojis, Special Chars"]:::process
PREPROC --> NER["RegionService (spaCy)\n`en_core_web_sm`\nExtracts Localities & States"]:::model
NER -->|"Detects Indian States"| META_UPDATE["Inject State into Subreddits"]:::process
META --> META_UPDATE
META_UPDATE --> AGG["Data Assembly"]:::process
PREPROC --> VADER["SentimentInference (NLTK)\nVADER Lexicon\nLabels: Pos/Neu/Neg & Score (-1 to 1)"]:::model
VADER --> AGG
PREPROC --> EMBED["EmbeddingModel\n`sentence-transformers/all-MiniLM-L6-v2`\nTransforms text into 384-dimensional vectors"]:::model
EMBED --> CLUSTER["ClusterModel\nAgglomerative Clustering / KMeans\nGroups similar vectors semantically"]:::model
CLUSTER --> TFIDF["TopicLabeler (scikit-learn)\nTF-IDF Vectorizer\nFinds top 5 keywords per Cluster"]:::model
TFIDF --> AGG
AGG --> SCORE["TrendScorer\nAggregates Meta/NLP per Topic ID (Min 3 posts)"]:::math
SCORE -->|Current vs Previous Counts| VEL["VelocityCalculator"]:::math
SCORE -->|Current vs Previous Velocity| ACC["AccelerationCalculator"]:::math
VEL & ACC --> FINAL_FORMULA["Final Composite Score Formula:\n(0.35 * Volume) + (0.30 * Velocity) +\n(0.20 * Accel) + (0.15 * Sentiment)"]:::math
FINAL_FORMULA --> OUTPUT["Top 20 Ranked Trends\nStructured JSON payload arrays"]:::data
```
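The scoring stage reduces to a small amount of arithmetic. Here is a sketch of the composite formula from the diagram, assuming velocity is the change in post counts between consecutive runs and acceleration is the change in velocity (the exact normalization of each input is not shown in this document):

```python
def velocity(current_count: int, previous_count: int) -> float:
    # Rate of change in post volume between consecutive pipeline runs.
    return current_count - previous_count

def acceleration(current_vel: float, previous_vel: float) -> float:
    # Rate of change of velocity: is the trend speeding up or slowing down?
    return current_vel - previous_vel

def composite_score(volume: float, vel: float, acc: float,
                    sentiment: float) -> float:
    # Weights from the diagram:
    # 0.35·Volume + 0.30·Velocity + 0.20·Acceleration + 0.15·Sentiment
    return 0.35 * volume + 0.30 * vel + 0.20 * acc + 0.15 * sentiment

print(composite_score(10, 4, 1, 0.5))  # 3.5 + 1.2 + 0.2 + 0.075 = 4.975
```

Volume dominates the score, but the velocity and acceleration terms let a small, fast-growing topic outrank a large, stagnant one.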
```mermaid
flowchart TD
classDef process fill:#3b82f6,stroke:#1d4ed8,color:#fff;
classDef ext fill:#6b7280,stroke:#374151,color:#fff;
classDef cache fill:#f97316,stroke:#c2410c,color:#fff;
classDef db fill:#ef4444,stroke:#7f1d1d,color:#fff;
classDef ml fill:#8b5cf6,stroke:#4c1d95,color:#fff;
REDIS[("Redis\nsearch_queue")]:::cache
WORK_LOOP["Worker Daemon Loop\n(brpop)"]:::process
REDIS -->|"User Query String (e.g. 'Tesla')"| WORK_LOOP
subgraph Worker ["worker.py Architecture"]
direction TB
QUERY["Extract Query"]:::process
NEWS_API["fetch_newsapi_posts(query)\nNewsAPI /everything (Primary)"]:::ext
HN_API["fetch_hackernews_posts(query)\nAlgolia API (Fallback, max 20)"]:::ext
QUERY -->|"Request live articles"| NEWS_API
NEWS_API -->|"< 5 results?"| HN_API
NEWS_API -->|"Combine text & meta"| MERGE
HN_API -->|"Combine text & meta"| MERGE
MERGE["Standardize Raw Texts\nExtract Upvotes, Comments, Source"]:::process
PIPELINE["TrendPipeline.run()\n1. VADER Sentiment Analysis\n2. MiniLM Embeddings\n3. KMeans Clustering\n4. TF-IDF Top Keywords"]:::ml
MERGE -->|"Inject into ML Pipeline"| PIPELINE
FORMAT["Construct MLTrendResult\nSet run_at = now()\nTag subreddits = 'LIVE_SEARCH | source'"]:::process
PIPELINE -->|"Return clustered topics"| FORMAT
end
WORK_LOOP --> Worker
POSTGRES[("PostgreSQL\nml_trend_results_table")]:::db
FORMAT -->|"SQLAlchemy Session.commit()"| POSTGRES
```
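The worker's source-selection step comes down to a simple fallback rule: NewsAPI is primary, and HackerNews (capped at 20 results) fills in only when NewsAPI returns too little. A hedged sketch (`merge_sources` is an illustrative name; the post shape is whatever the collectors emit):

```python
def merge_sources(news_posts, hn_posts, min_results=5, hn_cap=20):
    """Combine live-search sources as in the diagram above:
    NewsAPI first; if it yields fewer than `min_results` posts,
    append up to `hn_cap` HackerNews (Algolia) results."""
    posts = list(news_posts)
    if len(posts) < min_results:
        posts.extend(hn_posts[:hn_cap])
    return posts
```

Keeping the threshold and cap as parameters makes the fallback behaviour easy to tune without touching the fetch logic.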