我正在 Docker 容器中的 ASP.NET 上创建分布式分散数据存储系统。使用 Chord (DHT)。在节点之间保存文件可以正常工作 - 它们被分发和存储。尝试从节点上传(下载)文件时出现此问题。我无法从日志中找出问题所在。错误可能出在哪里?
日志:
2025-04-21 15:28:36 2025-04-21 12:28:36 warn: DistributedSystems_Lab4.WorkerController[0] Запрос на скачивание файла Дикая природа Амазонки.mp4
2025-04-21 15:28:36 2025-04-21 12:28:36 info: System.Net.Http.HttpClient.Default.LogicalHandler[100] Start processing HTTP request GET http://worker4:5004/store/%D0%94%D0%B8%D0%BA%D0%B0%D1%8F%20%D0%BF%D1%80%D0%B8%D1%80%D0%BE%D0%B4%D0%B0%20%D0%90%D0%BC%D0%B0%D0%B7%D0%BE%D0%BD%D0%BA%D0%B8.mp4_part_0
2025-04-21 15:28:36 2025-04-21 12:28:36 info: System.Net.Http.HttpClient.Default.ClientHandler[100] Sending HTTP request GET http://worker4:5004/store/%D0%94%D0%B8%D0%BA%D0%B0%D1%8F%20%D0%BF%D1%80%D0%B8%D1%80%D0%BE%D0%B4%D0%B0%20%D0%90%D0%BC%D0%B0%D0%B7%D0%BE%D0%BD%D0%BA%D0%B8.mp4_part_0
2025-04-21 15:28:36 2025-04-21 12:28:36 info: System.Net.Http.HttpClient.Default.ClientHandler[101] Received HTTP response headers after 61.7462ms - 400
2025-04-21 15:28:36 2025-04-21 12:28:36 info: System.Net.Http.HttpClient.Default.LogicalHandler[101] End processing HTTP request after 62.0256ms - 400
2025-04-21 15:28:36 2025-04-21 12:28:36 warn: DistributedSystems_Lab4.WorkerController[0] Не удалось скачать блок Дикая природа Амазонки.mp4_part_0 с http://worker4:5004
代码:
docker-compose.yml:
services:
worker1:
container_name: worker1
image: myapp
environment:
- NodeRole=worker
- ASPNETCORE_URLS=http://+:5001
ports:
- "5001:5001"
networks:
- app_net
volumes:
- worker1_storage:/app/storage
- metadata-volume:/app/metadata_storage
deploy:
restart_policy:
condition: on-failure
worker2:
container_name: worker2
image: myapp
environment:
- NodeRole=worker
- ASPNETCORE_URLS=http://+:5002
ports:
- "5002:5002"
networks:
- app_net
volumes:
- worker2_storage:/app/storage
- metadata-volume:/app/metadata_storage
deploy:
restart_policy:
condition: on-failure
worker3:
container_name: worker3
image: myapp
environment:
- NodeRole=worker
- ASPNETCORE_URLS=http://+:5003
ports:
- "5003:5003"
networks:
- app_net
volumes:
- worker3_storage:/app/storage
- metadata-volume:/app/metadata_storage
deploy:
restart_policy:
condition: on-failure
worker4:
container_name: worker4
image: myapp
environment:
- NodeRole=worker
- ASPNETCORE_URLS=http://+:5004
ports:
- "5004:5004"
networks:
- app_net
volumes:
- worker4_storage:/app/storage
- metadata-volume:/app/metadata_storage
deploy:
restart_policy:
condition: on-failure
worker5:
container_name: worker5
image: myapp
environment:
- NodeRole=worker
- ASPNETCORE_URLS=http://+:5005
ports:
- "5005:5005"
networks:
- app_net
volumes:
- worker5_storage:/app/storage
- metadata-volume:/app/metadata_storage
deploy:
restart_policy:
condition: on-failure
networks:
app_net:
driver: bridge
volumes:
metadata-volume:
driver: local
worker1_storage:
worker2_storage:
worker3_storage:
worker4_storage:
worker5_storage:
ChordNode.cs:
using DistributedSystems_Lab4;
public class ChordNode
{
public int Id { get; }
public string Url { get; }
public ChordNode? Successor { get; set; }
public ChordNode? Predecessor { get; set; }
public List<FingerEntry> FingerTable { get; set; }
public ChordNode(int id, string url)
{
Id = id;
Url = url;
FingerTable = new List<FingerEntry>();
}
public override string ToString() => $"{Url} ({Id})";
}
ChordRing.cs:
using DistributedSystems_Lab4;
using System.Security.Cryptography;
using System.Text;
public class ChordRing
{
private readonly List<ChordNode> nodes = new();
private readonly int m = 32;
public void InitializeFingerTables()
{
foreach (var node in nodes)
{
for (int i = 0; i < m; i++)
{
int start = (node.Id + (int)Math.Pow(2, i)) % (int)Math.Pow(2, m);
node.FingerTable.Add(new FingerEntry { Start = start, Node = FindNodeForFingerTable(start) });
}
}
}
private ChordNode FindNodeForFingerTable(int start)
{
foreach (var node in nodes)
{
if (node.Id >= start)
return node;
}
return nodes.First();
}
public void AddNode(string url)
{
int id = Hash(url);
if (nodes.Any(n => n.Id == id)) return;
var newNode = new ChordNode(id, url);
nodes.Add(newNode);
nodes.Sort((a, b) => a.Id.CompareTo(b.Id));
UpdateLinks();
}
public void RemoveNode(string url)
{
nodes.RemoveAll(n => n.Url == url);
UpdateLinks();
}
private void UpdateLinks()
{
int count = nodes.Count;
for (int i = 0; i < count; i++)
{
var current = nodes[i];
current.Successor = nodes[(i + 1) % count];
current.Predecessor = nodes[(i - 1 + count) % count];
}
}
public ChordNode FindResponsibleNode(string key)
{
int keyHash = Hash(key);
foreach (var node in nodes.OrderBy(n => n.Id))
{
if (keyHash <= node.Id)
return node;
}
return nodes.First(); // Если хеш больше всех — вернем первого (кольцо)
}
public static int Hash(string input)
{
using var sha1 = SHA1.Create();
var hash = sha1.ComputeHash(Encoding.UTF8.GetBytes(input));
return Math.Abs(BitConverter.ToInt32(hash, 0));
}
public IEnumerable<ChordNode> GetAllNodes() => nodes;
}
FingerEntry.cs:
namespace DistributedSystems_Lab4
{
public class FingerEntry
{
public int Start { get; set; }
public ChordNode? Node { get; set; }
}
}
键值控制器.cs:
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Threading.Tasks;
namespace DistributedSystems_Lab4
{
[ApiController]
[Route("store")]
public class KeyValueController : ControllerBase
{
private readonly WorkerFileStorageService service;
private readonly ILogger logger;
public KeyValueController(WorkerFileStorageService service, ILogger<KeyValueController> logger)
{
this.service = service;
this.logger = logger;
}
[HttpPut("{key}")]
public async Task<IActionResult> Store(string key, IFormFile file)
{
if (file == null || file.Length == 0)
{
logger.LogWarning("Файл не передан или пуст. partName: {partName}", file.FileName);
return BadRequest("Файл не передан или пуст.");
}
logger.LogInformation("Получен запрос на сохранение файла: {FileName} с размером {FileSize} для части: {PartName}",
file.FileName, file.Length, file.FileName);
try
{
await service.SaveBlockAsync(file.FileName, file);
service.metadataService.SaveFileName(key, file.FileName);
logger.LogInformation("Файл {FileName} успешно сохранен.", file.FileName);
return Ok();
}
catch (Exception ex)
{
logger.LogError(ex, "Ошибка при сохранении файла {FileName} для части: {PartName}", file.FileName, file.FileName);
return StatusCode(500, "Ошибка при сохранении файла.");
}
}
[HttpGet("{key}")]
public async Task<IActionResult> Get(string partName)
{
var stream = await service.GetBlockAsync(partName);
if (stream == null) return NotFound();
return File(stream, "application/octet-stream", partName);
}
[HttpHead("{key}")]
public IActionResult Exists(string key)
{
return service.HasBlock(key) ? Ok() : NotFound();
}
[HttpDelete("{key}")]
public IActionResult Delete(string key)
{
return service.DeleteBlock(key) ? Ok() : NotFound();
}
}
}
元数据服务.cs:
using Newtonsoft.Json;
public class MetadataService
{
private readonly string metadataPath = "/app/metadata_storage/";
private Dictionary<string, string> fileNames = new();
private Dictionary<string, List<(string partName, string WorkerUrl)>> fileMetadata = new();
public MetadataService()
{
if (!Directory.Exists(metadataPath))
{
Directory.CreateDirectory(metadataPath);
}
LoadMetadata();
}
public void SaveFileName(string key, string fileName)
{
fileNames[key] = fileName;
SaveMetadata();
}
public void SaveFileMetadata(string key, List<(string partName, string WorkerUrl)> metadata)
{
fileMetadata[key] = metadata;
SaveMetadata();
}
public string GetFileName(string key)
{
return fileNames.ContainsKey(key) ? fileNames[key] : null;
}
public List<(string partName, string WorkerUrl)> GetFileMetadata(string key)
{
return fileMetadata.ContainsKey(key) ? fileMetadata[key] : null;
}
public void DeleteFileMetadata(string key)
{
if (fileMetadata.ContainsKey(key))
{
fileMetadata.Remove(key);
fileNames.Remove(key);
SaveMetadata();
}
}
private void SaveMetadata()
{
if (!Directory.Exists(metadataPath))
{
Directory.CreateDirectory(metadataPath);
}
var filePath = Path.Combine(metadataPath, "metadata.json");
var metadata = new
{
Files = fileNames,
Metadata = fileMetadata
};
var json = JsonConvert.SerializeObject(metadata, Formatting.Indented);
File.WriteAllText(filePath, json);
}
private void LoadMetadata()
{
var filePath = Path.Combine(metadataPath, "metadata.json");
if (File.Exists(filePath))
{
var json = File.ReadAllText(filePath);
var metadata = JsonConvert.DeserializeObject<dynamic>(json);
if (metadata?.Files != null)
fileNames = JsonConvert.DeserializeObject<Dictionary<string, string>>(metadata.Files.ToString());
if (metadata?.Metadata != null)
fileMetadata = JsonConvert.DeserializeObject<Dictionary<string, List<(string partName, string WorkerUrl)>>>(metadata.Metadata.ToString());
}
}
public bool KeyExists(string key)
{
if (fileNames[key] != null)
return true;
else return false;
}
}
程序.cs:
using DistributedSystems_Lab4;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.DataProtection;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
var builder = WebApplication.CreateBuilder(args);
builder.Logging.ClearProviders();
builder.Logging.AddSimpleConsole(options =>
{
options.IncludeScopes = false;
options.SingleLine = true;
options.TimestampFormat = "yyyy-MM-dd HH:mm:ss ";
});
builder.Services.AddMvc();
builder.Services.Configure<Microsoft.AspNetCore.Http.Features.FormOptions>(options =>
{
options.MultipartBodyLengthLimit = 10L * 1024 * 1024 * 1024; // 10GB
});
builder.WebHost.ConfigureKestrel(serverOptions =>
{
serverOptions.Limits.MaxRequestBodySize = 10L * 1024 * 1024 * 1024; // 10GB
});
builder.Services.Configure<FormOptions>(o =>
{
o.MultipartBodyLengthLimit = 10L * 1024 * 1024 * 1024; // 50 MB
});
builder.Services.AddControllers();
builder.Services.AddHttpClient();
builder.Services.AddSingleton<HttpClient>();
builder.Services.AddSingleton<WorkerFileStorageService>();
builder.Services.AddSingleton<ChordNode>();
builder.Services.AddSingleton<ChordRing>();
builder.Services.AddSingleton<FingerEntry>();
builder.Services.AddSingleton<KeyValueController>();
builder.Services.AddSingleton<MetadataService>();
var app = builder.Build();
app.Logger.LogInformation("Start");
app.MapGet("/", () => Results.Redirect("/index.html"));
app.UseStaticFiles();
app.UseRouting();
app.UseAuthorization();
app.MapControllers();
app.Run();
WorkerController.cs:
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
using System.Linq;
using System.Collections.Generic;
using System;
namespace DistributedSystems_Lab4
{
[Route("api/worker")]
[ApiController]
public class WorkerController : ControllerBase
{
private readonly WorkerFileStorageService workerStorage;
private readonly ILogger logger;
private readonly MetadataService metadataService;
private readonly IHttpClientFactory httpClientFactory;
public WorkerController(
WorkerFileStorageService workerStorage,
ILogger<WorkerController> logger,
MetadataService metadataService,
IHttpClientFactory httpClientFactory)
{
this.workerStorage = workerStorage;
this.logger = logger;
this.metadataService = metadataService;
this.httpClientFactory = httpClientFactory;
}
[HttpPut("save/{key}")]
public async Task<IActionResult> Save(string key, [FromForm] IFormFile file)
{
if (file == null || file.Length == 0)
return BadRequest("Файл не передан или пуст.");
const int blockSize = 5 * 1024 * 1024;
List<(string blockName, string WorkerUrl)> metadata = new();
var chord = workerStorage.getChordRing();
using var stream = file.OpenReadStream();
int index = 0;
byte[] buffer = new byte[blockSize];
int bytesRead;
while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
var partName = $"{file.FileName}_part_{index}";
var node = chord.FindResponsibleNode(partName);
var client = httpClientFactory.CreateClient();
using var content = new MultipartFormDataContent();
using var blockStream = new MemoryStream(buffer, 0, bytesRead);
// Гарантируем, что поток начинает чтение с первого байта
blockStream.Position = 0;
content.Add(new StreamContent(blockStream), "file", partName);
var response = await client.PutAsync($"{node.Url}/store/{key}", content);
if (response.IsSuccessStatusCode)
{
metadata.Add((partName, node.Url));
logger.LogInformation($"Блок {index} успешно сохранен на {node.Url}");
}
else
{
logger.LogWarning($"Не удалось сохранить блок {index} на {node.Url}. Код ответа: {response.StatusCode}");
}
index++;
}
if (metadata.Count == index)
{
try
{
metadataService.SaveFileName(key, file.FileName);
metadataService.SaveFileMetadata(key, metadata);
logger.LogInformation($"Метаданные для файла {key} успешно сохранены.");
}
catch (Exception ex)
{
logger.LogError($"Ошибка при сохранении метаданных для файла {key}: {ex.Message}");
return StatusCode(500, "Ошибка при сохранении метаданных.");
}
}
return Ok("Файл сохранён.");
}
[HttpGet("download/{key}")]
public async Task<IActionResult> Download(string key)
{
string fileName = metadataService.GetFileName(key);
logger.LogWarning($"Запрос на скачивание файла {fileName}");
var parts = metadataService.GetFileMetadata(key);
if (parts == null || parts.Count == 0)
return NotFound("Файл не найден.");
var client = httpClientFactory.CreateClient();
var memoryStream = new MemoryStream();
foreach (var (partName, url) in parts.OrderBy(p => p.partName))
{
var response = await client.GetAsync($"{url}/store/{partName}");
if (!response.IsSuccessStatusCode)
{
logger.LogWarning($"Не удалось скачать блок {partName} с {url}");
continue;
}
var blockStream = await response.Content.ReadAsStreamAsync();
await blockStream.CopyToAsync(memoryStream);
}
memoryStream.Position = 0;
return File(memoryStream, "application/octet-stream", fileName);
}
[HttpGet("has/{key}")]
public IActionResult HasBlock(string key)
{
bool exists = metadataService.KeyExists(key);
if (exists)
{
logger.LogInformation($"Блок с ключом {key} существует.");
return Ok($"Блок с ключом {key} существует.");
}
else
{
logger.LogInformation($"Блок с ключом {key} не найден.");
return NotFound($"Блок с ключом {key} не найден.");
}
}
[HttpDelete("delete/{key}")]
public async Task<IActionResult> Delete(string key)
{
var parts = metadataService.GetFileMetadata(key);
if (parts == null || parts.Count == 0)
return NotFound("Файл не найден.");
var client = httpClientFactory.CreateClient();
bool allDeleted = true;
foreach (var (part, url) in parts)
{
var response = await client.DeleteAsync($"{url}/store/{key}_part_{part}");
if (!response.IsSuccessStatusCode)
{
logger.LogWarning($"Не удалось удалить блок: {key}_part_{part} на {url}");
allDeleted = false;
}
else
{
logger.LogInformation($"Блок {key}_part_{part} успешно удалён с {url}");
}
}
try
{
metadataService.DeleteFileMetadata(key);
logger.LogInformation($"Метаданные для файла {key} успешно удалены.");
}
catch (Exception ex)
{
logger.LogError($"Ошибка при удалении метаданных для файла {key}: {ex.Message}");
return StatusCode(500, "Ошибка при удалении метаданных.");
}
return allDeleted ? Ok("Файл удалён.") : StatusCode(500, "Удаление выполнено частично.");
}
[HttpGet("listFiles")]
public IActionResult ListFiles()
{
var files = workerStorage.ListAllKeys();
return Ok(files);
}
}
}
WorkerFileStorageService.cs:
using DistributedSystems_Lab4;
public class WorkerFileStorageService
{
private readonly ILogger logger;
private readonly HttpClient httpClient;
private readonly ChordRing chordRing;
public readonly MetadataService metadataService;
private readonly string storagePath = "/app/storage/";
public WorkerFileStorageService(
HttpClient httpClient,
ILogger<WorkerFileStorageService> logger,
MetadataService metadataService)
{
this.httpClient = httpClient;
this.logger = logger;
this.chordRing = new ChordRing();
this.metadataService = metadataService;
if (!Directory.Exists(storagePath))
Directory.CreateDirectory(storagePath);
InitializeChordRing();
}
public ChordRing getChordRing()
{
return chordRing;
}
private void InitializeChordRing()
{
var nodeUrls = new[]
{
"http://worker1:5001",
"http://worker2:5002",
"http://worker3:5003",
"http://worker4:5004",
"http://worker5:5005"
};
foreach (var url in nodeUrls)
{
chordRing.AddNode(url);
logger.LogInformation($"Добавлен узел в ChordRing: {url}");
}
chordRing.InitializeFingerTables();
}
public async Task SaveBlockAsync(string partName, IFormFile file)
{
var path = Path.Combine(storagePath, partName);
using var stream = new FileStream(path, FileMode.Create);
await file.CopyToAsync(stream);
logger.LogInformation($"Блок {partName} сохранён локально.");
}
public async Task<Stream?> GetBlockAsync(string partName)
{
var path = Path.Combine(storagePath, partName);
if (!File.Exists(path))
{
logger.LogWarning($"Файл {partName} не найден");
return null;
}
return new FileStream(path, FileMode.Open, FileAccess.Read);
}
public bool DeleteBlock(string key)
{
var path = Path.Combine(storagePath, key);
if (!File.Exists(path)) return false;
try
{
File.Delete(path);
logger.LogInformation($"Блок {key} успешно удалён");
return true;
}
catch (Exception ex)
{
logger.LogError($"Ошибка при удалении блока {key}: {ex.Message}");
return false;
}
}
public bool HasBlock(string key)
{
return File.Exists(Path.Combine(storagePath, key));
}
public List<string> ListAllKeys()
{
var files = Directory.GetFiles(storagePath)
.Select(Path.GetFileName)
.Where(name => !string.IsNullOrEmpty(name))
.ToList();
return files!;
}
}