Salut,
je poste ici un script qui permet de notifier tout le monde que la communauté change de serveur.
Ça nous permettra de sauvegarder et ça nous sera utile.
Perso, je pense que si Piefed integre un outil de migration de commu ce serait top.
- Le post d’origine : https://jlai.lu/post/14332673
- OP : UlrikHD@programming.dev (we can contact him for technical help)
With this script, can you get the list of subcribers ?
I don’t think the Lemmy API exposes the subscriber list of a community, you’ll need access to the instance database.
What I did was checking every post for the last 365 days for user activity and store every user that have interacted with the community.
lemmy_session and lemmy_references are the same as for the LiveThreadBot (inside the src folder): https://gitlab.com/UlrikHD/lemmy-match-thread-bot
get_posts() may be missing from the lemmy_session.py file though
def get_posts(self, *, community: int | str | LemmyCommunity, sort: str = 'New', page: int = 1) -> dict[str, any]:
""" Gets the posts of a community.
:param community: The ID of the community to get the posts of, can also be a LemmyCommunity parseable
string/object.
:param sort: The sorting method of the posts, by default 'New'.
:param page: The page number of the posts, by default 1.
:return: The response JSON of the request as a dictionary.
"""
if isinstance(community, LemmyCommunity) or isinstance(community, str):
response: Final[requests.Response] = self.srv.get_posts(community_name=community, sort=sort, page=page,
limit=50)
else:
response: Final[requests.Response] = self.srv.get_posts(community_id=community, sort=sort, page=page,
limit=50)
if response.status_code != 200:
raise requests.exceptions.HTTPError(response.text)
return response.json()
Excuse the ugly code, it was written as a one-off
import os
import time
import datetime
from json import load, dump
import requests
from lemmy_references import LemmyCommunity, LemmyUser
from lemmy_session import LemmySession
session: LemmySession = LemmySession(website='https://lemmy.world/',
username='TestUlrikHD',
password='---',
end_script_signal=None)
posts: list[dict[[str, any]]] = []
cutoff_date: datetime.datetime = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=365)
page_count: int = 1
loop_break: bool = False
while True:
post_response: dict[str, any] = session.get_posts(community=LemmyCommunity('football', 'lemmy.world'),
page=page_count)
page_count += 1
for post in post_response['posts']:
if datetime.datetime.fromisoformat(post['post']['published']) > cutoff_date:
posts.append(post)
else:
loop_break = True
break
if loop_break:
break
user_dict: dict[str, dict[str, any]] = {}
for post in posts:
user_dict[str(LemmyUser(post['creator']['actor_id']))] = {'post': True, 'post_id': post['post']['id']}
comments = session.get_post_comments(post_id=post['post']['id'])
for comment in comments['comments']:
user: str = str(LemmyUser(comment['creator']['actor_id']))
if user not in user_dict:
user_dict[user] = {'post': False, 'post_id': comment['post']['id'], 'parent_id': comment['comment']['id']}
del user_dict[str(LemmyUser('FootballAutoMod@lemmy.world'))]
del user_dict[str(LemmyUser('LiveThreadBot@lemmy.world'))]
with open('user_dict', 'w', encoding='utf-8') as file:
dump(user_dict, file, ensure_ascii=False, indent=4)
def log_reply(usr: str) -> None:
user_list: list[str] = []
if os.path.isfile('reply_list.json'):
with open('reply_list.json', 'r', encoding='utf-8') as file:
user_list = load(file)
user_list.append(str(usr))
with open('reply_list.json', 'w', encoding='utf-8') as file:
dump(user_list, file, ensure_ascii=False, indent=4)
for username, user in user_dict.items():
time.sleep(1)
try:
#if user['post']:
# session.reply(content='migration message', post_id=user['post_id'], parent_id=None)
#else:
# session.reply(content='migration message', post_id=user['post_id'], parent_id=user['parent_id'])
log_reply(usr=LemmyUser(username).str_link())
except requests.HTTPError as e:
print(f'Failed to send message to {username} - {e}')
and this part creates txt for easy copy pasting for tagging.
from json import load
with open('reply_list.json', 'r', encoding='utf-8') as file:
user_list: list[str] = load(file)
loop_count: int = len(', '.join(user_list)) // 9500 + 1
for i in range(loop_count):
with open(f'reply_list_{i}.txt', 'w', encoding='utf-8') as file:
print(len(' '.join(user_list[i * len(user_list) // loop_count:(i + 1) * len(user_list) // loop_count])))
file.write(', '.join(user_list[i * len(user_list) // loop_count:(i + 1) * len(user_list) // loop_count]))