When working with automated data pipelines, a common requirement is triggering actions when new files arrive in a specific directory. In your case, a daily CSV upload lands in `/tmp/data_upload`, replacing the old files, and `test.py` needs to run after each update.
While cron jobs can check for file changes periodically, polling is inefficient for real-time monitoring. Linux's inotify subsystem provides kernel-level file change notifications, making it ideal for this scenario (a quick demo follows the list). Here's why it's superior:
- Immediate triggering (no polling delays)
- Low system resource usage
- Granular event types (modify, create, delete)
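You can watch these events arrive in real time with the `inotifywait` tool from inotify-tools (installation is covered below); the last output line is illustrative, assuming a file named `data.csv` was just written:

```bash
$ inotifywait -m /tmp/data_upload
Setting up watches.
Watches established.
/tmp/data_upload/ CLOSE_WRITE,CLOSE data.csv
```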
The easiest cross-platform solution uses Python's watchdog library. First, install it:

```bash
pip install watchdog
```
Then create `file_monitor.py`:
```python
import subprocess
import time

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class CSVHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if event.src_path.endswith('.csv'):
            print(f"Detected change in: {event.src_path}")
            subprocess.run(['python3', '/path/to/test.py'])

    # A replaced file often arrives as a "created" event rather than a
    # "modified" one, so handle both the same way.
    def on_created(self, event):
        self.on_modified(event)


if __name__ == "__main__":
    path = "/tmp/data_upload"
    event_handler = CSVHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    observer.start()
    try:
        # The observer runs in a background thread; keep the main thread alive.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
```
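Run it in a terminal (or in the background); each matching event prints the changed path and then invokes the script. The CSV name below is just an illustration:

```
$ python3 file_monitor.py
Detected change in: /tmp/data_upload/report.csv
```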
For systems without Python, use the inotify-tools package:

```bash
sudo apt-get install inotify-tools
```
Then create a monitoring script. Watching the directory itself is more reliable than a `*.csv` glob, which only attaches watches to files that exist when the script starts; `close_write` fires once a writer has finished, so you never react to a half-written file:

```bash
#!/bin/bash
# Run test.py when a CSV finishes being written or is moved into place.
inotifywait -m -e close_write -e moved_to --format '%f' /tmp/data_upload |
while read -r filename; do
    [[ "$filename" == *.csv ]] && python3 /path/to/test.py
done
```
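Make it executable and start it (`monitor.sh` is a placeholder name):

```bash
chmod +x monitor.sh
./monitor.sh
```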
For production systems, consider these enhancements:
- Debouncing: Add a delay to handle rapid successive writes
- File Locking: Check that writes are complete before processing (see the size-stability sketch after the debouncing example below)
- Logging: Implement proper logging for debugging
Here's an improved version of the handler with debouncing; each new event cancels the pending timer, so the script runs only once the directory has been quiet for the full interval:

```python
import subprocess
from threading import Timer

from watchdog.events import FileSystemEventHandler

DEBOUNCE_SEC = 5  # Run only after 5 seconds with no further changes


def run_script():
    subprocess.run(['python3', '/path/to/test.py'])


class CSVHandler(FileSystemEventHandler):
    def __init__(self):
        self.timer = None

    def on_modified(self, event):
        if not event.src_path.endswith('.csv'):
            return
        # Restart the countdown on every event so rapid successive
        # writes collapse into a single script execution.
        if self.timer is not None:
            self.timer.cancel()
        self.timer = Timer(DEBOUNCE_SEC, run_script)
        self.timer.start()
```
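For the write-completeness check mentioned above, a simple heuristic (a sketch, not true file locking; the helper name and poll parameters are arbitrary) is to wait until the file's size stops changing, logging the result for debugging:

```python
import logging
import os
import time

logging.basicConfig(level=logging.INFO)


def wait_until_stable(path, interval=1.0, checks=3):
    """Block until the file size has been unchanged for `checks` polls."""
    stable = 0
    last_size = -1
    while stable < checks:
        size = os.path.getsize(path)
        stable = stable + 1 if size == last_size else 0
        last_size = size
        time.sleep(interval)
    logging.info("%s appears fully written (%d bytes)", path, last_size)
```

Calling `wait_until_stable(event.src_path)` at the top of the handler delays processing until the upload looks finished.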
For permanent monitoring, create a systemd service:
```ini
[Unit]
Description=CSV File Monitor Service

[Service]
ExecStart=/usr/bin/python3 /path/to/file_monitor.py
Restart=always
User=your_user

[Install]
WantedBy=multi-user.target
```
Save it as `/etc/systemd/system/csv-monitor.service`, then:

```bash
sudo systemctl daemon-reload
sudo systemctl enable csv-monitor
sudo systemctl start csv-monitor
```
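To confirm the service is running and to follow its output (the handler's print statements end up in the journal):

```bash
systemctl status csv-monitor
journalctl -u csv-monitor -f
```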