# File listing metadata (web viewer residue): 982 lines, 28 KiB, Python
# https://medium.com/nexttech/how-to-use-the-openweathermap-api-with-python-c84cc7075cfc
|
||
|
||
import urllib3
|
||
import datetime
|
||
import time
|
||
from mastodon import Mastodon
|
||
import psycopg2
|
||
import unidecode
|
||
import time
|
||
import re
|
||
import os
|
||
import json
|
||
import sys
|
||
import os.path # For checking whether secrets file exists
|
||
import requests # For doing the web stuff, dummy!
|
||
from datetime import timedelta, datetime
|
||
import urllib.request
|
||
import geopy
|
||
from geopy.geocoders import Nominatim
|
||
from PIL import Image, ImageFont, ImageDraw
|
||
import geotiler
|
||
import geocoder
|
||
import ssl
|
||
import pdb
|
||
|
||
# Silence urllib3's InsecureRequestWarning; this module issues a request with
# verify=False (see get_weather_data), which would otherwise warn on each call.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||
|
||
def cleanhtml(raw_html):
    """Return *raw_html* with every ``<...>`` tag removed.

    Tags are matched non-greedily, so the text between two tags is kept
    untouched. HTML entities are not decoded here.
    """
    tag_pattern = re.compile('<.*?>')
    return tag_pattern.sub('', raw_html)
|
||
|
||
def translate(i, detall, dir_vent):
    """Translate a Spanish weather description and wind direction to Catalan.

    Args:
        i: Passed through unchanged (callers unpack it back into their
            loop counter).
        detall: Spanish weather description.
        dir_vent: Spanish wind direction name.

    Returns:
        The tuple ``(i, detall, dir_vent)``; values without a known
        translation are returned as received.
    """
    descriptions = {
        'Parcialmente nuboso': 'Parcialment ennuvolat',
        'Parcialmente nuboso con niebla': 'Parcialment ennuvolat amb boira',
        'Parcialmente nuboso con lluvias': 'Parcialment ennuvolat amb aiguats',
        'Nubes dispersas': 'Núvols dispersos',
        'Muy nuboso con nevadas': 'Molt ennuvolat amb nevades',
        'Muy nuboso con lluvia': 'Molt ennuvolat amb pluja',
        'Muy nuboso con niebla': 'Molt ennuvolat amb boira',
        'Muy nuboso con lluvias': 'Molt ennuvolat amb pluges',
        'Cubierto': 'Tapat',
        'Cubierto con lluvia': 'Tapat amb pluja',
        'Cubierto con lluvias': 'Tapat amb pluges',
        'Cubierto con probabilidad de lluvia': 'Tapat amb possible pluja',
        'Cubierto con nevadas': 'Tapat amb nevades',
        'Muy nuboso': 'Molt ennuvolat',
        'Despejado': 'Clar',
    }
    directions = {
        'Suroeste': 'Sud-oest',
        'Noroeste': 'Nord-oest',
        'Nordeste': 'Nord-est',
        'Sureste': 'Sud-est',
        'Norte': 'Nord',
        'Este': 'Est',
        'Sur': 'Sud',
        'Oeste': 'Oest',
    }

    translated_detail = descriptions.get(detall, detall)
    translated_direction = directions.get(dir_vent, dir_vent)

    return i, translated_detail, translated_direction
|
||
|
||
def createtilehours(data, pregunta):
    """Render the 12-hour forecast card, one row per hour.

    Each hour is drawn on top of the previous hour's output image, so the
    finished card is ``images/tempsperhores12.png``. Intermediate files
    ``images/temphour<N>.png`` and ``images/tempsperhores<N>.png`` are
    written for every hour N in 1..12.

    Args:
        data: Forecast dict; reads ``data['hour_hour']['hour<N>']`` with the
            keys ``hour_data``, ``temperature``, ``text`` and ``icon``.
        pregunta: Place name, drawn as the card title.

    Returns:
        None.
    """
    # Pillow 10 removed ImageFont.LAYOUT_BASIC in favour of
    # ImageFont.Layout.BASIC; support both spellings.
    if hasattr(ImageFont, 'Layout'):
        layout = ImageFont.Layout.BASIC
    else:
        layout = ImageFont.LAYOUT_BASIC

    # The fonts are identical for every row, so load them once.
    font_path = 'fonts/DejaVuSerif.ttf'
    title_font = ImageFont.truetype(font_path, 25, layout_engine=layout)
    row_font = ImageFont.truetype(font_path, 18, layout_engine=layout)
    credit_font = ImageFont.truetype(font_path, 12, layout_engine=layout)

    x = 10
    y = 10

    for i in range(1, 13):

        hour = data['hour_hour']['hour' + str(i)]
        hour_data = hour['hour_data']
        temperature = hour['temperature']
        icona = hour['icon']
        # Only the description is drawn; the wind direction is not used here.
        detall = translate(i, hour['text'], '')[1]

        first_row = i == 1

        if first_row:
            fons = Image.open('images/fonsorig.jpg')
            logo_img = Image.open('images/logo.png')
            fons.paste(logo_img, (470, 16), logo_img)
        else:
            # Continue drawing on the card produced for the previous hour.
            fons = Image.open('images/tempsperhores' + str(i - 1) + '.png')

        # Weather icon for this hour (the icon doubles as its own mask).
        temps_img = Image.open('wi/' + icona + '.png')
        fons.paste(temps_img, (x, y + 30), temps_img)

        fons.save('images/temphour' + str(i) + '.png', "PNG")

        base = Image.open('images/temphour' + str(i) + '.png').convert('RGBA')
        txt = Image.new('RGBA', base.size, (255, 255, 255, 0))
        draw = ImageDraw.Draw(txt)

        if first_row:
            draw.text((10, 10), pregunta + ', 12 hores', font=title_font, fill=(255, 255, 255, 255))

        draw.text(
            (x + 50, y + 45),
            str(temperature) + '\u00b0' + ' ' + hour_data + ' ' + detall,
            font=row_font,
            fill=(255, 255, 255, 200),
        )

        if first_row:
            draw.text((380, 490), 'temps@mastodont.cat', font=credit_font, fill=(255, 255, 255, 200))
            draw.text((400, 510), 'API: tutiempo.net', font=credit_font, fill=(155, 0, 200, 200))

        out = Image.alpha_composite(base, txt)
        out.save('images/tempsperhores' + str(i) + '.png')

        # Next row goes 40 px further down.
        y += 40
|
||
|
||
def _zoom_for_population(people):
    """Return the map zoom level for a place with *people* inhabitants."""
    people = people or 0  # a missing population is treated as a small town
    if people >= 1000000:
        return 10
    if people >= 100000:
        return 11
    if people >= 50000:
        return 12
    return 15


def createtiletoday(people, long, lat, data, days):
    """Render the daily forecast card to ``images/temps.png``.

    One card is rendered per day in ``1..days``; every card is written to
    the same file, so the last day rendered wins. The city map is rendered
    with geotiler and stored in ``images/map.png``.

    NOTE: the card title is read from the module-level name ``pregunta``,
    which the ``__main__`` script sets before calling this function.

    Args:
        people: Population of the place; selects the map zoom level.
        long: Longitude of the map centre.
        lat: Latitude of the map centre.
        data: Forecast dict; reads ``data['day<N>']`` for each day.
        days: Number of days to render.

    Returns:
        The translated weather description of the last day rendered, or
        None when ``days`` is smaller than 1.
    """
    # Pillow 10 removed ImageFont.LAYOUT_BASIC in favour of
    # ImageFont.Layout.BASIC; support both spellings.
    if hasattr(ImageFont, 'Layout'):
        layout = ImageFont.Layout.BASIC
    else:
        layout = ImageFont.LAYOUT_BASIC

    font_path = 'fonts/DejaVuSerif.ttf'
    title_font = ImageFont.truetype(font_path, 35, layout_engine=layout)
    body_font = ImageFont.truetype(font_path, 30, layout_engine=layout)
    credit_font = ImageFont.truetype(font_path, 12, layout_engine=layout)
    osm_font = ImageFont.truetype(font_path, 10, layout_engine=layout)

    # The previous threshold chain left the boundary populations (49999,
    # 50000, 99999, 100000, 999999, 1000000) without any zoom value.
    zoom = _zoom_for_population(people)

    body_fill = (255, 255, 255, 200)
    detall = None

    for i in range(1, days + 1):

        day = data['day' + str(i)]
        temperatura_max = day['temperature_max']
        temperatura_min = day['temperature_min']
        icona = day['icon']
        vent = day['wind']
        icona_vent = day['icon_wind']
        humitat = day['humidity']
        solsurt = day['sunrise']
        solamaga = day['sunset']
        llunasurt = day['moonrise']
        llunaamaga = day['moonset']
        icona_lluna = day['moon_phases_icon']

        i, detall, dir_vent = translate(i, day['text'], day['wind_direction'])

        city_map = geotiler.Map(center=(long, lat), zoom=zoom, size=(256, 256))
        image = geotiler.render_map(city_map)
        image.save('images/map.png')

        fons = Image.open("images/fons.jpg")

        # Every overlay is pasted using itself as the transparency mask.
        overlays = [('images/map.png', (390, 65))]
        if i == 1:
            overlays.append(('images/logo.png', (310, 320)))
        overlays.extend([
            ('wi/' + icona + '.png', (10, 55)),             # weather
            ('temperatura/temperature.png', (10, 115)),     # thermometer
            ('wd/' + icona_vent + '.png', (22, 180)),       # wind
            ('humitat/humedad.png', (8, 210)),              # humidity
            ('sol/solsurt.png', (10, 270)),                 # sunrise
            ('canviats/' + icona_lluna + '.png', (12, 315)),  # moon phase
        ])
        for overlay_path, position in overlays:
            overlay = Image.open(overlay_path)
            fons.paste(overlay, position, overlay)

        fons.save('images/temporal.png', "PNG")

        base = Image.open('images/temporal.png').convert('RGBA')
        txt = Image.new('RGBA', base.size, (255, 255, 255, 0))
        draw = ImageDraw.Draw(txt)

        draw.text((10, 10), pregunta, font=title_font, fill=(255, 255, 255, 255))

        draw.text((70, 60), detall, font=body_font, fill=body_fill)
        draw.text(
            (70, 120),
            str(temperatura_max) + '\u00b0 / ' + str(temperatura_min) + '\u00b0',
            font=body_font,
            fill=body_fill,
        )
        draw.text((70, 170), str(dir_vent) + ', ' + str(vent) + 'km/h', font=body_font, fill=body_fill)
        draw.text((70, 220), str(humitat) + '%', font=body_font, fill=body_fill)
        draw.text((70, 270), str(solsurt) + ', ' + str(solamaga), font=body_font, fill=body_fill)
        draw.text((70, 320), str(llunasurt) + ', ' + str(llunaamaga), font=body_font, fill=body_fill)

        if i == 1:
            draw.text((380, 350), 'temps@mastodont.cat', font=credit_font, fill=(255, 255, 255, 200))
            draw.text((540, 350), 'API: tutiempo.net', font=credit_font, fill=(155, 0, 200, 200))
            draw.text((395, 300), "© Col·laboradors d'OpenStreetMap, CC-BY-SA", font=osm_font, fill=(155, 0, 200, 200))

        out = Image.alpha_composite(base, txt)
        out.save('images/temps.png')

    return detall
|
||
|
||
def createtileweek(data, days):
    """Render the multi-day forecast card, one column per day.

    Each day is drawn on top of the previous day's output image, so the
    finished card for a 7-day request is ``images/tempsweek7.png``.
    Intermediate files ``images/tempweek<N>.png`` and
    ``images/tempsweek<N>.png`` are written for every day N.

    NOTE: the card title is read from the module-level name ``pregunta``,
    which the ``__main__`` script sets before calling this function.

    Args:
        data: Forecast dict; reads ``data['day<N>']`` with the keys
            ``date`` (``YYYY-MM-DD``), ``temperature_max``,
            ``temperature_min``, ``icon`` and ``text``.
        days: Number of days (columns) to render.

    Returns:
        The translated weather description of the last day rendered, or
        None when ``days`` is smaller than 1.
    """
    week_days = ['Dilluns', 'Dimarts', 'Dimecres', 'Dijous', 'Divendres', 'Dissabte', 'Diumenge']

    # Horizontal nudge that roughly centres each day name in its column;
    # names not listed here use an offset of 4.
    label_offsets = {'Dimarts': 4, 'Dimecres': -5, 'Dijous': 6, 'Divendres': -6, 'Dissabte': 0}

    # Pillow 10 removed ImageFont.LAYOUT_BASIC in favour of
    # ImageFont.Layout.BASIC; support both spellings.
    if hasattr(ImageFont, 'Layout'):
        layout = ImageFont.Layout.BASIC
    else:
        layout = ImageFont.LAYOUT_BASIC

    # The fonts are identical for every column, so load them once.
    font_path = 'fonts/DejaVuSerif.ttf'
    title_font = ImageFont.truetype(font_path, 35, layout_engine=layout)
    label_font = ImageFont.truetype(font_path, 16, layout_engine=layout)
    temp_font = ImageFont.truetype(font_path, 24, layout_engine=layout)
    credit_font = ImageFont.truetype(font_path, 15, layout_engine=layout)

    text_fill = (255, 255, 255, 220)

    x = 10
    y = 10
    detall = None

    for i in range(1, days + 1):

        day_data = data['day' + str(i)]
        weather_date = day_data['date']
        temperatura_max = day_data['temperature_max']
        temperatura_min = day_data['temperature_min']
        icona = day_data['icon']
        # Only the description is needed; the wind direction is not drawn.
        detall = translate(i, day_data['text'], '')[1]

        date_parts = weather_date.split('-')
        day = datetime(int(date_parts[0]), int(date_parts[1]), int(date_parts[2])).weekday()

        first_column = i == 1

        if first_column:
            fons = Image.open('images/fons.jpg')
        else:
            # Continue drawing on the card produced for the previous day.
            fons = Image.open('images/tempsweek' + str(i - 1) + '.png')

        # Weather icon for this day (the icon doubles as its own mask).
        temps_img = Image.open('wi/' + icona + '.png')
        fons.paste(temps_img, (y + 10, x + 80), temps_img)

        if first_column:
            logo_img = Image.open('images/logo.png')
            fons.paste(logo_img, (15, 320), logo_img)

        fons.save('images/tempweek' + str(i) + '.png', "PNG")

        base = Image.open('images/tempweek' + str(i) + '.png').convert('RGBA')
        txt = Image.new('RGBA', base.size, (255, 255, 255, 0))
        draw = ImageDraw.Draw(txt)

        if first_column:
            draw.text((15, 10), pregunta + ', 7 dies', font=title_font, fill=(255, 255, 255, 255))
            label = "Avui"
            offset = 10
        else:
            label = week_days[day]
            offset = label_offsets.get(label, 4)

        draw.text((y + offset, x + 50), label, font=label_font, fill=text_fill)

        draw.text((y + 18, x + 140), str(temperatura_max) + '\u00b0', font=temp_font, fill=text_fill)
        draw.text((y + 20, x + 190), str(temperatura_min) + '\u00b0', font=temp_font, fill=text_fill)

        if first_column:
            draw.text((60, 330), 'temps@mastodont.cat - 2020', font=credit_font, fill=(255, 255, 255, 200))
            draw.text((520, 330), 'API: tutiempo.net', font=credit_font, fill=(155, 0, 200, 200))

        out = Image.alpha_composite(base, txt)
        out.save('images/tempsweek' + str(i) + '.png')

        # Next column goes 95 px further right.
        y += 95

    return detall
|
||
|
||
def format_query(pregunta):
    """Normalise every apostrophe variant in *pregunta* to a plain ``'``.

    Mention text still carries HTML entities after the tags are stripped,
    so place names such as ``L&#39;Hospitalet`` would not geocode. The
    entity forms ``&apos;`` and ``&#39;`` and the typographic apostrophe
    ``’`` are all replaced; every variant present is handled, not just the
    first one found.

    Args:
        pregunta: Query text, possibly containing encoded apostrophes.

    Returns:
        The query with plain ASCII apostrophes.
    """
    for variant in ("&apos;", "&#39;", "’"):
        pregunta = pregunta.replace(variant, "'")

    return pregunta
|
||
|
||
def get_weather_data(pregunta):
    """Geocode *pregunta* and fetch its forecast from tutiempo.net.

    Reads the module-level ``api_key``. The forecast request is sent with
    TLS verification disabled (``verify=False``).

    Args:
        pregunta: Place name to look up.

    Returns:
        A tuple ``(data, people, long, lat)``: the decoded JSON forecast,
        the population reported by GeoNames (0 when that lookup is not
        ok), and the coordinates. All four are None when Nominatim finds
        no match for the place.
    """
    geolocator = Nominatim(user_agent="temps@mastodont.cat")
    coords = geolocator.geocode(pregunta)

    # Unknown place: nothing else to look up.
    if coords is None:
        return (None, None, None, None)

    long = coords.longitude
    lat = coords.latitude

    headers = {"User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36"}

    g = geocoder.geonames(pregunta, key='spla')
    if g.ok:
        people = g.population
    else:
        people = 0

    url = 'https://api.tutiempo.net/json/?lan=es&apid={}&ll={},{}'.format(api_key, lat, long)
    res = requests.get(url, headers=headers, verify=False)

    return (res.json(), people, long, lat)
|
||
|
||
def get_query(contingut):
    """Extract the command text from a mention, dropping the handles.

    The first ``@handle`` (up to the first space that follows it) is cut
    out, and anything from a further ``@`` onwards is discarded. Leading
    spaces are stripped from the result.

    Unlike the previous implementation this never raises ``ValueError``:
    text without a space after the handle (a bare ``@handle``) yields an
    empty string, and text without any ``@`` is returned as is. The space
    searched for is the first one *after* the ``@``, not the first one in
    the whole text.

    Args:
        contingut: Plain-text status content.

    Returns:
        The command text addressed to the bot.
    """
    inici = contingut.find("@")

    if inici != -1:
        final = contingut.find(" ", inici)
        if final == -1:
            # The handle runs to the end of the text: no command follows.
            contingut = contingut[:inici]
        else:
            contingut = contingut[:inici] + contingut[final + 1:]

    # Drop everything from any further mention onwards.
    contingut = contingut.split("@", 1)[0]

    return contingut.lstrip(" ")
|
||
|
||
def get_country(pregunta):
    """Return the country code written after the last comma of *pregunta*.

    The text after the last comma is stripped of spaces and upper-cased.
    When the query contains no comma, ``'ES'`` is returned.
    """
    _, comma, tail = pregunta.rpartition(',')

    if not comma:
        return 'ES'

    return tail.replace(' ', '').upper()
|
||
|
||
def set_alarm(username, visibility, alarm_time, alarm_city):
    """Store an alarm row for *username* in the ``alarms`` table.

    Uses the module-level ``alarm_db`` / ``alarm_db_user`` settings. An
    insert that conflicts with an existing row is ignored
    (``ON CONFLICT DO NOTHING``). Any error is printed, not raised.
    """
    insert_sql = "INSERT INTO alarms(username, visibility, time, city) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING"
    values = (username, visibility, alarm_time, alarm_city)

    connection = None

    try:
        connection = psycopg2.connect(database = alarm_db, user = alarm_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = connection.cursor()
        cursor.execute(insert_sql, values)
        connection.commit()
        cursor.close()

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        # Always release the connection, even after a failure.
        if connection is not None:
            connection.close()
|
||
|
||
def get_alarms():
    """Read every row of the ``alarms`` table.

    Uses the module-level ``alarm_db`` / ``alarm_db_user`` settings. Any
    error is printed, not raised.

    Returns:
        A tuple of four parallel lists
        ``(alarm_user, alarm_visibility, alarm_time, alarm_city)``; the
        lists hold whatever was collected before an error, if any.
    """
    alarm_user, alarm_visibility, alarm_time, alarm_city = [], [], [], []

    connection = None

    try:
        connection = psycopg2.connect(database = alarm_db, user = alarm_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = connection.cursor()
        cursor.execute('select username, visibility, time, city from alarms')

        for record in cursor.fetchall():
            alarm_user.append(record[0])
            alarm_visibility.append(record[1])
            alarm_time.append(record[2])
            alarm_city.append(record[3])

        cursor.close()

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        # Always release the connection, even after a failure.
        if connection is not None:
            connection.close()

    return (alarm_user, alarm_visibility, alarm_time, alarm_city)
|
||
|
||
def check_alarm(username, city):
    """Return True when *username* has an alarm stored for *city*.

    Uses the module-level ``alarm_db`` / ``alarm_db_user`` settings. Any
    error is printed, not raised; in that case False is returned unless a
    matching row had already been found.
    """
    sql_query = 'select username, city from alarms where username=(%s) and city=(%s)'

    found = False
    connection = None

    try:
        connection = psycopg2.connect(database = alarm_db, user = alarm_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = connection.cursor()
        cursor.execute(sql_query, (username, city))

        # One matching row is enough to know the alarm exists.
        if cursor.fetchone() != None:
            found = True

        cursor.close()

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        if connection is not None:
            connection.close()

    return found
|
||
|
||
def delete_alarm(username, city):
    """Delete the alarm rows of *username* for *city*.

    Uses the module-level ``alarm_db`` / ``alarm_db_user`` settings. Any
    error is printed, not raised.
    """
    delete_sql = 'delete from alarms where username=(%s) and city=(%s)'
    values = (username, city)

    connection = None

    try:
        connection = psycopg2.connect(database = alarm_db, user = alarm_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = connection.cursor()
        cursor.execute(delete_sql, values)
        connection.commit()
        cursor.close()

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        # Always release the connection, even after a failure.
        if connection is not None:
            connection.close()
|
||
|
||
def alarm_post(username, visibility, city):
    """Post the scheduled 12-hour forecast for *city* to *username*.

    Uses the module-level ``mastodon`` API client. Alarm posts are new
    statuses, not replies: the failure branch previously replied to
    ``status_id``, a name that does not exist while alarms are being
    processed, and therefore raised ``NameError``.

    Args:
        username: Account to mention in the post.
        visibility: Visibility to post with.
        city: Place name to fetch the forecast for.
    """
    data, people, long, lat = get_weather_data(city)

    toot_text = f"@{username} \n"

    if data is not None and 'error' not in data:

        # Renders images/tempsperhores12.png, which is attached below.
        createtilehours(data, city)

        toot_text += f"El temps per hores a {city} \n"

        image_id = mastodon.media_post('images/tempsperhores12.png', "image/png", description='temps').id

        mastodon.status_post(toot_text, in_reply_to_id=None, visibility=visibility, media_ids={image_id})

    else:

        toot_text += f"No trobo {city} :-(\n"

        mastodon.status_post(toot_text, in_reply_to_id=None, visibility=visibility)
|
||
|
||
def get_parameter( parameter, file_path ):
    """Return the value stored for *parameter* in a ``key: value`` file.

    The key must match *parameter* exactly. The previous prefix test let
    ``alarm_db`` match an ``alarm_db_user: ...`` line and return that whole
    line as the value. Only the first colon separates key from value, so
    values may contain colons themselves.

    Args:
        parameter: Name of the setting to look up.
        file_path: Path of the secrets/config file.

    Returns:
        The value, stripped of surrounding whitespace.

    Raises:
        SystemExit: When the file does not exist or lacks the parameter
            (a message is printed first).
    """
    # Check if secrets file exists
    if not os.path.isfile(file_path):
        print("File %s not found, exiting."%file_path)
        sys.exit(0)

    # Find parameter in file
    with open( file_path ) as f:
        for line in f:
            key, separator, value = line.partition(":")
            if separator and key.strip() == parameter:
                return value.strip()

    # Cannot find parameter, exit
    print(file_path + "  Missing parameter %s "%parameter)
    sys.exit(0)
|
||
|
||
def create_dir():
    """Create the ``images`` directory unless that path already exists."""
    target = 'images'

    if os.path.exists(target):
        return

    os.makedirs(target)
|
||
|
||
def mastodon():
    """Build the Mastodon API client from the secrets and config files.

    Reads ``uc_client_id``, ``uc_client_secret`` and ``uc_access_token``
    from ``secrets/secrets.txt`` and ``mastodon_hostname`` from
    ``config/config.txt``.

    Returns:
        A tuple ``(client, mastodon_hostname)``.
    """
    # Load secrets from secrets file
    secrets_filepath = "secrets/secrets.txt"
    uc_client_id = get_parameter("uc_client_id", secrets_filepath)
    uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
    uc_access_token = get_parameter("uc_access_token", secrets_filepath)

    # Load configuration from config file
    config_filepath = "config/config.txt"
    mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)

    # Initialise Mastodon API (named ``client`` so it does not shadow this
    # function's own name inside its body)
    client = Mastodon(
        client_id=uc_client_id,
        client_secret=uc_client_secret,
        access_token=uc_access_token,
        api_base_url='https://' + mastodon_hostname,
    )

    return (client, mastodon_hostname)
|
||
|
||
def dbconfig():
    """Return ``(alarm_db, alarm_db_user)`` read from ``config/db_config.txt``."""
    db_config_path = "config/db_config.txt"

    database_name = get_parameter("alarm_db", db_config_path)
    database_user = get_parameter("alarm_db_user", db_config_path)

    return (database_name, database_user)
|
||
|
||
def read_apikey():
    """Return the ``api_key`` value read from ``secrets/apikey.txt``."""
    apikey_path = "secrets/apikey.txt"

    return get_parameter("api_key", apikey_path)
|
||
|
||
# main
|
||
|
||
if __name__ == '__main__':

    # Entry point, meant to be run periodically:
    #   1. post every stored alarm whose time equals the current HH:MM;
    #   2. answer each pending mention according to its command keyword
    #      (consulta: / avui: / setmana: / alarma: / esborra:).

    current_time = time.strftime("%H:%M", time.localtime())

    create_dir()

    # NOTE: this rebinds the module-level name ``mastodon`` from the factory
    # function to the API client; alarm_post() relies on that global.
    mastodon, mastodon_hostname = mastodon()

    # Module-level settings read by get_weather_data() and the alarm helpers.
    api_key = read_apikey()

    alarm_db, alarm_db_user = dbconfig()

    print("Peticions al API restants: " + str(mastodon.ratelimit_remaining))

    proxim_reset = datetime.fromtimestamp(mastodon.ratelimit_reset)
    proxim_reset = proxim_reset.strftime("%d/%m/%Y, %H:%M:%S")
    print(f"Pròxim reset: {str(proxim_reset)}")

    ####################################################################
    # scheduled alarms

    alarm_user, alarm_visibility, alarm_time, alarm_city = get_alarms()

    for username, visibility, scheduled, city in zip(alarm_user, alarm_visibility, alarm_time, alarm_city):

        print(current_time)

        alarmtime = scheduled.strftime("%H:%M")

        print(alarmtime)

        if alarmtime == current_time:

            alarm_post(username, visibility, city)

    ####################################################################
    # get notifications

    notifications = mastodon.notifications()

    if len(notifications) == 0:

        print('No mentions')

        sys.exit(0)

    for notification in notifications:

        notification_id = notification.id

        if notification.type != 'mention':

            print(f'dismissing notification {notification_id}')

            mastodon.notifications_dismiss(notification_id)

            continue

        username = notification.account.acct

        status_id = notification.status.id

        text = notification.status.content

        visibility = notification.status.visibility

        contingut = cleanhtml(text)

        # ``pregunta`` must stay a module-level name: createtiletoday() and
        # createtileweek() read it as the card title.
        pregunta = get_query(contingut)

        # Accent-insensitive copy, used only to recognise the keyword.
        ordre = unidecode.unidecode(pregunta)

        if ordre[0:9] == "consulta:":

            # Keep the text before the first comma, then skip the keyword
            # and the single separator character that follows it.
            pregunta = pregunta.split(',')[0][10:]

            try:

                pregunta = format_query(pregunta)

                data, people, long, lat = get_weather_data(pregunta)

                if data is not None and 'error' not in data:

                    detall = createtiletoday(people, long, lat, data, 1)

                    toot_text = f'@{username} \n'
                    toot_text += f'El temps a {pregunta} és:\n'
                    toot_text += f'{detall}\n\n'

                    image_id = mastodon.media_post('images/temps.png', "image/png", description='temps').id

                    mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility, media_ids={image_id})

                else:

                    toot_text = f'@{username} \n'
                    toot_text += f'No trobo {pregunta} :-(\n'

                    mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)

                print(f'Replied notification {notification_id}')

                mastodon.notifications_dismiss(notification_id)

            except ValueError as verror:

                # The handler used to call an undefined country_error() and
                # crash with NameError. Report the problem and leave the
                # notification pending so a later run can retry it.
                print(f'Error processing notification {notification_id}: {verror}')

        elif ordre[0:5] == "avui:":

            pregunta = pregunta.split(',')[0][6:]

            try:

                pregunta = format_query(pregunta)

                data, people, long, lat = get_weather_data(pregunta)

                if data is not None and 'error' not in data:

                    # Renders images/tempsperhores12.png, attached below.
                    createtilehours(data, pregunta)

                    toot_text = f"@{username} \n"
                    toot_text += f"El temps per hores a {pregunta} \n"

                    image_id = mastodon.media_post('images/tempsperhores12.png', "image/png", description='temps').id

                    mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility, media_ids={image_id})

                else:

                    toot_text = f"@{username} \n"
                    toot_text += f"No trobo {pregunta} :-(\n"

                    mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)

                print(f'Replied notification {notification_id}')

                mastodon.notifications_dismiss(notification_id)

            except ValueError as verror:

                print(f'Error processing notification {notification_id}: {verror}')

        elif ordre[0:8] == "setmana:":

            pregunta = pregunta.split(',')[0][9:]

            try:

                pregunta = format_query(pregunta)

                data, people, long, lat = get_weather_data(pregunta)

                # Same guard as the other forecast commands: an API error
                # payload has no day entries to render.
                if data is not None and 'error' not in data:

                    # Renders images/tempsweek7.png, attached below.
                    createtileweek(data, 7)

                    toot_text = f"@{username} \n"
                    toot_text += f"Previsió del temps de 7 dies a {pregunta} \n"

                    image_id = mastodon.media_post('images/tempsweek7.png', "image/png", description='temps').id

                    mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility, media_ids={image_id})

                else:

                    toot_text = f"@{username} \n"
                    toot_text += f"No trobo {pregunta} :-(\n"

                    mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)

                print(f'Replied notification {notification_id}')

                mastodon.notifications_dismiss(notification_id)

            except ValueError as verror:

                print(f'Error processing notification {notification_id}: {verror}')

        elif ordre[0:7] == "alarma:":

            pregunta = pregunta.split(',')[0][8:]

            # "<time> <city>": split on the first space instead of assuming
            # the time is exactly five characters long.
            alarm_time, _, alarm_city = pregunta.partition(' ')

            if alarm_city != '':

                set_alarm(username, visibility, alarm_time, alarm_city)

                toot_text = f"@{username} \n"
                toot_text += f"Alarma per a {alarm_city} fixada a les {alarm_time}\n"

            else:

                toot_text = f"@{username} \n"
                toot_text += f"Per a configurar una alarma cal fer-ho així:\n@temps alarma: 08:00 Ciutat"

            mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)

            print(f'Replied notification {notification_id}')

            mastodon.notifications_dismiss(notification_id)

        elif ordre[0:8] == "esborra:":

            pregunta = pregunta.split(',')[0][9:]

            if check_alarm(username, pregunta):

                delete_alarm(username, pregunta)

                toot_text = f"@{username} \n"
                toot_text += f"{pregunta}, alarma esborrada."

                mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)

                print(f'Replied notification {notification_id}')

                mastodon.notifications_dismiss(notification_id)

        else:

            # Not a command the bot understands.
            print(f'Dismissing notification {notification_id}')

            mastodon.notifications_dismiss(notification_id)
|
||
|