Source code for py_niconico_comment.niconico
import json
import logging
from lxml import html
from py_niconico_comment import utils
from requests.sessions import Session
SRT_TEMPLATE = """
%0.f
%s,{0} --> %s,{0}
%s
"""
_logger = logging.getLogger(__name__)
[docs]class NiconicoComments(object):
_session = None
is_login = False
def __init__(self, username, password, loglevel=logging.CRITICAL):
_logger.setLevel(loglevel)
self._session = Session()
self.is_login = self._login(username, password)
def _cookie_has(self, key):
return key in self._session.cookies if self._session else False
def _login(self, username, password):
"""https://account.nicovideo.jp/api/v1/login
Args:
username ([str]): Username.
password ([str]): Password.
Returns:
[bool]: login success or not.
"""
login_form_strs = {
'mail_tel': username,
'password': password,
}
try:
self._request_webpage('https://account.nicovideo.jp/api/v1/login', action='login',
data=(login_form_strs), headers={'content-type': 'application/x-www-form-urlencoded'})
return self._cookie_has('user_session')
except Exception as error: # pragma: no cover
_logger.info('unable to log in: {}'.format(error))
return False # pragma: no cover
def _request_webpage(self, url_or_request, action=None, data=None, headers={}, query={}):
"""Returns the response handle
Args:
url_or_request ([str]): URL
action ([str], optional): login/get-comments. Defaults to None.
data ([dict], optional): request data. Defaults to None.
headers (dict, optional): request headers. Defaults to {}.
query (dict, optional): request query. Defaults to {}.
Returns:
[str]: text
"""
if action in ['login', 'get-comments']:
return self._session.post(url_or_request, data, headers).text
return self._session.get(url_or_request).text
def _get_api_data(self, html_string):
for div in html.fromstring(html_string).cssselect('div'):
api_data = div.get('data-api-data')
if api_data is not None:
return api_data
[docs] def get_list_user(self, url, language='en'):
"""Get all user who commented on the giving URL.
Args:
url ([str]): Nicovideo URL e.g. https://www.nicovideo.jp/watch/sm32047026
language ([str], optional): en/jp. Defaults to en.
Returns:
[dict]: e.g.
'{ \
"rbVzlqGMytJmCIimR1dPWvGRE7w": [ \
"wwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwww" \
], \
"1oYUfTHMjDooDge1QKRpu3QeAmk": [ \
"wow", \
"wwwwwwwwwwww" \
] \
}'
"""
comments = self.get_comments(url, language=language)
users = {}
for x in comments:
if 'user_id' not in x.keys():
continue
if x['user_id'] not in users.keys():
users.update({x['user_id']: [x['content']]})
elif len(users[x['user_id']]) < 2:
users[x['user_id']].append(x['content'])
return users
[docs] def get_comments(self, url, user_id=None, language='en'):
"""Get all comment from the giving URL.
Args:
url ([str]): Nicovideo URL e.g. https://www.nicovideo.jp/watch/sm32047026
user_id ([str], optional): Filter by user_id. Defaults to None.
language ([str], optional): en/jp. Defaults to en.
Returns:
[list]: List of comment.
"""
comment_html = self._request_webpage(url)
raw_api_data = self._get_api_data(comment_html)
if not raw_api_data:
raise ValueError('Cannot access the giving URL!')
api_data = utils.parse_api_data(raw_api_data, language=language)
url = 'https://nmsg.nicovideo.jp/api.json/'
resp = self._request_webpage(
url, action='get-comments', data=json.dumps(api_data), headers={'content-type': 'application/json'})
chats = (x.get('chat') for x in json.loads(resp) if 'chat' in x.keys())
if user_id:
chats = (x for x in chats if x['user_id'] in user_id)
return sorted(chats, key=lambda chat: int(chat['vpos']))
[docs] def to_srt(self, list_comments=[], shift_time=0.0):
"""Convert the list of comment to .SRT file content.
Args:
list_comments (list, optional): List of comment. Defaults to [].
shift_time (float, optional): +/- time. Defaults to 0.0.
Returns:
[str]: SRT file content.
"""
content = ''
shift_time = float(shift_time)
shift_frame = int(str(shift_time).rpartition('.')[-1])
for index, chat in enumerate(list_comments):
start_time = int(chat.get('vpos', 0))
frame = start_time % 100 + shift_frame
start_time = start_time / 100 + shift_time
end_time = start_time + 3 + shift_time
line = chat.get('content', '').strip()
if line:
sub = SRT_TEMPLATE % (
index,
utils.seconds_to_string(start_time),
utils.seconds_to_string(end_time),
line)
content += sub.format(frame)
return content