diff --git a/koucha/migrations/20260115003047_initial_schema.sql b/koucha/migrations/20260115003047_initial_schema.sql index 25122e7..d437df3 100644 --- a/koucha/migrations/20260115003047_initial_schema.sql +++ b/koucha/migrations/20260115003047_initial_schema.sql @@ -32,7 +32,6 @@ CREATE TABLE items ( CREATE TABLE feeds ( id INTEGER PRIMARY KEY, user_id INTEGER NOT NULL, - title TEXT NOT NULL, FOREIGN KEY (user_id) REFERENCES users(id) ); diff --git a/koucha/src/bin/server.rs b/koucha/src/bin/server.rs index 4a05eed..d4a8d34 100644 --- a/koucha/src/bin/server.rs +++ b/koucha/src/bin/server.rs @@ -1,14 +1,11 @@ use std::error::Error; use reqwest::Client; -use koucha::{ - AdapterOptions, -}; +use koucha::fetch_channel; #[tokio::main] async fn main() -> Result<(), Box> { - // let adapter = AdapterOptions::new().create().await?; - // - // let _channel = fetch_channel(&client, "https://lorem-rss.herokuapp.com/feed?unit=year").await?; + let client = Client::new(); + let _channel = fetch_channel(&client, "https://lorem-rss.herokuapp.com/feed?unit=year").await?; Ok(()) } diff --git a/koucha/src/channel.rs b/koucha/src/channel.rs deleted file mode 100644 index 6d130ff..0000000 --- a/koucha/src/channel.rs +++ /dev/null @@ -1,166 +0,0 @@ -use reqwest::{Url, Client}; -use sqlx::SqlitePool; -use chrono::{DateTime, Utc}; -use crate::{Result}; -use std::hash::{Hash, Hasher}; - -pub struct ChannelId(pub i64); -impl From for ChannelId { fn from(id: i64) -> Self { ChannelId(id) } } -impl From for i64 { fn from(id: ChannelId) -> Self { id.0 } } - - -struct FetchedRSSItem { - guid: String, - title: String, - description: String, - content: String, -} -impl FetchedRSSItem { - fn parse(item: rss::Item) -> Self { - FetchedRSSItem { - guid: Self::get_or_create_guid(&item), - title: item.title().unwrap_or("").to_string(), - description: item.description().unwrap_or("").to_string(), - content: item.content().unwrap_or("").to_string(), - } - } - - fn get_or_create_guid(item: &rss::Item) -> String { - if let Some(guid) = item.guid() { - return guid.value().to_string(); - } - - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - item.link().unwrap_or("").hash(&mut hasher); - item.title().unwrap_or("").hash(&mut hasher); - item.description().unwrap_or("").hash(&mut hasher); - - format!("gen-{:x}", hasher.finish()) - } -} -pub struct FetchedRSSChannel { - title: String, - link: Url, - description: String, - - items: Vec, - - fetched_at: DateTime, -} -impl FetchedRSSChannel { - pub fn fetched_at(&self) -> &DateTime { - &self.fetched_at - } - - fn parse(rss: rss::Channel) -> Result { - Ok(FetchedRSSChannel { - title: rss.title, - link: Url::parse(&rss.link)?, - description: rss.description, - - items: rss.items.into_iter().map(FetchedRSSItem::parse).collect(), - - fetched_at: Utc::now(), - }) - } -} - -pub struct UnparsedChannel { - pub id: i64, - pub title: String, - pub link: String, - pub description: Option, - pub last_fetched: Option, -} -impl UnparsedChannel { - pub fn parse(self) -> Result { - Ok(Channel { - id: ChannelId(self.id), - title: self.title, - link: Url::parse(&self.link)?, - description: self.description, - last_fetched: self.last_fetched.as_deref() - .map(DateTime::parse_from_rfc2822) - .transpose()? - .map(|dt| dt.with_timezone(&Utc)), - }) - } -} - -pub struct Channel { - pub id: ChannelId, - pub title: String, - pub link: Url, - pub description: Option, - pub last_fetched: Option>, -} - -impl Channel { - - - pub async fn get_all(pool: &SqlitePool) -> Result> { - let channels: Result> = sqlx::query_as!( - UnparsedChannel, - "SELECT id, title, link, description, last_fetched FROM channels" - ) - .fetch_all(pool).await?.into_iter() - .map(UnparsedChannel::parse).collect(); - - channels - } - - // TODO implement fetch skipping - fn should_skip_fetch(&self) -> bool { false } - - // TODO implement conditional fetching - pub async fn fetch_rss( - client: &Client, channel: &Channel - ) -> Result> { - if channel.should_skip_fetch() { - return Ok(None); - } - let bytestream = client.get(channel.link.clone()) - .send().await? - .bytes().await?; - - let rss_channel = rss::Channel::read_from(&bytestream[..])?; - - Ok(Some(FetchedRSSChannel::parse(rss_channel)?)) - } - - pub async fn update_metadata( - pool: &SqlitePool, id: ChannelId, fetched: FetchedRSSChannel - ) -> Result<()> { - let link = fetched.link.as_str(); - let fetched_at = fetched.fetched_at.to_rfc2822(); - sqlx::query!( - "UPDATE channels - SET title = ?, link = ?, description = ?, - last_fetched = ? - WHERE id = ?", - fetched.title, link, fetched.description, fetched_at, - id.0 - ).execute(pool).await?; - - Ok(()) - } - - pub async fn update_items( - pool: &SqlitePool, id: ChannelId, fetched: FetchedRSSChannel - ) -> Result<()> { - let fetched_at = fetched.fetched_at.to_rfc2822(); - - for item in fetched.items { - sqlx::query!( - "INSERT OR IGNORE INTO items - (channel_id, guid, fetched_at, title, description, content) - VALUES (?, ?, ?, ?, ?, ?)", - id.0, item.guid, fetched_at, item.title, item.description, item.content - ) - .execute(pool) - .await?; - } - - Ok(()) - } -} diff --git a/koucha/src/feed.rs b/koucha/src/feed.rs deleted file mode 100644 index 8e04a0d..0000000 --- a/koucha/src/feed.rs +++ /dev/null @@ -1,96 +0,0 @@ -use crate::{ - Result, - Item, - Channel, - channel::{ - UnparsedChannel, - ChannelId, - }, - user::UserId, -}; -use sqlx::SqlitePool; - -pub struct FeedId(pub i64); -impl From for FeedId { fn from(id: i64) -> Self { FeedId(id) } } -impl From for i64 { fn from(id: FeedId) -> Self { id.0 } } - -pub struct UnparsedFeed { - pub id: i64, - pub title: String, - pub user_id: i64 -} -impl UnparsedFeed { - pub fn parse(self) -> Result { - Ok(Feed { - id: FeedId(self.id), - title: self.title, - user_id: UserId(self.user_id), - }) - } -} - -pub struct Feed { - pub id: FeedId, - pub title: String, - pub user_id: UserId, -} - -impl Feed { - pub async fn create( - pool: &SqlitePool, id: UserId, name: &str - ) -> Result { - let new_feed = sqlx::query_as!( - UnparsedFeed, - "INSERT INTO feeds (user_id, title) - VALUES (?, ?) - RETURNING id as `id!`, user_id, title", - id.0, name - ).fetch_one(pool).await?.parse(); - - new_feed - } - - pub async fn add_channel( - pool: &SqlitePool, id: FeedId, channel_id: ChannelId - ) -> Result<()> { - sqlx::query!( - "INSERT INTO feed_channels (feed_id, channel_id) - VALUES (?, ?)", - id.0, channel_id.0 - ).execute(pool).await?; - - Ok(()) - } - - pub async fn get_items( - pool: &SqlitePool, id: FeedId, limit: u8, offset: u32 - ) -> Result> { - let items = sqlx::query_as!( - Item, - "SELECT item_id as id FROM feed_items - WHERE feed_id = ? AND archived = FALSE - ORDER BY score DESC - LIMIT ? OFFSET ?", - id.0, limit, offset - ).fetch_all(pool).await?; - - Ok(items) - } - - pub async fn get_channels( - pool: &SqlitePool, id: FeedId - ) -> Result> { - let channels: Result> = sqlx::query_as!( - UnparsedChannel, - "SELECT c.id as `id!`, c.title, c.link, c.description, c.last_fetched - FROM channels c - JOIN feed_channels fc on c.id = fc.channel_id - WHERE fc.feed_id = ?", - id.0 - ).fetch_all(pool).await?.into_iter() - .map(UnparsedChannel::parse).collect(); - - channels - } -} - diff --git a/koucha/src/item.rs b/koucha/src/item.rs deleted file mode 100644 index 0d2a7ab..0000000 --- a/koucha/src/item.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub struct Item { - pub id: i64, -} diff --git a/koucha/src/lib.rs b/koucha/src/lib.rs index d12c88e..961ad1d 100644 --- a/koucha/src/lib.rs +++ b/koucha/src/lib.rs @@ -1,16 +1,15 @@ -use std::error::Error; +use std::{ + error::Error, + hash::{Hash, Hasher}, +}; +use reqwest::Url; +use chrono::{ + Utc, + DateTime, +}; type Result = std::result::Result>; -mod user; -pub use user::User; -mod feed; -pub use feed::Feed; -mod channel; -pub use channel::Channel; -mod item; -pub use item::Item; - pub struct AdapterOptions { database_url: String, } @@ -42,6 +41,187 @@ pub struct Adapter { } impl Adapter { - pub fn get_pool(&self) -> &sqlx::SqlitePool { &self.db } - pub fn get_client(&self) -> &reqwest::Client { &self.client } + pub async fn get_all_users(&self) -> Result> { + let users = sqlx::query_as!( + User, + "SELECT id, name FROM users" + ).fetch_all(&self.db).await?; + + Ok(users) + } + + // pub async fn update_channels(&self) -> Result<()> { + // + // } + // + // async fn get_all_channels(&self) -> Result> { + // let users = sqlx::query_as!( + // Channel, + // "SELECT id FROM channels" + // ).fetch_all(&self.db).await?; + // + // Ok(users) + // } + + fn get_pool(&self) -> &sqlx::SqlitePool { &self.db } + fn get_client(&self) -> &reqwest::client { &self.client } +} + +pub struct User { + id: i64, + name: String, +} + +impl User { + // async fn get_by_id(adapter: &Adapter, id: i64) -> Result { + // let user = sqlx::query!("SELECT name FROM users WHERE id = ?", id) + // .fetch_one(adapter.get_pool()).await?; + // + // Ok(Self { + // id: id, + // name: user.name, + // }) + // } + + pub async fn create(adapter: &Adapter, name: &str) -> Result { + let result = sqlx::query!("INSERT INTO users (name) VALUES (?)", name) + .execute(adapter.get_pool()).await?; + + Ok(Self { + id: result.last_insert_rowid(), + name: name.to_string() + }) + } + + pub async fn change_name( + &mut self, adapter: &Adapter, new_name: &str) -> Result<()> { + sqlx::query!( + "UPDATE users SET name = ? WHERE id = ?", + new_name, self.id + ).execute(adapter.get_pool()).await?; + + self.name = new_name.to_string(); + + Ok(()) + } + + pub async fn get_feeds(&self, adapter: &Adapter) -> Result> { + let feeds = sqlx::query_as!( + Feed, + "SELECT id FROM feeds WHERE user_id = ?", + self.id + ).fetch_all(adapter.get_pool()).await?; + + Ok(feeds) + } + + pub fn name(&self) -> &str { &self.name } +} + +pub struct Feed { + id: i64, +} + +impl Feed { + pub async fn get_items( + &self, adapter: &Adapter, limit: u8, offset: u32) -> Result> { + let items = sqlx::query_as!( + Item, + "SELECT item_id as id FROM feed_items + WHERE feed_id = ? AND archived = FALSE + ORDER BY score DESC + LIMIT ? OFFSET ?", + self.id, limit, offset + ).fetch_all(adapter.get_pool()).await?; + + Ok(items) + } + + pub async fn get_channels(&self, adapter: &Adapter) -> Result> { + let db_channels = sqlx::query!( + "SELECT c.id as `id!`, c.title, c.link, c.description, c.last_fetched + FROM channels c + JOIN feed_channels fc on c.id = fc.channel_id + WHERE fc.feed_id = ?", + self.id + ).fetch_all(adapter.get_pool()).await?; + let mut channels = Vec::with_capacity(db_channels.len()); + for db_channel in db_channels { + channels.push(Channel { + id: db_channel.id, + title: db_channel.title, + link: Url::parse(&db_channel.link)?, + description: db_channel.description, + last_fetched: db_channel.last_fetched.as_deref() + .map(DateTime::parse_from_rfc2822) + .transpose()? + .map(|dt| dt.with_timezone(&Utc)), + }) + } + Ok(channels) + } +} + +pub struct Channel { + id: i64, + title: String, + link: Url, + description: Option, + last_fetched: Option>, +} + +impl Channel { + pub async fn fetch(mut self, adapter: &Adapter) -> Result { + let bytestream = adapter.get_client().get(self.link.clone()) + .send().await? + .bytes().await?; + + let rss_channel = rss::Channel::read_from(&bytestream[..])?; + self.title = rss_channel.title; + self.link = Url::parse(&rss_channel.link)?; + self.description = Some(rss_channel.description); + let now = Utc::now(); + self.last_fetched = Some(now); + + sqlx::query!( + "UPDATE channels + SET title = ?, link = ?, description = ?, + last_fetched = ? + WHERE id = ?", + self.title, self.link.as_str(), self.description, now.to_rfc2822(), + self.id + ).execute(adapter.get_pool()).await?; + + fn get_or_create_guid(item: &rss::Item) -> String { + if let Some(guid) = item.guid() { + return guid.value().to_string(); + } + + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + item.link().unwrap_or("").hash(&mut hasher); + item.title().unwrap_or("").hash(&mut hasher); + item.description().unwrap_or("").hash(&mut hasher); + + format!("gen-{:x}", hasher.finish()) + } + + for item in rss_channel.items { + sqlx::query!( + "INSERT OR IGNORE INTO items + (channel_id, guid, fetched_at, title, description, content) + VALUES (?, ?, ?, ?, ?, ?)", + self.id, get_or_create_guid(&item), now.to_rfc2822(), + item.title().unwrap_or(""), item.description().unwrap_or(""), + item.content().unwrap_or("") + ) + .execute(adapter.get_pool()) + .await?; + } + + Ok(self) + } +} + +pub struct Item { + id: i64, } diff --git a/koucha/src/user.rs b/koucha/src/user.rs deleted file mode 100644 index 5a038a0..0000000 --- a/koucha/src/user.rs +++ /dev/null @@ -1,62 +0,0 @@ -use sqlx::SqlitePool; -use crate::{ - Result, - Feed -}; - -pub struct UserId(pub i64); -impl From for UserId { fn from(id: i64) -> Self { UserId(id) } } -impl From for i64 { fn from(id: UserId) -> Self { id.0 } } - -pub struct User { - pub id: UserId, - pub name: String, -} - -impl User { - pub async fn get_all(pool: &SqlitePool) -> Result> { - let users = sqlx::query_as!( - User, - "SELECT id, name FROM users" - ).fetch_all(pool).await?; - - Ok(users) - } - - pub async fn create(pool: &SqlitePool, name: &str) -> Result { - let result = sqlx::query!( - "INSERT INTO users (name) - VALUES (?) - RETURNING id, name", - name - ).fetch_one(pool).await?; - - Ok(Self { - id: UserId(result.id), - name: result.name, - }) - } - - pub async fn update_name( - pool: &SqlitePool, id: UserId, new_name: &str - ) -> Result<()> { - sqlx::query!( - "UPDATE users SET name = ? WHERE id = ?", - new_name, id.0 - ).execute(pool).await?; - - Ok(()) - } - - pub async fn get_feeds(pool: &SqlitePool, id: UserId) -> Result> { - let feeds = sqlx::query_as!( - Feed, - "SELECT id FROM feeds WHERE user_id = ?", - id.0 - ).fetch_all(pool).await?; - - Ok(feeds) - } - - pub fn name(&self) -> &str { &self.name } -}