mirror of https://github.com/tLDP/python-tldp
completed first round of testing of driver code
This commit is contained in:
parent
2825263f20
commit
c425c2a8e8
|
@ -0,0 +1,136 @@
|
|||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
import os
|
||||
import time
|
||||
import errno
|
||||
import random
|
||||
import shutil
|
||||
|
||||
try:
|
||||
from types import SimpleNamespace
|
||||
except ImportError:
|
||||
from utils import SimpleNamespace
|
||||
|
||||
from tldp.outputs import OutputNamingConvention
|
||||
from tldptesttools import TestToolsFilesystem
|
||||
|
||||
# -- Test Data
|
||||
import example
|
||||
|
||||
# -- SUT
|
||||
from tldp.driver import Inventory
|
||||
|
||||
datadir = os.path.join(os.path.dirname(__file__), 'testdata')
|
||||
|
||||
|
||||
class TestOutputDirSkeleton(OutputNamingConvention):
|
||||
|
||||
def mkdir(self):
|
||||
if not os.path.isdir(self.dirname):
|
||||
os.mkdir(self.dirname)
|
||||
|
||||
def create_expected_docs(self):
|
||||
for name in self.expected:
|
||||
fname = getattr(self, name)
|
||||
with open(fname, 'w'):
|
||||
pass
|
||||
|
||||
|
||||
class TestSourceDocSkeleton(object):
|
||||
|
||||
def __init__(self, dirname):
|
||||
if not os.path.abspath(dirname):
|
||||
raise Exception("Please use absolute path in unit tests....")
|
||||
self.dirname = dirname
|
||||
if not os.path.isdir(self.dirname):
|
||||
os.mkdir(self.dirname)
|
||||
|
||||
def addsourcefile(self, filename, content):
|
||||
fname = os.path.join(self.dirname, filename)
|
||||
if os.path.isfile(content):
|
||||
shutil.copy(content, fname)
|
||||
else:
|
||||
with open(fname, 'w') as f:
|
||||
f.write(content)
|
||||
|
||||
|
||||
class TestInventoryBase(TestToolsFilesystem):
|
||||
|
||||
def setupcollections(self):
|
||||
self.pubdir = os.path.join(self.tempdir, 'outputs')
|
||||
self.sourcedir = os.path.join(self.tempdir, 'sources')
|
||||
self.sourcedirs = [self.sourcedir]
|
||||
for d in (self.sourcedir, self.pubdir):
|
||||
if not os.path.isdir(d):
|
||||
os.mkdir(d)
|
||||
|
||||
def add_stale(self, stem, ex):
|
||||
self.setupcollections()
|
||||
myoutput = TestOutputDirSkeleton(os.path.join(self.pubdir, stem), stem)
|
||||
myoutput.mkdir()
|
||||
myoutput.create_expected_docs()
|
||||
time.sleep(0.002)
|
||||
mysource = TestSourceDocSkeleton(self.sourcedir)
|
||||
mysource.addsourcefile(stem + ex.ext, ex.filename)
|
||||
|
||||
def add_new(self, stem, ex):
|
||||
self.setupcollections()
|
||||
mysource = TestSourceDocSkeleton(self.sourcedir)
|
||||
mysource.addsourcefile(stem + ex.ext, ex.filename)
|
||||
|
||||
def add_orphan(self, stem, ex):
|
||||
self.setupcollections()
|
||||
myoutput = TestOutputDirSkeleton(os.path.join(self.pubdir, stem), stem)
|
||||
myoutput.mkdir()
|
||||
myoutput.create_expected_docs()
|
||||
|
||||
def add_published(self, stem, ex):
|
||||
self.setupcollections()
|
||||
mysource = TestSourceDocSkeleton(self.sourcedir)
|
||||
mysource.addsourcefile(stem + ex.ext, ex.filename)
|
||||
myoutput = TestOutputDirSkeleton(os.path.join(self.pubdir, stem), stem)
|
||||
myoutput.mkdir()
|
||||
myoutput.create_expected_docs()
|
||||
|
||||
|
||||
class TestInventoryUsage(TestInventoryBase):
|
||||
|
||||
def test_detect_status_published(self):
|
||||
ex = random.choice(example.sources)
|
||||
self.add_published('Frobnitz-HOWTO', ex)
|
||||
i = Inventory(self.pubdir, self.sourcedirs)
|
||||
self.assertEquals(0, len(i.stale))
|
||||
self.assertEquals(1, len(i.published))
|
||||
self.assertEquals(0, len(i.new))
|
||||
self.assertEquals(0, len(i.orphans))
|
||||
|
||||
def test_detect_status_new(self):
|
||||
ex = random.choice(example.sources)
|
||||
self.add_new('Frobnitz-HOWTO', ex)
|
||||
i = Inventory(self.pubdir, self.sourcedirs)
|
||||
self.assertEquals(0, len(i.stale))
|
||||
self.assertEquals(0, len(i.published))
|
||||
self.assertEquals(1, len(i.new))
|
||||
self.assertEquals(0, len(i.orphans))
|
||||
|
||||
def test_detect_status_orphan(self):
|
||||
ex = random.choice(example.sources)
|
||||
self.add_orphan('Frobnitz-HOWTO', ex)
|
||||
i = Inventory(self.pubdir, self.sourcedirs)
|
||||
self.assertEquals(0, len(i.stale))
|
||||
self.assertEquals(0, len(i.published))
|
||||
self.assertEquals(0, len(i.new))
|
||||
self.assertEquals(1, len(i.orphans))
|
||||
|
||||
def test_detect_status_stale(self):
|
||||
ex = random.choice(example.sources)
|
||||
self.add_stale('Frobnitz-HOWTO', ex)
|
||||
i = Inventory(self.pubdir, self.sourcedirs)
|
||||
self.assertEquals(1, len(i.stale))
|
||||
self.assertEquals(1, len(i.published))
|
||||
self.assertEquals(0, len(i.new))
|
||||
self.assertEquals(0, len(i.orphans))
|
||||
|
||||
#
|
||||
# -- end of file
|
|
@ -70,7 +70,7 @@ class Inventory(object):
|
|||
fset = mtime_gt(mtime, sdoc.statinfo)
|
||||
if fset:
|
||||
for f in fset:
|
||||
logger.info("%s updated source file %s", stem, f)
|
||||
logger.debug("%s found updated source file %s", stem, f)
|
||||
odoc.status = sdoc.status = 'stale'
|
||||
self.stale[stem] = sdoc
|
||||
logger.info("Identified %d stale documents: %r.", len(self.stale),
|
||||
|
|
Loading…
Reference in New Issue