Compare commits

..

2 commits

Author SHA1 Message Date
162ba78430
Adding fill functions 2026-01-20 16:36:14 -08:00
440e897edf
Modulified it 2026-01-20 16:08:36 -08:00
7 changed files with 346 additions and 195 deletions

View file

@ -32,6 +32,7 @@ CREATE TABLE items (
CREATE TABLE feeds (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
title TEXT NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id)
);

View file

@ -1,11 +1,14 @@
use std::error::Error;
use reqwest::Client;
use koucha::fetch_channel;
use koucha::{
AdapterOptions,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let client = Client::new();
let _channel = fetch_channel(&client, "https://lorem-rss.herokuapp.com/feed?unit=year").await?;
// let adapter = AdapterOptions::new().create().await?;
//
// let _channel = fetch_channel(&client, "https://lorem-rss.herokuapp.com/feed?unit=year").await?;
Ok(())
}

166
koucha/src/channel.rs Normal file
View file

@ -0,0 +1,166 @@
use reqwest::{Url, Client};
use sqlx::SqlitePool;
use chrono::{DateTime, Utc};
use crate::{Result};
use std::hash::{Hash, Hasher};
pub struct ChannelId(pub i64);
impl From<i64> for ChannelId { fn from(id: i64) -> Self { ChannelId(id) } }
impl From<ChannelId> for i64 { fn from(id: ChannelId) -> Self { id.0 } }
struct FetchedRSSItem {
guid: String,
title: String,
description: String,
content: String,
}
impl FetchedRSSItem {
fn parse(item: rss::Item) -> Self {
FetchedRSSItem {
guid: Self::get_or_create_guid(&item),
title: item.title().unwrap_or("").to_string(),
description: item.description().unwrap_or("").to_string(),
content: item.content().unwrap_or("").to_string(),
}
}
fn get_or_create_guid(item: &rss::Item) -> String {
if let Some(guid) = item.guid() {
return guid.value().to_string();
}
let mut hasher = std::collections::hash_map::DefaultHasher::new();
item.link().unwrap_or("").hash(&mut hasher);
item.title().unwrap_or("").hash(&mut hasher);
item.description().unwrap_or("").hash(&mut hasher);
format!("gen-{:x}", hasher.finish())
}
}
pub struct FetchedRSSChannel {
title: String,
link: Url,
description: String,
items: Vec<FetchedRSSItem>,
fetched_at: DateTime<Utc>,
}
impl FetchedRSSChannel {
pub fn fetched_at(&self) -> &DateTime<Utc> {
&self.fetched_at
}
fn parse(rss: rss::Channel) -> Result<Self> {
Ok(FetchedRSSChannel {
title: rss.title,
link: Url::parse(&rss.link)?,
description: rss.description,
items: rss.items.into_iter().map(FetchedRSSItem::parse).collect(),
fetched_at: Utc::now(),
})
}
}
pub struct UnparsedChannel {
pub id: i64,
pub title: String,
pub link: String,
pub description: Option<String>,
pub last_fetched: Option<String>,
}
impl UnparsedChannel {
pub fn parse(self) -> Result<Channel> {
Ok(Channel {
id: ChannelId(self.id),
title: self.title,
link: Url::parse(&self.link)?,
description: self.description,
last_fetched: self.last_fetched.as_deref()
.map(DateTime::parse_from_rfc2822)
.transpose()?
.map(|dt| dt.with_timezone(&Utc)),
})
}
}
pub struct Channel {
pub id: ChannelId,
pub title: String,
pub link: Url,
pub description: Option<String>,
pub last_fetched: Option<DateTime<Utc>>,
}
impl Channel {
pub async fn get_all(pool: &SqlitePool) -> Result<Vec<Self>> {
let channels: Result<Vec<Channel>> = sqlx::query_as!(
UnparsedChannel,
"SELECT id, title, link, description, last_fetched FROM channels"
)
.fetch_all(pool).await?.into_iter()
.map(UnparsedChannel::parse).collect();
channels
}
// TODO implement fetch skipping
fn should_skip_fetch(&self) -> bool { false }
// TODO implement conditional fetching
pub async fn fetch_rss(
client: &Client, channel: &Channel
) -> Result<Option<FetchedRSSChannel>> {
if channel.should_skip_fetch() {
return Ok(None);
}
let bytestream = client.get(channel.link.clone())
.send().await?
.bytes().await?;
let rss_channel = rss::Channel::read_from(&bytestream[..])?;
Ok(Some(FetchedRSSChannel::parse(rss_channel)?))
}
pub async fn update_metadata(
pool: &SqlitePool, id: ChannelId, fetched: FetchedRSSChannel
) -> Result<()> {
let link = fetched.link.as_str();
let fetched_at = fetched.fetched_at.to_rfc2822();
sqlx::query!(
"UPDATE channels
SET title = ?, link = ?, description = ?,
last_fetched = ?
WHERE id = ?",
fetched.title, link, fetched.description, fetched_at,
id.0
).execute(pool).await?;
Ok(())
}
pub async fn update_items(
pool: &SqlitePool, id: ChannelId, fetched: FetchedRSSChannel
) -> Result<()> {
let fetched_at = fetched.fetched_at.to_rfc2822();
for item in fetched.items {
sqlx::query!(
"INSERT OR IGNORE INTO items
(channel_id, guid, fetched_at, title, description, content)
VALUES (?, ?, ?, ?, ?, ?)",
id.0, item.guid, fetched_at, item.title, item.description, item.content
)
.execute(pool)
.await?;
}
Ok(())
}
}

96
koucha/src/feed.rs Normal file
View file

@ -0,0 +1,96 @@
use crate::{
Result,
Item,
Channel,
channel::{
UnparsedChannel,
ChannelId,
},
user::UserId,
};
use sqlx::SqlitePool;
pub struct FeedId(pub i64);
impl From<i64> for FeedId { fn from(id: i64) -> Self { FeedId(id) } }
impl From<FeedId> for i64 { fn from(id: FeedId) -> Self { id.0 } }
pub struct UnparsedFeed {
pub id: i64,
pub title: String,
pub user_id: i64
}
impl UnparsedFeed {
pub fn parse(self) -> Result<Feed> {
Ok(Feed {
id: FeedId(self.id),
title: self.title,
user_id: UserId(self.user_id),
})
}
}
pub struct Feed {
pub id: FeedId,
pub title: String,
pub user_id: UserId,
}
impl Feed {
pub async fn create(
pool: &SqlitePool, id: UserId, name: &str
) -> Result<Self> {
let new_feed = sqlx::query_as!(
UnparsedFeed,
"INSERT INTO feeds (user_id, title)
VALUES (?, ?)
RETURNING id as `id!`, user_id, title",
id.0, name
).fetch_one(pool).await?.parse();
new_feed
}
pub async fn add_channel(
pool: &SqlitePool, id: FeedId, channel_id: ChannelId
) -> Result<()> {
sqlx::query!(
"INSERT INTO feed_channels (feed_id, channel_id)
VALUES (?, ?)",
id.0, channel_id.0
).execute(pool).await?;
Ok(())
}
pub async fn get_items(
pool: &SqlitePool, id: FeedId, limit: u8, offset: u32
) -> Result<Vec<Item>> {
let items = sqlx::query_as!(
Item,
"SELECT item_id as id FROM feed_items
WHERE feed_id = ? AND archived = FALSE
ORDER BY score DESC
LIMIT ? OFFSET ?",
id.0, limit, offset
).fetch_all(pool).await?;
Ok(items)
}
pub async fn get_channels(
pool: &SqlitePool, id: FeedId
) -> Result<Vec<Channel>> {
let channels: Result<Vec<Channel>> = sqlx::query_as!(
UnparsedChannel,
"SELECT c.id as `id!`, c.title, c.link, c.description, c.last_fetched
FROM channels c
JOIN feed_channels fc on c.id = fc.channel_id
WHERE fc.feed_id = ?",
id.0
).fetch_all(pool).await?.into_iter()
.map(UnparsedChannel::parse).collect();
channels
}
}

3
koucha/src/item.rs Normal file
View file

@ -0,0 +1,3 @@
pub struct Item {
pub id: i64,
}

View file

@ -1,15 +1,16 @@
use std::{
error::Error,
hash::{Hash, Hasher},
};
use reqwest::Url;
use chrono::{
Utc,
DateTime,
};
use std::error::Error;
type Result<T> = std::result::Result<T, Box<dyn Error>>;
mod user;
pub use user::User;
mod feed;
pub use feed::Feed;
mod channel;
pub use channel::Channel;
mod item;
pub use item::Item;
pub struct AdapterOptions {
database_url: String,
}
@ -41,187 +42,6 @@ pub struct Adapter {
}
impl Adapter {
pub async fn get_all_users(&self) -> Result<Vec<User>> {
let users = sqlx::query_as!(
User,
"SELECT id, name FROM users"
).fetch_all(&self.db).await?;
Ok(users)
}
// pub async fn update_channels(&self) -> Result<()> {
//
// }
//
// async fn get_all_channels(&self) -> Result<Vec<impl Channel>> {
// let users = sqlx::query_as!(
// Channel,
// "SELECT id FROM channels"
// ).fetch_all(&self.db).await?;
//
// Ok(users)
// }
fn get_pool(&self) -> &sqlx::SqlitePool { &self.db }
fn get_client(&self) -> &reqwest::client { &self.client }
}
pub struct User {
id: i64,
name: String,
}
impl User {
// async fn get_by_id(adapter: &Adapter, id: i64) -> Result<Self> {
// let user = sqlx::query!("SELECT name FROM users WHERE id = ?", id)
// .fetch_one(adapter.get_pool()).await?;
//
// Ok(Self {
// id: id,
// name: user.name,
// })
// }
pub async fn create(adapter: &Adapter, name: &str) -> Result<Self> {
let result = sqlx::query!("INSERT INTO users (name) VALUES (?)", name)
.execute(adapter.get_pool()).await?;
Ok(Self {
id: result.last_insert_rowid(),
name: name.to_string()
})
}
pub async fn change_name(
&mut self, adapter: &Adapter, new_name: &str) -> Result<()> {
sqlx::query!(
"UPDATE users SET name = ? WHERE id = ?",
new_name, self.id
).execute(adapter.get_pool()).await?;
self.name = new_name.to_string();
Ok(())
}
pub async fn get_feeds(&self, adapter: &Adapter) -> Result<Vec<Feed>> {
let feeds = sqlx::query_as!(
Feed,
"SELECT id FROM feeds WHERE user_id = ?",
self.id
).fetch_all(adapter.get_pool()).await?;
Ok(feeds)
}
pub fn name(&self) -> &str { &self.name }
}
pub struct Feed {
id: i64,
}
impl Feed {
pub async fn get_items(
&self, adapter: &Adapter, limit: u8, offset: u32) -> Result<Vec<Item>> {
let items = sqlx::query_as!(
Item,
"SELECT item_id as id FROM feed_items
WHERE feed_id = ? AND archived = FALSE
ORDER BY score DESC
LIMIT ? OFFSET ?",
self.id, limit, offset
).fetch_all(adapter.get_pool()).await?;
Ok(items)
}
pub async fn get_channels(&self, adapter: &Adapter) -> Result<Vec<Channel>> {
let db_channels = sqlx::query!(
"SELECT c.id as `id!`, c.title, c.link, c.description, c.last_fetched
FROM channels c
JOIN feed_channels fc on c.id = fc.channel_id
WHERE fc.feed_id = ?",
self.id
).fetch_all(adapter.get_pool()).await?;
let mut channels = Vec::with_capacity(db_channels.len());
for db_channel in db_channels {
channels.push(Channel {
id: db_channel.id,
title: db_channel.title,
link: Url::parse(&db_channel.link)?,
description: db_channel.description,
last_fetched: db_channel.last_fetched.as_deref()
.map(DateTime::parse_from_rfc2822)
.transpose()?
.map(|dt| dt.with_timezone(&Utc)),
})
}
Ok(channels)
}
}
pub struct Channel {
id: i64,
title: String,
link: Url,
description: Option<String>,
last_fetched: Option<DateTime<Utc>>,
}
impl Channel {
pub async fn fetch(mut self, adapter: &Adapter) -> Result<Self> {
let bytestream = adapter.get_client().get(self.link.clone())
.send().await?
.bytes().await?;
let rss_channel = rss::Channel::read_from(&bytestream[..])?;
self.title = rss_channel.title;
self.link = Url::parse(&rss_channel.link)?;
self.description = Some(rss_channel.description);
let now = Utc::now();
self.last_fetched = Some(now);
sqlx::query!(
"UPDATE channels
SET title = ?, link = ?, description = ?,
last_fetched = ?
WHERE id = ?",
self.title, self.link.as_str(), self.description, now.to_rfc2822(),
self.id
).execute(adapter.get_pool()).await?;
fn get_or_create_guid(item: &rss::Item) -> String {
if let Some(guid) = item.guid() {
return guid.value().to_string();
}
let mut hasher = std::collections::hash_map::DefaultHasher::new();
item.link().unwrap_or("").hash(&mut hasher);
item.title().unwrap_or("").hash(&mut hasher);
item.description().unwrap_or("").hash(&mut hasher);
format!("gen-{:x}", hasher.finish())
}
for item in rss_channel.items {
sqlx::query!(
"INSERT OR IGNORE INTO items
(channel_id, guid, fetched_at, title, description, content)
VALUES (?, ?, ?, ?, ?, ?)",
self.id, get_or_create_guid(&item), now.to_rfc2822(),
item.title().unwrap_or(""), item.description().unwrap_or(""),
item.content().unwrap_or("")
)
.execute(adapter.get_pool())
.await?;
}
Ok(self)
}
}
pub struct Item {
id: i64,
pub fn get_pool(&self) -> &sqlx::SqlitePool { &self.db }
pub fn get_client(&self) -> &reqwest::Client { &self.client }
}

62
koucha/src/user.rs Normal file
View file

@ -0,0 +1,62 @@
use sqlx::SqlitePool;
use crate::{
Result,
Feed
};
pub struct UserId(pub i64);
impl From<i64> for UserId { fn from(id: i64) -> Self { UserId(id) } }
impl From<UserId> for i64 { fn from(id: UserId) -> Self { id.0 } }
pub struct User {
pub id: UserId,
pub name: String,
}
impl User {
pub async fn get_all(pool: &SqlitePool) -> Result<Vec<Self>> {
let users = sqlx::query_as!(
User,
"SELECT id, name FROM users"
).fetch_all(pool).await?;
Ok(users)
}
pub async fn create(pool: &SqlitePool, name: &str) -> Result<Self> {
let result = sqlx::query!(
"INSERT INTO users (name)
VALUES (?)
RETURNING id, name",
name
).fetch_one(pool).await?;
Ok(Self {
id: UserId(result.id),
name: result.name,
})
}
pub async fn update_name(
pool: &SqlitePool, id: UserId, new_name: &str
) -> Result<()> {
sqlx::query!(
"UPDATE users SET name = ? WHERE id = ?",
new_name, id.0
).execute(pool).await?;
Ok(())
}
pub async fn get_feeds(pool: &SqlitePool, id: UserId) -> Result<Vec<Feed>> {
let feeds = sqlx::query_as!(
Feed,
"SELECT id FROM feeds WHERE user_id = ?",
id.0
).fetch_all(pool).await?;
Ok(feeds)
}
pub fn name(&self) -> &str { &self.name }
}