Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F1726756
workrave-auto.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Size
18 KB
Subscribers
None
workrave-auto.py
View Options
#!/usr/bin/env python
from
PyQt5.QtCore
import
QTimer
,
pyqtSignal
,
QObject
,
pyqtSlot
,
QCoreApplication
,
QMetaObject
,
Qt
,
QEventLoop
,
QThread
from
PyQt5
import
QtGui
,
QtWidgets
from
dbus.mainloop.pyqt5
import
DBusQtMainLoop
import
dbus
import
os
import
configparser
import
datetime
import
time
import
sys
import
signal
import
asyncio
import
calendarsync
from
pprint
import
pprint
def
wk2int
(
day
):
if
day
==
'mon'
:
return
0
elif
day
==
'tue'
:
return
1
elif
day
==
'wed'
:
return
2
elif
day
==
'thu'
:
return
3
elif
day
==
'fri'
:
return
4
elif
day
==
'sat'
:
return
5
elif
day
==
'sun'
:
return
6
else
:
return
-
1
def
time2diff
(
now
,
time
):
datetime
.
datetime
.
now
()
to
=
time
.
split
(
':'
)
hr
=
int
(
to
[
0
])
mi
=
0
sec
=
0
if
len
(
to
)
==
2
:
mi
=
int
(
to
[
1
])
if
len
(
to
)
==
3
:
sec
=
int
(
to
[
2
])
return
datetime
.
datetime
.
combine
(
datetime
.
date
.
today
(),
datetime
.
time
(
hr
,
mi
,
sec
))
-
now
class
autoEntry
:
def
__init__
(
self
,
options
):
self
.
options
=
options
self
.
next_action
=
options
[
'default'
]
self
.
next_run
=
0
self
.
in_interval
=
False
self
.
startDt
=
False
self
.
endDt
=
False
self
.
update
()
def
print_options
(
self
):
print
(
self
.
options
)
def
update
(
self
):
self
.
in_interval
=
False
now
=
datetime
.
datetime
.
now
()
dateOn
=
now
.
date
()
dateOff
=
now
.
date
()
timeOn
=
datetime
.
time
(
0
,
0
)
timeOff
=
datetime
.
time
(
23
,
59
)
recurring
=
False
if
'weekday'
in
self
.
options
:
day
=
wk2int
(
self
.
options
[
'weekday'
])
if
now
.
date
()
.
weekday
()
==
day
:
dateOn
=
now
.
date
()
dateOff
=
now
.
date
()
else
:
td
=
day
-
now
.
date
()
.
weekday
()
if
td
<
0
:
td
+=
7
dateOn
=
now
.
date
()
+
datetime
.
timedelta
(
days
=
td
)
dateOff
=
dateOn
recurring
=
True
# Don't forget to add a week if we are outside of the interval
if
'startdate'
in
self
.
options
:
dateOn
=
datetime
.
datetime
.
strptime
(
self
.
options
[
'startdate'
],
"%Y-%m-
%d
"
)
.
date
()
if
'enddate'
in
self
.
options
:
dateOff
=
datetime
.
datetime
.
strptime
(
self
.
options
[
'enddate'
],
"%Y-%m-
%d
"
)
.
date
()
if
'starttime'
in
self
.
options
:
timeOn
=
datetime
.
datetime
.
strptime
(
self
.
options
[
'starttime'
],
"%H:%M"
)
.
time
()
if
'endtime'
in
self
.
options
:
timeOff
=
datetime
.
datetime
.
strptime
(
self
.
options
[
'endtime'
],
"%H:%M"
)
.
time
()
startDt
=
datetime
.
datetime
.
combine
(
dateOn
,
timeOn
)
endDt
=
datetime
.
datetime
.
combine
(
dateOff
,
timeOff
)
if
startDt
<
now
and
endDt
>
now
:
# in interval
self
.
next_action
=
self
.
options
[
'default'
]
self
.
next_run
=
(
endDt
-
now
)
.
total_seconds
()
self
.
in_interval
=
True
elif
startDt
<
now
and
endDt
<
now
:
# event is in the past
if
recurring
:
self
.
next_action
=
self
.
options
[
'mode'
]
self
.
next_run
=
((
endDt
+
datetime
.
timedelta
(
days
=
7
))
-
now
)
.
total_seconds
()
else
:
self
.
next_action
=
False
self
.
next_run
=
False
elif
startDt
>
now
and
endDt
>
now
:
# in the future
self
.
next_action
=
self
.
options
[
'mode'
]
self
.
next_run
=
(
startDt
-
now
)
.
total_seconds
()
else
:
print
(
'Invalid settings'
)
self
.
next_action
=
False
self
.
next_run
=
False
self
.
startDt
=
startDt
self
.
endDt
=
endDt
def
get_next_action
(
self
):
self
.
update
()
return
self
.
next_action
def
get_interval_action
(
self
):
return
self
.
options
[
'mode'
]
def
get_next_run
(
self
):
self
.
update
()
return
int
(
self
.
next_run
)
def
get_start
(
self
):
self
.
update
()
return
self
.
startDt
.
timestamp
()
def
get_end
(
self
):
self
.
update
()
return
self
.
endDt
.
timestamp
()
def
get_options
(
self
):
return
self
.
options
def
get_string
(
self
):
return
"Start: "
+
self
.
startDt
.
strftime
(
'%Y-%m-
%d
%H:%M:
%s
'
)
+
', End: '
+
self
.
endDt
.
strftime
(
'%Y-%m-
%d
%H:%M:
%s
'
)
def
is_in_interval
(
self
):
self
.
update
()
return
self
.
in_interval
def
load_config
(
cfgFile
):
config
=
configparser
.
ConfigParser
()
config
.
read
(
cfgFile
)
return
config
def
save_config
(
config
,
cfgFile
):
with
open
(
cfgFile
,
'w'
)
as
configFile
:
config
.
write
(
configFile
)
def
recreate_config
(
cfgFile
):
config
=
configparser
.
ConfigParser
()
config
[
'general'
]
=
{
'mode'
:
'normal'
,
'entries'
:
'entry1,entry2'
,
'calendards'
:
'calendar1,calendar2'
,
'calendarsyncinterval'
:
'60'
}
config
[
'calendar1'
]
=
{
'url'
:
'https://my.dav.server'
,
'calendar'
:
'Personal'
,
'wholeday'
:
'skip'
}
config
[
'calendar2'
]
=
{
'url'
:
'https://my.dav.server'
,
'calendar'
:
'Personal'
,
'wholeday'
:
'skip'
}
config
[
'entry1'
]
=
{
'name'
:
'workrave'
,
'mode'
:
'suspended'
,
'weekday'
:
'wed'
,
'starttime'
:
'07:00'
,
'endtime'
:
'15:10'
}
config
[
'entry2'
]
=
{
'name'
:
'workrave'
,
'mode'
:
'suspended'
,
'weekday'
:
'thu'
,
'starttime'
:
'07:00'
,
'endtime'
:
'18:00'
}
save_config
(
config
,
cfgFile
)
class
workraveManager
(
QObject
):
start_sync
=
pyqtSignal
(
bool
,
bool
)
finished
=
pyqtSignal
()
timer_time
=
pyqtSignal
(
int
)
def
__init__
(
self
,
cfg
,
parent
=
None
):
QObject
.
__init__
(
self
)
self
.
cfg
=
cfg
self
.
parent
=
parent
self
.
entryobjects
=
[]
self
.
default_mode
=
cfg
[
'general'
][
'mode'
]
self
.
sync_interval
=
cfg
[
'general'
][
'calendarsyncinterval'
]
self
.
inhibited
=
False
self
.
first_run
=
True
def
build_entryobjects
(
self
):
self
.
entryobjects
.
clear
()
entries
=
self
.
parse_config
(
self
.
cfg
)
for
entry
in
entries
[
'entries'
]:
entryCfg
=
entries
[
'entries'
][
entry
]
entryCfg
[
'default'
]
=
self
.
default_mode
self
.
entryobjects
.
append
(
autoEntry
(
entryCfg
))
for
calendar
in
entries
[
'calendars'
]:
calendarCfg
=
entries
[
'calendars'
][
calendar
]
calendarCfg
[
'default'
]
=
self
.
default_mode
calendarCfg
[
'cfgpath'
]
=
self
.
cfg
[
'general'
][
'cfgpath'
]
if
self
.
first_run
:
self
.
entryobjects
.
extend
(
self
.
calendar2autoEntry
(
calendarCfg
,
onlyCache
=
True
))
else
:
self
.
entryobjects
.
extend
(
self
.
calendar2autoEntry
(
calendarCfg
))
self
.
entryobjects
=
self
.
deduplicate
(
self
.
entryobjects
)
self
.
first_run
=
False
if
len
(
entries
[
'calendars'
])
>
0
and
not
self
.
syncTimer
.
isActive
():
self
.
syncTimer
.
start
(
int
(
self
.
sync_interval
)
*
1000
*
60
)
def
deduplicate
(
self
,
entries
):
retlist
=
[]
print
(
'Before simplification we had '
+
str
(
len
(
entries
))
+
' entries'
)
for
entry
in
entries
:
duplicate
=
False
for
ret
in
retlist
:
# New entry is contained in existing one
if
entry
.
get_start
()
>
ret
.
get_start
()
and
entry
.
get_end
()
<
ret
.
get_end
():
print
(
'Entry is contained in existing one, not adding'
)
duplicate
=
True
# New entry includes existing one
elif
entry
.
get_start
()
<
ret
.
get_start
()
and
entry
.
get_end
()
>
ret
.
get_end
():
print
(
'Existing entry is contained in new one, deleting existing one'
)
duplicate
=
False
retlist
.
remove
(
ret
)
# Starts before and ends between
elif
entry
.
get_start
()
<
ret
.
get_start
()
and
entry
.
get_end
()
<
ret
.
get_end
()
and
entry
.
get_end
()
>
ret
.
get_start
():
print
(
'New entry starts before and ends between existing one'
)
duplicate
=
False
dts
=
datetime
.
datetime
.
fromtimestamp
(
entry
.
get_start
())
dte
=
datetime
.
datetime
.
fromtimestamp
(
ret
.
get_end
())
tmp
=
{}
tmp
[
'default'
]
=
self
.
default_mode
tmp
[
'mode'
]
=
'suspended'
tmp
[
'starttime'
]
=
dts
.
strftime
(
"%H:%M"
)
tmp
[
'startdate'
]
=
dts
.
strftime
(
"%Y-%m-
%d
"
)
tmp
[
'endtime'
]
=
dte
.
strftime
(
"%H:%M"
)
tmp
[
'enddate'
]
=
dte
.
strftime
(
"%Y-%m-
%d
"
)
retlist
.
remove
(
ret
)
entry
=
autoEntry
(
tmp
)
elif
entry
.
get_start
()
>
ret
.
get_start
()
and
entry
.
get_end
()
>
ret
.
get_end
()
and
entry
.
get_start
()
<
ret
.
get_end
():
print
(
'New entry starts between and ends after existing one'
)
duplicate
=
False
dts
=
datetime
.
datetime
.
fromtimestamp
(
ret
.
get_start
())
dte
=
datetime
.
datetime
.
fromtimestamp
(
entry
.
get_end
())
tmp
=
{}
tmp
[
'default'
]
=
self
.
default_mode
tmp
[
'mode'
]
=
'suspended'
tmp
[
'starttime'
]
=
dts
.
strftime
(
"%H:%M"
)
tmp
[
'startdate'
]
=
dts
.
strftime
(
"%Y-%m-
%d
"
)
tmp
[
'endtime'
]
=
dte
.
strftime
(
"%H:%M"
)
tmp
[
'enddate'
]
=
dte
.
strftime
(
"%Y-%m-
%d
"
)
retlist
.
remove
(
ret
)
entry
=
autoEntry
(
tmp
)
if
not
duplicate
:
print
(
'Appending: '
+
entry
.
get_string
())
retlist
.
append
(
entry
)
print
(
'After simplification we have '
+
str
(
len
(
retlist
))
+
' entries'
)
return
retlist
def
calendar2autoEntry
(
self
,
cfg
,
onlyCache
=
False
):
ret
=
[]
syncEngine
=
calendarsync
.
CalendarSync
(
cfg
[
'url'
])
syncEngine
.
setCalendars
([
a
.
strip
()
for
a
in
cfg
[
'calendar'
]
.
split
(
','
)])
syncEngine
.
setPersistencePath
(
cfg
[
'cfgpath'
])
events
=
syncEngine
.
sync
(
useCache
=
True
,
onlyCache
=
onlyCache
)
for
event
in
events
:
tmp
=
{}
tmp
[
'default'
]
=
self
.
default_mode
tmp
[
'mode'
]
=
'suspended'
if
event
.
get
(
'dtstart'
)
and
event
.
get
(
'dtend'
):
dts
=
event
[
'dtstart'
]
.
dt
if
type
(
dts
)
is
datetime
.
datetime
:
dts
=
dts
.
astimezone
(
tz
=
None
)
elif
type
(
dts
)
is
datetime
.
date
:
if
cfg
[
'wholeday'
]
==
'skip'
:
continue
dte
=
event
[
'dtend'
]
.
dt
if
type
(
dte
)
is
datetime
.
datetime
:
dte
=
dte
.
astimezone
(
tz
=
None
)
elif
type
(
dte
)
is
datetime
.
date
:
if
cfg
[
'wholeday'
]
==
'skip'
:
continue
tmp
[
'starttime'
]
=
dts
.
strftime
(
"%H:%M"
)
tmp
[
'startdate'
]
=
dts
.
strftime
(
"%Y-%m-
%d
"
)
tmp
[
'endtime'
]
=
dte
.
strftime
(
"%H:%M"
)
tmp
[
'enddate'
]
=
dte
.
strftime
(
"%Y-%m-
%d
"
)
ret
.
append
(
autoEntry
(
tmp
))
return
ret
@pyqtSlot
()
def
run_action
(
self
):
print
(
'Timer fired, about to run action: '
+
self
.
action
)
self
.
set_workrave_mode
(
self
.
action
)
# This prevents the timer from firing several times
time
.
sleep
(
1
)
self
.
setup_timer
()
@pyqtSlot
()
def
sync_calendars
(
self
):
print
(
'Syncing calendars...'
)
self
.
build_entryobjects
()
self
.
startup_check
()
self
.
setup_timer
()
def
suspend_handler
(
self
,
suspended
):
if
suspended
:
print
(
'Going to sleep...'
)
self
.
timer
.
stop
()
else
:
if
self
.
inhibited
:
return
print
(
'Resuming, doing the recalculation math!'
)
self
.
startup_check
()
self
.
setup_timer
()
def
init_dbus
(
self
):
dbus_loop
=
DBusQtMainLoop
(
set_as_default
=
True
)
self
.
system_bus
=
dbus
.
SystemBus
(
mainloop
=
dbus_loop
)
manager_interface
=
'org.freedesktop.login1.Manager'
self
.
system_bus
.
add_signal_receiver
(
self
.
suspend_handler
,
'PrepareForSleep'
,
manager_interface
)
def
parse_config
(
self
,
cfg
):
ret
=
{}
if
'entries'
in
cfg
[
'general'
]:
entrylist
=
cfg
[
'general'
][
'entries'
]
if
entrylist
!=
''
:
entrylist
=
entrylist
.
split
(
','
)
else
:
entrylist
=
[]
else
:
entrylist
=
[]
if
'calendars'
in
cfg
[
'general'
]:
calendarlist
=
cfg
[
'general'
][
'calendars'
]
if
calendarlist
!=
''
:
calendarlist
=
calendarlist
.
split
(
','
)
else
:
calendarlist
=
[]
else
:
calendarlist
=
[]
ret
[
'entries'
]
=
{}
ret
[
'calendars'
]
=
{}
for
entry
in
entrylist
:
ret
[
'entries'
][
entry
]
=
{}
for
key
in
cfg
[
entry
]:
ret
[
'entries'
][
entry
][
key
]
=
cfg
[
entry
][
key
]
for
cal
in
calendarlist
:
ret
[
'calendars'
][
cal
]
=
{}
for
key
in
cfg
[
cal
]:
ret
[
'calendars'
][
cal
][
key
]
=
cfg
[
cal
][
key
]
return
ret
def
startup_check
(
self
):
if
self
.
inhibited
:
return
print
(
'Startup check...'
)
in_interval
=
False
for
entry
in
self
.
entryobjects
:
if
entry
.
is_in_interval
():
action
=
entry
.
get_interval_action
()
print
(
'Interval currently running, setting mode: '
+
action
)
self
.
set_workrave_mode
(
action
)
in_interval
=
True
if
not
in_interval
:
print
(
'Not in interval, setting default workrave mode: '
+
self
.
default_mode
)
self
.
set_workrave_mode
(
self
.
default_mode
)
def
set_workrave_mode
(
self
,
action
):
wr
=
dbus
.
SessionBus
()
.
get_object
(
'org.workrave.Workrave'
,
'/org/workrave/Workrave/Core'
)
iface
=
dbus
.
Interface
(
wr
,
'org.workrave.CoreInterface'
)
mode
=
iface
.
GetOperationMode
()
if
mode
==
action
:
print
(
'Mode already set, no action'
)
else
:
iface
.
SetOperationMode
(
action
)
@pyqtSlot
()
def
get_timer_time
(
self
):
if
self
.
timer
.
isActive
():
tm
=
self
.
timer
.
remainingTime
()
self
.
timer_time
.
emit
(
tm
)
return
tm
else
:
return
0
@pyqtSlot
()
def
startup
(
self
):
self
.
timer
=
QTimer
()
self
.
syncTimer
=
QTimer
()
self
.
timer
.
setSingleShot
(
True
)
self
.
timer
.
timeout
.
connect
(
self
.
run_action
)
self
.
syncTimer
.
timeout
.
connect
(
self
.
sync_calendars
)
self
.
build_entryobjects
()
self
.
init_dbus
()
self
.
startup_check
()
self
.
setup_timer
()
def
is_inhibited
(
self
):
return
self
.
inhibited
def
inhibit
(
self
,
action
):
if
action
:
self
.
timer
.
stop
()
self
.
inhibited
=
True
else
:
self
.
startup_check
()
self
.
setup_timer
()
self
.
inhibited
=
False
def
setup_timer
(
self
):
self
.
timer
.
stop
()
next_action_list
=
[]
next_run_list
=
[]
for
entry
in
self
.
entryobjects
:
action
=
entry
.
get_next_action
()
run
=
entry
.
get_next_run
()
if
action
and
run
:
next_action_list
.
append
(
action
)
next_run_list
.
append
(
run
)
sleep_time
=
min
(
next_run_list
)
self
.
action
=
next_action_list
[
next_run_list
.
index
(
sleep_time
)]
print
(
'Setting up timer for '
+
str
(
sleep_time
)
+
' seconds and then running action: '
+
self
.
action
)
self
.
timer
.
start
(
sleep_time
*
1000
)
def
quitWR
(
self
):
self
.
timer
.
stop
()
self
.
finished
.
emit
()
class
SystemTrayIcon
(
QtWidgets
.
QSystemTrayIcon
):
get_timer_time
=
pyqtSignal
()
def
__init__
(
self
,
icon
,
manager
,
parent
=
None
):
self
.
parent
=
parent
QtWidgets
.
QSystemTrayIcon
.
__init__
(
self
,
icon
,
parent
)
menu
=
QtWidgets
.
QMenu
(
parent
)
self
.
remaining
=
menu
.
addAction
(
'Remaining: '
)
self
.
presentationAction
=
menu
.
addAction
(
"Inhibit"
)
self
.
presentationAction
.
triggered
.
connect
(
self
.
inhibit
)
self
.
exitAction
=
menu
.
addAction
(
"Exit"
)
self
.
exitAction
.
triggered
.
connect
(
manager
.
quitWR
)
self
.
setContextMenu
(
menu
)
self
.
manager
=
manager
self
.
activated
.
connect
(
self
.
tray_activated
)
@pyqtSlot
(
int
)
def
timer_time
(
self
,
tm
):
rem
=
tm
/
1000
sec
=
datetime
.
timedelta
(
seconds
=
int
(
rem
))
d
=
datetime
.
datetime
(
1
,
1
,
1
)
+
sec
if
rem
==
0
:
self
.
remaining
.
setText
(
"No action planned."
)
else
:
if
d
.
day
-
1
==
0
:
fmt
=
"
%d
:
%d
:
%d
"
%
(
d
.
hour
,
d
.
minute
,
d
.
second
)
else
:
fmt
=
"
%d
days
%d
:
%d
:
%d
"
%
(
d
.
day
-
1
,
d
.
hour
,
d
.
minute
,
d
.
second
)
self
.
remaining
.
setText
(
"Next Action: "
+
fmt
)
@pyqtSlot
()
def
tray_activated
(
self
):
self
.
get_timer_time
.
emit
()
@pyqtSlot
()
def
inhibit
(
self
):
if
self
.
manager
.
is_inhibited
():
self
.
manager
.
inhibit
(
False
)
self
.
presentationAction
.
setText
(
"Inhibit"
)
else
:
self
.
manager
.
inhibit
(
True
)
self
.
presentationAction
.
setText
(
"Resume"
)
def
main
():
signal
.
signal
(
signal
.
SIGINT
,
signal
.
SIG_DFL
)
app
=
QtWidgets
.
QApplication
(
sys
.
argv
)
app
.
setQuitOnLastWindowClosed
(
False
);
cfgPath
=
os
.
path
.
expanduser
(
"~"
)
+
"/.workrave-auto/"
if
not
os
.
path
.
isdir
(
cfgPath
):
os
.
makedirs
(
cfgPath
)
cfgFile
=
cfgPath
+
"config.ini"
if
not
os
.
path
.
exists
(
cfgFile
):
recreate_config
(
cfgFile
)
cfg
=
load_config
(
cfgFile
)
cfg
[
'general'
][
'cfgpath'
]
=
cfgPath
w
=
QtWidgets
.
QWidget
()
m
=
workraveManager
(
cfg
,
w
)
objThread
=
QThread
()
m
.
moveToThread
(
objThread
)
objThread
.
started
.
connect
(
m
.
startup
)
#QMetaObject.invokeMethod(m, "startup", Qt.QueuedConnection)
m
.
finished
.
connect
(
objThread
.
quit
)
m
.
finished
.
connect
(
app
.
exit
)
objThread
.
start
()
#QMetaObject.invokeMethod(objThread, "start", Qt.QueuedConnection)
# Wait for up to 30 seconds for the systemTray to become available (required for, e.g., Xfce)
count
=
0
while
not
QtWidgets
.
QSystemTrayIcon
.
isSystemTrayAvailable
()
and
count
<
30
:
time
.
sleep
(
1
)
count
+=
1
trayIcon
=
SystemTrayIcon
(
QtGui
.
QIcon
(
"/usr/share/icons/workrave-auto.png"
),
m
,
w
)
trayIcon
.
show
()
trayIcon
.
get_timer_time
.
connect
(
m
.
get_timer_time
)
m
.
timer_time
.
connect
(
trayIcon
.
timer_time
)
sys
.
exit
(
app
.
exec_
())
if
__name__
==
'__main__'
:
main
()
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Thu, Dec 5, 4:58 PM (1 d, 7 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
634167
Default Alt Text
workrave-auto.py (18 KB)
Attached To
rWRA workrave-auto
Event Timeline
Log In to Comment