Small Scripts
I will be sharing small scripts that I use from time to time. Before writing them I looked for ready-made ones to save time; these are the ones I couldn't find, so I had to implement my own solutions. I hope you can find a fitting solution for your use case. Enjoy!
Bash
ftp_to_gdrive.sh
Purpose:
Some FTP servers have speed limitations, and I might need those files right away at some point or want to process them on Colab, so I keep them in Google Drive. This script transfers them for you. To avoid putting load on the systems, I only make these transfers between 1am and 6am. Also, if your download speed is lower than your upload speed, it will keep downloading the next files even though earlier ones may already have been transferred fully; you can modify it to do everything sequentially.
This script uses the gdrive CLI client for the Google Drive side of things.
Known issues:
It doesn't keep the directory structure of the source.
It is unable to continue where it left off after a power cycle, since it keeps all of its state in RAM.
Usage:
I usually use it with nohup
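For example, assuming the script is saved as ftp_to_gdrive.sh and made executable, something like this keeps it running after logout and collects its output in a log file (the file names here are just placeholders):
nohup ./ftp_to_gdrive.sh > ftp_to_gdrive.log 2>&1 &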
Code:
#!/bin/bash
# Function to check if the current hour is between 1am and 6am
function is_between_1_and_6() {
current_hour=$(date +"%H")
[ "$current_hour" -ge 1 ] && [ "$current_hour" -lt 6 ]
}
# FTP variables
FTP_HOST="website"
FTP_USER="username"
FTP_PASS="password"
FTP_PATH="/ftp_directory"
# Google Drive variables
GDRIVE_FOLDER_ID="google_drive_folder_id" # Replace with the ID of the Google Drive folder where you want to upload files
# Capture the list of files with full paths into a variable
file_list=$(lftp -u "$FTP_USER","$FTP_PASS" "$FTP_HOST" <<EOF
cd "$FTP_PATH"
find
quit
EOF
)
# Loop through the list of file paths
IFS=$'\n' # Set Internal Field Separator to newline
for file_path in $file_list; do
# Check if the entry is a file and the last character is not "/"
if [ "${file_path: -1}" != "/" ]; then
while ! is_between_1_and_6; do
echo "Waiting for half an hour..."
sleep 1800 # Wait for half an hour (1800 seconds)
done
# Download each file
lftp -u "$FTP_USER","$FTP_PASS" "$FTP_HOST" -e "cd \"$FTP_PATH\"; get \"$file_path\"; quit"
# Extract file name from the full path
file_name=$(basename "$file_path")
# Print debug information
echo "Downloaded: $file_path"
# Run the upload and delete operations in subshells
(
# Upload the file to Google Drive using gdrive
gdrive files upload "$file_name" --parent "$GDRIVE_FOLDER_ID"
# Print debug information
echo "Uploaded to Google Drive: $file_name"
# Delete the downloaded file
rm "$file_name"
# Print debug information
echo "Deleted: $file_name"
) &
# Print debug information
echo "Initiated upload and delete sub-process for: $file_name"
else
# Print debug information for skipped files or directories
echo "Skipped: $file_path"
fi
done
# Wait for all background processes to finish
wait
# Add any additional logic that you want to execute after all uploads
echo "All uploads completed."
Single Line SHA256 Calculator
Purpose: A one-line Unix command that creates a SHA-256 checksum for every file in the current directory and saves each checksum to its own file.
Known issues: Not that I know of.
Usage: Just run it in the directory of interest.
for file in *; do sha256sum "$file" > "$file.sha256"; done
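If you later want to check the files against the checksums this produces, the reverse one-liner would look something like this (it reads each .sha256 file and lets sha256sum verify the file it names):
for checksum in *.sha256; do sha256sum -c "$checksum"; done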
reduce_nc_param_multi.sh
Purpose: Some NetCDF files I use contain variables that I'm not interested in, and my laptop can only hold so much data. To save storage space, I sometimes strip variables while the data is still downloading by running this script.
It looks for files older than 7 days and keeps only the selected variables when creating the new file. It does this with the NCO tools (ncks), using up to 4 cores, each working on a different file.
Known issues: Nothing major. It doesn't even have a CLI.
Usage: Edit the variables in the code so it serves your needs.
#!/bin/bash
# Folder where your .nc files are located
input_folder="./full_sized"
# Variables you want to keep (comma-separated, no spaces)
keep_vars="vo,uo,latitude,longitude,time" # Replace with your variables
output_folder="./reduced"
# Create output directory if it doesn't exist
mkdir -p "$output_folder"
# Function to process a single file
process_file() {
output_folder="./reduced"
keep_vars="vo,uo,latitude,longitude,time" # Replace with your variables
local ncfile="$1"
# Get the base filename and create a lock file for it
base_filename=$(basename "$ncfile")
lockfile="$ncfile.lock"
# Output file path
output_file="$output_folder/$base_filename"
# Check if the lock file exists, meaning the file is being processed
if [ -f "$lockfile" ]; then
echo "$base_filename is already being processed, skipping..."
return
fi
# Create a lock file
touch "$lockfile"
# Print the exact command that will be run
echo "Processing file: $base_filename"
echo "Running command: ncks -O -v \"$keep_vars\" \"$ncfile\" \"$output_file\""
# Remove unwanted variables and save the result to the output file
ncks -O -v "$keep_vars" "$ncfile" "$output_file"
# Check if ncks succeeded
if [ $? -eq 0 ]; then
echo "Successfully processed $ncfile and saved to $output_file"
# Delete the original file after processing
rm "$ncfile"
echo "Deleted original file $ncfile"
else
echo "Error processing $ncfile"
fi
# Remove the lock file
rm "$lockfile"
}
# Export the function to be used by parallel processes
export -f process_file
# Loop indefinitely
while true; do
# Find .nc files that were not modified in the last 7 days, excluding locked files
find "$input_folder" -name "*.nc" -type f -mtime +7 ! -name "*.lock" | \
xargs -P 4 -I {} bash -c 'process_file "$@"' _ {}
# Wait a few seconds before scanning the folder again
sleep 10
done
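Since the loop never exits on its own, I'd launch it in the background the same way as the FTP script above (the script and log names are just placeholders):
nohup ./reduce_nc_param_multi.sh > reduce_nc.log 2>&1 &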
remove_yellow_background.sh
Purpose: When you have an old book scanned, you don't always get a perfect white background; sometimes you get a yellow background, which is a problem if you want to print it later. This script removes the yellow background from PDF pages. It specifically targets the colour #FAEBD7.
Known issues: Not that I know of.
Usage: Provide the location of the PDF file as the first argument.
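For example, assuming the script is saved as remove_yellow_background.sh and GraphicsMagick (which provides the gm command it relies on) is installed:
./remove_yellow_background.sh scanned_book.pdf
The cleaned file is then written next to the original as cleaned_scanned_book.pdf.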
#!/bin/bash
# Check if input PDF filename is provided
if [ $# -ne 1 ]; then
echo "Usage: $0 input.pdf"
exit 1
fi
input_pdf="$1"
output_pdf="cleaned_${input_pdf}"
# Check if input file exists
if [ ! -f "$input_pdf" ]; then
echo "Error: Input file '$input_pdf' not found"
exit 1
fi
# Create a temporary directory for processing
temp_dir=$(mktemp -d)
echo "Creating temporary directory: $temp_dir"
# Convert PDF to PNG images
echo "Converting PDF to images..."
gm convert -density 300 "$input_pdf" "$temp_dir/page-%03d.png"
# Process each image to remove yellow background
echo "Removing yellow background from each page..."
for image in "$temp_dir"/page-*.png; do
echo "Processing $(basename "$image")..."
gm convert "$image" -fuzz 40% -fill white -opaque "#FAEBD7" "$image"
done
# Combine back into PDF
echo "Combining pages back into PDF..."
gm convert $(ls "$temp_dir"/page-*.png | sort -V) "$output_pdf"
# Cleanup
echo "Cleaning up temporary files..."
rm -rf "$temp_dir"
echo "Process complete! Output saved as: $output_pdf"
C/C++
phrase_matcher.cpp
Purpose:
This script reads a list of phrases from a file and then searches for those phrases within a large text file. When a line in the large file contains any of the specified phrases, it is saved to an output file.
In my use case, on my laptop, I got read speeds of up to 1000 MB/s on the text file, and the process was CPU bound.
Known issues: Not that I know of.
Usage: This is a command-line program:
./phrase_matcher <phrases_file> <large_file> <output_file>
<phrases_file>: File containing phrases to search for.
<large_file>: Large text file to search within.
<output_file>: File to save matching lines.
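A typical build and run would look something like this (compiler flags and file names are only an example):
g++ -O2 -o phrase_matcher phrase_matcher.cpp
./phrase_matcher phrases.txt corpus.txt matches.txt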
Code:
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
// Function to check if a line contains any of the specified phrases
bool containsPhrase(const std::string& line, const std::vector<std::string>& phrases) {
for (const auto& phrase : phrases) {
if (line.find(phrase) != std::string::npos) {
return true;
}
}
return false;
}
int main(int argc, char* argv[]) {
// Check if the correct number of command-line arguments is provided
if (argc != 4) {
std::cerr << "Usage: " << argv[0] << " <phrases_file> <large_file> <output_file>\n";
return 1;
}
// Open the file containing the phrases
std::ifstream phrasesFile(argv[1]);
if (!phrasesFile) {
std::cerr << "Error: Unable to open " << argv[1] << std::endl;
return 1;
}
// Read the phrases into a vector
std::vector<std::string> phrases;
std::string phrase;
while (std::getline(phrasesFile, phrase)) {
phrases.push_back(phrase);
}
phrasesFile.close();
// Open the large text file to search
std::ifstream largeFile(argv[2]);
if (!largeFile) {
std::cerr << "Error: Unable to open " << argv[2] << std::endl;
return 1;
}
// Open the output file to save matching lines
std::ofstream outputFile(argv[3]);
if (!outputFile) {
std::cerr << "Error: Unable to create " << argv[3] << std::endl;
return 1;
}
// Search line by line in the large file
std::string line;
while (std::getline(largeFile, line)) {
if (containsPhrase(line, phrases)) {
outputFile << line << "\n";
}
}
// Close files
largeFile.close();
outputFile.close();
std::cout << "Matching lines saved to " << argv[3] << std::endl;
return 0;
}
compile_R.sh
Purpose:
I needed to compile R, so I created this script. Once the desired version number is provided as an argument, it downloads the source, installs the build requirements, and does a generic compilation and installation. I wrote this for version 4.4.3 and haven't tested it on other versions or systems.
Known issues: Not that I know of.
Usage: Provide the desired version number as an argument.
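For example, to build the version I wrote this for (assuming the script is saved as compile_R.sh and made executable):
./compile_R.sh 4.4.3
Afterwards, R --version should report the requested version.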
#!/bin/bash
# R Compilation and Installation Script for Ubuntu
# Usage: ./compile_R.sh [VERSION]
# Example: ./compile_R.sh 4.4.2
set -e # Exit on any error
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Function to print colored output
print_status() {
echo -e "${BLUE}[INFO]${NC} $1"
}
print_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
print_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Function to check if command exists
command_exists() {
command -v "$1" >/dev/null 2>&1
}
# Function to check if running as root
check_not_root() {
if [[ $EUID -eq 0 ]]; then
print_error "This script should not be run as root!"
print_error "Run it as a regular user. It will ask for sudo when needed."
exit 1
fi
}
# Function to get R version
get_r_version() {
if [[ $# -eq 0 ]]; then
print_error "No R version specified!"
echo "Usage: $0 [VERSION]"
echo "Example: $0 4.4.2"
echo ""
echo "Available versions can be found at: https://cran.r-project.org/src/base/"
exit 1
fi
R_VERSION="$1"
R_MAJOR=$(echo $R_VERSION | cut -d. -f1)
print_status "Target R version: $R_VERSION"
}
# Function to install dependencies
install_dependencies() {
print_status "Updating package lists..."
sudo apt update
print_status "Installing essential build tools..."
sudo apt install -y build-essential gfortran
print_status "Installing R dependencies..."
sudo apt install -y \
libreadline-dev \
libx11-dev \
libxt-dev \
libpng-dev \
libjpeg-dev \
libcairo2-dev \
xvfb \
libbz2-dev \
libzstd-dev \
liblzma-dev \
libcurl4-openssl-dev \
texinfo \
libpcre2-dev \
libblas-dev \
liblapack-dev \
libssl-dev \
libxml2-dev \
libfontconfig1-dev \
libharfbuzz-dev \
libfribidi-dev \
libfreetype6-dev \
libtiff5-dev \
libicu-dev
print_success "Dependencies installed successfully"
}
# Function to download and extract R source
download_r_source() {
WORK_DIR="$HOME/r-build-$R_VERSION"
print_status "Creating working directory: $WORK_DIR"
mkdir -p "$WORK_DIR"
cd "$WORK_DIR"
# Construct download URL
R_URL="https://cran.r-project.org/src/base/R-${R_MAJOR}/R-${R_VERSION}.tar.gz"
R_TARBALL="R-${R_VERSION}.tar.gz"
R_DIR="R-${R_VERSION}"
print_status "Downloading R source from: $R_URL"
if ! wget -q --show-progress "$R_URL"; then
print_error "Failed to download R source!"
print_error "Please check if version $R_VERSION exists at:"
print_error "https://cran.r-project.org/src/base/R-${R_MAJOR}/"
exit 1
fi
print_status "Extracting R source..."
tar -xzf "$R_TARBALL"
if [[ ! -d "$R_DIR" ]]; then
print_error "Failed to extract R source or directory not found!"
exit 1
fi
cd "$R_DIR"
print_success "R source downloaded and extracted"
}
# Function to configure R build
configure_r() {
print_status "Configuring R build..."
# Temporarily remove conda from PATH to avoid conflicts
if command_exists conda; then
print_warning "Conda detected. Temporarily removing from PATH to avoid conflicts..."
export PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
fi
# Basic configuration - reliable and widely compatible
./configure --prefix=/usr/local \
--enable-R-shlib \
--enable-memory-profiling \
--with-blas \
--with-lapack \
--with-readline \
--with-cairo \
--with-libpng \
--with-jpeglib \
--with-libtiff \
--enable-BLAS-shlib=no
if [[ $? -eq 0 ]]; then
print_success "R configured successfully"
else
print_error "R configuration failed!"
exit 1
fi
}
# Function to compile R
compile_r() {
print_status "Starting R compilation (this may take 15-30 minutes)..."
# Get number of CPU cores
NCORES=$(nproc)
# Use all cores but limit to avoid memory issues
if [[ $NCORES -gt 4 ]]; then
MAKE_JOBS=4
else
MAKE_JOBS=$NCORES
fi
print_status "Using $MAKE_JOBS parallel jobs for compilation"
if make -j$MAKE_JOBS; then
print_success "R compiled successfully"
else
print_warning "Compilation with $MAKE_JOBS jobs failed. Trying with single job..."
make clean
if make -j1; then
print_success "R compiled successfully (single-threaded)"
else
print_error "R compilation failed!"
exit 1
fi
fi
}
# Function to install R
install_r() {
print_status "Installing R to /usr/local..."
if sudo make install; then
print_success "R installed successfully"
else
print_error "R installation failed!"
exit 1
fi
# Update library cache
print_status "Updating library cache..."
sudo ldconfig
# Update PATH for current session
export PATH="/usr/local/bin:$PATH"
# Add to user's bashrc if not already there
if ! grep -q "/usr/local/bin" "$HOME/.bashrc"; then
print_status "Adding /usr/local/bin to PATH in ~/.bashrc"
echo 'export PATH="/usr/local/bin:$PATH"' >> "$HOME/.bashrc"
fi
}
# Function to verify installation
verify_installation() {
print_status "Verifying R installation..."
# Check if R binary exists
if ! command_exists R; then
print_error "R command not found in PATH!"
return 1
fi
# Check R version
INSTALLED_VERSION=$(R --version | head -n1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+')
if [[ "$INSTALLED_VERSION" == "$R_VERSION" ]]; then
print_success "R version $INSTALLED_VERSION installed successfully!"
else
print_error "Version mismatch! Expected: $R_VERSION, Found: $INSTALLED_VERSION"
return 1
fi
# Test basic R functionality
print_status "Testing basic R functionality..."
if R --slave --no-restore --no-save -e "cat('R is working correctly!\n'); sessionInfo()" >/dev/null 2>&1; then
print_success "R is working correctly!"
else
print_error "R is installed but not working properly!"
return 1
fi
# Show installation info
echo ""
print_success "=== R Installation Complete ==="
echo "R Version: $INSTALLED_VERSION"
echo "R Location: $(which R)"
echo "Installation Directory: /usr/local"
echo ""
echo "To start R, simply type: R"
echo "To check R version: R --version"
echo ""
print_status "Build directory saved at: $WORK_DIR"
print_status "You can remove it to save space: rm -rf $WORK_DIR"
}
# Function to cleanup on error
cleanup_on_error() {
print_error "Installation failed!"
print_status "Build directory preserved for debugging: $WORK_DIR"
print_status "Check the error messages above for troubleshooting."
exit 1
}
# Main function
main() {
echo "======================================"
echo " R Compilation and Installation"
echo "======================================"
echo ""
# Set up error handling
trap cleanup_on_error ERR
# Check if not running as root
check_not_root
# Get R version from command line
get_r_version "$@"
# Install dependencies
install_dependencies
# Download and extract R source
download_r_source
# Configure R build
configure_r
# Compile R
compile_r
# Install R
install_r
# Verify installation
verify_installation
print_success "All done! Enjoy your new R installation!"
}
# Run main function with all arguments
main "$@"
Python
spb_book_spine.py
Purpose:
I sometimes print large PDFs to keep them on paper for reference. I print them on A5 pages and punch spiral holes. Since I only actively read one or two books like this, I reuse the same spiral. But this method has a flaw: because I reuse the spiral, a finished book needs to be held together with something else, like a rubber band or a string. The bigger problem is that you can't tell which book is which when it is on the shelf. So I made this script to create a spine for them. I print it on an A4 sheet and fold it so it hugs the book.
Known issues: It is not convenient to modify.
Usage: Just edit and run it!
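It needs the reportlab and qrcode packages (qrcode renders the PNG through Pillow), so installing the dependencies should be roughly:
pip install reportlab "qrcode[pil]"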
Code:
import qrcode
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from reportlab.lib.units import cm

def create_book_spine(title, author, doi):
    # Generate QR code for DOI
    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=10,
        border=1,
    )
    qr.add_data(doi)
    qr.make(fit=True)
    qr_img = qr.make_image(fill_color="black", back_color="white")

    # Create PDF with specific dimensions
    c = canvas.Canvas("book_spine.pdf", pagesize=(1.5 * cm, 20 * cm))

    # Rotate the canvas to write vertical text
    c.rotate(90)

    # Set font to Helvetica-Bold for title
    c.setFont("Helvetica-Bold", 12)
    c.drawString(2 * cm, -0.8 * cm, title)

    # Set font back to Helvetica for author
    c.setFont("Helvetica", 12)
    c.drawString(3 * cm, -1.3 * cm, author)

    # Reset rotation for the QR code
    c.rotate(-90)

    # Add QR code for DOI at the bottom
    qr_img_path = "doi_qr.png"
    qr_img.save(qr_img_path)
    c.drawImage(qr_img_path, 0.2 * cm, 0.5 * cm, 1.1 * cm, 1.1 * cm, mask='auto')

    c.save()

create_book_spine(
    "Books title",
    "Author",
    "URL"
)
doi_bib_converter.py
Purpose:
While drafting manuscripts nowadays, I usually just add the DOI in brackets for references. Every research group seems to have a different preference for reference management, and I find this approach easier—especially when multiple authors are making edits. Once everything is finalized, I properly insert the references. For now, I'm hosting it here: cs1.puntocopy.com/bib-convert/
Known issues: not that I know of
Usage: Just use it!
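Besides the web form, the /convert endpoint can be called directly. For instance, against a local test instance started with python3 doi_bib_converter.py (host and port taken from the app.run() call at the bottom of the script):
curl -X POST -d 'dois=10.1038/nature12373' -d 'format=bibtex' http://localhost:5000/convert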
Code:
#!/usr/bin/env python3
"""
DOI Bibliography Converter
A Flask web service to convert DOIs to BibTeX or MS Word XML format
Production version for Apache2 deployment
"""
import re
import requests
import xml.etree.ElementTree as ET
from xml.dom import minidom
from flask import Flask, render_template_string, request, Response, jsonify
from urllib.parse import quote
import logging
import time
import os
import json
from typing import List, Set, Dict, Tuple
# Configure logging for production
try:
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s:%(name)s:%(message)s',
handlers=[
logging.FileHandler('/var/log/apache2/bib-convert.log'),
logging.StreamHandler()
]
)
except PermissionError:
# Fallback if can't write to log file
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s:%(name)s:%(message)s'
)
app = Flask(__name__)
# HTML template for the web interface
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<title>DOI Bibliography Converter</title>
<meta charset="utf-8">
<style>
body {
font-family: Helvetica, sans-serif;
max-width: 800px;
margin: 50px auto;
padding: 20px;
line-height: 1.6;
}
.container {
background: #f9f9f9;
padding: 30px;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
h1 {
color: #333;
text-align: center;
margin-bottom: 30px;
}
textarea {
width: 100%;
height: 200px;
padding: 10px;
border: 1px solid #ddd;
border-radius: 4px;
font-family: monospace;
resize: vertical;
}
.form-group {
margin: 20px 0;
}
label {
display: block;
margin-bottom: 8px;
font-weight: bold;
}
.radio-group {
margin: 10px 0;
}
.radio-group label {
display: inline;
margin-left: 8px;
font-weight: normal;
}
.checkbox-group {
margin: 10px 0;
}
.checkbox-group label {
display: inline;
margin-left: 8px;
font-weight: normal;
}
button {
background: #007cba;
color: white;
padding: 12px 24px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 16px;
width: 100%;
}
button:hover {
background: #005a8b;
}
.result {
margin-top: 20px;
padding: 15px;
background: white;
border: 1px solid #ddd;
border-radius: 4px;
white-space: pre-wrap;
font-family: monospace;
max-height: 400px;
overflow-y: auto;
}
.error {
color: #d32f2f;
background: #ffebee;
border-color: #d32f2f;
}
.info {
font-size: 14px;
color: #666;
margin-top: 10px;
}
.progress-container {
margin: 20px 0;
display: none;
}
.progress-bar {
width: 100%;
height: 20px;
background-color: #f0f0f0;
border-radius: 10px;
overflow: hidden;
box-shadow: inset 0 1px 3px rgba(0, 0, 0, 0.2);
}
.progress-fill {
height: 100%;
background: linear-gradient(90deg, #007cba, #005a8b);
width: 0%;
transition: width 0.3s ease;
border-radius: 10px;
}
.progress-text {
text-align: center;
margin-top: 5px;
font-size: 14px;
color: #666;
}
.footer {
margin-top: 40px;
padding-top: 20px;
border-top: 1px solid #ddd;
text-align: center;
font-size: 14px;
color: #666;
}
.footer a {
color: #007cba;
text-decoration: none;
}
.footer a:hover {
text-decoration: underline;
}
.tex-info {
background: #fff3cd;
border: 1px solid #ffc107;
padding: 8px;
border-radius: 4px;
font-size: 12px;
margin-top: 5px;
color: #856404;
}
</style>
</head>
<body>
<div class="container">
<h1>DOI Bibliography Converter</h1>
<div class="info" style="background: #e3f2fd; padding: 10px; border-radius: 4px; margin-bottom: 20px;">
<strong>How it works:</strong> Paste text containing DOIs or enter DOIs directly. The service will automatically extract and convert them to your chosen format.
<br><strong>Rate limiting:</strong> Processing is throttled to respect CrossRef API limits (~10 requests/second).
</div>
<form id="doiForm">
<div class="form-group">
<label for="dois">Enter DOIs or paste text containing DOIs:</label>
<textarea id="dois" name="dois" placeholder="10.1038/nature12373
10.1126/science.1234567
10.1016/j.cell.2020.01.001
Or paste any text like:
Recent studies (doi:10.1038/nature12373) show that...
See https://doi.org/10.1126/science.1234567 for details..."></textarea>
<div class="info">Enter DOIs one per line, or paste any text - DOIs will be automatically extracted. Supports various formats including URLs and doi: prefixes.</div>
</div>
<div class="form-group">
<label>Output Format:</label>
<div class="radio-group">
<input type="radio" id="bibtex" name="format" value="bibtex" checked>
<label for="bibtex">BibTeX</label>
</div>
<div class="radio-group">
<input type="radio" id="xml" name="format" value="xml">
<label for="xml">MS Word XML</label>
</div>
</div>
<div class="form-group">
<div class="checkbox-group">
<input type="checkbox" id="autoDownload" name="autoDownload" checked>
<label for="autoDownload">Automatically download result file</label>
</div>
<div class="checkbox-group">
<input type="checkbox" id="forTex" name="forTex">
<label for="forTex">For TeX (generate TeX file with \\cite{} commands)</label>
<div class="tex-info" id="texInfo" style="display: none;">
This will create an additional TeX file where DOIs in your original text are replaced with \\cite{bibkey} commands matching the generated BibTeX entries.
</div>
</div>
<div class="checkbox-group">
<input type="checkbox" id="forMarkdown" name="forMarkdown">
<label for="forMarkdown">For Markdown (generate Markdown file with @bibkey citations)</label>
<div class="tex-info" id="markdownInfo" style="display: none;">
This will create an additional Markdown file where DOIs in your original text are replaced with @bibkey citations for use with Pandoc and the generated BibTeX file.
</div>
</div>
</div>
<button type="submit">Convert DOIs</button>
</form>
<div class="progress-container" id="progressContainer">
<div class="progress-bar">
<div class="progress-fill" id="progressFill"></div>
</div>
<div class="progress-text" id="progressText">Processing...</div>
</div>
<div id="result"></div>
<div class="footer">
<p>Script available at: <a href="https://github.com/NitroxHead/blog_posts/blob/main/Small%20Scripts/doi_bib_converter.py" target="_blank">Github</a></p>
<p>Created by: NitroxHead</p>
</div>
</div>
<script>
// Show/hide TeX info when checkbox is toggled
document.getElementById('forTex').addEventListener('change', function() {
const texInfo = document.getElementById('texInfo');
texInfo.style.display = this.checked ? 'block' : 'none';
});
// Show/hide Markdown info when checkbox is toggled
document.getElementById('forMarkdown').addEventListener('change', function() {
const markdownInfo = document.getElementById('markdownInfo');
markdownInfo.style.display = this.checked ? 'block' : 'none';
});
document.getElementById('doiForm').addEventListener('submit', async function(e) {
e.preventDefault();
const formData = new FormData(this);
const resultDiv = document.getElementById('result');
const submitButton = document.querySelector('button[type="submit"]');
const progressContainer = document.getElementById('progressContainer');
const progressFill = document.getElementById('progressFill');
const progressText = document.getElementById('progressText');
const autoDownload = document.getElementById('autoDownload').checked;
const forTex = document.getElementById('forTex').checked;
const forMarkdown = document.getElementById('forMarkdown').checked;
const format = formData.get('format');
// Count approximate DOIs for progress indication
const inputText = formData.get('dois');
const approxDoiCount = (inputText.match(/10\\.\\d{4,}/g) || []).length;
let progressMessage = 'Processing...';
if (approxDoiCount > 1) {
progressMessage = `Processing ${approxDoiCount} DOIs (estimated ${Math.ceil(approxDoiCount * 0.2)} seconds)...`;
}
// Show progress bar and start animation
progressContainer.style.display = 'block';
progressText.textContent = progressMessage;
progressFill.style.width = '0%';
// Animate progress bar
let progress = 0;
const estimatedTime = Math.max(2, approxDoiCount * 0.2) * 1000; // Convert to milliseconds
const progressInterval = setInterval(() => {
progress += (100 / (estimatedTime / 100)); // Update every 100ms
if (progress < 90) { // Don't go to 100% until actually done
progressFill.style.width = progress + '%';
}
}, 100);
resultDiv.innerHTML = '';
submitButton.disabled = true;
submitButton.textContent = 'Processing...';
try {
const response = await fetch('convert', {
method: 'POST',
body: formData
});
clearInterval(progressInterval);
progressFill.style.width = '100%';
progressText.textContent = 'Complete!';
if (response.ok) {
const contentType = response.headers.get('content-type');
if (contentType.includes('application/json')) {
// Multi-file response (when TeX or Markdown is enabled)
const result = await response.json();
// Display the main result
resultDiv.innerHTML = '<div class="result">' + escapeHtml(result.main_content) + '</div>';
// Auto-download if enabled
if (autoDownload) {
downloadFile(result.main_content, format);
// Also download TeX file if available
if (result.tex_content) {
downloadFile(result.tex_content, 'tex');
}
// Also download Markdown file if available
if (result.markdown_content) {
downloadFile(result.markdown_content, 'markdown');
}
}
} else {
// Single file response
const result = await response.text();
resultDiv.innerHTML = '<div class="result">' + escapeHtml(result) + '</div>';
// Auto-download if enabled
if (autoDownload) {
downloadFile(result, format);
}
}
} else {
const error = await response.text();
resultDiv.innerHTML = '<div class="result error">Error: ' + escapeHtml(error) + '</div>';
}
} catch (error) {
clearInterval(progressInterval);
resultDiv.innerHTML = '<div class="result error">Network error: ' + escapeHtml(error.message) + '</div>';
} finally {
submitButton.disabled = false;
submitButton.textContent = 'Convert DOIs';
// Hide progress bar after 2 seconds
setTimeout(() => {
progressContainer.style.display = 'none';
}, 2000);
}
});
function downloadFile(content, format) {
const timestamp = new Date().toISOString().slice(0, 19).replace(/[:.]/g, '-');
let extension, mimeType, prefix;
switch(format) {
case 'xml':
extension = 'xml';
mimeType = 'application/xml';
prefix = 'doi-bibliography';
break;
case 'tex':
extension = 'tex';
mimeType = 'text/plain';
prefix = 'doi-text-with-citations';
break;
case 'markdown':
extension = 'md';
mimeType = 'text/markdown';
prefix = 'doi-text-with-citations';
break;
default: // bibtex
extension = 'bib';
mimeType = 'text/plain';
prefix = 'doi-bibliography';
break;
}
const filename = `${prefix}-${timestamp}.${extension}`;
const blob = new Blob([content], { type: mimeType });
const url = window.URL.createObjectURL(blob);
const a = document.createElement('a');
a.style.display = 'none';
a.href = url;
a.download = filename;
document.body.appendChild(a);
a.click();
window.URL.revokeObjectURL(url);
document.body.removeChild(a);
}
function escapeHtml(unsafe) {
return unsafe
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")
.replace(/"/g, "&quot;")
.replace(/'/g, "&#039;");
}
</script>
</body>
</html>
"""
# Rate limiting
last_request_time = 0
MIN_REQUEST_INTERVAL = 0.1 # 100ms between requests (10 requests/second to be polite)
def clean_doi(doi_string):
"""Extract clean DOI from various input formats"""
# Remove whitespace
doi_string = doi_string.strip()
# Extract DOI from URL if present
if 'doi.org/' in doi_string:
doi_string = doi_string.split('doi.org/')[-1]
# Remove 'doi:' prefix if present
if doi_string.lower().startswith('doi:'):
doi_string = doi_string[4:]
# Remove trailing punctuation that might be from sentence context
while doi_string and doi_string[-1] in '.,;:)]}':
doi_string = doi_string[:-1]
return doi_string
def extract_dois_from_text(text: str) -> List[Tuple[str, int, int]]:
"""Extract all DOIs from text using regex patterns, returning DOI, start, end positions"""
doi_patterns = [
# Standard DOI pattern - more permissive, stops at whitespace, brackets, or end of sentence
r'10\.\d{4,}\/[^\s\(\)\[\]\,\;]+',
# DOI with doi: prefix
r'doi:\s*10\.\d{4,}\/[^\s\(\)\[\]\,\;]+',
# DOI URLs
r'https?:\/\/(?:dx\.)?doi\.org\/10\.\d{4,}\/[^\s\(\)\[\]\,\;]+',
# DOI URLs without protocol
r'(?:dx\.)?doi\.org\/10\.\d{4,}\/[^\s\(\)\[\]\,\;]+'
]
found_dois = []
app.logger.info(f"Extracting DOIs from text: {text[:200]}...")
for i, pattern in enumerate(doi_patterns):
matches = re.finditer(pattern, text, re.IGNORECASE)
for match in matches:
raw_doi = match.group()
start_pos = match.start()
end_pos = match.end()
app.logger.info(f"Pattern {i+1} found raw match: '{raw_doi}' at {start_pos}-{end_pos}")
# Clean the DOI
cleaned_doi = clean_doi(raw_doi)
app.logger.info(f"Cleaned to: '{cleaned_doi}'")
# Validate that it's actually a DOI (has the right structure)
if re.match(r'^10\.\d{4,}\/[a-zA-Z0-9\.\-_\(\)\/]+$', cleaned_doi):
found_dois.append((cleaned_doi, start_pos, end_pos))
app.logger.info(f"Added valid DOI: '{cleaned_doi}' at {start_pos}-{end_pos}")
else:
app.logger.warning(f"Rejected invalid DOI format: '{cleaned_doi}'")
# Sort by start position to maintain order
found_dois.sort(key=lambda x: x[1])
app.logger.info(f"Final extracted DOIs with positions: {found_dois}")
return found_dois
def parse_input_text(input_text: str) -> List[str]:
"""Parse input text to extract DOIs, handling both line-by-line DOIs and full text"""
input_text = input_text.strip()
if not input_text:
return []
app.logger.info(f"Parsing input text of length {len(input_text)}")
# First, try to extract DOIs from the entire text
extracted_dois_with_pos = extract_dois_from_text(input_text)
extracted_dois = [doi for doi, _, _ in extracted_dois_with_pos]
app.logger.info(f"Extracted DOIs from full text: {extracted_dois}")
# Also check if input looks like line-by-line DOIs
lines = [line.strip() for line in input_text.split('\n') if line.strip()]
line_dois = set()
app.logger.info(f"Processing {len(lines)} lines for line-by-line DOIs")
for line in lines:
# If line looks like it might be a DOI (contains the typical pattern)
if re.search(r'10\.\d{4,}', line):
clean_doi_str = clean_doi(line)
app.logger.info(f"Line '{line[:50]}...' -> cleaned: '{clean_doi_str}'")
if clean_doi_str and re.match(r'^10\.\d{4,}\/[a-zA-Z0-9\.\-_\(\)\/]+$', clean_doi_str):
line_dois.add(clean_doi_str)
app.logger.info(f"Added line DOI: '{clean_doi_str}'")
# Combine both methods, preferring extracted DOIs if we found any
all_dois_set = set(extracted_dois).union(line_dois) if extracted_dois else line_dois
# Preserve order from extracted DOIs, then add any additional from line parsing
result = []
for doi in extracted_dois:
if doi not in result:
result.append(doi)
for doi in line_dois:
if doi not in result:
result.append(doi)
app.logger.info(f"Final combined DOIs: {result}")
return result
def fetch_doi_metadata(doi):
"""Fetch metadata for a DOI from CrossRef with rate limiting"""
global last_request_time
try:
# Rate limiting - ensure we don't exceed CrossRef's limits
current_time = time.time()
time_since_last = current_time - last_request_time
if time_since_last < MIN_REQUEST_INTERVAL:
time.sleep(MIN_REQUEST_INTERVAL - time_since_last)
clean_doi_str = clean_doi(doi)
url = f"https://api.crossref.org/works/{quote(clean_doi_str)}"
headers = {
'User-Agent': 'DOI-Bibliography-Converter/1.0 (mailto:user@example.com)',
'Accept': 'application/json'
}
last_request_time = time.time()
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status()
data = response.json()
return data['message']
except Exception as e:
app.logger.error(f"Error fetching DOI {doi}: {str(e)}")
return None
def format_authors_bibtex(authors):
"""Format authors for BibTeX"""
if not authors:
return ""
author_list = []
for author in authors:
if 'family' in author and 'given' in author:
author_list.append(f"{author['family']}, {author['given']}")
elif 'family' in author:
author_list.append(author['family'])
return " and ".join(author_list)
def format_authors_xml(authors):
"""Format authors for XML"""
if not authors:
return ""
author_list = []
for author in authors:
if 'family' in author and 'given' in author:
author_list.append(f"{author['given']} {author['family']}")
elif 'family' in author:
author_list.append(author['family'])
return "; ".join(author_list)
def generate_bibtex_key(metadata):
"""Generate a BibTeX key from metadata"""
# Use first author's last name + year
authors = metadata.get('author', [])
year = ""
if 'created' in metadata:
year = str(metadata['created']['date-parts'][0][0])
elif 'published-print' in metadata:
year = str(metadata['published-print']['date-parts'][0][0])
elif 'published-online' in metadata:
year = str(metadata['published-online']['date-parts'][0][0])
if authors and 'family' in authors[0]:
first_author = authors[0]['family'].replace(' ', '').replace('-', '')
key = f"{first_author}{year}"
else:
key = f"unknown{year}"
return key
def metadata_to_bibtex(metadata):
"""Convert CrossRef metadata to BibTeX format"""
entry_type = "article" # Default to article
# Determine entry type based on publication type
pub_type = metadata.get('type', '').lower()
if 'book' in pub_type:
entry_type = "book"
elif 'conference' in pub_type or 'proceedings' in pub_type:
entry_type = "inproceedings"
key = generate_bibtex_key(metadata)
bibtex = f"@{entry_type}{{{key},\n"
# Title
if 'title' in metadata and metadata['title']:
title = metadata['title'][0].replace('{', '').replace('}', '')
bibtex += f" title = {{{title}}},\n"
# Authors
if 'author' in metadata:
authors = format_authors_bibtex(metadata['author'])
if authors:
bibtex += f" author = {{{authors}}},\n"
# Journal
if 'container-title' in metadata and metadata['container-title']:
journal = metadata['container-title'][0]
bibtex += f" journal = {{{journal}}},\n"
# Year
year = ""
if 'created' in metadata:
year = str(metadata['created']['date-parts'][0][0])
elif 'published-print' in metadata:
year = str(metadata['published-print']['date-parts'][0][0])
elif 'published-online' in metadata:
year = str(metadata['published-online']['date-parts'][0][0])
if year:
bibtex += f" year = {{{year}}},\n"
# Volume
if 'volume' in metadata:
bibtex += f" volume = {{{metadata['volume']}}},\n"
# Issue/Number
if 'issue' in metadata:
bibtex += f" number = {{{metadata['issue']}}},\n"
# Pages
if 'page' in metadata:
bibtex += f" pages = {{{metadata['page']}}},\n"
# DOI
if 'DOI' in metadata:
bibtex += f" doi = {{{metadata['DOI']}}},\n"
# URL
if 'URL' in metadata:
bibtex += f" url = {{{metadata['URL']}}},\n"
bibtex += "}\n"
return bibtex, key
def metadata_to_msword_xml(metadata_list):
"""Convert list of CrossRef metadata to MS Word XML bibliography format"""
# Create root element
root = ET.Element("b:Sources")
root.set("SelectedStyle", "\\APASixthEditionOfficeOnline.xsl")
root.set("StyleName", "APA")
root.set("xmlns:b", "http://schemas.openxmlformats.org/officeDocument/2006/bibliography")
root.set("xmlns", "http://schemas.openxmlformats.org/officeDocument/2006/bibliography")
for i, metadata in enumerate(metadata_list):
source = ET.SubElement(root, "b:Source")
# Tag (unique identifier)
tag = ET.SubElement(source, "b:Tag")
tag.text = f"Source{i+1}"
# Source type (most will be journal articles)
source_type = ET.SubElement(source, "b:SourceType")
source_type.text = "ArticleInAPeriodical"
# Title
if 'title' in metadata and metadata['title']:
title = ET.SubElement(source, "b:Title")
title.text = metadata['title'][0]
# Authors
if 'author' in metadata and metadata['author']:
authors_elem = ET.SubElement(source, "b:Author")
name_list = ET.SubElement(authors_elem, "b:NameList")
for author in metadata['author'][:10]: # Limit to first 10 authors
person = ET.SubElement(name_list, "b:Person")
if 'given' in author:
first = ET.SubElement(person, "b:First")
first.text = author['given']
if 'family' in author:
last = ET.SubElement(person, "b:Last")
last.text = author['family']
# Journal name
if 'container-title' in metadata and metadata['container-title']:
journal = ET.SubElement(source, "b:JournalName")
journal.text = metadata['container-title'][0]
# Year
year = ""
if 'created' in metadata:
year = str(metadata['created']['date-parts'][0][0])
elif 'published-print' in metadata:
year = str(metadata['published-print']['date-parts'][0][0])
elif 'published-online' in metadata:
year = str(metadata['published-online']['date-parts'][0][0])
if year:
year_elem = ET.SubElement(source, "b:Year")
year_elem.text = year
# Volume
if 'volume' in metadata:
volume = ET.SubElement(source, "b:Volume")
volume.text = metadata['volume']
# Issue
if 'issue' in metadata:
issue = ET.SubElement(source, "b:Issue")
issue.text = metadata['issue']
# Pages
if 'page' in metadata:
pages = ET.SubElement(source, "b:Pages")
pages.text = metadata['page']
# DOI
if 'DOI' in metadata:
doi = ET.SubElement(source, "b:DOI")
doi.text = metadata['DOI']
# Convert to pretty-printed XML string
xml_str = ET.tostring(root, encoding='unicode')
dom = minidom.parseString(xml_str)
return dom.toprettyxml(indent=" ")
def create_tex_file_with_citations(original_text: str, doi_to_key_mapping: Dict[str, str]) -> str:
"""Replace DOIs in original text with TeX citation commands"""
# Get all DOIs with their positions
dois_with_positions = extract_dois_from_text(original_text)
# Sort by position in reverse order to replace from end to beginning
# This prevents position shifts from affecting subsequent replacements
dois_with_positions.sort(key=lambda x: x[1], reverse=True)
result_text = original_text
for doi, start_pos, end_pos in dois_with_positions:
clean_doi_str = clean_doi(doi)
if clean_doi_str in doi_to_key_mapping:
bibtex_key = doi_to_key_mapping[clean_doi_str]
citation_command = f"\\cite{{{bibtex_key}}}"
# Replace the DOI with the citation command
result_text = result_text[:start_pos] + citation_command + result_text[end_pos:]
app.logger.info(f"Replaced DOI '{doi}' at {start_pos}-{end_pos} with '{citation_command}'")
return result_text
def create_markdown_file_with_citations(original_text: str, doi_to_key_mapping: Dict[str, str]) -> str:
"""Replace DOIs in original text with Markdown citation commands for Pandoc"""
# Get all DOIs with their positions
dois_with_positions = extract_dois_from_text(original_text)
# Sort by position in reverse order to replace from end to beginning
# This prevents position shifts from affecting subsequent replacements
dois_with_positions.sort(key=lambda x: x[1], reverse=True)
result_text = original_text
for doi, start_pos, end_pos in dois_with_positions:
clean_doi_str = clean_doi(doi)
if clean_doi_str in doi_to_key_mapping:
bibtex_key = doi_to_key_mapping[clean_doi_str]
citation_command = f"@{bibtex_key}"
# Replace the DOI with the citation command
result_text = result_text[:start_pos] + citation_command + result_text[end_pos:]
app.logger.info(f"Replaced DOI '{doi}' at {start_pos}-{end_pos} with '{citation_command}'")
return result_text
@app.route('/')
def index():
"""Serve the main page"""
return render_template_string(HTML_TEMPLATE)
@app.route('/convert', methods=['POST'])
def convert_dois():
"""Convert DOIs to requested format"""
try:
input_text = request.form.get('dois', '').strip()
output_format = request.form.get('format', 'bibtex')
for_tex = request.form.get('forTex') == 'on'
for_markdown = request.form.get('forMarkdown') == 'on'
if not input_text:
return "Please enter some text or DOIs", 400
# Parse input text to extract DOIs
dois = parse_input_text(input_text)
if not dois:
return "No valid DOIs found in the input text", 400
# Remove duplicates while preserving order
unique_dois = list(dict.fromkeys(dois))
app.logger.info(f"Found {len(unique_dois)} unique DOIs: {unique_dois}")
# Fetch metadata for all DOIs
metadata_list = []
failed_dois = []
doi_to_key_mapping = {} # For TeX citation mapping
for i, doi in enumerate(unique_dois):
app.logger.info(f"Processing DOI {i+1}/{len(unique_dois)}: {doi}")
metadata = fetch_doi_metadata(doi)
if metadata:
metadata_list.append(metadata)
# Store the mapping from DOI to BibTeX key for TeX generation
if output_format == 'bibtex':
_, bibtex_key = metadata_to_bibtex(metadata)
doi_to_key_mapping[doi] = bibtex_key
else:
failed_dois.append(doi)
if not metadata_list:
return f"Failed to fetch metadata for all DOIs: {', '.join(failed_dois)}", 400
# Generate output based on format
if output_format == 'bibtex':
result = f"% Generated {len(metadata_list)} BibTeX entries from {len(unique_dois)} DOIs\n\n"
for metadata in metadata_list:
bibtex_entry, _ = metadata_to_bibtex(metadata)
result += bibtex_entry + "\n"
if failed_dois:
result += f"\n% Failed to process: {', '.join(failed_dois)}\n"
# Generate TeX and/or Markdown files if requested
if for_tex or for_markdown:
response_data = {
'main_content': result
}
if for_tex:
tex_content = create_tex_file_with_citations(input_text, doi_to_key_mapping)
response_data['tex_content'] = tex_content
if for_markdown:
markdown_content = create_markdown_file_with_citations(input_text, doi_to_key_mapping)
response_data['markdown_content'] = markdown_content
# Return all files as JSON
return Response(json.dumps(response_data), mimetype='application/json')
else:
return Response(result, mimetype='text/plain')
elif output_format == 'xml':
result = metadata_to_msword_xml(metadata_list)
if failed_dois:
result += f"\n<!-- Generated {len(metadata_list)} entries from {len(unique_dois)} DOIs -->\n"
result += f"<!-- Failed to process: {', '.join(failed_dois)} -->\n"
# Note: TeX/Markdown generation doesn't make sense for XML format
return Response(result, mimetype='application/xml')
else:
return "Invalid output format", 400
except Exception as e:
app.logger.error(f"Error in convert_dois: {str(e)}")
return f"Server error: {str(e)}", 500
# Production configuration
if __name__ == "__main__":
# This section won't be used in WSGI deployment
app.run(host='0.0.0.0', port=5000, debug=False)
bioRxiv_new_pub_discord_notif.py
Purpose:
To get Discord notifications when a new article is published on bioRxiv.
Known issues: not that I know of
Usage: Just use it!
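In practice, set the Discord webhook URL and feeds in the CONFIG dictionary first, then schedule the script the way its docstring suggests; roughly:
pip install feedparser requests
chmod +x bioRxiv_new_pub_discord_notif.py
# crontab entry (every 15 minutes; adjust the path to wherever you saved it):
*/15 * * * * /path/to/bioRxiv_new_pub_discord_notif.py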
Code:
#!/usr/bin/env python3
"""
RSS Feed Monitor with Discord Notifications
This script monitors RSS feeds for new publications and sends notifications
to Discord when new items are found. Designed to run via crontab.
Requirements:
pip install feedparser requests
Setup:
1. Configure feeds and Discord webhook in the script
2. Make executable: chmod +x rss_monitor.py
3. Add to crontab: */15 * * * * /path/to/rss_monitor.py
"""
import feedparser
import requests
import json
import os
import sys
import logging
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
import hashlib
# Configuration
CONFIG = {
# Discord webhook URL - replace with your actual webhook URL
'discord_webhook': 'https://discord.com/api/webhooks/YOUR_WEBHOOK_URL_HERE',
# RSS feeds to monitor
'feeds': [
{
'name': 'bioRxiv Ecology',
'url': 'https://connect.biorxiv.org/biorxiv_xml.php?subject=ecology',
'color': 0x00ff00 # Green color for Discord embed
},
{
'name': 'bioRxiv Systems Biology',
'url': 'https://connect.biorxiv.org/biorxiv_xml.php?subject=systems_biology',
'color': 0x00ff00 # Green color for Discord embed
},
],
# Directory to store state files
'data_dir': os.path.expanduser('~/.rss_monitor'),
# Logging configuration
'log_file': os.path.expanduser('~/.rss_monitor/monitor.log'),
'log_level': logging.DEBUG # Change to logging.INFO after initial testing
}
class RSSMonitor:
def __init__(self, config):
self.config = config
self.data_dir = Path(config['data_dir'])
self.data_dir.mkdir(exist_ok=True)
# Setup logging
logging.basicConfig(
level=config['log_level'],
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(config['log_file']),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def get_state_file(self, feed_name):
"""Get the state file path for a specific feed"""
safe_name = "".join(c for c in feed_name if c.isalnum() or c in (' ', '-', '_')).rstrip()
safe_name = safe_name.replace(' ', '_').lower()
return self.data_dir / f"{safe_name}_state.json"
def load_seen_items(self, feed_name):
"""Load previously seen items for a feed"""
state_file = self.get_state_file(feed_name)
if state_file.exists():
try:
with open(state_file, 'r') as f:
data = json.load(f)
return set(data.get('seen_items', []))
except (json.JSONDecodeError, IOError) as e:
self.logger.warning(f"Error loading state for {feed_name}: {e}")
return set()
return set()
def save_seen_items(self, feed_name, seen_items):
"""Save seen items for a feed"""
state_file = self.get_state_file(feed_name)
try:
data = {
'seen_items': list(seen_items),
'last_updated': datetime.now().isoformat()
}
with open(state_file, 'w') as f:
json.dump(data, f, indent=2)
except IOError as e:
self.logger.error(f"Error saving state for {feed_name}: {e}")
def generate_item_id(self, entry):
"""Generate a unique ID for a feed entry"""
# Use DOI if available, otherwise use link, otherwise use title+date
if hasattr(entry, 'id') and entry.id:
return entry.id
elif hasattr(entry, 'link') and entry.link:
return entry.link
else:
# Fallback: hash of title and published date
content = f"{entry.get('title', '')}{entry.get('published', '')}"
return hashlib.md5(content.encode()).hexdigest()
def extract_doi_url(self, entry):
"""Extract DOI URL from entry"""
# Try dc_identifier field first (common in bioRxiv feeds)
if hasattr(entry, 'dc_identifier') and entry.dc_identifier:
identifier = entry.dc_identifier
if 'doi:' in identifier:
doi = identifier.replace('doi:', '').strip()
return f"https://doi.org/{doi}"
# Try to get DOI from id field
if hasattr(entry, 'id') and 'doi:' in str(entry.id):
doi = str(entry.id).replace('doi:', '').strip()
return f"https://doi.org/{doi}"
# Try to extract from description or other fields
for field in ['link', 'id', 'identifier']:
if hasattr(entry, field):
value = getattr(entry, field)
if value and 'doi.org' in str(value):
return str(value)
elif value and 'doi:' in str(value):
doi = str(value).split('doi:')[1].strip()
return f"https://doi.org/{doi}"
# Fallback to the entry link
return getattr(entry, 'link', '')
def clean_description(self, description, max_length=1000):
"""Clean and truncate description for Discord"""
if not description:
return "No description available"
# Remove HTML tags and extra whitespace
import re
description = re.sub(r'<[^>]+>', '', description)
description = re.sub(r'\s+', ' ', description).strip()
if len(description) > max_length:
description = description[:max_length-3] + "..."
return description
def extract_authors(self, entry):
"""Extract authors from entry with multiple fallback methods"""
# Try various author field names
for field in ['dc_creator', 'author', 'authors', 'creator']:
if hasattr(entry, field):
authors = getattr(entry, field)
if authors and authors.strip():
return authors.strip()
# Try accessing through tags
if hasattr(entry, 'tags'):
for tag in entry.tags:
if tag.get('term') and 'creator' in tag.get('rel', '').lower():
return tag['term']
return "Unknown authors"
def extract_publish_date(self, entry):
"""Extract publication date with multiple fallback methods"""
# Try various date field names
for field in ['dc_date', 'published', 'pubdate', 'prism_publicationdate']:
if hasattr(entry, field):
date_val = getattr(entry, field)
if date_val:
return str(date_val)
# Try accessing through other properties
if hasattr(entry, 'published_parsed') and entry.published_parsed:
from time import strftime
return strftime('%Y-%m-%d', entry.published_parsed)
return "Unknown date"
def send_discord_notification(self, feed_name, entries, feed_color):
"""Send Discord notification for new entries"""
if not entries:
return
webhook_url = self.config['discord_webhook']
if not webhook_url or 'YOUR_WEBHOOK_URL_HERE' in webhook_url:
self.logger.error("Discord webhook URL not configured")
return
for entry in entries:
title = entry.get('title', 'Untitled')
description = self.clean_description(entry.get('description', ''))
doi_url = self.extract_doi_url(entry)
published = self.extract_publish_date(entry)
authors = self.extract_authors(entry)
# Create Discord embed
embed = {
"title": title[:256], # Discord title limit
"description": description,
"url": doi_url,
"color": feed_color,
"fields": [
{
"name": "Authors",
"value": authors[:1024] if authors else "Unknown",
"inline": False
},
{
"name": "Published",
"value": published,
"inline": True
},
{
"name": "Source",
"value": feed_name,
"inline": True
}
],
"footer": {
"text": "RSS Monitor",
"icon_url": "https://cdn.discordapp.com/embed/avatars/0.png"
},
"timestamp": datetime.now().isoformat()
}
payload = {
"content": f"🔬 **New Publication Alert!**",
"embeds": [embed]
}
try:
response = requests.post(webhook_url, json=payload, timeout=10)
response.raise_for_status()
self.logger.info(f"Sent notification for: {title[:50]}...")
except requests.exceptions.RequestException as e:
self.logger.error(f"Failed to send Discord notification: {e}")
def debug_entry_fields(self, entry):
"""Debug function to log available fields in an entry"""
self.logger.debug("Available entry fields:")
for attr in dir(entry):
if not attr.startswith('_'):
try:
value = getattr(entry, attr)
if value and not callable(value):
self.logger.debug(f" {attr}: {str(value)[:100]}...")
except:
pass
def process_feed(self, feed_config):
"""Process a single RSS feed"""
feed_name = feed_config['name']
feed_url = feed_config['url']
feed_color = feed_config.get('color', 0x0099ff)
self.logger.info(f"Processing feed: {feed_name}")
try:
# Parse RSS feed
feed = feedparser.parse(feed_url)
if feed.bozo:
self.logger.warning(f"Feed {feed_name} has parsing issues: {feed.bozo_exception}")
if not feed.entries:
self.logger.warning(f"No entries found in feed: {feed_name}")
return
# Load previously seen items
seen_items = self.load_seen_items(feed_name)
new_entries = []
# Check for new entries
for i, entry in enumerate(feed.entries):
# Debug first entry to see available fields
if i == 0:
self.debug_entry_fields(entry)
item_id = self.generate_item_id(entry)
if item_id not in seen_items:
new_entries.append(entry)
seen_items.add(item_id)
if new_entries:
self.logger.info(f"Found {len(new_entries)} new entries in {feed_name}")
self.send_discord_notification(feed_name, new_entries, feed_color)
else:
self.logger.info(f"No new entries in {feed_name}")
# Save updated state
self.save_seen_items(feed_name, seen_items)
except Exception as e:
self.logger.error(f"Error processing feed {feed_name}: {e}")
def run(self):
"""Main execution method"""
self.logger.info("Starting RSS monitor")
for feed_config in self.config['feeds']:
self.process_feed(feed_config)
self.logger.info("RSS monitor completed")
def main():
"""Main function"""
try:
monitor = RSSMonitor(CONFIG)
monitor.run()
except KeyboardInterrupt:
print("\nMonitor interrupted by user")
sys.exit(0)
except Exception as e:
logging.error(f"Unexpected error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()