982 líneas
28 KiB
Python
982 líneas
28 KiB
Python
|
# https://medium.com/nexttech/how-to-use-the-openweathermap-api-with-python-c84cc7075cfc
|
|||
|
|
|||
|
import urllib3
|
|||
|
import datetime
|
|||
|
import time
|
|||
|
from mastodon import Mastodon
|
|||
|
import psycopg2
|
|||
|
import unidecode
|
|||
|
import time
|
|||
|
import re
|
|||
|
import os
|
|||
|
import json
|
|||
|
import sys
|
|||
|
import os.path # For checking whether secrets file exists
|
|||
|
import requests # For doing the web stuff, dummy!
|
|||
|
from datetime import timedelta, datetime
|
|||
|
import urllib.request
|
|||
|
import geopy
|
|||
|
from geopy.geocoders import Nominatim
|
|||
|
from PIL import Image, ImageFont, ImageDraw
|
|||
|
import geotiler
|
|||
|
import geocoder
|
|||
|
import ssl
|
|||
|
import pdb
|
|||
|
|
|||
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|||
|
|
|||
|
def cleanhtml(raw_html):
|
|||
|
|
|||
|
cleanr = re.compile('<.*?>')
|
|||
|
cleantext = re.sub(cleanr, '', raw_html)
|
|||
|
return cleantext
|
|||
|
|
|||
|
def translate(i, detall, dir_vent):
|
|||
|
|
|||
|
if detall == 'Parcialmente nuboso':
|
|||
|
detall = 'Parcialment ennuvolat'
|
|||
|
if detall == 'Parcialmente nuboso con niebla':
|
|||
|
detall = 'Parcialment ennuvolat amb boira'
|
|||
|
if detall == 'Parcialmente nuboso con lluvias':
|
|||
|
detall = 'Parcialment ennuvolat amb aiguats'
|
|||
|
if detall == 'Nubes dispersas':
|
|||
|
detall = 'Núvols dispersos'
|
|||
|
if detall == 'Muy nuboso con nevadas':
|
|||
|
detall = 'Molt ennuvolat amb nevades'
|
|||
|
if detall == 'Muy nuboso con lluvia':
|
|||
|
detall = 'Molt ennuvolat amb pluja'
|
|||
|
if detall == 'Muy nuboso con niebla':
|
|||
|
detall = 'Molt ennuvolat amb boira'
|
|||
|
if detall == 'Muy nuboso con lluvias':
|
|||
|
detall = 'Molt ennuvolat amb pluges'
|
|||
|
if detall == 'Cubierto':
|
|||
|
detall = 'Tapat'
|
|||
|
if detall == 'Cubierto con lluvia':
|
|||
|
detall = 'Tapat amb pluja'
|
|||
|
if detall == 'Cubierto con lluvias':
|
|||
|
detall = 'Tapat amb pluges'
|
|||
|
if detall == 'Cubierto con probabilidad de lluvia':
|
|||
|
detall = 'Tapat amb possible pluja'
|
|||
|
if detall == 'Cubierto con nevadas':
|
|||
|
detall = 'Tapat amb nevades'
|
|||
|
if detall == 'Muy nuboso':
|
|||
|
detall = 'Molt ennuvolat'
|
|||
|
if detall == 'Despejado':
|
|||
|
detall = 'Clar'
|
|||
|
|
|||
|
if dir_vent == 'Suroeste':
|
|||
|
dir_vent = 'Sud-oest'
|
|||
|
if dir_vent == 'Noroeste':
|
|||
|
dir_vent = 'Nord-oest'
|
|||
|
if dir_vent == 'Nordeste':
|
|||
|
dir_vent = 'Nord-est'
|
|||
|
if dir_vent == 'Sureste':
|
|||
|
dir_vent = 'Sud-est'
|
|||
|
if dir_vent == 'Norte':
|
|||
|
dir_vent = 'Nord'
|
|||
|
if dir_vent == 'Este':
|
|||
|
dir_vent = 'Est'
|
|||
|
if dir_vent == 'Sur':
|
|||
|
dir_vent = 'Sud'
|
|||
|
if dir_vent == 'Oeste':
|
|||
|
dir_vent = 'Oest'
|
|||
|
|
|||
|
return i, detall, dir_vent
|
|||
|
|
|||
|
def createtilehours(data, pregunta):
|
|||
|
|
|||
|
i = 1
|
|||
|
x = 10
|
|||
|
y = 10
|
|||
|
|
|||
|
while i < 13:
|
|||
|
|
|||
|
hour_data = data['hour_hour']['hour'+str(i)]['hour_data']
|
|||
|
temperature = data['hour_hour']['hour'+str(i)]['temperature']
|
|||
|
detall = data['hour_hour']['hour'+str(i)]['text']
|
|||
|
icona = data['hour_hour']['hour'+str(i)]['icon']
|
|||
|
dir_vent = data['hour_hour']['hour'+str(i)]['wind_direction']
|
|||
|
|
|||
|
i, detall, dir_vent = translate(i, detall, dir_vent)
|
|||
|
|
|||
|
if i == 1:
|
|||
|
fons = Image.open('images/fonsorig.jpg')
|
|||
|
else:
|
|||
|
fons = Image.open('images/tempsperhores'+str(i-1)+'.png')
|
|||
|
|
|||
|
if i == 1:
|
|||
|
|
|||
|
logo_img = Image.open('images/logo.png')
|
|||
|
fons.paste(logo_img, (470, 16), logo_img)
|
|||
|
|
|||
|
# hi afegim l'icona del temps
|
|||
|
icona_path = 'wi/'+icona+'.png'
|
|||
|
temps_img = Image.open(icona_path)
|
|||
|
|
|||
|
fons.paste(temps_img, (x,y+30), temps_img)
|
|||
|
|
|||
|
fons.save('images/temphour'+str(i)+'.png',"PNG")
|
|||
|
|
|||
|
base = Image.open('images/temphour'+str(i)+'.png').convert('RGBA')
|
|||
|
txt = Image.new('RGBA', base.size, (255,255,255,0))
|
|||
|
fnt = ImageFont.truetype('fonts/DejaVuSerif.ttf', 25, layout_engine=ImageFont.LAYOUT_BASIC)
|
|||
|
# get a drawing context
|
|||
|
draw = ImageDraw.Draw(txt)
|
|||
|
|
|||
|
if i == 1:
|
|||
|
|
|||
|
draw.text((10,10), pregunta + ', 12 hores', font=fnt, fill=(255,255,255,255)) ## half opacity
|
|||
|
|
|||
|
fnt = ImageFont.truetype('fonts/DejaVuSerif.ttf', 18, layout_engine=ImageFont.LAYOUT_BASIC)
|
|||
|
draw.text((x+50,y+45), str(temperature)+'\u00b0'+ ' ' +hour_data+' '+detall, font=fnt, fill=(255,255,255,200)) ## half opacity
|
|||
|
|
|||
|
if i == 1:
|
|||
|
|
|||
|
fnt = ImageFont.truetype('fonts/DejaVuSerif.ttf', 12, layout_engine=ImageFont.LAYOUT_BASIC)
|
|||
|
draw.text((380,490), 'temps@mastodont.cat', font=fnt, fill=(255,255,255,200)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
draw.text((400,510), 'API: tutiempo.net', font=fnt, fill=(155,0,200,200)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
|
|||
|
out = Image.alpha_composite(base, txt)
|
|||
|
out.save('images/tempsperhores'+str(i)+'.png')
|
|||
|
|
|||
|
y = y + 40
|
|||
|
|
|||
|
i += 1
|
|||
|
|
|||
|
def createtiletoday(people, long, lat, data, days):
|
|||
|
|
|||
|
i = 1
|
|||
|
while i < days+1:
|
|||
|
|
|||
|
temperatura_max = data['day'+str(i)]['temperature_max']
|
|||
|
temperatura_min = data['day'+str(i)]['temperature_min']
|
|||
|
icona = data['day'+str(i)]['icon']
|
|||
|
detall = data['day'+str(i)]['text']
|
|||
|
vent = data['day'+str(i)]['wind']
|
|||
|
icona_vent = data['day'+str(i)]['icon_wind']
|
|||
|
dir_vent = data['day'+str(i)]['wind_direction']
|
|||
|
humitat = data['day'+str(i)]['humidity']
|
|||
|
solsurt = data['day'+str(i)]['sunrise']
|
|||
|
solamaga = data['day'+str(i)]['sunset']
|
|||
|
llunasurt = data['day'+str(i)]['moonrise']
|
|||
|
llunaamaga = data['day'+str(i)]['moonset']
|
|||
|
icona_lluna = data['day'+str(i)]['moon_phases_icon']
|
|||
|
|
|||
|
i, detall, dir_vent = translate(i, detall, dir_vent)
|
|||
|
|
|||
|
if people > 1000000:
|
|||
|
zoom = 10
|
|||
|
elif people > 100000 and people < 999999:
|
|||
|
zoom = 11
|
|||
|
elif people > 50000 and people < 99999:
|
|||
|
zoom = 12
|
|||
|
elif people < 49999:
|
|||
|
zoom = 15
|
|||
|
|
|||
|
map = geotiler.Map(center=(long, lat), zoom=zoom, size=(256, 256))
|
|||
|
image = geotiler.render_map(map)
|
|||
|
image.save('images/map.png')
|
|||
|
|
|||
|
fons = Image.open("images/fons.jpg")
|
|||
|
|
|||
|
#hi afegim el mapa de la ciutat
|
|||
|
map_img = Image.open('images/map.png')
|
|||
|
fons.paste(map_img, (390,65), map_img)
|
|||
|
|
|||
|
if i == 1:
|
|||
|
|
|||
|
logo_img = Image.open('images/logo.png')
|
|||
|
fons.paste(logo_img, (310, 320), logo_img)
|
|||
|
|
|||
|
# hi afegim l'icona del temps
|
|||
|
icona_path = 'wi/'+icona+'.png'
|
|||
|
temps_img = Image.open(icona_path)
|
|||
|
|
|||
|
fons.paste(temps_img, (10,55), temps_img)
|
|||
|
|
|||
|
# hi afegim l'icona del termometre
|
|||
|
icona_temperature_path = 'temperatura/temperature.png'
|
|||
|
temperature_img = Image.open(icona_temperature_path)
|
|||
|
|
|||
|
fons.paste(temperature_img, (10,115), temperature_img)
|
|||
|
|
|||
|
# hi afegim l'icona del vent
|
|||
|
icona_vent_path = 'wd/'+icona_vent+'.png'
|
|||
|
vent_img = Image.open(icona_vent_path)
|
|||
|
|
|||
|
fons.paste(vent_img, (22,180), vent_img)
|
|||
|
|
|||
|
# hi afegim l'icona d'humitat
|
|||
|
icona_humitat_path = 'humitat/humedad.png'
|
|||
|
hum_img = Image.open(icona_humitat_path)
|
|||
|
|
|||
|
fons.paste(hum_img, (8,210), hum_img)
|
|||
|
|
|||
|
# hi afegim l'icona del sol sortint
|
|||
|
icona_sun_path = 'sol/solsurt.png'
|
|||
|
sun_img = Image.open(icona_sun_path)
|
|||
|
|
|||
|
fons.paste(sun_img, (10,270), sun_img)
|
|||
|
|
|||
|
# hi afegim l'icona de la lluna
|
|||
|
icona_lluna_path = 'canviats/'+icona_lluna+'.png'
|
|||
|
lluna_img = Image.open(icona_lluna_path)
|
|||
|
|
|||
|
fons.paste(lluna_img, (12,315), lluna_img)
|
|||
|
fons.save('images/temporal.png',"PNG")
|
|||
|
|
|||
|
base = Image.open('images/temporal.png').convert('RGBA')
|
|||
|
txt = Image.new('RGBA', base.size, (255,255,255,0))
|
|||
|
fnt = ImageFont.truetype('fonts/DejaVuSerif.ttf', 35, layout_engine=ImageFont.LAYOUT_BASIC)
|
|||
|
# get a drawing context
|
|||
|
draw = ImageDraw.Draw(txt)
|
|||
|
|
|||
|
draw.text((10,10), pregunta, font=fnt, fill=(255,255,255,255))
|
|||
|
|
|||
|
fnt = ImageFont.truetype('fonts/DejaVuSerif.ttf', 30, layout_engine=ImageFont.LAYOUT_BASIC)
|
|||
|
draw.text((70,60), detall, font=fnt, fill=(255,255,255,200)) ## half opacity
|
|||
|
draw.text((70,120), str(temperatura_max)+'\u00b0 / '+str(temperatura_min)+'\u00b0', font=fnt, fill=(255,255,255,200)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
draw.text((70,170), str(dir_vent)+', '+str(vent)+'km/h', font=fnt, fill=(255,255,255,200))
|
|||
|
draw.text((70,220), str(humitat)+'%', font=fnt, fill=(255,255,255,200))
|
|||
|
draw.text((70,270), str(solsurt)+', '+str(solamaga), font=fnt, fill=(255,255,255,200))
|
|||
|
draw.text((70,320), str(llunasurt)+', '+str(llunaamaga), font=fnt, fill=(255,255,255,200))
|
|||
|
|
|||
|
if i == 1:
|
|||
|
|
|||
|
fnt = ImageFont.truetype('fonts/DejaVuSerif.ttf', 12, layout_engine=ImageFont.LAYOUT_BASIC)
|
|||
|
draw.text((380,350), 'temps@mastodont.cat', font=fnt, fill=(255,255,255,200)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
draw.text((540,350), 'API: tutiempo.net', font=fnt, fill=(155,0,200,200)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
fnt = ImageFont.truetype('fonts/DejaVuSerif.ttf', 10, layout_engine=ImageFont.LAYOUT_BASIC)
|
|||
|
draw.text((395,300),"© Col·laboradors d'OpenStreetMap, CC-BY-SA", font=fnt, fill=(155,0,200,200)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
|
|||
|
out = Image.alpha_composite(base, txt)
|
|||
|
out.save('images/temps.png')
|
|||
|
|
|||
|
i += 1
|
|||
|
|
|||
|
return detall
|
|||
|
|
|||
|
def createtileweek(data, days):
|
|||
|
|
|||
|
week_days= ['Dilluns', 'Dimarts', 'Dimecres', 'Dijous', 'Divendres', 'Dissabte','Diumenge']
|
|||
|
|
|||
|
x = 10
|
|||
|
y = 10
|
|||
|
|
|||
|
i = 1
|
|||
|
while i < days+1:
|
|||
|
|
|||
|
weather_date = data['day'+str(i)]['date']
|
|||
|
temperatura_max = data['day'+str(i)]['temperature_max']
|
|||
|
temperatura_min = data['day'+str(i)]['temperature_min']
|
|||
|
icona = data['day'+str(i)]['icon']
|
|||
|
detall = data['day'+str(i)]['text']
|
|||
|
vent = data['day'+str(i)]['wind']
|
|||
|
icona_vent = data['day'+str(i)]['icon_wind']
|
|||
|
dir_vent = data['day'+str(i)]['wind_direction']
|
|||
|
humitat = data['day'+str(i)]['humidity']
|
|||
|
solsurt = data['day'+str(i)]['sunrise']
|
|||
|
solamaga = data['day'+str(i)]['sunset']
|
|||
|
llunasurt = data['day'+str(i)]['moonrise']
|
|||
|
llunaamaga = data['day'+str(i)]['moonset']
|
|||
|
icona_lluna = data['day'+str(i)]['moon_phases_icon']
|
|||
|
|
|||
|
i, detall, dir_vent = translate(i, detall, dir_vent)
|
|||
|
|
|||
|
date_list = list(map(str, weather_date. split('-')))
|
|||
|
day=datetime(int(date_list[0]),int(date_list[1]),int(date_list[2])).weekday()
|
|||
|
|
|||
|
if i == 1:
|
|||
|
fons = Image.open('images/fons.jpg')
|
|||
|
else:
|
|||
|
fons = Image.open('images/tempsweek'+str(i-1)+'.png')
|
|||
|
|
|||
|
# hi afegim l'icona del temps
|
|||
|
icona_path = 'wi/'+icona+'.png'
|
|||
|
temps_img = Image.open(icona_path)
|
|||
|
|
|||
|
fons.paste(temps_img, (y+10, x+80), temps_img)
|
|||
|
|
|||
|
if i == 1:
|
|||
|
logo_img = Image.open('images/logo.png')
|
|||
|
fons.paste(logo_img, (15, 320), logo_img)
|
|||
|
|
|||
|
fons.save('images/tempweek'+str(i)+'.png',"PNG")
|
|||
|
|
|||
|
base = Image.open('images/tempweek'+str(i)+'.png').convert('RGBA')
|
|||
|
txt = Image.new('RGBA', base.size, (255,255,255,0))
|
|||
|
fnt = ImageFont.truetype('fonts/DejaVuSerif.ttf', 35, layout_engine=ImageFont.LAYOUT_BASIC)
|
|||
|
# get a drawing context
|
|||
|
draw = ImageDraw.Draw(txt)
|
|||
|
|
|||
|
if i == 1:
|
|||
|
|
|||
|
draw.text((15,10), pregunta+', 7 dies', font=fnt, fill=(255,255,255,255))
|
|||
|
|
|||
|
fnt = ImageFont.truetype('fonts/DejaVuSerif.ttf', 16, layout_engine=ImageFont.LAYOUT_BASIC)
|
|||
|
|
|||
|
if i == 1:
|
|||
|
draw.text((y+10,x+50), "Avui", font=fnt, fill=(255,255,255,220)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
elif week_days[day] == 'Dimarts':
|
|||
|
draw.text((y+4,x+50), str(week_days[day]), font=fnt, fill=(255,255,255,220)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
elif week_days[day] == 'Dimecres':
|
|||
|
draw.text((y-5,x+50), str(week_days[day]), font=fnt, fill=(255,255,255,220)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
elif week_days[day] == 'Dijous':
|
|||
|
draw.text((y+6,x+50), str(week_days[day]), font=fnt, fill=(255,255,255,220)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
elif week_days[day] == 'Divendres':
|
|||
|
draw.text((y-6,x+50), str(week_days[day]), font=fnt, fill=(255,255,255,220)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
elif week_days[day] == 'Dissabte':
|
|||
|
draw.text((y,x+50), str(week_days[day]), font=fnt, fill=(255,255,255,220)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
else:
|
|||
|
draw.text((y+4,x+50), str(week_days[day]), font=fnt, fill=(255,255,255,220)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
|
|||
|
fnt = ImageFont.truetype('fonts/DejaVuSerif.ttf', 24, layout_engine=ImageFont.LAYOUT_BASIC)
|
|||
|
draw.text((y+18,x+140), str(temperatura_max)+'\u00b0', font=fnt, fill=(255,255,255,220)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
draw.text((y+20,x+190), str(temperatura_min)+'\u00b0', font=fnt, fill=(255,255,255,220)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
|
|||
|
if i == 1:
|
|||
|
|
|||
|
fnt = ImageFont.truetype('fonts/DejaVuSerif.ttf', 15, layout_engine=ImageFont.LAYOUT_BASIC)
|
|||
|
draw.text((60,330), 'temps@mastodont.cat - 2020', font=fnt, fill=(255,255,255,200)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
draw.text((520,330), 'API: tutiempo.net', font=fnt, fill=(155,0,200,200)) #fill=(255,255,255,255)) ## full opacity
|
|||
|
|
|||
|
out = Image.alpha_composite(base, txt)
|
|||
|
out.save('images/tempsweek'+str(i)+'.png')
|
|||
|
|
|||
|
y += 95
|
|||
|
i += 1
|
|||
|
|
|||
|
return detall
|
|||
|
|
|||
|
def format_query(pregunta):
|
|||
|
|
|||
|
if pregunta.find("'") != -1:
|
|||
|
|
|||
|
pregunta = pregunta.replace(''', "'")
|
|||
|
|
|||
|
elif pregunta.find("’") != -1:
|
|||
|
|
|||
|
pregunta = pregunta.replace("’", "'")
|
|||
|
|
|||
|
elif pregunta.find("'") != -1:
|
|||
|
|
|||
|
pregunta = pregunta.replace(''', "'")
|
|||
|
|
|||
|
return pregunta
|
|||
|
|
|||
|
def get_weather_data(pregunta):
|
|||
|
|
|||
|
nom = Nominatim(user_agent="temps@mastodont.cat")
|
|||
|
coords = nom.geocode(pregunta)
|
|||
|
|
|||
|
if coords != None:
|
|||
|
|
|||
|
long = coords.longitude
|
|||
|
lat = coords.latitude
|
|||
|
|
|||
|
headers = {"User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36"}
|
|||
|
|
|||
|
g = geocoder.geonames(pregunta, key='spla')
|
|||
|
|
|||
|
people = g.population if g.ok else 0
|
|||
|
|
|||
|
url = 'https://api.tutiempo.net/json/?lan=es&apid={}&ll={},{}'.format(api_key, lat, long)
|
|||
|
res = requests.get(url, headers=headers, verify=False)
|
|||
|
|
|||
|
data = res.json()
|
|||
|
|
|||
|
else:
|
|||
|
|
|||
|
data = None
|
|||
|
people = None
|
|||
|
long = None
|
|||
|
lat = None
|
|||
|
|
|||
|
return (data, people, long, lat)
|
|||
|
|
|||
|
def get_query(contingut):
|
|||
|
|
|||
|
inici = contingut.index("@")
|
|||
|
|
|||
|
final = contingut.index(" ")
|
|||
|
|
|||
|
if len(contingut) > final:
|
|||
|
|
|||
|
contingut = contingut[0: inici:] + contingut[final +1::]
|
|||
|
|
|||
|
neteja = contingut.count('@')
|
|||
|
|
|||
|
i = 0
|
|||
|
while i < neteja :
|
|||
|
|
|||
|
inici = contingut.rfind("@")
|
|||
|
final = len(contingut)
|
|||
|
contingut = contingut[0: inici:] + contingut[final +1::]
|
|||
|
i += 1
|
|||
|
|
|||
|
pregunta = contingut.lstrip(" ")
|
|||
|
|
|||
|
return pregunta
|
|||
|
|
|||
|
def get_country(pregunta):
|
|||
|
|
|||
|
if pregunta.find(',') != -1:
|
|||
|
|
|||
|
pais = pregunta.rsplit(',', 1)[1]
|
|||
|
pais = pais.replace(' ', '')
|
|||
|
pais = pais.upper()
|
|||
|
|
|||
|
else:
|
|||
|
|
|||
|
pais = 'ES'
|
|||
|
|
|||
|
return pais
|
|||
|
|
|||
|
def set_alarm(username, visibility, alarm_time, alarm_city):
|
|||
|
|
|||
|
insert_sql = "INSERT INTO alarms(username, visibility, time, city) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING"
|
|||
|
|
|||
|
conn = None
|
|||
|
|
|||
|
try:
|
|||
|
|
|||
|
conn = psycopg2.connect(database = alarm_db, user = alarm_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|||
|
|
|||
|
cur = conn.cursor()
|
|||
|
|
|||
|
cur.execute(insert_sql, (username, visibility, alarm_time, alarm_city))
|
|||
|
|
|||
|
conn.commit()
|
|||
|
|
|||
|
cur.close()
|
|||
|
|
|||
|
except (Exception, psycopg2.DatabaseError) as error:
|
|||
|
|
|||
|
print(error)
|
|||
|
|
|||
|
finally:
|
|||
|
|
|||
|
if conn is not None:
|
|||
|
|
|||
|
conn.close()
|
|||
|
|
|||
|
def get_alarms():
|
|||
|
|
|||
|
alarm_user = []
|
|||
|
|
|||
|
alarm_visibility = []
|
|||
|
|
|||
|
alarm_time = []
|
|||
|
|
|||
|
alarm_city = []
|
|||
|
|
|||
|
conn = None
|
|||
|
|
|||
|
try:
|
|||
|
|
|||
|
conn = psycopg2.connect(database = alarm_db, user = alarm_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|||
|
|
|||
|
cur = conn.cursor()
|
|||
|
|
|||
|
cur.execute('select username, visibility, time, city from alarms')
|
|||
|
|
|||
|
rows = cur.fetchall()
|
|||
|
|
|||
|
for row in rows:
|
|||
|
|
|||
|
alarm_user.append(row[0])
|
|||
|
|
|||
|
alarm_visibility.append(row[1])
|
|||
|
|
|||
|
alarm_time.append(row[2])
|
|||
|
|
|||
|
alarm_city.append(row[3])
|
|||
|
|
|||
|
cur.close()
|
|||
|
|
|||
|
except (Exception, psycopg2.DatabaseError) as error:
|
|||
|
|
|||
|
print(error)
|
|||
|
|
|||
|
finally:
|
|||
|
|
|||
|
if conn is not None:
|
|||
|
|
|||
|
conn.close()
|
|||
|
|
|||
|
return (alarm_user, alarm_visibility, alarm_time, alarm_city)
|
|||
|
|
|||
|
def check_alarm(username, city):
|
|||
|
|
|||
|
sql_query = 'select username, city from alarms where username=(%s) and city=(%s)'
|
|||
|
|
|||
|
alarm_exists = False
|
|||
|
|
|||
|
conn = None
|
|||
|
|
|||
|
try:
|
|||
|
|
|||
|
conn = psycopg2.connect(database = alarm_db, user = alarm_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|||
|
|
|||
|
cur = conn.cursor()
|
|||
|
|
|||
|
cur.execute(sql_query, (username, city))
|
|||
|
|
|||
|
row = cur.fetchone()
|
|||
|
|
|||
|
if row != None:
|
|||
|
|
|||
|
alarm_exists = True
|
|||
|
|
|||
|
cur.close()
|
|||
|
|
|||
|
except (Exception, psycopg2.DatabaseError) as error:
|
|||
|
|
|||
|
print(error)
|
|||
|
|
|||
|
finally:
|
|||
|
|
|||
|
if conn is not None:
|
|||
|
|
|||
|
conn.close()
|
|||
|
|
|||
|
return alarm_exists
|
|||
|
|
|||
|
def delete_alarm(username, city):
|
|||
|
|
|||
|
delete_sql = 'delete from alarms where username=(%s) and city=(%s)'
|
|||
|
|
|||
|
conn = None
|
|||
|
|
|||
|
try:
|
|||
|
|
|||
|
conn = psycopg2.connect(database = alarm_db, user = alarm_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|||
|
|
|||
|
cur = conn.cursor()
|
|||
|
|
|||
|
cur.execute(delete_sql, (username, city))
|
|||
|
|
|||
|
conn.commit()
|
|||
|
|
|||
|
cur.close()
|
|||
|
|
|||
|
except (Exception, psycopg2.DatabaseError) as error:
|
|||
|
|
|||
|
print(error)
|
|||
|
|
|||
|
finally:
|
|||
|
|
|||
|
if conn is not None:
|
|||
|
|
|||
|
conn.close()
|
|||
|
|
|||
|
def alarm_post(username, visibility, city):
|
|||
|
|
|||
|
data, people, long, lat = get_weather_data(city)
|
|||
|
|
|||
|
if data != None and 'error' not in data:
|
|||
|
|
|||
|
days = 1
|
|||
|
|
|||
|
detall = createtilehours(data, city)
|
|||
|
|
|||
|
toot_text = f"@{username} \n"
|
|||
|
|
|||
|
toot_text += f"El temps per hores a {city} \n"
|
|||
|
|
|||
|
image_id = mastodon.media_post('images/tempsperhores12.png', "image/png", description='temps').id
|
|||
|
|
|||
|
mastodon.status_post(toot_text, in_reply_to_id=None, visibility=visibility, media_ids={image_id})
|
|||
|
|
|||
|
else:
|
|||
|
|
|||
|
toot_text = f"@{username} \n"
|
|||
|
|
|||
|
toot_text += f"No trobo {city} :-(\n"
|
|||
|
|
|||
|
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)
|
|||
|
|
|||
|
def get_parameter( parameter, file_path ):
|
|||
|
# Check if secrets file exists
|
|||
|
if not os.path.isfile(file_path):
|
|||
|
print("File %s not found, exiting."%file_path)
|
|||
|
sys.exit(0)
|
|||
|
|
|||
|
# Find parameter in file
|
|||
|
with open( file_path ) as f:
|
|||
|
for line in f:
|
|||
|
if line.startswith( parameter ):
|
|||
|
return line.replace(parameter + ":", "").strip()
|
|||
|
|
|||
|
# Cannot find parameter, exit
|
|||
|
print(file_path + " Missing parameter %s "%parameter)
|
|||
|
sys.exit(0)
|
|||
|
|
|||
|
def create_dir():
|
|||
|
|
|||
|
if not os.path.exists('images'):
|
|||
|
|
|||
|
os.makedirs('images')
|
|||
|
|
|||
|
def mastodon():
|
|||
|
|
|||
|
# Load secrets from secrets file
|
|||
|
secrets_filepath = "secrets/secrets.txt"
|
|||
|
uc_client_id = get_parameter("uc_client_id", secrets_filepath)
|
|||
|
uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
|
|||
|
uc_access_token = get_parameter("uc_access_token", secrets_filepath)
|
|||
|
|
|||
|
# Load configuration from config file
|
|||
|
config_filepath = "config/config.txt"
|
|||
|
mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)
|
|||
|
|
|||
|
# Initialise Mastodon API
|
|||
|
mastodon = Mastodon(
|
|||
|
client_id=uc_client_id,
|
|||
|
client_secret=uc_client_secret,
|
|||
|
access_token=uc_access_token,
|
|||
|
api_base_url='https://' + mastodon_hostname,
|
|||
|
)
|
|||
|
|
|||
|
# Initialise access headers
|
|||
|
headers = {'Authorization': 'Bearer %s'%uc_access_token}
|
|||
|
|
|||
|
return (mastodon, mastodon_hostname)
|
|||
|
|
|||
|
def dbconfig():
|
|||
|
|
|||
|
# Load configuration from config file
|
|||
|
config_filepath = "config/db_config.txt"
|
|||
|
alarm_db = get_parameter("alarm_db", config_filepath)
|
|||
|
alarm_db_user = get_parameter("alarm_db_user", config_filepath)
|
|||
|
|
|||
|
return (alarm_db, alarm_db_user)
|
|||
|
|
|||
|
def read_apikey():
|
|||
|
|
|||
|
# Load secrets from secrets file
|
|||
|
secrets_filepath = "secrets/apikey.txt"
|
|||
|
api_key = get_parameter("api_key", secrets_filepath)
|
|||
|
|
|||
|
return api_key
|
|||
|
|
|||
|
# main
|
|||
|
|
|||
|
if __name__ == '__main__':
|
|||
|
|
|||
|
t = time.localtime()
|
|||
|
|
|||
|
current_time = time.strftime("%H:%M", t)
|
|||
|
|
|||
|
create_dir()
|
|||
|
|
|||
|
mastodon, mastodon_hostname = mastodon()
|
|||
|
|
|||
|
api_key = read_apikey()
|
|||
|
|
|||
|
alarm_db, alarm_db_user = dbconfig()
|
|||
|
|
|||
|
peticions_restants = mastodon.ratelimit_remaining
|
|||
|
print("Peticions al API restants: " + str(peticions_restants))
|
|||
|
|
|||
|
proxim_reset = mastodon.ratelimit_reset
|
|||
|
proxim_reset = datetime.fromtimestamp(proxim_reset)
|
|||
|
proxim_reset = proxim_reset.strftime("%d/%m/%Y, %H:%M:%S")
|
|||
|
print(f"Pròxim reset: {str(proxim_reset)}")
|
|||
|
|
|||
|
alarm_user, alarm_visibility, alarm_time, alarm_city = get_alarms()
|
|||
|
|
|||
|
i = 0
|
|||
|
|
|||
|
while i < len (alarm_user):
|
|||
|
|
|||
|
print(current_time)
|
|||
|
|
|||
|
alarmtime = alarm_time[i].strftime("%H:%M")
|
|||
|
|
|||
|
print(alarmtime)
|
|||
|
|
|||
|
if alarmtime == current_time:
|
|||
|
|
|||
|
username = alarm_user[i]
|
|||
|
|
|||
|
visibility = alarm_visibility[i]
|
|||
|
|
|||
|
city = alarm_city[i]
|
|||
|
|
|||
|
alarm_post(username, visibility, city)
|
|||
|
|
|||
|
i += 1
|
|||
|
|
|||
|
####################################################################
|
|||
|
# get notifications
|
|||
|
|
|||
|
notifications = mastodon.notifications()
|
|||
|
|
|||
|
if len(notifications) == 0:
|
|||
|
|
|||
|
print('No mentions')
|
|||
|
|
|||
|
sys.exit(0)
|
|||
|
|
|||
|
i = 0
|
|||
|
|
|||
|
while i < len(notifications):
|
|||
|
|
|||
|
notification_id = notifications[i].id
|
|||
|
|
|||
|
if notifications[i].type != 'mention':
|
|||
|
|
|||
|
i += 1
|
|||
|
|
|||
|
print(f'dismissing notification {notification_id}')
|
|||
|
|
|||
|
mastodon.notifications_dismiss(notification_id)
|
|||
|
|
|||
|
continue
|
|||
|
|
|||
|
account_id = notifications[i]
|
|||
|
|
|||
|
username = notifications[i].account.acct
|
|||
|
|
|||
|
status_id = notifications[i].status.id
|
|||
|
|
|||
|
text = notifications[i].status.content
|
|||
|
|
|||
|
visibility = notifications[i].status.visibility
|
|||
|
|
|||
|
contingut = cleanhtml(text)
|
|||
|
|
|||
|
pregunta = get_query(contingut)
|
|||
|
|
|||
|
if unidecode.unidecode(pregunta)[0:9] == "consulta:":
|
|||
|
|
|||
|
pais = get_country(pregunta)
|
|||
|
|
|||
|
inici = 0
|
|||
|
|
|||
|
final = unidecode.unidecode(pregunta).index('consulta:',0)
|
|||
|
|
|||
|
pregunta = pregunta.split(',')[0]
|
|||
|
|
|||
|
if len(pregunta) > final :
|
|||
|
|
|||
|
pregunta = pregunta[0: inici:] + pregunta[final +10::]
|
|||
|
|
|||
|
try:
|
|||
|
|
|||
|
pregunta = format_query(pregunta)
|
|||
|
|
|||
|
data, people, long, lat = get_weather_data(pregunta)
|
|||
|
|
|||
|
if data != None and 'error' not in data:
|
|||
|
|
|||
|
days = 1
|
|||
|
|
|||
|
detall = createtiletoday(people, long, lat, data, days)
|
|||
|
|
|||
|
toot_text = f'@{username} \n'
|
|||
|
|
|||
|
toot_text += f'El temps a {pregunta} és:\n'
|
|||
|
|
|||
|
toot_text += f'{detall}\n\n'
|
|||
|
|
|||
|
image_id = mastodon.media_post('images/temps.png', "image/png", description='temps').id
|
|||
|
|
|||
|
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility, media_ids={image_id})
|
|||
|
|
|||
|
else:
|
|||
|
|
|||
|
toot_text = f'@{username} \n'
|
|||
|
|
|||
|
toot_text += f'No trobo {pregunta} :-(\n'
|
|||
|
|
|||
|
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)
|
|||
|
|
|||
|
print(f'Replied notification {notification_id}')
|
|||
|
|
|||
|
mastodon.notifications_dismiss(notification_id)
|
|||
|
|
|||
|
except ValueError as verror:
|
|||
|
|
|||
|
country_error()
|
|||
|
|
|||
|
elif unidecode.unidecode(pregunta)[0:5] == "avui:":
|
|||
|
|
|||
|
pais = get_country(pregunta)
|
|||
|
|
|||
|
inici = 0
|
|||
|
|
|||
|
final = unidecode.unidecode(pregunta).index('avui:',0)
|
|||
|
|
|||
|
pregunta = pregunta.split(',')[0]
|
|||
|
|
|||
|
if len(pregunta) > final :
|
|||
|
|
|||
|
pregunta = pregunta[0: inici:] + pregunta[final +6::]
|
|||
|
|
|||
|
try:
|
|||
|
|
|||
|
pregunta = format_query(pregunta)
|
|||
|
|
|||
|
data, people, long, lat = get_weather_data(pregunta)
|
|||
|
|
|||
|
if data != None and 'error' not in data:
|
|||
|
|
|||
|
days = 1
|
|||
|
|
|||
|
detall = createtilehours(data, pregunta)
|
|||
|
|
|||
|
toot_text = f"@{username} \n"
|
|||
|
|
|||
|
toot_text += f"El temps per hores a {pregunta} \n"
|
|||
|
|
|||
|
image_id = mastodon.media_post('images/tempsperhores12.png', "image/png", description='temps').id
|
|||
|
|
|||
|
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility, media_ids={image_id})
|
|||
|
|
|||
|
else:
|
|||
|
|
|||
|
toot_text = f"@{username} \n"
|
|||
|
|
|||
|
toot_text += f"No trobo {pregunta} :-(\n"
|
|||
|
|
|||
|
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)
|
|||
|
|
|||
|
print(f'Replied notification {notification_id}')
|
|||
|
|
|||
|
mastodon.notifications_dismiss(notification_id)
|
|||
|
|
|||
|
except ValueError as verror:
|
|||
|
|
|||
|
country_error()
|
|||
|
|
|||
|
elif unidecode.unidecode(pregunta)[0:8] == "setmana:":
|
|||
|
|
|||
|
pais = get_country(pregunta)
|
|||
|
|
|||
|
inici = 0
|
|||
|
|
|||
|
final = unidecode.unidecode(pregunta).index('setmana:',0)
|
|||
|
|
|||
|
pregunta = pregunta.split(',')[0]
|
|||
|
|
|||
|
if len(pregunta) > final :
|
|||
|
|
|||
|
pregunta = pregunta[0: inici:] + pregunta[final +9::]
|
|||
|
|
|||
|
try:
|
|||
|
|
|||
|
pregunta = format_query(pregunta)
|
|||
|
|
|||
|
data, people, long, lat = get_weather_data(pregunta)
|
|||
|
|
|||
|
if data != None:
|
|||
|
|
|||
|
days = 7
|
|||
|
|
|||
|
detall = createtileweek(data, days)
|
|||
|
|
|||
|
toot_text = f"@{username} \n"
|
|||
|
|
|||
|
toot_text += f"Previsió del temps de 7 dies a {pregunta} \n"
|
|||
|
|
|||
|
image_id = mastodon.media_post('images/tempsweek7.png', "image/png", description='temps').id
|
|||
|
|
|||
|
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility, media_ids={image_id})
|
|||
|
|
|||
|
else:
|
|||
|
|
|||
|
toot_text = f"@{username} \n"
|
|||
|
|
|||
|
toot_text += f"No trobo {pregunta} :-(\n"
|
|||
|
|
|||
|
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)
|
|||
|
|
|||
|
print(f'Replied notification {notification_id}')
|
|||
|
|
|||
|
mastodon.notifications_dismiss(notification_id)
|
|||
|
|
|||
|
except ValueError as verror:
|
|||
|
|
|||
|
country_error()
|
|||
|
|
|||
|
elif unidecode.unidecode(pregunta)[0:7] == "alarma:":
|
|||
|
|
|||
|
inici = 0
|
|||
|
|
|||
|
final = unidecode.unidecode(pregunta).index('alarma:',0)
|
|||
|
|
|||
|
pregunta = pregunta.split(',')[0]
|
|||
|
|
|||
|
if len(pregunta) > final :
|
|||
|
|
|||
|
pregunta = pregunta[0: inici:] + pregunta[final +8::]
|
|||
|
|
|||
|
alarm_time = pregunta.split(' ')[0]
|
|||
|
|
|||
|
alarm_city = pregunta[6:]
|
|||
|
|
|||
|
if alarm_city != '':
|
|||
|
|
|||
|
set_alarm(username, visibility, alarm_time, alarm_city)
|
|||
|
|
|||
|
toot_text = f"@{username} \n"
|
|||
|
|
|||
|
toot_text += f"Alarma per a {alarm_city} fixada a les {alarm_time}\n"
|
|||
|
|
|||
|
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)
|
|||
|
|
|||
|
print(f'Replied notification {notification_id}')
|
|||
|
|
|||
|
mastodon.notifications_dismiss(notification_id)
|
|||
|
|
|||
|
else:
|
|||
|
|
|||
|
toot_text = f"@{username} \n"
|
|||
|
|
|||
|
toot_text += f"Per a configurar una alarma cal fer-ho així:\n@temps alarma: 08:00 Ciutat"
|
|||
|
|
|||
|
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)
|
|||
|
|
|||
|
print(f'Replied notification {notification_id}')
|
|||
|
|
|||
|
mastodon.notifications_dismiss(notification_id)
|
|||
|
|
|||
|
elif unidecode.unidecode(pregunta)[0:8] == "esborra:":
|
|||
|
|
|||
|
inici = 0
|
|||
|
|
|||
|
final = unidecode.unidecode(pregunta).index('esborra:',0)
|
|||
|
|
|||
|
pregunta = pregunta.split(',')[0]
|
|||
|
|
|||
|
if len(pregunta) > final :
|
|||
|
|
|||
|
pregunta = pregunta[0: inici:] + pregunta[final +9::]
|
|||
|
|
|||
|
alarm_exists = check_alarm(username, pregunta)
|
|||
|
|
|||
|
if alarm_exists:
|
|||
|
|
|||
|
delete_alarm(username, pregunta)
|
|||
|
|
|||
|
toot_text = f"@{username} \n"
|
|||
|
|
|||
|
toot_text += f"{pregunta}, alarma esborrada."
|
|||
|
|
|||
|
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)
|
|||
|
|
|||
|
print(f'Replied notification {notification_id}')
|
|||
|
|
|||
|
mastodon.notifications_dismiss(notification_id)
|
|||
|
|
|||
|
else:
|
|||
|
|
|||
|
print(f'Dismissing notification {notification_id}')
|
|||
|
|
|||
|
mastodon.notifications_dismiss(notification_id)
|
|||
|
|
|||
|
i += 1
|
|||
|
|