m3u: Implement write-replace context manager.
This commit is contained in:
parent
b2d1e1b4f7
commit
2b8508d3c7
@ -1,10 +1,12 @@
|
|||||||
from __future__ import absolute_import, unicode_literals
|
from __future__ import absolute_import, unicode_literals
|
||||||
|
|
||||||
|
import contextlib
|
||||||
import io
|
import io
|
||||||
import locale
|
import locale
|
||||||
import logging
|
import logging
|
||||||
import operator
|
import operator
|
||||||
import os
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
from mopidy import backend
|
from mopidy import backend
|
||||||
|
|
||||||
@ -21,6 +23,33 @@ def log_environment_error(message, error):
|
|||||||
logger.error('%s: %s', message, strerror)
|
logger.error('%s: %s', message, strerror)
|
||||||
|
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def replace(path, mode='w+b', encoding=None, errors=None):
|
||||||
|
try:
|
||||||
|
(fd, tempname) = tempfile.mkstemp(dir=os.path.dirname(path))
|
||||||
|
except TypeError:
|
||||||
|
# Python 3 requires dir to be of type str until v3.5
|
||||||
|
import sys
|
||||||
|
path = path.decode(sys.getfilesystemencoding())
|
||||||
|
(fd, tempname) = tempfile.mkstemp(dir=os.path.dirname(path))
|
||||||
|
try:
|
||||||
|
fp = io.open(fd, mode, encoding=encoding, errors=errors)
|
||||||
|
except:
|
||||||
|
os.remove(tempname)
|
||||||
|
os.close(fd)
|
||||||
|
raise
|
||||||
|
try:
|
||||||
|
yield fp
|
||||||
|
fp.flush()
|
||||||
|
os.fsync(fd)
|
||||||
|
os.rename(tempname, path)
|
||||||
|
except:
|
||||||
|
os.remove(tempname)
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
fp.close()
|
||||||
|
|
||||||
|
|
||||||
class M3UPlaylistsProvider(backend.PlaylistsProvider):
|
class M3UPlaylistsProvider(backend.PlaylistsProvider):
|
||||||
|
|
||||||
def __init__(self, backend, config):
|
def __init__(self, backend, config):
|
||||||
@ -114,4 +143,7 @@ class M3UPlaylistsProvider(backend.PlaylistsProvider):
|
|||||||
encoding = self._default_encoding
|
encoding = self._default_encoding
|
||||||
if not os.path.isabs(path):
|
if not os.path.isabs(path):
|
||||||
path = os.path.join(self._playlists_dir, path)
|
path = os.path.join(self._playlists_dir, path)
|
||||||
return io.open(path, mode, encoding=encoding, errors='replace')
|
if 'w' in mode:
|
||||||
|
return replace(path, mode, encoding=encoding, errors='replace')
|
||||||
|
else:
|
||||||
|
return io.open(path, mode, encoding=encoding, errors='replace')
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user