Skip to content

devbyhimans/Trend_Intelligence

Repository files navigation

Trend Intelligence System

Trend Intelligence Architecture Sources

Trend Intelligence System is a full-stack, distributed engine capable of dynamically measuring and predicting real-time global trends. By leveraging an event-driven architecture with high-speed caching and background Machine Learning workflows, the system calculates the Sentiment, Velocity, and Momentum of topics parsed from three complementary sources: Reddit (social sentiment), NewsAPI (authoritative fresh news), and HackerNews (tech community discourse).


πŸ› οΈ Complete Technology Stack & Architecture

We utilize an orchestrated blend of real-time caching, asynchronous workers, and heavy machine learning algorithms precisely tuned to isolate trends safely from social noise.

1. Core Backend Layer

FastAPI Python Nginx

  • FastAPI / Uvicorn: Orchestrates the core ASGI REST APIs. Processes all incoming queries seamlessly via non-blocking asynchronous requests (httpx).
  • Nginx API Gateway: Acts as the primary entrance node (:8080), seamlessly tunneling external traffic down to the internal FastAPI node while applying strictly enforced connection limits.
  • SQLAlchemy ORM: Secures database insertions, managing bulk uploads from the data pipelines safely.

2. Machine Learning & NLP

PyTorch scikit-learn

  • sentence-transformers (all-MiniLM-L6-v2): HuggingFace transformers mathematically translate plain sentences into massive 384-dimensional dense vectors to uncover underlying similarities beyond exact keyword matches.
  • scikit-learn (Agglomerative / KMeans): Groups vectorized thoughts into clusters structurally, mathematically separating distinct world trends from each other.
  • scikit-learn (TF-IDF Vectorizer): Responsible for labeling clustered posts into 5 human-readable keywords.
  • spaCy (en_core_web_sm): Runs Named Entity Recognition (NER) to isolate locations/regions, dynamically feeding state-level tags into global posts for regional UI routing.
  • NLTK (VADER): Computes precise positive, negative, and neutral fractional metrics from uncleaned web chatter.

3. Infrastructure, Databases & Queues

PostgreSQL Redis Docker

  • PostgreSQL: Reliable structured warehouse containing reddit_trends (unified input from Reddit + NewsAPI + HackerNews) and ml_trend_results (fully computed topic structures).
  • Redis & Native Custom Worker: Completely decodes request overhead safely, routing complex ML pipeline lookups to background workers (Windows compatible via brpop) while caching (TTL: 60s) instant fallback predictions.
  • Docker Compose: Streamlines booting the Gateway, PostgreSQL, and Redis in unison.

4. Modern Frontend

React Vite

  • React & Vite: Ultra-fast hot-module reloading rendering fully modular architectures (TrendCard, Graph, etc.) with beautiful micro-animations for an impactful, native-app feel.

πŸ“‚ Project Structure

trend-intelligence-system/
β”œβ”€β”€ backend/                  # FastAPI ASGI Backend
β”‚   β”œβ”€β”€ app/                  # Main Application logic (Routes, Services, Models)
β”‚   └── worker.py             # Redis Queue Background Daemon
β”œβ”€β”€ data_pipeline/            # Extract, Transform, Load (ETL) logic
β”‚   β”œβ”€β”€ collectors/           # Data ingestion for Reddit, NewsAPI, and HackerNews
β”‚   β”œβ”€β”€ loaders/              # PostgreSQL database sync logic
β”‚   β”œβ”€β”€ processors/           # Regex cleaning and text normalization
β”‚   └── schedulers/           # cron_jobs.py hourly triggers
β”œβ”€β”€ database/                 # Structured Persistence schemas
β”‚   └── postgres/             # schema.sql database configuration
β”œβ”€β”€ frontend/                 # React + Vite UI Dashboard
β”œβ”€β”€ ml_engine/                # Core AI Engine (PyTorch, scikit-learn, NLTK)
β”‚   β”œβ”€β”€ pipelines/            # ml_runner.py main execution orchestrator
β”‚   β”œβ”€β”€ preprocessing/        # Unified text standardizations
β”‚   β”œβ”€β”€ region_detection/     # spaCy-based location tag extraction
β”‚   β”œβ”€β”€ sentiment/            # NLTK VADER emotional analytics
β”‚   β”œβ”€β”€ topic_modeling/       # KMeans, TF-IDF, HuggingFace sentence-transformers
β”‚   └── trend_detection/      # Mathematical velocity/acceleration composite scoring
β”œβ”€β”€ scripts/                  # Helper CLI utilities like clear_db.py
β”œβ”€β”€ docker-compose.yml        # Multi-container cluster boot configuration
β”œβ”€β”€ RUN_GUIDE.md              # Startup workflow instructions
β”œβ”€β”€ DATABASE.md               # Schema reference and Redis lookup documentation
└── README.md                 # System overview and architecture details

πŸ–₯️ How To Run This Project Locally

Follow these exact steps to run the complete environment (Databases, Redis, Nginx, ML queue, API, and Frontend).

Prerequisites

  • Docker Desktop
  • Python 3.10+
  • Node.js 18+ & npm

Step 1: Environment Setup (.env)

Create a .env file at the root folder of the project:

# PostgreSQL DB config
DB_USER=postgres
DB_PASSWORD=your_password
DB_HOST=127.0.0.1
DB_PORT=5433
DB_NAME=reddit_db

# NewsAPI (primary topic discovery source)
# Get a free key at https://newsapi.org/register
NEWS_API_KEY=your_newsapi_key_here

No Reddit OAuth credentials are needed. Reddit is used with its public JSON API for sentiment signals only.

Step 2: Boot Infrastructure

docker-compose up -d

(Verify Postgres, Redis, and Nginx containers launch via docker ps)

Step 3: Set Up Python Virtual Environment

python -m venv venv
.\venv\Scripts\activate
pip install -r backend/requirements.txt
pip install -r req-dev.txt
python -m spacy download en_core_web_sm

Step 4: Run Services (3 Terminals Needed)

Terminal 2 (cron_jobs.py ETL):

.\venv\Scripts\activate
python data_pipeline\schedulers\cron_jobs.py

Terminal 3 (FastAPI Server):

.\venv\Scripts\activate
uvicorn app.main:app --app-dir backend --reload --host 127.0.0.1 --port 8000

Terminal 4 (Windows-Compatible Redis Worker):

.\venv\Scripts\activate
python backend/worker.py

Step 5: Start the React Frontend

Terminal 5 (Vite):

cd frontend
npm install
npm run dev

Visit http://localhost:5173 to experience the system instantly.


πŸ—ΊοΈ Visual Architecture & Flow Diagrams

1. High-Level Core Architecture

graph TD
%% ── Colour palette ──────────────────────────────────────────────────────────
classDef ui       fill:#3b82f6,stroke:#1d4ed8,color:#fff;
classDef gateway  fill:#1e1e2e,stroke:#444,color:#fff;
classDef backend  fill:#10b981,stroke:#065f46,color:#fff;
classDef service  fill:#059669,stroke:#064e3b,color:#fff;
classDef etl      fill:#f59e0b,stroke:#b45309,color:#000;
classDef ml       fill:#8b5cf6,stroke:#4c1d95,color:#fff;
classDef db       fill:#ef4444,stroke:#7f1d1d,color:#fff;
classDef cache    fill:#f97316,stroke:#c2410c,color:#fff;
classDef external fill:#6b7280,stroke:#374151,color:#fff;
classDef worker   fill:#a21caf,stroke:#701a75,color:#fff;

%% ══════════════════════════════════════════════════════════════════════════
%% LAYER 1 β€” FRONTEND (React + Vite)
%% ══════════════════════════════════════════════════════════════════════════
subgraph Frontend["🎨 Frontend  (React + Vite : localhost:5173)"]
    direction LR
    P_Search["πŸ“„ Search.jsx\n/search"]:::ui
    P_Global["πŸ“„ GlobalTrends.jsx\n/global-trends"]:::ui
    P_India["πŸ“„ IndiaTrends.jsx\n/india-trends"]:::ui
    C_SearchBar["πŸ” SearchBar"]:::ui
    C_TrendCard["πŸƒ TrendCard"]:::ui
    C_Graph["πŸ“Š Graph (Recharts)"]:::ui
    C_NewsFeed["πŸ“° NewsFeed"]:::ui
    C_Dropdown["πŸ“ StateDropdown"]:::ui
end

%% ══════════════════════════════════════════════════════════════════════════
%% LAYER 2 β€” API GATEWAY (Nginx)
%% ══════════════════════════════════════════════════════════════════════════
Gateway["🌐 Nginx API Gateway\n(Docker : port 8080)\nProxies / β†’ FastAPI :8000"]:::gateway

%% ══════════════════════════════════════════════════════════════════════════
%% LAYER 3 β€” BACKEND (FastAPI + Uvicorn)
%% ══════════════════════════════════════════════════════════════════════════
subgraph Backend["⚑ Backend  (FastAPI + Uvicorn : localhost:8000)"]
    direction TB
    Main["main.py\nApp entry-point & router mount"]:::backend

    subgraph Routes["Routes Layer"]
        R_Search["GET /search\n(routes/search.py)"]:::backend
        R_Trends["GET  /trends\n(routes/trends.py)"]:::backend
        R_Region["GET  /region\n(routes/region.py)"]:::backend
        R_News["GET  /news\n(routes/news.py)"]:::backend
        R_Health["GET  /health\n(routes/health.py)"]:::backend
    end

    subgraph Services["Services Layer"]
        S_Search["search_service.py\nβ‘  ML score lookup\nβ‘‘ Live VADER fallback\nβ‘’ Save Search to DB"]:::service
        S_Trend["trend_service.py\nβ‘  Query ml_trend_results\nβ‘‘ Rank by composite score"]:::service
        S_Region["region_service.py\nβ‘  State keyword map\nβ‘‘ Filter ml_trend_results\n   by subreddits column"]:::service
        S_News["nlp_summarizer.py\nβ‘  Fetch NewsAPI articles\nβ‘‘ VADER summary score"]:::service
    end
end

%% ══════════════════════════════════════════════════════════════════════════
%% LAYER 4 β€” BACKGROUND WORKER (Custom Redis Daemon)
%% ══════════════════════════════════════════════════════════════════════════
Worker["πŸ€– worker.py\nBlocking BRPOP on 'search_queue'\nRuns full ML TrendPipeline\nWrites result β†’ PostgreSQL"]:::worker

%% ══════════════════════════════════════════════════════════════════════════
%% LAYER 5 β€” ETL DATA PIPELINE (Scheduled, hourly)
%% ══════════════════════════════════════════════════════════════════════════
subgraph ETL["πŸ”„ Hybrid ETL Pipeline  (cron_jobs.py β€” runs hourly)"]
    direction TB
    subgraph Ph1["Phase 1 β€” Reddit Sentiment"]
        direction LR
        Collector["reddit_collector.py\n5 subreddits, hot+new\nExponential backoff\nDedup by post_id"]:::etl
        Cleaner["raw_to_clean.py\nRegex: strip URLs / emojis\nNormalise casing"]:::etl
        Loader["db_loader.py\nDataLoader.load_to_postgres()\nUpsert ON CONFLICT post_id"]:::etl
    end
    subgraph Ph2["Phase 2 β€” NewsAPI Topics"]
        NewsCol["news_collector.py\nHeadlines + topic search\nDirect to DB"]:::etl
    end
    subgraph Ph3["Phase 3 β€” HackerNews Tech"]
        HNCol["hacker_news_collector.py\nTop + new stories\nDirect to DB"]:::etl
    end
end
Collector --> Cleaner --> Loader

%% ══════════════════════════════════════════════════════════════════════════
%% LAYER 6 β€” ML ENGINE (Triggered after ETL)
%% ══════════════════════════════════════════════════════════════════════════
subgraph MLEngine["🧠 ML Engine  (TrendPipeline β€” called by ml_runner.py)"]
    direction TB
    ML_Runner["ml_runner.py\nOrchestrator β€” reads reddit_trends\nthen runs full pipeline"]:::ml
    Sentiment["sentiment/\nNLTK VADER\nCompound score βˆ’1 β†’ +1\npos / neu / neg breakdown"]:::ml
    Embed["topic_modeling/\nsentence-transformers\nall-MiniLM-L6-v2\n384-d dense vectors"]:::ml
    Cluster["topic_modeling/\nscikit-learn KMeans\nSemantic grouping"]:::ml
    Topic["topic_modeling/\nTF-IDF Vectorizer\nTop-5 keywords per cluster"]:::ml
    Score["trend_detection/\nComposite score formula\n0.35Β·Vol + 0.30Β·Vel\n+0.20Β·Acc + 0.15Β·Sent"]:::ml
end
ML_Runner --> Sentiment --> Embed --> Cluster --> Topic --> Score

%% ══════════════════════════════════════════════════════════════════════════
%% LAYER 7 β€” STORAGE
%% ══════════════════════════════════════════════════════════════════════════
subgraph Storage["πŸ—„οΈ Storage  (Docker-managed)"]
    PG_Raw[("PostgreSQL\nreddit_trends\npost_id Β· title Β· content\nups Β· subreddit Β· created_utc")]:::db
    PG_ML[("PostgreSQL\nml_trend_results\ntopic_id Β· keywords Β· score\nsentiment Β· velocity Β· subreddits")]:::db
    PG_Search[("PostgreSQL\nsearches\nquery Β· trend_score Β· region")]:::db
    Redis_Queue[("Redis :6379\nList: search_queue\n(LPUSH / BRPOP queue)")]:::cache
    Redis_Cache[("Redis :6379\nCache: search results\n(GET / SETEX)")]:::cache
end

%% ══════════════════════════════════════════════════════════════════════════
%% EXTERNAL APIs
%% ══════════════════════════════════════════════════════════════════════════
subgraph External["🌍 External APIs"]
    API_Reddit["Reddit JSON API\n(5 subreddits, sentiment only)\nhot+new, exponential backoff"]:::external
    API_News["NewsAPI.org\nPrimary topic discovery\n(free tier, NEWS_API_KEY)"]:::external
    API_HN["HackerNews API\n(Firebase + Algolia)\nNo auth, no rate limits"]:::external
end

%% ══════════════════════════════════════════════════════════════════════════
%% CONNECTIONS
%% ══════════════════════════════════════════════════════════════════════════

%% β€” Frontend β†’ Gateway
P_Search  -->|"GET /search"| Gateway
P_Global  -->|"GET /trends"| Gateway
P_India   -->|"GET /region?state=X"| Gateway
P_India   -->|"GET /news?state=X"| Gateway

%% β€” Gateway β†’ Backend
Gateway -->|"Proxy β†’ :8000"| Main
Main --> R_Search & R_Trends & R_Region & R_News & R_Health

%% β€” Routes β†’ Services
R_Search -->|"search_logic(query)"| S_Search
R_Trends -->|"get_trends(limit)"| S_Trend
R_Region -->|"get_region_trends(state)"| S_Region
R_News   -->|"fetch + summarise"| S_News

%% β€” Search Service dual path
S_Search -->|"β‘  Lookup ML keywords"| PG_ML
S_Search -->|"β‘‘ Cache miss β†’ LPUSH query"| Redis_Queue
S_Search -->|"β‘‘ Parallel live fallback\nhttpx β†’ VADER score"| API_Reddit
S_Search -->|"β‘‘ NewsAPI fallback\n(if Reddit < 5 results)"| API_News
S_Search -->|"β‘’ Save Search record"| PG_Search
S_Search -->|"Cache hit β†’ return"| Redis_Cache

%% β€” Trend & Region Services
S_Trend  -->|"MAX(run_at) + ORDER BY score"| PG_ML
S_Region -->|"Filter subreddits ILIKE state"| PG_ML

%% β€” News Service
S_News -->|"Fetch articles"| API_News

%% β€” Redis Queue β†’ Worker
Redis_Queue -->|"BRPOP (blocking pop)"| Worker
Worker -->|"Extensive NLP & Clustering\nWrite MLTrendResult row"| PG_ML

%% β€” ETL Pipeline
Collector -->|"5 subreddits, hot+new"| API_Reddit
Loader    -->|"Upsert ON CONFLICT post_id"| PG_Raw
NewsCol   -->|"Headlines + topics"| API_News
NewsCol   -->|"Direct UPSERT"| PG_Raw
HNCol     -->|"Top + new stories"| API_HN
HNCol     -->|"Direct UPSERT"| PG_Raw

%% β€” ML Engine trigger
Loader    -->|"On ETL success\ntriggers ml_runner.py"| ML_Runner
ML_Runner -->|"Reads all sources combined"| PG_Raw
Score     -->|"Writes analysed clusters"| PG_ML
Loading

2. Live Data Flow Execution Diagram

flowchart TD

classDef ui       fill:#3b82f6,stroke:#1d4ed8,color:#fff;
classDef gateway  fill:#1e1e2e,stroke:#555,color:#fff;
classDef backend  fill:#10b981,stroke:#065f46,color:#fff;
classDef store    fill:#ef4444,stroke:#7f1d1d,color:#fff;
classDef cache    fill:#f97316,stroke:#c2410c,color:#fff;
classDef worker   fill:#a21caf,stroke:#701a75,color:#fff;
classDef external fill:#6b7280,stroke:#374151,color:#fff;

USER["πŸ‘€ User types query\nin Search.jsx"]:::ui
USER -->|"GET /search?q=X"| GW["🌐 Nginx Gateway :8080"]:::gateway
GW -->|"Proxy β†’ :8000"| API["⚑ FastAPI\nroutes/search.py"]:::backend
API --> SS["search_service.py\nsearch_logic(query)"]:::backend

%% Branch A: ML cache hit
SS -->|"β‘  _lookup_ml_score()\nSELECT ml_trend_results\nWHERE keywords ILIKE '%word%'"| PG_ML[("🐘 PostgreSQL\nml_trend_results")]:::store
PG_ML -->|"score > 0 β†’ use it"| SS

%% Branch B: ML miss β†’ live VADER
SS -->|"β‘‘ score == 0\nCache miss β†’ live fallback"| VADER["_live_vader_fallback()\nhttpx async GET Reddit /search.json\nlimit=50, sort=new"]:::backend
VADER -->|"HTTPS request"| RDT["🌍 Reddit JSON API"]:::external
RDT -->|"up to 50 posts"| VADER

VADER -->|"< 5 Reddit results?\nNewsAPI fallback"| NEWS_API["🌍 NewsAPI.org\npageSize=100, sortBy=publishedAt"]:::external
NEWS_API -->|"articles JSON"| VADER

VADER -->|"NLTK VADER compound score\n→ fast_score formula"| SS

%% Enqueue worker job
SS -->|"β‘’ LPUSH query β†’ search_queue"| REDIS_Q[("Redis\nList: search_queue")]:::cache

%% Save search record
SS -->|"β‘£ INSERT INTO searches\n(query, trend_score, region='Global')"| PG_SEARCH[("🐘 PostgreSQL\nsearches\nquery Β· trend_score Β· region")]:::store

%% Return to user
SS -->|"β‘€ Return JSON\n{query, trend_score}"| API
API --> GW --> USER

%% Worker daemon
REDIS_Q -->|"BRPOP (blocking)\npops query string"| WORKER["πŸ€– worker.py (Custom Daemon)\nβ‘  NewsAPI /everything (primary)\nβ‘‘ HackerNews Algolia (fallback)\nβ‘’ VADER + MiniLM + KMeans NLP\nβ‘£ TF-IDF extracts Topic Labels"]:::worker
WORKER -->|"INSERT MLTrendResult\nrun_at = now() Β· subreddits=LIVE_SEARCH|source"| PG_ML
Loading

3. ML Engine Internal Mathematical Pipeline

flowchart TD

classDef process    fill:#3b82f6,stroke:#1d4ed8,color:#fff;
classDef model      fill:#8b5cf6,stroke:#4c1d95,color:#fff;
classDef data       fill:#f59e0b,stroke:#b45309,color:#000;
classDef math       fill:#10b981,stroke:#065f46,color:#fff;

RAW["πŸ“ Raw Texts\nTitles & Content"]:::data
META["πŸ“Š Metadata\nUpvotes, Subreddits, Dates"]:::data

RAW --> PREPROC["🧹 PreprocessingPipeline\nRegex: Remove URLs, Emojis, Special Chars"]:::process

PREPROC --> NER["🌍 RegionService (spaCy)\n`en_core_web_sm`\nExtracts Localities & States"]:::model
NER -->|"Detects Indian States"| META_UPDATE["Inject State into Subreddits"]:::process
META --> META_UPDATE
META_UPDATE --> AGG["Data Assembly"]:::process

PREPROC --> VADER["😊 SentimentInference (NLTK)\nVADER Lexicon\nLabels: Pos/Neu/Neg & Score (-1 to 1)"]:::model
VADER --> AGG

PREPROC --> EMBED["🧠 EmbeddingModel\n`sentence-transformers/all-MiniLM-L6-v2`\nTransforms text into 384-dimensional vectors"]:::model
EMBED --> CLUSTER["🧩 ClusterModel\nAgglomerative Clustering / KMeans\nGroups similar vectors semantically"]:::model

CLUSTER --> TFIDF["🏷️ TopicLabeler (scikit-learn)\nTF-IDF Vectorizer\nFinds top 5 keywords per Cluster"]:::model
TFIDF --> AGG

AGG --> SCORE["πŸ“ˆ TrendScorer\nAggregates Meta/NLP per Topic ID (Min 3 posts)"]:::math
SCORE -->|Current vs Previous Counts| VEL["πŸš€ VelocityCalculator"]:::math
SCORE -->|Current vs Previous Velocity| ACC["⚑ AccelerationCalculator"]:::math

VEL & ACC --> FINAL_FORMULA["πŸ“Š Final Composite Score Formula: \n(0.35 * Volume) + (0.30 * Velocity) + \n(0.20 * Accel) + (0.15 * Sentiment)"]:::math

FINAL_FORMULA --> OUTPUT["πŸ† Top 20 Ranked Trends\nStructured JSON payload arrays"]:::data
Loading

4. Background Worker (Live Search ML Engine)

flowchart TD

classDef process    fill:#3b82f6,stroke:#1d4ed8,color:#fff;
classDef ext        fill:#6b7280,stroke:#374151,color:#fff;
classDef cache      fill:#f97316,stroke:#c2410c,color:#fff;
classDef db         fill:#ef4444,stroke:#7f1d1d,color:#fff;
classDef ml         fill:#8b5cf6,stroke:#4c1d95,color:#fff;

REDIS[("Redis\nsearch_queue")]:::cache
WORK_LOOP["Worker Daemon Loop\n(brpop)"]:::process

REDIS -->|"User Query String (e.g. 'Tesla')"| WORK_LOOP

subgraph Worker ["πŸ€– worker.py Architecture"]
    direction TB
    QUERY["Extract Query"]:::process
    
    NEWS_API["🌍 fetch_newsapi_posts(query)\nNewsAPI /everything (Primary)"]:::ext
    HN_API["🌍 fetch_hackernews_posts(query)\nAlgolia API (Fallback max 20)"]:::ext

    QUERY -->|"Request live articles"| NEWS_API
    NEWS_API -->|"< 5 results?"| HN_API
    
    NEWS_API -->|"Combine text & meta"| MERGE
    HN_API -->|"Combine text & meta"| MERGE
    
    MERGE["πŸ“ Standardize Raw Texts\nExtract Upvotes, Comments, Source"]:::process
    
    PIPELINE["🧠 TrendPipeline.run()\n1. VADER Sentiment Analysis\n2. MiniLM Embeddings\n3. KMeans Clustering\n4. TF-IDF Top Keywords"]:::ml
    
    MERGE -->|"Inject into ML Pipeline"| PIPELINE
    
    FORMAT["πŸ“¦ Construct MLTrendResult\nSet run_at = now()\nTag subreddits = 'LIVE_SEARCH | source'"]:::process
    
    PIPELINE -->|"Return clustered topics"| FORMAT
end

WORK_LOOP --> Worker

POSTGRES[("🐘 PostgreSQL\nml_trend_results_table")]:::db
FORMAT -->|"SQLAlchemy Session.commit()"| POSTGRES
Loading

About

πŸš€ Trend Intelligence | A modern Hybrid Lambda architecture measuring the internet's Semantic Momentum. Built with FastAPI, Redis, and React, it analyzes live data from Reddit, HackerNews & NewsAPI. By leveraging a distributed PyTorch ML pipeline and advanced NLP, it expertly filters social noise to uncover the world's next big trends. πŸ“ˆβœ¨

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors