Groups | Search | Server Info | Keyboard shortcuts | Login | Register [http] [https] [nntp] [nntps]
Groups > comp.lang.python > #197641
| Subject | AI Generated Usenet Client in Python |
|---|---|
| From | street@shellcrash.com |
| Newsgroups | comp.lang.python |
| Date | 2025-12-19 20:54 +0000 |
| Message-ID | <6945bb89$0$24$882e4bbb@reader.netnews.com> (permalink) |
AI Generated Usenet Client in Python
If you have any improvements or helpful ideas for this, I would like to see them.
https://github.com/alt-magick/Newsgroup-Client-/
#!/usr/bin/env python3
import nntplib
import sys
import termios
import tty
import re
import quopri
import base64
from email.message import EmailMessage
import os
import tempfile
import subprocess
# ================= USER CONFIG =================
NNTP_SERVER = "usnews.blocknews.net"  # host handed to nntplib.NNTP_SSL in main()
NNTP_PORT = 563
USERNAME = ""  # credentials handed to NNTP_SSL; empty as shipped
PASSWORD = ""
PAGE_LINES = 12  # lines printed per page by paged_print()
MAX_ARTICLES_LIST = 200  # reload_group() lists at most this far back from the newest article
MAX_REPLY_SCAN = 300  # scan_replies_xover() searches at most this far back
START_GROUP = "alt.test"  # group opened by main()
SHOW_REPLY_COUNT = False  # not referenced anywhere else in the visible code
SHOW_REPLY_COUNT_MAIN = True  # count replies while building the group listing
# ==============================================
# Subjects starting with "re:" or "fwd:" (any case) are treated as replies.
RE_REPLY = re.compile(r"^(re|fwd):", re.IGNORECASE)
# Characters removed from displayed text: control characters other than tab
# and newline, written as escapes so they survive copy/paste.
# NOTE(review): the pattern arrived garbled as "[ ---]", which is the range
# space..hyphen and would delete spaces and punctuation. The exact original
# range is a reconstruction -- confirm against the upstream repository.
CLEAN_RE = re.compile(r"[\x00-\x08\x0b-\x1f\x7f]")
# ---------- STATUS LINE ----------
# Pending one-shot status message: written by set_status(), then printed and
# reset to "" by show_status().
STATUS_LINE = ""
def set_status(msg):
    """Remember ``msg`` as the pending status text.

    The text is kept in the module-level ``STATUS_LINE`` until
    show_status() prints and clears it; a later call overwrites an
    earlier, still-unshown message.
    """
    global STATUS_LINE
    STATUS_LINE = msg
def show_status():
    """Print the pending status message, if any, then clear it.

    Output is a blank line followed by ``[message]``. Nothing is printed
    when ``STATUS_LINE`` is empty.
    """
    global STATUS_LINE
    if STATUS_LINE:
        # The leading "\n" separates the status from whatever was printed last.
        print(f"\n[{STATUS_LINE}]")
        STATUS_LINE = ""
# ---------- RAW KEY INPUT ----------
def get_key():
    """Read a single keypress from stdin without waiting for Enter.

    The terminal is switched to raw mode for the read and its previous
    settings are always restored afterwards.

    Returns:
        The one-character string read from stdin.

    Raises:
        KeyboardInterrupt: if the key was Ctrl-C. Raw mode turns off
            terminal signal generation, so Ctrl-C arrives as the character
            "\\x03" instead of interrupting the program; it is translated
            back here so the user can still abort.
    """
    fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(fd)
    try:
        tty.setraw(fd)
        key = sys.stdin.read(1)
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, saved_attrs)
    if key == "\x03":
        raise KeyboardInterrupt
    return key
def prompt(text):
    """Show ``text`` with no trailing newline and return the user's reply.

    One line is read from stdin; surrounding whitespace, including the
    line ending, is stripped from the returned string.
    """
    print(text, end="", flush=True)
    reply = sys.stdin.readline()
    return reply.strip()
# ---------- HARD-CODED PAGER ----------
def paged_print(lines, page_lines=None):
    """Print ``lines`` one page at a time, pausing between pages.

    Args:
        lines: sequence of strings to print, one per output line.
        page_lines: lines per page; defaults to the module-level
            ``PAGE_LINES``. Values below 1 are treated as 1 so the pager
            always makes progress.

    After each page except the last, a prompt is shown and one key is read
    with get_key(): SPACE stops paging, any other key shows the next page.
    """
    if page_lines is None:
        page_lines = PAGE_LINES
    page_lines = max(1, page_lines)

    total = len(lines)
    for start in range(0, total, page_lines):
        end = start + page_lines
        for line in lines[start:end]:
            print(line)
        if end >= total:
            break
        print("\n--- ENTER = next page | SPACE = skip ---")
        if get_key() == " ":
            break
# ---------- BODY DECODER ----------
# A quoted-printable escape as encoders emit it: "=" plus two UPPERCASE hex digits.
_QP_ESCAPE_RE = re.compile(r"=[0-9A-F]{2}")
# A whole line of base64: alphabet characters with at most two "=" of padding.
_B64_LINE_RE = re.compile(r"[A-Za-z0-9+/]+={0,2}")


def decode_body_line(line_bytes):
    """Best-effort decode of one raw article line into display text.

    The line is decoded as UTF-8 (undecodable bytes become U+FFFD) and
    passed through ``CLEAN_RE``. Because the function sees a single line
    and not the article's Content-Transfer-Encoding, the transfer encoding
    is guessed:

    * quoted-printable is decoded only when the line contains at least one
      ``=XX`` escape with uppercase hex digits, so ordinary text such as
      ``foo=bar`` and trailing base64 padding are left untouched;
    * base64 is decoded only when the whole line (longer than 20
      characters) is valid base64 AND the result looks like text, so hex
      digests and similar tokens are not turned into garbage.

    Args:
        line_bytes: one line of the article as bytes.

    Returns:
        The decoded line as ``str``.
    """
    text = CLEAN_RE.sub("", line_bytes.decode("utf-8", errors="replace"))

    if _QP_ESCAPE_RE.search(text):
        # quopri works on bytes; encoding first keeps raw non-ASCII text intact.
        unquoted = quopri.decodestring(text.encode("utf-8"))
        # Clean again: an escape such as =1B can smuggle in control characters.
        text = CLEAN_RE.sub("", unquoted.decode("utf-8", errors="replace"))

    candidate = text.strip()
    if (
        len(candidate) > 20
        and len(candidate) % 4 == 0
        and _B64_LINE_RE.fullmatch(candidate)
    ):
        try:
            raw = base64.b64decode(candidate, validate=True)
        except ValueError:  # binascii.Error is a ValueError subclass
            return text
        decoded = raw.decode("utf-8", errors="replace")
        # A multi-byte character may be cut at either end of an encoded line,
        # so replacement characters are tolerated only at the edges.
        core = decoded.strip("\ufffd")
        looks_like_text = (
            bool(core)
            and "\ufffd" not in core
            and all(ch.isprintable() or ch in "\t\r\n" for ch in core)
        )
        if looks_like_text:
            text = CLEAN_RE.sub("", decoded)
    return text
# ---------- POST BODY EDITOR ----------
def edit_body(initial=""):
    """Let the user edit text in an external editor and return the result.

    ``initial`` is written to a temporary ``.txt`` file, the editor named
    by ``$EDITOR`` (default ``nano``) is run on it, and the file's contents
    are read back. The temporary file is always removed.

    ``$EDITOR`` may contain arguments (for example ``code -w``); it is
    split with shell-like rules before being executed. The file is written
    and read as UTF-8 regardless of the locale.

    Args:
        initial: text the editor starts with.

    Returns:
        The file contents after the editor exits.

    Raises:
        OSError: if the editor cannot be started.
    """
    import shlex

    # "or" also covers EDITOR being set but empty.
    editor_cmd = shlex.split(os.environ.get("EDITOR") or "nano")
    fd, path = tempfile.mkstemp(suffix=".txt")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(initial)
        subprocess.call(editor_cmd + [path])
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            return f.read()
    finally:
        os.unlink(path)
# ---------- POST BODY SOURCE ----------
def get_post_body():
    """Ask where the post body comes from and return the body text.

    One key is read with get_key():

    * ``F`` - prompt for a path and load that file (UTF-8, undecodable
      bytes replaced);
    * ``T`` - read lines from the terminal until a line containing only
      ``.`` (end-of-input also ends the text);
    * any other key - open the external editor via edit_body().

    Returns:
        The body as ``str``, or ``None`` when the body is empty or could
        not be obtained; in that case a status message is set.
    """
    print("\nPost body source:")
    print(" E = Edit in editor")
    print(" F = Load from external text file")
    print(" T = Type directly in terminal")
    choice = get_key().lower()

    if choice == "f":
        path = prompt("\nEnter path to text file: ")
        try:
            with open(path, "r", encoding="utf-8", errors="replace") as f:
                body = f.read()
        except OSError as e:
            set_status(f"Failed to read file: {e}")
            return None
        if not body.strip():
            set_status("Post aborted (file is empty)")
            return None
        return body

    if choice == "t":
        print("\nType your post below. End with a period on a line by itself.")
        lines = []
        while True:
            try:
                line = input()
            except EOFError:
                # Ctrl-D ends the text just like the lone period does.
                break
            if line.strip() == ".":
                break
            lines.append(line)
        body = "\n".join(lines)
        if not body.strip():
            set_status("Post aborted (empty input)")
            return None
        return body

    # default: editor
    try:
        body = edit_body()
    except OSError as e:
        # e.g. the configured editor is not installed
        set_status(f"Post aborted (editor failed: {e})")
        return None
    if not body.strip():
        set_status("Post aborted (empty body)")
        return None
    return body
# ---------- POSTING ----------
def post_article(nntp, group, subject=None, references=None):
    """Interactively compose an article and post it to ``group``.

    Prompts for display name and e-mail address, for the subject when none
    is supplied, and obtains the body from get_post_body().

    Args:
        nntp: connected NNTP client; its ``post()`` method is called with
            the serialized message bytes.
        group: value for the ``Newsgroups`` header.
        subject: preset subject (used for replies); prompted for if falsy.
        references: value for the ``References`` header, if any.

    Returns:
        ``True`` if the server accepted the article, ``False`` if the post
        was aborted or failed. A status message is set in both cases,
        either here or by get_post_body().
    """
    from email.utils import formataddr

    name = prompt("Enter your display name: ")
    address = prompt("Enter your email: ")
    if not subject:
        subject = prompt("Enter subject: ")
    if not address or not subject:
        set_status("Post aborted (email and subject are required)")
        return False

    body = get_post_body()
    if not body:
        return False

    try:
        msg = EmailMessage()
        # formataddr quotes or encodes the display name as needed, so names
        # containing characters such as "," are not parsed as two addresses.
        msg["From"] = formataddr((name, address))
        msg["Newsgroups"] = group
        msg["Subject"] = subject
        if references:
            msg["References"] = references
        msg.set_content(body)
        nntp.post(msg.as_bytes())
    except Exception as e:
        # UI boundary: report invalid header values and server or network
        # errors instead of crashing the browser loop.
        set_status(f"Post failed: {e}")
        return False
    set_status("Article posted successfully")
    return True
# ---------- REPLY POSTING ----------
def _parse_reply_headers(raw_lines):
    """Turn raw header lines (bytes) into a ``{lowercase-name: value}`` dict.

    Lines that start with a space or tab are continuations of the previous
    header and are appended to it, so a References header folded over
    several lines is kept whole. Values pass through ``CLEAN_RE``.
    """
    headers = {}
    current = None
    for raw in raw_lines:
        line = raw.decode("utf-8", errors="replace")
        # Check for folding before cleaning, which may remove a leading tab.
        if line[:1] in (" ", "\t"):
            if current is not None:
                extra = CLEAN_RE.sub("", line).strip()
                headers[current] = (headers[current] + " " + extra).strip()
            continue
        name, sep, value = CLEAN_RE.sub("", line).partition(":")
        if not sep:
            current = None
            continue
        current = name.lower()
        headers[current] = value.strip()
    return headers


def post_reply(nntp, group, article_num):
    """Compose and post a reply to article ``article_num`` in ``group``.

    The article's headers are fetched with ``nntp.head()``. The reply
    subject is the original subject prefixed with ``"Re: "`` unless it
    already matches ``RE_REPLY``. The reply's References value is the
    original References followed by the original Message-ID. The actual
    composing and posting is delegated to post_article().

    Header values are taken as sent: the body transfer-decoding guesses are
    deliberately not applied, since they can alter a Message-ID that
    contains ``=``.

    Returns:
        The result of post_article(), or ``False`` (with a status message)
        if anything raised.
    """
    try:
        _, hinfo = nntp.head(str(article_num))
        headers = _parse_reply_headers(hinfo.lines)

        subject = headers.get("subject", "(no subject)")
        if not RE_REPLY.match(subject):
            subject = "Re: " + subject

        refs = [headers[key] for key in ("references", "message-id") if headers.get(key)]
        return post_article(nntp, group, subject, " ".join(refs))
    except Exception as e:
        set_status(f"Reply failed: {e}")
        return False
# ---------- ARTICLE DISPLAY ----------
def show_article(nntp, num, group=None, allow_reply=False):
    """Fetch article ``num`` and display it through the pager.

    Headers come from ``nntp.head()`` and the body from ``nntp.body()``.
    From, Date and Subject are shown above the body; RFC 2047 encoded
    words in them are decoded for display, and folded header lines are
    joined. Body lines are decoded with decode_body_line().

    Args:
        nntp: connected NNTP client.
        num: article number within the currently selected group.
        group: group name, needed only to offer a reply.
        allow_reply: when true and ``group`` is set, offer ``P`` to reply
            after the article has been shown.

    Any exception is reported through set_status() instead of propagating.
    """
    from email.header import decode_header, make_header

    def _display(value):
        # Undecodable or malformed encoded words fall back to the raw text.
        try:
            value = str(make_header(decode_header(value)))
        except Exception:
            pass
        return CLEAN_RE.sub("", value)

    try:
        _, hinfo = nntp.head(str(num))
        headers = {}
        current = None
        for raw in hinfo.lines:
            line = raw.decode("utf-8", errors="replace")
            if line[:1] in (" ", "\t"):
                # Folded continuation of the previous header.
                if current is not None:
                    headers[current] = (headers[current] + " " + line.strip()).strip()
                continue
            name, sep, value = line.partition(":")
            if not sep:
                current = None
                continue
            current = name.lower()
            headers[current] = value.strip()

        _, body = nntp.body(str(num))
        lines = [decode_body_line(raw_line) for raw_line in body.lines]

        paged_print([
            f"From: {_display(headers.get('from', '?'))}",
            f"Date: {_display(headers.get('date', '?'))}",
            f"Subject: {_display(headers.get('subject', '(no subject)'))}",
            "",
        ] + lines)

        if allow_reply and group:
            print("\nP=reply (any other key to continue)")
            if get_key().lower() == "p":
                post_reply(nntp, group, num)
    except Exception as e:
        set_status(f"Fetch failed: {e}")
# ---------- REPLY SCANNING ----------
def scan_replies_xover(nntp, msgid, first, last):
    """Return the numbers of articles whose References mention ``msgid``.

    Overview data is requested with ``nntp.over()`` for the range from
    ``max(first, last - MAX_REPLY_SCAN)`` to ``last``.

    Args:
        nntp: connected NNTP client.
        msgid: Message-ID to look for. An empty value returns ``[]``
            straight away, because the empty string is a substring of
            every References value and would match every article.
        first: lowest article number in the group.
        last: highest article number in the group.

    Returns:
        List of article numbers (``int``) in the order the server returned
        them; empty if none match or the overview request fails.
    """
    if not msgid:
        return []
    start = max(first, last - MAX_REPLY_SCAN)
    try:
        _, overviews = nntp.over((start, last))
    except Exception:
        # Best effort: a failed overview request simply means "no replies".
        return []
    return [
        int(num)
        for num, hdr in overviews
        if msgid in hdr.get("references", "")
    ]
# ---------- GROUP RELOAD ----------
def reload_group(nntp, group):
    """Select ``group`` and build the list of its top-level posts.

    Overview data is fetched for the newest articles (at most
    ``MAX_ARTICLES_LIST`` back from the last article number). Articles
    whose decoded subject matches ``RE_REPLY`` are skipped. The remaining
    ones are returned newest first.

    Each post is a dict with keys ``num``, ``subject``, ``from``, ``date``,
    ``msgid`` and ``replies``. ``replies`` is the number of fetched
    articles whose References mention the post's Message-ID. It is 0 when
    ``SHOW_REPLY_COUNT_MAIN`` is false or the post has no Message-ID.

    Returns:
        ``(posts, first, last)`` on success, where ``first`` and ``last``
        are the group's article number bounds as ``int``; or
        ``(None, None, None)`` after setting a status message on failure.
    """
    from email.header import decode_header, make_header

    def _display(value):
        # Decode RFC 2047 encoded words; fall back to the raw value if the
        # header is malformed or names an unknown charset.
        try:
            value = str(make_header(decode_header(value)))
        except Exception:
            pass
        return CLEAN_RE.sub("", value)

    try:
        _, _, first, last, _ = nntp.group(group)
        first = int(first)
        last = int(last)
        _, overviews = nntp.over((max(first, last - MAX_ARTICLES_LIST), last))

        # One pass over the overview data instead of rescanning it for
        # every post. Each article counts once per Message-ID it references.
        reply_counts = {}
        if SHOW_REPLY_COUNT_MAIN:
            for _, hdr in overviews:
                for ref in set(re.findall(r"<[^<>\s]+>", hdr.get("references", ""))):
                    reply_counts[ref] = reply_counts.get(ref, 0) + 1

        posts = []
        for num, hdr in reversed(overviews):
            subject = _display(hdr.get("subject", ""))
            if RE_REPLY.match(subject):
                continue
            msgid = hdr.get("message-id", "")
            posts.append({
                "num": int(num),
                "subject": subject,
                "from": _display(hdr.get("from", "?")),
                "date": hdr.get("date", "?"),
                "msgid": msgid,
                "replies": reply_counts.get(msgid, 0),
            })
        return posts, first, last
    except Exception as e:
        set_status(f"Reload failed: {e}")
        return None, None, None
# ---------- GROUP BROWSER ----------
def browse_group(nntp, group):
    """Interactive browser for the top-level posts of ``group``.

    Shows one post summary at a time and reacts to single keys:

    * ENTER - read the post (with the option to reply)
    * SPACE - next post
    * R - step through replies found by scan_replies_xover()
    * N - write a new post, then reload the listing
    * L - reload the listing
    * J - jump to a post by its position in the listing
    * G - switch to another group
    * Q - exit the program via ``sys.exit(0)``

    The function returns when the initial load yields no posts or when the
    user moves past the last post. A failed reload or a failed group switch
    keeps the current listing; reload_group() has already set the status
    message in that case.
    """
    posts, first, last = reload_group(nntp, group)
    if not posts:
        return

    index = 0
    while index < len(posts):
        p = posts[index]
        print(f"\n[{index+1}] #{p['num']}")
        print(f"From: {p['from']}")
        print(f"Date: {p['date']}")
        print(f"Replies: {p['replies'] if SHOW_REPLY_COUNT_MAIN else '?'}")
        print(f"Subject: {p['subject']}")
        show_status()
        print("\nENTER=read SPACE=next R=replies N=new post L=reload J=jump G=group Q=quit")
        key = get_key().lower()

        if key == "q":
            sys.exit(0)
        elif key == " ":
            index += 1
        elif key in ("\r", "\n"):
            show_article(nntp, p["num"], group, True)
        elif key == "n":
            if post_article(nntp, group):
                refreshed = reload_group(nntp, group)
                # None means the reload failed: keep the listing we have.
                if refreshed[0] is not None:
                    posts, first, last = refreshed
                    index = 0
        elif key == "l":
            refreshed = reload_group(nntp, group)
            if refreshed[0] is not None:
                posts, first, last = refreshed
                index = 0
                set_status("Group reloaded")
        elif key == "j":
            val = prompt("Jump to post number: ")
            if val.isdigit():
                idx = int(val) - 1
                if 0 <= idx < len(posts):
                    index = idx
        elif key == "g":
            new_group = prompt("New group: ")
            new_posts, new_first, new_last = reload_group(nntp, new_group)
            if new_posts:
                # Switch in place rather than recursing into browse_group().
                group = new_group
                posts, first, last = new_posts, new_first, new_last
                index = 0
            elif new_posts is not None:
                set_status(f"No posts found in {new_group}")
        elif key == "r":
            replies = scan_replies_xover(nntp, p["msgid"], first, last)
            if not replies:
                set_status("No replies found")
                continue
            for i, rnum in enumerate(replies):
                if i < len(replies) - 1:
                    print("\nENTER=next reply | SPACE=skip remaining | P=reply")
                else:
                    print("\nEnd of replies | P=reply")
                k = get_key().lower()
                if k == " ":
                    set_status("Skipped remaining replies")
                    break
                elif k == "p":
                    post_reply(nntp, group, rnum)
                elif k in ("\r", "\n"):
                    show_article(nntp, rnum, group, True)
# ---------- MAIN ----------
def main():
    """Connect to the configured server and browse ``START_GROUP``.

    The connection is used as a context manager so it is closed on every
    exit path, including the ``sys.exit(0)`` raised when the user quits
    from browse_group(). Connection failures are reported and end the
    program with exit status 1. Ctrl-C ends the session without a
    traceback.
    """
    print(f"Connecting to {NNTP_SERVER}:{NNTP_PORT}...")
    try:
        nntp = nntplib.NNTP_SSL(NNTP_SERVER, NNTP_PORT, USERNAME, PASSWORD)
    except (nntplib.NNTPError, OSError, EOFError) as e:
        print(f"Connection failed: {e}")
        sys.exit(1)

    with nntp:
        set_status("Connected")
        try:
            browse_group(nntp, START_GROUP)
        except KeyboardInterrupt:
            print()


if __name__ == "__main__":
    main()
Back to comp.lang.python | Previous | Next | Find similar
AI Generated Usenet Client in Python street@shellcrash.com - 2025-12-19 20:54 +0000
csiph-web