Add miniflux webhook processing with KV store, change prompt.
Also updated README detailing the changes and setup required.
This commit is contained in:
249
src/lib.rs
249
src/lib.rs
@@ -1,24 +1,24 @@
|
||||
extern crate console_error_panic_hook;
|
||||
use base64::{engine::general_purpose::STANDARD, Engine as _};
|
||||
use futures::{stream, StreamExt};
|
||||
use hmac::{Hmac, Mac};
|
||||
use reqwest::header::{AUTHORIZATION, CONTENT_TYPE};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
use worker::{event, Env, ScheduleContext, ScheduledEvent};
|
||||
use sha2::Sha256;
|
||||
use std::panic;
|
||||
use worker::*;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct Feed {
|
||||
site_url: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
struct Entry {
|
||||
id: u64,
|
||||
title: String,
|
||||
url: String,
|
||||
content: String,
|
||||
feed: Feed,
|
||||
feed_id: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ApiResponse {
|
||||
struct NewEntriesRequest {
|
||||
entries: Vec<Entry>,
|
||||
}
|
||||
|
||||
@@ -27,40 +27,13 @@ struct UpdateRequest {
|
||||
content: String,
|
||||
}
|
||||
|
||||
async fn get_entries(
|
||||
base_url: &str,
|
||||
username: &str,
|
||||
password: &str,
|
||||
) -> Result<ApiResponse, Box<dyn std::error::Error>> {
|
||||
// 创建 HTTP 客户端
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
// 使用 Basic Auth 进行身份验证
|
||||
let auth = format!(
|
||||
"Basic {}",
|
||||
STANDARD.encode(format!("{}:{}", username, password))
|
||||
);
|
||||
|
||||
// 发送 GET 请求
|
||||
let response = client
|
||||
.get(&format!("{}/v1/entries?status=unread&limit=100", base_url))
|
||||
.header(AUTHORIZATION, auth)
|
||||
.header(CONTENT_TYPE, "application/json")
|
||||
.send()
|
||||
.await?
|
||||
.json::<ApiResponse>()
|
||||
.await?;
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn update_entry(
|
||||
base_url: &str,
|
||||
username: &str,
|
||||
password: &str,
|
||||
id: u64,
|
||||
content: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
) -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let auth = format!(
|
||||
@@ -72,15 +45,17 @@ async fn update_entry(
|
||||
let update_request = UpdateRequest {
|
||||
content: content.to_string(),
|
||||
};
|
||||
console_log!("created update_request");
|
||||
|
||||
client
|
||||
.put(&url)
|
||||
.header(AUTHORIZATION, auth)
|
||||
.header(CONTENT_TYPE, "application/json")
|
||||
.json(&update_request) // 将请求体序列化为 JSON
|
||||
.json(&update_request)
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?;
|
||||
console_log!("updated entry in miniflux");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -112,7 +87,8 @@ async fn request_openai_chat_completion(
|
||||
api_key: &str,
|
||||
model: &str,
|
||||
messages: Vec<Message>,
|
||||
) -> Result<String, Box<dyn std::error::Error>> {
|
||||
) -> std::result::Result<String, Box<dyn std::error::Error>> {
|
||||
console_log!("request_openai_chat_completion");
|
||||
let client = reqwest::Client::new();
|
||||
let request_body = ChatCompletionRequest {
|
||||
model: model.to_string(),
|
||||
@@ -120,7 +96,7 @@ async fn request_openai_chat_completion(
|
||||
};
|
||||
|
||||
let response = client
|
||||
.post(&format!("{}/v1/chat/completions", base_url))
|
||||
.post(format!("{}/v1/chat/completions", base_url))
|
||||
.header(AUTHORIZATION, format!("Bearer {}", api_key))
|
||||
.header(CONTENT_TYPE, "application/json")
|
||||
.json(&request_body)
|
||||
@@ -128,53 +104,56 @@ async fn request_openai_chat_completion(
|
||||
.await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
console_log!("request_openai_chat_completion success");
|
||||
let completion_response: ChatCompletionResponse = response.json().await?;
|
||||
Ok(completion_response.choices[0].message.content.clone())
|
||||
} else {
|
||||
let error_message = response.text().await?;
|
||||
console_log!("request_openai_chat_completion error: {}", error_message);
|
||||
Err(format!("Error: {:?}", error_message).into())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Miniflux {
|
||||
url: String,
|
||||
username: String,
|
||||
password: String,
|
||||
webhook_secret: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct OpenAi {
|
||||
url: String,
|
||||
token: String,
|
||||
model: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Config {
|
||||
miniflux: Miniflux,
|
||||
openai: OpenAi,
|
||||
whitelist: HashSet<String>,
|
||||
}
|
||||
|
||||
async fn generate_and_update_entry(
|
||||
config: &Config,
|
||||
entry: Entry,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let content: &str = &entry.content;
|
||||
// Check if the content should be summarized and if the site is whitelisted
|
||||
if content.starts_with("<pre") || !config.whitelist.contains(&entry.feed.site_url) {
|
||||
) -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
console_log!("entry id: {}", entry.id);
|
||||
console_log!("entry title: {}", entry.title);
|
||||
console_log!("entry url: {}", entry.url);
|
||||
if entry.content.trim().is_empty() || entry.content.len() < 500 {
|
||||
console_log!("skipping entry due to empty content or short content length");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let messages = vec![
|
||||
Message {
|
||||
role: "system".to_string(),
|
||||
content: "Please summarize the content of the article under 150 words in Chinese. Do not add any additional Character、markdown language to the result text. 请用不超过150个汉字概括文章内容。结果文本中不要添加任何额外的字符、Markdown语言。".to_string(),
|
||||
content: "You are an experienced and knowledgeable internet blogger that writes short and easy-to-read summaries for articles from various RSS feeds. Please summarize the content of the article in 250 words or less. Format your output in CommonMark compliant markdown. Do not give any extra comments, headers, or prefix. Only return the actual summary text. Similar to the blurbs on the back of books, highlight any aspects of the articles that may be of interest and grab the attention to any readers perusing.".to_string(),
|
||||
},
|
||||
Message {
|
||||
role: "user".to_string(),
|
||||
content: format!(
|
||||
"The following is the input content:\n---\n {}",
|
||||
content,
|
||||
),
|
||||
content: format!("The following is the article:\n---\nTitle: {}\nURL: {}\nContent: {}", &entry.title, &entry.url, &entry.content),
|
||||
},
|
||||
];
|
||||
|
||||
@@ -188,9 +167,11 @@ async fn generate_and_update_entry(
|
||||
.await
|
||||
{
|
||||
if !summary.trim().is_empty() {
|
||||
console_log!("Summary: {}", summary);
|
||||
let updated_content = format!(
|
||||
"<pre style=\"white-space: pre-wrap;\"><code>\n💡AI 摘要:\n{}</code></pre><hr><br />{}",
|
||||
summary, content
|
||||
"<div class=\"ai-summary\"><h4>✨ AI Summary</h4>{}</div><hr><br />{}",
|
||||
markdown::to_html(&summary),
|
||||
&entry.content
|
||||
);
|
||||
|
||||
// Update the entry
|
||||
@@ -205,50 +186,158 @@ async fn generate_and_update_entry(
|
||||
}
|
||||
}
|
||||
|
||||
console_log!("processed entry: {}", entry.id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_options() -> Result<Response> {
|
||||
let mut headers = Headers::new();
|
||||
headers.set("Access-Control-Allow-Origin", "*")?;
|
||||
headers.set("Access-Control-Allow-Methods", "POST, OPTIONS")?;
|
||||
headers.set(
|
||||
"Access-Control-Allow-Headers",
|
||||
"Content-Type, X-Miniflux-Signature",
|
||||
)?;
|
||||
Response::ok("").map(|resp| resp.with_headers(headers))
|
||||
}
|
||||
|
||||
#[event(scheduled)]
|
||||
async fn scheduled(_event: ScheduledEvent, env: Env, _ctx: ScheduleContext) {
|
||||
pub async fn scheduled(_event: ScheduledEvent, env: Env, _ctx: ScheduleContext) {
|
||||
console_log!("scheduled");
|
||||
let config = &Config {
|
||||
whitelist: env
|
||||
.var("WHITELIST_URL")
|
||||
.unwrap()
|
||||
.to_string()
|
||||
.split(",")
|
||||
.map(|s| s.to_string())
|
||||
.collect(),
|
||||
openai: OpenAi {
|
||||
url: env.var("OPENAI_URL").unwrap().to_string(),
|
||||
token: env.var("OPENAI_TOKEN").unwrap().to_string(),
|
||||
url: env.secret("OPENAI_URL").unwrap().to_string(),
|
||||
token: env.secret("OPENAI_TOKEN").unwrap().to_string(),
|
||||
model: env.var("OPENAI_MODEL").unwrap().to_string(),
|
||||
},
|
||||
miniflux: Miniflux {
|
||||
url: env.var("MINIFLUX_URL").unwrap().to_string(),
|
||||
username: env.var("MINIFLUX_USERNAME").unwrap().to_string(),
|
||||
password: env.var("MINIFLUX_PASSWORD").unwrap().to_string(),
|
||||
url: env.secret("MINIFLUX_URL").unwrap().to_string(),
|
||||
username: env.secret("MINIFLUX_USERNAME").unwrap().to_string(),
|
||||
password: env.secret("MINIFLUX_PASSWORD").unwrap().to_string(),
|
||||
webhook_secret: env.secret("MINIFLUX_WEBHOOK_SECRET").unwrap().to_string(),
|
||||
},
|
||||
};
|
||||
console_log!("config");
|
||||
|
||||
// 查询未读文章
|
||||
let entries = get_entries(
|
||||
&config.miniflux.url,
|
||||
&config.miniflux.username,
|
||||
&config.miniflux.password,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let kv = env.kv("entries").unwrap();
|
||||
|
||||
// List all keys with the "entry:" prefix
|
||||
let keys = kv
|
||||
.list()
|
||||
.prefix("entry:".to_string())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// 生成摘要并更新的并发任务
|
||||
let max_concurrent_tasks = 5;
|
||||
|
||||
// Create a stream to process tasks with concurrency limit
|
||||
let _: Vec<_> = stream::iter(entries.entries)
|
||||
.map(|entry| {
|
||||
let _: Vec<_> = stream::iter(keys.keys)
|
||||
.map(|key| {
|
||||
let config = &config;
|
||||
async move { generate_and_update_entry(config, entry).await }
|
||||
let kv = kv.clone();
|
||||
async move {
|
||||
// Retrieve the entry
|
||||
if let Ok(Some(entry)) = kv.get(&key.name).json::<Entry>().await {
|
||||
console_log!("Processing entry: {}", key.name);
|
||||
|
||||
// Process the entry (call AI API, etc.)
|
||||
match generate_and_update_entry(config, entry).await {
|
||||
Ok(_) => {
|
||||
// If processing was successful, delete the entry from KV
|
||||
kv.delete(&key.name).await.unwrap();
|
||||
console_log!("Processed and removed entry: {}", key.name);
|
||||
}
|
||||
Err(e) => {
|
||||
console_error!("Error processing entry {}: {:?}", key.name, e);
|
||||
// Optionally, you could implement retry logic here
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.buffer_unordered(max_concurrent_tasks)
|
||||
.collect()
|
||||
.await;
|
||||
}
|
||||
|
||||
#[event(fetch)]
|
||||
async fn fetch(mut req: Request, env: Env, _ctx: Context) -> Result<Response> {
|
||||
panic::set_hook(Box::new(console_error_panic_hook::hook));
|
||||
console_log!("fetch");
|
||||
// Check if it's an OPTIONS request and handle it
|
||||
if req.method() == Method::Options {
|
||||
return handle_options();
|
||||
}
|
||||
console_log!("not options");
|
||||
|
||||
// Only proceed with POST requests
|
||||
if req.method() != Method::Post {
|
||||
return Response::error("Method not allowed", 405);
|
||||
}
|
||||
console_log!("is post");
|
||||
|
||||
let config = &Config {
|
||||
openai: OpenAi {
|
||||
url: env.secret("OPENAI_URL").unwrap().to_string(),
|
||||
token: env.secret("OPENAI_TOKEN").unwrap().to_string(),
|
||||
model: env.var("OPENAI_MODEL").unwrap().to_string(),
|
||||
},
|
||||
miniflux: Miniflux {
|
||||
url: env.secret("MINIFLUX_URL").unwrap().to_string(),
|
||||
username: env.secret("MINIFLUX_USERNAME").unwrap().to_string(),
|
||||
password: env.secret("MINIFLUX_PASSWORD").unwrap().to_string(),
|
||||
webhook_secret: env.secret("MINIFLUX_WEBHOOK_SECRET").unwrap().to_string(),
|
||||
},
|
||||
};
|
||||
console_log!("config");
|
||||
|
||||
let signature = req
|
||||
.headers()
|
||||
.get("x-miniflux-signature")
|
||||
.map_err(|_err| {
|
||||
Error::RustError("Missing signature header in webhook request".to_string())
|
||||
})?
|
||||
.ok_or_else(|| {
|
||||
Error::RustError("Missing signature header in webhook request".to_string())
|
||||
})?;
|
||||
console_log!("signature");
|
||||
let payload = match req.bytes().await {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) => return Response::error(format!("Failed to read payload: {}", err), 400),
|
||||
};
|
||||
console_log!("payload");
|
||||
|
||||
let mut mac = Hmac::<Sha256>::new_from_slice(config.miniflux.webhook_secret.as_bytes())
|
||||
.map_err(|_| Error::RustError("HMAC key error".to_string()))?;
|
||||
console_log!("mac");
|
||||
|
||||
mac.update(&payload);
|
||||
let hmac = hex::encode(mac.finalize().into_bytes());
|
||||
console_log!("hmac");
|
||||
|
||||
if hmac != signature {
|
||||
return Response::error("Incorrect webhook request signature", 403);
|
||||
}
|
||||
|
||||
// convert body to json
|
||||
let body: NewEntriesRequest = serde_json::from_slice::<NewEntriesRequest>(&payload)
|
||||
.map_err(|_err| Error::RustError("Failed to parse webhook json body".to_string()))?;
|
||||
console_log!("body");
|
||||
|
||||
let kv = env.kv("entries")?;
|
||||
|
||||
let max_concurrent_tasks = 5;
|
||||
|
||||
// Create a stream to process tasks with concurrency limit
|
||||
let _: Vec<_> = stream::iter(body.entries)
|
||||
.map(|entry| {
|
||||
let key = format!("entry:{}", entry.id);
|
||||
let kv = kv.clone();
|
||||
console_log!("putting KV key {}", key);
|
||||
async move { kv.put(&key, entry)?.execute().await }
|
||||
})
|
||||
.buffer_unordered(max_concurrent_tasks)
|
||||
.collect()
|
||||
.await;
|
||||
Response::ok("Webhook request processed")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user